// fxfs/object_store/data_object_handle.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::log::*;
7use crate::lsm_tree::Query;
8use crate::lsm_tree::types::{ItemRef, LayerIterator};
9use crate::object_handle::{
10    ObjectHandle, ObjectProperties, ReadObjectHandle, WriteBytes, WriteObjectHandle,
11};
12use crate::object_store::extent_record::{ExtentKey, ExtentMode, ExtentValue};
13use crate::object_store::object_manager::ObjectManager;
14use crate::object_store::object_record::{
15    AttributeKey, FsverityMetadata, ObjectAttributes, ObjectItem, ObjectKey, ObjectKeyData,
16    ObjectKind, ObjectValue, Timestamp,
17};
18use crate::object_store::store_object_handle::{MaybeChecksums, NeedsTrim};
19use crate::object_store::transaction::{
20    self, AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Operation, Options,
21    Transaction, lock_keys,
22};
23use crate::object_store::{
24    DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions, HandleOwner,
25    RootDigest, StoreObjectHandle, TRANSACTION_MUTATION_THRESHOLD, TrimMode, TrimResult,
26};
27use crate::range::RangeExt;
28use crate::round::{round_down, round_up};
29use anyhow::{Context, Error, anyhow, bail, ensure};
30use async_trait::async_trait;
31use fidl_fuchsia_io as fio;
32use fsverity_merkle::{
33    FsVerityDescriptor, FsVerityDescriptorRaw, FsVerityHasher, FsVerityHasherOptions, MerkleTree,
34    MerkleTreeBuilder,
35};
36use fuchsia_sync::Mutex;
37use futures::TryStreamExt;
38use futures::stream::FuturesUnordered;
39use fxfs_trace::trace;
40use std::cmp::min;
41use std::ops::{Deref, DerefMut, Range};
42use std::sync::Arc;
43use std::sync::atomic::{self, AtomicU64, Ordering};
44use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
45
46mod allocated_ranges;
47pub use allocated_ranges::{AllocatedRanges, RangeType};
48
/// How much data each transaction will cover when writing an attribute across batches. Pulled from
/// `FLUSH_BATCH_SIZE` in paged_object_handle.rs.
pub const WRITE_ATTR_BATCH_SIZE: usize = 524_288; // 512 KiB.
52
/// DataObjectHandle is a typed handle for file-like objects that store data in the default data
/// attribute. In addition to traditional files, this means things like the journal, superblocks,
/// and layer files.
///
/// It caches the content size of the data attribute it was configured for, and has helpers for
/// complex extent manipulation, as well as implementations of ReadObjectHandle and
/// WriteObjectHandle.
pub struct DataObjectHandle<S: HandleOwner> {
    // The untyped handle this wraps; `Deref` forwards everything else to it.
    handle: StoreObjectHandle<S>,
    // The id of the attribute this handle reads and writes.
    attribute_id: u64,
    // Cached content size of the data attribute.
    content_size: AtomicU64,
    // Progress through the fsverity state machine (see `FsverityState`).
    fsverity_state: Mutex<FsverityState>,
    // In-memory tracking of ranges allocated in overwrite mode.
    overwrite_ranges: AllocatedRanges,
}
67
/// Represents the mapping of a file's contents to the physical storage backing it.
#[derive(Debug, Clone)]
pub struct FileExtent {
    // Offset in the file where this extent begins.
    logical_offset: u64,
    // The byte range on the device that backs this extent.
    device_range: Range<u64>,
}
74
75impl FileExtent {
76    pub fn new(logical_offset: u64, device_range: Range<u64>) -> Result<Self, Error> {
77        // Ensure `device_range` is valid.
78        let length = device_range.length()?;
79        // Ensure no overflow when we calculate the end of the logical range.
80        let _ = logical_offset.checked_add(length).ok_or(FxfsError::OutOfRange)?;
81        Ok(Self { logical_offset, device_range })
82    }
83}
84
85impl FileExtent {
86    pub fn length(&self) -> u64 {
87        // SAFETY: We verified that the device_range's length is valid in Self::new.
88        unsafe { self.device_range.unchecked_length() }
89    }
90
91    pub fn logical_offset(&self) -> u64 {
92        self.logical_offset
93    }
94
95    pub fn logical_range(&self) -> Range<u64> {
96        // SAFETY: We verified logical_offset plus device_range length won't overflow in Self::new.
97        unsafe { self.logical_offset..self.logical_offset.unchecked_add(self.length()) }
98    }
99
100    pub fn device_range(&self) -> &Range<u64> {
101        &self.device_range
102    }
103}
104
/// State machine for enabling fsverity on a file. `enable_verity` drives
/// `None` -> `Started` -> `Pending` -> `Some`; opening an already-verified file sets `Some`
/// directly (see `set_fsverity_state_some`).
#[derive(Debug)]
pub enum FsverityState {
    /// Not a verity file, and no enable attempt is in flight.
    None,
    /// `enable_verity` has begun but the merkle tree has not been computed yet.
    Started,
    /// The merkle tree is computed but the enabling transaction has not been finalized.
    Pending(FsverityStateInner),
    /// The file is fsverity-enabled; reads are verified against this metadata.
    Some(FsverityStateInner),
}
112
/// The verity metadata needed to verify reads of an fsverity-enabled file.
#[derive(Debug)]
pub struct FsverityStateInner {
    // Digest of the merkle tree root; its variant also selects the hash algorithm.
    root_digest: RootDigest,
    // Salt mixed into each block hash.
    salt: Vec<u8>,
    // TODO(b/309656632): This should store the entire merkle tree and not just the leaf nodes.
    // Potentially store a pager-backed vmo instead of passing around a boxed array.
    merkle_tree: Box<[u8]>,
}
121
/// Options controlling overwrite-mode writes.
#[derive(Debug, Default)]
pub struct OverwriteOptions {
    /// If false, then all the extents for the overwrite range must have been preallocated using
    /// preallocate_range or from existing writes.
    pub allow_allocations: bool,
    /// NOTE(review): presumably issues a storage barrier before the first write of this handle —
    /// confirm against the write path that consumes this flag.
    pub barrier_on_first_write: bool,
}
129
impl FsverityStateInner {
    /// Creates the metadata from its raw parts.
    pub fn new(root_digest: RootDigest, salt: Vec<u8>, merkle_tree: Box<[u8]>) -> Self {
        FsverityStateInner { root_digest, salt, merkle_tree }
    }

    /// Builds a hasher matching this metadata's algorithm and salt, hashing `block_size` blocks.
    fn get_hasher_for_block_size(&self, block_size: usize) -> FsVerityHasher {
        match self.root_digest {
            RootDigest::Sha256(_) => {
                FsVerityHasher::Sha256(FsVerityHasherOptions::new(self.salt.clone(), block_size))
            }
            RootDigest::Sha512(_) => {
                FsVerityHasher::Sha512(FsVerityHasherOptions::new(self.salt.clone(), block_size))
            }
        }
    }

    /// Parses a serialized fsverity descriptor out of `data`, returning the metadata (root
    /// digest, salt and leaf hashes) plus the hasher the descriptor specifies.
    ///
    /// Maps parse failures to `FxfsError::IntegrityError` and unknown hash algorithms to
    /// `FxfsError::NotSupported`.
    fn from_bytes(data: &[u8], block_size: usize) -> Result<(Self, FsVerityHasher), Error> {
        let descriptor = FsVerityDescriptor::from_bytes(&data, block_size)
            .map_err(|e| anyhow!(FxfsError::IntegrityError).context(e))?;

        let root_digest = match descriptor.digest_algorithm() {
            fio::HashAlgorithm::Sha256 => {
                // Assumes a parsed SHA-256 digest is exactly 32 bytes, so try_into succeeds.
                RootDigest::Sha256(descriptor.root_digest().try_into().unwrap())
            }
            fio::HashAlgorithm::Sha512 => RootDigest::Sha512(descriptor.root_digest().to_vec()),
            _ => return Err(anyhow!(FxfsError::NotSupported).context("Unsupported hash algorithm")),
        };
        let hasher = descriptor.hasher();
        let leaves =
            descriptor.leaf_digests().map_err(|e| anyhow!(FxfsError::IntegrityError).context(e))?;

        Ok((Self::new(root_digest, descriptor.salt().to_vec(), leaves.into()), hasher))
    }
}
164
/// Forward everything not defined here to the underlying untyped `StoreObjectHandle`.
impl<S: HandleOwner> Deref for DataObjectHandle<S> {
    type Target = StoreObjectHandle<S>;
    fn deref(&self) -> &Self::Target {
        &self.handle
    }
}
171
172impl<S: HandleOwner> DataObjectHandle<S> {
    /// Creates a new handle for `object_id` owned by `owner`, operating on `attribute_id`.
    ///
    /// `size` seeds the cached content size, `fsverity_state` is the initial verity state (e.g.
    /// `FsverityState::Some` when opening an already-verified file), and `overwrite_ranges` seeds
    /// the in-memory tracking of overwrite-mode extents.
    pub fn new(
        owner: Arc<S>,
        object_id: u64,
        permanent_keys: bool,
        attribute_id: u64,
        size: u64,
        fsverity_state: FsverityState,
        options: HandleOptions,
        trace: bool,
        overwrite_ranges: &[Range<u64>],
    ) -> Self {
        Self {
            handle: StoreObjectHandle::new(owner, object_id, permanent_keys, options, trace),
            attribute_id,
            content_size: AtomicU64::new(size),
            fsverity_state: Mutex::new(fsverity_state),
            overwrite_ranges: AllocatedRanges::new(overwrite_ranges),
        }
    }
192
    /// Returns the id of the attribute this handle reads and writes.
    pub fn attribute_id(&self) -> u64 {
        self.attribute_id
    }

    /// Returns the in-memory tracking of overwrite-mode ranges.
    pub fn overwrite_ranges(&self) -> &AllocatedRanges {
        &self.overwrite_ranges
    }

    /// Returns true if fsverity has been fully enabled on this file.
    pub fn is_verified_file(&self) -> bool {
        matches!(*self.fsverity_state.lock(), FsverityState::Some(_))
    }
204
    /// Sets `self.fsverity_state` to FsverityState::Started. Called at the top of `enable_verity`.
    /// If another caller has already started but not completed `enable_verity`, returns
    /// FxfsError::Unavailable. If another caller has already completed `enable_verity`, returns
    /// FxfsError::AlreadyExists.
    pub fn set_fsverity_state_started(&self) -> Result<(), Error> {
        let mut fsverity_guard = self.fsverity_state.lock();
        match *fsverity_guard {
            FsverityState::None => {
                *fsverity_guard = FsverityState::Started;
                Ok(())
            }
            // An enable attempt is already in flight.
            FsverityState::Started | FsverityState::Pending(_) => {
                Err(anyhow!(FxfsError::Unavailable))
            }
            FsverityState::Some(_) => Err(anyhow!(FxfsError::AlreadyExists)),
        }
    }
222
    /// Sets `self.fsverity_state` to Pending. Must be called before `finalize_fsverity_state()`.
    /// Asserts that the prior state of `self.fsverity_state` was `FsverityState::Started`.
    pub fn set_fsverity_state_pending(&self, descriptor: FsverityStateInner) {
        let mut fsverity_guard = self.fsverity_state.lock();
        // Enforce the state machine: only Started -> Pending is legal.
        assert!(matches!(*fsverity_guard, FsverityState::Started));
        *fsverity_guard = FsverityState::Pending(descriptor);
    }
230
231    /// Sets `self.fsverity_state` to Some. Panics if the prior state of `self.fsverity_state` was
232    /// not `FsverityState::Pending(_)`.
233    pub fn finalize_fsverity_state(&self) {
234        let mut fsverity_state_guard = self.fsverity_state.lock();
235        let mut_fsverity_state = fsverity_state_guard.deref_mut();
236        let fsverity_state = std::mem::replace(mut_fsverity_state, FsverityState::None);
237        match fsverity_state {
238            FsverityState::None => panic!("Cannot go from FsverityState::None to Some"),
239            FsverityState::Started => panic!("Cannot go from FsverityState::Started to Some"),
240            FsverityState::Pending(inner) => *mut_fsverity_state = FsverityState::Some(inner),
241            FsverityState::Some(_) => panic!("Fsverity state was already set to Some"),
242        }
243        // Once we finalize the fsverity state, the file is permanently read-only. The in-memory
244        // overwrite ranges tracking is only used for writing, so we don't need them anymore. This
245        // leaves any uninitialized, but allocated, overwrite regions if there are any, rather than
246        // converting them back to sparse regions.
247        self.overwrite_ranges.clear();
248    }
249
    /// Sets `self.fsverity_state` directly to Some without going through the entire state machine.
    /// Used to set `self.fsverity_state` on open of a verified file. The merkle tree data is
    /// verified against the root digest here, and will return an error if the tree is not correct.
    pub async fn set_fsverity_state_some(&self, descriptor: FsverityMetadata) -> Result<(), Error> {
        let (metadata, hasher) = match descriptor {
            FsverityMetadata::Internal(root_digest, salt) => {
                // Internal format: the leaf hashes live in their own attribute.
                let merkle_tree = self
                    .read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID)
                    .await?
                    .ok_or_else(|| anyhow!(FxfsError::Inconsistent))?;
                let metadata = FsverityStateInner { root_digest, salt, merkle_tree };
                let hasher = metadata.get_hasher_for_block_size(self.block_size() as usize);
                (metadata, hasher)
            }
            FsverityMetadata::F2fs(verity_range) => {
                // F2fs format: a serialized descriptor stored at `verity_range` within the
                // merkle attribute; read it through a block-aligned buffer and parse it.
                let expected_length = verity_range.length()? as usize;
                let mut buffer = self
                    .allocate_buffer(expected_length.next_multiple_of(self.block_size() as usize))
                    .await;
                // A short read means the stored range and attribute disagree.
                ensure!(
                    expected_length
                        == self
                            .handle
                            .read(FSVERITY_MERKLE_ATTRIBUTE_ID, verity_range.start, buffer.as_mut())
                            .await?,
                    FxfsError::Inconsistent
                );
                FsverityStateInner::from_bytes(
                    buffer.as_slice()[0..expected_length].into(),
                    self.block_size() as usize,
                )?
            }
        };
        // Validate the merkle tree data against the root before applying it.
        ensure!(metadata.merkle_tree.len() % hasher.hash_size() == 0, FxfsError::Inconsistent);
        let leaf_chunks = metadata.merkle_tree.chunks_exact(hasher.hash_size());
        // Rebuild the upper layers of the tree from the stored leaves and compare roots.
        let mut builder = MerkleTreeBuilder::new(hasher);
        for leaf in leaf_chunks {
            builder.push_data_hash(leaf.to_vec());
        }
        let tree = builder.finish();
        let root_hash = match &metadata.root_digest {
            RootDigest::Sha256(root_hash) => root_hash.as_slice(),
            RootDigest::Sha512(root_hash) => root_hash.as_slice(),
        };

        ensure!(root_hash == tree.root(), FxfsError::IntegrityError);

        let mut fsverity_guard = self.fsverity_state.lock();
        assert!(matches!(*fsverity_guard, FsverityState::None));
        *fsverity_guard = FsverityState::Some(metadata);

        Ok(())
    }
304
305    /// Verifies contents of `buffer` against the corresponding hashes in the stored merkle tree.
306    /// `offset` is the logical offset in the file that `buffer` starts at. `offset` must be
307    /// block-aligned. Fails on non fsverity-enabled files.
308    fn verify_data(&self, mut offset: usize, buffer: &[u8]) -> Result<(), Error> {
309        let block_size = self.block_size() as usize;
310        assert!(offset % block_size == 0);
311        let fsverity_state = self.fsverity_state.lock();
312        match &*fsverity_state {
313            FsverityState::None => {
314                Err(anyhow!("Tried to verify read on a non verity-enabled file"))
315            }
316            FsverityState::Started | FsverityState::Pending(_) => Err(anyhow!(
317                "Enable verity has not yet completed, fsverity state: {:?}",
318                &*fsverity_state
319            )),
320            FsverityState::Some(metadata) => {
321                let hasher = metadata.get_hasher_for_block_size(block_size);
322                let leaf_nodes: Vec<&[u8]> =
323                    metadata.merkle_tree.chunks(hasher.hash_size()).collect();
324                fxfs_trace::duration!("fsverity-verify", "len" => buffer.len());
325                // TODO(b/318880297): Consider parallelizing computation.
326                for b in buffer.chunks(block_size) {
327                    ensure!(
328                        hasher.hash_block(b) == leaf_nodes[offset / block_size],
329                        anyhow!(FxfsError::Inconsistent).context("Hash mismatch")
330                    );
331                    offset += block_size;
332                }
333                Ok(())
334            }
335        }
336    }
337
    /// Extend the file with the given extent.  The only use case for this right now is for files
    /// that must exist at certain offsets on the device, such as super-blocks.
    ///
    /// The extent is appended at the block-aligned end of the file: the device range is marked
    /// allocated, the content size is bumped by the extent length, the new extent record is
    /// merged in, and the allocated size is updated — all within `transaction`.
    pub async fn extend<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        device_range: Range<u64>,
    ) -> Result<(), Error> {
        let old_end =
            round_up(self.txn_get_size(transaction), self.block_size()).ok_or(FxfsError::TooBig)?;
        let new_size = old_end + device_range.end - device_range.start;
        // Record the device range as allocated to this store.
        self.store().allocator().mark_allocated(
            transaction,
            self.store().store_object_id(),
            device_range.clone(),
        )?;
        self.txn_update_size(transaction, new_size, None).await?;
        let key_id = self.get_key(None).await?.0;
        // Map the new logical tail of the file directly onto `device_range`.
        transaction.add(
            self.store().store_object_id,
            Mutation::merge_object(
                ObjectKey::extent(self.object_id(), self.attribute_id(), old_end..new_size),
                ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
            ),
        );
        self.update_allocated_size(transaction, device_range.end - device_range.start, 0).await
    }
364
    /// Returns a new aligned buffer (reading the head and tail blocks if necessary) with a copy of
    /// the data from `buf`, delegating to the untyped handle for this handle's attribute.
    async fn align_buffer(
        &self,
        offset: u64,
        buf: BufferRef<'_>,
    ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
        self.handle.align_buffer(self.attribute_id(), offset, buf).await
    }
374
    /// Writes potentially unaligned data at `device_offset` and returns checksums if requested.
    /// The data will be encrypted if necessary.
    ///
    /// `buf` is mutable as an optimization, since the write may require encryption, we can encrypt
    /// the buffer in-place rather than copying to another buffer if the write is already aligned.
    async fn write_at(
        &self,
        offset: u64,
        buf: MutableBufferRef<'_>,
        device_offset: u64,
    ) -> Result<MaybeChecksums, Error> {
        self.handle.write_at(self.attribute_id(), offset, buf, None, device_offset).await
    }
387
    /// Verifies that the entire range in the file is zeroes, as either uninitialized overwrite
    /// range, or no extent at all. If a single allocated and written extent is found, this returns
    /// false.
    pub async fn check_unwritten_zero(&self, range: Range<u64>) -> Result<bool, Error> {
        let tree = &self.store().tree();
        let layer_set = tree.layer_set();
        let key = ExtentKey { range };
        let lower_bound = ObjectKey::attribute(
            self.object_id(),
            self.attribute_id,
            AttributeKey::Extent(key.search_key()),
        );
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullRange(&lower_bound)).await?;
        // Walk extent records for this object/attribute starting at the range's search key.
        while let Some(ItemRef {
            key:
                ObjectKey {
                    object_id,
                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                },
            value: ObjectValue::Extent(value),
            ..
        }) = iter.get()
            && *object_id == self.object_id()
            && *attr_id == self.attribute_id
        {
            if let ExtentValue::Some { mode, .. } = value {
                if let Some(overlap) = key.overlap(extent_key) {
                    if let ExtentMode::OverwritePartial(bits) = mode {
                        // Partially-initialized overwrite extent: each bit marks a written
                        // block. Any set bit within the overlapping blocks means written data.
                        let starting_index =
                            (overlap.start - extent_key.range.start) / self.block_size();
                        for initialized in bits
                            .iter()
                            .skip(starting_index as usize)
                            .take((overlap.length().unwrap() / self.block_size()) as usize)
                        {
                            if initialized {
                                return Ok(false);
                            }
                        }
                    } else {
                        // Any other allocated extent overlapping the range holds written data.
                        return Ok(false);
                    }
                } else {
                    // No overlap: we've advanced past the queried range.
                    break;
                }
            }
            iter.advance().await?;
        }
        Ok(true)
    }
439
    /// Zeroes the given range.  The range must be aligned.
    // NOTE(review): the previous doc claimed this returns the amount of data deallocated, but the
    // return type is `Result<(), Error>`; the stale claim has been dropped.
    pub async fn zero(
        &self,
        transaction: &mut Transaction<'_>,
        range: Range<u64>,
    ) -> Result<(), Error> {
        self.handle.zero(transaction, self.attribute_id(), range).await
    }
448
449    /// The cached value for `self.fsverity_state` is set either in `open_object` or on
450    /// `enable_verity`. If set, translates `self.fsverity_state.descriptor` into an
451    /// fio::VerificationOptions instance and a root hash. Otherwise, returns None.
452    pub fn get_descriptor(&self) -> Option<(fio::VerificationOptions, Vec<u8>)> {
453        let fsverity_state = self.fsverity_state.lock();
454        match &*fsverity_state {
455            FsverityState::Some(metadata) => {
456                let (options, root_hash) = match &metadata.root_digest {
457                    RootDigest::Sha256(root_hash) => (
458                        fio::VerificationOptions {
459                            hash_algorithm: Some(fio::HashAlgorithm::Sha256),
460                            salt: Some(metadata.salt.clone()),
461                            ..Default::default()
462                        },
463                        root_hash.to_vec(),
464                    ),
465                    RootDigest::Sha512(root_hash) => (
466                        fio::VerificationOptions {
467                            hash_algorithm: Some(fio::HashAlgorithm::Sha512),
468                            salt: Some(metadata.salt.clone()),
469                            ..Default::default()
470                        },
471                        root_hash.clone(),
472                    ),
473                };
474                Some((options, root_hash))
475            }
476            _ => None,
477        }
478    }
479
    /// Reads the whole data attribute and builds its fsverity merkle tree with `hasher`.
    ///
    /// Returns the in-memory tree plus its serialized form: each tree layer from the top down
    /// (excluding the root layer), each padded to block size, followed by one block holding the
    /// serialized descriptor, which embeds the root digest, `hash_alg` and `salt`.
    async fn build_verity_tree(
        &self,
        hasher: FsVerityHasher,
        hash_alg: fio::HashAlgorithm,
        salt: &[u8],
    ) -> Result<(MerkleTree, Vec<u8>), Error> {
        let hash_len = hasher.hash_size();
        let mut builder = MerkleTreeBuilder::new(hasher);
        let mut offset = 0;
        let size = self.get_size();
        // TODO(b/314836822): Consider further tuning the buffer size to optimize
        // performance. Experimentally, most verity-enabled files are <256K.
        let mut buf = self.allocate_buffer(64 * self.block_size() as usize).await;
        // Stream the file contents through the tree builder one buffer at a time.
        while offset < size {
            // TODO(b/314842875): Consider optimizations for sparse files.
            let read = self.read(offset, buf.as_mut()).await? as u64;
            assert!(offset + read <= size);
            builder.write(&buf.as_slice()[0..read as usize]);
            offset += read;
        }
        let tree = builder.finish();
        // This will include a block for the root layer, which will be used to house the descriptor.
        let tree_data_len = tree
            .as_ref()
            .iter()
            .map(|layer| (layer.len() * hash_len).next_multiple_of(self.block_size() as usize))
            .sum();
        let mut merkle_tree_data = Vec::<u8>::with_capacity(tree_data_len);
        // Iterating from the top layers down to the leaves.
        for layer in tree.as_ref().iter().rev() {
            // Skip the root layer.
            if layer.len() <= 1 {
                continue;
            }
            merkle_tree_data.extend(layer.iter().flatten());
            // Pad to the end of the block.
            let padded_size = merkle_tree_data.len().next_multiple_of(self.block_size() as usize);
            merkle_tree_data.resize(padded_size, 0);
        }

        // Zero the last block, then write the descriptor to the start of it.
        let descriptor_offset = merkle_tree_data.len();
        merkle_tree_data.resize(descriptor_offset + self.block_size() as usize, 0);
        let descriptor = FsVerityDescriptorRaw::new(
            hash_alg,
            self.block_size(),
            self.get_size(),
            tree.root(),
            salt,
        )?;
        descriptor.write_to_slice(&mut merkle_tree_data[descriptor_offset..])?;

        Ok((tree, merkle_tree_data))
    }
534
535    /// Reads the data attribute and computes a merkle tree from the data. The values of the
536    /// parameters required to build the merkle tree are supplied by `descriptor` (i.e. salt,
537    /// hash_algorithm, etc.) Writes the leaf nodes of the merkle tree to an attribute with id
538    /// `FSVERITY_MERKLE_ATTRIBUTE_ID`. Updates the root_hash of the `descriptor` according to the
539    /// computed merkle tree and then replaces the ObjectValue of the data attribute with
540    /// ObjectValue::VerifiedAttribute, which stores the `descriptor` inline.
541    #[trace]
542    pub async fn enable_verity(&self, options: fio::VerificationOptions) -> Result<(), Error> {
543        self.set_fsverity_state_started()?;
544        // If the merkle attribute was tombstoned in the last attempt of `enable_verity`, flushing
545        // the graveyard should process the tombstone before we start rewriting the attribute.
546        if let Some(_) = self
547            .store()
548            .tree()
549            .find(&ObjectKey::graveyard_attribute_entry(
550                self.store().graveyard_directory_object_id(),
551                self.object_id(),
552                FSVERITY_MERKLE_ATTRIBUTE_ID,
553            ))
554            .await?
555        {
556            self.store().filesystem().graveyard().flush().await;
557        }
558        let mut transaction = self.new_transaction().await?;
559        let hash_alg =
560            options.hash_algorithm.ok_or_else(|| anyhow!("No hash algorithm provided"))?;
561        let salt = options.salt.ok_or_else(|| anyhow!("No salt provided"))?;
562        let (root_digest, merkle_tree) = match hash_alg {
563            fio::HashAlgorithm::Sha256 => {
564                let hasher = FsVerityHasher::Sha256(FsVerityHasherOptions::new(
565                    salt.clone(),
566                    self.block_size() as usize,
567                ));
568                let (tree, merkle_tree_data) =
569                    self.build_verity_tree(hasher, hash_alg.clone(), &salt).await?;
570                let root: [u8; 32] = tree.root().try_into().unwrap();
571                (RootDigest::Sha256(root), merkle_tree_data)
572            }
573            fio::HashAlgorithm::Sha512 => {
574                let hasher = FsVerityHasher::Sha512(FsVerityHasherOptions::new(
575                    salt.clone(),
576                    self.block_size() as usize,
577                ));
578                let (tree, merkle_tree_data) =
579                    self.build_verity_tree(hasher, hash_alg.clone(), &salt).await?;
580                (RootDigest::Sha512(tree.root().to_vec()), merkle_tree_data)
581            }
582            _ => {
583                bail!(
584                    anyhow!(FxfsError::NotSupported)
585                        .context(format!("hash algorithm not supported"))
586                );
587            }
588        };
589        // TODO(b/314194485): Eventually want streaming writes.
590        // The merkle tree attribute should not require trimming because it should not
591        // exist.
592        self.handle
593            .write_new_attr_in_batches(
594                &mut transaction,
595                FSVERITY_MERKLE_ATTRIBUTE_ID,
596                &merkle_tree,
597                WRITE_ATTR_BATCH_SIZE,
598            )
599            .await?;
600        if merkle_tree.len() > WRITE_ATTR_BATCH_SIZE {
601            self.store().remove_attribute_from_graveyard(
602                &mut transaction,
603                self.object_id(),
604                FSVERITY_MERKLE_ATTRIBUTE_ID,
605            );
606        };
607        let descriptor_decoded =
608            FsVerityDescriptor::from_bytes(&merkle_tree, self.block_size() as usize)?;
609        let descriptor = FsverityStateInner {
610            root_digest: root_digest.clone(),
611            salt: salt.clone(),
612            merkle_tree: descriptor_decoded.leaf_digests()?.to_vec().into(),
613        };
614        self.set_fsverity_state_pending(descriptor);
615        transaction.add_with_object(
616            self.store().store_object_id(),
617            Mutation::replace_or_insert_object(
618                ObjectKey::attribute(
619                    self.object_id(),
620                    DEFAULT_DATA_ATTRIBUTE_ID,
621                    AttributeKey::Attribute,
622                ),
623                ObjectValue::verified_attribute(
624                    self.get_size(),
625                    FsverityMetadata::F2fs(0..merkle_tree.len() as u64),
626                ),
627            ),
628            AssocObj::Borrowed(self),
629        );
630        transaction.commit().await?;
631        Ok(())
632    }
633
634    /// Pre-allocate disk space for the given logical file range. If any part of the allocation
635    /// range is beyond the end of the file, the file size is updated.
636    pub async fn allocate(&self, range: Range<u64>) -> Result<(), Error> {
637        debug_assert!(range.start < range.end);
638
639        // It's not required that callers of allocate use block aligned ranges, but we need to make
640        // the extents block aligned. Luckily, fallocate in posix is allowed to allocate more than
641        // what was asked for for block alignment purposes. We just need to make sure that the size
642        // of the file is still the non-block-aligned end of the range if the size was changed.
643        let mut new_range = range.clone();
644        new_range.start = round_down(new_range.start, self.block_size());
645        // NB: FxfsError::TooBig turns into EFBIG when passed through starnix, which is the
646        // required error code when the requested range is larger than the file size.
647        new_range.end = round_up(new_range.end, self.block_size()).ok_or(FxfsError::TooBig)?;
648
649        let mut transaction = self.new_transaction().await?;
650        let mut to_allocate = Vec::new();
651        let mut to_switch = Vec::new();
652        let key_id = self.get_key(None).await?.0;
653
654        {
655            let tree = &self.store().tree;
656            let layer_set = tree.layer_set();
657            let offset_key = ObjectKey::attribute(
658                self.object_id(),
659                self.attribute_id(),
660                AttributeKey::Extent(ExtentKey::search_key_from_offset(new_range.start)),
661            );
662            let mut merger = layer_set.merger();
663            let mut iter = merger.query(Query::FullRange(&offset_key)).await?;
664
665            loop {
666                match iter.get() {
667                    Some(ItemRef {
668                        key:
669                            ObjectKey {
670                                object_id,
671                                data:
672                                    ObjectKeyData::Attribute(
673                                        attribute_id,
674                                        AttributeKey::Extent(extent_key),
675                                    ),
676                            },
677                        value: ObjectValue::Extent(extent_value),
678                        ..
679                    }) if *object_id == self.object_id()
680                        && *attribute_id == self.attribute_id() =>
681                    {
682                        // If the start of this extent is beyond the end of the range we are
683                        // allocating, we don't have any more work to do.
684                        if new_range.end <= extent_key.range.start {
685                            break;
686                        }
687                        // Add any prefix we might need to allocate.
688                        if new_range.start < extent_key.range.start {
689                            to_allocate.push(new_range.start..extent_key.range.start);
690                            new_range.start = extent_key.range.start;
691                        }
692                        let device_offset = match extent_value {
693                            ExtentValue::None => {
694                                // If the extent value is None, it indicates a deleted extent. In
695                                // that case, we just skip it entirely. By keeping the new_range
696                                // where it is, this section will get included in the new
697                                // allocations.
698                                iter.advance().await?;
699                                continue;
700                            }
701                            ExtentValue::Some { mode: ExtentMode::OverwritePartial(_), .. }
702                            | ExtentValue::Some { mode: ExtentMode::Overwrite, .. } => {
703                                // If this extent is already in overwrite mode, we can skip it.
704                                if extent_key.range.end < new_range.end {
705                                    new_range.start = extent_key.range.end;
706                                    iter.advance().await?;
707                                    continue;
708                                } else {
709                                    new_range.start = new_range.end;
710                                    break;
711                                }
712                            }
713                            ExtentValue::Some { device_offset, .. } => *device_offset,
714                        };
715
716                        // Figure out how we have to break up the ranges.
717                        let device_offset =
718                            device_offset + (new_range.start - extent_key.range.start);
719                        if extent_key.range.end < new_range.end {
720                            to_switch.push((new_range.start..extent_key.range.end, device_offset));
721                            new_range.start = extent_key.range.end;
722                        } else {
723                            to_switch.push((new_range.start..new_range.end, device_offset));
724                            new_range.start = new_range.end;
725                            break;
726                        }
727                    }
728                    // The records are sorted so if we find something that isn't an extent or
729                    // doesn't match the object id then there are no more extent records for this
730                    // object.
731                    _ => break,
732                }
733                iter.advance().await?;
734            }
735        }
736
737        if new_range.start < new_range.end {
738            to_allocate.push(new_range.clone());
739        }
740
741        // We can update the size in the first transaction because even if subsequent transactions
742        // don't get replayed, the data between the current and new end of the file will be zero
743        // (either sparse zero or allocated zero). On the other hand, if we don't update the size
744        // in the first transaction, overwrite extents may be written past the end of the file
745        // which is an fsck error.
746        //
747        // The potential new size needs to be the non-block-aligned range end - we round up to the
748        // nearest block size for the actual allocation, but shouldn't do that for the file size.
749        let new_size = std::cmp::max(range.end, self.get_size());
750        // Make sure the mutation that flips the has_overwrite_extents advisory flag is in the
751        // first transaction, in case we split transactions. This makes it okay to only replay the
752        // first transaction if power loss occurs - the file will be in an unusual state, but not
753        // an invalid one, if only part of the allocate goes through.
754        transaction.add_with_object(
755            self.store().store_object_id(),
756            Mutation::replace_or_insert_object(
757                ObjectKey::attribute(
758                    self.object_id(),
759                    self.attribute_id(),
760                    AttributeKey::Attribute,
761                ),
762                ObjectValue::Attribute { size: new_size, has_overwrite_extents: true },
763            ),
764            AssocObj::Borrowed(self),
765        );
766
767        // The maximum number of mutations we are going to allow per transaction in allocate. This
768        // is probably quite a bit lower than the actual limit, but it should be large enough to
769        // handle most non-edge-case versions of allocate without splitting the transaction.
770        const MAX_TRANSACTION_SIZE: usize = 256;
771        for (switch_range, device_offset) in to_switch {
772            transaction.add_with_object(
773                self.store().store_object_id(),
774                Mutation::merge_object(
775                    ObjectKey::extent(self.object_id(), self.attribute_id(), switch_range),
776                    ObjectValue::Extent(ExtentValue::initialized_overwrite_extent(
777                        device_offset,
778                        key_id,
779                    )),
780                ),
781                AssocObj::Borrowed(self),
782            );
783            if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
784                transaction.commit_and_continue().await?;
785            }
786        }
787
788        let mut allocated = 0;
789        let allocator = self.store().allocator();
790        for mut allocate_range in to_allocate {
791            while allocate_range.start < allocate_range.end {
792                let device_range = allocator
793                    .allocate(
794                        &mut transaction,
795                        self.store().store_object_id(),
796                        allocate_range.end - allocate_range.start,
797                    )
798                    .await
799                    .context("allocation failed")?;
800                let device_range_len = device_range.end - device_range.start;
801
802                transaction.add_with_object(
803                    self.store().store_object_id(),
804                    Mutation::merge_object(
805                        ObjectKey::extent(
806                            self.object_id(),
807                            self.attribute_id(),
808                            allocate_range.start..allocate_range.start + device_range_len,
809                        ),
810                        ObjectValue::Extent(ExtentValue::blank_overwrite_extent(
811                            device_range.start,
812                            (device_range_len / self.block_size()) as usize,
813                            key_id,
814                        )),
815                    ),
816                    AssocObj::Borrowed(self),
817                );
818
819                allocate_range.start += device_range_len;
820                allocated += device_range_len;
821
822                if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
823                    self.update_allocated_size(&mut transaction, allocated, 0).await?;
824                    transaction.commit_and_continue().await?;
825                    allocated = 0;
826                }
827            }
828        }
829
830        self.update_allocated_size(&mut transaction, allocated, 0).await?;
831        transaction.commit().await?;
832
833        Ok(())
834    }
835
    /// Return information on a contiguous set of extents that has the same allocation status,
    /// starting from `start_offset`. The information returned is if this set of extents are marked
    /// allocated/not allocated and also the size of this set (in bytes). This is used when
    /// querying slices for volumes.
    ///
    /// This function expects `start_offset` to be aligned to block size and will panic otherwise.
    ///
    /// # Errors
    ///
    /// Returns `FxfsError::OutOfRange` if `start_offset` is past the end of the file, and
    /// `FxfsError::Inconsistent` if an extent record is found that is not block-aligned.
    pub async fn is_allocated(&self, start_offset: u64) -> Result<(bool, u64), Error> {
        let block_size = self.block_size();
        // Precondition: callers must pass a block-aligned offset.
        assert_eq!(start_offset % block_size, 0);

        if start_offset > self.get_size() {
            bail!(FxfsError::OutOfRange)
        }

        // Querying exactly at EOF yields an empty, unallocated run.
        if start_offset == self.get_size() {
            return Ok((false, 0));
        }

        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let offset_key = ObjectKey::attribute(
            self.object_id(),
            self.attribute_id(),
            AttributeKey::Extent(ExtentKey::search_key_from_offset(start_offset)),
        );
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullRange(&offset_key)).await?;

        // `allocated` stays None until the status of the first run is known; `end` tracks how
        // far the current same-status run extends (exclusive).
        let mut allocated = None;
        let mut end = start_offset;

        loop {
            // Iterate through the extents, each time setting `end` as the end of the previous
            // extent
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) => {
                    // Equivalent of getting no extents back
                    if *object_id != self.object_id() || *attribute_id != self.attribute_id() {
                        if allocated == Some(false) || allocated.is_none() {
                            end = self.get_size();
                            allocated = Some(false);
                        }
                        break;
                    }
                    ensure!(extent_key.range.is_aligned(block_size), FxfsError::Inconsistent);
                    if extent_key.range.start > end {
                        // If a previous extent has already been visited and we are tracking an
                        // allocated set, we are only interested in an extent where the range of the
                        // current extent follows immediately after the previous one.
                        if allocated == Some(true) {
                            break;
                        } else {
                            // The gap between the previous `end` and this extent is not allocated
                            end = extent_key.range.start;
                            allocated = Some(false);
                            // Continue this iteration, except now the `end` is set to the end of
                            // the "previous" extent which is this gap between the start_offset
                            // and the current extent
                        }
                    }

                    // We can assume that from here, the `end` points to the end of a previous
                    // extent.
                    match extent_value {
                        // The current extent has been allocated
                        ExtentValue::Some { .. } => {
                            // Stop searching if previous extent was marked deleted
                            if allocated == Some(false) {
                                break;
                            }
                            allocated = Some(true);
                        }
                        // This extent has been marked deleted
                        ExtentValue::None => {
                            // Stop searching if previous extent was marked allocated
                            if allocated == Some(true) {
                                break;
                            }
                            allocated = Some(false);
                        }
                    }
                    end = extent_key.range.end;
                }
                // This occurs when there are no extents left
                None => {
                    if allocated == Some(false) || allocated.is_none() {
                        end = self.get_size();
                        allocated = Some(false);
                    }
                    // Otherwise, we were monitoring extents that were allocated, so just exit.
                    break;
                }
                // Non-extent records (Object, Child, GraveyardEntry) are ignored.
                Some(_) => {}
            }
            iter.advance().await?;
        }

        // Every path that breaks out of the loop above sets `allocated`, so this cannot panic.
        Ok((allocated.unwrap(), end - start_offset))
    }
944
945    pub async fn txn_write<'a>(
946        &'a self,
947        transaction: &mut Transaction<'a>,
948        offset: u64,
949        buf: BufferRef<'_>,
950    ) -> Result<(), Error> {
951        if buf.is_empty() {
952            return Ok(());
953        }
954        let (aligned, mut transfer_buf) = self.align_buffer(offset, buf).await?;
955        self.multi_write(
956            transaction,
957            self.attribute_id(),
958            std::slice::from_ref(&aligned),
959            transfer_buf.as_mut(),
960        )
961        .await?;
962        if offset + buf.len() as u64 > self.txn_get_size(transaction) {
963            self.txn_update_size(transaction, offset + buf.len() as u64, None).await?;
964        }
965        Ok(())
966    }
967
    /// Writes to multiple ranges with data provided in `buf`.  The buffer can be modified in place
    /// if encryption takes place.  The ranges must all be aligned and no change to content size is
    /// applied; the caller is responsible for updating size if required.
    pub async fn multi_write<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        attribute_id: u64,
        ranges: &[Range<u64>],
        buf: MutableBufferRef<'_>,
    ) -> Result<(), Error> {
        // Delegates to the underlying store handle. NOTE(review): the extra `None` argument is
        // presumably a key-id override left unset — confirm against StoreObjectHandle::multi_write.
        self.handle.multi_write(transaction, attribute_id, None, ranges, buf).await
    }
980
    // `buf` is mutable as an optimization, since the write may require encryption, we can
    // encrypt the buffer in-place rather than copying to another buffer if the write is
    // already aligned.
    //
    // Note: in the event of power failure during an overwrite() call, it is possible that
    // old data (which hasn't been overwritten with new bytes yet) may be exposed to the user.
    // Since the old data should be encrypted, it is probably safe to expose, although not ideal.
    /// Overwrites `buf` at `offset` directly on top of existing Raw-mode extents. When
    /// `options.allow_allocations` is set, holes with no backing extent are allocated on the fly
    /// within a new transaction (committed at the end); otherwise hitting a hole is an error.
    /// The buffer length must be a multiple of the device block size (asserted below).
    pub async fn overwrite(
        &self,
        mut offset: u64,
        mut buf: MutableBufferRef<'_>,
        options: OverwriteOptions,
    ) -> Result<(), Error> {
        // The write must cover whole device blocks.
        assert_eq!((buf.len() as u32) % self.store().device.block_size(), 0);
        let end = offset + buf.len() as u64;

        let key_id = self.get_key(None).await?.0;

        // The transaction only ends up being used if allow_allocations is true
        let mut transaction =
            if options.allow_allocations { Some(self.new_transaction().await?) } else { None };

        // We build up a list of writes to perform later
        let writes = FuturesUnordered::new();

        if options.barrier_on_first_write {
            self.store().device.barrier();
        }

        // We create a new scope here, so that the merger iterator will get dropped before we try to
        // commit our transaction. Otherwise the transaction commit would block.
        {
            let store = self.store();
            let store_object_id = store.store_object_id;
            let allocator = store.allocator();
            let tree = &store.tree;
            let layer_set = tree.layer_set();
            let mut merger = layer_set.merger();
            let mut iter = merger
                .query(Query::FullRange(&ObjectKey::attribute(
                    self.object_id(),
                    self.attribute_id(),
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(offset)),
                )))
                .await?;
            let block_size = self.block_size();

            loop {
                // Each arm yields (device_offset, bytes_to_write, should_advance) for the next
                // chunk of the write, or skips/continues the iterator.
                let (device_offset, bytes_to_write, should_advance) = match iter.get() {
                    // An extent ending exactly at `offset` contributes nothing to this write;
                    // step past it.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { .. }),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.end == offset =>
                    {
                        iter.advance().await?;
                        continue;
                    }
                    // An extent record overlapping `offset`.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value,
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.start <= offset =>
                    {
                        match value {
                            ObjectValue::Extent(ExtentValue::Some {
                                device_offset,
                                mode: ExtentMode::Raw,
                                ..
                            }) => {
                                ensure!(
                                    range.is_aligned(block_size) && device_offset % block_size == 0,
                                    FxfsError::Inconsistent
                                );
                                let offset_within_extent = offset - range.start;
                                let remaining_length_of_extent = (range
                                    .end
                                    .checked_sub(offset)
                                    .ok_or(FxfsError::Inconsistent)?)
                                    as usize;
                                // Yields (device_offset, bytes_to_write, should_advance)
                                (
                                    device_offset + offset_within_extent,
                                    min(buf.len(), remaining_length_of_extent),
                                    true,
                                )
                            }
                            ObjectValue::Extent(ExtentValue::Some { .. }) => {
                                // Only Raw-mode extents can be overwritten in place here; any
                                // other mode (e.g. with checksums) is rejected.
                                // TODO(https://fxbug.dev/42066056): Maybe we should create
                                // a new extent without checksums?
                                bail!(
                                    "extent from ({},{}) which overlaps offset \
                                        {} has the wrong extent mode",
                                    range.start,
                                    range.end,
                                    offset
                                )
                            }
                            _ => {
                                bail!(
                                    "overwrite failed: extent overlapping offset {} has \
                                      unexpected ObjectValue",
                                    offset
                                )
                            }
                        }
                    }
                    // No extent overlaps `offset`: either allocate a new one (when allowed) or
                    // fail.
                    maybe_item_ref => {
                        if let Some(transaction) = transaction.as_mut() {
                            assert_eq!(options.allow_allocations, true);
                            assert_eq!(offset % self.block_size(), 0);

                            // We are going to make a new extent, but let's check if there is an
                            // extent after us. If there is an extent after us, then we don't want
                            // our new extent to bump into it...
                            let mut bytes_to_allocate =
                                round_up(buf.len() as u64, self.block_size())
                                    .ok_or(FxfsError::TooBig)?;
                            if let Some(ItemRef {
                                key:
                                    ObjectKey {
                                        object_id,
                                        data:
                                            ObjectKeyData::Attribute(
                                                attribute_id,
                                                AttributeKey::Extent(ExtentKey { range }),
                                            ),
                                    },
                                ..
                            }) = maybe_item_ref
                            {
                                if *object_id == self.object_id()
                                    && *attribute_id == self.attribute_id()
                                    && offset < range.start
                                {
                                    // Cap the allocation so it stops where the next extent
                                    // begins.
                                    let bytes_until_next_extent = range.start - offset;
                                    bytes_to_allocate =
                                        min(bytes_to_allocate, bytes_until_next_extent);
                                }
                            }

                            let device_range = allocator
                                .allocate(transaction, store_object_id, bytes_to_allocate)
                                .await?;
                            let device_range_len = device_range.end - device_range.start;
                            transaction.add(
                                store_object_id,
                                Mutation::insert_object(
                                    ObjectKey::extent(
                                        self.object_id(),
                                        self.attribute_id(),
                                        offset..offset + device_range_len,
                                    ),
                                    ObjectValue::Extent(ExtentValue::new_raw(
                                        device_range.start,
                                        key_id,
                                    )),
                                ),
                            );

                            self.update_allocated_size(transaction, device_range_len, 0).await?;

                            // Yields (device_offset, bytes_to_write, should_advance)
                            (device_range.start, min(buf.len(), device_range_len as usize), false)
                        } else {
                            bail!(
                                "no extent overlapping offset {}, \
                                and new allocations are not allowed",
                                offset
                            )
                        }
                    }
                };
                // Queue the device write for this chunk and advance through the buffer.
                let (current_buf, remaining_buf) = buf.split_at_mut(bytes_to_write);
                writes.push(self.write_at(offset, current_buf, device_offset));
                if remaining_buf.len() == 0 {
                    break;
                } else {
                    buf = remaining_buf;
                    offset += bytes_to_write as u64;
                    if should_advance {
                        iter.advance().await?;
                    }
                }
            }
        }

        self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
        // The checksums are being ignored here, but we don't need to know them
        writes.try_collect::<Vec<MaybeChecksums>>().await?;

        if let Some(mut transaction) = transaction {
            assert_eq!(options.allow_allocations, true);
            if !transaction.is_empty() {
                // If the write extended the file, grow it (which also updates the size) before
                // committing the allocations.
                if end > self.get_size() {
                    self.grow(&mut transaction, self.get_size(), end).await?;
                }
                transaction.commit().await?;
            }
        }

        Ok(())
    }
1204
1205    // Within a transaction, the size of the object might have changed, so get the size from there
1206    // if it exists, otherwise, fall back on the cached size.
1207    fn txn_get_size(&self, transaction: &Transaction<'_>) -> u64 {
1208        transaction
1209            .get_object_mutation(
1210                self.store().store_object_id,
1211                ObjectKey::attribute(
1212                    self.object_id(),
1213                    self.attribute_id(),
1214                    AttributeKey::Attribute,
1215                ),
1216            )
1217            .and_then(|m| {
1218                if let ObjectItem { value: ObjectValue::Attribute { size, .. }, .. } = m.item {
1219                    Some(size)
1220                } else {
1221                    None
1222                }
1223            })
1224            .unwrap_or_else(|| self.get_size())
1225    }
1226
1227    pub async fn txn_update_size<'a>(
1228        &'a self,
1229        transaction: &mut Transaction<'a>,
1230        new_size: u64,
1231        // Allow callers to update the has_overwrite_extents metadata if they want. If this is
1232        // Some it is set to the value, if None it is left unchanged.
1233        update_has_overwrite_extents: Option<bool>,
1234    ) -> Result<(), Error> {
1235        let key =
1236            ObjectKey::attribute(self.object_id(), self.attribute_id(), AttributeKey::Attribute);
1237        let mut mutation = if let Some(mutation) =
1238            transaction.get_object_mutation(self.store().store_object_id(), key.clone())
1239        {
1240            mutation.clone()
1241        } else {
1242            ObjectStoreMutation {
1243                item: self.store().tree().find(&key).await?.ok_or(FxfsError::NotFound)?,
1244                op: Operation::ReplaceOrInsert,
1245            }
1246        };
1247        if let ObjectValue::Attribute { size, has_overwrite_extents } = &mut mutation.item.value {
1248            *size = new_size;
1249            if let Some(update_has_overwrite_extents) = update_has_overwrite_extents {
1250                *has_overwrite_extents = update_has_overwrite_extents;
1251            }
1252        } else {
1253            bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
1254        }
1255        transaction.add_with_object(
1256            self.store().store_object_id(),
1257            Mutation::ObjectStore(mutation),
1258            AssocObj::Borrowed(self),
1259        );
1260        Ok(())
1261    }
1262
    /// Records a change in this object's allocated byte count within `transaction`: `allocated`
    /// bytes newly allocated and `deallocated` bytes freed. Delegates to the underlying
    /// `StoreObjectHandle`.
    async fn update_allocated_size(
        &self,
        transaction: &mut Transaction<'_>,
        allocated: u64,
        deallocated: u64,
    ) -> Result<(), Error> {
        self.handle.update_allocated_size(transaction, allocated, deallocated).await
    }
1271
1272    pub fn truncate_overwrite_ranges(&self, size: u64) -> Result<Option<bool>, Error> {
1273        if self
1274            .overwrite_ranges
1275            .truncate(round_up(size, self.block_size()).ok_or(FxfsError::TooBig)?)
1276        {
1277            // This returns true if there were ranges, but this truncate removed them all, which
1278            // indicates that we need to flip the has_overwrite_extents metadata flag to false.
1279            Ok(Some(false))
1280        } else {
1281            Ok(None)
1282        }
1283    }
1284
1285    pub async fn shrink<'a>(
1286        &'a self,
1287        transaction: &mut Transaction<'a>,
1288        size: u64,
1289        update_has_overwrite_extents: Option<bool>,
1290    ) -> Result<NeedsTrim, Error> {
1291        let needs_trim = self.handle.shrink(transaction, self.attribute_id(), size).await?;
1292        self.txn_update_size(transaction, size, update_has_overwrite_extents).await?;
1293        Ok(needs_trim)
1294    }
1295
    /// Grows this attribute from `old_size` to `size` within `transaction`.
    ///
    /// Any outstanding trim left over from a previous shrink is completed first, which may
    /// commit-and-continue `transaction` several times. If `old_size` is not block-aligned, the
    /// tail of the old last block is zeroed (read-modify-write) so the newly exposed bytes read
    /// back as zero. Finally the new size is recorded in the transaction.
    pub async fn grow<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        old_size: u64,
        size: u64,
    ) -> Result<(), Error> {
        // Before growing the file, we must make sure that a previous trim has completed.
        let store = self.store();
        while matches!(
            store
                .trim_some(
                    transaction,
                    self.object_id(),
                    self.attribute_id(),
                    TrimMode::FromOffset(old_size)
                )
                .await?,
            TrimResult::Incomplete
        ) {
            transaction.commit_and_continue().await?;
        }
        // We might need to zero out the tail of the old last block.
        let block_size = self.block_size();
        if old_size % block_size != 0 {
            let layer_set = store.tree.layer_set();
            let mut merger = layer_set.merger();
            let aligned_old_size = round_down(old_size, block_size);
            // Look up the extent (if any) covering the old last block.
            let iter = merger
                .query(Query::FullRange(&ObjectKey::extent(
                    self.object_id(),
                    self.attribute_id(),
                    aligned_old_size..aligned_old_size + 1,
                )))
                .await?;
            if let Some(ItemRef {
                key:
                    ObjectKey {
                        object_id,
                        data:
                            ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
                    },
                value: ObjectValue::Extent(ExtentValue::Some { device_offset, key_id, .. }),
                ..
            }) = iter.get()
            {
                if *object_id == self.object_id() && *attribute_id == self.attribute_id() {
                    // Translate the file offset of the old last block into a device offset
                    // within this extent.
                    let device_offset = device_offset
                        .checked_add(aligned_old_size - extent_key.range.start)
                        .ok_or(FxfsError::Inconsistent)?;
                    ensure!(device_offset % block_size == 0, FxfsError::Inconsistent);
                    let mut buf = self.allocate_buffer(block_size as usize).await;
                    // In the case that this extent is in OverwritePartial mode, there is a
                    // possibility that the last block is allocated, but not initialized yet, in
                    // which case we don't actually need to bother zeroing out the tail. However,
                    // it's not strictly incorrect to change uninitialized data, so we skip the
                    // check and blindly do it to keep it simpler here.
                    self.read_and_decrypt(device_offset, aligned_old_size, buf.as_mut(), *key_id)
                        .await?;
                    // Zero everything after the old end-of-file within the block, then write
                    // the block back.
                    buf.as_mut_slice()[(old_size % block_size) as usize..].fill(0);
                    self.multi_write(
                        transaction,
                        *attribute_id,
                        &[aligned_old_size..aligned_old_size + block_size],
                        buf.as_mut(),
                    )
                    .await?;
                }
            }
        }
        self.txn_update_size(transaction, size, None).await?;
        Ok(())
    }
1368
    /// Attempts to pre-allocate a `file_range` of bytes for this object.
    /// Returns a set of device ranges (i.e. potentially multiple extents).
    ///
    /// It may not be possible to preallocate the entire requested range in one request
    /// due to limitations on transaction size. In such cases, we will preallocate as much as
    /// we can up to some (arbitrary, internal) limit on transaction size.
    ///
    /// `file_range.start` is modified to point at the end of the logical range
    /// that was preallocated such that repeated calls to `preallocate_range` with new
    /// transactions can be used to preallocate ranges of any size.
    ///
    /// Requested range must be a multiple of block size.
    pub async fn preallocate_range<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        file_range: &mut Range<u64>,
    ) -> Result<Vec<Range<u64>>, Error> {
        let block_size = self.block_size();
        assert!(file_range.is_aligned(block_size));
        // Preallocated extents are written as raw (ExtentValue::new_raw below), so this is
        // only supported for unencrypted files.
        assert!(!self.handle.is_encrypted());
        let mut ranges = Vec::new();
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        // Walk existing extent records from file_range.start so already-allocated extents
        // can be reused and only the gaps between them are newly allocated.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                self.attribute_id(),
                AttributeKey::Extent(ExtentKey::search_key_from_offset(file_range.start)),
            )))
            .await?;
        // Total bytes newly allocated by this call (reused extents are not counted).
        let mut allocated = 0;
        let key_id = self.get_key(None).await?.0;
        'outer: while file_range.start < file_range.end {
            // Find the end of the gap that needs allocating: either the start of the next
            // existing extent, or the end of the requested range.
            let allocate_end = loop {
                match iter.get() {
                    // Case for allocated extents for the same object that overlap with file_range.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.start < file_range.end =>
                    {
                        // Validate the on-disk record before trusting its offsets.
                        ensure!(
                            range.is_valid()
                                && range.is_aligned(block_size)
                                && device_offset % block_size == 0,
                            FxfsError::Inconsistent
                        );
                        // If the start of the requested file_range overlaps with an existing extent...
                        if range.start <= file_range.start {
                            // Record the existing extent and move on.
                            let device_range = device_offset
                                .checked_add(file_range.start - range.start)
                                .ok_or(FxfsError::Inconsistent)?
                                ..device_offset
                                    .checked_add(min(range.end, file_range.end) - range.start)
                                    .ok_or(FxfsError::Inconsistent)?;
                            file_range.start += device_range.end - device_range.start;
                            ranges.push(device_range);
                            if file_range.start >= file_range.end {
                                break 'outer;
                            }
                            iter.advance().await?;
                            continue;
                        } else {
                            // There's nothing allocated between file_range.start and the beginning
                            // of this extent.
                            break range.start;
                        }
                    }
                    // Case for deleted extents eclipsed by file_range.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::None),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.end < file_range.end =>
                    {
                        iter.advance().await?;
                    }
                    _ => {
                        // We can just preallocate the rest.
                        break file_range.end;
                    }
                }
            };
            // Allocate the gap [file_range.start, allocate_end).  The allocator may return
            // less than requested, in which case we loop around for the remainder.
            let device_range = self
                .store()
                .allocator()
                .allocate(
                    transaction,
                    self.store().store_object_id(),
                    allocate_end - file_range.start,
                )
                .await
                .context("Allocation failed")?;
            allocated += device_range.end - device_range.start;
            let this_file_range =
                file_range.start..file_range.start + device_range.end - device_range.start;
            file_range.start = this_file_range.end;
            transaction.add(
                self.store().store_object_id,
                Mutation::merge_object(
                    ObjectKey::extent(self.object_id(), self.attribute_id(), this_file_range),
                    ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
                ),
            );
            ranges.push(device_range);
            // If we didn't allocate all that we requested, we'll loop around and try again.
            // ... unless we have filled the transaction. The caller should check file_range.
            if transaction.mutations().len() > TRANSACTION_MUTATION_THRESHOLD {
                break;
            }
        }
        // Update the file size if it changed.
        if file_range.start > round_up(self.txn_get_size(transaction), block_size).unwrap() {
            self.txn_update_size(transaction, file_range.start, None).await?;
        }
        self.update_allocated_size(transaction, allocated, 0).await?;
        Ok(ranges)
    }
1510
1511    pub async fn update_attributes<'a>(
1512        &self,
1513        transaction: &mut Transaction<'a>,
1514        node_attributes: Option<&fio::MutableNodeAttributes>,
1515        change_time: Option<Timestamp>,
1516    ) -> Result<(), Error> {
1517        // This codepath is only called by files, whose wrapping key id users cannot directly set
1518        // as per fscrypt.
1519        ensure!(
1520            !matches!(
1521                node_attributes,
1522                Some(fio::MutableNodeAttributes { wrapping_key_id: Some(_), .. })
1523            ),
1524            FxfsError::BadPath
1525        );
1526        self.handle.update_attributes(transaction, node_attributes, change_time).await
1527    }
1528
    /// Get the default set of transaction options for this object. This is mostly the overall
    /// default, modified by any [`HandleOptions`] held by this handle.
    pub fn default_transaction_options<'b>(&self) -> Options<'b> {
        // Delegates to the wrapped StoreObjectHandle.
        self.handle.default_transaction_options()
    }
1534
    /// Starts a new transaction using this handle's default transaction options.  See
    /// [`Self::new_transaction_with_options`].
    pub async fn new_transaction<'b>(&self) -> Result<Transaction<'b>, Error> {
        self.new_transaction_with_options(self.default_transaction_options()).await
    }
1538
    /// Starts a new transaction with the given `options`, scoped to this handle's attribute
    /// (delegates to the wrapped StoreObjectHandle).
    pub async fn new_transaction_with_options<'b>(
        &self,
        options: Options<'b>,
    ) -> Result<Transaction<'b>, Error> {
        self.handle.new_transaction_with_options(self.attribute_id(), options).await
    }
1545
    /// Flushes the underlying device.  This is expensive and should be used sparingly.
    pub async fn flush_device(&self) -> Result<(), Error> {
        self.handle.flush_device().await
    }
1550
    /// Reads an entire attribute.  Returns `None` if the attribute does not exist.
    pub async fn read_attr(&self, attribute_id: u64) -> Result<Option<Box<[u8]>>, Error> {
        self.handle.read_attr(attribute_id).await
    }
1555
1556    /// Writes an entire attribute.  This *always* uses the volume data key.
1557    pub async fn write_attr(&self, attribute_id: u64, data: &[u8]) -> Result<(), Error> {
1558        // Must be different attribute otherwise cached size gets out of date.
1559        assert_ne!(attribute_id, self.attribute_id());
1560        let store = self.store();
1561        let mut transaction = self.new_transaction().await?;
1562        if self.handle.write_attr(&mut transaction, attribute_id, data).await?.0 {
1563            transaction.commit_and_continue().await?;
1564            while matches!(
1565                store
1566                    .trim_some(
1567                        &mut transaction,
1568                        self.object_id(),
1569                        attribute_id,
1570                        TrimMode::FromOffset(data.len() as u64),
1571                    )
1572                    .await?,
1573                TrimResult::Incomplete
1574            ) {
1575                transaction.commit_and_continue().await?;
1576            }
1577        }
1578        transaction.commit().await?;
1579        Ok(())
1580    }
1581
    /// Reads `buffer.len()` bytes from the device at `device_offset` and decrypts them with
    /// the key identified by `key_id`.  `file_offset` is the logical file offset
    /// corresponding to `device_offset`.  Delegates to the wrapped StoreObjectHandle.
    async fn read_and_decrypt(
        &self,
        device_offset: u64,
        file_offset: u64,
        buffer: MutableBufferRef<'_>,
        key_id: u64,
    ) -> Result<(), Error> {
        self.handle.read_and_decrypt(device_offset, file_offset, buffer, key_id).await
    }
1591
    /// Truncates a file to a given size (growing/shrinking as required).
    ///
    /// Nb: Most code will want to call truncate() instead. This method is used
    /// to update the super block -- a case where we must borrow metadata space.
    pub async fn truncate_with_options(
        &self,
        options: Options<'_>,
        size: u64,
    ) -> Result<(), Error> {
        let mut transaction = self.new_transaction_with_options(options).await?;
        let old_size = self.get_size();
        // No-op truncate: nothing to commit.
        if size == old_size {
            return Ok(());
        }
        if size < old_size {
            // Drop tracking for overwrite-mode ranges beyond the new size before shrinking.
            let update_has_overwrite_ranges = self.truncate_overwrite_ranges(size)?;
            if self.shrink(&mut transaction, size, update_has_overwrite_ranges).await?.0 {
                // The file needs to be trimmed.
                transaction.commit_and_continue().await?;
                let store = self.store();
                // Trim remaining extents beyond `size`, committing intermediate transactions
                // as they fill up.
                while matches!(
                    store
                        .trim_some(
                            &mut transaction,
                            self.object_id(),
                            self.attribute_id(),
                            TrimMode::FromOffset(size)
                        )
                        .await?,
                    TrimResult::Incomplete
                ) {
                    // The size change was already committed above, so trim failures are
                    // logged rather than propagated.  NOTE(review): presumably any extents
                    // left beyond EOF are cleaned up by later trims -- confirm.
                    if let Err(error) = transaction.commit_and_continue().await {
                        warn!(error:?; "Failed to trim after truncate");
                        return Ok(());
                    }
                }
                if let Err(error) = transaction.commit().await {
                    warn!(error:?; "Failed to trim after truncate");
                }
                return Ok(());
            }
        } else {
            self.grow(&mut transaction, old_size, size).await?;
        }
        transaction.commit().await?;
        Ok(())
    }
1639
1640    pub async fn get_properties(&self) -> Result<ObjectProperties, Error> {
1641        // We don't take a read guard here since the object properties are contained in a single
1642        // object, which cannot be inconsistent with itself. The LSM tree does not return
1643        // intermediate states for a single object.
1644        let item = self
1645            .store()
1646            .tree
1647            .find(&ObjectKey::object(self.object_id()))
1648            .await?
1649            .expect("Unable to find object record");
1650        match item.value {
1651            ObjectValue::Object {
1652                kind: ObjectKind::File { refs, .. },
1653                attributes:
1654                    ObjectAttributes {
1655                        creation_time,
1656                        modification_time,
1657                        posix_attributes,
1658                        allocated_size,
1659                        access_time,
1660                        change_time,
1661                        ..
1662                    },
1663            } => Ok(ObjectProperties {
1664                refs,
1665                allocated_size,
1666                data_attribute_size: self.get_size(),
1667                creation_time,
1668                modification_time,
1669                access_time,
1670                change_time,
1671                sub_dirs: 0,
1672                posix_attributes,
1673                casefold: false,
1674                wrapping_key_id: None,
1675            }),
1676            _ => bail!(FxfsError::NotFile),
1677        }
1678    }
1679
1680    // Returns the contents of this object. This object must be < |limit| bytes in size.
1681    pub async fn contents(&self, limit: usize) -> Result<Box<[u8]>, Error> {
1682        let size = self.get_size();
1683        if size > limit as u64 {
1684            bail!("Object too big ({} > {})", size, limit);
1685        }
1686        let mut buf = self.allocate_buffer(size as usize).await;
1687        self.read(0u64, buf.as_mut()).await?;
1688        Ok(buf.as_slice().into())
1689    }
1690
1691    /// Returns the set of file_offset->extent mappings for this file. The extents will be sorted by
1692    /// their logical offset within the file.
1693    ///
1694    /// *NOTE*: This operation is potentially expensive and should generally be avoided.
1695    pub async fn device_extents(&self) -> Result<Vec<FileExtent>, Error> {
1696        let mut extents = Vec::new();
1697        let tree = &self.store().tree;
1698        let layer_set = tree.layer_set();
1699        let mut merger = layer_set.merger();
1700        let mut iter = merger
1701            .query(Query::FullRange(&ObjectKey::attribute(
1702                self.object_id(),
1703                self.attribute_id(),
1704                AttributeKey::Extent(ExtentKey::search_key_from_offset(0)),
1705            )))
1706            .await?;
1707        loop {
1708            match iter.get() {
1709                Some(ItemRef {
1710                    key:
1711                        ObjectKey {
1712                            object_id,
1713                            data:
1714                                ObjectKeyData::Attribute(
1715                                    attribute_id,
1716                                    AttributeKey::Extent(ExtentKey { range }),
1717                                ),
1718                        },
1719                    value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
1720                    ..
1721                }) if *object_id == self.object_id() && *attribute_id == self.attribute_id() => {
1722                    let logical_offset = range.start;
1723                    let device_range = *device_offset..*device_offset + range.length()?;
1724                    extents.push(FileExtent::new(logical_offset, device_range)?);
1725                }
1726                _ => break,
1727            }
1728            iter.advance().await?;
1729        }
1730        Ok(extents)
1731    }
1732}
1733
impl<S: HandleOwner> AssociatedObject for DataObjectHandle<S> {
    /// Keeps this handle's cached state in sync as mutations associated with it are applied:
    /// the cached content size, the fsverity state, and the set of overwrite-mode ranges.
    fn will_apply_mutation(&self, mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
        match mutation {
            // The attribute's size record changed: refresh the cached content size.
            Mutation::ObjectStore(ObjectStoreMutation {
                item: ObjectItem { value: ObjectValue::Attribute { size, .. }, .. },
                ..
            }) => self.content_size.store(*size, atomic::Ordering::Relaxed),
            // The attribute became a verified (fsverity) attribute: latch the fsverity state.
            Mutation::ObjectStore(ObjectStoreMutation {
                item: ObjectItem { value: ObjectValue::VerifiedAttribute { size, .. }, .. },
                ..
            }) => {
                debug_assert_eq!(
                    self.get_size(),
                    *size,
                    "size should be set when verity is enabled and must not change"
                );
                self.finalize_fsverity_state()
            }
            // An extent record for this handle's attribute was written: track any
            // overwrite-mode ranges it introduces.
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    ObjectItem {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attr_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
                        ..
                    },
                ..
            }) if self.object_id() == *object_id && self.attribute_id() == *attr_id => match mode {
                ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => {
                    self.overwrite_ranges.apply_range(range.clone())
                }
                ExtentMode::Raw | ExtentMode::Cow(_) => (),
            },
            _ => {}
        }
    }
}
1778
impl<S: HandleOwner> ObjectHandle for DataObjectHandle<S> {
    // All of these delegate to the wrapped StoreObjectHandle.
    fn set_trace(&self, v: bool) {
        self.handle.set_trace(v)
    }

    fn object_id(&self) -> u64 {
        self.handle.object_id()
    }

    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        self.handle.allocate_buffer(size)
    }

    fn block_size(&self) -> u64 {
        self.handle.block_size()
    }
}
1796
#[async_trait]
impl<S: HandleOwner> ReadObjectHandle for DataObjectHandle<S> {
    /// Reads from the data attribute at `offset`, clamping the read to the current content
    /// size.  Returns the number of bytes read; a read entirely beyond EOF returns 0.
    async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
        let fs = self.store().filesystem();
        // Take a read lock on this attribute so the size check and the read itself see a
        // consistent view with respect to concurrent mutations.
        let guard = fs
            .lock_manager()
            .read_lock(lock_keys![LockKey::object_attribute(
                self.store().store_object_id,
                self.object_id(),
                self.attribute_id(),
            )])
            .await;

        let size = self.get_size();
        if offset >= size {
            return Ok(0);
        }
        // Clamp to EOF.
        let length = min(buf.len() as u64, size - offset) as usize;
        buf = buf.subslice_mut(0..length);
        self.handle.read_unchecked(self.attribute_id(), offset, buf.reborrow(), &guard).await?;
        // Verified (fsverity) files check everything they read.
        if self.is_verified_file() {
            self.verify_data(offset as usize, buf.as_slice())?;
        }
        Ok(length)
    }

    fn get_size(&self) -> u64 {
        // The content size is cached on the handle and kept up to date by
        // will_apply_mutation.
        self.content_size.load(atomic::Ordering::Relaxed)
    }
}
1827
1828impl<S: HandleOwner> WriteObjectHandle for DataObjectHandle<S> {
1829    async fn write_or_append(&self, offset: Option<u64>, buf: BufferRef<'_>) -> Result<u64, Error> {
1830        let offset = offset.unwrap_or_else(|| self.get_size());
1831        let mut transaction = self.new_transaction().await?;
1832        self.txn_write(&mut transaction, offset, buf).await?;
1833        let new_size = self.txn_get_size(&transaction);
1834        transaction.commit().await?;
1835        Ok(new_size)
1836    }
1837
1838    async fn truncate(&self, size: u64) -> Result<(), Error> {
1839        self.truncate_with_options(self.default_transaction_options(), size).await
1840    }
1841
1842    async fn flush(&self) -> Result<(), Error> {
1843        Ok(())
1844    }
1845}
1846
/// Like object_handle::Writer, but allows custom transaction options to be set, and makes every
/// write go directly to the handle in a transaction.
pub struct DirectWriter<'a, S: HandleOwner> {
    handle: &'a DataObjectHandle<S>,
    // Transaction options used for each internal flush.
    options: transaction::Options<'a>,
    // Staging buffer; bytes up to `buf_offset` are pending a flush.
    buffer: Buffer<'a>,
    // File offset at which the next flush will write.
    offset: u64,
    // Number of bytes currently staged in `buffer`.
    buf_offset: usize,
}
1856
// Size (1 MiB) of DirectWriter's staging buffer; writes are committed in chunks of this size.
const BUFFER_SIZE: usize = 1_048_576;
1858
impl<S: HandleOwner> Drop for DirectWriter<'_, S> {
    fn drop(&mut self) {
        // Staged bytes at drop time mean the caller never called complete(); the data is
        // silently lost, so at least log it.
        if self.buf_offset != 0 {
            warn!("DirectWriter: dropping data, did you forget to call complete?");
        }
    }
}
1866
1867impl<'a, S: HandleOwner> DirectWriter<'a, S> {
1868    pub async fn new(
1869        handle: &'a DataObjectHandle<S>,
1870        options: transaction::Options<'a>,
1871    ) -> DirectWriter<'a, S> {
1872        Self {
1873            handle,
1874            options,
1875            buffer: handle.allocate_buffer(BUFFER_SIZE).await,
1876            offset: 0,
1877            buf_offset: 0,
1878        }
1879    }
1880
1881    async fn flush(&mut self) -> Result<(), Error> {
1882        let mut transaction = self.handle.new_transaction_with_options(self.options).await?;
1883        self.handle
1884            .txn_write(&mut transaction, self.offset, self.buffer.subslice(..self.buf_offset))
1885            .await?;
1886        transaction.commit().await?;
1887        self.offset += self.buf_offset as u64;
1888        self.buf_offset = 0;
1889        Ok(())
1890    }
1891}
1892
1893impl<'a, S: HandleOwner> WriteBytes for DirectWriter<'a, S> {
1894    fn block_size(&self) -> u64 {
1895        self.handle.block_size()
1896    }
1897
1898    async fn write_bytes(&mut self, mut buf: &[u8]) -> Result<(), Error> {
1899        while buf.len() > 0 {
1900            let to_do = std::cmp::min(buf.len(), BUFFER_SIZE - self.buf_offset);
1901            self.buffer
1902                .subslice_mut(self.buf_offset..self.buf_offset + to_do)
1903                .as_mut_slice()
1904                .copy_from_slice(&buf[..to_do]);
1905            self.buf_offset += to_do;
1906            if self.buf_offset == BUFFER_SIZE {
1907                self.flush().await?;
1908            }
1909            buf = &buf[to_do..];
1910        }
1911        Ok(())
1912    }
1913
1914    async fn complete(&mut self) -> Result<(), Error> {
1915        self.flush().await?;
1916        Ok(())
1917    }
1918
1919    async fn skip(&mut self, amount: u64) -> Result<(), Error> {
1920        if (BUFFER_SIZE - self.buf_offset) as u64 > amount {
1921            self.buffer
1922                .subslice_mut(self.buf_offset..self.buf_offset + amount as usize)
1923                .as_mut_slice()
1924                .fill(0);
1925            self.buf_offset += amount as usize;
1926        } else {
1927            self.flush().await?;
1928            self.offset += amount;
1929        }
1930        Ok(())
1931    }
1932
1933    /// The number of bytes written to this writer (including unflushed bytes).
1934    fn bytes_written(&self) -> u64 {
1935        self.offset + self.buf_offset as u64
1936    }
1937}
1938
1939#[cfg(test)]
1940mod tests {
1941    use crate::errors::FxfsError;
1942    use crate::filesystem::{
1943        FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem, SyncOptions,
1944    };
1945    use crate::fsck::{
1946        FsckOptions, fsck, fsck_volume, fsck_volume_with_options, fsck_with_options,
1947    };
1948    use crate::lsm_tree::Query;
1949    use crate::lsm_tree::types::{ItemRef, LayerIterator};
1950    use crate::object_handle::{
1951        ObjectHandle, ObjectProperties, ReadObjectHandle, WriteObjectHandle,
1952    };
1953    use crate::object_store::data_object_handle::{OverwriteOptions, WRITE_ATTR_BATCH_SIZE};
1954    use crate::object_store::directory::replace_child;
1955    use crate::object_store::object_record::{FsverityMetadata, ObjectKey, ObjectValue, Timestamp};
1956    use crate::object_store::transaction::{Mutation, Options, lock_keys};
1957    use crate::object_store::volume::root_volume;
1958    use crate::object_store::{
1959        AttributeKey, DEFAULT_DATA_ATTRIBUTE_ID, DataObjectHandle, Directory, ExtentKey,
1960        ExtentMode, ExtentValue, FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions, LockKey,
1961        NewChildStoreOptions, ObjectKeyData, ObjectStore, PosixAttributes, StoreOptions,
1962        TRANSACTION_MUTATION_THRESHOLD,
1963    };
1964    use crate::range::RangeExt;
1965    use crate::round::{round_down, round_up};
1966    use assert_matches::assert_matches;
1967    use bit_vec::BitVec;
1968    use fidl_fuchsia_io as fio;
1969    use fsverity_merkle::{FsVerityDescriptor, FsVerityDescriptorRaw};
1970    use fuchsia_async as fasync;
1971    use fuchsia_sync::Mutex;
1972    use futures::FutureExt;
1973    use futures::channel::oneshot::channel;
1974    use futures::stream::{FuturesUnordered, StreamExt};
1975    use fxfs_crypto::{Crypt, EncryptionKey, KeyPurpose};
1976    use fxfs_insecure_crypto::new_insecure_crypt;
1977    use std::ops::Range;
1978    use std::sync::Arc;
1979    use std::time::Duration;
1980    use storage_device::DeviceHolder;
1981    use storage_device::fake_device::FakeDevice;
1982
    // Block size of the FakeDevice backing the test filesystems.
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    // Some tests (the preallocate_range ones) currently assume that the data only occupies a single
    // device block.
    const TEST_DATA_OFFSET: u64 = 5000;
    const TEST_DATA: &[u8] = b"hello";
    // Logical size the test object is truncated to after creation.
    const TEST_OBJECT_SIZE: u64 = 5678;
    const TEST_OBJECT_ALLOCATED_SIZE: u64 = 4096;
    const TEST_OBJECT_NAME: &str = "foo";
1992
    // Creates a fresh filesystem backed by an in-memory FakeDevice of 8192 blocks of
    // TEST_DEVICE_BLOCK_SIZE bytes.
    async fn test_filesystem() -> OpenFxFilesystem {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        FxFilesystem::new_empty(device).await.expect("new_empty failed")
    }
1997
    // Creates the standard test object in the root store: optionally encrypted with `crypt`,
    // linked into the root directory as TEST_OBJECT_NAME, optionally populated with TEST_DATA
    // at TEST_DATA_OFFSET, and finally truncated to TEST_OBJECT_SIZE.
    async fn create_object_with_key(
        fs: Arc<FxFilesystem>,
        crypt: Option<&dyn Crypt>,
        write_object_test_data: bool,
    ) -> DataObjectHandle<ObjectStore> {
        let store = fs.root_store();
        let object;

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        object = if let Some(crypt) = crypt {
            // Encrypted case: mint a data key for the new object id up front.
            let object_id = store.get_next_object_id(transaction.txn_guard()).await.unwrap();
            let (key, unwrapped_key) =
                crypt.create_key(object_id.get(), KeyPurpose::Data).await.unwrap();
            ObjectStore::create_object_with_key(
                &store,
                &mut transaction,
                object_id,
                HandleOptions::default(),
                EncryptionKey::Fxfs(key),
                unwrapped_key,
            )
            .await
            .expect("create_object failed")
        } else {
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed")
        };

        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        root_directory
            .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
            .await
            .expect("add_child_file failed");

        if write_object_test_data {
            // TEST_DATA_OFFSET is deliberately unaligned; pad the buffer out to block
            // alignment and write only the tail.
            let align = TEST_DATA_OFFSET as usize % TEST_DEVICE_BLOCK_SIZE as usize;
            let mut buf = object.allocate_buffer(align + TEST_DATA.len()).await;
            buf.as_mut_slice()[align..].copy_from_slice(TEST_DATA);
            object
                .txn_write(&mut transaction, TEST_DATA_OFFSET, buf.subslice(align..))
                .await
                .expect("write failed");
        }
        transaction.commit().await.expect("commit failed");
        object.truncate(TEST_OBJECT_SIZE).await.expect("truncate failed");
        object
    }
2058
    // Builds a fresh filesystem and the standard test object on it (see
    // `create_object_with_key`), returning both.
    async fn test_filesystem_and_object_with_key(
        crypt: Option<&dyn Crypt>,
        write_object_test_data: bool,
    ) -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        let fs = test_filesystem().await;
        let object = create_object_with_key(fs.clone(), crypt, write_object_test_data).await;
        (fs, object)
    }
2067
    // Encrypted test object with TEST_DATA written at TEST_DATA_OFFSET.
    async fn test_filesystem_and_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        test_filesystem_and_object_with_key(Some(&new_insecure_crypt()), true).await
    }
2071
    // Encrypted test object with no data written (still truncated to TEST_OBJECT_SIZE).
    async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
    {
        test_filesystem_and_object_with_key(Some(&new_insecure_crypt()), false).await
    }
2076
    #[fuchsia::test]
    async fn test_zero_buf_len_read() {
        // Reading into a zero-length buffer should trivially succeed and return 0 bytes.
        let (fs, object) = test_filesystem_and_object().await;
        let mut buf = object.allocate_buffer(0).await;
        assert_eq!(object.read(0u64, buf.as_mut()).await.expect("read failed"), 0);
        fs.close().await.expect("Close failed");
    }
2084
2085    #[fuchsia::test]
2086    async fn test_beyond_eof_read() {
2087        let (fs, object) = test_filesystem_and_object().await;
2088        let offset = TEST_OBJECT_SIZE as usize - 2;
2089        let align = offset % fs.block_size() as usize;
2090        let len: usize = 2;
2091        let mut buf = object.allocate_buffer(align + len + 1).await;
2092        buf.as_mut_slice().fill(123u8);
2093        assert_eq!(
2094            object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
2095            align + len
2096        );
2097        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
2098        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
2099        fs.close().await.expect("Close failed");
2100    }
2101
2102    #[fuchsia::test]
2103    async fn test_beyond_eof_read_from() {
2104        let (fs, object) = test_filesystem_and_object().await;
2105        let handle = &*object;
2106        let offset = TEST_OBJECT_SIZE as usize - 2;
2107        let align = offset % fs.block_size() as usize;
2108        let len: usize = 2;
2109        let mut buf = object.allocate_buffer(align + len + 1).await;
2110        buf.as_mut_slice().fill(123u8);
2111        assert_eq!(
2112            handle.read(0, (offset - align) as u64, buf.as_mut()).await.expect("read failed"),
2113            align + len
2114        );
2115        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
2116        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
2117        fs.close().await.expect("Close failed");
2118    }
2119
    #[fuchsia::test]
    async fn test_beyond_eof_read_unchecked() {
        // Unlike read(), read_unchecked() takes a caller-supplied read lock and, as asserted
        // below, zero-fills the buffer past EOF rather than leaving those bytes untouched
        // (compare with test_beyond_eof_read, where the byte past EOF keeps its fill value).
        let (fs, object) = test_filesystem_and_object().await;
        let offset = TEST_OBJECT_SIZE as usize - 2;
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        let mut buf = object.allocate_buffer(align + len + 1).await;
        buf.as_mut_slice().fill(123u8);
        // read_unchecked requires the caller to already hold the read lock for the
        // attribute being read (attribute 0 here — presumably the default data attribute;
        // confirm against DEFAULT_DATA_ATTRIBUTE_ID).
        let guard = fs
            .lock_manager()
            .read_lock(lock_keys![LockKey::object_attribute(
                object.store().store_object_id,
                object.object_id(),
                0,
            )])
            .await;
        object
            .read_unchecked(0, (offset - align) as u64, buf.as_mut(), &guard)
            .await
            .expect("read failed");
        // The whole tail — including the byte beyond EOF — reads back as zero.
        assert_eq!(&buf.as_slice()[align..], &vec![0u8; len + 1]);
        fs.close().await.expect("Close failed");
    }
2143
2144    #[fuchsia::test]
2145    async fn test_read_sparse() {
2146        let (fs, object) = test_filesystem_and_object().await;
2147        // Deliberately read not right to eof.
2148        let len = TEST_OBJECT_SIZE as usize - 1;
2149        let mut buf = object.allocate_buffer(len).await;
2150        buf.as_mut_slice().fill(123u8);
2151        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);
2152        let mut expected = vec![0; len];
2153        let offset = TEST_DATA_OFFSET as usize;
2154        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
2155        assert_eq!(buf.as_slice()[..len], expected[..]);
2156        fs.close().await.expect("Close failed");
2157    }
2158
    #[fuchsia::test]
    async fn test_read_after_writes_interspersed_with_flush() {
        // Writes made after a store flush must still merge correctly with the data that
        // was flushed (presumably the flushed extents sit in an older persisted layer —
        // see test_preallocate_succeeds_when_extents_are_in_different_layers).
        let (fs, object) = test_filesystem_and_object().await;

        object.owner().flush().await.expect("flush failed");

        // Write more test data to the first block of the file.
        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
        buf.as_mut_slice().copy_from_slice(TEST_DATA);
        object.write_or_append(Some(0u64), buf.as_ref()).await.expect("write failed");

        // Read almost to EOF so both the pre-flush and post-flush writes are covered.
        let len = TEST_OBJECT_SIZE as usize - 1;
        let mut buf = object.allocate_buffer(len).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);

        // Expected image: zeroes, with TEST_DATA both at offset 0 (new write) and at
        // TEST_DATA_OFFSET (original, flushed write).
        let mut expected = vec![0u8; len];
        let offset = TEST_DATA_OFFSET as usize;
        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
        expected[..TEST_DATA.len()].copy_from_slice(TEST_DATA);
        assert_eq!(buf.as_slice(), &expected);
        fs.close().await.expect("Close failed");
    }
2182
    #[fuchsia::test]
    async fn test_read_after_truncate_and_extend() {
        // Reads must correctly stitch together live extents around a deleted (truncated)
        // extent: layout built below is <extent><deleted-extent><extent>.
        let (fs, object) = test_filesystem_and_object().await;

        // Arrange for there to be <extent><deleted-extent><extent>.
        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
        buf.as_mut_slice().copy_from_slice(TEST_DATA);
        // This adds an extent at 0..512.
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
        // This deletes 512..1024.  File size is now 3 bytes, so only TEST_DATA[..3]
        // remains visible.
        object.truncate(3).await.expect("truncate failed");
        let data = b"foo";
        let offset = 1500u64;
        let align = (offset % fs.block_size() as u64) as usize;
        let mut buf = object.allocate_buffer(align + data.len()).await;
        buf.as_mut_slice()[align..].copy_from_slice(data);
        // This adds 1024..1536.  The write also re-extends the file to 1503 bytes.
        object.write_or_append(Some(1500), buf.subslice(align..)).await.expect("write failed");

        // Read the whole 1503-byte file and check the expected image: TEST_DATA[..3],
        // zeroes through the deleted region, then "foo" at 1500.
        const LEN1: usize = 1503;
        let mut buf = object.allocate_buffer(LEN1).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN1);
        let mut expected = [0; LEN1];
        expected[..3].copy_from_slice(&TEST_DATA[..3]);
        expected[1500..].copy_from_slice(b"foo");
        assert_eq!(buf.as_slice(), &expected);

        // Also test a read that ends midway through the deleted extent.
        const LEN2: usize = 601;
        let mut buf = object.allocate_buffer(LEN2).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN2);
        assert_eq!(buf.as_slice(), &expected[..LEN2]);
        fs.close().await.expect("Close failed");
    }
2219
    #[fuchsia::test]
    async fn test_read_whole_blocks_with_multiple_objects() {
        // Interleaves block-aligned writes to two objects in the same store and checks
        // that each object reads back only its own data (0xaf for the first, 0xef for the
        // second) with no cross-contamination.
        let (fs, object) = test_filesystem_and_object().await;
        let block_size = object.block_size() as usize;
        let mut buffer = object.allocate_buffer(block_size).await;
        buffer.as_mut_slice().fill(0xaf);
        object.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");

        // Create a second object in the same store.
        let store = object.owner();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let object2 =
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed");
        transaction.commit().await.expect("commit failed");
        let mut ef_buffer = object.allocate_buffer(block_size).await;
        ef_buffer.as_mut_slice().fill(0xef);
        object2.write_or_append(Some(0), ef_buffer.as_ref()).await.expect("write failed");

        // Second block of each object, with a truncate of the first object in between so
        // its size becomes exactly 3 blocks (block 2 left sparse).
        let mut buffer = object.allocate_buffer(block_size).await;
        buffer.as_mut_slice().fill(0xaf);
        object
            .write_or_append(Some(block_size as u64), buffer.as_ref())
            .await
            .expect("write failed");
        object.truncate(3 * block_size as u64).await.expect("truncate failed");
        object2
            .write_or_append(Some(block_size as u64), ef_buffer.as_ref())
            .await
            .expect("write failed");

        // First object: two 0xaf blocks then a sparse (zero) block; second object: two
        // 0xef blocks.
        let mut buffer = object.allocate_buffer(4 * block_size).await;
        buffer.as_mut_slice().fill(123);
        assert_eq!(object.read(0, buffer.as_mut()).await.expect("read failed"), 3 * block_size);
        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xaf; 2 * block_size]);
        assert_eq!(&buffer.as_slice()[2 * block_size..3 * block_size], &vec![0; block_size]);
        assert_eq!(object2.read(0, buffer.as_mut()).await.expect("read failed"), 2 * block_size);
        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xef; 2 * block_size]);
        fs.close().await.expect("Close failed");
    }
2264
    #[fuchsia::test]
    async fn test_alignment() {
        // Exercises writes with every combination of unaligned head/tail against an
        // in-memory mirror of the file contents; after each write the object must read
        // back exactly equal to the mirror.
        let (fs, object) = test_filesystem_and_object().await;

        // Pairs the object under test with an in-memory byte-for-byte mirror.
        struct AlignTest {
            // Byte value used for the next write; bumped before every write.
            fill: u8,
            object: DataObjectHandle<ObjectStore>,
            mirror: Vec<u8>,
        }

        impl AlignTest {
            // Snapshots the object's current content as the initial mirror.
            async fn new(object: DataObjectHandle<ObjectStore>) -> Self {
                let mirror = {
                    let mut buf = object.allocate_buffer(object.get_size() as usize).await;
                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
                    buf.as_slice().to_vec()
                };
                Self { fill: 0, object, mirror }
            }

            // Fills |range| of self.object with a byte value (self.fill) and mirrors the same
            // operation to an in-memory copy of the object.
            // Each subsequent call bumps the value of fill.
            // It is expected that the object and its mirror maintain identical content.
            async fn test(&mut self, range: Range<u64>) {
                let mut buf = self.object.allocate_buffer((range.end - range.start) as usize).await;
                self.fill += 1;
                buf.as_mut_slice().fill(self.fill);
                self.object
                    .write_or_append(Some(range.start), buf.as_ref())
                    .await
                    .expect("write_or_append failed");
                // Writes may extend the file; grow the mirror to match.
                if range.end > self.mirror.len() as u64 {
                    self.mirror.resize(range.end as usize, 0);
                }
                self.mirror[range.start as usize..range.end as usize].fill(self.fill);
                // Read one byte past the mirror length to confirm EOF is where we expect.
                let mut buf = self.object.allocate_buffer(self.mirror.len() + 1).await;
                assert_eq!(
                    self.object.read(0, buf.as_mut()).await.expect("read failed"),
                    self.mirror.len()
                );
                assert_eq!(&buf.as_slice()[..self.mirror.len()], self.mirror.as_slice());
            }
        }

        let block_size = object.block_size() as u64;
        let mut align = AlignTest::new(object).await;

        // Fill the object to start with (with 1).
        align.test(0..2 * block_size + 1).await;

        // Unaligned head (fills with 2, overwrites that with 3).
        align.test(1..block_size).await;
        align.test(1..2 * block_size).await;

        // Unaligned tail (fills with 4 and 5).
        align.test(0..block_size - 1).await;
        align.test(0..2 * block_size - 1).await;

        // Both unaligned (fills with 6 and 7).
        align.test(1..block_size - 1).await;
        align.test(1..2 * block_size - 1).await;

        fs.close().await.expect("Close failed");
    }
2330
2331    async fn test_preallocate_common(fs: &FxFilesystem, object: DataObjectHandle<ObjectStore>) {
2332        let allocator = fs.allocator();
2333        let allocated_before = allocator.get_allocated_bytes();
2334        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2335        object
2336            .preallocate_range(&mut transaction, &mut (0..fs.block_size() as u64))
2337            .await
2338            .expect("preallocate_range failed");
2339        transaction.commit().await.expect("commit failed");
2340        assert!(object.get_size() < 1048576);
2341        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2342        object
2343            .preallocate_range(&mut transaction, &mut (0..1048576))
2344            .await
2345            .expect("preallocate_range failed");
2346        transaction.commit().await.expect("commit failed");
2347        assert_eq!(object.get_size(), 1048576);
2348        // Check that it didn't reallocate the space for the existing extent
2349        let allocated_after = allocator.get_allocated_bytes();
2350        assert_eq!(allocated_after - allocated_before, 1048576 - fs.block_size() as u64);
2351
2352        let mut buf = object
2353            .allocate_buffer(round_up(TEST_DATA_OFFSET, fs.block_size()).unwrap() as usize)
2354            .await;
2355        buf.as_mut_slice().fill(47);
2356        object
2357            .write_or_append(Some(0), buf.subslice(..TEST_DATA_OFFSET as usize))
2358            .await
2359            .expect("write failed");
2360        buf.as_mut_slice().fill(95);
2361        let offset = round_up(TEST_OBJECT_SIZE, fs.block_size()).unwrap();
2362        object
2363            .overwrite(offset, buf.as_mut(), OverwriteOptions::default())
2364            .await
2365            .expect("write failed");
2366
2367        // Make sure there were no more allocations.
2368        assert_eq!(allocator.get_allocated_bytes(), allocated_after);
2369
2370        // Read back the data and make sure it is what we expect.
2371        let mut buf = object.allocate_buffer(104876).await;
2372        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
2373        assert_eq!(&buf.as_slice()[..TEST_DATA_OFFSET as usize], &[47; TEST_DATA_OFFSET as usize]);
2374        assert_eq!(
2375            &buf.as_slice()[TEST_DATA_OFFSET as usize..TEST_DATA_OFFSET as usize + TEST_DATA.len()],
2376            TEST_DATA
2377        );
2378        assert_eq!(&buf.as_slice()[offset as usize..offset as usize + 2048], &[95; 2048]);
2379    }
2380
2381    #[fuchsia::test]
2382    async fn test_preallocate_range() {
2383        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2384        test_preallocate_common(&fs, object).await;
2385        fs.close().await.expect("Close failed");
2386    }
2387
2388    // This is identical to the previous test except that we flush so that extents end up in
2389    // different layers.
2390    #[fuchsia::test]
2391    async fn test_preallocate_succeeds_when_extents_are_in_different_layers() {
2392        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2393        object.owner().flush().await.expect("flush failed");
2394        test_preallocate_common(&fs, object).await;
2395        fs.close().await.expect("Close failed");
2396    }
2397
2398    #[fuchsia::test]
2399    async fn test_already_preallocated() {
2400        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2401        let allocator = fs.allocator();
2402        let allocated_before = allocator.get_allocated_bytes();
2403        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2404        let offset = TEST_DATA_OFFSET - TEST_DATA_OFFSET % fs.block_size() as u64;
2405        object
2406            .preallocate_range(&mut transaction, &mut (offset..offset + fs.block_size() as u64))
2407            .await
2408            .expect("preallocate_range failed");
2409        transaction.commit().await.expect("commit failed");
2410        // Check that it didn't reallocate any new space.
2411        assert_eq!(allocator.get_allocated_bytes(), allocated_before);
2412        fs.close().await.expect("Close failed");
2413    }
2414
    #[fuchsia::test]
    async fn test_overwrite_when_preallocated_at_start_of_file() {
        // The standard test data we put in the test object would cause an extent with checksums
        // to be created, which overwrite() doesn't support. So we create an empty object instead.
        let (fs, object) = test_filesystem_and_empty_object().await;

        // Re-open the object to get a fresh handle.
        let object = ObjectStore::open_object(
            object.owner(),
            object.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .expect("open_object failed");

        // The offsets below assume a 4 KiB block size.
        assert_eq!(fs.block_size(), 4096);

        let mut write_buf = object.allocate_buffer(4096).await;
        write_buf.as_mut_slice().fill(95);

        // First try to overwrite without allowing allocations
        // We expect this to fail, since nothing is allocated yet
        object
            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // Now preallocate some space (exactly one block)
        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        object
            .preallocate_range(&mut transaction, &mut (0..4096 as u64))
            .await
            .expect("preallocate_range failed");
        transaction.commit().await.expect("commit failed");

        // Now try the same overwrite command as before, it should work this time,
        // even with allocations disabled...
        {
            // Preallocated-but-unwritten space must read back as zeroes.
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            // ...and after the overwrite it reads back as the written fill.
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Now try to overwrite at offset 4096. We expect this to fail, since we only preallocated
        // one block earlier at offset 0
        object
            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // We can't assert anything about the existing bytes, because they haven't been allocated
        // yet and they could contain any values
        object
            .overwrite(
                4096,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(4096, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Check that the overwrites haven't messed up the filesystem state
        let fsck_options = FsckOptions {
            fail_on_warning: true,
            no_lock: true,
            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
            ..Default::default()
        };
        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");

        fs.close().await.expect("Close failed");
    }
2501
2502    #[fuchsia::test]
2503    async fn test_overwrite_large_buffer_and_file_with_many_holes() {
2504        // The standard test data we put in the test object would cause an extent with checksums
2505        // to be created, which overwrite() doesn't support. So we create an empty object instead.
2506        let (fs, object) = test_filesystem_and_empty_object().await;
2507
2508        let object = ObjectStore::open_object(
2509            object.owner(),
2510            object.object_id(),
2511            HandleOptions::default(),
2512            None,
2513        )
2514        .await
2515        .expect("open_object failed");
2516
2517        assert_eq!(fs.block_size(), 4096);
2518        assert_eq!(object.get_size(), TEST_OBJECT_SIZE);
2519
2520        // Let's create some non-holes
2521        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2522        object
2523            .preallocate_range(&mut transaction, &mut (4096..8192 as u64))
2524            .await
2525            .expect("preallocate_range failed");
2526        object
2527            .preallocate_range(&mut transaction, &mut (16384..32768 as u64))
2528            .await
2529            .expect("preallocate_range failed");
2530        object
2531            .preallocate_range(&mut transaction, &mut (65536..131072 as u64))
2532            .await
2533            .expect("preallocate_range failed");
2534        object
2535            .preallocate_range(&mut transaction, &mut (262144..524288 as u64))
2536            .await
2537            .expect("preallocate_range failed");
2538        transaction.commit().await.expect("commit failed");
2539
2540        assert_eq!(object.get_size(), 524288);
2541
2542        let mut write_buf = object.allocate_buffer(4096).await;
2543        write_buf.as_mut_slice().fill(95);
2544
2545        // We shouldn't be able to overwrite in the holes if new allocations aren't enabled
2546        object
2547            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
2548            .await
2549            .expect_err("overwrite succeeded");
2550        object
2551            .overwrite(8192, write_buf.as_mut(), OverwriteOptions::default())
2552            .await
2553            .expect_err("overwrite succeeded");
2554        object
2555            .overwrite(32768, write_buf.as_mut(), OverwriteOptions::default())
2556            .await
2557            .expect_err("overwrite succeeded");
2558        object
2559            .overwrite(131072, write_buf.as_mut(), OverwriteOptions::default())
2560            .await
2561            .expect_err("overwrite succeeded");
2562
2563        // But we should be able to overwrite in the prealloc'd areas without needing allocations
2564        {
2565            let mut read_buf = object.allocate_buffer(4096).await;
2566            object.read(4096, read_buf.as_mut()).await.expect("read failed");
2567            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2568        }
2569        object
2570            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
2571            .await
2572            .expect("overwrite failed");
2573        {
2574            let mut read_buf = object.allocate_buffer(4096).await;
2575            object.read(4096, read_buf.as_mut()).await.expect("read failed");
2576            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2577        }
2578        {
2579            let mut read_buf = object.allocate_buffer(4096).await;
2580            object.read(16384, read_buf.as_mut()).await.expect("read failed");
2581            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2582        }
2583        object
2584            .overwrite(16384, write_buf.as_mut(), OverwriteOptions::default())
2585            .await
2586            .expect("overwrite failed");
2587        {
2588            let mut read_buf = object.allocate_buffer(4096).await;
2589            object.read(16384, read_buf.as_mut()).await.expect("read failed");
2590            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2591        }
2592        {
2593            let mut read_buf = object.allocate_buffer(4096).await;
2594            object.read(65536, read_buf.as_mut()).await.expect("read failed");
2595            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2596        }
2597        object
2598            .overwrite(65536, write_buf.as_mut(), OverwriteOptions::default())
2599            .await
2600            .expect("overwrite failed");
2601        {
2602            let mut read_buf = object.allocate_buffer(4096).await;
2603            object.read(65536, read_buf.as_mut()).await.expect("read failed");
2604            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2605        }
2606        {
2607            let mut read_buf = object.allocate_buffer(4096).await;
2608            object.read(262144, read_buf.as_mut()).await.expect("read failed");
2609            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2610        }
2611        object
2612            .overwrite(262144, write_buf.as_mut(), OverwriteOptions::default())
2613            .await
2614            .expect("overwrite failed");
2615        {
2616            let mut read_buf = object.allocate_buffer(4096).await;
2617            object.read(262144, read_buf.as_mut()).await.expect("read failed");
2618            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2619        }
2620
2621        // Now let's try to do a huge overwrite, that spans over many holes and non-holes
2622        let mut huge_write_buf = object.allocate_buffer(524288).await;
2623        huge_write_buf.as_mut_slice().fill(96);
2624
2625        // With allocations disabled, the big overwrite should fail...
2626        object
2627            .overwrite(0, huge_write_buf.as_mut(), OverwriteOptions::default())
2628            .await
2629            .expect_err("overwrite succeeded");
2630        // ... but it should work when allocations are enabled
2631        object
2632            .overwrite(
2633                0,
2634                huge_write_buf.as_mut(),
2635                OverwriteOptions { allow_allocations: true, ..Default::default() },
2636            )
2637            .await
2638            .expect("overwrite failed");
2639        {
2640            let mut read_buf = object.allocate_buffer(524288).await;
2641            object.read(0, read_buf.as_mut()).await.expect("read failed");
2642            assert_eq!(&read_buf.as_slice(), &[96; 524288]);
2643        }
2644
2645        // Check that the overwrites haven't messed up the filesystem state
2646        let fsck_options = FsckOptions {
2647            fail_on_warning: true,
2648            no_lock: true,
2649            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2650            ..Default::default()
2651        };
2652        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2653
2654        fs.close().await.expect("Close failed");
2655    }
2656
2657    #[fuchsia::test]
2658    async fn test_overwrite_when_unallocated_at_start_of_file() {
2659        // The standard test data we put in the test object would cause an extent with checksums
2660        // to be created, which overwrite() doesn't support. So we create an empty object instead.
2661        let (fs, object) = test_filesystem_and_empty_object().await;
2662
2663        let object = ObjectStore::open_object(
2664            object.owner(),
2665            object.object_id(),
2666            HandleOptions::default(),
2667            None,
2668        )
2669        .await
2670        .expect("open_object failed");
2671
2672        assert_eq!(fs.block_size(), 4096);
2673
2674        let mut write_buf = object.allocate_buffer(4096).await;
2675        write_buf.as_mut_slice().fill(95);
2676
2677        // First try to overwrite without allowing allocations
2678        // We expect this to fail, since nothing is allocated yet
2679        object
2680            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
2681            .await
2682            .expect_err("overwrite succeeded");
2683
2684        // Now try the same overwrite command as before, but allow allocations
2685        object
2686            .overwrite(
2687                0,
2688                write_buf.as_mut(),
2689                OverwriteOptions { allow_allocations: true, ..Default::default() },
2690            )
2691            .await
2692            .expect("overwrite failed");
2693        {
2694            let mut read_buf = object.allocate_buffer(4096).await;
2695            object.read(0, read_buf.as_mut()).await.expect("read failed");
2696            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2697        }
2698
2699        // Now try to overwrite at the next block. This should fail if allocations are disabled
2700        object
2701            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
2702            .await
2703            .expect_err("overwrite succeeded");
2704
2705        // ... but it should work if allocations are enabled
2706        object
2707            .overwrite(
2708                4096,
2709                write_buf.as_mut(),
2710                OverwriteOptions { allow_allocations: true, ..Default::default() },
2711            )
2712            .await
2713            .expect("overwrite failed");
2714        {
2715            let mut read_buf = object.allocate_buffer(4096).await;
2716            object.read(4096, read_buf.as_mut()).await.expect("read failed");
2717            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2718        }
2719
2720        // Check that the overwrites haven't messed up the filesystem state
2721        let fsck_options = FsckOptions {
2722            fail_on_warning: true,
2723            no_lock: true,
2724            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2725            ..Default::default()
2726        };
2727        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2728
2729        fs.close().await.expect("Close failed");
2730    }
2731
2732    #[fuchsia::test]
2733    async fn test_overwrite_can_extend_a_file() {
2734        // The standard test data we put in the test object would cause an extent with checksums
2735        // to be created, which overwrite() doesn't support. So we create an empty object instead.
2736        let (fs, object) = test_filesystem_and_empty_object().await;
2737
2738        let object = ObjectStore::open_object(
2739            object.owner(),
2740            object.object_id(),
2741            HandleOptions::default(),
2742            None,
2743        )
2744        .await
2745        .expect("open_object failed");
2746
2747        assert_eq!(fs.block_size(), 4096);
2748        assert_eq!(object.get_size(), TEST_OBJECT_SIZE);
2749
2750        let mut write_buf = object.allocate_buffer(4096).await;
2751        write_buf.as_mut_slice().fill(95);
2752
2753        // Let's try to fill up the last block, and increase the file size in doing so
2754        let last_block_offset = round_down(TEST_OBJECT_SIZE, 4096 as u32);
2755
2756        // Expected to fail with allocations disabled
2757        object
2758            .overwrite(last_block_offset, write_buf.as_mut(), OverwriteOptions::default())
2759            .await
2760            .expect_err("overwrite succeeded");
2761        // ... but expected to succeed with allocations enabled
2762        object
2763            .overwrite(
2764                last_block_offset,
2765                write_buf.as_mut(),
2766                OverwriteOptions { allow_allocations: true, ..Default::default() },
2767            )
2768            .await
2769            .expect("overwrite failed");
2770        {
2771            let mut read_buf = object.allocate_buffer(4096).await;
2772            object.read(last_block_offset, read_buf.as_mut()).await.expect("read failed");
2773            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2774        }
2775
2776        assert_eq!(object.get_size(), 8192);
2777
2778        // Let's try to write at the next block, too
2779        let next_block_offset = round_up(TEST_OBJECT_SIZE, 4096 as u32).unwrap();
2780
2781        // Expected to fail with allocations disabled
2782        object
2783            .overwrite(next_block_offset, write_buf.as_mut(), OverwriteOptions::default())
2784            .await
2785            .expect_err("overwrite succeeded");
2786        // ... but expected to succeed with allocations enabled
2787        object
2788            .overwrite(
2789                next_block_offset,
2790                write_buf.as_mut(),
2791                OverwriteOptions { allow_allocations: true, ..Default::default() },
2792            )
2793            .await
2794            .expect("overwrite failed");
2795        {
2796            let mut read_buf = object.allocate_buffer(4096).await;
2797            object.read(next_block_offset, read_buf.as_mut()).await.expect("read failed");
2798            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2799        }
2800
2801        assert_eq!(object.get_size(), 12288);
2802
2803        // Check that the overwrites haven't messed up the filesystem state
2804        let fsck_options = FsckOptions {
2805            fail_on_warning: true,
2806            no_lock: true,
2807            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2808            ..Default::default()
2809        };
2810        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2811
2812        fs.close().await.expect("Close failed");
2813    }
2814
2815    #[fuchsia::test]
2816    async fn test_enable_verity() {
2817        let fs: OpenFxFilesystem = test_filesystem().await;
2818        let mut transaction = fs
2819            .clone()
2820            .new_transaction(lock_keys![], Options::default())
2821            .await
2822            .expect("new_transaction failed");
2823        let store = fs.root_store();
2824        let object = Arc::new(
2825            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2826                .await
2827                .expect("create_object failed"),
2828        );
2829
2830        transaction.commit().await.unwrap();
2831
2832        object
2833            .enable_verity(fio::VerificationOptions {
2834                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2835                salt: Some(vec![]),
2836                ..Default::default()
2837            })
2838            .await
2839            .expect("set verified file metadata failed");
2840
2841        let handle =
2842            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
2843                .await
2844                .expect("open_object failed");
2845
2846        assert!(handle.is_verified_file());
2847
2848        fs.close().await.expect("Close failed");
2849    }
2850
2851    #[fuchsia::test]
2852    async fn test_enable_verity_large_file() {
2853        // Need to make a large FakeDevice to create space for a 67 MB file.
2854        let device = DeviceHolder::new(FakeDevice::new(262144, TEST_DEVICE_BLOCK_SIZE));
2855        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2856        let root_store = fs.root_store();
2857        let mut transaction = fs
2858            .clone()
2859            .new_transaction(lock_keys![], Options::default())
2860            .await
2861            .expect("new_transaction failed");
2862
2863        let handle = ObjectStore::create_object(
2864            &root_store,
2865            &mut transaction,
2866            HandleOptions::default(),
2867            None,
2868        )
2869        .await
2870        .expect("failed to create object");
2871        transaction.commit().await.expect("commit failed");
2872        let mut offset = 0;
2873
2874        // Write a file big enough to trigger multiple transactions on enable_verity().
2875        let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2876        buf.as_mut_slice().fill(1);
2877        for _ in 0..130 {
2878            handle.write_or_append(Some(offset), buf.as_ref()).await.expect("write failed");
2879            offset += WRITE_ATTR_BATCH_SIZE as u64;
2880        }
2881
2882        handle
2883            .enable_verity(fio::VerificationOptions {
2884                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2885                salt: Some(vec![]),
2886                ..Default::default()
2887            })
2888            .await
2889            .expect("set verified file metadata failed");
2890
2891        let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2892        offset = 0;
2893        for _ in 0..130 {
2894            handle.read(offset, buf.as_mut()).await.expect("verification during read should fail");
2895            assert_eq!(buf.as_slice(), &[1; WRITE_ATTR_BATCH_SIZE]);
2896            offset += WRITE_ATTR_BATCH_SIZE as u64;
2897        }
2898
2899        fsck(fs.clone()).await.expect("fsck failed");
2900        fs.close().await.expect("Close failed");
2901    }
2902
    #[fuchsia::test]
    async fn test_retry_enable_verity_on_reboot() {
        // Simulates a crash in the middle of enable_verity(): a partially-written merkle
        // attribute plus its graveyard marker are committed, the filesystem is "rebooted", and
        // then enable_verity() is retried.  The graveyard's initial reap must clean up the
        // partial attribute so the retry succeeds and fsck stays happy.
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        let object_id = {
            let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
            // Mark the merkle attribute in the graveyard first, mirroring what enable_verity()
            // does so an interrupted write can be cleaned up on replay.
            transaction.add(
                root_store.store_object_id(),
                Mutation::replace_or_insert_object(
                    ObjectKey::graveyard_attribute_entry(
                        root_store.graveyard_directory_object_id(),
                        handle.object_id(),
                        FSVERITY_MERKLE_ATTRIBUTE_ID,
                    ),
                    ObjectValue::Some,
                ),
            );

            // This write should span three transactions. This test mimics the behavior when the
            // last transaction gets interrupted by a filesystem.close().
            handle
                .write_new_attr_in_batches(
                    &mut transaction,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    &vec![0; 2 * WRITE_ATTR_BATCH_SIZE],
                    WRITE_ATTR_BATCH_SIZE,
                )
                .await
                .expect("failed to write merkle attribute");

            handle.object_id()
            // Drop the transaction to simulate interrupting the merkle tree creation as well as to
            // release the transaction locks.
        };

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // First remount read-only: replay must produce a consistent image even before any
        // cleanup runs.
        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // On open, the filesystem will call initial_reap which will call queue_tombstone().
        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_store = fs.root_store();
        let handle =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("open_object failed");
        // Retrying enable_verity() after the simulated interruption should now succeed.
        handle
            .enable_verity(fio::VerificationOptions {
                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                salt: Some(vec![]),
                ..Default::default()
            })
            .await
            .expect("set verified file metadata failed");

        // `flush` will ensure that initial reap fully processes all the graveyard entries. This
        // isn't strictly necessary for the test to pass (the graveyard marker was already
        // processed during `enable_verity`), but it does help catch bugs, such as the attribute
        // graveyard entry not being removed upon processing.
        fs.graveyard().flush().await;
        // The merkle attribute written by the retry must parse as a valid fsverity descriptor.
        assert!(
            FsVerityDescriptor::from_bytes(
                &handle
                    .read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID)
                    .await
                    .expect("read_attr failed")
                    .expect("No attr found"),
                handle.block_size() as usize
            )
            .is_ok()
        );
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("Close failed");
    }
3001
3002    #[fuchsia::test]
3003    async fn test_verify_data_corrupt_file() {
3004        let fs: OpenFxFilesystem = test_filesystem().await;
3005        let mut transaction = fs
3006            .clone()
3007            .new_transaction(lock_keys![], Options::default())
3008            .await
3009            .expect("new_transaction failed");
3010        let store = fs.root_store();
3011        let object = Arc::new(
3012            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3013                .await
3014                .expect("create_object failed"),
3015        );
3016
3017        transaction.commit().await.unwrap();
3018
3019        let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
3020        buf.as_mut_slice().fill(123);
3021        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3022
3023        object
3024            .enable_verity(fio::VerificationOptions {
3025                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
3026                salt: Some(vec![]),
3027                ..Default::default()
3028            })
3029            .await
3030            .expect("set verified file metadata failed");
3031
3032        // Change file contents and ensure verification fails
3033        buf.as_mut_slice().fill(234);
3034        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3035        object.read(0, buf.as_mut()).await.expect_err("verification during read should fail");
3036
3037        fs.close().await.expect("Close failed");
3038    }
3039
    // TODO(https://fxbug.dev/450398331): More tests to be added when this can support writing the
    // f2fs format natively. For now, relying on tests inside of the f2fs_reader to exercise more
    // paths.
    #[fuchsia::test]
    async fn test_parse_f2fs_verity() {
        // Enables verity normally, then hand-rewrites the on-disk metadata to look like the
        // f2fs-style layout (descriptor stored inside the merkle attribute at a range recorded in
        // `FsverityMetadata::F2fs`), and checks that a reopened handle still parses it and
        // serves verified reads.
        let fs: OpenFxFilesystem = test_filesystem().await;
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let store = fs.root_store();
        let object = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );

        transaction.commit().await.unwrap();
        let file_size = fs.block_size() * 2;
        // Write over one block to make there be leaf hashes.
        {
            let mut buf = object.allocate_buffer(file_size as usize).await;
            buf.as_mut_slice().fill(64);
            assert_eq!(
                object.write_or_append(None, buf.as_ref()).await.expect("Writing to file."),
                file_size
            );
        }

        // Enable verity normally, then shift the type.
        object
            .enable_verity(fio::VerificationOptions {
                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                salt: Some(vec![]),
                ..Default::default()
            })
            .await
            .expect("set verified file metadata failed");
        let (verity_info, root_hash) = object.get_descriptor().unwrap();

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::Object {
                    store_object_id: store.store_object_id(),
                    object_id: object.object_id()
                }],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        // Rewrite the data attribute's metadata to the F2fs variant, pointing at the range in
        // the merkle attribute where the raw descriptor will live.
        transaction.add(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    object.object_id(),
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Attribute,
                ),
                ObjectValue::verified_attribute(
                    file_size,
                    FsverityMetadata::F2fs(0..(fs.block_size() * 2)),
                ),
            ),
        );
        // Grow the merkle attribute to two blocks so there is room for the descriptor block.
        transaction.add(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    object.object_id(),
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    AttributeKey::Attribute,
                ),
                ObjectValue::attribute(fs.block_size() * 2, false),
            ),
        );
        {
            // Build a raw f2fs-format descriptor from the values captured above and write it
            // into the second block of the merkle attribute.
            let descriptor = FsVerityDescriptorRaw::new(
                fio::HashAlgorithm::Sha256,
                fs.block_size(),
                file_size,
                root_hash.as_slice(),
                match &verity_info.salt {
                    Some(salt) => salt.as_slice(),
                    None => [0u8; 0].as_slice(),
                },
            )
            .expect("Creating descriptor");
            let mut buf = object.allocate_buffer(fs.block_size() as usize).await;
            descriptor.write_to_slice(buf.as_mut_slice()).expect("Writing descriptor to buf");
            object
                .multi_write(
                    &mut transaction,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    &[fs.block_size()..(fs.block_size() * 2)],
                    buf.as_mut(),
                )
                .await
                .expect("Writing descriptor");
        }
        transaction.commit().await.unwrap();

        // A fresh handle must parse the f2fs-style metadata and still treat the file as
        // verified.
        let handle =
            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
                .await
                .expect("open_object failed");

        assert!(handle.is_verified_file());

        // Verified reads of the full file should succeed against the rewritten metadata.
        let mut buf = object.allocate_buffer(file_size as usize).await;
        assert_eq!(
            handle.read(0, buf.as_mut()).await.expect("Read whole file."),
            file_size as usize
        );

        fs.close().await.expect("Close failed");
    }
3158
    #[fuchsia::test]
    async fn test_verify_data_corrupt_tree() {
        // Counterpart to test_verify_data_corrupt_file: instead of corrupting the data, corrupt
        // the merkle tree attribute itself.  Opening the object afterwards must fail, since the
        // stored tree no longer matches the descriptor/root hash.
        let fs: OpenFxFilesystem = test_filesystem().await;
        let object_id = {
            let store = fs.root_store();
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            let object = Arc::new(
                ObjectStore::create_object(
                    &store,
                    &mut transaction,
                    HandleOptions::default(),
                    None,
                )
                .await
                .expect("create_object failed"),
            );
            let object_id = object.object_id();

            transaction.commit().await.unwrap();

            // Write five blocks of known data, enable verity, and confirm a verified read works
            // while the tree is still intact.
            let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
            buf.as_mut_slice().fill(123);
            object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

            object
                .enable_verity(fio::VerificationOptions {
                    hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                    salt: Some(vec![]),
                    ..Default::default()
                })
                .await
                .expect("set verified file metadata failed");
            object.read(0, buf.as_mut()).await.expect("verified read");

            // Corrupt the merkle tree before closing.
            let mut merkle = object
                .read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID)
                .await
                .unwrap()
                .expect("Reading merkle tree");
            // Flip the first byte; wrapping_add avoids overflow panics when the byte is 0xff.
            merkle[0] = merkle[0].wrapping_add(1);
            object
                .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID, &*merkle)
                .await
                .expect("Overwriting merkle");

            object_id
        }; // Close object.

        // Reopening the object should complain about the corrupted merkle tree.
        assert!(
            ObjectStore::open_object(&fs.root_store(), object_id, HandleOptions::default(), None)
                .await
                .is_err()
        );
        fs.close().await.expect("Close failed");
    }
3220
3221    #[fuchsia::test]
3222    async fn test_extend() {
3223        let fs = test_filesystem().await;
3224        let handle;
3225        let mut transaction = fs
3226            .clone()
3227            .new_transaction(lock_keys![], Options::default())
3228            .await
3229            .expect("new_transaction failed");
3230        let store = fs.root_store();
3231        handle =
3232            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3233                .await
3234                .expect("create_object failed");
3235
3236        // As of writing, an empty filesystem has two 512kiB superblock extents and a little over
3237        // 256kiB of additional allocations (journal, etc) so we start use a 'magic' starting point
3238        // of 2MiB here.
3239        const START_OFFSET: u64 = 2048 * 1024;
3240        handle
3241            .extend(&mut transaction, START_OFFSET..START_OFFSET + 5 * fs.block_size() as u64)
3242            .await
3243            .expect("extend failed");
3244        transaction.commit().await.expect("commit failed");
3245        let mut buf = handle.allocate_buffer(5 * fs.block_size() as usize).await;
3246        buf.as_mut_slice().fill(123);
3247        handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3248        buf.as_mut_slice().fill(67);
3249        handle.read(0, buf.as_mut()).await.expect("read failed");
3250        assert_eq!(buf.as_slice(), &vec![123; 5 * fs.block_size() as usize]);
3251        fs.close().await.expect("Close failed");
3252    }
3253
3254    #[fuchsia::test]
3255    async fn test_truncate_deallocates_old_extents() {
3256        let (fs, object) = test_filesystem_and_object().await;
3257        let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
3258        buf.as_mut_slice().fill(0xaa);
3259        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3260
3261        let allocator = fs.allocator();
3262        let allocated_before = allocator.get_allocated_bytes();
3263        object.truncate(fs.block_size() as u64).await.expect("truncate failed");
3264        let allocated_after = allocator.get_allocated_bytes();
3265        assert!(
3266            allocated_after < allocated_before,
3267            "before = {} after = {}",
3268            allocated_before,
3269            allocated_after
3270        );
3271        fs.close().await.expect("Close failed");
3272    }
3273
3274    #[fuchsia::test]
3275    async fn test_truncate_zeroes_tail_block() {
3276        let (fs, object) = test_filesystem_and_object().await;
3277
3278        WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + 3).await.expect("truncate failed");
3279        WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + TEST_DATA.len() as u64)
3280            .await
3281            .expect("truncate failed");
3282
3283        let mut buf = object.allocate_buffer(fs.block_size() as usize).await;
3284        let offset = (TEST_DATA_OFFSET % fs.block_size()) as usize;
3285        object.read(TEST_DATA_OFFSET - offset as u64, buf.as_mut()).await.expect("read failed");
3286
3287        let mut expected = TEST_DATA.to_vec();
3288        expected[3..].fill(0);
3289        assert_eq!(&buf.as_slice()[offset..offset + expected.len()], &expected);
3290    }
3291
    #[fuchsia::test]
    async fn test_trim() {
        // End-to-end test that the graveyard correctly trims (frees the extents of) a truncated
        // or tombstoned object across remounts.  A post-commit hook snapshots the device after
        // every transaction and verifies via fsck + replay that the intermediate states are all
        // recoverable.
        //
        // Format a new filesystem.
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let block_size = fs.block_size();
        root_volume(fs.clone())
            .await
            .expect("root_volume failed")
            .new_volume("test", NewChildStoreOptions::default())
            .await
            .expect("volume failed");
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        // To test trim, we open the filesystem and set up a post commit hook that runs after every
        // transaction.  When the hook triggers, we can fsck the volume, take a snapshot of the
        // device and check that it gets replayed correctly on the snapshot.  We can check that the
        // graveyard trims the file as expected.
        //
        // Shared state between the test body and the hook: the store under test and, in the
        // second pass, the object id expected to be tombstoned.
        #[derive(Default)]
        struct Context {
            store: Option<Arc<ObjectStore>>,
            object_id: Option<u64>,
        }
        let shared_context = Arc::new(Mutex::new(Context::default()));

        // Sized so that truncating the object needs more than one transaction's worth of
        // mutations (see TRANSACTION_MUTATION_THRESHOLD).
        let object_size = (TRANSACTION_MUTATION_THRESHOLD as u64 + 10) * 2 * block_size;

        // Wait for an object to get tombstoned by the graveyard.
        async fn expect_tombstoned(store: &Arc<ObjectStore>, object_id: u64) {
            loop {
                if let Err(e) =
                    ObjectStore::open_object(store, object_id, HandleOptions::default(), None).await
                {
                    assert!(
                        FxfsError::NotFound.matches(&e),
                        "open_object didn't fail with NotFound: {:?}",
                        e
                    );
                    break;
                }
                // The graveyard should eventually tombstone the object.
                fasync::Timer::new(std::time::Duration::from_millis(100)).await;
            }
        }

        // Checks to see if the object needs to be trimmed.
        // Returns the handle for "foo" when it still has allocated bytes but a zero-length data
        // attribute (i.e. truncated but not yet trimmed), else None.
        async fn needs_trim(store: &Arc<ObjectStore>) -> Option<DataObjectHandle<ObjectStore>> {
            let root_directory = Directory::open(store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let oid = root_directory.lookup("foo").await.expect("lookup failed");
            if let Some((oid, _, _)) = oid {
                let object = ObjectStore::open_object(store, oid, HandleOptions::default(), None)
                    .await
                    .expect("open_object failed");
                let props = object.get_properties().await.expect("get_properties failed");
                if props.allocated_size > 0 && props.data_attribute_size == 0 {
                    Some(object)
                } else {
                    None
                }
            } else {
                None
            }
        }

        let shared_context_clone = shared_context.clone();
        // Runs after every committed transaction (see post_commit_hook below).
        let post_commit = move || {
            let store = shared_context_clone.lock().store.as_ref().cloned().unwrap();
            let shared_context = shared_context_clone.clone();
            async move {
                // First run fsck on the current filesystem.
                let options = FsckOptions {
                    fail_on_warning: true,
                    no_lock: true,
                    on_error: Box::new(|err| println!("fsck error: {:?}", err)),
                    ..Default::default()
                };
                let fs = store.filesystem();

                fsck_with_options(fs.clone(), &options).await.expect("fsck_with_options failed");
                fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
                    .await
                    .expect("fsck_volume_with_options failed");

                // Now check that we can replay this correctly.
                fs.sync(SyncOptions { flush_device: true, ..Default::default() })
                    .await
                    .expect("sync failed");
                let device = fs.device().snapshot().expect("snapshot failed");

                let object_id = shared_context.lock().object_id.clone();

                // Skip the initial reap only in the first pass, so the trim/tombstone work is
                // driven explicitly below.
                let fs2 = FxFilesystemBuilder::new()
                    .skip_initial_reap(object_id.is_none())
                    .open(device)
                    .await
                    .expect("open failed");

                // If the "foo" file exists check that allocated size matches content size.
                let root_vol = root_volume(fs2.clone()).await.expect("root_volume failed");
                let store =
                    root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");

                if let Some(oid) = object_id {
                    // For the second pass, the object should get tombstoned.
                    expect_tombstoned(&store, oid).await;
                } else if let Some(object) = needs_trim(&store).await {
                    // Extend the file and make sure that it is correctly trimmed.
                    object.truncate(object_size).await.expect("truncate failed");
                    let mut buf = object.allocate_buffer(block_size as usize).await;
                    object
                        .read(object_size - block_size * 2, buf.as_mut())
                        .await
                        .expect("read failed");
                    assert_eq!(buf.as_slice(), &vec![0; block_size as usize]);

                    // Remount, this time with the graveyard performing an initial reap and the
                    // object should get trimmed.
                    let fs = FxFilesystem::open(fs.device().snapshot().expect("snapshot failed"))
                        .await
                        .expect("open failed");
                    let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
                    let store = root_vol
                        .volume("test", StoreOptions::default())
                        .await
                        .expect("volume failed");
                    while needs_trim(&store).await.is_some() {
                        // The object has been truncated, but still has some data allocated to
                        // it.  The graveyard should trim the object eventually.
                        fasync::Timer::new(std::time::Duration::from_millis(100)).await;
                    }

                    // Run fsck.
                    fsck_with_options(fs.clone(), &options)
                        .await
                        .expect("fsck_with_options failed");
                    fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
                        .await
                        .expect("fsck_volume_with_options failed");
                    fs.close().await.expect("close failed");
                }

                // Run fsck on fs2.
                fsck_with_options(fs2.clone(), &options).await.expect("fsck_with_options failed");
                fsck_volume_with_options(fs2.as_ref(), &options, store.store_object_id(), None)
                    .await
                    .expect("fsck_volume_with_options failed");
                fs2.close().await.expect("close failed");
            }
            .boxed()
        };

        let fs = FxFilesystemBuilder::new()
            .post_commit_hook(post_commit)
            .open(device)
            .await
            .expect("open failed");

        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");

        shared_context.lock().store = Some(store.clone());

        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");

        let object;
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        object = root_directory
            .create_child_file(&mut transaction, "foo")
            .await
            .expect("create_object failed");
        transaction.commit().await.expect("commit failed");

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        // Two passes: first with a regular object, and then with that object moved into the
        // graveyard.
        let mut pass = 0;
        loop {
            // Create enough extents in it such that when we truncate the object it will require
            // more than one transaction.
            let mut buf = object.allocate_buffer(5).await;
            buf.as_mut_slice().fill(1);
            // Write every other block.
            for offset in (0..object_size).into_iter().step_by(2 * block_size as usize) {
                object
                    .txn_write(&mut transaction, offset, buf.as_ref())
                    .await
                    .expect("write failed");
            }
            transaction.commit().await.expect("commit failed");
            // This should take up more than one transaction.
            WriteObjectHandle::truncate(&object, 0).await.expect("truncate failed");

            if pass == 1 {
                break;
            }

            // Store the object ID so that we can make sure the object is always tombstoned
            // after remount (see above).
            shared_context.lock().object_id = Some(object.object_id());

            transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(store.store_object_id(), store.root_directory_object_id()),
                        LockKey::object(store.store_object_id(), object.object_id()),
                    ],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");

            // Move the object into the graveyard.
            replace_child(&mut transaction, None, (&root_directory, "foo"))
                .await
                .expect("replace_child failed");
            store.add_to_graveyard(&mut transaction, object.object_id());

            pass += 1;
        }

        fs.close().await.expect("Close failed");
    }
3538
3539    #[fuchsia::test]
3540    async fn test_adjust_refs() {
3541        let (fs, object) = test_filesystem_and_object().await;
3542        let store = object.owner();
3543        let mut transaction = fs
3544            .clone()
3545            .new_transaction(
3546                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
3547                Options::default(),
3548            )
3549            .await
3550            .expect("new_transaction failed");
3551        assert_eq!(
3552            store
3553                .adjust_refs(&mut transaction, object.object_id(), 1)
3554                .await
3555                .expect("adjust_refs failed"),
3556            false
3557        );
3558        transaction.commit().await.expect("commit failed");
3559
3560        let allocator = fs.allocator();
3561        let allocated_before = allocator.get_allocated_bytes();
3562        let mut transaction = fs
3563            .clone()
3564            .new_transaction(
3565                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
3566                Options::default(),
3567            )
3568            .await
3569            .expect("new_transaction failed");
3570        assert_eq!(
3571            store
3572                .adjust_refs(&mut transaction, object.object_id(), -2)
3573                .await
3574                .expect("adjust_refs failed"),
3575            true
3576        );
3577        transaction.commit().await.expect("commit failed");
3578
3579        assert_eq!(allocator.get_allocated_bytes(), allocated_before);
3580
3581        store
3582            .tombstone_object(
3583                object.object_id(),
3584                Options { borrow_metadata_space: true, ..Default::default() },
3585            )
3586            .await
3587            .expect("purge failed");
3588
3589        assert_eq!(allocated_before - allocator.get_allocated_bytes(), fs.block_size() as u64);
3590
3591        // We need to remove the directory entry, too, otherwise fsck will complain
3592        {
3593            let mut transaction = fs
3594                .clone()
3595                .new_transaction(
3596                    lock_keys![LockKey::object(
3597                        store.store_object_id(),
3598                        store.root_directory_object_id()
3599                    )],
3600                    Options::default(),
3601                )
3602                .await
3603                .expect("new_transaction failed");
3604            let root_directory = Directory::open(&store, store.root_directory_object_id())
3605                .await
3606                .expect("open failed");
3607            transaction.add(
3608                store.store_object_id(),
3609                Mutation::replace_or_insert_object(
3610                    ObjectKey::child(root_directory.object_id(), TEST_OBJECT_NAME, false),
3611                    ObjectValue::None,
3612                ),
3613            );
3614            transaction.commit().await.expect("commit failed");
3615        }
3616
3617        fsck_with_options(
3618            fs.clone(),
3619            &FsckOptions {
3620                fail_on_warning: true,
3621                on_error: Box::new(|err| println!("fsck error: {:?}", err)),
3622                ..Default::default()
3623            },
3624        )
3625        .await
3626        .expect("fsck_with_options failed");
3627
3628        fs.close().await.expect("Close failed");
3629    }
3630
3631    #[fuchsia::test]
3632    async fn test_locks() {
3633        let (fs, object) = test_filesystem_and_object().await;
3634        let (send1, recv1) = channel();
3635        let (send2, recv2) = channel();
3636        let (send3, recv3) = channel();
3637        let done = Mutex::new(false);
3638        let mut futures = FuturesUnordered::new();
3639        futures.push(
3640            async {
3641                let mut t = object.new_transaction().await.expect("new_transaction failed");
3642                send1.send(()).unwrap(); // Tell the next future to continue.
3643                send3.send(()).unwrap(); // Tell the last future to continue.
3644                recv2.await.unwrap();
3645                let mut buf = object.allocate_buffer(5).await;
3646                buf.as_mut_slice().copy_from_slice(b"hello");
3647                object.txn_write(&mut t, 0, buf.as_ref()).await.expect("write failed");
3648                // This is a halting problem so all we can do is sleep.
3649                fasync::Timer::new(Duration::from_millis(100)).await;
3650                assert!(!*done.lock());
3651                t.commit().await.expect("commit failed");
3652            }
3653            .boxed(),
3654        );
3655        futures.push(
3656            async {
3657                recv1.await.unwrap();
3658                // Reads should not block.
3659                let offset = TEST_DATA_OFFSET as usize;
3660                let align = offset % fs.block_size() as usize;
3661                let len = TEST_DATA.len();
3662                let mut buf = object.allocate_buffer(align + len).await;
3663                assert_eq!(
3664                    object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
3665                    align + TEST_DATA.len()
3666                );
3667                assert_eq!(&buf.as_slice()[align..], TEST_DATA);
3668                // Tell the first future to continue.
3669                send2.send(()).unwrap();
3670            }
3671            .boxed(),
3672        );
3673        futures.push(
3674            async {
3675                // This should block until the first future has completed.
3676                recv3.await.unwrap();
3677                let _t = object.new_transaction().await.expect("new_transaction failed");
3678                let mut buf = object.allocate_buffer(5).await;
3679                assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 5);
3680                assert_eq!(buf.as_slice(), b"hello");
3681            }
3682            .boxed(),
3683        );
3684        while let Some(()) = futures.next().await {}
3685        fs.close().await.expect("Close failed");
3686    }
3687
3688    #[fuchsia::test(threads = 10)]
3689    async fn test_racy_reads() {
3690        let fs = test_filesystem().await;
3691        let object;
3692        let mut transaction = fs
3693            .clone()
3694            .new_transaction(lock_keys![], Options::default())
3695            .await
3696            .expect("new_transaction failed");
3697        let store = fs.root_store();
3698        object = Arc::new(
3699            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3700                .await
3701                .expect("create_object failed"),
3702        );
3703        transaction.commit().await.expect("commit failed");
3704        for _ in 0..100 {
3705            let cloned_object = object.clone();
3706            let writer = fasync::Task::spawn(async move {
3707                let mut buf = cloned_object.allocate_buffer(10).await;
3708                buf.as_mut_slice().fill(123);
3709                cloned_object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3710            });
3711            let cloned_object = object.clone();
3712            let reader = fasync::Task::spawn(async move {
3713                let wait_time = rand::random_range(0..5);
3714                fasync::Timer::new(Duration::from_millis(wait_time)).await;
3715                let mut buf = cloned_object.allocate_buffer(10).await;
3716                buf.as_mut_slice().fill(23);
3717                let amount = cloned_object.read(0, buf.as_mut()).await.expect("write failed");
3718                // If we succeed in reading data, it must include the write; i.e. if we see the size
3719                // change, we should see the data too.  For this to succeed it requires locking on
3720                // the read size to ensure that when we read the size, we get the extents changed in
3721                // that same transaction.
3722                if amount != 0 {
3723                    assert_eq!(amount, 10);
3724                    assert_eq!(buf.as_slice(), &[123; 10]);
3725                }
3726            });
3727            writer.await;
3728            reader.await;
3729            object.truncate(0).await.expect("truncate failed");
3730        }
3731        fs.close().await.expect("Close failed");
3732    }
3733
3734    #[fuchsia::test]
3735    async fn test_allocated_size() {
3736        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
3737
3738        let before = object.get_properties().await.expect("get_properties failed").allocated_size;
3739        let mut buf = object.allocate_buffer(5).await;
3740        buf.as_mut_slice().copy_from_slice(b"hello");
3741        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3742        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3743        assert_eq!(after, before + fs.block_size() as u64);
3744
3745        // Do the same write again and there should be no change.
3746        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3747        assert_eq!(
3748            object.get_properties().await.expect("get_properties failed").allocated_size,
3749            after
3750        );
3751
3752        // extend...
3753        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3754        let offset = 1000 * fs.block_size() as u64;
3755        let before = after;
3756        object
3757            .extend(&mut transaction, offset..offset + fs.block_size() as u64)
3758            .await
3759            .expect("extend failed");
3760        transaction.commit().await.expect("commit failed");
3761        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3762        assert_eq!(after, before + fs.block_size() as u64);
3763
3764        // truncate...
3765        let before = after;
3766        let size = object.get_size();
3767        object.truncate(size - fs.block_size() as u64).await.expect("extend failed");
3768        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3769        assert_eq!(after, before - fs.block_size() as u64);
3770
3771        // preallocate_range...
3772        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3773        let before = after;
3774        let mut file_range = offset..offset + fs.block_size() as u64;
3775        object.preallocate_range(&mut transaction, &mut file_range).await.expect("extend failed");
3776        transaction.commit().await.expect("commit failed");
3777        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3778        assert_eq!(after, before + fs.block_size() as u64);
3779        fs.close().await.expect("Close failed");
3780    }
3781
3782    #[fuchsia::test(threads = 10)]
3783    async fn test_zero() {
3784        let (fs, object) = test_filesystem_and_object().await;
3785        let expected_size = object.get_size();
3786        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3787        object.zero(&mut transaction, 0..fs.block_size() as u64 * 10).await.expect("zero failed");
3788        transaction.commit().await.expect("commit failed");
3789        assert_eq!(object.get_size(), expected_size);
3790        let mut buf = object.allocate_buffer(fs.block_size() as usize * 10).await;
3791        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed") as u64, expected_size);
3792        assert_eq!(
3793            &buf.as_slice()[0..expected_size as usize],
3794            vec![0u8; expected_size as usize].as_slice()
3795        );
3796        fs.close().await.expect("Close failed");
3797    }
3798
3799    #[fuchsia::test]
3800    async fn test_properties() {
3801        let (fs, object) = test_filesystem_and_object().await;
3802        const CRTIME: Timestamp = Timestamp::from_nanos(1234);
3803        const MTIME: Timestamp = Timestamp::from_nanos(5678);
3804        const CTIME: Timestamp = Timestamp::from_nanos(8765);
3805
3806        // ObjectProperties can be updated through `update_attributes`.
3807        // `get_properties` should reflect the latest changes.
3808        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3809        object
3810            .update_attributes(
3811                &mut transaction,
3812                Some(&fio::MutableNodeAttributes {
3813                    creation_time: Some(CRTIME.as_nanos()),
3814                    modification_time: Some(MTIME.as_nanos()),
3815                    mode: Some(111),
3816                    gid: Some(222),
3817                    ..Default::default()
3818                }),
3819                None,
3820            )
3821            .await
3822            .expect("update_attributes failed");
3823        const MTIME_NEW: Timestamp = Timestamp::from_nanos(12345678);
3824        object
3825            .update_attributes(
3826                &mut transaction,
3827                Some(&fio::MutableNodeAttributes {
3828                    modification_time: Some(MTIME_NEW.as_nanos()),
3829                    gid: Some(333),
3830                    rdev: Some(444),
3831                    ..Default::default()
3832                }),
3833                Some(CTIME),
3834            )
3835            .await
3836            .expect("update_timestamps failed");
3837        transaction.commit().await.expect("commit failed");
3838
3839        let properties = object.get_properties().await.expect("get_properties failed");
3840        assert_matches!(
3841            properties,
3842            ObjectProperties {
3843                refs: 1u64,
3844                allocated_size: TEST_OBJECT_ALLOCATED_SIZE,
3845                data_attribute_size: TEST_OBJECT_SIZE,
3846                creation_time: CRTIME,
3847                modification_time: MTIME_NEW,
3848                posix_attributes: Some(PosixAttributes { mode: 111, gid: 333, rdev: 444, .. }),
3849                change_time: CTIME,
3850                ..
3851            }
3852        );
3853        fs.close().await.expect("Close failed");
3854    }
3855
3856    #[fuchsia::test]
3857    async fn test_is_allocated() {
3858        let (fs, object) = test_filesystem_and_object().await;
3859
3860        // `test_filesystem_and_object()` wrote the buffer `TEST_DATA` to the device at offset
3861        // `TEST_DATA_OFFSET` where the length and offset are aligned to the block size.
3862        let aligned_offset = round_down(TEST_DATA_OFFSET, fs.block_size());
3863        let aligned_length = round_up(TEST_DATA.len() as u64, fs.block_size()).unwrap();
3864
3865        // Check for the case where where we have the following extent layout
3866        //       [ unallocated ][ `TEST_DATA` ]
3867        // The extents before `aligned_offset` should not be allocated
3868        let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
3869        assert_eq!(count, aligned_offset);
3870        assert_eq!(allocated, false);
3871
3872        let (allocated, count) =
3873            object.is_allocated(aligned_offset).await.expect("is_allocated failed");
3874        assert_eq!(count, aligned_length);
3875        assert_eq!(allocated, true);
3876
3877        // Check for the case where where we query out of range
3878        let end = aligned_offset + aligned_length;
3879        object
3880            .is_allocated(end)
3881            .await
3882            .expect_err("is_allocated should have returned ERR_OUT_OF_RANGE");
3883
3884        // Check for the case where where we start querying for allocation starting from
3885        // an allocated range to the end of the device
3886        let size = 50 * fs.block_size() as u64;
3887        object.truncate(size).await.expect("extend failed");
3888
3889        let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
3890        assert_eq!(count, size - end);
3891        assert_eq!(allocated, false);
3892
3893        // Check for the case where where we have the following extent layout
3894        //      [ unallocated ][ `buf` ][ `buf` ]
3895        let buf_length = 5 * fs.block_size();
3896        let mut buf = object.allocate_buffer(buf_length as usize).await;
3897        buf.as_mut_slice().fill(123);
3898        let new_offset = end + 20 * fs.block_size() as u64;
3899        object.write_or_append(Some(new_offset), buf.as_ref()).await.expect("write failed");
3900        object
3901            .write_or_append(Some(new_offset + buf_length), buf.as_ref())
3902            .await
3903            .expect("write failed");
3904
3905        let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
3906        assert_eq!(count, new_offset - end);
3907        assert_eq!(allocated, false);
3908
3909        let (allocated, count) =
3910            object.is_allocated(new_offset).await.expect("is_allocated failed");
3911        assert_eq!(count, 2 * buf_length);
3912        assert_eq!(allocated, true);
3913
3914        // Check the case where we query from the middle of an extent
3915        let (allocated, count) = object
3916            .is_allocated(new_offset + 4 * fs.block_size())
3917            .await
3918            .expect("is_allocated failed");
3919        assert_eq!(count, 2 * buf_length - 4 * fs.block_size());
3920        assert_eq!(allocated, true);
3921
3922        // Now, write buffer to a location already written to.
3923        // Check for the case when we the following extent layout
3924        //      [ unallocated ][ `other_buf` ][ (part of) `buf` ][ `buf` ]
3925        let other_buf_length = 3 * fs.block_size();
3926        let mut other_buf = object.allocate_buffer(other_buf_length as usize).await;
3927        other_buf.as_mut_slice().fill(231);
3928        object.write_or_append(Some(new_offset), other_buf.as_ref()).await.expect("write failed");
3929
3930        // We still expect that `is_allocated(..)` will return that  there are 2*`buf_length bytes`
3931        // allocated from `new_offset`
3932        let (allocated, count) =
3933            object.is_allocated(new_offset).await.expect("is_allocated failed");
3934        assert_eq!(count, 2 * buf_length);
3935        assert_eq!(allocated, true);
3936
3937        // Check for the case when we the following extent layout
3938        //   [ unallocated ][ deleted ][ unallocated ][ deleted ][ allocated ]
3939        // Mark TEST_DATA as deleted
3940        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3941        object
3942            .zero(&mut transaction, aligned_offset..aligned_offset + aligned_length)
3943            .await
3944            .expect("zero failed");
3945        // Mark `other_buf` as deleted
3946        object
3947            .zero(&mut transaction, new_offset..new_offset + buf_length)
3948            .await
3949            .expect("zero failed");
3950        transaction.commit().await.expect("commit transaction failed");
3951
3952        let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
3953        assert_eq!(count, new_offset + buf_length);
3954        assert_eq!(allocated, false);
3955
3956        let (allocated, count) =
3957            object.is_allocated(new_offset + buf_length).await.expect("is_allocated failed");
3958        assert_eq!(count, buf_length);
3959        assert_eq!(allocated, true);
3960
3961        let new_end = new_offset + buf_length + count;
3962
3963        // Check for the case where there are objects with different keys.
3964        // Case that we're checking for:
3965        //      [ unallocated ][ extent (object with different key) ][ unallocated ]
3966        let store = object.owner();
3967        let mut transaction = fs
3968            .clone()
3969            .new_transaction(lock_keys![], Options::default())
3970            .await
3971            .expect("new_transaction failed");
3972        let object2 =
3973            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3974                .await
3975                .expect("create_object failed");
3976        transaction.commit().await.expect("commit failed");
3977
3978        object2
3979            .write_or_append(Some(new_end + fs.block_size()), buf.as_ref())
3980            .await
3981            .expect("write failed");
3982
3983        // Expecting that the extent with a different key is treated like unallocated extent
3984        let (allocated, count) = object.is_allocated(new_end).await.expect("is_allocated failed");
3985        assert_eq!(count, size - new_end);
3986        assert_eq!(allocated, false);
3987
3988        fs.close().await.expect("close failed");
3989    }
3990
3991    #[fuchsia::test(threads = 10)]
3992    async fn test_read_write_attr() {
3993        let (_fs, object) = test_filesystem_and_object().await;
3994        let data = [0xffu8; 16_384];
3995        object.write_attr(20, &data).await.expect("write_attr failed");
3996        let rdata =
3997            object.read_attr(20).await.expect("read_attr failed").expect("no attribute data found");
3998        assert_eq!(&data[..], &rdata[..]);
3999
4000        assert_eq!(object.read_attr(21).await.expect("read_attr failed"), None);
4001    }
4002
4003    #[fuchsia::test(threads = 10)]
4004    async fn test_allocate_basic() {
4005        let (fs, object) = test_filesystem_and_empty_object().await;
4006        let block_size = fs.block_size();
4007        let file_size = block_size * 10;
4008        object.truncate(file_size).await.unwrap();
4009
4010        let small_buf_size = 1024;
4011        let large_buf_aligned_size = block_size as usize * 2;
4012        let large_buf_size = block_size as usize * 2 + 1024;
4013
4014        let mut small_buf = object.allocate_buffer(small_buf_size).await;
4015        let mut large_buf_aligned = object.allocate_buffer(large_buf_aligned_size).await;
4016        let mut large_buf = object.allocate_buffer(large_buf_size).await;
4017
4018        assert_eq!(object.read(0, small_buf.as_mut()).await.unwrap(), small_buf_size);
4019        assert_eq!(small_buf.as_slice(), &vec![0; small_buf_size]);
4020        assert_eq!(object.read(0, large_buf.as_mut()).await.unwrap(), large_buf_size);
4021        assert_eq!(large_buf.as_slice(), &vec![0; large_buf_size]);
4022        assert_eq!(
4023            object.read(0, large_buf_aligned.as_mut()).await.unwrap(),
4024            large_buf_aligned_size
4025        );
4026        assert_eq!(large_buf_aligned.as_slice(), &vec![0; large_buf_aligned_size]);
4027
4028        // Allocation succeeds, and without any writes to the location it shows up as zero.
4029        object.allocate(block_size..block_size * 3).await.unwrap();
4030
4031        // Test starting before, inside, and after the allocated section with every sized buffer.
4032        for (buf_index, buf) in [small_buf, large_buf, large_buf_aligned].iter_mut().enumerate() {
4033            for offset in 0..4 {
4034                assert_eq!(
4035                    object.read(block_size * offset, buf.as_mut()).await.unwrap(),
4036                    buf.len(),
4037                    "buf_index: {}, read offset: {}",
4038                    buf_index,
4039                    offset,
4040                );
4041                assert_eq!(
4042                    buf.as_slice(),
4043                    &vec![0; buf.len()],
4044                    "buf_index: {}, read offset: {}",
4045                    buf_index,
4046                    offset,
4047                );
4048            }
4049        }
4050
4051        fs.close().await.expect("close failed");
4052    }
4053
4054    #[fuchsia::test(threads = 10)]
4055    async fn test_allocate_extends_file() {
4056        const BUF_SIZE: usize = 1024;
4057        let (fs, object) = test_filesystem_and_empty_object().await;
4058        let mut buf = object.allocate_buffer(BUF_SIZE).await;
4059        let block_size = fs.block_size();
4060
4061        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4062        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4063
4064        assert!(TEST_OBJECT_SIZE < block_size * 4);
4065        // Allocation succeeds, and without any writes to the location it shows up as zero.
4066        object.allocate(0..block_size * 4).await.unwrap();
4067        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4068        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4069        assert_eq!(object.read(block_size, buf.as_mut()).await.unwrap(), buf.len());
4070        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4071        assert_eq!(object.read(block_size * 3, buf.as_mut()).await.unwrap(), buf.len());
4072        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4073
4074        fs.close().await.expect("close failed");
4075    }
4076
4077    #[fuchsia::test(threads = 10)]
4078    async fn test_allocate_past_end() {
4079        const BUF_SIZE: usize = 1024;
4080        let (fs, object) = test_filesystem_and_empty_object().await;
4081        let mut buf = object.allocate_buffer(BUF_SIZE).await;
4082        let block_size = fs.block_size();
4083
4084        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4085        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4086
4087        assert!(TEST_OBJECT_SIZE < block_size * 4);
4088        // Allocation succeeds, and without any writes to the location it shows up as zero.
4089        object.allocate(block_size * 4..block_size * 6).await.unwrap();
4090        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
4091        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4092        assert_eq!(object.read(block_size * 4, buf.as_mut()).await.unwrap(), buf.len());
4093        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4094        assert_eq!(object.read(block_size * 5, buf.as_mut()).await.unwrap(), buf.len());
4095        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
4096
4097        fs.close().await.expect("close failed");
4098    }
4099
4100    #[fuchsia::test(threads = 10)]
4101    async fn test_allocate_read_attr() {
4102        let (fs, object) = test_filesystem_and_empty_object().await;
4103        let block_size = fs.block_size();
4104        let file_size = block_size * 4;
4105        object.truncate(file_size).await.unwrap();
4106
4107        let content = object
4108            .read_attr(object.attribute_id())
4109            .await
4110            .expect("failed to read attr")
4111            .expect("attr returned none");
4112        assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
4113
4114        object.allocate(block_size..block_size * 3).await.unwrap();
4115
4116        let content = object
4117            .read_attr(object.attribute_id())
4118            .await
4119            .expect("failed to read attr")
4120            .expect("attr returned none");
4121        assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
4122
4123        fs.close().await.expect("close failed");
4124    }
4125
4126    #[fuchsia::test(threads = 10)]
4127    async fn test_allocate_existing_data() {
4128        struct Case {
4129            written_ranges: Vec<Range<usize>>,
4130            allocate_range: Range<u64>,
4131        }
4132        let cases = [
4133            Case { written_ranges: vec![4..7], allocate_range: 4..7 },
4134            Case { written_ranges: vec![4..7], allocate_range: 3..8 },
4135            Case { written_ranges: vec![4..7], allocate_range: 5..6 },
4136            Case { written_ranges: vec![4..7], allocate_range: 5..8 },
4137            Case { written_ranges: vec![4..7], allocate_range: 3..5 },
4138            Case { written_ranges: vec![0..1, 2..3, 4..5, 6..7, 8..9], allocate_range: 0..10 },
4139            Case { written_ranges: vec![0..2, 4..6, 7..10], allocate_range: 1..8 },
4140        ];
4141
4142        for case in cases {
4143            let (fs, object) = test_filesystem_and_empty_object().await;
4144            let block_size = fs.block_size();
4145            let file_size = block_size * 10;
4146            object.truncate(file_size).await.unwrap();
4147
4148            for write in &case.written_ranges {
4149                let write_len = (write.end - write.start) * block_size as usize;
4150                let mut write_buf = object.allocate_buffer(write_len).await;
4151                write_buf.as_mut_slice().fill(0xff);
4152                assert_eq!(
4153                    object
4154                        .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
4155                        .await
4156                        .unwrap(),
4157                    file_size
4158                );
4159            }
4160
4161            let mut expected_buf = object.allocate_buffer(file_size as usize).await;
4162            assert_eq!(object.read(0, expected_buf.as_mut()).await.unwrap(), expected_buf.len());
4163
4164            object
4165                .allocate(
4166                    case.allocate_range.start * block_size..case.allocate_range.end * block_size,
4167                )
4168                .await
4169                .unwrap();
4170
4171            let mut read_buf = object.allocate_buffer(file_size as usize).await;
4172            assert_eq!(object.read(0, read_buf.as_mut()).await.unwrap(), read_buf.len());
4173            assert_eq!(read_buf.as_slice(), expected_buf.as_slice());
4174
4175            fs.close().await.expect("close failed");
4176        }
4177    }
4178
    /// Walks the store's LSM tree and collects the `ExtentMode` of every extent record
    /// overlapping `search_range` for `obj`'s attribute 0, returning `(sub_range, mode)` pairs
    /// clamped to `search_range`, in ascending offset order.
    ///
    /// Panics if anything other than a matching extent record is found before `search_range`
    /// is exhausted — i.e. the search range must be fully covered by extents of `obj`.
    async fn get_modes(
        obj: &DataObjectHandle<ObjectStore>,
        mut search_range: Range<u64>,
    ) -> Vec<(Range<u64>, ExtentMode)> {
        let mut modes = Vec::new();
        let store = obj.store();
        let tree = store.tree();
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        // Position the merged iterator at the extent containing (or following)
        // `search_range.start`.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                obj.object_id(),
                0,
                AttributeKey::Extent(ExtentKey::search_key_from_offset(search_range.start)),
            )))
            .await
            .unwrap();
        loop {
            match iter.get() {
                // Only accept extent records belonging to this object's attribute 0; the
                // guard below rejects records for other objects/attributes.
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(
                                    attribute_id,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                        },
                    value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
                    ..
                }) if *object_id == obj.object_id() && *attribute_id == 0 => {
                    // This extent starts beyond the range of interest; we're done.
                    if search_range.end <= range.start {
                        break;
                    }
                    // Clamp the extent's range to the portion inside `search_range`.
                    let found_range = std::cmp::max(search_range.start, range.start)
                        ..std::cmp::min(search_range.end, range.end);
                    // Resume the search immediately after what we just recorded.
                    search_range.start = found_range.end;
                    modes.push((found_range, mode.clone()));
                    if search_range.start == search_range.end {
                        break;
                    }
                    iter.advance().await.unwrap();
                }
                x => panic!("looking for extent record, found this {:?}", x),
            }
        }
        modes
    }
4228
4229    async fn assert_all_overwrite(
4230        obj: &DataObjectHandle<ObjectStore>,
4231        mut search_range: Range<u64>,
4232    ) {
4233        let modes = get_modes(obj, search_range.clone()).await;
4234        for mode in modes {
4235            assert_eq!(
4236                mode.0.start, search_range.start,
4237                "missing mode in range {}..{}",
4238                search_range.start, mode.0.start
4239            );
4240            match mode.1 {
4241                ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => (),
4242                m => panic!("mode at range {:?} was not overwrite, instead found {:?}", mode.0, m),
4243            }
4244            assert!(
4245                mode.0.end <= search_range.end,
4246                "mode ends beyond search range (bug in test) - search_range: {:?}, mode: {:?}",
4247                search_range,
4248                mode,
4249            );
4250            search_range.start = mode.0.end;
4251        }
4252        assert_eq!(
4253            search_range.start, search_range.end,
4254            "missing mode in range {:?}",
4255            search_range
4256        );
4257    }
4258
    /// Exercises `multi_overwrite` against combinations of pre-existing COW writes,
    /// `allocate` calls (which create overwrite-mode extents), and batched overwrite
    /// ranges, verifying both the resulting file contents and the per-block checksums
    /// emitted into the transaction.
    #[fuchsia::test(threads = 10)]
    async fn test_multi_overwrite() {
        #[derive(Debug)]
        struct Case {
            // Block ranges filled via the regular (COW) write path before allocating.
            pre_writes: Vec<Range<usize>>,
            // Block ranges passed to `allocate`, in order; later entries may overlap
            // earlier ones.
            allocate_ranges: Vec<Range<u64>>,
            // Each inner Vec is the list of block ranges for one `multi_overwrite` call.
            overwrites: Vec<Vec<Range<u64>>>,
        }
        let cases = [
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![1..3],
                overwrites: vec![vec![1..3]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..1, 1..2, 2..3, 3..4],
                overwrites: vec![vec![0..4]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![0..1], vec![1..2], vec![3..4]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![3..4]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![3..4], vec![2..3], vec![1..2]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![1..2, 5..6, 7..8],
                overwrites: vec![vec![5..6]],
            },
            // Repeated overwrites of the same already-written range.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![1..3],
                overwrites: vec![
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                ],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..5],
                overwrites: vec![
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                ],
            },
            // Multiple ranges within a single multi_overwrite call.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..5],
                overwrites: vec![vec![0..2, 2..4, 4..5]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..5, 5..10],
                overwrites: vec![vec![1..2, 2..3, 4..7, 7..8]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4, 6..10],
                overwrites: vec![vec![2..3, 7..9]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..10],
                overwrites: vec![vec![1..2, 5..10], vec![0..1, 5..10], vec![0..5, 5..10]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..10],
                overwrites: vec![vec![0..2, 2..4, 4..6, 6..8, 8..10], vec![0..5, 5..10]],
            },
            // Cases with pre-existing COW data in or around the allocated ranges.
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![1..3],
                overwrites: vec![vec![1..3]],
            },
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![4..6],
                overwrites: vec![vec![5..6]],
            },
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![0..4]],
            },
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![2..4],
                overwrites: vec![vec![2..4]],
            },
            Case {
                pre_writes: vec![3..5],
                allocate_ranges: vec![1..3, 6..7],
                overwrites: vec![vec![1..3, 6..7]],
            },
            Case {
                pre_writes: vec![1..3, 5..7, 8..9],
                allocate_ranges: vec![0..5],
                overwrites: vec![vec![0..2, 2..5], vec![0..5]],
            },
            // Overlapping allocations with no overwrites; only the extent-mode state is
            // checked for these.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..10, 4..6],
                overwrites: Vec::new(),
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![3..8, 5..10],
                overwrites: Vec::new(),
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![5..10, 3..8],
                overwrites: Vec::new(),
            },
        ];

        for (i, case) in cases.into_iter().enumerate() {
            log::info!("running case {} - {:?}", i, case);
            let (fs, object) = test_filesystem_and_empty_object().await;
            let block_size = fs.block_size();
            let file_size = block_size * 10;
            object.truncate(file_size).await.unwrap();

            // Fill the pre-write ranges with 0xff through the regular write path.
            for write in case.pre_writes {
                let write_len = (write.end - write.start) * block_size as usize;
                let mut write_buf = object.allocate_buffer(write_len).await;
                write_buf.as_mut_slice().fill(0xff);
                assert_eq!(
                    object
                        .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
                        .await
                        .unwrap(),
                    file_size
                );
            }

            for allocate_range in &case.allocate_ranges {
                object
                    .allocate(allocate_range.start * block_size..allocate_range.end * block_size)
                    .await
                    .unwrap();
            }

            // Every allocated range must now be backed by overwrite-mode extents.
            for allocate_range in case.allocate_ranges {
                assert_all_overwrite(
                    &object,
                    allocate_range.start * block_size..allocate_range.end * block_size,
                )
                .await;
            }

            for overwrite in case.overwrites {
                // Convert block ranges to byte ranges, summing the total bytes written.
                let mut write_len = 0;
                let overwrite = overwrite
                    .into_iter()
                    .map(|r| {
                        write_len += (r.end - r.start) * block_size;
                        r.start * block_size..r.end * block_size
                    })
                    .collect::<Vec<_>>();
                let mut write_buf = object.allocate_buffer(write_len as usize).await;
                let data = (0..20).cycle().take(write_len as usize).collect::<Vec<_>>();
                write_buf.as_mut_slice().copy_from_slice(&data);

                // Build the expected post-overwrite contents: the current file contents
                // with the new data patched into the target ranges.
                let mut expected_buf = object.allocate_buffer(file_size as usize).await;
                assert_eq!(
                    object.read(0, expected_buf.as_mut()).await.unwrap(),
                    expected_buf.len()
                );
                let expected_buf_slice = expected_buf.as_mut_slice();
                let mut data_slice = data.as_slice();
                for r in &overwrite {
                    let len = r.length().unwrap() as usize;
                    let (copy_from, rest) = data_slice.split_at(len);
                    expected_buf_slice[r.start as usize..r.end as usize]
                        .copy_from_slice(&copy_from);
                    data_slice = rest;
                }

                let mut transaction = object.new_transaction().await.unwrap();
                object
                    .multi_overwrite(&mut transaction, 0, &overwrite, write_buf.as_mut())
                    .await
                    .unwrap_or_else(|_| panic!("multi_overwrite error on case {}", i));
                // Double check the emitted checksums. We should have one u64 checksum for every
                // block we wrote to disk.
                let mut checksummed_range_length = 0;
                let mut num_checksums = 0;
                for (device_range, checksums, _) in transaction.checksums() {
                    let range_len = device_range.end - device_range.start;
                    let checksums_len = checksums.len() as u64;
                    assert_eq!(range_len / checksums_len, block_size);
                    checksummed_range_length += range_len;
                    num_checksums += checksums_len;
                }
                assert_eq!(checksummed_range_length, write_len);
                assert_eq!(num_checksums, write_len / block_size);
                transaction.commit().await.unwrap();

                // Read back the whole file and compare against the expectation.
                let mut buf = object.allocate_buffer(file_size as usize).await;
                assert_eq!(
                    object.read(0, buf.as_mut()).await.unwrap(),
                    buf.len(),
                    "failed length check on case {}",
                    i,
                );
                assert_eq!(buf.as_slice(), expected_buf.as_slice(), "failed on case {}", i);
            }

            fsck_volume(&fs, object.store().store_object_id(), None).await.expect("fsck failed");
            fs.close().await.expect("close failed");
        }
    }
4495
    /// Verifies that `multi_overwrite` updates the extent's written-block bitmap as blocks
    /// are written, and that `OverwritePartial` collapses into `Overwrite` once every block
    /// of the extent has been written.
    #[fuchsia::test(threads = 10)]
    async fn test_multi_overwrite_mode_updates() {
        let (fs, object) = test_filesystem_and_empty_object().await;
        let block_size = fs.block_size();
        let file_size = block_size * 10;
        object.truncate(file_size).await.unwrap();

        // Mirrors the expected on-disk bitmap: one bit per block, all unwritten to start.
        let mut expected_bitmap = BitVec::from_elem(10, false);

        // A fresh allocation is a single fully-unwritten OverwritePartial extent.
        object.allocate(0..10 * block_size).await.unwrap();
        assert_eq!(
            get_modes(&object, 0..10 * block_size).await,
            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
        );

        // Write blocks 2 and 3.
        let mut write_buf = object.allocate_buffer(2 * block_size as usize).await;
        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
        write_buf.as_mut_slice().copy_from_slice(&data);
        let mut transaction = object.new_transaction().await.unwrap();
        object
            .multi_overwrite(
                &mut transaction,
                0,
                &[2 * block_size..4 * block_size],
                write_buf.as_mut(),
            )
            .await
            .unwrap();
        transaction.commit().await.unwrap();

        expected_bitmap.set(2, true);
        expected_bitmap.set(3, true);
        assert_eq!(
            get_modes(&object, 0..10 * block_size).await,
            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
        );

        // Write blocks 3-4 and 6; block 3 was already written, so only 4 and 6 are new.
        let mut write_buf = object.allocate_buffer(3 * block_size as usize).await;
        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
        write_buf.as_mut_slice().copy_from_slice(&data);
        let mut transaction = object.new_transaction().await.unwrap();
        object
            .multi_overwrite(
                &mut transaction,
                0,
                &[3 * block_size..5 * block_size, 6 * block_size..7 * block_size],
                write_buf.as_mut(),
            )
            .await
            .unwrap();
        transaction.commit().await.unwrap();

        expected_bitmap.set(4, true);
        expected_bitmap.set(6, true);
        assert_eq!(
            get_modes(&object, 0..10 * block_size).await,
            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
        );

        // Write the remaining blocks (0-1, 5, 7-9), which fills the bitmap completely.
        let mut write_buf = object.allocate_buffer(6 * block_size as usize).await;
        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
        write_buf.as_mut_slice().copy_from_slice(&data);
        let mut transaction = object.new_transaction().await.unwrap();
        object
            .multi_overwrite(
                &mut transaction,
                0,
                &[
                    0..2 * block_size,
                    5 * block_size..6 * block_size,
                    7 * block_size..10 * block_size,
                ],
                write_buf.as_mut(),
            )
            .await
            .unwrap();
        transaction.commit().await.unwrap();

        // With every block written the mode should have collapsed to plain Overwrite.
        assert_eq!(
            get_modes(&object, 0..10 * block_size).await,
            vec![(0..10 * block_size, ExtentMode::Overwrite)]
        );

        fs.close().await.expect("close failed");
    }
4581
4582    #[fuchsia::test(threads = 10)]
4583    async fn test_check_unwritten_zero() {
4584        let device = DeviceHolder::new(FakeDevice::new(256 * 1024, TEST_DEVICE_BLOCK_SIZE));
4585        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
4586        let object = create_object_with_key(fs.clone(), Some(&new_insecure_crypt()), false).await;
4587        let block_size = fs.block_size();
4588
4589        // Set up a file with eight blocks to look like this:
4590        // | None | COW | COW | None | Overwrite(unwritten) | Overwrite(written) | None |
4591        let file_size = block_size * 7;
4592        object.truncate(file_size).await.unwrap();
4593        assert!(object.check_unwritten_zero(0..file_size).await.unwrap());
4594
4595        let mut buffer = object.allocate_buffer(block_size as usize).await;
4596        buffer.as_mut_slice().fill(1);
4597        object.write_or_append(Some(block_size), buffer.as_ref()).await.expect("write failed");
4598        object.write_or_append(Some(block_size * 2), buffer.as_ref()).await.expect("write failed");
4599
4600        object.allocate((block_size * 4)..(block_size * 6)).await.expect("Allocate failed");
4601        let mut transaction = fs
4602            .clone()
4603            .new_transaction(
4604                lock_keys![LockKey::object(object.store().store_object_id(), object.object_id(),)],
4605                Options::default(),
4606            )
4607            .await
4608            .expect("new_transaction failed");
4609        object
4610            .multi_overwrite(
4611                &mut transaction,
4612                DEFAULT_DATA_ATTRIBUTE_ID,
4613                &vec![(block_size * 5)..(block_size * 6)],
4614                buffer.as_mut(),
4615            )
4616            .await
4617            .expect("Multi overwrite");
4618        transaction.commit().await.expect("Committing overwrite");
4619
4620        // Anything touching the COW ranges should fail.
4621        assert!(!object.check_unwritten_zero(0..(block_size * 2)).await.unwrap());
4622        assert!(!object.check_unwritten_zero(block_size..(block_size * 3)).await.unwrap());
4623        assert!(!object.check_unwritten_zero((block_size * 2)..(block_size * 4)).await.unwrap());
4624
4625        // This should be fine, as the OverwritePartial should only touch the unwritten block.
4626        assert!(object.check_unwritten_zero((block_size * 3)..(block_size * 5)).await.unwrap());
4627
4628        // These should touch the written overwrite block and fail.
4629        assert!(!object.check_unwritten_zero((block_size * 4)..(block_size * 6)).await.unwrap());
4630        assert!(!object.check_unwritten_zero((block_size * 5)..(block_size * 7)).await.unwrap());
4631
4632        fs.close().await.expect("close failed");
4633    }
4634}