// fxfs/object_store/data_object_handle.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::log::*;
7use crate::lsm_tree::Query;
8use crate::lsm_tree::types::{ItemRef, LayerIterator};
9use crate::object_handle::{
10    ObjectHandle, ObjectProperties, ReadObjectHandle, WriteBytes, WriteObjectHandle,
11};
12use crate::object_store::extent_record::{ExtentKey, ExtentMode, ExtentValue};
13use crate::object_store::object_manager::ObjectManager;
14use crate::object_store::object_record::{
15    AttributeKey, FsverityMetadata, ObjectAttributes, ObjectItem, ObjectKey, ObjectKeyData,
16    ObjectKind, ObjectValue, Timestamp,
17};
18use crate::object_store::store_object_handle::{MaybeChecksums, NeedsTrim};
19use crate::object_store::transaction::{
20    self, AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Operation, Options,
21    Transaction, lock_keys,
22};
23use crate::object_store::{
24    DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions, HandleOwner,
25    RootDigest, StoreObjectHandle, TRANSACTION_MUTATION_THRESHOLD, TrimMode, TrimResult,
26};
27use crate::range::RangeExt;
28use crate::round::{round_down, round_up};
29use anyhow::{Context, Error, anyhow, bail, ensure};
30use async_trait::async_trait;
31use fidl_fuchsia_io as fio;
32use fsverity_merkle::{
33    FsVerityDescriptor, FsVerityDescriptorRaw, FsVerityHasher, FsVerityHasherOptions, MerkleTree,
34    MerkleTreeBuilder,
35};
36use fuchsia_sync::Mutex;
37use futures::TryStreamExt;
38use futures::stream::FuturesUnordered;
39use fxfs_trace::trace;
40use std::cmp::min;
41use std::ops::{Deref, DerefMut, Range};
42use std::sync::Arc;
43use std::sync::atomic::{self, AtomicU64, Ordering};
44use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
45
46mod allocated_ranges;
47pub use allocated_ranges::{AllocatedRanges, RangeType};
48
/// How much data each transaction will cover when writing an attribute across batches (512 KiB).
/// Pulled from `FLUSH_BATCH_SIZE` in paged_object_handle.rs.
pub const WRITE_ATTR_BATCH_SIZE: usize = 524_288;
52
/// DataObjectHandle is a typed handle for file-like objects that store data in the default data
/// attribute. In addition to traditional files, this means things like the journal, superblocks,
/// and layer files.
///
/// It caches the content size of the data attribute it was configured for, and has helpers for
/// complex extent manipulation, as well as implementations of ReadObjectHandle and
/// WriteObjectHandle.
pub struct DataObjectHandle<S: HandleOwner> {
    // The untyped store handle this wraps; `Deref` exposes its methods directly.
    handle: StoreObjectHandle<S>,
    // The attribute id this handle reads and writes data through.
    attribute_id: u64,
    // Cached content size of the data attribute.
    content_size: AtomicU64,
    // Tracks progress through the fsverity enablement state machine.
    fsverity_state: Mutex<FsverityState>,
    // In-memory tracking of ranges allocated in overwrite mode.
    overwrite_ranges: AllocatedRanges,
}
67
/// Represents the mapping of a file's contents to the physical storage backing it.
#[derive(Debug, Clone)]
pub struct FileExtent {
    // Logical offset in the file at which this extent begins.
    logical_offset: u64,
    // Device byte range backing this extent; validated in `FileExtent::new`.
    device_range: Range<u64>,
}
74
75impl FileExtent {
76    pub fn new(logical_offset: u64, device_range: Range<u64>) -> Result<Self, Error> {
77        // Ensure `device_range` is valid.
78        let length = device_range.length()?;
79        // Ensure no overflow when we calculate the end of the logical range.
80        let _ = logical_offset.checked_add(length).ok_or(FxfsError::OutOfRange)?;
81        Ok(Self { logical_offset, device_range })
82    }
83}
84
impl FileExtent {
    /// Returns the length of this extent in bytes.
    pub fn length(&self) -> u64 {
        // SAFETY: We verified that the device_range's length is valid in Self::new.
        unsafe { self.device_range.unchecked_length() }
    }

    /// Returns the logical file offset at which this extent starts.
    pub fn logical_offset(&self) -> u64 {
        self.logical_offset
    }

    /// Returns the logical range of the file covered by this extent.
    pub fn logical_range(&self) -> Range<u64> {
        // SAFETY: We verified logical_offset plus device_range length won't overflow in Self::new.
        unsafe { self.logical_offset..self.logical_offset.unchecked_add(self.length()) }
    }

    /// Returns the device byte range backing this extent.
    pub fn device_range(&self) -> &Range<u64> {
        &self.device_range
    }
}
104
/// Tracks a file's fsverity status through the `enable_verity` state machine.
#[derive(Debug)]
pub enum FsverityState {
    // Not a verity file and no enablement in progress.
    None,
    // `enable_verity` has started but the metadata has not been staged yet.
    Started,
    // Metadata has been built and staged, awaiting the committing transaction.
    Pending(FsverityStateInner),
    // The file is a verified fsverity file.
    Some(FsverityStateInner),
}
112
/// In-memory verity metadata for a file that has (or is getting) fsverity enabled.
#[derive(Debug)]
pub struct FsverityStateInner {
    // Root hash of the merkle tree; the variant determines the hash algorithm.
    root_digest: RootDigest,
    // Salt mixed into each hash computation.
    salt: Vec<u8>,
    // TODO(b/309656632): This should store the entire merkle tree and not just the leaf nodes.
    // Potentially store a pager-backed vmo instead of passing around a boxed array.
    // Concatenated leaf-node digests of the merkle tree.
    merkle_tree: Box<[u8]>,
}
121
/// Options controlling overwrite-mode writes on a `DataObjectHandle`.
#[derive(Debug, Default)]
pub struct OverwriteOptions {
    // If false, then all the extents for the overwrite range must have been preallocated using
    // preallocate_range or from existing writes.
    pub allow_allocations: bool,
    // NOTE(review): presumably requests a device barrier before the first overwrite write —
    // confirm against the call sites that consume this flag.
    pub barrier_on_first_write: bool,
}
129
130impl FsverityStateInner {
131    pub fn new(root_digest: RootDigest, salt: Vec<u8>, merkle_tree: Box<[u8]>) -> Self {
132        FsverityStateInner { root_digest, salt, merkle_tree }
133    }
134
135    fn get_hasher_for_block_size(&self, block_size: usize) -> FsVerityHasher {
136        match self.root_digest {
137            RootDigest::Sha256(_) => {
138                FsVerityHasher::Sha256(FsVerityHasherOptions::new(self.salt.clone(), block_size))
139            }
140            RootDigest::Sha512(_) => {
141                FsVerityHasher::Sha512(FsVerityHasherOptions::new(self.salt.clone(), block_size))
142            }
143        }
144    }
145
146    fn from_bytes(data: &[u8], block_size: usize) -> Result<(Self, FsVerityHasher), Error> {
147        let descriptor = FsVerityDescriptor::from_bytes(&data, block_size)
148            .map_err(|e| anyhow!(FxfsError::IntegrityError).context(e))?;
149
150        let root_digest = match descriptor.digest_algorithm() {
151            fio::HashAlgorithm::Sha256 => {
152                RootDigest::Sha256(descriptor.root_digest().try_into().unwrap())
153            }
154            fio::HashAlgorithm::Sha512 => RootDigest::Sha512(descriptor.root_digest().to_vec()),
155            _ => return Err(anyhow!(FxfsError::NotSupported).context("Unsupported hash algorithm")),
156        };
157        let hasher = descriptor.hasher();
158        let leaves =
159            descriptor.leaf_digests().map_err(|e| anyhow!(FxfsError::IntegrityError).context(e))?;
160
161        Ok((Self::new(root_digest, descriptor.salt().to_vec(), leaves.into()), hasher))
162    }
163}
164
/// Dereferences to the wrapped `StoreObjectHandle`, exposing its untyped operations directly on
/// `DataObjectHandle`.
impl<S: HandleOwner> Deref for DataObjectHandle<S> {
    type Target = StoreObjectHandle<S>;
    fn deref(&self) -> &Self::Target {
        &self.handle
    }
}
171
172impl<S: HandleOwner> DataObjectHandle<S> {
    /// Creates a handle for `object_id` in `owner`'s store, operating on attribute
    /// `attribute_id`.
    ///
    /// `size` seeds the cached content size, `fsverity_state` is the file's verity status at open
    /// time, and `overwrite_ranges` seeds the in-memory tracking of overwrite-mode extents.
    pub fn new(
        owner: Arc<S>,
        object_id: u64,
        permanent_keys: bool,
        attribute_id: u64,
        size: u64,
        fsverity_state: FsverityState,
        options: HandleOptions,
        trace: bool,
        overwrite_ranges: &[Range<u64>],
    ) -> Self {
        Self {
            handle: StoreObjectHandle::new(owner, object_id, permanent_keys, options, trace),
            attribute_id,
            content_size: AtomicU64::new(size),
            fsverity_state: Mutex::new(fsverity_state),
            overwrite_ranges: AllocatedRanges::new(overwrite_ranges),
        }
    }
192
    /// Returns the attribute id this handle reads and writes data through.
    pub fn attribute_id(&self) -> u64 {
        self.attribute_id
    }
196
    /// Returns the in-memory tracking of this file's overwrite-mode ranges.
    pub fn overwrite_ranges(&self) -> &AllocatedRanges {
        &self.overwrite_ranges
    }
200
    /// Returns true if fsverity has been fully enabled on this file.
    pub fn is_verified_file(&self) -> bool {
        matches!(*self.fsverity_state.lock(), FsverityState::Some(_))
    }
204
    /// Sets `self.fsverity_state` to FsverityState::Started. Called at the top of `enable_verity`.
    /// If another caller has already started but not completed `enable_verity`, returns
    /// FxfsError::Unavailable. If another caller has already completed `enable_verity`, returns
    /// FxfsError::AlreadyExists.
    pub fn set_fsverity_state_started(&self) -> Result<(), Error> {
        let mut fsverity_guard = self.fsverity_state.lock();
        match *fsverity_guard {
            FsverityState::None => {
                *fsverity_guard = FsverityState::Started;
                Ok(())
            }
            // An enable is already in flight (merkle data may be staged but not committed).
            FsverityState::Started | FsverityState::Pending(_) => {
                Err(anyhow!(FxfsError::Unavailable))
            }
            FsverityState::Some(_) => Err(anyhow!(FxfsError::AlreadyExists)),
        }
    }
222
    /// Sets `self.fsverity_state` to Pending. Must be called before `finalize_fsverity_state()`.
    /// Asserts that the prior state of `self.fsverity_state` was `FsverityState::Started`, i.e.
    /// that `set_fsverity_state_started()` succeeded first.
    pub fn set_fsverity_state_pending(&self, descriptor: FsverityStateInner) {
        let mut fsverity_guard = self.fsverity_state.lock();
        assert!(matches!(*fsverity_guard, FsverityState::Started));
        *fsverity_guard = FsverityState::Pending(descriptor);
    }
230
231    /// Sets `self.fsverity_state` to Some. Panics if the prior state of `self.fsverity_state` was
232    /// not `FsverityState::Pending(_)`.
233    pub fn finalize_fsverity_state(&self) {
234        let mut fsverity_state_guard = self.fsverity_state.lock();
235        let mut_fsverity_state = fsverity_state_guard.deref_mut();
236        let fsverity_state = std::mem::replace(mut_fsverity_state, FsverityState::None);
237        match fsverity_state {
238            FsverityState::None => panic!("Cannot go from FsverityState::None to Some"),
239            FsverityState::Started => panic!("Cannot go from FsverityState::Started to Some"),
240            FsverityState::Pending(inner) => *mut_fsverity_state = FsverityState::Some(inner),
241            FsverityState::Some(_) => panic!("Fsverity state was already set to Some"),
242        }
243        // Once we finalize the fsverity state, the file is permanently read-only. The in-memory
244        // overwrite ranges tracking is only used for writing, so we don't need them anymore. This
245        // leaves any uninitialized, but allocated, overwrite regions if there are any, rather than
246        // converting them back to sparse regions.
247        self.overwrite_ranges.clear();
248    }
249
    /// Sets `self.fsverity_state` directly to Some without going through the entire state machine.
    /// Used to set `self.fsverity_state` on open of a verified file. The merkle tree data is
    /// verified against the root digest here, and will return an error if the tree is not correct.
    pub async fn set_fsverity_state_some(&self, descriptor: FsverityMetadata) -> Result<(), Error> {
        let (metadata, hasher) = match descriptor {
            // Root digest and salt are stored in the object record; the leaf digests live in the
            // merkle attribute, so read it.
            FsverityMetadata::Internal(root_digest, salt) => {
                let merkle_tree = self
                    .read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID)
                    .await?
                    .ok_or_else(|| anyhow!(FxfsError::Inconsistent))?;
                let metadata = FsverityStateInner { root_digest, salt, merkle_tree };
                let hasher = metadata.get_hasher_for_block_size(self.block_size() as usize);
                (metadata, hasher)
            }
            // The merkle attribute holds a serialized descriptor at `verity_range`; parse the
            // digest, salt and leaves out of it.
            FsverityMetadata::F2fs(verity_range) => {
                let expected_length = verity_range.length()? as usize;
                // Round the buffer up so the block-device read is block-aligned.
                let mut buffer = self
                    .allocate_buffer(expected_length.next_multiple_of(self.block_size() as usize))
                    .await;
                ensure!(
                    expected_length
                        == self
                            .handle
                            .read(FSVERITY_MERKLE_ATTRIBUTE_ID, verity_range.start, buffer.as_mut())
                            .await?,
                    FxfsError::Inconsistent
                );
                FsverityStateInner::from_bytes(
                    buffer.as_slice()[0..expected_length].into(),
                    self.block_size() as usize,
                )?
            }
        };
        // Validate the merkle tree data against the root before applying it.
        ensure!(metadata.merkle_tree.len() % hasher.hash_size() == 0, FxfsError::Inconsistent);
        let leaf_chunks = metadata.merkle_tree.chunks_exact(hasher.hash_size());
        let mut builder = MerkleTreeBuilder::new(hasher);
        for leaf in leaf_chunks {
            builder.push_data_hash(leaf.to_vec());
        }
        // Recompute the upper layers from the stored leaves and compare the resulting root
        // against the stored root digest.
        let tree = builder.finish();
        let root_hash = match &metadata.root_digest {
            RootDigest::Sha256(root_hash) => root_hash.as_slice(),
            RootDigest::Sha512(root_hash) => root_hash.as_slice(),
        };

        ensure!(root_hash == tree.root(), FxfsError::IntegrityError);

        let mut fsverity_guard = self.fsverity_state.lock();
        assert!(matches!(*fsverity_guard, FsverityState::None));
        *fsverity_guard = FsverityState::Some(metadata);

        Ok(())
    }
304
305    /// Verifies contents of `buffer` against the corresponding hashes in the stored merkle tree.
306    /// `offset` is the logical offset in the file that `buffer` starts at. `offset` must be
307    /// block-aligned. Fails on non fsverity-enabled files.
308    fn verify_data(&self, mut offset: usize, buffer: &[u8]) -> Result<(), Error> {
309        let block_size = self.block_size() as usize;
310        assert!(offset % block_size == 0);
311        let fsverity_state = self.fsverity_state.lock();
312        match &*fsverity_state {
313            FsverityState::None => {
314                Err(anyhow!("Tried to verify read on a non verity-enabled file"))
315            }
316            FsverityState::Started | FsverityState::Pending(_) => Err(anyhow!(
317                "Enable verity has not yet completed, fsverity state: {:?}",
318                &*fsverity_state
319            )),
320            FsverityState::Some(metadata) => {
321                let hasher = metadata.get_hasher_for_block_size(block_size);
322                let leaf_nodes: Vec<&[u8]> =
323                    metadata.merkle_tree.chunks(hasher.hash_size()).collect();
324                fxfs_trace::duration!("fsverity-verify", "len" => buffer.len());
325                // TODO(b/318880297): Consider parallelizing computation.
326                for b in buffer.chunks(block_size) {
327                    ensure!(
328                        hasher.hash_block(b) == leaf_nodes[offset / block_size],
329                        anyhow!(FxfsError::Inconsistent).context("Hash mismatch")
330                    );
331                    offset += block_size;
332                }
333                Ok(())
334            }
335        }
336    }
337
    /// Extend the file with the given extent.  The only use case for this right now is for files
    /// that must exist at certain offsets on the device, such as super-blocks.
    ///
    /// The extent is appended at the current block-aligned end of the file and the content size
    /// grows by the length of `device_range`.
    pub async fn extend<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        device_range: Range<u64>,
    ) -> Result<(), Error> {
        // Append after the existing content, rounded up to a block boundary.
        let old_end =
            round_up(self.txn_get_size(transaction), self.block_size()).ok_or(FxfsError::TooBig)?;
        let new_size = old_end + device_range.end - device_range.start;
        // The caller chose the device location, so mark it allocated rather than asking the
        // allocator to pick space.
        self.store().allocator().mark_allocated(
            transaction,
            self.store().store_object_id(),
            device_range.clone(),
        )?;
        self.txn_update_size(transaction, new_size, None).await?;
        let key_id = self.get_key(None).await?.0;
        transaction.add(
            self.store().store_object_id,
            Mutation::merge_object(
                ObjectKey::extent(self.object_id(), self.attribute_id(), old_end..new_size),
                ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
            ),
        );
        self.update_allocated_size(transaction, device_range.end - device_range.start, 0).await
    }
364
    /// Returns a new aligned buffer (reading the head and tail blocks if necessary) with a copy
    /// of the data from `buf`, along with the aligned range the new buffer covers.
    async fn align_buffer(
        &self,
        offset: u64,
        buf: BufferRef<'_>,
    ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
        // Delegates to the untyped handle using this file's data attribute.
        self.handle.align_buffer(self.attribute_id(), offset, buf).await
    }
374
    /// Writes potentially unaligned data at `device_offset` and returns checksums if requested.
    /// The data will be encrypted if necessary.
    ///
    /// `buf` is mutable as an optimization: since the write may require encryption, we can
    /// encrypt the buffer in-place rather than copying to another buffer if the write is already
    /// aligned.
    async fn write_at(
        &self,
        offset: u64,
        buf: MutableBufferRef<'_>,
        device_offset: u64,
    ) -> Result<MaybeChecksums, Error> {
        self.handle.write_at(self.attribute_id(), offset, buf, None, device_offset).await
    }
387
    /// Zeroes the given range on the data attribute.  The range must be aligned.
    pub async fn zero(
        &self,
        transaction: &mut Transaction<'_>,
        range: Range<u64>,
    ) -> Result<(), Error> {
        self.handle.zero(transaction, self.attribute_id(), range).await
    }
396
397    /// The cached value for `self.fsverity_state` is set either in `open_object` or on
398    /// `enable_verity`. If set, translates `self.fsverity_state.descriptor` into an
399    /// fio::VerificationOptions instance and a root hash. Otherwise, returns None.
400    pub fn get_descriptor(&self) -> Option<(fio::VerificationOptions, Vec<u8>)> {
401        let fsverity_state = self.fsverity_state.lock();
402        match &*fsverity_state {
403            FsverityState::Some(metadata) => {
404                let (options, root_hash) = match &metadata.root_digest {
405                    RootDigest::Sha256(root_hash) => (
406                        fio::VerificationOptions {
407                            hash_algorithm: Some(fio::HashAlgorithm::Sha256),
408                            salt: Some(metadata.salt.clone()),
409                            ..Default::default()
410                        },
411                        root_hash.to_vec(),
412                    ),
413                    RootDigest::Sha512(root_hash) => (
414                        fio::VerificationOptions {
415                            hash_algorithm: Some(fio::HashAlgorithm::Sha512),
416                            salt: Some(metadata.salt.clone()),
417                            ..Default::default()
418                        },
419                        root_hash.clone(),
420                    ),
421                };
422                Some((options, root_hash))
423            }
424            _ => None,
425        }
426    }
427
    /// Reads the entire data attribute and builds its fsverity merkle tree with `hasher`.
    ///
    /// Returns the computed tree along with its serialized layout: all non-root layers from the
    /// top down, each padded to a block boundary, followed by one block containing the descriptor
    /// built from `hash_alg`, the block size, the data size, the root hash and `salt`.
    async fn build_verity_tree(
        &self,
        hasher: FsVerityHasher,
        hash_alg: fio::HashAlgorithm,
        salt: &[u8],
    ) -> Result<(MerkleTree, Vec<u8>), Error> {
        let hash_len = hasher.hash_size();
        let mut builder = MerkleTreeBuilder::new(hasher);
        let mut offset = 0;
        let size = self.get_size();
        // TODO(b/314836822): Consider further tuning the buffer size to optimize
        // performance. Experimentally, most verity-enabled files are <256K.
        let mut buf = self.allocate_buffer(64 * self.block_size() as usize).await;
        // Stream the file contents through the builder, one buffer at a time.
        while offset < size {
            // TODO(b/314842875): Consider optimizations for sparse files.
            let read = self.read(offset, buf.as_mut()).await? as u64;
            assert!(offset + read <= size);
            builder.write(&buf.as_slice()[0..read as usize]);
            offset += read;
        }
        let tree = builder.finish();
        // This will include a block for the root layer, which will be used to house the descriptor.
        let tree_data_len = tree
            .as_ref()
            .iter()
            .map(|layer| (layer.len() * hash_len).next_multiple_of(self.block_size() as usize))
            .sum();
        let mut merkle_tree_data = Vec::<u8>::with_capacity(tree_data_len);
        // Iterating from the top layers down to the leaves.
        for layer in tree.as_ref().iter().rev() {
            // Skip the root layer.
            if layer.len() <= 1 {
                continue;
            }
            merkle_tree_data.extend(layer.iter().flatten());
            // Pad to the end of the block.
            let padded_size = merkle_tree_data.len().next_multiple_of(self.block_size() as usize);
            merkle_tree_data.resize(padded_size, 0);
        }

        // Zero the last block, then write the descriptor to the start of it.
        let descriptor_offset = merkle_tree_data.len();
        merkle_tree_data.resize(descriptor_offset + self.block_size() as usize, 0);
        let descriptor = FsVerityDescriptorRaw::new(
            hash_alg,
            self.block_size(),
            self.get_size(),
            tree.root(),
            salt,
        )?;
        descriptor.write_to_slice(&mut merkle_tree_data[descriptor_offset..])?;

        Ok((tree, merkle_tree_data))
    }
482
483    /// Reads the data attribute and computes a merkle tree from the data. The values of the
484    /// parameters required to build the merkle tree are supplied by `descriptor` (i.e. salt,
485    /// hash_algorithm, etc.) Writes the leaf nodes of the merkle tree to an attribute with id
486    /// `FSVERITY_MERKLE_ATTRIBUTE_ID`. Updates the root_hash of the `descriptor` according to the
487    /// computed merkle tree and then replaces the ObjectValue of the data attribute with
488    /// ObjectValue::VerifiedAttribute, which stores the `descriptor` inline.
489    #[trace]
490    pub async fn enable_verity(&self, options: fio::VerificationOptions) -> Result<(), Error> {
491        self.set_fsverity_state_started()?;
492        // If the merkle attribute was tombstoned in the last attempt of `enable_verity`, flushing
493        // the graveyard should process the tombstone before we start rewriting the attribute.
494        if let Some(_) = self
495            .store()
496            .tree()
497            .find(&ObjectKey::graveyard_attribute_entry(
498                self.store().graveyard_directory_object_id(),
499                self.object_id(),
500                FSVERITY_MERKLE_ATTRIBUTE_ID,
501            ))
502            .await?
503        {
504            self.store().filesystem().graveyard().flush().await;
505        }
506        let mut transaction = self.new_transaction().await?;
507        let hash_alg =
508            options.hash_algorithm.ok_or_else(|| anyhow!("No hash algorithm provided"))?;
509        let salt = options.salt.ok_or_else(|| anyhow!("No salt provided"))?;
510        let (root_digest, merkle_tree) = match hash_alg {
511            fio::HashAlgorithm::Sha256 => {
512                let hasher = FsVerityHasher::Sha256(FsVerityHasherOptions::new(
513                    salt.clone(),
514                    self.block_size() as usize,
515                ));
516                let (tree, merkle_tree_data) =
517                    self.build_verity_tree(hasher, hash_alg.clone(), &salt).await?;
518                let root: [u8; 32] = tree.root().try_into().unwrap();
519                (RootDigest::Sha256(root), merkle_tree_data)
520            }
521            fio::HashAlgorithm::Sha512 => {
522                let hasher = FsVerityHasher::Sha512(FsVerityHasherOptions::new(
523                    salt.clone(),
524                    self.block_size() as usize,
525                ));
526                let (tree, merkle_tree_data) =
527                    self.build_verity_tree(hasher, hash_alg.clone(), &salt).await?;
528                (RootDigest::Sha512(tree.root().to_vec()), merkle_tree_data)
529            }
530            _ => {
531                bail!(
532                    anyhow!(FxfsError::NotSupported)
533                        .context(format!("hash algorithm not supported"))
534                );
535            }
536        };
537        // TODO(b/314194485): Eventually want streaming writes.
538        // The merkle tree attribute should not require trimming because it should not
539        // exist.
540        self.handle
541            .write_new_attr_in_batches(
542                &mut transaction,
543                FSVERITY_MERKLE_ATTRIBUTE_ID,
544                &merkle_tree,
545                WRITE_ATTR_BATCH_SIZE,
546            )
547            .await?;
548        if merkle_tree.len() > WRITE_ATTR_BATCH_SIZE {
549            self.store().remove_attribute_from_graveyard(
550                &mut transaction,
551                self.object_id(),
552                FSVERITY_MERKLE_ATTRIBUTE_ID,
553            );
554        };
555        let descriptor_decoded =
556            FsVerityDescriptor::from_bytes(&merkle_tree, self.block_size() as usize)?;
557        let descriptor = FsverityStateInner {
558            root_digest: root_digest.clone(),
559            salt: salt.clone(),
560            merkle_tree: descriptor_decoded.leaf_digests()?.to_vec().into(),
561        };
562        self.set_fsverity_state_pending(descriptor);
563        transaction.add_with_object(
564            self.store().store_object_id(),
565            Mutation::replace_or_insert_object(
566                ObjectKey::attribute(
567                    self.object_id(),
568                    DEFAULT_DATA_ATTRIBUTE_ID,
569                    AttributeKey::Attribute,
570                ),
571                ObjectValue::verified_attribute(
572                    self.get_size(),
573                    FsverityMetadata::F2fs(0..merkle_tree.len() as u64),
574                ),
575            ),
576            AssocObj::Borrowed(self),
577        );
578        transaction.commit().await?;
579        Ok(())
580    }
581
582    /// Pre-allocate disk space for the given logical file range. If any part of the allocation
583    /// range is beyond the end of the file, the file size is updated.
584    pub async fn allocate(&self, range: Range<u64>) -> Result<(), Error> {
585        debug_assert!(range.start < range.end);
586
587        // It's not required that callers of allocate use block aligned ranges, but we need to make
588        // the extents block aligned. Luckily, fallocate in posix is allowed to allocate more than
589        // what was asked for for block alignment purposes. We just need to make sure that the size
590        // of the file is still the non-block-aligned end of the range if the size was changed.
591        let mut new_range = range.clone();
592        new_range.start = round_down(new_range.start, self.block_size());
593        // NB: FxfsError::TooBig turns into EFBIG when passed through starnix, which is the
594        // required error code when the requested range is larger than the file size.
595        new_range.end = round_up(new_range.end, self.block_size()).ok_or(FxfsError::TooBig)?;
596
597        let mut transaction = self.new_transaction().await?;
598        let mut to_allocate = Vec::new();
599        let mut to_switch = Vec::new();
600        let key_id = self.get_key(None).await?.0;
601
602        {
603            let tree = &self.store().tree;
604            let layer_set = tree.layer_set();
605            let offset_key = ObjectKey::attribute(
606                self.object_id(),
607                self.attribute_id(),
608                AttributeKey::Extent(ExtentKey::search_key_from_offset(new_range.start)),
609            );
610            let mut merger = layer_set.merger();
611            let mut iter = merger.query(Query::FullRange(&offset_key)).await?;
612
613            loop {
614                match iter.get() {
615                    Some(ItemRef {
616                        key:
617                            ObjectKey {
618                                object_id,
619                                data:
620                                    ObjectKeyData::Attribute(
621                                        attribute_id,
622                                        AttributeKey::Extent(extent_key),
623                                    ),
624                            },
625                        value: ObjectValue::Extent(extent_value),
626                        ..
627                    }) if *object_id == self.object_id()
628                        && *attribute_id == self.attribute_id() =>
629                    {
630                        // If the start of this extent is beyond the end of the range we are
631                        // allocating, we don't have any more work to do.
632                        if new_range.end <= extent_key.range.start {
633                            break;
634                        }
635                        // Add any prefix we might need to allocate.
636                        if new_range.start < extent_key.range.start {
637                            to_allocate.push(new_range.start..extent_key.range.start);
638                            new_range.start = extent_key.range.start;
639                        }
640                        let device_offset = match extent_value {
641                            ExtentValue::None => {
642                                // If the extent value is None, it indicates a deleted extent. In
643                                // that case, we just skip it entirely. By keeping the new_range
644                                // where it is, this section will get included in the new
645                                // allocations.
646                                iter.advance().await?;
647                                continue;
648                            }
649                            ExtentValue::Some { mode: ExtentMode::OverwritePartial(_), .. }
650                            | ExtentValue::Some { mode: ExtentMode::Overwrite, .. } => {
651                                // If this extent is already in overwrite mode, we can skip it.
652                                if extent_key.range.end < new_range.end {
653                                    new_range.start = extent_key.range.end;
654                                    iter.advance().await?;
655                                    continue;
656                                } else {
657                                    new_range.start = new_range.end;
658                                    break;
659                                }
660                            }
661                            ExtentValue::Some { device_offset, .. } => *device_offset,
662                        };
663
664                        // Figure out how we have to break up the ranges.
665                        let device_offset =
666                            device_offset + (new_range.start - extent_key.range.start);
667                        if extent_key.range.end < new_range.end {
668                            to_switch.push((new_range.start..extent_key.range.end, device_offset));
669                            new_range.start = extent_key.range.end;
670                        } else {
671                            to_switch.push((new_range.start..new_range.end, device_offset));
672                            new_range.start = new_range.end;
673                            break;
674                        }
675                    }
676                    // The records are sorted so if we find something that isn't an extent or
677                    // doesn't match the object id then there are no more extent records for this
678                    // object.
679                    _ => break,
680                }
681                iter.advance().await?;
682            }
683        }
684
685        if new_range.start < new_range.end {
686            to_allocate.push(new_range.clone());
687        }
688
689        // We can update the size in the first transaction because even if subsequent transactions
690        // don't get replayed, the data between the current and new end of the file will be zero
691        // (either sparse zero or allocated zero). On the other hand, if we don't update the size
692        // in the first transaction, overwrite extents may be written past the end of the file
693        // which is an fsck error.
694        //
695        // The potential new size needs to be the non-block-aligned range end - we round up to the
696        // nearest block size for the actual allocation, but shouldn't do that for the file size.
697        let new_size = std::cmp::max(range.end, self.get_size());
698        // Make sure the mutation that flips the has_overwrite_extents advisory flag is in the
699        // first transaction, in case we split transactions. This makes it okay to only replay the
700        // first transaction if power loss occurs - the file will be in an unusual state, but not
701        // an invalid one, if only part of the allocate goes through.
702        transaction.add_with_object(
703            self.store().store_object_id(),
704            Mutation::replace_or_insert_object(
705                ObjectKey::attribute(
706                    self.object_id(),
707                    self.attribute_id(),
708                    AttributeKey::Attribute,
709                ),
710                ObjectValue::Attribute { size: new_size, has_overwrite_extents: true },
711            ),
712            AssocObj::Borrowed(self),
713        );
714
715        // The maximum number of mutations we are going to allow per transaction in allocate. This
716        // is probably quite a bit lower than the actual limit, but it should be large enough to
717        // handle most non-edge-case versions of allocate without splitting the transaction.
718        const MAX_TRANSACTION_SIZE: usize = 256;
719        for (switch_range, device_offset) in to_switch {
720            transaction.add_with_object(
721                self.store().store_object_id(),
722                Mutation::merge_object(
723                    ObjectKey::extent(self.object_id(), self.attribute_id(), switch_range),
724                    ObjectValue::Extent(ExtentValue::initialized_overwrite_extent(
725                        device_offset,
726                        key_id,
727                    )),
728                ),
729                AssocObj::Borrowed(self),
730            );
731            if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
732                transaction.commit_and_continue().await?;
733            }
734        }
735
736        let mut allocated = 0;
737        let allocator = self.store().allocator();
738        for mut allocate_range in to_allocate {
739            while allocate_range.start < allocate_range.end {
740                let device_range = allocator
741                    .allocate(
742                        &mut transaction,
743                        self.store().store_object_id(),
744                        allocate_range.end - allocate_range.start,
745                    )
746                    .await
747                    .context("allocation failed")?;
748                let device_range_len = device_range.end - device_range.start;
749
750                transaction.add_with_object(
751                    self.store().store_object_id(),
752                    Mutation::merge_object(
753                        ObjectKey::extent(
754                            self.object_id(),
755                            self.attribute_id(),
756                            allocate_range.start..allocate_range.start + device_range_len,
757                        ),
758                        ObjectValue::Extent(ExtentValue::blank_overwrite_extent(
759                            device_range.start,
760                            (device_range_len / self.block_size()) as usize,
761                            key_id,
762                        )),
763                    ),
764                    AssocObj::Borrowed(self),
765                );
766
767                allocate_range.start += device_range_len;
768                allocated += device_range_len;
769
770                if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
771                    self.update_allocated_size(&mut transaction, allocated, 0).await?;
772                    transaction.commit_and_continue().await?;
773                    allocated = 0;
774                }
775            }
776        }
777
778        self.update_allocated_size(&mut transaction, allocated, 0).await?;
779        transaction.commit().await?;
780
781        Ok(())
782    }
783
    /// Return information on a contiguous set of extents that has the same allocation status,
    /// starting from `start_offset`. The information returned is if this set of extents are marked
    /// allocated/not allocated and also the size of this set (in bytes). This is used when
    /// querying slices for volumes.
    ///
    /// This function expects `start_offset` to be aligned to block size (asserted below).
    ///
    /// # Errors
    ///
    /// Returns `FxfsError::OutOfRange` if `start_offset` is past the end of the file, and
    /// `FxfsError::Inconsistent` if an extent record is not block-aligned.
    pub async fn is_allocated(&self, start_offset: u64) -> Result<(bool, u64), Error> {
        let block_size = self.block_size();
        assert_eq!(start_offset % block_size, 0);

        if start_offset > self.get_size() {
            bail!(FxfsError::OutOfRange)
        }

        // Querying exactly at EOF: report an empty, unallocated run.
        if start_offset == self.get_size() {
            return Ok((false, 0));
        }

        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let offset_key = ObjectKey::attribute(
            self.object_id(),
            self.attribute_id(),
            AttributeKey::Extent(ExtentKey::search_key_from_offset(start_offset)),
        );
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullRange(&offset_key)).await?;

        // `allocated` stays None until the first region is classified; afterwards it records
        // whether the run currently being extended is allocated. `end` tracks the exclusive end
        // of the run accumulated so far.
        let mut allocated = None;
        let mut end = start_offset;

        loop {
            // Iterate through the extents, each time setting `end` as the end of the previous
            // extent
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) => {
                    // Equivalent of getting no extents back
                    if *object_id != self.object_id() || *attribute_id != self.attribute_id() {
                        if allocated == Some(false) || allocated.is_none() {
                            // An unallocated (or empty) run extends to EOF.
                            end = self.get_size();
                            allocated = Some(false);
                        }
                        break;
                    }
                    ensure!(extent_key.range.is_aligned(block_size), FxfsError::Inconsistent);
                    if extent_key.range.start > end {
                        // If a previous extent has already been visited and we are tracking an
                        // allocated set, we are only interested in an extent where the range of the
                        // current extent follows immediately after the previous one.
                        if allocated == Some(true) {
                            break;
                        } else {
                            // The gap between the previous `end` and this extent is not allocated
                            end = extent_key.range.start;
                            allocated = Some(false);
                            // Continue this iteration, except now the `end` is set to the end of
                            // the "previous" extent which is this gap between the start_offset
                            // and the current extent
                        }
                    }

                    // We can assume that from here, the `end` points to the end of a previous
                    // extent.
                    match extent_value {
                        // The current extent has been allocated
                        ExtentValue::Some { .. } => {
                            // Stop searching if previous extent was marked deleted
                            if allocated == Some(false) {
                                break;
                            }
                            allocated = Some(true);
                        }
                        // This extent has been marked deleted
                        ExtentValue::None => {
                            // Stop searching if previous extent was marked allocated
                            if allocated == Some(true) {
                                break;
                            }
                            allocated = Some(false);
                        }
                    }
                    end = extent_key.range.end;
                }
                // This occurs when there are no extents left
                None => {
                    if allocated == Some(false) || allocated.is_none() {
                        end = self.get_size();
                        allocated = Some(false);
                    }
                    // Otherwise, we were monitoring extents that were allocated, so just exit.
                    break;
                }
                // Non-extent records (Object, Child, GraveyardEntry) are ignored.
                Some(_) => {}
            }
            iter.advance().await?;
        }

        // `allocated` is always Some here: every `break` above either sets it first or is
        // guarded on it already being Some.
        Ok((allocated.unwrap(), end - start_offset))
    }
892
893    pub async fn txn_write<'a>(
894        &'a self,
895        transaction: &mut Transaction<'a>,
896        offset: u64,
897        buf: BufferRef<'_>,
898    ) -> Result<(), Error> {
899        if buf.is_empty() {
900            return Ok(());
901        }
902        let (aligned, mut transfer_buf) = self.align_buffer(offset, buf).await?;
903        self.multi_write(
904            transaction,
905            self.attribute_id(),
906            std::slice::from_ref(&aligned),
907            transfer_buf.as_mut(),
908        )
909        .await?;
910        if offset + buf.len() as u64 > self.txn_get_size(transaction) {
911            self.txn_update_size(transaction, offset + buf.len() as u64, None).await?;
912        }
913        Ok(())
914    }
915
    /// Writes to multiple ranges with data provided in `buf`.  The buffer can be modified in
    /// place if encryption takes place.  The ranges must all be aligned and no change to content
    /// size is applied; the caller is responsible for updating size if required.
    pub async fn multi_write<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        attribute_id: u64,
        ranges: &[Range<u64>],
        buf: MutableBufferRef<'_>,
    ) -> Result<(), Error> {
        // NOTE(review): the `None` here appears to select the default key for the write —
        // confirm against StoreObjectHandle::multi_write.
        self.handle.multi_write(transaction, attribute_id, None, ranges, buf).await
    }
928
929    // `buf` is mutable as an optimization, since the write may require encryption, we can
930    // encrypt the buffer in-place rather than copying to another buffer if the write is
931    // already aligned.
932    //
933    // Note: in the event of power failure during an overwrite() call, it is possible that
934    // old data (which hasn't been overwritten with new bytes yet) may be exposed to the user.
935    // Since the old data should be encrypted, it is probably safe to expose, although not ideal.
936    pub async fn overwrite(
937        &self,
938        mut offset: u64,
939        mut buf: MutableBufferRef<'_>,
940        options: OverwriteOptions,
941    ) -> Result<(), Error> {
942        assert_eq!((buf.len() as u32) % self.store().device.block_size(), 0);
943        let end = offset + buf.len() as u64;
944
945        let key_id = self.get_key(None).await?.0;
946
947        // The transaction only ends up being used if allow_allocations is true
948        let mut transaction =
949            if options.allow_allocations { Some(self.new_transaction().await?) } else { None };
950
951        // We build up a list of writes to perform later
952        let writes = FuturesUnordered::new();
953
954        if options.barrier_on_first_write {
955            self.store().device.barrier();
956        }
957
958        // We create a new scope here, so that the merger iterator will get dropped before we try to
959        // commit our transaction. Otherwise the transaction commit would block.
960        {
961            let store = self.store();
962            let store_object_id = store.store_object_id;
963            let allocator = store.allocator();
964            let tree = &store.tree;
965            let layer_set = tree.layer_set();
966            let mut merger = layer_set.merger();
967            let mut iter = merger
968                .query(Query::FullRange(&ObjectKey::attribute(
969                    self.object_id(),
970                    self.attribute_id(),
971                    AttributeKey::Extent(ExtentKey::search_key_from_offset(offset)),
972                )))
973                .await?;
974            let block_size = self.block_size();
975
976            loop {
977                let (device_offset, bytes_to_write, should_advance) = match iter.get() {
978                    Some(ItemRef {
979                        key:
980                            ObjectKey {
981                                object_id,
982                                data:
983                                    ObjectKeyData::Attribute(
984                                        attribute_id,
985                                        AttributeKey::Extent(ExtentKey { range }),
986                                    ),
987                            },
988                        value: ObjectValue::Extent(ExtentValue::Some { .. }),
989                        ..
990                    }) if *object_id == self.object_id()
991                        && *attribute_id == self.attribute_id()
992                        && range.end == offset =>
993                    {
994                        iter.advance().await?;
995                        continue;
996                    }
997                    Some(ItemRef {
998                        key:
999                            ObjectKey {
1000                                object_id,
1001                                data:
1002                                    ObjectKeyData::Attribute(
1003                                        attribute_id,
1004                                        AttributeKey::Extent(ExtentKey { range }),
1005                                    ),
1006                            },
1007                        value,
1008                        ..
1009                    }) if *object_id == self.object_id()
1010                        && *attribute_id == self.attribute_id()
1011                        && range.start <= offset =>
1012                    {
1013                        match value {
1014                            ObjectValue::Extent(ExtentValue::Some {
1015                                device_offset,
1016                                mode: ExtentMode::Raw,
1017                                ..
1018                            }) => {
1019                                ensure!(
1020                                    range.is_aligned(block_size) && device_offset % block_size == 0,
1021                                    FxfsError::Inconsistent
1022                                );
1023                                let offset_within_extent = offset - range.start;
1024                                let remaining_length_of_extent = (range
1025                                    .end
1026                                    .checked_sub(offset)
1027                                    .ok_or(FxfsError::Inconsistent)?)
1028                                    as usize;
1029                                // Yields (device_offset, bytes_to_write, should_advance)
1030                                (
1031                                    device_offset + offset_within_extent,
1032                                    min(buf.len(), remaining_length_of_extent),
1033                                    true,
1034                                )
1035                            }
1036                            ObjectValue::Extent(ExtentValue::Some { .. }) => {
1037                                // TODO(https://fxbug.dev/42066056): Maybe we should create
1038                                // a new extent without checksums?
1039                                bail!(
1040                                    "extent from ({},{}) which overlaps offset \
1041                                        {} has the wrong extent mode",
1042                                    range.start,
1043                                    range.end,
1044                                    offset
1045                                )
1046                            }
1047                            _ => {
1048                                bail!(
1049                                    "overwrite failed: extent overlapping offset {} has \
1050                                      unexpected ObjectValue",
1051                                    offset
1052                                )
1053                            }
1054                        }
1055                    }
1056                    maybe_item_ref => {
1057                        if let Some(transaction) = transaction.as_mut() {
1058                            assert_eq!(options.allow_allocations, true);
1059                            assert_eq!(offset % self.block_size(), 0);
1060
1061                            // We are going to make a new extent, but let's check if there is an
1062                            // extent after us. If there is an extent after us, then we don't want
1063                            // our new extent to bump into it...
1064                            let mut bytes_to_allocate =
1065                                round_up(buf.len() as u64, self.block_size())
1066                                    .ok_or(FxfsError::TooBig)?;
1067                            if let Some(ItemRef {
1068                                key:
1069                                    ObjectKey {
1070                                        object_id,
1071                                        data:
1072                                            ObjectKeyData::Attribute(
1073                                                attribute_id,
1074                                                AttributeKey::Extent(ExtentKey { range }),
1075                                            ),
1076                                    },
1077                                ..
1078                            }) = maybe_item_ref
1079                            {
1080                                if *object_id == self.object_id()
1081                                    && *attribute_id == self.attribute_id()
1082                                    && offset < range.start
1083                                {
1084                                    let bytes_until_next_extent = range.start - offset;
1085                                    bytes_to_allocate =
1086                                        min(bytes_to_allocate, bytes_until_next_extent);
1087                                }
1088                            }
1089
1090                            let device_range = allocator
1091                                .allocate(transaction, store_object_id, bytes_to_allocate)
1092                                .await?;
1093                            let device_range_len = device_range.end - device_range.start;
1094                            transaction.add(
1095                                store_object_id,
1096                                Mutation::insert_object(
1097                                    ObjectKey::extent(
1098                                        self.object_id(),
1099                                        self.attribute_id(),
1100                                        offset..offset + device_range_len,
1101                                    ),
1102                                    ObjectValue::Extent(ExtentValue::new_raw(
1103                                        device_range.start,
1104                                        key_id,
1105                                    )),
1106                                ),
1107                            );
1108
1109                            self.update_allocated_size(transaction, device_range_len, 0).await?;
1110
1111                            // Yields (device_offset, bytes_to_write, should_advance)
1112                            (device_range.start, min(buf.len(), device_range_len as usize), false)
1113                        } else {
1114                            bail!(
1115                                "no extent overlapping offset {}, \
1116                                and new allocations are not allowed",
1117                                offset
1118                            )
1119                        }
1120                    }
1121                };
1122                let (current_buf, remaining_buf) = buf.split_at_mut(bytes_to_write);
1123                writes.push(self.write_at(offset, current_buf, device_offset));
1124                if remaining_buf.len() == 0 {
1125                    break;
1126                } else {
1127                    buf = remaining_buf;
1128                    offset += bytes_to_write as u64;
1129                    if should_advance {
1130                        iter.advance().await?;
1131                    }
1132                }
1133            }
1134        }
1135
1136        self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
1137        // The checksums are being ignored here, but we don't need to know them
1138        writes.try_collect::<Vec<MaybeChecksums>>().await?;
1139
1140        if let Some(mut transaction) = transaction {
1141            assert_eq!(options.allow_allocations, true);
1142            if !transaction.is_empty() {
1143                if end > self.get_size() {
1144                    self.grow(&mut transaction, self.get_size(), end).await?;
1145                }
1146                transaction.commit().await?;
1147            }
1148        }
1149
1150        Ok(())
1151    }
1152
1153    // Within a transaction, the size of the object might have changed, so get the size from there
1154    // if it exists, otherwise, fall back on the cached size.
1155    fn txn_get_size(&self, transaction: &Transaction<'_>) -> u64 {
1156        transaction
1157            .get_object_mutation(
1158                self.store().store_object_id,
1159                ObjectKey::attribute(
1160                    self.object_id(),
1161                    self.attribute_id(),
1162                    AttributeKey::Attribute,
1163                ),
1164            )
1165            .and_then(|m| {
1166                if let ObjectItem { value: ObjectValue::Attribute { size, .. }, .. } = m.item {
1167                    Some(size)
1168                } else {
1169                    None
1170                }
1171            })
1172            .unwrap_or_else(|| self.get_size())
1173    }
1174
1175    pub async fn txn_update_size<'a>(
1176        &'a self,
1177        transaction: &mut Transaction<'a>,
1178        new_size: u64,
1179        // Allow callers to update the has_overwrite_extents metadata if they want. If this is
1180        // Some it is set to the value, if None it is left unchanged.
1181        update_has_overwrite_extents: Option<bool>,
1182    ) -> Result<(), Error> {
1183        let key =
1184            ObjectKey::attribute(self.object_id(), self.attribute_id(), AttributeKey::Attribute);
1185        let mut mutation = if let Some(mutation) =
1186            transaction.get_object_mutation(self.store().store_object_id(), key.clone())
1187        {
1188            mutation.clone()
1189        } else {
1190            ObjectStoreMutation {
1191                item: self.store().tree().find(&key).await?.ok_or(FxfsError::NotFound)?,
1192                op: Operation::ReplaceOrInsert,
1193            }
1194        };
1195        if let ObjectValue::Attribute { size, has_overwrite_extents } = &mut mutation.item.value {
1196            *size = new_size;
1197            if let Some(update_has_overwrite_extents) = update_has_overwrite_extents {
1198                *has_overwrite_extents = update_has_overwrite_extents;
1199            }
1200        } else {
1201            bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
1202        }
1203        transaction.add_with_object(
1204            self.store().store_object_id(),
1205            Mutation::ObjectStore(mutation),
1206            AssocObj::Borrowed(self),
1207        );
1208        Ok(())
1209    }
1210
    /// Stages an adjustment of this object's allocated-size bookkeeping in `transaction`:
    /// `allocated` bytes added and `deallocated` bytes removed (callers pass device-range byte
    /// lengths). Delegates to the underlying StoreObjectHandle.
    async fn update_allocated_size(
        &self,
        transaction: &mut Transaction<'_>,
        allocated: u64,
        deallocated: u64,
    ) -> Result<(), Error> {
        self.handle.update_allocated_size(transaction, allocated, deallocated).await
    }
1219
1220    pub fn truncate_overwrite_ranges(&self, size: u64) -> Result<Option<bool>, Error> {
1221        if self
1222            .overwrite_ranges
1223            .truncate(round_up(size, self.block_size()).ok_or(FxfsError::TooBig)?)
1224        {
1225            // This returns true if there were ranges, but this truncate removed them all, which
1226            // indicates that we need to flip the has_overwrite_extents metadata flag to false.
1227            Ok(Some(false))
1228        } else {
1229            Ok(None)
1230        }
1231    }
1232
1233    pub async fn shrink<'a>(
1234        &'a self,
1235        transaction: &mut Transaction<'a>,
1236        size: u64,
1237        update_has_overwrite_extents: Option<bool>,
1238    ) -> Result<NeedsTrim, Error> {
1239        let needs_trim = self.handle.shrink(transaction, self.attribute_id(), size).await?;
1240        self.txn_update_size(transaction, size, update_has_overwrite_extents).await?;
1241        Ok(needs_trim)
1242    }
1243
    /// Grows the file from `old_size` to `size`. No data blocks are allocated here; the method
    /// (1) finishes any outstanding trim left over from a previous shrink, (2) zeroes the tail
    /// of the old last block when `old_size` is not block-aligned, and (3) stages the new size.
    /// May call `commit_and_continue` on `transaction` while completing a pending trim.
    pub async fn grow<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        old_size: u64,
        size: u64,
    ) -> Result<(), Error> {
        // Before growing the file, we must make sure that a previous trim has completed.
        let store = self.store();
        while matches!(
            store
                .trim_some(
                    transaction,
                    self.object_id(),
                    self.attribute_id(),
                    TrimMode::FromOffset(old_size)
                )
                .await?,
            TrimResult::Incomplete
        ) {
            // Each trim pass is bounded; commit what we have and keep trimming in a fresh
            // transaction until it completes.
            transaction.commit_and_continue().await?;
        }
        // We might need to zero out the tail of the old last block.
        let block_size = self.block_size();
        if old_size % block_size != 0 {
            let layer_set = store.tree.layer_set();
            let mut merger = layer_set.merger();
            let aligned_old_size = round_down(old_size, block_size);
            // Look up the extent (if any) covering the old last block.
            let iter = merger
                .query(Query::FullRange(&ObjectKey::extent(
                    self.object_id(),
                    self.attribute_id(),
                    aligned_old_size..aligned_old_size + 1,
                )))
                .await?;
            if let Some(ItemRef {
                key:
                    ObjectKey {
                        object_id,
                        data:
                            ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
                    },
                value: ObjectValue::Extent(ExtentValue::Some { device_offset, key_id, .. }),
                ..
            }) = iter.get()
            {
                if *object_id == self.object_id() && *attribute_id == self.attribute_id() {
                    let device_offset = device_offset
                        .checked_add(aligned_old_size - extent_key.range.start)
                        .ok_or(FxfsError::Inconsistent)?;
                    ensure!(device_offset % block_size == 0, FxfsError::Inconsistent);
                    let mut buf = self.allocate_buffer(block_size as usize).await;
                    // In the case that this extent is in OverwritePartial mode, there is a
                    // possibility that the last block is allocated, but not initialized yet, in
                    // which case we don't actually need to bother zeroing out the tail. However,
                    // it's not strictly incorrect to change uninitialized data, so we skip the
                    // check and blindly do it to keep it simpler here.
                    self.read_and_decrypt(device_offset, aligned_old_size, buf.as_mut(), *key_id)
                        .await?;
                    // Zero everything after the logical EOF within the block, then write the
                    // block back in place.
                    buf.as_mut_slice()[(old_size % block_size) as usize..].fill(0);
                    self.multi_write(
                        transaction,
                        *attribute_id,
                        &[aligned_old_size..aligned_old_size + block_size],
                        buf.as_mut(),
                    )
                    .await?;
                }
            }
        }
        self.txn_update_size(transaction, size, None).await?;
        Ok(())
    }
1316
    /// Attempts to pre-allocate a `file_range` of bytes for this object.
    /// Returns a set of device ranges (i.e. potentially multiple extents).
    ///
    /// It may not be possible to preallocate the entire requested range in one request
    /// due to limitations on transaction size. In such cases, we will preallocate as much as
    /// we can up to some (arbitrary, internal) limit on transaction size.
    ///
    /// `file_range.start` is modified to point at the end of the logical range
    /// that was preallocated such that repeated calls to `preallocate_range` with new
    /// transactions can be used to preallocate ranges of any size.
    ///
    /// Requested range must be a multiple of block size.
    pub async fn preallocate_range<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        file_range: &mut Range<u64>,
    ) -> Result<Vec<Range<u64>>, Error> {
        let block_size = self.block_size();
        assert!(file_range.is_aligned(block_size));
        // Preallocation is only supported for unencrypted handles (extents are recorded as
        // raw below).
        assert!(!self.handle.is_encrypted());
        let mut ranges = Vec::new();
        // Scan the existing extent records for this attribute starting at file_range.start so
        // that only the unallocated gaps receive new allocations.
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                self.attribute_id(),
                AttributeKey::Extent(ExtentKey::search_key_from_offset(file_range.start)),
            )))
            .await?;
        // Total bytes newly allocated by this call, used to update the allocated size at the
        // end.
        let mut allocated = 0;
        let key_id = self.get_key(None).await?.0;
        'outer: while file_range.start < file_range.end {
            // Determine how far we can allocate contiguously: up to the start of the next
            // existing extent, or to the end of the requested range.
            let allocate_end = loop {
                match iter.get() {
                    // Case for allocated extents for the same object that overlap with file_range.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.start < file_range.end =>
                    {
                        // Validate the on-disk record before trusting its arithmetic.
                        ensure!(
                            range.is_valid()
                                && range.is_aligned(block_size)
                                && device_offset % block_size == 0,
                            FxfsError::Inconsistent
                        );
                        // If the start of the requested file_range overlaps with an existing extent...
                        if range.start <= file_range.start {
                            // Record the existing extent and move on.
                            let device_range = device_offset
                                .checked_add(file_range.start - range.start)
                                .ok_or(FxfsError::Inconsistent)?
                                ..device_offset
                                    .checked_add(min(range.end, file_range.end) - range.start)
                                    .ok_or(FxfsError::Inconsistent)?;
                            file_range.start += device_range.end - device_range.start;
                            ranges.push(device_range);
                            if file_range.start >= file_range.end {
                                break 'outer;
                            }
                            iter.advance().await?;
                            continue;
                        } else {
                            // There's nothing allocated between file_range.start and the beginning
                            // of this extent.
                            break range.start;
                        }
                    }
                    // Case for deleted extents eclipsed by file_range.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::None),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.end < file_range.end =>
                    {
                        iter.advance().await?;
                    }
                    _ => {
                        // We can just preallocate the rest.
                        break file_range.end;
                    }
                }
            };
            // Allocate the gap.  The allocator may return less than requested (handled by the
            // enclosing loop).
            let device_range = self
                .store()
                .allocator()
                .allocate(
                    transaction,
                    self.store().store_object_id(),
                    allocate_end - file_range.start,
                )
                .await
                .context("Allocation failed")?;
            allocated += device_range.end - device_range.start;
            let this_file_range =
                file_range.start..file_range.start + device_range.end - device_range.start;
            file_range.start = this_file_range.end;
            // Record the newly-allocated extent for this attribute.
            transaction.add(
                self.store().store_object_id,
                Mutation::merge_object(
                    ObjectKey::extent(self.object_id(), self.attribute_id(), this_file_range),
                    ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
                ),
            );
            ranges.push(device_range);
            // If we didn't allocate all that we requested, we'll loop around and try again.
            // ... unless we have filled the transaction. The caller should check file_range.
            if transaction.mutations().len() > TRANSACTION_MUTATION_THRESHOLD {
                break;
            }
        }
        // Update the file size if it changed.
        if file_range.start > round_up(self.txn_get_size(transaction), block_size).unwrap() {
            self.txn_update_size(transaction, file_range.start, None).await?;
        }
        self.update_allocated_size(transaction, allocated, 0).await?;
        Ok(ranges)
    }
1458
1459    pub async fn update_attributes<'a>(
1460        &self,
1461        transaction: &mut Transaction<'a>,
1462        node_attributes: Option<&fio::MutableNodeAttributes>,
1463        change_time: Option<Timestamp>,
1464    ) -> Result<(), Error> {
1465        // This codepath is only called by files, whose wrapping key id users cannot directly set
1466        // as per fscrypt.
1467        ensure!(
1468            !matches!(
1469                node_attributes,
1470                Some(fio::MutableNodeAttributes { wrapping_key_id: Some(_), .. })
1471            ),
1472            FxfsError::BadPath
1473        );
1474        self.handle.update_attributes(transaction, node_attributes, change_time).await
1475    }
1476
    /// Get the default set of transaction options for this object. This is mostly the overall
    /// default, modified by any [`HandleOptions`] held by this handle.  Delegates to the
    /// wrapped handle.
    pub fn default_transaction_options<'b>(&self) -> Options<'b> {
        self.handle.default_transaction_options()
    }
1482
    /// Creates a new transaction for this object's attribute using the default transaction
    /// options for this handle.
    pub async fn new_transaction<'b>(&self) -> Result<Transaction<'b>, Error> {
        self.new_transaction_with_options(self.default_transaction_options()).await
    }
1486
    /// Creates a new transaction for this object's attribute with the supplied options.
    /// Delegates to the wrapped handle.
    pub async fn new_transaction_with_options<'b>(
        &self,
        options: Options<'b>,
    ) -> Result<Transaction<'b>, Error> {
        self.handle.new_transaction_with_options(self.attribute_id(), options).await
    }
1493
    /// Flushes the underlying device.  This is expensive and should be used sparingly.
    /// Delegates to the wrapped handle.
    pub async fn flush_device(&self) -> Result<(), Error> {
        self.handle.flush_device().await
    }
1498
    /// Reads an entire attribute.  Delegates to the wrapped handle; see
    /// `StoreObjectHandle::read_attr` for the meaning of a `None` result.
    pub async fn read_attr(&self, attribute_id: u64) -> Result<Option<Box<[u8]>>, Error> {
        self.handle.read_attr(attribute_id).await
    }
1503
    /// Writes an entire attribute.  This *always* uses the volume data key.
    pub async fn write_attr(&self, attribute_id: u64, data: &[u8]) -> Result<(), Error> {
        // Must be different attribute otherwise cached size gets out of date.
        assert_ne!(attribute_id, self.attribute_id());
        let store = self.store();
        let mut transaction = self.new_transaction().await?;
        // The bool in the returned tuple (NeedsTrim) indicates that the attribute must be
        // trimmed past the new length.
        if self.handle.write_attr(&mut transaction, attribute_id, data).await?.0 {
            // Commit the write first, then trim the tail, committing intermediate transactions
            // for as long as trim_some reports there is more work to do.
            transaction.commit_and_continue().await?;
            while matches!(
                store
                    .trim_some(
                        &mut transaction,
                        self.object_id(),
                        attribute_id,
                        TrimMode::FromOffset(data.len() as u64),
                    )
                    .await?,
                TrimResult::Incomplete
            ) {
                transaction.commit_and_continue().await?;
            }
        }
        transaction.commit().await?;
        Ok(())
    }
1529
    /// Reads `buffer.len()` bytes from the device at `device_offset` and decrypts them using
    /// the key identified by `key_id`.  Delegates to the wrapped handle.
    async fn read_and_decrypt(
        &self,
        device_offset: u64,
        file_offset: u64,
        buffer: MutableBufferRef<'_>,
        key_id: u64,
    ) -> Result<(), Error> {
        self.handle.read_and_decrypt(device_offset, file_offset, buffer, key_id).await
    }
1539
    /// Truncates a file to a given size (growing/shrinking as required).
    ///
    /// Nb: Most code will want to call truncate() instead. This method is used
    /// to update the super block -- a case where we must borrow metadata space.
    pub async fn truncate_with_options(
        &self,
        options: Options<'_>,
        size: u64,
    ) -> Result<(), Error> {
        let mut transaction = self.new_transaction_with_options(options).await?;
        let old_size = self.get_size();
        if size == old_size {
            // Nothing to do; the (empty) transaction is simply dropped.
            return Ok(());
        }
        if size < old_size {
            let update_has_overwrite_ranges = self.truncate_overwrite_ranges(size)?;
            if self.shrink(&mut transaction, size, update_has_overwrite_ranges).await?.0 {
                // The file needs to be trimmed.
                transaction.commit_and_continue().await?;
                let store = self.store();
                while matches!(
                    store
                        .trim_some(
                            &mut transaction,
                            self.object_id(),
                            self.attribute_id(),
                            TrimMode::FromOffset(size)
                        )
                        .await?,
                    TrimResult::Incomplete
                ) {
                    // The shrink has already been committed by this point, so failures to
                    // finish the trim are logged and swallowed rather than surfaced.
                    if let Err(error) = transaction.commit_and_continue().await {
                        warn!(error:?; "Failed to trim after truncate");
                        return Ok(());
                    }
                }
                if let Err(error) = transaction.commit().await {
                    warn!(error:?; "Failed to trim after truncate");
                }
                return Ok(());
            }
        } else {
            self.grow(&mut transaction, old_size, size).await?;
        }
        transaction.commit().await?;
        Ok(())
    }
1587
    /// Returns this object's properties, read from its object record in the LSM tree.
    /// Returns `FxfsError::NotFile` if the record is not a file record.
    pub async fn get_properties(&self) -> Result<ObjectProperties, Error> {
        // We don't take a read guard here since the object properties are contained in a single
        // object, which cannot be inconsistent with itself. The LSM tree does not return
        // intermediate states for a single object.
        let item = self
            .store()
            .tree
            .find(&ObjectKey::object(self.object_id()))
            .await?
            .expect("Unable to find object record");
        match item.value {
            ObjectValue::Object {
                kind: ObjectKind::File { refs, .. },
                attributes:
                    ObjectAttributes {
                        creation_time,
                        modification_time,
                        posix_attributes,
                        allocated_size,
                        access_time,
                        change_time,
                        ..
                    },
            } => Ok(ObjectProperties {
                refs,
                allocated_size,
                // The size comes from this handle's cached value rather than the record.
                data_attribute_size: self.get_size(),
                creation_time,
                modification_time,
                access_time,
                change_time,
                // Fields that don't apply to data files.
                sub_dirs: 0,
                posix_attributes,
                casefold: false,
                wrapping_key_id: None,
            }),
            _ => bail!(FxfsError::NotFile),
        }
    }
1627
1628    // Returns the contents of this object. This object must be < |limit| bytes in size.
1629    pub async fn contents(&self, limit: usize) -> Result<Box<[u8]>, Error> {
1630        let size = self.get_size();
1631        if size > limit as u64 {
1632            bail!("Object too big ({} > {})", size, limit);
1633        }
1634        let mut buf = self.allocate_buffer(size as usize).await;
1635        self.read(0u64, buf.as_mut()).await?;
1636        Ok(buf.as_slice().into())
1637    }
1638
    /// Returns the set of file_offset->extent mappings for this file. The extents will be sorted by
    /// their logical offset within the file.
    ///
    /// *NOTE*: This operation is potentially expensive and should generally be avoided.
    pub async fn device_extents(&self) -> Result<Vec<FileExtent>, Error> {
        let mut extents = Vec::new();
        // Iterate all extent records for this attribute starting from logical offset zero.
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                self.attribute_id(),
                AttributeKey::Extent(ExtentKey::search_key_from_offset(0)),
            )))
            .await?;
        loop {
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(
                                    attribute_id,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                        },
                    value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
                    ..
                }) if *object_id == self.object_id() && *attribute_id == self.attribute_id() => {
                    let logical_offset = range.start;
                    let device_range = *device_offset..*device_offset + range.length()?;
                    extents.push(FileExtent::new(logical_offset, device_range)?);
                }
                // NOTE(review): any non-matching record terminates the scan, including a
                // deleted extent (ExtentValue::None) for this same attribute -- confirm
                // tombstones cannot appear interleaved with live extents here.
                _ => break,
            }
            iter.advance().await?;
        }
        Ok(extents)
    }
1680}
1681
impl<S: HandleOwner> AssociatedObject for DataObjectHandle<S> {
    /// Keeps this handle's cached state in sync with mutations as they are applied.
    fn will_apply_mutation(&self, mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
        match mutation {
            // A change to the attribute's size: update the cached content size.
            Mutation::ObjectStore(ObjectStoreMutation {
                item: ObjectItem { value: ObjectValue::Attribute { size, .. }, .. },
                ..
            }) => self.content_size.store(*size, atomic::Ordering::Relaxed),
            // The attribute became verified (fsverity): finalize the fsverity state.
            Mutation::ObjectStore(ObjectStoreMutation {
                item: ObjectItem { value: ObjectValue::VerifiedAttribute { size, .. }, .. },
                ..
            }) => {
                debug_assert_eq!(
                    self.get_size(),
                    *size,
                    "size should be set when verity is enabled and must not change"
                );
                self.finalize_fsverity_state()
            }
            // An extent written for this attribute in an overwrite mode: track the range.
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    ObjectItem {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attr_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
                        ..
                    },
                ..
            }) if self.object_id() == *object_id && self.attribute_id() == *attr_id => match mode {
                ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => {
                    self.overwrite_ranges.apply_range(range.clone())
                }
                ExtentMode::Raw | ExtentMode::Cow(_) => (),
            },
            _ => {}
        }
    }
}
1726
// Straight delegation of `ObjectHandle` to the wrapped handle.
impl<S: HandleOwner> ObjectHandle for DataObjectHandle<S> {
    fn set_trace(&self, v: bool) {
        self.handle.set_trace(v)
    }

    fn object_id(&self) -> u64 {
        self.handle.object_id()
    }

    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        self.handle.allocate_buffer(size)
    }

    fn block_size(&self) -> u64 {
        self.handle.block_size()
    }
}
1744
#[async_trait]
impl<S: HandleOwner> ReadObjectHandle for DataObjectHandle<S> {
    async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
        // Take a read lock on this attribute for the duration of the read.
        let fs = self.store().filesystem();
        let guard = fs
            .lock_manager()
            .read_lock(lock_keys![LockKey::object_attribute(
                self.store().store_object_id,
                self.object_id(),
                self.attribute_id(),
            )])
            .await;

        // Clamp the read to the current content size; reads at or past eof return 0 bytes.
        let size = self.get_size();
        if offset >= size {
            return Ok(0);
        }
        let length = min(buf.len() as u64, size - offset) as usize;
        buf = buf.subslice_mut(0..length);
        self.handle.read_unchecked(self.attribute_id(), offset, buf.reborrow(), &guard).await?;
        // For verified (fsverity) files, verify the bytes just read before returning them.
        if self.is_verified_file() {
            self.verify_data(offset as usize, buf.as_slice())?;
        }
        Ok(length)
    }

    fn get_size(&self) -> u64 {
        // Cached size; kept up to date by `will_apply_mutation` as size mutations are applied.
        self.content_size.load(atomic::Ordering::Relaxed)
    }
}
1775
impl<S: HandleOwner> WriteObjectHandle for DataObjectHandle<S> {
    async fn write_or_append(&self, offset: Option<u64>, buf: BufferRef<'_>) -> Result<u64, Error> {
        // `None` means append at the current content size.
        let offset = offset.unwrap_or_else(|| self.get_size());
        let mut transaction = self.new_transaction().await?;
        self.txn_write(&mut transaction, offset, buf).await?;
        // Capture the size recorded in the transaction before committing it.
        let new_size = self.txn_get_size(&transaction);
        transaction.commit().await?;
        Ok(new_size)
    }

    async fn truncate(&self, size: u64) -> Result<(), Error> {
        self.truncate_with_options(self.default_transaction_options(), size).await
    }

    async fn flush(&self) -> Result<(), Error> {
        // Writes are committed in their own transactions as they happen (see
        // `write_or_append`), so there is nothing to flush here.
        Ok(())
    }
}
1794
/// Like object_handle::Writer, but allows custom transaction options to be set, and makes every
/// write go directly to the handle in a transaction.
pub struct DirectWriter<'a, S: HandleOwner> {
    /// The handle being written to.
    handle: &'a DataObjectHandle<S>,
    /// Transaction options used for each flush.
    options: transaction::Options<'a>,
    /// Staging buffer, written out when full or on `complete`.
    buffer: Buffer<'a>,
    /// File offset at which the currently buffered bytes will be written.
    offset: u64,
    /// Number of bytes of `buffer` currently occupied.
    buf_offset: usize,
}
1804
/// Size of `DirectWriter`'s staging buffer (1 MiB).
const BUFFER_SIZE: usize = 1_048_576;
1806
impl<S: HandleOwner> Drop for DirectWriter<'_, S> {
    fn drop(&mut self) {
        // Buffered bytes that were never flushed are lost; warn so the missing `complete` call
        // is visible.
        if self.buf_offset != 0 {
            warn!("DirectWriter: dropping data, did you forget to call complete?");
        }
    }
}
1814
impl<'a, S: HandleOwner> DirectWriter<'a, S> {
    /// Creates a writer over `handle` starting at offset zero, using `options` for each
    /// transaction it creates.
    pub async fn new(
        handle: &'a DataObjectHandle<S>,
        options: transaction::Options<'a>,
    ) -> DirectWriter<'a, S> {
        Self {
            handle,
            options,
            buffer: handle.allocate_buffer(BUFFER_SIZE).await,
            offset: 0,
            buf_offset: 0,
        }
    }

    /// Writes the buffered bytes to the handle in a single transaction, advances `offset`, and
    /// empties the buffer.
    async fn flush(&mut self) -> Result<(), Error> {
        let mut transaction = self.handle.new_transaction_with_options(self.options).await?;
        self.handle
            .txn_write(&mut transaction, self.offset, self.buffer.subslice(..self.buf_offset))
            .await?;
        transaction.commit().await?;
        self.offset += self.buf_offset as u64;
        self.buf_offset = 0;
        Ok(())
    }
}
1840
1841impl<'a, S: HandleOwner> WriteBytes for DirectWriter<'a, S> {
1842    fn block_size(&self) -> u64 {
1843        self.handle.block_size()
1844    }
1845
1846    async fn write_bytes(&mut self, mut buf: &[u8]) -> Result<(), Error> {
1847        while buf.len() > 0 {
1848            let to_do = std::cmp::min(buf.len(), BUFFER_SIZE - self.buf_offset);
1849            self.buffer
1850                .subslice_mut(self.buf_offset..self.buf_offset + to_do)
1851                .as_mut_slice()
1852                .copy_from_slice(&buf[..to_do]);
1853            self.buf_offset += to_do;
1854            if self.buf_offset == BUFFER_SIZE {
1855                self.flush().await?;
1856            }
1857            buf = &buf[to_do..];
1858        }
1859        Ok(())
1860    }
1861
1862    async fn complete(&mut self) -> Result<(), Error> {
1863        self.flush().await?;
1864        Ok(())
1865    }
1866
1867    async fn skip(&mut self, amount: u64) -> Result<(), Error> {
1868        if (BUFFER_SIZE - self.buf_offset) as u64 > amount {
1869            self.buffer
1870                .subslice_mut(self.buf_offset..self.buf_offset + amount as usize)
1871                .as_mut_slice()
1872                .fill(0);
1873            self.buf_offset += amount as usize;
1874        } else {
1875            self.flush().await?;
1876            self.offset += amount;
1877        }
1878        Ok(())
1879    }
1880}
1881
1882#[cfg(test)]
1883mod tests {
1884    use crate::errors::FxfsError;
1885    use crate::filesystem::{
1886        FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem, SyncOptions,
1887    };
1888    use crate::fsck::{
1889        FsckOptions, fsck, fsck_volume, fsck_volume_with_options, fsck_with_options,
1890    };
1891    use crate::lsm_tree::Query;
1892    use crate::lsm_tree::types::{ItemRef, LayerIterator};
1893    use crate::object_handle::{
1894        ObjectHandle, ObjectProperties, ReadObjectHandle, WriteObjectHandle,
1895    };
1896    use crate::object_store::data_object_handle::{OverwriteOptions, WRITE_ATTR_BATCH_SIZE};
1897    use crate::object_store::directory::replace_child;
1898    use crate::object_store::object_record::{FsverityMetadata, ObjectKey, ObjectValue, Timestamp};
1899    use crate::object_store::transaction::{Mutation, Options, lock_keys};
1900    use crate::object_store::volume::root_volume;
1901    use crate::object_store::{
1902        AttributeKey, DEFAULT_DATA_ATTRIBUTE_ID, DataObjectHandle, Directory, ExtentKey,
1903        ExtentMode, ExtentValue, FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions, LockKey,
1904        NewChildStoreOptions, ObjectKeyData, ObjectStore, PosixAttributes, StoreOptions,
1905        TRANSACTION_MUTATION_THRESHOLD,
1906    };
1907    use crate::range::RangeExt;
1908    use crate::round::{round_down, round_up};
1909    use assert_matches::assert_matches;
1910    use bit_vec::BitVec;
1911    use fsverity_merkle::{FsVerityDescriptor, FsVerityDescriptorRaw};
1912    use fuchsia_sync::Mutex;
1913    use futures::FutureExt;
1914    use futures::channel::oneshot::channel;
1915    use futures::stream::{FuturesUnordered, StreamExt};
1916    use fxfs_crypto::{Crypt, EncryptionKey, KeyPurpose};
1917    use fxfs_insecure_crypto::new_insecure_crypt;
1918    use std::ops::Range;
1919    use std::sync::Arc;
1920    use std::time::Duration;
1921    use storage_device::DeviceHolder;
1922    use storage_device::fake_device::FakeDevice;
1923    use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
1924
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    // Some tests (the preallocate_range ones) currently assume that the data only occupies a single
    // device block.
    const TEST_DATA_OFFSET: u64 = 5000;
    const TEST_DATA: &[u8] = b"hello";
    // Size the test object is truncated to after creation.
    const TEST_OBJECT_SIZE: u64 = 5678;
    const TEST_OBJECT_ALLOCATED_SIZE: u64 = 4096;
    const TEST_OBJECT_NAME: &str = "foo";
1934
1935    async fn test_filesystem() -> OpenFxFilesystem {
1936        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1937        FxFilesystem::new_empty(device).await.expect("new_empty failed")
1938    }
1939
    /// Creates a test filesystem containing a single file named `TEST_OBJECT_NAME` in the root
    /// store, encrypted with `crypt` if given, optionally pre-populated with `TEST_DATA` at
    /// `TEST_DATA_OFFSET`, and truncated to `TEST_OBJECT_SIZE`.
    async fn test_filesystem_and_object_with_key(
        crypt: Option<&dyn Crypt>,
        write_object_test_data: bool,
    ) -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        let fs = test_filesystem().await;
        let store = fs.root_store();
        let object;

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        // With a crypt instance, create the object with an explicit data key; otherwise create
        // it unencrypted.
        object = if let Some(crypt) = crypt {
            let object_id = store.get_next_object_id(transaction.txn_guard()).await.unwrap();
            let (key, unwrapped_key) =
                crypt.create_key(object_id.get(), KeyPurpose::Data).await.unwrap();
            ObjectStore::create_object_with_key(
                &store,
                &mut transaction,
                object_id,
                HandleOptions::default(),
                EncryptionKey::Fxfs(key),
                unwrapped_key,
            )
            .await
            .expect("create_object failed")
        } else {
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed")
        };

        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        root_directory
            .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
            .await
            .expect("add_child_file failed");

        if write_object_test_data {
            // Write TEST_DATA at an unaligned offset; the buffer is padded at the front so the
            // slice passed to txn_write starts block-aligned.
            let align = TEST_DATA_OFFSET as usize % TEST_DEVICE_BLOCK_SIZE as usize;
            let mut buf = object.allocate_buffer(align + TEST_DATA.len()).await;
            buf.as_mut_slice()[align..].copy_from_slice(TEST_DATA);
            object
                .txn_write(&mut transaction, TEST_DATA_OFFSET, buf.subslice(align..))
                .await
                .expect("write failed");
        }
        transaction.commit().await.expect("commit failed");
        object.truncate(TEST_OBJECT_SIZE).await.expect("truncate failed");
        (fs, object)
    }
2000
    /// Convenience wrapper: encrypted test object pre-populated with `TEST_DATA`.
    async fn test_filesystem_and_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        test_filesystem_and_object_with_key(Some(&new_insecure_crypt()), true).await
    }
2004
    /// Convenience wrapper: encrypted test object with no data written.
    async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
    {
        test_filesystem_and_object_with_key(Some(&new_insecure_crypt()), false).await
    }
2009
    #[fuchsia::test]
    async fn test_zero_buf_len_read() {
        let (fs, object) = test_filesystem_and_object().await;
        // Reading into a zero-length buffer must succeed and return zero bytes.
        let mut buf = object.allocate_buffer(0).await;
        assert_eq!(object.read(0u64, buf.as_mut()).await.expect("read failed"), 0);
        fs.close().await.expect("Close failed");
    }
2017
    #[fuchsia::test]
    async fn test_beyond_eof_read() {
        let (fs, object) = test_filesystem_and_object().await;
        // Read a buffer that straddles eof: the result must be clamped to the content size and
        // the bytes past eof left untouched.
        let offset = TEST_OBJECT_SIZE as usize - 2;
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        let mut buf = object.allocate_buffer(align + len + 1).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(
            object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
            align + len
        );
        // The in-file (sparse) portion reads as zeros...
        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
        // ...and the portion past eof retains its fill pattern.
        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
        fs.close().await.expect("Close failed");
    }
2034
    #[fuchsia::test]
    async fn test_beyond_eof_read_from() {
        let (fs, object) = test_filesystem_and_object().await;
        // Same as test_beyond_eof_read, but reading through the deref'd inner handle, which is
        // expected to clamp to the content size in the same way.
        let handle = &*object;
        let offset = TEST_OBJECT_SIZE as usize - 2;
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        let mut buf = object.allocate_buffer(align + len + 1).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(
            handle.read(0, (offset - align) as u64, buf.as_mut()).await.expect("read failed"),
            align + len
        );
        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
        fs.close().await.expect("Close failed");
    }
2052
    #[fuchsia::test]
    async fn test_beyond_eof_read_unchecked() {
        let (fs, object) = test_filesystem_and_object().await;
        // read_unchecked doesn't clamp to the content size; the whole requested range comes
        // back zeroed here (this region of the file is sparse and past eof).
        let offset = TEST_OBJECT_SIZE as usize - 2;
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        let mut buf = object.allocate_buffer(align + len + 1).await;
        buf.as_mut_slice().fill(123u8);
        // read_unchecked requires the caller to hold the attribute read lock.
        let guard = fs
            .lock_manager()
            .read_lock(lock_keys![LockKey::object_attribute(
                object.store().store_object_id,
                object.object_id(),
                0,
            )])
            .await;
        object
            .read_unchecked(0, (offset - align) as u64, buf.as_mut(), &guard)
            .await
            .expect("read failed");
        assert_eq!(&buf.as_slice()[align..], &vec![0u8; len + 1]);
        fs.close().await.expect("Close failed");
    }
2076
2077    #[fuchsia::test]
2078    async fn test_read_sparse() {
2079        let (fs, object) = test_filesystem_and_object().await;
2080        // Deliberately read not right to eof.
2081        let len = TEST_OBJECT_SIZE as usize - 1;
2082        let mut buf = object.allocate_buffer(len).await;
2083        buf.as_mut_slice().fill(123u8);
2084        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);
2085        let mut expected = vec![0; len];
2086        let offset = TEST_DATA_OFFSET as usize;
2087        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
2088        assert_eq!(buf.as_slice()[..len], expected[..]);
2089        fs.close().await.expect("Close failed");
2090    }
2091
    // Verifies that a read correctly merges data from a flushed (persistent) layer with a
    // later write that lands in the mutable layer.
    #[fuchsia::test]
    async fn test_read_after_writes_interspersed_with_flush() {
        let (fs, object) = test_filesystem_and_object().await;

        // Flush so the fixture's test data ends up in a different layer from the write below.
        object.owner().flush().await.expect("flush failed");

        // Write more test data to the first block of the file.
        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
        buf.as_mut_slice().copy_from_slice(TEST_DATA);
        object.write_or_append(Some(0u64), buf.as_ref()).await.expect("write failed");

        let len = TEST_OBJECT_SIZE as usize - 1;
        let mut buf = object.allocate_buffer(len).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);

        // Expected image: zeroes, with TEST_DATA at both its original offset and offset 0.
        let mut expected = vec![0u8; len];
        let offset = TEST_DATA_OFFSET as usize;
        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
        expected[..TEST_DATA.len()].copy_from_slice(TEST_DATA);
        assert_eq!(buf.as_slice(), &expected);
        fs.close().await.expect("Close failed");
    }
2115
    // Verifies reads across a <extent><deleted-extent><extent> layout produced by a
    // write, a truncate, and a later extending write.
    #[fuchsia::test]
    async fn test_read_after_truncate_and_extend() {
        let (fs, object) = test_filesystem_and_object().await;

        // Arrange for there to be <extent><deleted-extent><extent>.
        // NOTE(review): the 512/1024/1536 figures below assume a 512-byte block size —
        // confirm against the test device's block size.
        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
        buf.as_mut_slice().copy_from_slice(TEST_DATA);
        // This adds an extent at 0..512.
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
        // This deletes 512..1024.
        object.truncate(3).await.expect("truncate failed");
        let data = b"foo";
        let offset = 1500u64;
        let align = (offset % fs.block_size() as u64) as usize;
        let mut buf = object.allocate_buffer(align + data.len()).await;
        buf.as_mut_slice()[align..].copy_from_slice(data);
        // This adds 1024..1536.
        object.write_or_append(Some(1500), buf.subslice(align..)).await.expect("write failed");

        // Read right across all three regions: only the first three bytes (survivors of
        // truncate(3)) and "foo" at 1500 should be non-zero.
        const LEN1: usize = 1503;
        let mut buf = object.allocate_buffer(LEN1).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN1);
        let mut expected = [0; LEN1];
        expected[..3].copy_from_slice(&TEST_DATA[..3]);
        expected[1500..].copy_from_slice(b"foo");
        assert_eq!(buf.as_slice(), &expected);

        // Also test a read that ends midway through the deleted extent.
        const LEN2: usize = 601;
        let mut buf = object.allocate_buffer(LEN2).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN2);
        assert_eq!(buf.as_slice(), &expected[..LEN2]);
        fs.close().await.expect("Close failed");
    }
2152
    // Verifies that reads and writes on one object are unaffected by a second object in
    // the same store whose writes are interleaved with the first's, including across a
    // truncate that extends the first object with a sparse block.
    #[fuchsia::test]
    async fn test_read_whole_blocks_with_multiple_objects() {
        let (fs, object) = test_filesystem_and_object().await;
        let block_size = object.block_size() as usize;
        let mut buffer = object.allocate_buffer(block_size).await;
        buffer.as_mut_slice().fill(0xaf);
        object.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");

        // Create a second object in the same store and fill its first block with 0xef.
        let store = object.owner();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let object2 =
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed");
        transaction.commit().await.expect("commit failed");
        let mut ef_buffer = object.allocate_buffer(block_size).await;
        ef_buffer.as_mut_slice().fill(0xef);
        object2.write_or_append(Some(0), ef_buffer.as_ref()).await.expect("write failed");

        // Write a second 0xaf block to the first object, extend it to three blocks via
        // truncate (the third block stays sparse), and interleave another write to the
        // second object.
        let mut buffer = object.allocate_buffer(block_size).await;
        buffer.as_mut_slice().fill(0xaf);
        object
            .write_or_append(Some(block_size as u64), buffer.as_ref())
            .await
            .expect("write failed");
        object.truncate(3 * block_size as u64).await.expect("truncate failed");
        object2
            .write_or_append(Some(block_size as u64), ef_buffer.as_ref())
            .await
            .expect("write failed");

        // Each object must see only its own data; the first object's sparse third block
        // reads back as zeroes.
        let mut buffer = object.allocate_buffer(4 * block_size).await;
        buffer.as_mut_slice().fill(123);
        assert_eq!(object.read(0, buffer.as_mut()).await.expect("read failed"), 3 * block_size);
        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xaf; 2 * block_size]);
        assert_eq!(&buffer.as_slice()[2 * block_size..3 * block_size], &vec![0; block_size]);
        assert_eq!(object2.read(0, buffer.as_mut()).await.expect("read failed"), 2 * block_size);
        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xef; 2 * block_size]);
        fs.close().await.expect("Close failed");
    }
2197
    // Exercises write_or_append/read with every combination of unaligned head and tail,
    // checking the object's contents against an in-memory mirror after each write.
    #[fuchsia::test]
    async fn test_alignment() {
        let (fs, object) = test_filesystem_and_object().await;

        // Pairs the object under test with an in-memory mirror of its expected contents.
        struct AlignTest {
            fill: u8,
            object: DataObjectHandle<ObjectStore>,
            mirror: Vec<u8>,
        }

        impl AlignTest {
            // Snapshots the object's current contents as the starting mirror.
            async fn new(object: DataObjectHandle<ObjectStore>) -> Self {
                let mirror = {
                    let mut buf = object.allocate_buffer(object.get_size() as usize).await;
                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
                    buf.as_slice().to_vec()
                };
                Self { fill: 0, object, mirror }
            }

            // Fills |range| of self.object with a byte value (self.fill) and mirrors the same
            // operation to an in-memory copy of the object.
            // Each subsequent call bumps the value of fill.
            // It is expected that the object and its mirror maintain identical content.
            async fn test(&mut self, range: Range<u64>) {
                let mut buf = self.object.allocate_buffer((range.end - range.start) as usize).await;
                self.fill += 1;
                buf.as_mut_slice().fill(self.fill);
                self.object
                    .write_or_append(Some(range.start), buf.as_ref())
                    .await
                    .expect("write_or_append failed");
                // Writes past the current end grow the file; grow the mirror to match.
                if range.end > self.mirror.len() as u64 {
                    self.mirror.resize(range.end as usize, 0);
                }
                self.mirror[range.start as usize..range.end as usize].fill(self.fill);
                // Read with one extra byte so a file longer than the mirror would show up
                // as a larger return value here.
                let mut buf = self.object.allocate_buffer(self.mirror.len() + 1).await;
                assert_eq!(
                    self.object.read(0, buf.as_mut()).await.expect("read failed"),
                    self.mirror.len()
                );
                assert_eq!(&buf.as_slice()[..self.mirror.len()], self.mirror.as_slice());
            }
        }

        let block_size = object.block_size() as u64;
        let mut align = AlignTest::new(object).await;

        // Fill the object to start with (with 1).
        align.test(0..2 * block_size + 1).await;

        // Unaligned head (fills with 2, overwrites that with 3).
        align.test(1..block_size).await;
        align.test(1..2 * block_size).await;

        // Unaligned tail (fills with 4 and 5).
        align.test(0..block_size - 1).await;
        align.test(0..2 * block_size - 1).await;

        // Both unaligned (fills with 6 and 7).
        align.test(1..block_size - 1).await;
        align.test(1..2 * block_size - 1).await;

        fs.close().await.expect("Close failed");
    }
2263
2264    async fn test_preallocate_common(fs: &FxFilesystem, object: DataObjectHandle<ObjectStore>) {
2265        let allocator = fs.allocator();
2266        let allocated_before = allocator.get_allocated_bytes();
2267        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2268        object
2269            .preallocate_range(&mut transaction, &mut (0..fs.block_size() as u64))
2270            .await
2271            .expect("preallocate_range failed");
2272        transaction.commit().await.expect("commit failed");
2273        assert!(object.get_size() < 1048576);
2274        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2275        object
2276            .preallocate_range(&mut transaction, &mut (0..1048576))
2277            .await
2278            .expect("preallocate_range failed");
2279        transaction.commit().await.expect("commit failed");
2280        assert_eq!(object.get_size(), 1048576);
2281        // Check that it didn't reallocate the space for the existing extent
2282        let allocated_after = allocator.get_allocated_bytes();
2283        assert_eq!(allocated_after - allocated_before, 1048576 - fs.block_size() as u64);
2284
2285        let mut buf = object
2286            .allocate_buffer(round_up(TEST_DATA_OFFSET, fs.block_size()).unwrap() as usize)
2287            .await;
2288        buf.as_mut_slice().fill(47);
2289        object
2290            .write_or_append(Some(0), buf.subslice(..TEST_DATA_OFFSET as usize))
2291            .await
2292            .expect("write failed");
2293        buf.as_mut_slice().fill(95);
2294        let offset = round_up(TEST_OBJECT_SIZE, fs.block_size()).unwrap();
2295        object
2296            .overwrite(offset, buf.as_mut(), OverwriteOptions::default())
2297            .await
2298            .expect("write failed");
2299
2300        // Make sure there were no more allocations.
2301        assert_eq!(allocator.get_allocated_bytes(), allocated_after);
2302
2303        // Read back the data and make sure it is what we expect.
2304        let mut buf = object.allocate_buffer(104876).await;
2305        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
2306        assert_eq!(&buf.as_slice()[..TEST_DATA_OFFSET as usize], &[47; TEST_DATA_OFFSET as usize]);
2307        assert_eq!(
2308            &buf.as_slice()[TEST_DATA_OFFSET as usize..TEST_DATA_OFFSET as usize + TEST_DATA.len()],
2309            TEST_DATA
2310        );
2311        assert_eq!(&buf.as_slice()[offset as usize..offset as usize + 2048], &[95; 2048]);
2312    }
2313
2314    #[fuchsia::test]
2315    async fn test_preallocate_range() {
2316        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2317        test_preallocate_common(&fs, object).await;
2318        fs.close().await.expect("Close failed");
2319    }
2320
2321    // This is identical to the previous test except that we flush so that extents end up in
2322    // different layers.
2323    #[fuchsia::test]
2324    async fn test_preallocate_succeeds_when_extents_are_in_different_layers() {
2325        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2326        object.owner().flush().await.expect("flush failed");
2327        test_preallocate_common(&fs, object).await;
2328        fs.close().await.expect("Close failed");
2329    }
2330
2331    #[fuchsia::test]
2332    async fn test_already_preallocated() {
2333        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2334        let allocator = fs.allocator();
2335        let allocated_before = allocator.get_allocated_bytes();
2336        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2337        let offset = TEST_DATA_OFFSET - TEST_DATA_OFFSET % fs.block_size() as u64;
2338        object
2339            .preallocate_range(&mut transaction, &mut (offset..offset + fs.block_size() as u64))
2340            .await
2341            .expect("preallocate_range failed");
2342        transaction.commit().await.expect("commit failed");
2343        // Check that it didn't reallocate any new space.
2344        assert_eq!(allocator.get_allocated_bytes(), allocated_before);
2345        fs.close().await.expect("Close failed");
2346    }
2347
    // Verifies overwrite() against preallocated space: with allocations disallowed it
    // succeeds on preallocated blocks and fails on unallocated ones; with
    // allow_allocations it succeeds anywhere. fsck must pass afterwards.
    #[fuchsia::test]
    async fn test_overwrite_when_preallocated_at_start_of_file() {
        // The standard test data we put in the test object would cause an extent with checksums
        // to be created, which overwrite() doesn't support. So we create an empty object instead.
        let (fs, object) = test_filesystem_and_empty_object().await;

        // NOTE(review): the object is re-opened here — presumably to get a fresh handle
        // independent of the fixture's; confirm.
        let object = ObjectStore::open_object(
            object.owner(),
            object.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .expect("open_object failed");

        // The offsets below assume 4KiB blocks.
        assert_eq!(fs.block_size(), 4096);

        let mut write_buf = object.allocate_buffer(4096).await;
        write_buf.as_mut_slice().fill(95);

        // First try to overwrite without allowing allocations
        // We expect this to fail, since nothing is allocated yet
        object
            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // Now preallocate some space (exactly one block)
        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        object
            .preallocate_range(&mut transaction, &mut (0..4096 as u64))
            .await
            .expect("preallocate_range failed");
        transaction.commit().await.expect("commit failed");

        // Now try the same overwrite command as before, it should work this time,
        // even with allocations disabled...
        {
            // The preallocated block reads back as zeroes before the overwrite.
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Now try to overwrite at offset 4096. We expect this to fail, since we only preallocated
        // one block earlier at offset 0
        object
            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // We can't assert anything about the existing bytes, because they haven't been allocated
        // yet and they could contain any values
        object
            .overwrite(
                4096,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(4096, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Check that the overwrites haven't messed up the filesystem state
        let fsck_options = FsckOptions {
            fail_on_warning: true,
            no_lock: true,
            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
            ..Default::default()
        };
        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");

        fs.close().await.expect("Close failed");
    }
2434
2435    #[fuchsia::test]
2436    async fn test_overwrite_large_buffer_and_file_with_many_holes() {
2437        // The standard test data we put in the test object would cause an extent with checksums
2438        // to be created, which overwrite() doesn't support. So we create an empty object instead.
2439        let (fs, object) = test_filesystem_and_empty_object().await;
2440
2441        let object = ObjectStore::open_object(
2442            object.owner(),
2443            object.object_id(),
2444            HandleOptions::default(),
2445            None,
2446        )
2447        .await
2448        .expect("open_object failed");
2449
2450        assert_eq!(fs.block_size(), 4096);
2451        assert_eq!(object.get_size(), TEST_OBJECT_SIZE);
2452
2453        // Let's create some non-holes
2454        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2455        object
2456            .preallocate_range(&mut transaction, &mut (4096..8192 as u64))
2457            .await
2458            .expect("preallocate_range failed");
2459        object
2460            .preallocate_range(&mut transaction, &mut (16384..32768 as u64))
2461            .await
2462            .expect("preallocate_range failed");
2463        object
2464            .preallocate_range(&mut transaction, &mut (65536..131072 as u64))
2465            .await
2466            .expect("preallocate_range failed");
2467        object
2468            .preallocate_range(&mut transaction, &mut (262144..524288 as u64))
2469            .await
2470            .expect("preallocate_range failed");
2471        transaction.commit().await.expect("commit failed");
2472
2473        assert_eq!(object.get_size(), 524288);
2474
2475        let mut write_buf = object.allocate_buffer(4096).await;
2476        write_buf.as_mut_slice().fill(95);
2477
2478        // We shouldn't be able to overwrite in the holes if new allocations aren't enabled
2479        object
2480            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
2481            .await
2482            .expect_err("overwrite succeeded");
2483        object
2484            .overwrite(8192, write_buf.as_mut(), OverwriteOptions::default())
2485            .await
2486            .expect_err("overwrite succeeded");
2487        object
2488            .overwrite(32768, write_buf.as_mut(), OverwriteOptions::default())
2489            .await
2490            .expect_err("overwrite succeeded");
2491        object
2492            .overwrite(131072, write_buf.as_mut(), OverwriteOptions::default())
2493            .await
2494            .expect_err("overwrite succeeded");
2495
2496        // But we should be able to overwrite in the prealloc'd areas without needing allocations
2497        {
2498            let mut read_buf = object.allocate_buffer(4096).await;
2499            object.read(4096, read_buf.as_mut()).await.expect("read failed");
2500            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2501        }
2502        object
2503            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
2504            .await
2505            .expect("overwrite failed");
2506        {
2507            let mut read_buf = object.allocate_buffer(4096).await;
2508            object.read(4096, read_buf.as_mut()).await.expect("read failed");
2509            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2510        }
2511        {
2512            let mut read_buf = object.allocate_buffer(4096).await;
2513            object.read(16384, read_buf.as_mut()).await.expect("read failed");
2514            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2515        }
2516        object
2517            .overwrite(16384, write_buf.as_mut(), OverwriteOptions::default())
2518            .await
2519            .expect("overwrite failed");
2520        {
2521            let mut read_buf = object.allocate_buffer(4096).await;
2522            object.read(16384, read_buf.as_mut()).await.expect("read failed");
2523            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2524        }
2525        {
2526            let mut read_buf = object.allocate_buffer(4096).await;
2527            object.read(65536, read_buf.as_mut()).await.expect("read failed");
2528            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2529        }
2530        object
2531            .overwrite(65536, write_buf.as_mut(), OverwriteOptions::default())
2532            .await
2533            .expect("overwrite failed");
2534        {
2535            let mut read_buf = object.allocate_buffer(4096).await;
2536            object.read(65536, read_buf.as_mut()).await.expect("read failed");
2537            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2538        }
2539        {
2540            let mut read_buf = object.allocate_buffer(4096).await;
2541            object.read(262144, read_buf.as_mut()).await.expect("read failed");
2542            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2543        }
2544        object
2545            .overwrite(262144, write_buf.as_mut(), OverwriteOptions::default())
2546            .await
2547            .expect("overwrite failed");
2548        {
2549            let mut read_buf = object.allocate_buffer(4096).await;
2550            object.read(262144, read_buf.as_mut()).await.expect("read failed");
2551            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2552        }
2553
2554        // Now let's try to do a huge overwrite, that spans over many holes and non-holes
2555        let mut huge_write_buf = object.allocate_buffer(524288).await;
2556        huge_write_buf.as_mut_slice().fill(96);
2557
2558        // With allocations disabled, the big overwrite should fail...
2559        object
2560            .overwrite(0, huge_write_buf.as_mut(), OverwriteOptions::default())
2561            .await
2562            .expect_err("overwrite succeeded");
2563        // ... but it should work when allocations are enabled
2564        object
2565            .overwrite(
2566                0,
2567                huge_write_buf.as_mut(),
2568                OverwriteOptions { allow_allocations: true, ..Default::default() },
2569            )
2570            .await
2571            .expect("overwrite failed");
2572        {
2573            let mut read_buf = object.allocate_buffer(524288).await;
2574            object.read(0, read_buf.as_mut()).await.expect("read failed");
2575            assert_eq!(&read_buf.as_slice(), &[96; 524288]);
2576        }
2577
2578        // Check that the overwrites haven't messed up the filesystem state
2579        let fsck_options = FsckOptions {
2580            fail_on_warning: true,
2581            no_lock: true,
2582            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2583            ..Default::default()
2584        };
2585        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2586
2587        fs.close().await.expect("Close failed");
2588    }
2589
2590    #[fuchsia::test]
2591    async fn test_overwrite_when_unallocated_at_start_of_file() {
2592        // The standard test data we put in the test object would cause an extent with checksums
2593        // to be created, which overwrite() doesn't support. So we create an empty object instead.
2594        let (fs, object) = test_filesystem_and_empty_object().await;
2595
2596        let object = ObjectStore::open_object(
2597            object.owner(),
2598            object.object_id(),
2599            HandleOptions::default(),
2600            None,
2601        )
2602        .await
2603        .expect("open_object failed");
2604
2605        assert_eq!(fs.block_size(), 4096);
2606
2607        let mut write_buf = object.allocate_buffer(4096).await;
2608        write_buf.as_mut_slice().fill(95);
2609
2610        // First try to overwrite without allowing allocations
2611        // We expect this to fail, since nothing is allocated yet
2612        object
2613            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
2614            .await
2615            .expect_err("overwrite succeeded");
2616
2617        // Now try the same overwrite command as before, but allow allocations
2618        object
2619            .overwrite(
2620                0,
2621                write_buf.as_mut(),
2622                OverwriteOptions { allow_allocations: true, ..Default::default() },
2623            )
2624            .await
2625            .expect("overwrite failed");
2626        {
2627            let mut read_buf = object.allocate_buffer(4096).await;
2628            object.read(0, read_buf.as_mut()).await.expect("read failed");
2629            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2630        }
2631
2632        // Now try to overwrite at the next block. This should fail if allocations are disabled
2633        object
2634            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
2635            .await
2636            .expect_err("overwrite succeeded");
2637
2638        // ... but it should work if allocations are enabled
2639        object
2640            .overwrite(
2641                4096,
2642                write_buf.as_mut(),
2643                OverwriteOptions { allow_allocations: true, ..Default::default() },
2644            )
2645            .await
2646            .expect("overwrite failed");
2647        {
2648            let mut read_buf = object.allocate_buffer(4096).await;
2649            object.read(4096, read_buf.as_mut()).await.expect("read failed");
2650            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2651        }
2652
2653        // Check that the overwrites haven't messed up the filesystem state
2654        let fsck_options = FsckOptions {
2655            fail_on_warning: true,
2656            no_lock: true,
2657            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2658            ..Default::default()
2659        };
2660        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2661
2662        fs.close().await.expect("Close failed");
2663    }
2664
    // Verifies that overwrite() with allow_allocations can grow a file: writing into the
    // (partial) last block and into the block after EOF both extend the size.
    #[fuchsia::test]
    async fn test_overwrite_can_extend_a_file() {
        // The standard test data we put in the test object would cause an extent with checksums
        // to be created, which overwrite() doesn't support. So we create an empty object instead.
        let (fs, object) = test_filesystem_and_empty_object().await;

        let object = ObjectStore::open_object(
            object.owner(),
            object.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .expect("open_object failed");

        // The size assertions below (8192 then 12288) rely on these two facts, i.e. on
        // TEST_OBJECT_SIZE falling strictly inside the second 4KiB block.
        assert_eq!(fs.block_size(), 4096);
        assert_eq!(object.get_size(), TEST_OBJECT_SIZE);

        let mut write_buf = object.allocate_buffer(4096).await;
        write_buf.as_mut_slice().fill(95);

        // Let's try to fill up the last block, and increase the file size in doing so
        let last_block_offset = round_down(TEST_OBJECT_SIZE, 4096 as u32);

        // Expected to fail with allocations disabled
        object
            .overwrite(last_block_offset, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        // ... but expected to succeed with allocations enabled
        object
            .overwrite(
                last_block_offset,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(last_block_offset, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Filling the last (partial) block rounded the size up to a block boundary.
        assert_eq!(object.get_size(), 8192);

        // Let's try to write at the next block, too
        let next_block_offset = round_up(TEST_OBJECT_SIZE, 4096 as u32).unwrap();

        // Expected to fail with allocations disabled
        object
            .overwrite(next_block_offset, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        // ... but expected to succeed with allocations enabled
        object
            .overwrite(
                next_block_offset,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(next_block_offset, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Writing one block past the old EOF grew the file by another block.
        assert_eq!(object.get_size(), 12288);

        // Check that the overwrites haven't messed up the filesystem state
        let fsck_options = FsckOptions {
            fail_on_warning: true,
            no_lock: true,
            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
            ..Default::default()
        };
        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");

        fs.close().await.expect("Close failed");
    }
2747
2748    #[fuchsia::test]
2749    async fn test_enable_verity() {
2750        let fs: OpenFxFilesystem = test_filesystem().await;
2751        let mut transaction = fs
2752            .clone()
2753            .new_transaction(lock_keys![], Options::default())
2754            .await
2755            .expect("new_transaction failed");
2756        let store = fs.root_store();
2757        let object = Arc::new(
2758            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2759                .await
2760                .expect("create_object failed"),
2761        );
2762
2763        transaction.commit().await.unwrap();
2764
2765        object
2766            .enable_verity(fio::VerificationOptions {
2767                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2768                salt: Some(vec![]),
2769                ..Default::default()
2770            })
2771            .await
2772            .expect("set verified file metadata failed");
2773
2774        let handle =
2775            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
2776                .await
2777                .expect("open_object failed");
2778
2779        assert!(handle.is_verified_file());
2780
2781        fs.close().await.expect("Close failed");
2782    }
2783
2784    #[fuchsia::test]
2785    async fn test_enable_verity_large_file() {
2786        // Need to make a large FakeDevice to create space for a 67 MB file.
2787        let device = DeviceHolder::new(FakeDevice::new(262144, TEST_DEVICE_BLOCK_SIZE));
2788        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2789        let root_store = fs.root_store();
2790        let mut transaction = fs
2791            .clone()
2792            .new_transaction(lock_keys![], Options::default())
2793            .await
2794            .expect("new_transaction failed");
2795
2796        let handle = ObjectStore::create_object(
2797            &root_store,
2798            &mut transaction,
2799            HandleOptions::default(),
2800            None,
2801        )
2802        .await
2803        .expect("failed to create object");
2804        transaction.commit().await.expect("commit failed");
2805        let mut offset = 0;
2806
2807        // Write a file big enough to trigger multiple transactions on enable_verity().
2808        let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2809        buf.as_mut_slice().fill(1);
2810        for _ in 0..130 {
2811            handle.write_or_append(Some(offset), buf.as_ref()).await.expect("write failed");
2812            offset += WRITE_ATTR_BATCH_SIZE as u64;
2813        }
2814
2815        handle
2816            .enable_verity(fio::VerificationOptions {
2817                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2818                salt: Some(vec![]),
2819                ..Default::default()
2820            })
2821            .await
2822            .expect("set verified file metadata failed");
2823
2824        let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2825        offset = 0;
2826        for _ in 0..130 {
2827            handle.read(offset, buf.as_mut()).await.expect("verification during read should fail");
2828            assert_eq!(buf.as_slice(), &[1; WRITE_ATTR_BATCH_SIZE]);
2829            offset += WRITE_ATTR_BATCH_SIZE as u64;
2830        }
2831
2832        fsck(fs.clone()).await.expect("fsck failed");
2833        fs.close().await.expect("Close failed");
2834    }
2835
    // Simulates a crash part-way through `enable_verity`: a merkle attribute is
    // partially written and a graveyard entry for that attribute is committed, but
    // the final transaction is deliberately dropped. After remounting, the
    // graveyard's initial reap should clean up the partial attribute so that a
    // retried `enable_verity` succeeds and leaves a parsable descriptor behind.
    #[fuchsia::test]
    async fn test_retry_enable_verity_on_reboot() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        let object_id = {
            let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
            // Record a graveyard entry for the merkle attribute so that, if the
            // write below never completes, replay can clean the attribute up.
            transaction.add(
                root_store.store_object_id(),
                Mutation::replace_or_insert_object(
                    ObjectKey::graveyard_attribute_entry(
                        root_store.graveyard_directory_object_id(),
                        handle.object_id(),
                        FSVERITY_MERKLE_ATTRIBUTE_ID,
                    ),
                    ObjectValue::Some,
                ),
            );

            // This write should span three transactions. This test mimics the behavior when the
            // last transaction gets interrupted by a filesystem.close().
            handle
                .write_new_attr_in_batches(
                    &mut transaction,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    &vec![0; 2 * WRITE_ATTR_BATCH_SIZE],
                    WRITE_ATTR_BATCH_SIZE,
                )
                .await
                .expect("failed to write merkle attribute");

            handle.object_id()
            // Drop the transaction to simulate interrupting the merkle tree creation as well as to
            // release the transaction locks.
        };

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // First remount read-only: replay must produce a consistent image even
        // before the graveyard gets a chance to reap anything.
        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // On open, the filesystem will call initial_reap which will call queue_tombstone().
        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_store = fs.root_store();
        let handle =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("open_object failed");
        // Retry: with the partial attribute cleaned up, this should now succeed.
        handle
            .enable_verity(fio::VerificationOptions {
                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                salt: Some(vec![]),
                ..Default::default()
            })
            .await
            .expect("set verified file metadata failed");

        // `flush` will ensure that initial reap fully processes all the graveyard entries. This
        // isn't strictly necessary for the test to pass (the graveyard marker was already
        // processed during `enable_verity`), but it does help catch bugs, such as the attribute
        // graveyard entry not being removed upon processing.
        fs.graveyard().flush().await;
        // The merkle attribute that survives must parse as a valid descriptor.
        assert!(
            FsVerityDescriptor::from_bytes(
                &handle
                    .read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID)
                    .await
                    .expect("read_attr failed")
                    .expect("No attr found"),
                handle.block_size() as usize
            )
            .is_ok()
        );
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("Close failed");
    }
2934
2935    #[fuchsia::test]
2936    async fn test_verify_data_corrupt_file() {
2937        let fs: OpenFxFilesystem = test_filesystem().await;
2938        let mut transaction = fs
2939            .clone()
2940            .new_transaction(lock_keys![], Options::default())
2941            .await
2942            .expect("new_transaction failed");
2943        let store = fs.root_store();
2944        let object = Arc::new(
2945            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2946                .await
2947                .expect("create_object failed"),
2948        );
2949
2950        transaction.commit().await.unwrap();
2951
2952        let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
2953        buf.as_mut_slice().fill(123);
2954        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2955
2956        object
2957            .enable_verity(fio::VerificationOptions {
2958                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2959                salt: Some(vec![]),
2960                ..Default::default()
2961            })
2962            .await
2963            .expect("set verified file metadata failed");
2964
2965        // Change file contents and ensure verification fails
2966        buf.as_mut_slice().fill(234);
2967        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2968        object.read(0, buf.as_mut()).await.expect_err("verification during read should fail");
2969
2970        fs.close().await.expect("Close failed");
2971    }
2972
2973    // TODO(https://fxbug.dev/450398331): More tests to be added when this can support writing the
2974    // f2fs format natively. For now, relying on tests inside of the f2fs_reader to exercise more
2975    // paths.
    // Enables verity in the native format, then hand-rewrites the object's
    // metadata so the merkle attribute is described as the f2fs layout
    // (`FsverityMetadata::F2fs`), and finally checks that a reopened handle still
    // treats the file as verified and can read it in full.
    #[fuchsia::test]
    async fn test_parse_f2fs_verity() {
        let fs: OpenFxFilesystem = test_filesystem().await;
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let store = fs.root_store();
        let object = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );

        transaction.commit().await.unwrap();
        let file_size = fs.block_size() * 2;
        // Write over one block to make there be leaf hashes.
        {
            let mut buf = object.allocate_buffer(file_size as usize).await;
            buf.as_mut_slice().fill(64);
            assert_eq!(
                object.write_or_append(None, buf.as_ref()).await.expect("Writing to file."),
                file_size
            );
        }

        // Enable verity normally, then shift the type.
        object
            .enable_verity(fio::VerificationOptions {
                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                salt: Some(vec![]),
                ..Default::default()
            })
            .await
            .expect("set verified file metadata failed");
        // Capture the root hash and options so the f2fs descriptor built below
        // agrees with the merkle tree that enable_verity already wrote.
        let (verity_info, root_hash) = object.get_descriptor().unwrap();

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::Object {
                    store_object_id: store.store_object_id(),
                    object_id: object.object_id()
                }],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        // Rewrite the data attribute's verity metadata to claim the f2fs layout
        // spanning the first two blocks of the merkle attribute.
        transaction.add(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    object.object_id(),
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Attribute,
                ),
                ObjectValue::verified_attribute(
                    file_size,
                    FsverityMetadata::F2fs(0..(fs.block_size() * 2)),
                ),
            ),
        );
        // Declare the merkle attribute itself as two blocks long.
        transaction.add(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    object.object_id(),
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    AttributeKey::Attribute,
                ),
                ObjectValue::attribute(fs.block_size() * 2, false),
            ),
        );
        {
            // Build a raw f2fs-style descriptor from the captured root hash and
            // salt, and write it into the second block of the merkle attribute
            // (the first block holds the leaf hashes).
            let descriptor = FsVerityDescriptorRaw::new(
                fio::HashAlgorithm::Sha256,
                fs.block_size(),
                file_size,
                root_hash.as_slice(),
                match &verity_info.salt {
                    Some(salt) => salt.as_slice(),
                    None => [0u8; 0].as_slice(),
                },
            )
            .expect("Creating descriptor");
            let mut buf = object.allocate_buffer(fs.block_size() as usize).await;
            descriptor.write_to_slice(buf.as_mut_slice()).expect("Writing descriptor to buf");
            object
                .multi_write(
                    &mut transaction,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    &[fs.block_size()..(fs.block_size() * 2)],
                    buf.as_mut(),
                )
                .await
                .expect("Writing descriptor");
        }
        transaction.commit().await.unwrap();

        // A fresh handle should parse the f2fs metadata and report verified.
        let handle =
            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
                .await
                .expect("open_object failed");

        assert!(handle.is_verified_file());

        // The whole file must still verify and read back.
        let mut buf = object.allocate_buffer(file_size as usize).await;
        assert_eq!(
            handle.read(0, buf.as_mut()).await.expect("Read whole file."),
            file_size as usize
        );

        fs.close().await.expect("Close failed");
    }
3091
3092    #[fuchsia::test]
3093    async fn test_verify_data_corrupt_tree() {
3094        let fs: OpenFxFilesystem = test_filesystem().await;
3095        let object_id = {
3096            let store = fs.root_store();
3097            let mut transaction = fs
3098                .clone()
3099                .new_transaction(lock_keys![], Options::default())
3100                .await
3101                .expect("new_transaction failed");
3102            let object = Arc::new(
3103                ObjectStore::create_object(
3104                    &store,
3105                    &mut transaction,
3106                    HandleOptions::default(),
3107                    None,
3108                )
3109                .await
3110                .expect("create_object failed"),
3111            );
3112            let object_id = object.object_id();
3113
3114            transaction.commit().await.unwrap();
3115
3116            let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
3117            buf.as_mut_slice().fill(123);
3118            object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3119
3120            object
3121                .enable_verity(fio::VerificationOptions {
3122                    hash_algorithm: Some(fio::HashAlgorithm::Sha256),
3123                    salt: Some(vec![]),
3124                    ..Default::default()
3125                })
3126                .await
3127                .expect("set verified file metadata failed");
3128            object.read(0, buf.as_mut()).await.expect("verified read");
3129
3130            // Corrupt the merkle tree before closing.
3131            let mut merkle = object
3132                .read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID)
3133                .await
3134                .unwrap()
3135                .expect("Reading merkle tree");
3136            merkle[0] = merkle[0].wrapping_add(1);
3137            object
3138                .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID, &*merkle)
3139                .await
3140                .expect("Overwriting merkle");
3141
3142            object_id
3143        }; // Close object.
3144
3145        // Reopening the object should complain about the corrupted merkle tree.
3146        assert!(
3147            ObjectStore::open_object(&fs.root_store(), object_id, HandleOptions::default(), None)
3148                .await
3149                .is_err()
3150        );
3151        fs.close().await.expect("Close failed");
3152    }
3153
3154    #[fuchsia::test]
3155    async fn test_extend() {
3156        let fs = test_filesystem().await;
3157        let handle;
3158        let mut transaction = fs
3159            .clone()
3160            .new_transaction(lock_keys![], Options::default())
3161            .await
3162            .expect("new_transaction failed");
3163        let store = fs.root_store();
3164        handle =
3165            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3166                .await
3167                .expect("create_object failed");
3168
3169        // As of writing, an empty filesystem has two 512kiB superblock extents and a little over
3170        // 256kiB of additional allocations (journal, etc) so we start use a 'magic' starting point
3171        // of 2MiB here.
3172        const START_OFFSET: u64 = 2048 * 1024;
3173        handle
3174            .extend(&mut transaction, START_OFFSET..START_OFFSET + 5 * fs.block_size() as u64)
3175            .await
3176            .expect("extend failed");
3177        transaction.commit().await.expect("commit failed");
3178        let mut buf = handle.allocate_buffer(5 * fs.block_size() as usize).await;
3179        buf.as_mut_slice().fill(123);
3180        handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3181        buf.as_mut_slice().fill(67);
3182        handle.read(0, buf.as_mut()).await.expect("read failed");
3183        assert_eq!(buf.as_slice(), &vec![123; 5 * fs.block_size() as usize]);
3184        fs.close().await.expect("Close failed");
3185    }
3186
3187    #[fuchsia::test]
3188    async fn test_truncate_deallocates_old_extents() {
3189        let (fs, object) = test_filesystem_and_object().await;
3190        let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
3191        buf.as_mut_slice().fill(0xaa);
3192        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3193
3194        let allocator = fs.allocator();
3195        let allocated_before = allocator.get_allocated_bytes();
3196        object.truncate(fs.block_size() as u64).await.expect("truncate failed");
3197        let allocated_after = allocator.get_allocated_bytes();
3198        assert!(
3199            allocated_after < allocated_before,
3200            "before = {} after = {}",
3201            allocated_before,
3202            allocated_after
3203        );
3204        fs.close().await.expect("Close failed");
3205    }
3206
3207    #[fuchsia::test]
3208    async fn test_truncate_zeroes_tail_block() {
3209        let (fs, object) = test_filesystem_and_object().await;
3210
3211        WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + 3).await.expect("truncate failed");
3212        WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + TEST_DATA.len() as u64)
3213            .await
3214            .expect("truncate failed");
3215
3216        let mut buf = object.allocate_buffer(fs.block_size() as usize).await;
3217        let offset = (TEST_DATA_OFFSET % fs.block_size()) as usize;
3218        object.read(TEST_DATA_OFFSET - offset as u64, buf.as_mut()).await.expect("read failed");
3219
3220        let mut expected = TEST_DATA.to_vec();
3221        expected[3..].fill(0);
3222        assert_eq!(&buf.as_slice()[offset..offset + expected.len()], &expected);
3223    }
3224
    // Exercises trimming of partially-truncated objects by the graveyard. The
    // test runs in two passes (a plain file, then the same file moved into the
    // graveyard) and, via a post-commit hook, fscks and replays a device snapshot
    // after *every* transaction to check that trim/tombstone state is always
    // recoverable.
    #[fuchsia::test]
    async fn test_trim() {
        // Format a new filesystem.
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let block_size = fs.block_size();
        root_volume(fs.clone())
            .await
            .expect("root_volume failed")
            .new_volume("test", NewChildStoreOptions::default())
            .await
            .expect("volume failed");
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        // To test trim, we open the filesystem and set up a post commit hook that runs after every
        // transaction.  When the hook triggers, we can fsck the volume, take a snapshot of the
        // device and check that it gets replayed correctly on the snapshot.  We can check that the
        // graveyard trims the file as expected.
        // State shared between the test body and the post-commit hook.
        #[derive(Default)]
        struct Context {
            store: Option<Arc<ObjectStore>>,
            object_id: Option<u64>,
        }
        let shared_context = Arc::new(Mutex::new(Context::default()));

        // Large enough that truncating the object needs more than one transaction.
        let object_size = (TRANSACTION_MUTATION_THRESHOLD as u64 + 10) * 2 * block_size;

        // Wait for an object to get tombstoned by the graveyard.
        async fn expect_tombstoned(store: &Arc<ObjectStore>, object_id: u64) {
            loop {
                if let Err(e) =
                    ObjectStore::open_object(store, object_id, HandleOptions::default(), None).await
                {
                    assert!(
                        FxfsError::NotFound.matches(&e),
                        "open_object didn't fail with NotFound: {:?}",
                        e
                    );
                    break;
                }
                // The graveyard should eventually tombstone the object.
                fasync::Timer::new(std::time::Duration::from_millis(100)).await;
            }
        }

        // Checks to see if the object needs to be trimmed.
        async fn needs_trim(store: &Arc<ObjectStore>) -> Option<DataObjectHandle<ObjectStore>> {
            let root_directory = Directory::open(store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let oid = root_directory.lookup("foo").await.expect("lookup failed");
            if let Some((oid, _, _)) = oid {
                let object = ObjectStore::open_object(store, oid, HandleOptions::default(), None)
                    .await
                    .expect("open_object failed");
                let props = object.get_properties().await.expect("get_properties failed");
                // Allocated bytes with zero content size means a truncate has
                // committed but its extents have not been trimmed yet.
                if props.allocated_size > 0 && props.data_attribute_size == 0 {
                    Some(object)
                } else {
                    None
                }
            } else {
                None
            }
        }

        let shared_context_clone = shared_context.clone();
        // Runs after every committed transaction: fsck the live filesystem, then
        // replay a snapshot of the device and verify trim/tombstone behavior there.
        let post_commit = move || {
            let store = shared_context_clone.lock().store.as_ref().cloned().unwrap();
            let shared_context = shared_context_clone.clone();
            async move {
                // First run fsck on the current filesystem.
                let options = FsckOptions {
                    fail_on_warning: true,
                    no_lock: true,
                    on_error: Box::new(|err| println!("fsck error: {:?}", err)),
                    ..Default::default()
                };
                let fs = store.filesystem();

                fsck_with_options(fs.clone(), &options).await.expect("fsck_with_options failed");
                fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
                    .await
                    .expect("fsck_volume_with_options failed");

                // Now check that we can replay this correctly.
                fs.sync(SyncOptions { flush_device: true, ..Default::default() })
                    .await
                    .expect("sync failed");
                let device = fs.device().snapshot().expect("snapshot failed");

                // object_id is set only in the second pass, once the object has
                // been moved into the graveyard.
                let object_id = shared_context.lock().object_id.clone();

                let fs2 = FxFilesystemBuilder::new()
                    .skip_initial_reap(object_id.is_none())
                    .open(device)
                    .await
                    .expect("open failed");

                // If the "foo" file exists check that allocated size matches content size.
                let root_vol = root_volume(fs2.clone()).await.expect("root_volume failed");
                let store =
                    root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");

                if let Some(oid) = object_id {
                    // For the second pass, the object should get tombstoned.
                    expect_tombstoned(&store, oid).await;
                } else if let Some(object) = needs_trim(&store).await {
                    // Extend the file and make sure that it is correctly trimmed.
                    object.truncate(object_size).await.expect("truncate failed");
                    let mut buf = object.allocate_buffer(block_size as usize).await;
                    object
                        .read(object_size - block_size * 2, buf.as_mut())
                        .await
                        .expect("read failed");
                    assert_eq!(buf.as_slice(), &vec![0; block_size as usize]);

                    // Remount, this time with the graveyard performing an initial reap and the
                    // object should get trimmed.
                    let fs = FxFilesystem::open(fs.device().snapshot().expect("snapshot failed"))
                        .await
                        .expect("open failed");
                    let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
                    let store = root_vol
                        .volume("test", StoreOptions::default())
                        .await
                        .expect("volume failed");
                    while needs_trim(&store).await.is_some() {
                        // The object has been truncated, but still has some data allocated to
                        // it.  The graveyard should trim the object eventually.
                        fasync::Timer::new(std::time::Duration::from_millis(100)).await;
                    }

                    // Run fsck.
                    fsck_with_options(fs.clone(), &options)
                        .await
                        .expect("fsck_with_options failed");
                    fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
                        .await
                        .expect("fsck_volume_with_options failed");
                    fs.close().await.expect("close failed");
                }

                // Run fsck on fs2.
                fsck_with_options(fs2.clone(), &options).await.expect("fsck_with_options failed");
                fsck_volume_with_options(fs2.as_ref(), &options, store.store_object_id(), None)
                    .await
                    .expect("fsck_volume_with_options failed");
                fs2.close().await.expect("close failed");
            }
            .boxed()
        };

        let fs = FxFilesystemBuilder::new()
            .post_commit_hook(post_commit)
            .open(device)
            .await
            .expect("open failed");

        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");

        shared_context.lock().store = Some(store.clone());

        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");

        let object;
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        object = root_directory
            .create_child_file(&mut transaction, "foo")
            .await
            .expect("create_object failed");
        transaction.commit().await.expect("commit failed");

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        // Two passes: first with a regular object, and then with that object moved into the
        // graveyard.
        let mut pass = 0;
        loop {
            // Create enough extents in it such that when we truncate the object it will require
            // more than one transaction.
            let mut buf = object.allocate_buffer(5).await;
            buf.as_mut_slice().fill(1);
            // Write every other block.
            for offset in (0..object_size).into_iter().step_by(2 * block_size as usize) {
                object
                    .txn_write(&mut transaction, offset, buf.as_ref())
                    .await
                    .expect("write failed");
            }
            transaction.commit().await.expect("commit failed");
            // This should take up more than one transaction.
            WriteObjectHandle::truncate(&object, 0).await.expect("truncate failed");

            if pass == 1 {
                break;
            }

            // Store the object ID so that we can make sure the object is always tombstoned
            // after remount (see above).
            shared_context.lock().object_id = Some(object.object_id());

            transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(store.store_object_id(), store.root_directory_object_id()),
                        LockKey::object(store.store_object_id(), object.object_id()),
                    ],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");

            // Move the object into the graveyard.
            replace_child(&mut transaction, None, (&root_directory, "foo"))
                .await
                .expect("replace_child failed");
            store.add_to_graveyard(&mut transaction, object.object_id());

            pass += 1;
        }

        fs.close().await.expect("Close failed");
    }
3471
    /// Exercises `adjust_refs`: incrementing the refcount reports "no deletion needed"
    /// (false); decrementing by 2 (back down to zero) reports the object should be deleted
    /// (true).  Allocated space is only released once the object is tombstoned.  Finishes
    /// with an fsck pass to verify on-disk consistency.
    #[fuchsia::test]
    async fn test_adjust_refs() {
        let (fs, object) = test_filesystem_and_object().await;
        let store = object.owner();
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        // Bump the refcount (1 -> 2): the object must not be marked for deletion.
        assert_eq!(
            store
                .adjust_refs(&mut transaction, object.object_id(), 1)
                .await
                .expect("adjust_refs failed"),
            false
        );
        transaction.commit().await.expect("commit failed");

        let allocator = fs.allocator();
        let allocated_before = allocator.get_allocated_bytes();
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        // Drop the refcount by 2 (down to zero): now the object should be deleted.
        assert_eq!(
            store
                .adjust_refs(&mut transaction, object.object_id(), -2)
                .await
                .expect("adjust_refs failed"),
            true
        );
        transaction.commit().await.expect("commit failed");

        // Reaching a zero refcount does not release space by itself...
        assert_eq!(allocator.get_allocated_bytes(), allocated_before);

        // ...the space is only freed when the object is tombstoned.
        store
            .tombstone_object(
                object.object_id(),
                Options { borrow_metadata_space: true, ..Default::default() },
            )
            .await
            .expect("purge failed");

        assert_eq!(allocated_before - allocator.get_allocated_bytes(), fs.block_size() as u64);

        // We need to remove the directory entry, too, otherwise fsck will complain
        {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            transaction.add(
                store.store_object_id(),
                Mutation::replace_or_insert_object(
                    ObjectKey::child(root_directory.object_id(), TEST_OBJECT_NAME, false),
                    ObjectValue::None,
                ),
            );
            transaction.commit().await.expect("commit failed");
        }

        fsck_with_options(
            fs.clone(),
            &FsckOptions {
                fail_on_warning: true,
                on_error: Box::new(|err| println!("fsck error: {:?}", err)),
                ..Default::default()
            },
        )
        .await
        .expect("fsck_with_options failed");

        fs.close().await.expect("Close failed");
    }
3563
    /// Checks transaction locking on a single object: a write in an open transaction does
    /// not block concurrent reads, but a second transaction on the same object must wait
    /// until the first commits.  Three futures are sequenced via oneshot channels.
    #[fuchsia::test]
    async fn test_locks() {
        let (fs, object) = test_filesystem_and_object().await;
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let (send3, recv3) = channel();
        // Set to true only after the first transaction commits; used to check that the
        // third future's transaction really was blocked until then.
        let done = Mutex::new(false);
        let mut futures = FuturesUnordered::new();
        // Future 1: holds a transaction open, writes, then commits.
        futures.push(
            async {
                let mut t = object.new_transaction().await.expect("new_transaction failed");
                send1.send(()).unwrap(); // Tell the next future to continue.
                send3.send(()).unwrap(); // Tell the last future to continue.
                recv2.await.unwrap();
                let mut buf = object.allocate_buffer(5).await;
                buf.as_mut_slice().copy_from_slice(b"hello");
                object.txn_write(&mut t, 0, buf.as_ref()).await.expect("write failed");
                // This is a halting problem so all we can do is sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
                t.commit().await.expect("commit failed");
            }
            .boxed(),
        );
        // Future 2: reads while future 1's transaction is still open.
        futures.push(
            async {
                recv1.await.unwrap();
                // Reads should not block.
                let offset = TEST_DATA_OFFSET as usize;
                let align = offset % fs.block_size() as usize;
                let len = TEST_DATA.len();
                let mut buf = object.allocate_buffer(align + len).await;
                assert_eq!(
                    object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
                    align + TEST_DATA.len()
                );
                assert_eq!(&buf.as_slice()[align..], TEST_DATA);
                // Tell the first future to continue.
                send2.send(()).unwrap();
            }
            .boxed(),
        );
        // Future 3: opens a second transaction on the same object, which must block until
        // future 1 commits, at which point the committed data is visible.
        futures.push(
            async {
                // This should block until the first future has completed.
                recv3.await.unwrap();
                let _t = object.new_transaction().await.expect("new_transaction failed");
                let mut buf = object.allocate_buffer(5).await;
                assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 5);
                assert_eq!(buf.as_slice(), b"hello");
            }
            .boxed(),
        );
        while let Some(()) = futures.next().await {}
        fs.close().await.expect("Close failed");
    }
3620
3621    #[fuchsia::test(threads = 10)]
3622    async fn test_racy_reads() {
3623        let fs = test_filesystem().await;
3624        let object;
3625        let mut transaction = fs
3626            .clone()
3627            .new_transaction(lock_keys![], Options::default())
3628            .await
3629            .expect("new_transaction failed");
3630        let store = fs.root_store();
3631        object = Arc::new(
3632            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3633                .await
3634                .expect("create_object failed"),
3635        );
3636        transaction.commit().await.expect("commit failed");
3637        for _ in 0..100 {
3638            let cloned_object = object.clone();
3639            let writer = fasync::Task::spawn(async move {
3640                let mut buf = cloned_object.allocate_buffer(10).await;
3641                buf.as_mut_slice().fill(123);
3642                cloned_object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3643            });
3644            let cloned_object = object.clone();
3645            let reader = fasync::Task::spawn(async move {
3646                let wait_time = rand::random_range(0..5);
3647                fasync::Timer::new(Duration::from_millis(wait_time)).await;
3648                let mut buf = cloned_object.allocate_buffer(10).await;
3649                buf.as_mut_slice().fill(23);
3650                let amount = cloned_object.read(0, buf.as_mut()).await.expect("write failed");
3651                // If we succeed in reading data, it must include the write; i.e. if we see the size
3652                // change, we should see the data too.  For this to succeed it requires locking on
3653                // the read size to ensure that when we read the size, we get the extents changed in
3654                // that same transaction.
3655                if amount != 0 {
3656                    assert_eq!(amount, 10);
3657                    assert_eq!(buf.as_slice(), &[123; 10]);
3658                }
3659            });
3660            writer.await;
3661            reader.await;
3662            object.truncate(0).await.expect("truncate failed");
3663        }
3664        fs.close().await.expect("Close failed");
3665    }
3666
3667    #[fuchsia::test]
3668    async fn test_allocated_size() {
3669        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
3670
3671        let before = object.get_properties().await.expect("get_properties failed").allocated_size;
3672        let mut buf = object.allocate_buffer(5).await;
3673        buf.as_mut_slice().copy_from_slice(b"hello");
3674        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3675        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3676        assert_eq!(after, before + fs.block_size() as u64);
3677
3678        // Do the same write again and there should be no change.
3679        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3680        assert_eq!(
3681            object.get_properties().await.expect("get_properties failed").allocated_size,
3682            after
3683        );
3684
3685        // extend...
3686        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3687        let offset = 1000 * fs.block_size() as u64;
3688        let before = after;
3689        object
3690            .extend(&mut transaction, offset..offset + fs.block_size() as u64)
3691            .await
3692            .expect("extend failed");
3693        transaction.commit().await.expect("commit failed");
3694        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3695        assert_eq!(after, before + fs.block_size() as u64);
3696
3697        // truncate...
3698        let before = after;
3699        let size = object.get_size();
3700        object.truncate(size - fs.block_size() as u64).await.expect("extend failed");
3701        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3702        assert_eq!(after, before - fs.block_size() as u64);
3703
3704        // preallocate_range...
3705        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3706        let before = after;
3707        let mut file_range = offset..offset + fs.block_size() as u64;
3708        object.preallocate_range(&mut transaction, &mut file_range).await.expect("extend failed");
3709        transaction.commit().await.expect("commit failed");
3710        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3711        assert_eq!(after, before + fs.block_size() as u64);
3712        fs.close().await.expect("Close failed");
3713    }
3714
3715    #[fuchsia::test(threads = 10)]
3716    async fn test_zero() {
3717        let (fs, object) = test_filesystem_and_object().await;
3718        let expected_size = object.get_size();
3719        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3720        object.zero(&mut transaction, 0..fs.block_size() as u64 * 10).await.expect("zero failed");
3721        transaction.commit().await.expect("commit failed");
3722        assert_eq!(object.get_size(), expected_size);
3723        let mut buf = object.allocate_buffer(fs.block_size() as usize * 10).await;
3724        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed") as u64, expected_size);
3725        assert_eq!(
3726            &buf.as_slice()[0..expected_size as usize],
3727            vec![0u8; expected_size as usize].as_slice()
3728        );
3729        fs.close().await.expect("Close failed");
3730    }
3731
    /// Checks that `update_attributes` changes are reflected by `get_properties`, and that a
    /// second update within the same transaction overrides the first while leaving fields it
    /// does not mention (e.g. `mode`) at their earlier values.
    #[fuchsia::test]
    async fn test_properties() {
        let (fs, object) = test_filesystem_and_object().await;
        const CRTIME: Timestamp = Timestamp::from_nanos(1234);
        const MTIME: Timestamp = Timestamp::from_nanos(5678);
        const CTIME: Timestamp = Timestamp::from_nanos(8765);

        // ObjectProperties can be updated through `update_attributes`.
        // `get_properties` should reflect the latest changes.
        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        // First update: creation/modification times, mode and gid.
        object
            .update_attributes(
                &mut transaction,
                Some(&fio::MutableNodeAttributes {
                    creation_time: Some(CRTIME.as_nanos()),
                    modification_time: Some(MTIME.as_nanos()),
                    mode: Some(111),
                    gid: Some(222),
                    ..Default::default()
                }),
                None,
            )
            .await
            .expect("update_attributes failed");
        const MTIME_NEW: Timestamp = Timestamp::from_nanos(12345678);
        // Second update, same transaction: overrides mtime and gid, sets rdev, and supplies
        // a change time.
        object
            .update_attributes(
                &mut transaction,
                Some(&fio::MutableNodeAttributes {
                    modification_time: Some(MTIME_NEW.as_nanos()),
                    gid: Some(333),
                    rdev: Some(444),
                    ..Default::default()
                }),
                Some(CTIME),
            )
            .await
            .expect("update_timestamps failed");
        transaction.commit().await.expect("commit failed");

        // Final state combines both updates: mode from the first; gid, rdev and mtime from
        // the second.
        let properties = object.get_properties().await.expect("get_properties failed");
        assert_matches!(
            properties,
            ObjectProperties {
                refs: 1u64,
                allocated_size: TEST_OBJECT_ALLOCATED_SIZE,
                data_attribute_size: TEST_OBJECT_SIZE,
                creation_time: CRTIME,
                modification_time: MTIME_NEW,
                posix_attributes: Some(PosixAttributes { mode: 111, gid: 333, rdev: 444, .. }),
                change_time: CTIME,
                ..
            }
        );
        fs.close().await.expect("Close failed");
    }
3788
3789    #[fuchsia::test]
3790    async fn test_is_allocated() {
3791        let (fs, object) = test_filesystem_and_object().await;
3792
3793        // `test_filesystem_and_object()` wrote the buffer `TEST_DATA` to the device at offset
3794        // `TEST_DATA_OFFSET` where the length and offset are aligned to the block size.
3795        let aligned_offset = round_down(TEST_DATA_OFFSET, fs.block_size());
3796        let aligned_length = round_up(TEST_DATA.len() as u64, fs.block_size()).unwrap();
3797
3798        // Check for the case where where we have the following extent layout
3799        //       [ unallocated ][ `TEST_DATA` ]
3800        // The extents before `aligned_offset` should not be allocated
3801        let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
3802        assert_eq!(count, aligned_offset);
3803        assert_eq!(allocated, false);
3804
3805        let (allocated, count) =
3806            object.is_allocated(aligned_offset).await.expect("is_allocated failed");
3807        assert_eq!(count, aligned_length);
3808        assert_eq!(allocated, true);
3809
3810        // Check for the case where where we query out of range
3811        let end = aligned_offset + aligned_length;
3812        object
3813            .is_allocated(end)
3814            .await
3815            .expect_err("is_allocated should have returned ERR_OUT_OF_RANGE");
3816
3817        // Check for the case where where we start querying for allocation starting from
3818        // an allocated range to the end of the device
3819        let size = 50 * fs.block_size() as u64;
3820        object.truncate(size).await.expect("extend failed");
3821
3822        let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
3823        assert_eq!(count, size - end);
3824        assert_eq!(allocated, false);
3825
3826        // Check for the case where where we have the following extent layout
3827        //      [ unallocated ][ `buf` ][ `buf` ]
3828        let buf_length = 5 * fs.block_size();
3829        let mut buf = object.allocate_buffer(buf_length as usize).await;
3830        buf.as_mut_slice().fill(123);
3831        let new_offset = end + 20 * fs.block_size() as u64;
3832        object.write_or_append(Some(new_offset), buf.as_ref()).await.expect("write failed");
3833        object
3834            .write_or_append(Some(new_offset + buf_length), buf.as_ref())
3835            .await
3836            .expect("write failed");
3837
3838        let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
3839        assert_eq!(count, new_offset - end);
3840        assert_eq!(allocated, false);
3841
3842        let (allocated, count) =
3843            object.is_allocated(new_offset).await.expect("is_allocated failed");
3844        assert_eq!(count, 2 * buf_length);
3845        assert_eq!(allocated, true);
3846
3847        // Check the case where we query from the middle of an extent
3848        let (allocated, count) = object
3849            .is_allocated(new_offset + 4 * fs.block_size())
3850            .await
3851            .expect("is_allocated failed");
3852        assert_eq!(count, 2 * buf_length - 4 * fs.block_size());
3853        assert_eq!(allocated, true);
3854
3855        // Now, write buffer to a location already written to.
3856        // Check for the case when we the following extent layout
3857        //      [ unallocated ][ `other_buf` ][ (part of) `buf` ][ `buf` ]
3858        let other_buf_length = 3 * fs.block_size();
3859        let mut other_buf = object.allocate_buffer(other_buf_length as usize).await;
3860        other_buf.as_mut_slice().fill(231);
3861        object.write_or_append(Some(new_offset), other_buf.as_ref()).await.expect("write failed");
3862
3863        // We still expect that `is_allocated(..)` will return that  there are 2*`buf_length bytes`
3864        // allocated from `new_offset`
3865        let (allocated, count) =
3866            object.is_allocated(new_offset).await.expect("is_allocated failed");
3867        assert_eq!(count, 2 * buf_length);
3868        assert_eq!(allocated, true);
3869
3870        // Check for the case when we the following extent layout
3871        //   [ unallocated ][ deleted ][ unallocated ][ deleted ][ allocated ]
3872        // Mark TEST_DATA as deleted
3873        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3874        object
3875            .zero(&mut transaction, aligned_offset..aligned_offset + aligned_length)
3876            .await
3877            .expect("zero failed");
3878        // Mark `other_buf` as deleted
3879        object
3880            .zero(&mut transaction, new_offset..new_offset + buf_length)
3881            .await
3882            .expect("zero failed");
3883        transaction.commit().await.expect("commit transaction failed");
3884
3885        let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
3886        assert_eq!(count, new_offset + buf_length);
3887        assert_eq!(allocated, false);
3888
3889        let (allocated, count) =
3890            object.is_allocated(new_offset + buf_length).await.expect("is_allocated failed");
3891        assert_eq!(count, buf_length);
3892        assert_eq!(allocated, true);
3893
3894        let new_end = new_offset + buf_length + count;
3895
3896        // Check for the case where there are objects with different keys.
3897        // Case that we're checking for:
3898        //      [ unallocated ][ extent (object with different key) ][ unallocated ]
3899        let store = object.owner();
3900        let mut transaction = fs
3901            .clone()
3902            .new_transaction(lock_keys![], Options::default())
3903            .await
3904            .expect("new_transaction failed");
3905        let object2 =
3906            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3907                .await
3908                .expect("create_object failed");
3909        transaction.commit().await.expect("commit failed");
3910
3911        object2
3912            .write_or_append(Some(new_end + fs.block_size()), buf.as_ref())
3913            .await
3914            .expect("write failed");
3915
3916        // Expecting that the extent with a different key is treated like unallocated extent
3917        let (allocated, count) = object.is_allocated(new_end).await.expect("is_allocated failed");
3918        assert_eq!(count, size - new_end);
3919        assert_eq!(allocated, false);
3920
3921        fs.close().await.expect("close failed");
3922    }
3923
3924    #[fuchsia::test(threads = 10)]
3925    async fn test_read_write_attr() {
3926        let (_fs, object) = test_filesystem_and_object().await;
3927        let data = [0xffu8; 16_384];
3928        object.write_attr(20, &data).await.expect("write_attr failed");
3929        let rdata =
3930            object.read_attr(20).await.expect("read_attr failed").expect("no attribute data found");
3931        assert_eq!(&data[..], &rdata[..]);
3932
3933        assert_eq!(object.read_attr(21).await.expect("read_attr failed"), None);
3934    }
3935
3936    #[fuchsia::test(threads = 10)]
3937    async fn test_allocate_basic() {
3938        let (fs, object) = test_filesystem_and_empty_object().await;
3939        let block_size = fs.block_size();
3940        let file_size = block_size * 10;
3941        object.truncate(file_size).await.unwrap();
3942
3943        let small_buf_size = 1024;
3944        let large_buf_aligned_size = block_size as usize * 2;
3945        let large_buf_size = block_size as usize * 2 + 1024;
3946
3947        let mut small_buf = object.allocate_buffer(small_buf_size).await;
3948        let mut large_buf_aligned = object.allocate_buffer(large_buf_aligned_size).await;
3949        let mut large_buf = object.allocate_buffer(large_buf_size).await;
3950
3951        assert_eq!(object.read(0, small_buf.as_mut()).await.unwrap(), small_buf_size);
3952        assert_eq!(small_buf.as_slice(), &vec![0; small_buf_size]);
3953        assert_eq!(object.read(0, large_buf.as_mut()).await.unwrap(), large_buf_size);
3954        assert_eq!(large_buf.as_slice(), &vec![0; large_buf_size]);
3955        assert_eq!(
3956            object.read(0, large_buf_aligned.as_mut()).await.unwrap(),
3957            large_buf_aligned_size
3958        );
3959        assert_eq!(large_buf_aligned.as_slice(), &vec![0; large_buf_aligned_size]);
3960
3961        // Allocation succeeds, and without any writes to the location it shows up as zero.
3962        object.allocate(block_size..block_size * 3).await.unwrap();
3963
3964        // Test starting before, inside, and after the allocated section with every sized buffer.
3965        for (buf_index, buf) in [small_buf, large_buf, large_buf_aligned].iter_mut().enumerate() {
3966            for offset in 0..4 {
3967                assert_eq!(
3968                    object.read(block_size * offset, buf.as_mut()).await.unwrap(),
3969                    buf.len(),
3970                    "buf_index: {}, read offset: {}",
3971                    buf_index,
3972                    offset,
3973                );
3974                assert_eq!(
3975                    buf.as_slice(),
3976                    &vec![0; buf.len()],
3977                    "buf_index: {}, read offset: {}",
3978                    buf_index,
3979                    offset,
3980                );
3981            }
3982        }
3983
3984        fs.close().await.expect("close failed");
3985    }
3986
3987    #[fuchsia::test(threads = 10)]
3988    async fn test_allocate_extends_file() {
3989        const BUF_SIZE: usize = 1024;
3990        let (fs, object) = test_filesystem_and_empty_object().await;
3991        let mut buf = object.allocate_buffer(BUF_SIZE).await;
3992        let block_size = fs.block_size();
3993
3994        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
3995        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3996
3997        assert!(TEST_OBJECT_SIZE < block_size * 4);
3998        // Allocation succeeds, and without any writes to the location it shows up as zero.
3999        object.allocate(0..block_size * 4).await.unwrap();
4000        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4001        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4002        assert_eq!(object.read(block_size, buf.as_mut()).await.unwrap(), buf.len());
4003        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4004        assert_eq!(object.read(block_size * 3, buf.as_mut()).await.unwrap(), buf.len());
4005        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4006
4007        fs.close().await.expect("close failed");
4008    }
4009
4010    #[fuchsia::test(threads = 10)]
4011    async fn test_allocate_past_end() {
4012        const BUF_SIZE: usize = 1024;
4013        let (fs, object) = test_filesystem_and_empty_object().await;
4014        let mut buf = object.allocate_buffer(BUF_SIZE).await;
4015        let block_size = fs.block_size();
4016
4017        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4018        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4019
4020        assert!(TEST_OBJECT_SIZE < block_size * 4);
4021        // Allocation succeeds, and without any writes to the location it shows up as zero.
4022        object.allocate(block_size * 4..block_size * 6).await.unwrap();
4023        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4024        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4025        assert_eq!(object.read(block_size * 4, buf.as_mut()).await.unwrap(), buf.len());
4026        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4027        assert_eq!(object.read(block_size * 5, buf.as_mut()).await.unwrap(), buf.len());
4028        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4029
4030        fs.close().await.expect("close failed");
4031    }
4032
4033    #[fuchsia::test(threads = 10)]
4034    async fn test_allocate_read_attr() {
4035        let (fs, object) = test_filesystem_and_empty_object().await;
4036        let block_size = fs.block_size();
4037        let file_size = block_size * 4;
4038        object.truncate(file_size).await.unwrap();
4039
4040        let content = object
4041            .read_attr(object.attribute_id())
4042            .await
4043            .expect("failed to read attr")
4044            .expect("attr returned none");
4045        assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
4046
4047        object.allocate(block_size..block_size * 3).await.unwrap();
4048
4049        let content = object
4050            .read_attr(object.attribute_id())
4051            .await
4052            .expect("failed to read attr")
4053            .expect("attr returned none");
4054        assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
4055
4056        fs.close().await.expect("close failed");
4057    }
4058
    /// Table-driven check that `allocate` over ranges which already contain written data
    /// preserves the file contents: the bytes read back after the allocation must equal the
    /// bytes read back before it.  Cases cover exact overlap, allocations that extend past
    /// either side of the data, allocations inside the data, and interleaved written holes.
    #[fuchsia::test(threads = 10)]
    async fn test_allocate_existing_data() {
        // All ranges below are in units of filesystem blocks.
        struct Case {
            // Block ranges filled with 0xff before allocating.
            written_ranges: Vec<Range<usize>>,
            // Block range handed to `allocate`.
            allocate_range: Range<u64>,
        }
        let cases = [
            Case { written_ranges: vec![4..7], allocate_range: 4..7 },
            Case { written_ranges: vec![4..7], allocate_range: 3..8 },
            Case { written_ranges: vec![4..7], allocate_range: 5..6 },
            Case { written_ranges: vec![4..7], allocate_range: 5..8 },
            Case { written_ranges: vec![4..7], allocate_range: 3..5 },
            Case { written_ranges: vec![0..1, 2..3, 4..5, 6..7, 8..9], allocate_range: 0..10 },
            Case { written_ranges: vec![0..2, 4..6, 7..10], allocate_range: 1..8 },
        ];

        for case in cases {
            let (fs, object) = test_filesystem_and_empty_object().await;
            let block_size = fs.block_size();
            let file_size = block_size * 10;
            object.truncate(file_size).await.unwrap();

            // Fill each written range with 0xff.
            for write in &case.written_ranges {
                let write_len = (write.end - write.start) * block_size as usize;
                let mut write_buf = object.allocate_buffer(write_len).await;
                write_buf.as_mut_slice().fill(0xff);
                assert_eq!(
                    object
                        .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
                        .await
                        .unwrap(),
                    file_size
                );
            }

            // Snapshot the file contents before allocating.
            let mut expected_buf = object.allocate_buffer(file_size as usize).await;
            assert_eq!(object.read(0, expected_buf.as_mut()).await.unwrap(), expected_buf.len());

            object
                .allocate(
                    case.allocate_range.start * block_size..case.allocate_range.end * block_size,
                )
                .await
                .unwrap();

            // The contents must be unchanged by the allocation.
            let mut read_buf = object.allocate_buffer(file_size as usize).await;
            assert_eq!(object.read(0, read_buf.as_mut()).await.unwrap(), read_buf.len());
            assert_eq!(read_buf.as_slice(), expected_buf.as_slice());

            fs.close().await.expect("close failed");
        }
    }
4111
    /// Walks the extent records of attribute 0 for `obj` that intersect `search_range`,
    /// returning each extent's range (clamped to `search_range`) paired with its
    /// `ExtentMode`, in offset order.  Panics if anything other than a matching extent
    /// record is found before the range is covered.
    async fn get_modes(
        obj: &DataObjectHandle<ObjectStore>,
        mut search_range: Range<u64>,
    ) -> Vec<(Range<u64>, ExtentMode)> {
        let mut modes = Vec::new();
        let store = obj.store();
        let tree = store.tree();
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        // Seek to the first extent that could overlap the start of the search range.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                obj.object_id(),
                0,
                AttributeKey::Extent(ExtentKey::search_key_from_offset(search_range.start)),
            )))
            .await
            .unwrap();
        loop {
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(
                                    attribute_id,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                        },
                    value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
                    ..
                }) if *object_id == obj.object_id() && *attribute_id == 0 => {
                    // Extent starts at or beyond the end of the search range: we're done.
                    if search_range.end <= range.start {
                        break;
                    }
                    // Clamp the extent to the search range and advance past it.
                    let found_range = std::cmp::max(search_range.start, range.start)
                        ..std::cmp::min(search_range.end, range.end);
                    search_range.start = found_range.end;
                    modes.push((found_range, mode.clone()));
                    if search_range.start == search_range.end {
                        break;
                    }
                    iter.advance().await.unwrap();
                }
                x => panic!("looking for extent record, found this {:?}", x),
            }
        }
        modes
    }
4161
4162    async fn assert_all_overwrite(
4163        obj: &DataObjectHandle<ObjectStore>,
4164        mut search_range: Range<u64>,
4165    ) {
4166        let modes = get_modes(obj, search_range.clone()).await;
4167        for mode in modes {
4168            assert_eq!(
4169                mode.0.start, search_range.start,
4170                "missing mode in range {}..{}",
4171                search_range.start, mode.0.start
4172            );
4173            match mode.1 {
4174                ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => (),
4175                m => panic!("mode at range {:?} was not overwrite, instead found {:?}", mode.0, m),
4176            }
4177            assert!(
4178                mode.0.end <= search_range.end,
4179                "mode ends beyond search range (bug in test) - search_range: {:?}, mode: {:?}",
4180                search_range,
4181                mode,
4182            );
4183            search_range.start = mode.0.end;
4184        }
4185        assert_eq!(
4186            search_range.start, search_range.end,
4187            "missing mode in range {:?}",
4188            search_range
4189        );
4190    }
4191
    /// Table-driven test of `multi_overwrite` on a 10-block file.  Each `Case` seeds the file
    /// (optional pre-writes, then `allocate` calls), verifies the allocated ranges are backed by
    /// overwrite extents, then issues a series of `multi_overwrite` batches, checking the file
    /// contents and the per-block checksums emitted into the transaction after every batch.
    #[fuchsia::test(threads = 10)]
    async fn test_multi_overwrite() {
        #[derive(Debug)]
        struct Case {
            // Ranges (in blocks) filled with 0xff via `write_or_append` before allocating,
            // so some cases exercise allocation over already-written data.
            pre_writes: Vec<Range<usize>>,
            // Ranges (in blocks) passed to `allocate` to set up overwrite extents.
            allocate_ranges: Vec<Range<u64>>,
            // Each inner vec is the range list (in blocks) for one `multi_overwrite` call.
            overwrites: Vec<Vec<Range<u64>>>,
        }
        let cases = [
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![1..3],
                overwrites: vec![vec![1..3]],
            },
            // Several adjacent allocations written with one overwrite call.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..1, 1..2, 2..3, 3..4],
                overwrites: vec![vec![0..4]],
            },
            // One allocation written piecemeal by separate overwrite calls.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![0..1], vec![1..2], vec![3..4]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![3..4]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![3..4], vec![2..3], vec![1..2]],
            },
            // Disjoint allocations; only one of them is written.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![1..2, 5..6, 7..8],
                overwrites: vec![vec![5..6]],
            },
            // Repeatedly overwriting the same range must stay stable.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![1..3],
                overwrites: vec![
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                ],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..5],
                overwrites: vec![
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                ],
            },
            // Multiple ranges in a single multi_overwrite call.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..5],
                overwrites: vec![vec![0..2, 2..4, 4..5]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..5, 5..10],
                overwrites: vec![vec![1..2, 2..3, 4..7, 7..8]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4, 6..10],
                overwrites: vec![vec![2..3, 7..9]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..10],
                overwrites: vec![vec![1..2, 5..10], vec![0..1, 5..10], vec![0..5, 5..10]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..10],
                overwrites: vec![vec![0..2, 2..4, 4..6, 6..8, 8..10], vec![0..5, 5..10]],
            },
            // Cases with pre-existing 0xff data under/around the allocated range.
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![1..3],
                overwrites: vec![vec![1..3]],
            },
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![4..6],
                overwrites: vec![vec![5..6]],
            },
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![0..4]],
            },
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![2..4],
                overwrites: vec![vec![2..4]],
            },
            Case {
                pre_writes: vec![3..5],
                allocate_ranges: vec![1..3, 6..7],
                overwrites: vec![vec![1..3, 6..7]],
            },
            Case {
                pre_writes: vec![1..3, 5..7, 8..9],
                allocate_ranges: vec![0..5],
                overwrites: vec![vec![0..2, 2..5], vec![0..5]],
            },
            // Overlapping allocate calls (in both orders) with no subsequent writes; the
            // overwrite-extent assertions below still must hold for every requested range.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..10, 4..6],
                overwrites: Vec::new(),
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![3..8, 5..10],
                overwrites: Vec::new(),
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![5..10, 3..8],
                overwrites: Vec::new(),
            },
        ];

        for (i, case) in cases.into_iter().enumerate() {
            log::info!("running case {} - {:?}", i, case);
            // Fresh filesystem and object per case so cases can't interfere with each other.
            let (fs, object) = test_filesystem_and_empty_object().await;
            let block_size = fs.block_size();
            let file_size = block_size * 10;
            object.truncate(file_size).await.unwrap();

            // Seed existing data: fill each pre-write range with 0xff.  write_or_append
            // returns the resulting file size, which must be unchanged since every range
            // is within the truncated size.
            for write in case.pre_writes {
                let write_len = (write.end - write.start) * block_size as usize;
                let mut write_buf = object.allocate_buffer(write_len).await;
                write_buf.as_mut_slice().fill(0xff);
                assert_eq!(
                    object
                        .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
                        .await
                        .unwrap(),
                    file_size
                );
            }

            // Allocate the requested ranges (block units converted to bytes here)...
            for allocate_range in &case.allocate_ranges {
                object
                    .allocate(allocate_range.start * block_size..allocate_range.end * block_size)
                    .await
                    .unwrap();
            }

            // ...and confirm every allocated range is now fully backed by overwrite extents.
            for allocate_range in case.allocate_ranges {
                assert_all_overwrite(
                    &object,
                    allocate_range.start * block_size..allocate_range.end * block_size,
                )
                .await;
            }

            for overwrite in case.overwrites {
                // Convert block ranges to byte ranges, accumulating the total write length
                // to size the data buffer.
                let mut write_len = 0;
                let overwrite = overwrite
                    .into_iter()
                    .map(|r| {
                        write_len += (r.end - r.start) * block_size;
                        r.start * block_size..r.end * block_size
                    })
                    .collect::<Vec<_>>();
                let mut write_buf = object.allocate_buffer(write_len as usize).await;
                let data = (0..20).cycle().take(write_len as usize).collect::<Vec<_>>();
                write_buf.as_mut_slice().copy_from_slice(&data);

                // Build the expected file image: snapshot the current contents, then patch
                // each target range with successive slices of `data` in range order.
                let mut expected_buf = object.allocate_buffer(file_size as usize).await;
                assert_eq!(
                    object.read(0, expected_buf.as_mut()).await.unwrap(),
                    expected_buf.len()
                );
                let expected_buf_slice = expected_buf.as_mut_slice();
                let mut data_slice = data.as_slice();
                for r in &overwrite {
                    let len = r.length().unwrap() as usize;
                    let (copy_from, rest) = data_slice.split_at(len);
                    expected_buf_slice[r.start as usize..r.end as usize]
                        .copy_from_slice(&copy_from);
                    data_slice = rest;
                }

                let mut transaction = object.new_transaction().await.unwrap();
                object
                    .multi_overwrite(&mut transaction, 0, &overwrite, write_buf.as_mut())
                    .await
                    .unwrap_or_else(|_| panic!("multi_overwrite error on case {}", i));
                // Double check the emitted checksums. We should have one u64 checksum for every
                // block we wrote to disk.
                let mut checksummed_range_length = 0;
                let mut num_checksums = 0;
                for (device_range, checksums, _) in transaction.checksums() {
                    let range_len = device_range.end - device_range.start;
                    let checksums_len = checksums.len() as u64;
                    assert_eq!(range_len / checksums_len, block_size);
                    checksummed_range_length += range_len;
                    num_checksums += checksums_len;
                }
                assert_eq!(checksummed_range_length, write_len);
                assert_eq!(num_checksums, write_len / block_size);
                transaction.commit().await.unwrap();

                // Re-read the whole file and compare against the expected image.
                let mut buf = object.allocate_buffer(file_size as usize).await;
                assert_eq!(
                    object.read(0, buf.as_mut()).await.unwrap(),
                    buf.len(),
                    "failed length check on case {}",
                    i,
                );
                assert_eq!(buf.as_slice(), expected_buf.as_slice(), "failed on case {}", i);
            }

            // Make sure the on-disk structures are still consistent after each case.
            fsck_volume(&fs, object.store().store_object_id(), None).await.expect("fsck failed");
            fs.close().await.expect("close failed");
        }
    }
4428
4429    #[fuchsia::test(threads = 10)]
4430    async fn test_multi_overwrite_mode_updates() {
4431        let (fs, object) = test_filesystem_and_empty_object().await;
4432        let block_size = fs.block_size();
4433        let file_size = block_size * 10;
4434        object.truncate(file_size).await.unwrap();
4435
4436        let mut expected_bitmap = BitVec::from_elem(10, false);
4437
4438        object.allocate(0..10 * block_size).await.unwrap();
4439        assert_eq!(
4440            get_modes(&object, 0..10 * block_size).await,
4441            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4442        );
4443
4444        let mut write_buf = object.allocate_buffer(2 * block_size as usize).await;
4445        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4446        write_buf.as_mut_slice().copy_from_slice(&data);
4447        let mut transaction = object.new_transaction().await.unwrap();
4448        object
4449            .multi_overwrite(
4450                &mut transaction,
4451                0,
4452                &[2 * block_size..4 * block_size],
4453                write_buf.as_mut(),
4454            )
4455            .await
4456            .unwrap();
4457        transaction.commit().await.unwrap();
4458
4459        expected_bitmap.set(2, true);
4460        expected_bitmap.set(3, true);
4461        assert_eq!(
4462            get_modes(&object, 0..10 * block_size).await,
4463            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4464        );
4465
4466        let mut write_buf = object.allocate_buffer(3 * block_size as usize).await;
4467        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4468        write_buf.as_mut_slice().copy_from_slice(&data);
4469        let mut transaction = object.new_transaction().await.unwrap();
4470        object
4471            .multi_overwrite(
4472                &mut transaction,
4473                0,
4474                &[3 * block_size..5 * block_size, 6 * block_size..7 * block_size],
4475                write_buf.as_mut(),
4476            )
4477            .await
4478            .unwrap();
4479        transaction.commit().await.unwrap();
4480
4481        expected_bitmap.set(4, true);
4482        expected_bitmap.set(6, true);
4483        assert_eq!(
4484            get_modes(&object, 0..10 * block_size).await,
4485            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4486        );
4487
4488        let mut write_buf = object.allocate_buffer(6 * block_size as usize).await;
4489        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4490        write_buf.as_mut_slice().copy_from_slice(&data);
4491        let mut transaction = object.new_transaction().await.unwrap();
4492        object
4493            .multi_overwrite(
4494                &mut transaction,
4495                0,
4496                &[
4497                    0..2 * block_size,
4498                    5 * block_size..6 * block_size,
4499                    7 * block_size..10 * block_size,
4500                ],
4501                write_buf.as_mut(),
4502            )
4503            .await
4504            .unwrap();
4505        transaction.commit().await.unwrap();
4506
4507        assert_eq!(
4508            get_modes(&object, 0..10 * block_size).await,
4509            vec![(0..10 * block_size, ExtentMode::Overwrite)]
4510        );
4511
4512        fs.close().await.expect("close failed");
4513    }
4514}