fxfs/object_store/
data_object_handle.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::log::*;
7use crate::lsm_tree::Query;
8use crate::lsm_tree::types::{ItemRef, LayerIterator};
9use crate::object_handle::{
10    ObjectHandle, ObjectProperties, ReadObjectHandle, WriteBytes, WriteObjectHandle,
11};
12use crate::object_store::extent_record::{ExtentKey, ExtentMode, ExtentValue};
13use crate::object_store::object_manager::ObjectManager;
14use crate::object_store::object_record::{
15    AttributeKey, FsverityMetadata, ObjectAttributes, ObjectItem, ObjectKey, ObjectKeyData,
16    ObjectKind, ObjectValue, Timestamp,
17};
18use crate::object_store::store_object_handle::{MaybeChecksums, NeedsTrim};
19use crate::object_store::transaction::{
20    self, AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Operation, Options,
21    Transaction, lock_keys,
22};
23use crate::object_store::{
24    DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions, HandleOwner,
25    RootDigest, StoreObjectHandle, TRANSACTION_MUTATION_THRESHOLD, TrimMode, TrimResult,
26};
27use crate::range::RangeExt;
28use crate::round::{round_down, round_up};
29use anyhow::{Context, Error, anyhow, bail, ensure};
30use async_trait::async_trait;
31use fidl_fuchsia_io as fio;
32use fsverity_merkle::{FsVerityHasher, FsVerityHasherOptions, MerkleTreeBuilder};
33use fuchsia_sync::Mutex;
34use futures::TryStreamExt;
35use futures::stream::FuturesUnordered;
36use fxfs_trace::trace;
37use std::cmp::min;
38use std::ops::{Deref, DerefMut, Range};
39use std::sync::Arc;
40use std::sync::atomic::{self, AtomicU64, Ordering};
41use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
42
43mod allocated_ranges;
44pub use allocated_ranges::{AllocatedRanges, RangeType};
45
/// How much data each transaction will cover when writing an attribute across batches (512 KiB).
/// Pulled from `FLUSH_BATCH_SIZE` in paged_object_handle.rs.
pub const WRITE_ATTR_BATCH_SIZE: usize = 524_288;
49
/// DataObjectHandle is a typed handle for file-like objects that store data in the default data
/// attribute. In addition to traditional files, this means things like the journal, superblocks,
/// and layer files.
///
/// It caches the content size of the data attribute it was configured for, and has helpers for
/// complex extent manipulation, as well as implementations of ReadObjectHandle and
/// WriteObjectHandle.
pub struct DataObjectHandle<S: HandleOwner> {
    // Untyped handle for the underlying object; most operations delegate to it (it is also
    // exposed directly via the `Deref` impl below).
    handle: StoreObjectHandle<S>,
    // Id of the attribute this handle reads and writes.
    attribute_id: u64,
    // Cached content size of the data attribute.
    content_size: AtomicU64,
    // Current fsverity state of the file; see `FsverityState` for the transitions.
    fsverity_state: Mutex<FsverityState>,
    // In-memory tracking of ranges allocated for overwrite-mode writes; only needed while the
    // file is writable (cleared in `finalize_fsverity_state`).
    overwrite_ranges: AllocatedRanges,
}
64
/// Represents the mapping of a file's contents to the physical storage backing it.
#[derive(Debug, Clone)]
pub struct FileExtent {
    // Byte offset within the file at which this extent's data begins.
    logical_offset: u64,
    // Byte range on the device backing this extent; validated in `FileExtent::new`.
    device_range: Range<u64>,
}
71
72impl FileExtent {
73    pub fn new(logical_offset: u64, device_range: Range<u64>) -> Result<Self, Error> {
74        // Ensure `device_range` is valid.
75        let length = device_range.length()?;
76        // Ensure no overflow when we calculate the end of the logical range.
77        let _ = logical_offset.checked_add(length).ok_or(FxfsError::OutOfRange)?;
78        Ok(Self { logical_offset, device_range })
79    }
80}
81
82impl FileExtent {
83    pub fn length(&self) -> u64 {
84        // SAFETY: We verified that the device_range's length is valid in Self::new.
85        unsafe { self.device_range.unchecked_length() }
86    }
87
88    pub fn logical_offset(&self) -> u64 {
89        self.logical_offset
90    }
91
92    pub fn logical_range(&self) -> Range<u64> {
93        // SAFETY: We verified logical_offset plus device_range length won't overflow in Self::new.
94        unsafe { self.logical_offset..self.logical_offset.unchecked_add(self.length()) }
95    }
96
97    pub fn device_range(&self) -> &Range<u64> {
98        &self.device_range
99    }
100}
101
/// State machine for enabling fsverity on a file. `enable_verity` drives the transitions
/// None -> Started -> Pending -> Some; a file that is already verified on disk is set directly
/// to `Some` at open time via `set_fsverity_state_some`.
#[derive(Debug)]
pub enum FsverityState {
    /// The file is not verity-enabled.
    None,
    /// `enable_verity` has begun but the merkle tree has not been computed yet.
    Started,
    /// The merkle tree has been computed but the enabling transaction has not committed yet.
    Pending(FsverityStateInner),
    /// The file is verity-enabled; reads are checked against the stored merkle tree.
    Some(FsverityStateInner),
}
109
/// The verity metadata plus the merkle tree leaf data backing a verity-enabled file.
#[derive(Debug)]
pub struct FsverityStateInner {
    // Digest algorithm, root digest, and salt that the merkle tree was built with.
    descriptor: FsverityMetadata,
    // TODO(b/309656632): This should store the entire merkle tree and not just the leaf nodes.
    // Potentially store a pager-backed vmo instead of passing around a boxed array.
    merkle_tree: Box<[u8]>,
}
117
/// Options controlling overwrite-mode writes.
#[derive(Debug, Default)]
pub struct OverwriteOptions {
    // If false, then all the extents for the overwrite range must have been preallocated using
    // preallocate_range or from existing writes.
    pub allow_allocations: bool,
    // NOTE(review): presumably requests a device barrier before the first overwrite write is
    // issued — confirm semantics against the write path that consumes this flag.
    pub barrier_on_first_write: bool,
}
125
126impl FsverityStateInner {
127    pub fn new(descriptor: FsverityMetadata, merkle_tree: Box<[u8]>) -> Self {
128        FsverityStateInner { descriptor, merkle_tree }
129    }
130
131    fn get_hasher_for_block_size(&self, block_size: usize) -> FsVerityHasher {
132        match self.descriptor.root_digest {
133            RootDigest::Sha256(_) => FsVerityHasher::Sha256(FsVerityHasherOptions::new(
134                self.descriptor.salt.clone(),
135                block_size,
136            )),
137            RootDigest::Sha512(_) => FsVerityHasher::Sha512(FsVerityHasherOptions::new(
138                self.descriptor.salt.clone(),
139                block_size,
140            )),
141        }
142    }
143}
144
// Expose the inner `StoreObjectHandle`'s API directly on `DataObjectHandle`.
impl<S: HandleOwner> Deref for DataObjectHandle<S> {
    type Target = StoreObjectHandle<S>;
    fn deref(&self) -> &Self::Target {
        &self.handle
    }
}
151
152impl<S: HandleOwner> DataObjectHandle<S> {
    /// Constructs a `DataObjectHandle` for `object_id`, owned by `owner`.
    ///
    /// `size` seeds the cached content size for `attribute_id`, and `overwrite_ranges` seeds the
    /// in-memory tracking of ranges already allocated for overwrite-mode writes.
    /// `fsverity_state` should reflect the object's on-disk verity status (files opened as
    /// already-verified are given `FsverityState::Some`; see `set_fsverity_state_some`).
    pub fn new(
        owner: Arc<S>,
        object_id: u64,
        permanent_keys: bool,
        attribute_id: u64,
        size: u64,
        fsverity_state: FsverityState,
        options: HandleOptions,
        trace: bool,
        overwrite_ranges: &[Range<u64>],
    ) -> Self {
        Self {
            handle: StoreObjectHandle::new(owner, object_id, permanent_keys, options, trace),
            attribute_id,
            content_size: AtomicU64::new(size),
            fsverity_state: Mutex::new(fsverity_state),
            overwrite_ranges: AllocatedRanges::new(overwrite_ranges),
        }
    }
172
    /// Returns the id of the attribute this handle reads and writes.
    pub fn attribute_id(&self) -> u64 {
        self.attribute_id
    }
176
    /// Returns the in-memory tracker of ranges allocated for overwrite-mode writes.
    pub fn overwrite_ranges(&self) -> &AllocatedRanges {
        &self.overwrite_ranges
    }
180
    /// Returns true if fsverity is fully enabled for this file, i.e. the state machine has
    /// reached `FsverityState::Some`.
    pub fn is_verified_file(&self) -> bool {
        matches!(*self.fsverity_state.lock(), FsverityState::Some(_))
    }
184
    /// Sets `self.fsverity_state` to FsverityState::Started. Called at the top of `enable_verity`.
    /// If another caller has already started but not completed `enable_verity`, returns
    /// FxfsError::Unavailable. If another caller has already completed `enable_verity`, returns
    /// FxfsError::AlreadyExists.
    pub fn set_fsverity_state_started(&self) -> Result<(), Error> {
        let mut fsverity_guard = self.fsverity_state.lock();
        match *fsverity_guard {
            FsverityState::None => {
                *fsverity_guard = FsverityState::Started;
                Ok(())
            }
            FsverityState::Started | FsverityState::Pending(_) => {
                Err(anyhow!(FxfsError::Unavailable))
            }
            FsverityState::Some(_) => Err(anyhow!(FxfsError::AlreadyExists)),
        }
    }
202
203    /// Sets `self.fsverity_state` to Pending. Must be called before `finalize_fsverity_state()`.
204    /// Asserts that the prior state of `self.fsverity_state` was `FsverityState::Started`.
205    pub fn set_fsverity_state_pending(&self, descriptor: FsverityMetadata, merkle_tree: Box<[u8]>) {
206        let mut fsverity_guard = self.fsverity_state.lock();
207        assert!(matches!(*fsverity_guard, FsverityState::Started));
208        *fsverity_guard = FsverityState::Pending(FsverityStateInner { descriptor, merkle_tree });
209    }
210
211    /// Sets `self.fsverity_state` to Some. Panics if the prior state of `self.fsverity_state` was
212    /// not `FsverityState::Pending(_)`.
213    pub fn finalize_fsverity_state(&self) {
214        let mut fsverity_state_guard = self.fsverity_state.lock();
215        let mut_fsverity_state = fsverity_state_guard.deref_mut();
216        let fsverity_state = std::mem::replace(mut_fsverity_state, FsverityState::None);
217        match fsverity_state {
218            FsverityState::None => panic!("Cannot go from FsverityState::None to Some"),
219            FsverityState::Started => panic!("Cannot go from FsverityState::Started to Some"),
220            FsverityState::Pending(inner) => *mut_fsverity_state = FsverityState::Some(inner),
221            FsverityState::Some(_) => panic!("Fsverity state was already set to Some"),
222        }
223        // Once we finalize the fsverity state, the file is permanently read-only. The in-memory
224        // overwrite ranges tracking is only used for writing, so we don't need them anymore. This
225        // leaves any uninitialized, but allocated, overwrite regions if there are any, rather than
226        // converting them back to sparse regions.
227        self.overwrite_ranges.clear();
228    }
229
230    /// Sets `self.fsverity_state` directly to Some without going through the entire state machine.
231    /// Used to set `self.fsverity_state` on open of a verified file. The merkle tree data is
232    /// verified against the root digest here, and will return an error if the tree is not correct.
233    pub fn set_fsverity_state_some(
234        &self,
235        descriptor: FsverityMetadata,
236        merkle_tree: Box<[u8]>,
237    ) -> Result<(), Error> {
238        // Validate the merkle tree data against the root before applying it.
239        let metadata = FsverityStateInner { descriptor, merkle_tree };
240        let hasher = metadata.get_hasher_for_block_size(self.block_size() as usize);
241        ensure!(metadata.merkle_tree.len() % hasher.hash_size() == 0, FxfsError::Inconsistent);
242        let leaf_chunks = metadata.merkle_tree.chunks_exact(hasher.hash_size());
243        let mut builder = MerkleTreeBuilder::new(hasher);
244        for leaf in leaf_chunks {
245            builder.push_data_hash(leaf.to_vec());
246        }
247        let tree = builder.finish();
248        let root_hash = match &metadata.descriptor.root_digest {
249            RootDigest::Sha256(root_hash) => root_hash.as_slice(),
250            RootDigest::Sha512(root_hash) => root_hash.as_slice(),
251        };
252        ensure!(root_hash == tree.root(), FxfsError::IntegrityError);
253
254        let mut fsverity_guard = self.fsverity_state.lock();
255        assert!(matches!(*fsverity_guard, FsverityState::None));
256        *fsverity_guard = FsverityState::Some(metadata);
257        Ok(())
258    }
259
260    /// Verifies contents of `buffer` against the corresponding hashes in the stored merkle tree.
261    /// `offset` is the logical offset in the file that `buffer` starts at. `offset` must be
262    /// block-aligned. Fails on non fsverity-enabled files.
263    fn verify_data(&self, mut offset: usize, buffer: &[u8]) -> Result<(), Error> {
264        let block_size = self.block_size() as usize;
265        assert!(offset % block_size == 0);
266        let fsverity_state = self.fsverity_state.lock();
267        match &*fsverity_state {
268            FsverityState::None => {
269                Err(anyhow!("Tried to verify read on a non verity-enabled file"))
270            }
271            FsverityState::Started | FsverityState::Pending(_) => Err(anyhow!(
272                "Enable verity has not yet completed, fsverity state: {:?}",
273                &*fsverity_state
274            )),
275            FsverityState::Some(metadata) => {
276                let hasher = metadata.get_hasher_for_block_size(block_size);
277                let leaf_nodes: Vec<&[u8]> =
278                    metadata.merkle_tree.chunks(hasher.hash_size()).collect();
279                fxfs_trace::duration!(c"fsverity-verify", "len" => buffer.len());
280                // TODO(b/318880297): Consider parallelizing computation.
281                for b in buffer.chunks(block_size) {
282                    ensure!(
283                        hasher.hash_block(b) == leaf_nodes[offset / block_size],
284                        anyhow!(FxfsError::Inconsistent).context("Hash mismatch")
285                    );
286                    offset += block_size;
287                }
288                Ok(())
289            }
290        }
291    }
292
293    /// Extend the file with the given extent.  The only use case for this right now is for files
294    /// that must exist at certain offsets on the device, such as super-blocks.
295    pub async fn extend<'a>(
296        &'a self,
297        transaction: &mut Transaction<'a>,
298        device_range: Range<u64>,
299    ) -> Result<(), Error> {
300        let old_end =
301            round_up(self.txn_get_size(transaction), self.block_size()).ok_or(FxfsError::TooBig)?;
302        let new_size = old_end + device_range.end - device_range.start;
303        self.store().allocator().mark_allocated(
304            transaction,
305            self.store().store_object_id(),
306            device_range.clone(),
307        )?;
308        self.txn_update_size(transaction, new_size, None).await?;
309        let key_id = self.get_key(None).await?.0;
310        transaction.add(
311            self.store().store_object_id,
312            Mutation::merge_object(
313                ObjectKey::extent(self.object_id(), self.attribute_id(), old_end..new_size),
314                ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
315            ),
316        );
317        self.update_allocated_size(transaction, device_range.end - device_range.start, 0).await
318    }
319
    // Returns a new aligned buffer (reading the head and tail blocks if necessary) with a copy of
    // the data from `buf`, along with the aligned range it covers. Thin wrapper around
    // `StoreObjectHandle::align_buffer` for this handle's attribute.
    async fn align_buffer(
        &self,
        offset: u64,
        buf: BufferRef<'_>,
    ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
        self.handle.align_buffer(self.attribute_id(), offset, buf).await
    }
329
    // Writes potentially unaligned data at `device_offset` and returns checksums if requested. The
    // data will be encrypted if necessary.
    // `buf` is mutable as an optimization, since the write may require encryption, we can encrypt
    // the buffer in-place rather than copying to another buffer if the write is already aligned.
    // Thin wrapper around `StoreObjectHandle::write_at` for this handle's attribute
    // (NOTE(review): the `None` argument's semantics are defined by `StoreObjectHandle::write_at`
    // — confirm there).
    async fn write_at(
        &self,
        offset: u64,
        buf: MutableBufferRef<'_>,
        device_offset: u64,
    ) -> Result<MaybeChecksums, Error> {
        self.handle.write_at(self.attribute_id(), offset, buf, None, device_offset).await
    }
342
    /// Zeroes the given range.  The range must be aligned.  Thin wrapper around
    /// `StoreObjectHandle::zero` for this handle's attribute.
    pub async fn zero(
        &self,
        transaction: &mut Transaction<'_>,
        range: Range<u64>,
    ) -> Result<(), Error> {
        self.handle.zero(transaction, self.attribute_id(), range).await
    }
351
    /// The cached value for `self.fsverity_state` is set either in `open_object` or on
    /// `enable_verity`. If set, translates `self.fsverity_state.descriptor` into an
    /// fio::VerificationOptions instance and a root hash. Otherwise, returns None.
    /// Errors if `enable_verity` is in flight (state is Started or Pending).
    pub fn get_descriptor(&self) -> Result<Option<(fio::VerificationOptions, Vec<u8>)>, Error> {
        let fsverity_state = self.fsverity_state.lock();
        match &*fsverity_state {
            FsverityState::None => Ok(None),
            FsverityState::Started | FsverityState::Pending(_) => Err(anyhow!(
                "Enable verity has not yet completed, fsverity state: {:?}",
                &*fsverity_state
            )),
            FsverityState::Some(metadata) => {
                let (options, root_hash) = match &metadata.descriptor.root_digest {
                    RootDigest::Sha256(root_hash) => {
                        let mut root_vec = root_hash.to_vec();
                        // Zero-pad the 32-byte SHA-256 digest so there's no garbage in the rest
                        // of the returned buffer (NOTE(review): presumably callers expect a
                        // fixed-size buffer matching the SHA-512 digest length — confirm).
                        root_vec.extend_from_slice(&[0; 32]);
                        (
                            fio::VerificationOptions {
                                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                                salt: Some(metadata.descriptor.salt.clone()),
                                ..Default::default()
                            },
                            root_vec,
                        )
                    }
                    RootDigest::Sha512(root_hash) => (
                        fio::VerificationOptions {
                            hash_algorithm: Some(fio::HashAlgorithm::Sha512),
                            salt: Some(metadata.descriptor.salt.clone()),
                            ..Default::default()
                        },
                        root_hash.clone(),
                    ),
                };
                Ok(Some((options, root_hash)))
            }
        }
    }
391
392    /// Reads the data attribute and computes a merkle tree from the data. The values of the
393    /// parameters required to build the merkle tree are supplied by `descriptor` (i.e. salt,
394    /// hash_algorithm, etc.) Writes the leaf nodes of the merkle tree to an attribute with id
395    /// `FSVERITY_MERKLE_ATTRIBUTE_ID`. Updates the root_hash of the `descriptor` according to the
396    /// computed merkle tree and then replaces the ObjectValue of the data attribute with
397    /// ObjectValue::VerifiedAttribute, which stores the `descriptor` inline.
398    #[trace]
399    pub async fn enable_verity(&self, options: fio::VerificationOptions) -> Result<(), Error> {
400        self.set_fsverity_state_started()?;
401        // If the merkle attribute was tombstoned in the last attempt of `enable_verity`, flushing
402        // the graveyard should process the tombstone before we start rewriting the attribute.
403        if let Some(_) = self
404            .store()
405            .tree()
406            .find(&ObjectKey::graveyard_attribute_entry(
407                self.store().graveyard_directory_object_id(),
408                self.object_id(),
409                FSVERITY_MERKLE_ATTRIBUTE_ID,
410            ))
411            .await?
412        {
413            self.store().filesystem().graveyard().flush().await;
414        }
415        let mut transaction = self.new_transaction().await?;
416        let hash_alg =
417            options.hash_algorithm.ok_or_else(|| anyhow!("No hash algorithm provided"))?;
418        let salt = options.salt.ok_or_else(|| anyhow!("No salt provided"))?;
419        let (root_digest, merkle_tree) = match hash_alg {
420            fio::HashAlgorithm::Sha256 => {
421                let hasher = FsVerityHasher::Sha256(FsVerityHasherOptions::new(
422                    salt.clone(),
423                    self.block_size() as usize,
424                ));
425                let mut builder = MerkleTreeBuilder::new(hasher);
426                let mut offset = 0;
427                let size = self.get_size();
428                // TODO(b/314836822): Consider further tuning the buffer size to optimize
429                // performance. Experimentally, most verity-enabled files are <256K.
430                let mut buf = self.allocate_buffer(64 * self.block_size() as usize).await;
431                while offset < size {
432                    // TODO(b/314842875): Consider optimizations for sparse files.
433                    let read = self.read(offset, buf.as_mut()).await? as u64;
434                    assert!(offset + read <= size);
435                    builder.write(&buf.as_slice()[0..read as usize]);
436                    offset += read;
437                }
438                let tree = builder.finish();
439                let merkle_leaf_nodes: Vec<u8> =
440                    tree.as_ref()[0].iter().flat_map(|x| x.clone()).collect();
441                // TODO(b/314194485): Eventually want streaming writes.
442                // The merkle tree attribute should not require trimming because it should not
443                // exist.
444                self.handle
445                    .write_new_attr_in_batches(
446                        &mut transaction,
447                        FSVERITY_MERKLE_ATTRIBUTE_ID,
448                        &merkle_leaf_nodes,
449                        WRITE_ATTR_BATCH_SIZE,
450                    )
451                    .await?;
452                let root: [u8; 32] = tree.root().try_into().unwrap();
453                (RootDigest::Sha256(root), merkle_leaf_nodes)
454            }
455            fio::HashAlgorithm::Sha512 => {
456                let hasher = FsVerityHasher::Sha512(FsVerityHasherOptions::new(
457                    salt.clone(),
458                    self.block_size() as usize,
459                ));
460                let mut builder = MerkleTreeBuilder::new(hasher);
461                let mut offset = 0;
462                let size = self.get_size();
463                // TODO(b/314836822): Consider further tuning the buffer size to optimize
464                // performance. Experimentally, most verity-enabled files are <256K.
465                let mut buf = self.allocate_buffer(64 * self.block_size() as usize).await;
466                while offset < size {
467                    // TODO(b/314842875): Consider optimizations for sparse files.
468                    let read = self.read(offset, buf.as_mut()).await? as u64;
469                    assert!(offset + read <= size);
470                    builder.write(&buf.as_slice()[0..read as usize]);
471                    offset += read;
472                }
473                let tree = builder.finish();
474                let merkle_leaf_nodes: Vec<u8> =
475                    tree.as_ref()[0].iter().flat_map(|x| x.clone()).collect();
476                // TODO(b/314194485): Eventually want streaming writes.
477                // The merkle tree attribute should not require trimming because it should not
478                // exist.
479                self.handle
480                    .write_new_attr_in_batches(
481                        &mut transaction,
482                        FSVERITY_MERKLE_ATTRIBUTE_ID,
483                        &merkle_leaf_nodes,
484                        WRITE_ATTR_BATCH_SIZE,
485                    )
486                    .await?;
487                (RootDigest::Sha512(tree.root().to_vec()), merkle_leaf_nodes)
488            }
489            _ => {
490                bail!(
491                    anyhow!(FxfsError::NotSupported)
492                        .context(format!("hash algorithm not supported"))
493                );
494            }
495        };
496        if merkle_tree.len() > WRITE_ATTR_BATCH_SIZE {
497            transaction.add(
498                self.store().store_object_id,
499                Mutation::replace_or_insert_object(
500                    ObjectKey::graveyard_attribute_entry(
501                        self.store().graveyard_directory_object_id(),
502                        self.object_id(),
503                        FSVERITY_MERKLE_ATTRIBUTE_ID,
504                    ),
505                    ObjectValue::None,
506                ),
507            );
508        };
509        let descriptor = FsverityMetadata { root_digest, salt };
510        self.set_fsverity_state_pending(descriptor.clone(), merkle_tree.into());
511        transaction.add_with_object(
512            self.store().store_object_id(),
513            Mutation::replace_or_insert_object(
514                ObjectKey::attribute(
515                    self.object_id(),
516                    DEFAULT_DATA_ATTRIBUTE_ID,
517                    AttributeKey::Attribute,
518                ),
519                ObjectValue::verified_attribute(self.get_size(), descriptor),
520            ),
521            AssocObj::Borrowed(self),
522        );
523        transaction.commit().await?;
524        Ok(())
525    }
526
527    /// Pre-allocate disk space for the given logical file range. If any part of the allocation
528    /// range is beyond the end of the file, the file size is updated.
529    pub async fn allocate(&self, range: Range<u64>) -> Result<(), Error> {
530        debug_assert!(range.start < range.end);
531
532        // It's not required that callers of allocate use block aligned ranges, but we need to make
533        // the extents block aligned. Luckily, fallocate in posix is allowed to allocate more than
534        // what was asked for for block alignment purposes. We just need to make sure that the size
535        // of the file is still the non-block-aligned end of the range if the size was changed.
536        let mut new_range = range.clone();
537        new_range.start = round_down(new_range.start, self.block_size());
538        // NB: FxfsError::TooBig turns into EFBIG when passed through starnix, which is the
539        // required error code when the requested range is larger than the file size.
540        new_range.end = round_up(new_range.end, self.block_size()).ok_or(FxfsError::TooBig)?;
541
542        let mut transaction = self.new_transaction().await?;
543        let mut to_allocate = Vec::new();
544        let mut to_switch = Vec::new();
545        let key_id = self.get_key(None).await?.0;
546
547        {
548            let tree = &self.store().tree;
549            let layer_set = tree.layer_set();
550            let offset_key = ObjectKey::attribute(
551                self.object_id(),
552                self.attribute_id(),
553                AttributeKey::Extent(ExtentKey::search_key_from_offset(new_range.start)),
554            );
555            let mut merger = layer_set.merger();
556            let mut iter = merger.query(Query::FullRange(&offset_key)).await?;
557
558            loop {
559                match iter.get() {
560                    Some(ItemRef {
561                        key:
562                            ObjectKey {
563                                object_id,
564                                data:
565                                    ObjectKeyData::Attribute(
566                                        attribute_id,
567                                        AttributeKey::Extent(extent_key),
568                                    ),
569                            },
570                        value: ObjectValue::Extent(extent_value),
571                        ..
572                    }) if *object_id == self.object_id()
573                        && *attribute_id == self.attribute_id() =>
574                    {
575                        // If the start of this extent is beyond the end of the range we are
576                        // allocating, we don't have any more work to do.
577                        if new_range.end <= extent_key.range.start {
578                            break;
579                        }
580                        // Add any prefix we might need to allocate.
581                        if new_range.start < extent_key.range.start {
582                            to_allocate.push(new_range.start..extent_key.range.start);
583                            new_range.start = extent_key.range.start;
584                        }
585                        let device_offset = match extent_value {
586                            ExtentValue::None => {
587                                // If the extent value is None, it indicates a deleted extent. In
588                                // that case, we just skip it entirely. By keeping the new_range
589                                // where it is, this section will get included in the new
590                                // allocations.
591                                iter.advance().await?;
592                                continue;
593                            }
594                            ExtentValue::Some { mode: ExtentMode::OverwritePartial(_), .. }
595                            | ExtentValue::Some { mode: ExtentMode::Overwrite, .. } => {
596                                // If this extent is already in overwrite mode, we can skip it.
597                                if extent_key.range.end < new_range.end {
598                                    new_range.start = extent_key.range.end;
599                                    iter.advance().await?;
600                                    continue;
601                                } else {
602                                    new_range.start = new_range.end;
603                                    break;
604                                }
605                            }
606                            ExtentValue::Some { device_offset, .. } => *device_offset,
607                        };
608
609                        // Figure out how we have to break up the ranges.
610                        let device_offset =
611                            device_offset + (new_range.start - extent_key.range.start);
612                        if extent_key.range.end < new_range.end {
613                            to_switch.push((new_range.start..extent_key.range.end, device_offset));
614                            new_range.start = extent_key.range.end;
615                        } else {
616                            to_switch.push((new_range.start..new_range.end, device_offset));
617                            new_range.start = new_range.end;
618                            break;
619                        }
620                    }
621                    // The records are sorted so if we find something that isn't an extent or
622                    // doesn't match the object id then there are no more extent records for this
623                    // object.
624                    _ => break,
625                }
626                iter.advance().await?;
627            }
628        }
629
630        if new_range.start < new_range.end {
631            to_allocate.push(new_range.clone());
632        }
633
634        // We can update the size in the first transaction because even if subsequent transactions
635        // don't get replayed, the data between the current and new end of the file will be zero
636        // (either sparse zero or allocated zero). On the other hand, if we don't update the size
637        // in the first transaction, overwrite extents may be written past the end of the file
638        // which is an fsck error.
639        //
640        // The potential new size needs to be the non-block-aligned range end - we round up to the
641        // nearest block size for the actual allocation, but shouldn't do that for the file size.
642        let new_size = std::cmp::max(range.end, self.get_size());
643        // Make sure the mutation that flips the has_overwrite_extents advisory flag is in the
644        // first transaction, in case we split transactions. This makes it okay to only replay the
645        // first transaction if power loss occurs - the file will be in an unusual state, but not
646        // an invalid one, if only part of the allocate goes through.
647        transaction.add_with_object(
648            self.store().store_object_id(),
649            Mutation::replace_or_insert_object(
650                ObjectKey::attribute(
651                    self.object_id(),
652                    self.attribute_id(),
653                    AttributeKey::Attribute,
654                ),
655                ObjectValue::Attribute { size: new_size, has_overwrite_extents: true },
656            ),
657            AssocObj::Borrowed(self),
658        );
659
660        // The maximum number of mutations we are going to allow per transaction in allocate. This
661        // is probably quite a bit lower than the actual limit, but it should be large enough to
662        // handle most non-edge-case versions of allocate without splitting the transaction.
663        const MAX_TRANSACTION_SIZE: usize = 256;
664        for (switch_range, device_offset) in to_switch {
665            transaction.add_with_object(
666                self.store().store_object_id(),
667                Mutation::merge_object(
668                    ObjectKey::extent(self.object_id(), self.attribute_id(), switch_range),
669                    ObjectValue::Extent(ExtentValue::initialized_overwrite_extent(
670                        device_offset,
671                        key_id,
672                    )),
673                ),
674                AssocObj::Borrowed(self),
675            );
676            if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
677                transaction.commit_and_continue().await?;
678            }
679        }
680
681        let mut allocated = 0;
682        let allocator = self.store().allocator();
683        for mut allocate_range in to_allocate {
684            while allocate_range.start < allocate_range.end {
685                let device_range = allocator
686                    .allocate(
687                        &mut transaction,
688                        self.store().store_object_id(),
689                        allocate_range.end - allocate_range.start,
690                    )
691                    .await
692                    .context("allocation failed")?;
693                let device_range_len = device_range.end - device_range.start;
694
695                transaction.add_with_object(
696                    self.store().store_object_id(),
697                    Mutation::merge_object(
698                        ObjectKey::extent(
699                            self.object_id(),
700                            self.attribute_id(),
701                            allocate_range.start..allocate_range.start + device_range_len,
702                        ),
703                        ObjectValue::Extent(ExtentValue::blank_overwrite_extent(
704                            device_range.start,
705                            (device_range_len / self.block_size()) as usize,
706                            key_id,
707                        )),
708                    ),
709                    AssocObj::Borrowed(self),
710                );
711
712                allocate_range.start += device_range_len;
713                allocated += device_range_len;
714
715                if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
716                    self.update_allocated_size(&mut transaction, allocated, 0).await?;
717                    transaction.commit_and_continue().await?;
718                    allocated = 0;
719                }
720            }
721        }
722
723        self.update_allocated_size(&mut transaction, allocated, 0).await?;
724        transaction.commit().await?;
725
726        Ok(())
727    }
728
    /// Return information on a contiguous set of extents that has the same allocation status,
    /// starting from `start_offset`. The information returned is if this set of extents are marked
    /// allocated/not allocated and also the size of this set (in bytes). This is used when
    /// querying slices for volumes.
    ///
    /// Returns `(allocated, length_in_bytes)`; the length counts from `start_offset` to the end
    /// of the run of same-status extents (or to the end of the file).
    ///
    /// This function expects `start_offset` to be aligned to block size (asserted), and fails
    /// with `FxfsError::OutOfRange` if `start_offset` is past the end of the file.
    pub async fn is_allocated(&self, start_offset: u64) -> Result<(bool, u64), Error> {
        let block_size = self.block_size();
        assert_eq!(start_offset % block_size, 0);

        if start_offset > self.get_size() {
            bail!(FxfsError::OutOfRange)
        }

        // Querying exactly at EOF: nothing to report, treat as a zero-length unallocated run.
        if start_offset == self.get_size() {
            return Ok((false, 0));
        }

        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let offset_key = ObjectKey::attribute(
            self.object_id(),
            self.attribute_id(),
            AttributeKey::Extent(ExtentKey::search_key_from_offset(start_offset)),
        );
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullRange(&offset_key)).await?;

        // `allocated` stays None until the first extent (or gap) is classified; after that it
        // records whether the run currently being accumulated is allocated.  `end` always points
        // at the end of the run accumulated so far.
        let mut allocated = None;
        let mut end = start_offset;

        loop {
            // Iterate through the extents, each time setting `end` as the end of the previous
            // extent
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) => {
                    // Equivalent of getting no extents back
                    if *object_id != self.object_id() || *attribute_id != self.attribute_id() {
                        // If we were tracking an unallocated run (or hadn't classified anything
                        // yet), the run extends to EOF; an allocated run simply ends here.
                        if allocated == Some(false) || allocated.is_none() {
                            end = self.get_size();
                            allocated = Some(false);
                        }
                        break;
                    }
                    ensure!(extent_key.range.is_aligned(block_size), FxfsError::Inconsistent);
                    if extent_key.range.start > end {
                        // If a previous extent has already been visited and we are tracking an
                        // allocated set, we are only interested in an extent where the range of the
                        // current extent follows immediately after the previous one.
                        if allocated == Some(true) {
                            break;
                        } else {
                            // The gap between the previous `end` and this extent is not allocated
                            end = extent_key.range.start;
                            allocated = Some(false);
                            // Continue this iteration, except now the `end` is set to the end of
                            // the "previous" extent which is this gap between the start_offset
                            // and the current extent
                        }
                    }

                    // We can assume that from here, the `end` points to the end of a previous
                    // extent.
                    match extent_value {
                        // The current extent has been allocated
                        ExtentValue::Some { .. } => {
                            // Stop searching if previous extent was marked deleted
                            if allocated == Some(false) {
                                break;
                            }
                            allocated = Some(true);
                        }
                        // This extent has been marked deleted
                        ExtentValue::None => {
                            // Stop searching if previous extent was marked allocated
                            if allocated == Some(true) {
                                break;
                            }
                            allocated = Some(false);
                        }
                    }
                    end = extent_key.range.end;
                }
                // This occurs when there are no extents left
                None => {
                    if allocated == Some(false) || allocated.is_none() {
                        end = self.get_size();
                        allocated = Some(false);
                    }
                    // Otherwise, we were monitoring extents that were allocated, so just exit.
                    break;
                }
                // Non-extent records (Object, Child, GraveyardEntry) are ignored.
                Some(_) => {}
            }
            iter.advance().await?;
        }

        // `allocated` is always Some here: every `break` above either requires it to already be
        // Some or sets it immediately beforehand.
        Ok((allocated.unwrap(), end - start_offset))
    }
837
838    pub async fn txn_write<'a>(
839        &'a self,
840        transaction: &mut Transaction<'a>,
841        offset: u64,
842        buf: BufferRef<'_>,
843    ) -> Result<(), Error> {
844        if buf.is_empty() {
845            return Ok(());
846        }
847        let (aligned, mut transfer_buf) = self.align_buffer(offset, buf).await?;
848        self.multi_write(
849            transaction,
850            self.attribute_id(),
851            std::slice::from_ref(&aligned),
852            transfer_buf.as_mut(),
853        )
854        .await?;
855        if offset + buf.len() as u64 > self.txn_get_size(transaction) {
856            self.txn_update_size(transaction, offset + buf.len() as u64, None).await?;
857        }
858        Ok(())
859    }
860
861    // Writes to multiple ranges with data provided in `buf`.  The buffer can be modified in place
862    // if encryption takes place.  The ranges must all be aligned and no change to content size is
863    // applied; the caller is responsible for updating size if required.
864    pub async fn multi_write<'a>(
865        &'a self,
866        transaction: &mut Transaction<'a>,
867        attribute_id: u64,
868        ranges: &[Range<u64>],
869        buf: MutableBufferRef<'_>,
870    ) -> Result<(), Error> {
871        self.handle.multi_write(transaction, attribute_id, None, ranges, buf).await
872    }
873
874    // `buf` is mutable as an optimization, since the write may require encryption, we can
875    // encrypt the buffer in-place rather than copying to another buffer if the write is
876    // already aligned.
877    //
878    // Note: in the event of power failure during an overwrite() call, it is possible that
879    // old data (which hasn't been overwritten with new bytes yet) may be exposed to the user.
880    // Since the old data should be encrypted, it is probably safe to expose, although not ideal.
881    pub async fn overwrite(
882        &self,
883        mut offset: u64,
884        mut buf: MutableBufferRef<'_>,
885        options: OverwriteOptions,
886    ) -> Result<(), Error> {
887        assert_eq!((buf.len() as u32) % self.store().device.block_size(), 0);
888        let end = offset + buf.len() as u64;
889
890        let key_id = self.get_key(None).await?.0;
891
892        // The transaction only ends up being used if allow_allocations is true
893        let mut transaction =
894            if options.allow_allocations { Some(self.new_transaction().await?) } else { None };
895
896        // We build up a list of writes to perform later
897        let writes = FuturesUnordered::new();
898
899        if options.barrier_on_first_write {
900            self.store().device.barrier();
901        }
902
903        // We create a new scope here, so that the merger iterator will get dropped before we try to
904        // commit our transaction. Otherwise the transaction commit would block.
905        {
906            let store = self.store();
907            let store_object_id = store.store_object_id;
908            let allocator = store.allocator();
909            let tree = &store.tree;
910            let layer_set = tree.layer_set();
911            let mut merger = layer_set.merger();
912            let mut iter = merger
913                .query(Query::FullRange(&ObjectKey::attribute(
914                    self.object_id(),
915                    self.attribute_id(),
916                    AttributeKey::Extent(ExtentKey::search_key_from_offset(offset)),
917                )))
918                .await?;
919            let block_size = self.block_size();
920
921            loop {
922                let (device_offset, bytes_to_write, should_advance) = match iter.get() {
923                    Some(ItemRef {
924                        key:
925                            ObjectKey {
926                                object_id,
927                                data:
928                                    ObjectKeyData::Attribute(
929                                        attribute_id,
930                                        AttributeKey::Extent(ExtentKey { range }),
931                                    ),
932                            },
933                        value: ObjectValue::Extent(ExtentValue::Some { .. }),
934                        ..
935                    }) if *object_id == self.object_id()
936                        && *attribute_id == self.attribute_id()
937                        && range.end == offset =>
938                    {
939                        iter.advance().await?;
940                        continue;
941                    }
942                    Some(ItemRef {
943                        key:
944                            ObjectKey {
945                                object_id,
946                                data:
947                                    ObjectKeyData::Attribute(
948                                        attribute_id,
949                                        AttributeKey::Extent(ExtentKey { range }),
950                                    ),
951                            },
952                        value,
953                        ..
954                    }) if *object_id == self.object_id()
955                        && *attribute_id == self.attribute_id()
956                        && range.start <= offset =>
957                    {
958                        match value {
959                            ObjectValue::Extent(ExtentValue::Some {
960                                device_offset,
961                                mode: ExtentMode::Raw,
962                                ..
963                            }) => {
964                                ensure!(
965                                    range.is_aligned(block_size) && device_offset % block_size == 0,
966                                    FxfsError::Inconsistent
967                                );
968                                let offset_within_extent = offset - range.start;
969                                let remaining_length_of_extent = (range
970                                    .end
971                                    .checked_sub(offset)
972                                    .ok_or(FxfsError::Inconsistent)?)
973                                    as usize;
974                                // Yields (device_offset, bytes_to_write, should_advance)
975                                (
976                                    device_offset + offset_within_extent,
977                                    min(buf.len(), remaining_length_of_extent),
978                                    true,
979                                )
980                            }
981                            ObjectValue::Extent(ExtentValue::Some { .. }) => {
982                                // TODO(https://fxbug.dev/42066056): Maybe we should create
983                                // a new extent without checksums?
984                                bail!(
985                                    "extent from ({},{}) which overlaps offset \
986                                        {} has the wrong extent mode",
987                                    range.start,
988                                    range.end,
989                                    offset
990                                )
991                            }
992                            _ => {
993                                bail!(
994                                    "overwrite failed: extent overlapping offset {} has \
995                                      unexpected ObjectValue",
996                                    offset
997                                )
998                            }
999                        }
1000                    }
1001                    maybe_item_ref => {
1002                        if let Some(transaction) = transaction.as_mut() {
1003                            assert_eq!(options.allow_allocations, true);
1004                            assert_eq!(offset % self.block_size(), 0);
1005
1006                            // We are going to make a new extent, but let's check if there is an
1007                            // extent after us. If there is an extent after us, then we don't want
1008                            // our new extent to bump into it...
1009                            let mut bytes_to_allocate =
1010                                round_up(buf.len() as u64, self.block_size())
1011                                    .ok_or(FxfsError::TooBig)?;
1012                            if let Some(ItemRef {
1013                                key:
1014                                    ObjectKey {
1015                                        object_id,
1016                                        data:
1017                                            ObjectKeyData::Attribute(
1018                                                attribute_id,
1019                                                AttributeKey::Extent(ExtentKey { range }),
1020                                            ),
1021                                    },
1022                                ..
1023                            }) = maybe_item_ref
1024                            {
1025                                if *object_id == self.object_id()
1026                                    && *attribute_id == self.attribute_id()
1027                                    && offset < range.start
1028                                {
1029                                    let bytes_until_next_extent = range.start - offset;
1030                                    bytes_to_allocate =
1031                                        min(bytes_to_allocate, bytes_until_next_extent);
1032                                }
1033                            }
1034
1035                            let device_range = allocator
1036                                .allocate(transaction, store_object_id, bytes_to_allocate)
1037                                .await?;
1038                            let device_range_len = device_range.end - device_range.start;
1039                            transaction.add(
1040                                store_object_id,
1041                                Mutation::insert_object(
1042                                    ObjectKey::extent(
1043                                        self.object_id(),
1044                                        self.attribute_id(),
1045                                        offset..offset + device_range_len,
1046                                    ),
1047                                    ObjectValue::Extent(ExtentValue::new_raw(
1048                                        device_range.start,
1049                                        key_id,
1050                                    )),
1051                                ),
1052                            );
1053
1054                            self.update_allocated_size(transaction, device_range_len, 0).await?;
1055
1056                            // Yields (device_offset, bytes_to_write, should_advance)
1057                            (device_range.start, min(buf.len(), device_range_len as usize), false)
1058                        } else {
1059                            bail!(
1060                                "no extent overlapping offset {}, \
1061                                and new allocations are not allowed",
1062                                offset
1063                            )
1064                        }
1065                    }
1066                };
1067                let (current_buf, remaining_buf) = buf.split_at_mut(bytes_to_write);
1068                writes.push(self.write_at(offset, current_buf, device_offset));
1069                if remaining_buf.len() == 0 {
1070                    break;
1071                } else {
1072                    buf = remaining_buf;
1073                    offset += bytes_to_write as u64;
1074                    if should_advance {
1075                        iter.advance().await?;
1076                    }
1077                }
1078            }
1079        }
1080
1081        self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
1082        // The checksums are being ignored here, but we don't need to know them
1083        writes.try_collect::<Vec<MaybeChecksums>>().await?;
1084
1085        if let Some(mut transaction) = transaction {
1086            assert_eq!(options.allow_allocations, true);
1087            if !transaction.is_empty() {
1088                if end > self.get_size() {
1089                    self.grow(&mut transaction, self.get_size(), end).await?;
1090                }
1091                transaction.commit().await?;
1092            }
1093        }
1094
1095        Ok(())
1096    }
1097
1098    // Within a transaction, the size of the object might have changed, so get the size from there
1099    // if it exists, otherwise, fall back on the cached size.
1100    fn txn_get_size(&self, transaction: &Transaction<'_>) -> u64 {
1101        transaction
1102            .get_object_mutation(
1103                self.store().store_object_id,
1104                ObjectKey::attribute(
1105                    self.object_id(),
1106                    self.attribute_id(),
1107                    AttributeKey::Attribute,
1108                ),
1109            )
1110            .and_then(|m| {
1111                if let ObjectItem { value: ObjectValue::Attribute { size, .. }, .. } = m.item {
1112                    Some(size)
1113                } else {
1114                    None
1115                }
1116            })
1117            .unwrap_or_else(|| self.get_size())
1118    }
1119
1120    pub async fn txn_update_size<'a>(
1121        &'a self,
1122        transaction: &mut Transaction<'a>,
1123        new_size: u64,
1124        // Allow callers to update the has_overwrite_extents metadata if they want. If this is
1125        // Some it is set to the value, if None it is left unchanged.
1126        update_has_overwrite_extents: Option<bool>,
1127    ) -> Result<(), Error> {
1128        let key =
1129            ObjectKey::attribute(self.object_id(), self.attribute_id(), AttributeKey::Attribute);
1130        let mut mutation = if let Some(mutation) =
1131            transaction.get_object_mutation(self.store().store_object_id(), key.clone())
1132        {
1133            mutation.clone()
1134        } else {
1135            ObjectStoreMutation {
1136                item: self.store().tree().find(&key).await?.ok_or(FxfsError::NotFound)?,
1137                op: Operation::ReplaceOrInsert,
1138            }
1139        };
1140        if let ObjectValue::Attribute { size, has_overwrite_extents } = &mut mutation.item.value {
1141            *size = new_size;
1142            if let Some(update_has_overwrite_extents) = update_has_overwrite_extents {
1143                *has_overwrite_extents = update_has_overwrite_extents;
1144            }
1145        } else {
1146            bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
1147        }
1148        transaction.add_with_object(
1149            self.store().store_object_id(),
1150            Mutation::ObjectStore(mutation),
1151            AssocObj::Borrowed(self),
1152        );
1153        Ok(())
1154    }
1155
1156    async fn update_allocated_size(
1157        &self,
1158        transaction: &mut Transaction<'_>,
1159        allocated: u64,
1160        deallocated: u64,
1161    ) -> Result<(), Error> {
1162        self.handle.update_allocated_size(transaction, allocated, deallocated).await
1163    }
1164
1165    pub fn truncate_overwrite_ranges(&self, size: u64) -> Result<Option<bool>, Error> {
1166        if self
1167            .overwrite_ranges
1168            .truncate(round_up(size, self.block_size()).ok_or(FxfsError::TooBig)?)
1169        {
1170            // This returns true if there were ranges, but this truncate removed them all, which
1171            // indicates that we need to flip the has_overwrite_extents metadata flag to false.
1172            Ok(Some(false))
1173        } else {
1174            Ok(None)
1175        }
1176    }
1177
1178    pub async fn shrink<'a>(
1179        &'a self,
1180        transaction: &mut Transaction<'a>,
1181        size: u64,
1182        update_has_overwrite_extents: Option<bool>,
1183    ) -> Result<NeedsTrim, Error> {
1184        let needs_trim = self.handle.shrink(transaction, self.attribute_id(), size).await?;
1185        self.txn_update_size(transaction, size, update_has_overwrite_extents).await?;
1186        Ok(needs_trim)
1187    }
1188
    /// Grows this attribute from `old_size` to `size` within `transaction`, first finishing any
    /// outstanding trim past `old_size` (which may commit and continue the transaction) and
    /// zeroing the tail of the old last block if `old_size` was not block-aligned.
    pub async fn grow<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        old_size: u64,
        size: u64,
    ) -> Result<(), Error> {
        // Before growing the file, we must make sure that a previous trim has completed.
        let store = self.store();
        while matches!(
            store
                .trim_some(
                    transaction,
                    self.object_id(),
                    self.attribute_id(),
                    TrimMode::FromOffset(old_size)
                )
                .await?,
            TrimResult::Incomplete
        ) {
            // Trimming can span multiple transactions; commit what we have and keep trimming.
            transaction.commit_and_continue().await?;
        }
        // We might need to zero out the tail of the old last block.
        let block_size = self.block_size();
        if old_size % block_size != 0 {
            let layer_set = store.tree.layer_set();
            let mut merger = layer_set.merger();
            let aligned_old_size = round_down(old_size, block_size);
            // Look up the extent (if any) covering the old last block.
            let iter = merger
                .query(Query::FullRange(&ObjectKey::extent(
                    self.object_id(),
                    self.attribute_id(),
                    aligned_old_size..aligned_old_size + 1,
                )))
                .await?;
            if let Some(ItemRef {
                key:
                    ObjectKey {
                        object_id,
                        data:
                            ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
                    },
                value: ObjectValue::Extent(ExtentValue::Some { device_offset, key_id, .. }),
                ..
            }) = iter.get()
            {
                if *object_id == self.object_id() && *attribute_id == self.attribute_id() {
                    // Translate the start of the old last block into a device offset within this
                    // extent, guarding against overflow from inconsistent metadata.
                    let device_offset = device_offset
                        .checked_add(aligned_old_size - extent_key.range.start)
                        .ok_or(FxfsError::Inconsistent)?;
                    ensure!(device_offset % block_size == 0, FxfsError::Inconsistent);
                    let mut buf = self.allocate_buffer(block_size as usize).await;
                    // In the case that this extent is in OverwritePartial mode, there is a
                    // possibility that the last block is allocated, but not initialized yet, in
                    // which case we don't actually need to bother zeroing out the tail. However,
                    // it's not strictly incorrect to change uninitialized data, so we skip the
                    // check and blindly do it to keep it simpler here.
                    self.read_and_decrypt(device_offset, aligned_old_size, buf.as_mut(), *key_id)
                        .await?;
                    // Zero everything in the block past the old (unaligned) end of file.
                    buf.as_mut_slice()[(old_size % block_size) as usize..].fill(0);
                    self.multi_write(
                        transaction,
                        *attribute_id,
                        &[aligned_old_size..aligned_old_size + block_size],
                        buf.as_mut(),
                    )
                    .await?;
                }
            }
        }
        self.txn_update_size(transaction, size, None).await?;
        Ok(())
    }
1261
1262    /// Attempts to pre-allocate a `file_range` of bytes for this object.
1263    /// Returns a set of device ranges (i.e. potentially multiple extents).
1264    ///
1265    /// It may not be possible to preallocate the entire requested range in one request
1266    /// due to limitations on transaction size. In such cases, we will preallocate as much as
1267    /// we can up to some (arbitrary, internal) limit on transaction size.
1268    ///
1269    /// `file_range.start` is modified to point at the end of the logical range
1270    /// that was preallocated such that repeated calls to `preallocate_range` with new
1271    /// transactions can be used to preallocate ranges of any size.
1272    ///
1273    /// Requested range must be a multiple of block size.
1274    pub async fn preallocate_range<'a>(
1275        &'a self,
1276        transaction: &mut Transaction<'a>,
1277        file_range: &mut Range<u64>,
1278    ) -> Result<Vec<Range<u64>>, Error> {
1279        let block_size = self.block_size();
1280        assert!(file_range.is_aligned(block_size));
1281        assert!(!self.handle.is_encrypted());
1282        let mut ranges = Vec::new();
1283        let tree = &self.store().tree;
1284        let layer_set = tree.layer_set();
1285        let mut merger = layer_set.merger();
1286        let mut iter = merger
1287            .query(Query::FullRange(&ObjectKey::attribute(
1288                self.object_id(),
1289                self.attribute_id(),
1290                AttributeKey::Extent(ExtentKey::search_key_from_offset(file_range.start)),
1291            )))
1292            .await?;
1293        let mut allocated = 0;
1294        let key_id = self.get_key(None).await?.0;
1295        'outer: while file_range.start < file_range.end {
1296            let allocate_end = loop {
1297                match iter.get() {
1298                    // Case for allocated extents for the same object that overlap with file_range.
1299                    Some(ItemRef {
1300                        key:
1301                            ObjectKey {
1302                                object_id,
1303                                data:
1304                                    ObjectKeyData::Attribute(
1305                                        attribute_id,
1306                                        AttributeKey::Extent(ExtentKey { range }),
1307                                    ),
1308                            },
1309                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
1310                        ..
1311                    }) if *object_id == self.object_id()
1312                        && *attribute_id == self.attribute_id()
1313                        && range.start < file_range.end =>
1314                    {
1315                        ensure!(
1316                            range.is_valid()
1317                                && range.is_aligned(block_size)
1318                                && device_offset % block_size == 0,
1319                            FxfsError::Inconsistent
1320                        );
1321                        // If the start of the requested file_range overlaps with an existing extent...
1322                        if range.start <= file_range.start {
1323                            // Record the existing extent and move on.
1324                            let device_range = device_offset
1325                                .checked_add(file_range.start - range.start)
1326                                .ok_or(FxfsError::Inconsistent)?
1327                                ..device_offset
1328                                    .checked_add(min(range.end, file_range.end) - range.start)
1329                                    .ok_or(FxfsError::Inconsistent)?;
1330                            file_range.start += device_range.end - device_range.start;
1331                            ranges.push(device_range);
1332                            if file_range.start >= file_range.end {
1333                                break 'outer;
1334                            }
1335                            iter.advance().await?;
1336                            continue;
1337                        } else {
1338                            // There's nothing allocated between file_range.start and the beginning
1339                            // of this extent.
1340                            break range.start;
1341                        }
1342                    }
1343                    // Case for deleted extents eclipsed by file_range.
1344                    Some(ItemRef {
1345                        key:
1346                            ObjectKey {
1347                                object_id,
1348                                data:
1349                                    ObjectKeyData::Attribute(
1350                                        attribute_id,
1351                                        AttributeKey::Extent(ExtentKey { range }),
1352                                    ),
1353                            },
1354                        value: ObjectValue::Extent(ExtentValue::None),
1355                        ..
1356                    }) if *object_id == self.object_id()
1357                        && *attribute_id == self.attribute_id()
1358                        && range.end < file_range.end =>
1359                    {
1360                        iter.advance().await?;
1361                    }
1362                    _ => {
1363                        // We can just preallocate the rest.
1364                        break file_range.end;
1365                    }
1366                }
1367            };
1368            let device_range = self
1369                .store()
1370                .allocator()
1371                .allocate(
1372                    transaction,
1373                    self.store().store_object_id(),
1374                    allocate_end - file_range.start,
1375                )
1376                .await
1377                .context("Allocation failed")?;
1378            allocated += device_range.end - device_range.start;
1379            let this_file_range =
1380                file_range.start..file_range.start + device_range.end - device_range.start;
1381            file_range.start = this_file_range.end;
1382            transaction.add(
1383                self.store().store_object_id,
1384                Mutation::merge_object(
1385                    ObjectKey::extent(self.object_id(), self.attribute_id(), this_file_range),
1386                    ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
1387                ),
1388            );
1389            ranges.push(device_range);
1390            // If we didn't allocate all that we requested, we'll loop around and try again.
1391            // ... unless we have filled the transaction. The caller should check file_range.
1392            if transaction.mutations().len() > TRANSACTION_MUTATION_THRESHOLD {
1393                break;
1394            }
1395        }
1396        // Update the file size if it changed.
1397        if file_range.start > round_up(self.txn_get_size(transaction), block_size).unwrap() {
1398            self.txn_update_size(transaction, file_range.start, None).await?;
1399        }
1400        self.update_allocated_size(transaction, allocated, 0).await?;
1401        Ok(ranges)
1402    }
1403
1404    pub async fn update_attributes<'a>(
1405        &self,
1406        transaction: &mut Transaction<'a>,
1407        node_attributes: Option<&fio::MutableNodeAttributes>,
1408        change_time: Option<Timestamp>,
1409    ) -> Result<(), Error> {
1410        // This codepath is only called by files, whose wrapping key id users cannot directly set
1411        // as per fscrypt.
1412        ensure!(
1413            !matches!(
1414                node_attributes,
1415                Some(fio::MutableNodeAttributes { wrapping_key_id: Some(_), .. })
1416            ),
1417            FxfsError::BadPath
1418        );
1419        self.handle.update_attributes(transaction, node_attributes, change_time).await
1420    }
1421
    /// Get the default set of transaction options for this object. This is mostly the overall
    /// default, modified by any [`HandleOptions`] held by this handle.
    pub fn default_transaction_options<'b>(&self) -> Options<'b> {
        // Pure delegation to the wrapped StoreObjectHandle.
        self.handle.default_transaction_options()
    }
1427
    /// Creates a new transaction for this object using the handle's default transaction options.
    pub async fn new_transaction<'b>(&self) -> Result<Transaction<'b>, Error> {
        self.new_transaction_with_options(self.default_transaction_options()).await
    }
1431
    /// Creates a new transaction for this object with explicit `options`.  The transaction is
    /// scoped to this handle's attribute (the attribute id is passed to the underlying handle).
    pub async fn new_transaction_with_options<'b>(
        &self,
        options: Options<'b>,
    ) -> Result<Transaction<'b>, Error> {
        self.handle.new_transaction_with_options(self.attribute_id(), options).await
    }
1438
    /// Flushes the underlying device.  This is expensive and should be used sparingly.
    pub async fn flush_device(&self) -> Result<(), Error> {
        // Pure delegation to the wrapped StoreObjectHandle.
        self.handle.flush_device().await
    }
1443
    /// Reads an entire attribute.
    ///
    /// Returns the attribute's contents as an owned buffer; `None` presumably indicates the
    /// attribute does not exist — confirm against `StoreObjectHandle::read_attr`.
    pub async fn read_attr(&self, attribute_id: u64) -> Result<Option<Box<[u8]>>, Error> {
        self.handle.read_attr(attribute_id).await
    }
1448
1449    /// Writes an entire attribute.  This *always* uses the volume data key.
1450    pub async fn write_attr(&self, attribute_id: u64, data: &[u8]) -> Result<(), Error> {
1451        // Must be different attribute otherwise cached size gets out of date.
1452        assert_ne!(attribute_id, self.attribute_id());
1453        let store = self.store();
1454        let mut transaction = self.new_transaction().await?;
1455        if self.handle.write_attr(&mut transaction, attribute_id, data).await?.0 {
1456            transaction.commit_and_continue().await?;
1457            while matches!(
1458                store
1459                    .trim_some(
1460                        &mut transaction,
1461                        self.object_id(),
1462                        attribute_id,
1463                        TrimMode::FromOffset(data.len() as u64),
1464                    )
1465                    .await?,
1466                TrimResult::Incomplete
1467            ) {
1468                transaction.commit_and_continue().await?;
1469            }
1470        }
1471        transaction.commit().await?;
1472        Ok(())
1473    }
1474
    /// Reads from `device_offset` into `buffer` and decrypts it with the key identified by
    /// `key_id`.  `file_offset` is presumably used as part of the decryption tweak/IV — confirm
    /// against `StoreObjectHandle::read_and_decrypt`.
    async fn read_and_decrypt(
        &self,
        device_offset: u64,
        file_offset: u64,
        buffer: MutableBufferRef<'_>,
        key_id: u64,
    ) -> Result<(), Error> {
        self.handle.read_and_decrypt(device_offset, file_offset, buffer, key_id).await
    }
1484
    /// Truncates a file to a given size (growing/shrinking as required).
    ///
    /// Nb: Most code will want to call truncate() instead. This method is used
    /// to update the super block -- a case where we must borrow metadata space.
    pub async fn truncate_with_options(
        &self,
        options: Options<'_>,
        size: u64,
    ) -> Result<(), Error> {
        // NOTE(review): the transaction is created before the size is read — presumably so the
        // size check happens under the transaction's locks; confirm lock semantics.
        let mut transaction = self.new_transaction_with_options(options).await?;
        let old_size = self.get_size();
        if size == old_size {
            // Nothing to do; the (empty) transaction is simply dropped.
            return Ok(());
        }
        if size < old_size {
            let update_has_overwrite_ranges = self.truncate_overwrite_ranges(size)?;
            if self.shrink(&mut transaction, size, update_has_overwrite_ranges).await?.0 {
                // The file needs to be trimmed.
                // The size change is committed here; the trim below is best-effort cleanup, so
                // trim failures are logged and swallowed rather than propagated.
                transaction.commit_and_continue().await?;
                let store = self.store();
                // Trim in batches, committing between each incomplete batch.
                while matches!(
                    store
                        .trim_some(
                            &mut transaction,
                            self.object_id(),
                            self.attribute_id(),
                            TrimMode::FromOffset(size)
                        )
                        .await?,
                    TrimResult::Incomplete
                ) {
                    if let Err(error) = transaction.commit_and_continue().await {
                        warn!(error:?; "Failed to trim after truncate");
                        return Ok(());
                    }
                }
                // Commit the final batch of trim mutations.
                if let Err(error) = transaction.commit().await {
                    warn!(error:?; "Failed to trim after truncate");
                }
                return Ok(());
            }
        } else {
            self.grow(&mut transaction, old_size, size).await?;
        }
        transaction.commit().await?;
        Ok(())
    }
1532
1533    pub async fn get_properties(&self) -> Result<ObjectProperties, Error> {
1534        // We don't take a read guard here since the object properties are contained in a single
1535        // object, which cannot be inconsistent with itself. The LSM tree does not return
1536        // intermediate states for a single object.
1537        let item = self
1538            .store()
1539            .tree
1540            .find(&ObjectKey::object(self.object_id()))
1541            .await?
1542            .expect("Unable to find object record");
1543        match item.value {
1544            ObjectValue::Object {
1545                kind: ObjectKind::File { refs, .. },
1546                attributes:
1547                    ObjectAttributes {
1548                        creation_time,
1549                        modification_time,
1550                        posix_attributes,
1551                        allocated_size,
1552                        access_time,
1553                        change_time,
1554                        ..
1555                    },
1556            } => Ok(ObjectProperties {
1557                refs,
1558                allocated_size,
1559                data_attribute_size: self.get_size(),
1560                creation_time,
1561                modification_time,
1562                access_time,
1563                change_time,
1564                sub_dirs: 0,
1565                posix_attributes,
1566                casefold: false,
1567                wrapping_key_id: None,
1568            }),
1569            _ => bail!(FxfsError::NotFile),
1570        }
1571    }
1572
1573    // Returns the contents of this object. This object must be < |limit| bytes in size.
1574    pub async fn contents(&self, limit: usize) -> Result<Box<[u8]>, Error> {
1575        let size = self.get_size();
1576        if size > limit as u64 {
1577            bail!("Object too big ({} > {})", size, limit);
1578        }
1579        let mut buf = self.allocate_buffer(size as usize).await;
1580        self.read(0u64, buf.as_mut()).await?;
1581        Ok(buf.as_slice().into())
1582    }
1583
1584    /// Returns the set of file_offset->extent mappings for this file. The extents will be sorted by
1585    /// their logical offset within the file.
1586    ///
1587    /// *NOTE*: This operation is potentially expensive and should generally be avoided.
1588    pub async fn device_extents(&self) -> Result<Vec<FileExtent>, Error> {
1589        let mut extents = Vec::new();
1590        let tree = &self.store().tree;
1591        let layer_set = tree.layer_set();
1592        let mut merger = layer_set.merger();
1593        let mut iter = merger
1594            .query(Query::FullRange(&ObjectKey::attribute(
1595                self.object_id(),
1596                self.attribute_id(),
1597                AttributeKey::Extent(ExtentKey::search_key_from_offset(0)),
1598            )))
1599            .await?;
1600        loop {
1601            match iter.get() {
1602                Some(ItemRef {
1603                    key:
1604                        ObjectKey {
1605                            object_id,
1606                            data:
1607                                ObjectKeyData::Attribute(
1608                                    attribute_id,
1609                                    AttributeKey::Extent(ExtentKey { range }),
1610                                ),
1611                        },
1612                    value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
1613                    ..
1614                }) if *object_id == self.object_id() && *attribute_id == self.attribute_id() => {
1615                    let logical_offset = range.start;
1616                    let device_range = *device_offset..*device_offset + range.length()?;
1617                    extents.push(FileExtent::new(logical_offset, device_range)?);
1618                }
1619                _ => break,
1620            }
1621            iter.advance().await?;
1622        }
1623        Ok(extents)
1624    }
1625}
1626
// Lets the handle observe mutations as they are applied so that cached state stays in sync with
// the on-disk state.
impl<S: HandleOwner> AssociatedObject for DataObjectHandle<S> {
    fn will_apply_mutation(&self, mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
        match mutation {
            // Attribute size mutations update the cached content size.
            Mutation::ObjectStore(ObjectStoreMutation {
                item: ObjectItem { value: ObjectValue::Attribute { size, .. }, .. },
                ..
            }) => self.content_size.store(*size, atomic::Ordering::Relaxed),
            // A VerifiedAttribute mutation marks the completion of enabling fsverity.
            Mutation::ObjectStore(ObjectStoreMutation {
                item: ObjectItem { value: ObjectValue::VerifiedAttribute { size, .. }, .. },
                ..
            }) => {
                debug_assert_eq!(
                    self.get_size(),
                    *size,
                    "size should be set when verity is enabled and must not change"
                );
                self.finalize_fsverity_state()
            }
            // Extent mutations for this handle's data attribute update the cached overwrite
            // ranges, but only for overwrite-mode extents.
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    ObjectItem {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attr_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
                        ..
                    },
                ..
            }) if self.object_id() == *object_id && self.attribute_id() == *attr_id => match mode {
                ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => {
                    self.overwrite_ranges.apply_range(range.clone())
                }
                ExtentMode::Raw | ExtentMode::Cow(_) => (),
            },
            _ => {}
        }
    }
}
1671
// ObjectHandle is implemented by delegating every method to the wrapped StoreObjectHandle.
impl<S: HandleOwner> ObjectHandle for DataObjectHandle<S> {
    fn set_trace(&self, v: bool) {
        self.handle.set_trace(v)
    }

    fn object_id(&self) -> u64 {
        self.handle.object_id()
    }

    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        self.handle.allocate_buffer(size)
    }

    fn block_size(&self) -> u64 {
        self.handle.block_size()
    }
}
1689
#[async_trait]
impl<S: HandleOwner> ReadObjectHandle for DataObjectHandle<S> {
    // Reads up to `buf.len()` bytes starting at `offset`, clamped to the content size.
    // Returns the number of bytes read.
    async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
        let fs = self.store().filesystem();
        // Hold a read lock on this object's attribute for the duration of the read so the data
        // cannot change underneath us.
        let guard = fs
            .lock_manager()
            .read_lock(lock_keys![LockKey::object_attribute(
                self.store().store_object_id,
                self.object_id(),
                self.attribute_id(),
            )])
            .await;

        let size = self.get_size();
        if offset >= size {
            // Reads entirely beyond EOF return 0 bytes.
            return Ok(0);
        }
        // Clamp the read to EOF.
        let length = min(buf.len() as u64, size - offset) as usize;
        buf = buf.subslice_mut(0..length);
        self.handle.read_unchecked(self.attribute_id(), offset, buf.reborrow(), &guard).await?;
        // fsverity-enabled files have every read verified against the merkle tree.
        if self.is_verified_file() {
            self.verify_data(offset as usize, buf.as_slice())?;
        }
        Ok(length)
    }

    fn get_size(&self) -> u64 {
        // The content size is cached and kept in sync via will_apply_mutation.
        self.content_size.load(atomic::Ordering::Relaxed)
    }
}
1720
1721impl<S: HandleOwner> WriteObjectHandle for DataObjectHandle<S> {
1722    async fn write_or_append(&self, offset: Option<u64>, buf: BufferRef<'_>) -> Result<u64, Error> {
1723        let offset = offset.unwrap_or_else(|| self.get_size());
1724        let mut transaction = self.new_transaction().await?;
1725        self.txn_write(&mut transaction, offset, buf).await?;
1726        let new_size = self.txn_get_size(&transaction);
1727        transaction.commit().await?;
1728        Ok(new_size)
1729    }
1730
1731    async fn truncate(&self, size: u64) -> Result<(), Error> {
1732        self.truncate_with_options(self.default_transaction_options(), size).await
1733    }
1734
1735    async fn flush(&self) -> Result<(), Error> {
1736        Ok(())
1737    }
1738}
1739
/// Like object_handle::Writer, but allows custom transaction options to be set, and makes every
/// write go directly to the handle in a transaction.
pub struct DirectWriter<'a, S: HandleOwner> {
    // Handle that buffered data is written to, one transaction per flush.
    handle: &'a DataObjectHandle<S>,
    // Transaction options used for every transaction this writer commits.
    options: transaction::Options<'a>,
    // Staging buffer; bytes accumulate here until BUFFER_SIZE are buffered.
    buffer: Buffer<'a>,
    // File offset at which the currently buffered bytes will be written.
    offset: u64,
    // Number of valid bytes currently staged in `buffer`.
    buf_offset: usize,
}
1749
// Size of DirectWriter's staging buffer (1 MiB); writes are committed in chunks of this size.
const BUFFER_SIZE: usize = 1_048_576;
1751
// Drop is synchronous so buffered data cannot be flushed here; any staged-but-unwritten bytes
// are lost.  Warn so the missing `complete()` call is noticed.
impl<S: HandleOwner> Drop for DirectWriter<'_, S> {
    fn drop(&mut self) {
        if self.buf_offset != 0 {
            warn!("DirectWriter: dropping data, did you forget to call complete?");
        }
    }
}
1759
1760impl<'a, S: HandleOwner> DirectWriter<'a, S> {
1761    pub async fn new(
1762        handle: &'a DataObjectHandle<S>,
1763        options: transaction::Options<'a>,
1764    ) -> DirectWriter<'a, S> {
1765        Self {
1766            handle,
1767            options,
1768            buffer: handle.allocate_buffer(BUFFER_SIZE).await,
1769            offset: 0,
1770            buf_offset: 0,
1771        }
1772    }
1773
1774    async fn flush(&mut self) -> Result<(), Error> {
1775        let mut transaction = self.handle.new_transaction_with_options(self.options).await?;
1776        self.handle
1777            .txn_write(&mut transaction, self.offset, self.buffer.subslice(..self.buf_offset))
1778            .await?;
1779        transaction.commit().await?;
1780        self.offset += self.buf_offset as u64;
1781        self.buf_offset = 0;
1782        Ok(())
1783    }
1784}
1785
1786impl<'a, S: HandleOwner> WriteBytes for DirectWriter<'a, S> {
1787    fn block_size(&self) -> u64 {
1788        self.handle.block_size()
1789    }
1790
1791    async fn write_bytes(&mut self, mut buf: &[u8]) -> Result<(), Error> {
1792        while buf.len() > 0 {
1793            let to_do = std::cmp::min(buf.len(), BUFFER_SIZE - self.buf_offset);
1794            self.buffer
1795                .subslice_mut(self.buf_offset..self.buf_offset + to_do)
1796                .as_mut_slice()
1797                .copy_from_slice(&buf[..to_do]);
1798            self.buf_offset += to_do;
1799            if self.buf_offset == BUFFER_SIZE {
1800                self.flush().await?;
1801            }
1802            buf = &buf[to_do..];
1803        }
1804        Ok(())
1805    }
1806
1807    async fn complete(&mut self) -> Result<(), Error> {
1808        self.flush().await?;
1809        Ok(())
1810    }
1811
1812    async fn skip(&mut self, amount: u64) -> Result<(), Error> {
1813        if (BUFFER_SIZE - self.buf_offset) as u64 > amount {
1814            self.buffer
1815                .subslice_mut(self.buf_offset..self.buf_offset + amount as usize)
1816                .as_mut_slice()
1817                .fill(0);
1818            self.buf_offset += amount as usize;
1819        } else {
1820            self.flush().await?;
1821            self.offset += amount;
1822        }
1823        Ok(())
1824    }
1825}
1826
1827#[cfg(test)]
1828mod tests {
1829    use crate::errors::FxfsError;
1830    use crate::filesystem::{
1831        FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem, SyncOptions,
1832    };
1833    use crate::fsck::{
1834        FsckOptions, fsck, fsck_volume, fsck_volume_with_options, fsck_with_options,
1835    };
1836    use crate::lsm_tree::Query;
1837    use crate::lsm_tree::types::{ItemRef, LayerIterator};
1838    use crate::object_handle::{
1839        ObjectHandle, ObjectProperties, ReadObjectHandle, WriteObjectHandle,
1840    };
1841    use crate::object_store::data_object_handle::{OverwriteOptions, WRITE_ATTR_BATCH_SIZE};
1842    use crate::object_store::directory::replace_child;
1843    use crate::object_store::object_record::{ObjectKey, ObjectValue, Timestamp};
1844    use crate::object_store::transaction::{Mutation, Options, lock_keys};
1845    use crate::object_store::volume::root_volume;
1846    use crate::object_store::{
1847        AttributeKey, DataObjectHandle, Directory, ExtentKey, ExtentMode, ExtentValue,
1848        FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions, LockKey, NewChildStoreOptions, ObjectKeyData,
1849        ObjectStore, PosixAttributes, StoreOptions, TRANSACTION_MUTATION_THRESHOLD,
1850    };
1851    use crate::range::RangeExt;
1852    use crate::round::{round_down, round_up};
1853    use assert_matches::assert_matches;
1854    use bit_vec::BitVec;
1855    use fuchsia_sync::Mutex;
1856    use futures::FutureExt;
1857    use futures::channel::oneshot::channel;
1858    use futures::stream::{FuturesUnordered, StreamExt};
1859    use fxfs_crypto::{Crypt, EncryptionKey, KeyPurpose};
1860    use fxfs_insecure_crypto::InsecureCrypt;
1861    use mundane::hash::{Digest, Hasher, Sha256};
1862    use std::ops::Range;
1863    use std::sync::Arc;
1864    use std::time::Duration;
1865    use storage_device::DeviceHolder;
1866    use storage_device::fake_device::FakeDevice;
1867    use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
1868
    // Block size of the FakeDevice used by these tests.
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    // Some tests (the preallocate_range ones) currently assume that the data only occupies a single
    // device block.
    const TEST_DATA_OFFSET: u64 = 5000;
    const TEST_DATA: &[u8] = b"hello";
    // Size the test object is truncated to after creation.
    const TEST_OBJECT_SIZE: u64 = 5678;
    const TEST_OBJECT_ALLOCATED_SIZE: u64 = 4096;
    const TEST_OBJECT_NAME: &str = "foo";
1878
1879    async fn test_filesystem() -> OpenFxFilesystem {
1880        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1881        FxFilesystem::new_empty(device).await.expect("new_empty failed")
1882    }
1883
    /// Creates a test filesystem containing a single file named TEST_OBJECT_NAME, optionally
    /// encrypted with `crypt` and optionally pre-populated with TEST_DATA at TEST_DATA_OFFSET.
    /// The file is truncated to TEST_OBJECT_SIZE before being returned.
    async fn test_filesystem_and_object_with_key(
        crypt: Option<&dyn Crypt>,
        write_object_test_data: bool,
    ) -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        let fs = test_filesystem().await;
        let store = fs.root_store();
        let object;

        // Lock the root directory so the object can be created and linked in one transaction.
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        object = if let Some(crypt) = crypt {
            // Encrypted case: mint an object id and a wrapped data key up front.
            let object_id = store.get_next_object_id(transaction.txn_guard()).await.unwrap();
            let (key, unwrapped_key) = crypt.create_key(object_id, KeyPurpose::Data).await.unwrap();
            ObjectStore::create_object_with_key(
                &store,
                &mut transaction,
                object_id,
                HandleOptions::default(),
                EncryptionKey::Fxfs(key),
                unwrapped_key,
            )
            .await
            .expect("create_object failed")
        } else {
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed")
        };

        // Link the new object into the root directory as TEST_OBJECT_NAME.
        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        root_directory
            .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
            .await
            .expect("add_child_file failed");

        if write_object_test_data {
            // TEST_DATA_OFFSET is unaligned; pad the buffer so the write is block-aligned.
            let align = TEST_DATA_OFFSET as usize % TEST_DEVICE_BLOCK_SIZE as usize;
            let mut buf = object.allocate_buffer(align + TEST_DATA.len()).await;
            buf.as_mut_slice()[align..].copy_from_slice(TEST_DATA);
            object
                .txn_write(&mut transaction, TEST_DATA_OFFSET, buf.subslice(align..))
                .await
                .expect("write failed");
        }
        transaction.commit().await.expect("commit failed");
        object.truncate(TEST_OBJECT_SIZE).await.expect("truncate failed");
        (fs, object)
    }
1943
    /// Convenience wrapper: encrypted test object pre-populated with TEST_DATA.
    async fn test_filesystem_and_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        test_filesystem_and_object_with_key(Some(&InsecureCrypt::new()), true).await
    }
1947
    /// Convenience wrapper: encrypted test object with no data written (still truncated to
    /// TEST_OBJECT_SIZE, so the file is fully sparse).
    async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
    {
        test_filesystem_and_object_with_key(Some(&InsecureCrypt::new()), false).await
    }
1952
1953    #[fuchsia::test]
1954    async fn test_zero_buf_len_read() {
1955        let (fs, object) = test_filesystem_and_object().await;
1956        let mut buf = object.allocate_buffer(0).await;
1957        assert_eq!(object.read(0u64, buf.as_mut()).await.expect("read failed"), 0);
1958        fs.close().await.expect("Close failed");
1959    }
1960
1961    #[fuchsia::test]
1962    async fn test_beyond_eof_read() {
1963        let (fs, object) = test_filesystem_and_object().await;
1964        let offset = TEST_OBJECT_SIZE as usize - 2;
1965        let align = offset % fs.block_size() as usize;
1966        let len: usize = 2;
1967        let mut buf = object.allocate_buffer(align + len + 1).await;
1968        buf.as_mut_slice().fill(123u8);
1969        assert_eq!(
1970            object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
1971            align + len
1972        );
1973        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
1974        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
1975        fs.close().await.expect("Close failed");
1976    }
1977
1978    #[fuchsia::test]
1979    async fn test_beyond_eof_read_from() {
1980        let (fs, object) = test_filesystem_and_object().await;
1981        let handle = &*object;
1982        let offset = TEST_OBJECT_SIZE as usize - 2;
1983        let align = offset % fs.block_size() as usize;
1984        let len: usize = 2;
1985        let mut buf = object.allocate_buffer(align + len + 1).await;
1986        buf.as_mut_slice().fill(123u8);
1987        assert_eq!(
1988            handle.read(0, (offset - align) as u64, buf.as_mut()).await.expect("read failed"),
1989            align + len
1990        );
1991        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
1992        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
1993        fs.close().await.expect("Close failed");
1994    }
1995
    #[fuchsia::test]
    async fn test_beyond_eof_read_unchecked() {
        let (fs, object) = test_filesystem_and_object().await;
        let offset = TEST_OBJECT_SIZE as usize - 2;
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        let mut buf = object.allocate_buffer(align + len + 1).await;
        buf.as_mut_slice().fill(123u8);
        // read_unchecked requires the caller to hold the attribute read lock.
        let guard = fs
            .lock_manager()
            .read_lock(lock_keys![LockKey::object_attribute(
                object.store().store_object_id,
                object.object_id(),
                0,
            )])
            .await;
        object
            .read_unchecked(0, (offset - align) as u64, buf.as_mut(), &guard)
            .await
            .expect("read failed");
        // Unlike read(), read_unchecked does not clamp to the content size: the entire tail of
        // the buffer (including the byte past EOF) is zeroed rather than left untouched.
        assert_eq!(&buf.as_slice()[align..], &vec![0u8; len + 1]);
        fs.close().await.expect("Close failed");
    }
2019
2020    #[fuchsia::test]
2021    async fn test_read_sparse() {
2022        let (fs, object) = test_filesystem_and_object().await;
2023        // Deliberately read not right to eof.
2024        let len = TEST_OBJECT_SIZE as usize - 1;
2025        let mut buf = object.allocate_buffer(len).await;
2026        buf.as_mut_slice().fill(123u8);
2027        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);
2028        let mut expected = vec![0; len];
2029        let offset = TEST_DATA_OFFSET as usize;
2030        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
2031        assert_eq!(buf.as_slice()[..len], expected[..]);
2032        fs.close().await.expect("Close failed");
2033    }
2034
    #[fuchsia::test]
    async fn test_read_after_writes_interspersed_with_flush() {
        let (fs, object) = test_filesystem_and_object().await;

        // Flush the store so the original write lands in a persisted layer, then write again so
        // the read below must merge data from both.
        object.owner().flush().await.expect("flush failed");

        // Write more test data to the first block of the file.
        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
        buf.as_mut_slice().copy_from_slice(TEST_DATA);
        object.write_or_append(Some(0u64), buf.as_ref()).await.expect("write failed");

        let len = TEST_OBJECT_SIZE as usize - 1;
        let mut buf = object.allocate_buffer(len).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);

        // Expect TEST_DATA at both offset 0 (post-flush write) and TEST_DATA_OFFSET (pre-flush
        // write), zeroes everywhere else.
        let mut expected = vec![0u8; len];
        let offset = TEST_DATA_OFFSET as usize;
        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
        expected[..TEST_DATA.len()].copy_from_slice(TEST_DATA);
        assert_eq!(buf.as_slice(), &expected);
        fs.close().await.expect("Close failed");
    }
2058
    #[fuchsia::test]
    async fn test_read_after_truncate_and_extend() {
        let (fs, object) = test_filesystem_and_object().await;

        // Arrange for there to be <extent><deleted-extent><extent>.
        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
        buf.as_mut_slice().copy_from_slice(TEST_DATA);
        // This adds an extent at 0..512.
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
        // This deletes 512..1024.
        object.truncate(3).await.expect("truncate failed");
        let data = b"foo";
        let offset = 1500u64;
        // The write offset is unaligned, so pad the buffer to the containing block boundary.
        let align = (offset % fs.block_size() as u64) as usize;
        let mut buf = object.allocate_buffer(align + data.len()).await;
        buf.as_mut_slice()[align..].copy_from_slice(data);
        // This adds 1024..1536.
        object.write_or_append(Some(1500), buf.subslice(align..)).await.expect("write failed");

        // Reading across all three regions should see the truncated remnant, zeroes over the
        // deleted extent, and the new data at 1500.
        const LEN1: usize = 1503;
        let mut buf = object.allocate_buffer(LEN1).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN1);
        let mut expected = [0; LEN1];
        expected[..3].copy_from_slice(&TEST_DATA[..3]);
        expected[1500..].copy_from_slice(b"foo");
        assert_eq!(buf.as_slice(), &expected);

        // Also test a read that ends midway through the deleted extent.
        const LEN2: usize = 601;
        let mut buf = object.allocate_buffer(LEN2).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN2);
        assert_eq!(buf.as_slice(), &expected[..LEN2]);
        fs.close().await.expect("Close failed");
    }
2095
    #[fuchsia::test]
    async fn test_read_whole_blocks_with_multiple_objects() {
        // Interleaves block-aligned writes to two objects in the same store and then checks each
        // object reads back only its own data.
        let (fs, object) = test_filesystem_and_object().await;
        let block_size = object.block_size() as usize;
        let mut buffer = object.allocate_buffer(block_size).await;
        buffer.as_mut_slice().fill(0xaf);
        object.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");

        // Create a second object in the same store.
        let store = object.owner();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let object2 =
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed");
        transaction.commit().await.expect("commit failed");
        let mut ef_buffer = object.allocate_buffer(block_size).await;
        ef_buffer.as_mut_slice().fill(0xef);
        object2.write_or_append(Some(0), ef_buffer.as_ref()).await.expect("write failed");

        let mut buffer = object.allocate_buffer(block_size).await;
        buffer.as_mut_slice().fill(0xaf);
        object
            .write_or_append(Some(block_size as u64), buffer.as_ref())
            .await
            .expect("write failed");
        // Extend the first object to three blocks; the third block is never written.
        object.truncate(3 * block_size as u64).await.expect("truncate failed");
        object2
            .write_or_append(Some(block_size as u64), ef_buffer.as_ref())
            .await
            .expect("write failed");

        // First object: two 0xaf blocks followed by a zero block from the truncate-extend.
        let mut buffer = object.allocate_buffer(4 * block_size).await;
        buffer.as_mut_slice().fill(123);
        assert_eq!(object.read(0, buffer.as_mut()).await.expect("read failed"), 3 * block_size);
        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xaf; 2 * block_size]);
        assert_eq!(&buffer.as_slice()[2 * block_size..3 * block_size], &vec![0; block_size]);
        // Second object: two 0xef blocks — no bleed-through from the first object's extents.
        assert_eq!(object2.read(0, buffer.as_mut()).await.expect("read failed"), 2 * block_size);
        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xef; 2 * block_size]);
        fs.close().await.expect("Close failed");
    }
2140
    #[fuchsia::test]
    async fn test_alignment() {
        let (fs, object) = test_filesystem_and_object().await;

        // Helper that mirrors every write into an in-memory Vec so the object's full contents can
        // be validated after each variously-aligned write.
        struct AlignTest {
            // Byte value for the next fill; incremented before each write so every call is
            // distinguishable in the data.
            fill: u8,
            object: DataObjectHandle<ObjectStore>,
            // In-memory copy of the object's expected contents.
            mirror: Vec<u8>,
        }

        impl AlignTest {
            // Snapshots the object's current contents as the initial mirror.
            async fn new(object: DataObjectHandle<ObjectStore>) -> Self {
                let mirror = {
                    let mut buf = object.allocate_buffer(object.get_size() as usize).await;
                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
                    buf.as_slice().to_vec()
                };
                Self { fill: 0, object, mirror }
            }

            // Fills |range| of self.object with a byte value (self.fill) and mirrors the same
            // operation to an in-memory copy of the object.
            // Each subsequent call bumps the value of fill.
            // It is expected that the object and its mirror maintain identical content.
            async fn test(&mut self, range: Range<u64>) {
                let mut buf = self.object.allocate_buffer((range.end - range.start) as usize).await;
                self.fill += 1;
                buf.as_mut_slice().fill(self.fill);
                self.object
                    .write_or_append(Some(range.start), buf.as_ref())
                    .await
                    .expect("write_or_append failed");
                if range.end > self.mirror.len() as u64 {
                    self.mirror.resize(range.end as usize, 0);
                }
                self.mirror[range.start as usize..range.end as usize].fill(self.fill);
                // Read one byte beyond the mirror to confirm the object is not longer than
                // expected, then compare the full contents.
                let mut buf = self.object.allocate_buffer(self.mirror.len() + 1).await;
                assert_eq!(
                    self.object.read(0, buf.as_mut()).await.expect("read failed"),
                    self.mirror.len()
                );
                assert_eq!(&buf.as_slice()[..self.mirror.len()], self.mirror.as_slice());
            }
        }

        let block_size = object.block_size() as u64;
        let mut align = AlignTest::new(object).await;

        // Fill the object to start with (with 1).
        align.test(0..2 * block_size + 1).await;

        // Unaligned head (fills with 2, overwrites that with 3).
        align.test(1..block_size).await;
        align.test(1..2 * block_size).await;

        // Unaligned tail (fills with 4 and 5).
        align.test(0..block_size - 1).await;
        align.test(0..2 * block_size - 1).await;

        // Both unaligned (fills with 6 and 7).
        align.test(1..block_size - 1).await;
        align.test(1..2 * block_size - 1).await;

        fs.close().await.expect("Close failed");
    }
2206
2207    async fn test_preallocate_common(fs: &FxFilesystem, object: DataObjectHandle<ObjectStore>) {
2208        let allocator = fs.allocator();
2209        let allocated_before = allocator.get_allocated_bytes();
2210        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2211        object
2212            .preallocate_range(&mut transaction, &mut (0..fs.block_size() as u64))
2213            .await
2214            .expect("preallocate_range failed");
2215        transaction.commit().await.expect("commit failed");
2216        assert!(object.get_size() < 1048576);
2217        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2218        object
2219            .preallocate_range(&mut transaction, &mut (0..1048576))
2220            .await
2221            .expect("preallocate_range failed");
2222        transaction.commit().await.expect("commit failed");
2223        assert_eq!(object.get_size(), 1048576);
2224        // Check that it didn't reallocate the space for the existing extent
2225        let allocated_after = allocator.get_allocated_bytes();
2226        assert_eq!(allocated_after - allocated_before, 1048576 - fs.block_size() as u64);
2227
2228        let mut buf = object
2229            .allocate_buffer(round_up(TEST_DATA_OFFSET, fs.block_size()).unwrap() as usize)
2230            .await;
2231        buf.as_mut_slice().fill(47);
2232        object
2233            .write_or_append(Some(0), buf.subslice(..TEST_DATA_OFFSET as usize))
2234            .await
2235            .expect("write failed");
2236        buf.as_mut_slice().fill(95);
2237        let offset = round_up(TEST_OBJECT_SIZE, fs.block_size()).unwrap();
2238        object
2239            .overwrite(offset, buf.as_mut(), OverwriteOptions::default())
2240            .await
2241            .expect("write failed");
2242
2243        // Make sure there were no more allocations.
2244        assert_eq!(allocator.get_allocated_bytes(), allocated_after);
2245
2246        // Read back the data and make sure it is what we expect.
2247        let mut buf = object.allocate_buffer(104876).await;
2248        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
2249        assert_eq!(&buf.as_slice()[..TEST_DATA_OFFSET as usize], &[47; TEST_DATA_OFFSET as usize]);
2250        assert_eq!(
2251            &buf.as_slice()[TEST_DATA_OFFSET as usize..TEST_DATA_OFFSET as usize + TEST_DATA.len()],
2252            TEST_DATA
2253        );
2254        assert_eq!(&buf.as_slice()[offset as usize..offset as usize + 2048], &[95; 2048]);
2255    }
2256
2257    #[fuchsia::test]
2258    async fn test_preallocate_range() {
2259        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2260        test_preallocate_common(&fs, object).await;
2261        fs.close().await.expect("Close failed");
2262    }
2263
2264    // This is identical to the previous test except that we flush so that extents end up in
2265    // different layers.
2266    #[fuchsia::test]
2267    async fn test_preallocate_succeeds_when_extents_are_in_different_layers() {
2268        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2269        object.owner().flush().await.expect("flush failed");
2270        test_preallocate_common(&fs, object).await;
2271        fs.close().await.expect("Close failed");
2272    }
2273
2274    #[fuchsia::test]
2275    async fn test_already_preallocated() {
2276        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2277        let allocator = fs.allocator();
2278        let allocated_before = allocator.get_allocated_bytes();
2279        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2280        let offset = TEST_DATA_OFFSET - TEST_DATA_OFFSET % fs.block_size() as u64;
2281        object
2282            .preallocate_range(&mut transaction, &mut (offset..offset + fs.block_size() as u64))
2283            .await
2284            .expect("preallocate_range failed");
2285        transaction.commit().await.expect("commit failed");
2286        // Check that it didn't reallocate any new space.
2287        assert_eq!(allocator.get_allocated_bytes(), allocated_before);
2288        fs.close().await.expect("Close failed");
2289    }
2290
    #[fuchsia::test]
    async fn test_overwrite_when_preallocated_at_start_of_file() {
        // The standard test data we put in the test object would cause an extent with checksums
        // to be created, which overwrite() doesn't support. So we create an empty object instead.
        let (fs, object) = test_filesystem_and_empty_object().await;

        // Reopen the object to get a fresh handle.
        let object = ObjectStore::open_object(
            object.owner(),
            object.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .expect("open_object failed");

        // The hard-coded offsets below assume a 4 KiB block size.
        assert_eq!(fs.block_size(), 4096);

        let mut write_buf = object.allocate_buffer(4096).await;
        write_buf.as_mut_slice().fill(95);

        // First try to overwrite without allowing allocations
        // We expect this to fail, since nothing is allocated yet
        object
            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // Now preallocate some space (exactly one block)
        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        object
            .preallocate_range(&mut transaction, &mut (0..4096 as u64))
            .await
            .expect("preallocate_range failed");
        transaction.commit().await.expect("commit failed");

        // Now try the same overwrite command as before, it should work this time,
        // even with allocations disabled...
        {
            // The preallocated block reads back as zeroes before being written.
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            // The overwritten data should now be visible.
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Now try to overwrite at offset 4096. We expect this to fail, since we only preallocated
        // one block earlier at offset 0
        object
            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // We can't assert anything about the existing bytes, because they haven't been allocated
        // yet and they could contain any values
        object
            .overwrite(
                4096,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(4096, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Check that the overwrites haven't messed up the filesystem state
        let fsck_options = FsckOptions {
            fail_on_warning: true,
            no_lock: true,
            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
            ..Default::default()
        };
        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");

        fs.close().await.expect("Close failed");
    }
2377
    #[fuchsia::test]
    async fn test_overwrite_large_buffer_and_file_with_many_holes() {
        // The standard test data we put in the test object would cause an extent with checksums
        // to be created, which overwrite() doesn't support. So we create an empty object instead.
        let (fs, object) = test_filesystem_and_empty_object().await;

        let object = ObjectStore::open_object(
            object.owner(),
            object.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .expect("open_object failed");

        // The hard-coded offsets below assume a 4 KiB block size.
        assert_eq!(fs.block_size(), 4096);
        assert_eq!(object.get_size(), TEST_OBJECT_SIZE);

        // Let's create some non-holes
        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        object
            .preallocate_range(&mut transaction, &mut (4096..8192 as u64))
            .await
            .expect("preallocate_range failed");
        object
            .preallocate_range(&mut transaction, &mut (16384..32768 as u64))
            .await
            .expect("preallocate_range failed");
        object
            .preallocate_range(&mut transaction, &mut (65536..131072 as u64))
            .await
            .expect("preallocate_range failed");
        object
            .preallocate_range(&mut transaction, &mut (262144..524288 as u64))
            .await
            .expect("preallocate_range failed");
        transaction.commit().await.expect("commit failed");

        // Layout is now: allocated at 4096..8192, 16384..32768, 65536..131072 and 262144..524288,
        // with holes in between; preallocating past eof also extended the file.
        assert_eq!(object.get_size(), 524288);

        let mut write_buf = object.allocate_buffer(4096).await;
        write_buf.as_mut_slice().fill(95);

        // We shouldn't be able to overwrite in the holes if new allocations aren't enabled
        object
            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        object
            .overwrite(8192, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        object
            .overwrite(32768, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        object
            .overwrite(131072, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");

        // But we should be able to overwrite in the prealloc'd areas without needing allocations
        // For each preallocated region: it reads back as zeroes, overwrite succeeds without
        // allocations, and the new data reads back.
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(4096, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(4096, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(16384, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(16384, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(16384, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(65536, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(65536, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(65536, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(262144, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object
            .overwrite(262144, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(262144, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Now let's try to do a huge overwrite, that spans over many holes and non-holes
        let mut huge_write_buf = object.allocate_buffer(524288).await;
        huge_write_buf.as_mut_slice().fill(96);

        // With allocations disabled, the big overwrite should fail...
        object
            .overwrite(0, huge_write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        // ... but it should work when allocations are enabled
        object
            .overwrite(
                0,
                huge_write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            // The whole file, holes included, now contains the huge write's data.
            let mut read_buf = object.allocate_buffer(524288).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[96; 524288]);
        }

        // Check that the overwrites haven't messed up the filesystem state
        let fsck_options = FsckOptions {
            fail_on_warning: true,
            no_lock: true,
            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
            ..Default::default()
        };
        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");

        fs.close().await.expect("Close failed");
    }
2532
2533    #[fuchsia::test]
2534    async fn test_overwrite_when_unallocated_at_start_of_file() {
2535        // The standard test data we put in the test object would cause an extent with checksums
2536        // to be created, which overwrite() doesn't support. So we create an empty object instead.
2537        let (fs, object) = test_filesystem_and_empty_object().await;
2538
2539        let object = ObjectStore::open_object(
2540            object.owner(),
2541            object.object_id(),
2542            HandleOptions::default(),
2543            None,
2544        )
2545        .await
2546        .expect("open_object failed");
2547
2548        assert_eq!(fs.block_size(), 4096);
2549
2550        let mut write_buf = object.allocate_buffer(4096).await;
2551        write_buf.as_mut_slice().fill(95);
2552
2553        // First try to overwrite without allowing allocations
2554        // We expect this to fail, since nothing is allocated yet
2555        object
2556            .overwrite(0, write_buf.as_mut(), OverwriteOptions::default())
2557            .await
2558            .expect_err("overwrite succeeded");
2559
2560        // Now try the same overwrite command as before, but allow allocations
2561        object
2562            .overwrite(
2563                0,
2564                write_buf.as_mut(),
2565                OverwriteOptions { allow_allocations: true, ..Default::default() },
2566            )
2567            .await
2568            .expect("overwrite failed");
2569        {
2570            let mut read_buf = object.allocate_buffer(4096).await;
2571            object.read(0, read_buf.as_mut()).await.expect("read failed");
2572            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2573        }
2574
2575        // Now try to overwrite at the next block. This should fail if allocations are disabled
2576        object
2577            .overwrite(4096, write_buf.as_mut(), OverwriteOptions::default())
2578            .await
2579            .expect_err("overwrite succeeded");
2580
2581        // ... but it should work if allocations are enabled
2582        object
2583            .overwrite(
2584                4096,
2585                write_buf.as_mut(),
2586                OverwriteOptions { allow_allocations: true, ..Default::default() },
2587            )
2588            .await
2589            .expect("overwrite failed");
2590        {
2591            let mut read_buf = object.allocate_buffer(4096).await;
2592            object.read(4096, read_buf.as_mut()).await.expect("read failed");
2593            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2594        }
2595
2596        // Check that the overwrites haven't messed up the filesystem state
2597        let fsck_options = FsckOptions {
2598            fail_on_warning: true,
2599            no_lock: true,
2600            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2601            ..Default::default()
2602        };
2603        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2604
2605        fs.close().await.expect("Close failed");
2606    }
2607
    #[fuchsia::test]
    async fn test_overwrite_can_extend_a_file() {
        // The standard test data we put in the test object would cause an extent with checksums
        // to be created, which overwrite() doesn't support. So we create an empty object instead.
        let (fs, object) = test_filesystem_and_empty_object().await;

        let object = ObjectStore::open_object(
            object.owner(),
            object.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .expect("open_object failed");

        // The offsets and size assertions below assume a 4 KiB block size; the 8192 assertion
        // further implies TEST_OBJECT_SIZE lies within the second block (4096..8192).
        assert_eq!(fs.block_size(), 4096);
        assert_eq!(object.get_size(), TEST_OBJECT_SIZE);

        let mut write_buf = object.allocate_buffer(4096).await;
        write_buf.as_mut_slice().fill(95);

        // Let's try to fill up the last block, and increase the file size in doing so
        let last_block_offset = round_down(TEST_OBJECT_SIZE, 4096 as u32);

        // Expected to fail with allocations disabled
        object
            .overwrite(last_block_offset, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        // ... but expected to succeed with allocations enabled
        object
            .overwrite(
                last_block_offset,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(last_block_offset, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // Filling the whole of the file's last block extended the size to that block's end.
        assert_eq!(object.get_size(), 8192);

        // Let's try to write at the next block, too
        let next_block_offset = round_up(TEST_OBJECT_SIZE, 4096 as u32).unwrap();

        // Expected to fail with allocations disabled
        object
            .overwrite(next_block_offset, write_buf.as_mut(), OverwriteOptions::default())
            .await
            .expect_err("overwrite succeeded");
        // ... but expected to succeed with allocations enabled
        object
            .overwrite(
                next_block_offset,
                write_buf.as_mut(),
                OverwriteOptions { allow_allocations: true, ..Default::default() },
            )
            .await
            .expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(next_block_offset, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // The file now spans three blocks.
        assert_eq!(object.get_size(), 12288);

        // Check that the overwrites haven't messed up the filesystem state
        let fsck_options = FsckOptions {
            fail_on_warning: true,
            no_lock: true,
            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
            ..Default::default()
        };
        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");

        fs.close().await.expect("Close failed");
    }
2690
2691    #[fuchsia::test]
2692    async fn test_enable_verity() {
2693        let fs: OpenFxFilesystem = test_filesystem().await;
2694        let mut transaction = fs
2695            .clone()
2696            .new_transaction(lock_keys![], Options::default())
2697            .await
2698            .expect("new_transaction failed");
2699        let store = fs.root_store();
2700        let object = Arc::new(
2701            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2702                .await
2703                .expect("create_object failed"),
2704        );
2705
2706        transaction.commit().await.unwrap();
2707
2708        object
2709            .enable_verity(fio::VerificationOptions {
2710                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2711                salt: Some(vec![]),
2712                ..Default::default()
2713            })
2714            .await
2715            .expect("set verified file metadata failed");
2716
2717        let handle =
2718            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
2719                .await
2720                .expect("open_object failed");
2721
2722        assert!(handle.is_verified_file());
2723
2724        fs.close().await.expect("Close failed");
2725    }
2726
2727    #[fuchsia::test]
2728    async fn test_enable_verity_large_file() {
2729        // Need to make a large FakeDevice to create space for a 67 MB file.
2730        let device = DeviceHolder::new(FakeDevice::new(262144, TEST_DEVICE_BLOCK_SIZE));
2731        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2732        let root_store = fs.root_store();
2733        let mut transaction = fs
2734            .clone()
2735            .new_transaction(lock_keys![], Options::default())
2736            .await
2737            .expect("new_transaction failed");
2738
2739        let handle = ObjectStore::create_object(
2740            &root_store,
2741            &mut transaction,
2742            HandleOptions::default(),
2743            None,
2744        )
2745        .await
2746        .expect("failed to create object");
2747        transaction.commit().await.expect("commit failed");
2748        let mut offset = 0;
2749
2750        // Write a file big enough to trigger multiple transactions on enable_verity().
2751        let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2752        buf.as_mut_slice().fill(1);
2753        for _ in 0..130 {
2754            handle.write_or_append(Some(offset), buf.as_ref()).await.expect("write failed");
2755            offset += WRITE_ATTR_BATCH_SIZE as u64;
2756        }
2757
2758        handle
2759            .enable_verity(fio::VerificationOptions {
2760                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2761                salt: Some(vec![]),
2762                ..Default::default()
2763            })
2764            .await
2765            .expect("set verified file metadata failed");
2766
2767        let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2768        offset = 0;
2769        for _ in 0..130 {
2770            handle.read(offset, buf.as_mut()).await.expect("verification during read should fail");
2771            assert_eq!(buf.as_slice(), &[1; WRITE_ATTR_BATCH_SIZE]);
2772            offset += WRITE_ATTR_BATCH_SIZE as u64;
2773        }
2774
2775        fsck(fs.clone()).await.expect("fsck failed");
2776        fs.close().await.expect("Close failed");
2777    }
2778
    /// Simulates a crash part-way through `enable_verity` — after a graveyard marker for the
    /// merkle attribute has been committed and the attribute partially written — and checks that,
    /// after remount, fsck passes and retrying `enable_verity` succeeds, with the attribute
    /// graveyard entry cleaned up by the graveyard's initial reap.
    #[fuchsia::test]
    async fn test_retry_enable_verity_on_reboot() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        let object_id = {
            let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
            // Add a graveyard entry for the merkle attribute so that a partially-written
            // attribute gets cleaned up on replay (mirroring what enable_verity does).
            transaction.add(
                root_store.store_object_id(),
                Mutation::replace_or_insert_object(
                    ObjectKey::graveyard_attribute_entry(
                        root_store.graveyard_directory_object_id(),
                        handle.object_id(),
                        FSVERITY_MERKLE_ATTRIBUTE_ID,
                    ),
                    ObjectValue::Some,
                ),
            );

            // This write should span three transactions. This test mimics the behavior when the
            // last transaction gets interrupted by a filesystem.close().
            handle
                .write_new_attr_in_batches(
                    &mut transaction,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    &vec![0; 2 * WRITE_ATTR_BATCH_SIZE],
                    WRITE_ATTR_BATCH_SIZE,
                )
                .await
                .expect("failed to write merkle attribute");

            handle.object_id()
            // Drop the transaction to simulate interrupting the merkle tree creation as well as to
            // release the transaction locks.
        };

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // First remount read-only: fsck must pass even with the interrupted attribute write
        // still on disk.
        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // On open, the filesystem will call initial_reap which will call queue_tombstone().
        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_store = fs.root_store();
        let handle =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("open_object failed");
        // Retrying enable_verity after the simulated crash should now succeed.
        handle
            .enable_verity(fio::VerificationOptions {
                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                salt: Some(vec![]),
                ..Default::default()
            })
            .await
            .expect("set verified file metadata failed");

        // `flush` will ensure that initial reap fully processes all the graveyard entries. This
        // isn't strictly necessary for the test to pass (the graveyard marker was already
        // processed during `enable_verity`), but it does help catch bugs, such as the attribute
        // graveyard entry not being removed upon processing.
        fs.graveyard().flush().await;
        assert_eq!(
            handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
            Some(vec![0; <Sha256 as Hasher>::Digest::DIGEST_LEN].into())
        );
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("Close failed");
    }
2870
2871    #[fuchsia::test]
2872    async fn test_verify_data_corrupt_file() {
2873        let fs: OpenFxFilesystem = test_filesystem().await;
2874        let mut transaction = fs
2875            .clone()
2876            .new_transaction(lock_keys![], Options::default())
2877            .await
2878            .expect("new_transaction failed");
2879        let store = fs.root_store();
2880        let object = Arc::new(
2881            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2882                .await
2883                .expect("create_object failed"),
2884        );
2885
2886        transaction.commit().await.unwrap();
2887
2888        let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
2889        buf.as_mut_slice().fill(123);
2890        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2891
2892        object
2893            .enable_verity(fio::VerificationOptions {
2894                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2895                salt: Some(vec![]),
2896                ..Default::default()
2897            })
2898            .await
2899            .expect("set verified file metadata failed");
2900
2901        // Change file contents and ensure verification fails
2902        buf.as_mut_slice().fill(234);
2903        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2904        object.read(0, buf.as_mut()).await.expect_err("verification during read should fail");
2905
2906        fs.close().await.expect("Close failed");
2907    }
2908
    /// Corrupts the stored merkle tree of a verified file (rather than the data) and checks that
    /// reopening the object afterwards fails.  The handle is deliberately dropped at the end of
    /// the inner scope so the re-open goes through the full open path.
    #[fuchsia::test]
    async fn test_verify_data_corrupt_tree() {
        let fs: OpenFxFilesystem = test_filesystem().await;
        let object_id = {
            let store = fs.root_store();
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            let object = Arc::new(
                ObjectStore::create_object(
                    &store,
                    &mut transaction,
                    HandleOptions::default(),
                    None,
                )
                .await
                .expect("create_object failed"),
            );
            let object_id = object.object_id();

            transaction.commit().await.unwrap();

            let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
            buf.as_mut_slice().fill(123);
            object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

            object
                .enable_verity(fio::VerificationOptions {
                    hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                    salt: Some(vec![]),
                    ..Default::default()
                })
                .await
                .expect("set verified file metadata failed");
            // With data and tree still consistent, a verified read succeeds.
            object.read(0, buf.as_mut()).await.expect("verified read");

            // Corrupt the merkle tree before closing.
            let mut merkle = object
                .read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID)
                .await
                .unwrap()
                .expect("Reading merkle tree");
            merkle[0] = merkle[0].wrapping_add(1);
            object
                .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID, &*merkle)
                .await
                .expect("Overwriting merkle");

            object_id
        }; // Close object.

        // Reopening the object should complain about the corrupted merkle tree.
        assert!(
            ObjectStore::open_object(&fs.root_store(), object_id, HandleOptions::default(), None)
                .await
                .is_err()
        );
        fs.close().await.expect("Close failed");
    }
2970
2971    #[fuchsia::test]
2972    async fn test_extend() {
2973        let fs = test_filesystem().await;
2974        let handle;
2975        let mut transaction = fs
2976            .clone()
2977            .new_transaction(lock_keys![], Options::default())
2978            .await
2979            .expect("new_transaction failed");
2980        let store = fs.root_store();
2981        handle =
2982            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2983                .await
2984                .expect("create_object failed");
2985
2986        // As of writing, an empty filesystem has two 512kiB superblock extents and a little over
2987        // 256kiB of additional allocations (journal, etc) so we start use a 'magic' starting point
2988        // of 2MiB here.
2989        const START_OFFSET: u64 = 2048 * 1024;
2990        handle
2991            .extend(&mut transaction, START_OFFSET..START_OFFSET + 5 * fs.block_size() as u64)
2992            .await
2993            .expect("extend failed");
2994        transaction.commit().await.expect("commit failed");
2995        let mut buf = handle.allocate_buffer(5 * fs.block_size() as usize).await;
2996        buf.as_mut_slice().fill(123);
2997        handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2998        buf.as_mut_slice().fill(67);
2999        handle.read(0, buf.as_mut()).await.expect("read failed");
3000        assert_eq!(buf.as_slice(), &vec![123; 5 * fs.block_size() as usize]);
3001        fs.close().await.expect("Close failed");
3002    }
3003
3004    #[fuchsia::test]
3005    async fn test_truncate_deallocates_old_extents() {
3006        let (fs, object) = test_filesystem_and_object().await;
3007        let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
3008        buf.as_mut_slice().fill(0xaa);
3009        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3010
3011        let allocator = fs.allocator();
3012        let allocated_before = allocator.get_allocated_bytes();
3013        object.truncate(fs.block_size() as u64).await.expect("truncate failed");
3014        let allocated_after = allocator.get_allocated_bytes();
3015        assert!(
3016            allocated_after < allocated_before,
3017            "before = {} after = {}",
3018            allocated_before,
3019            allocated_after
3020        );
3021        fs.close().await.expect("Close failed");
3022    }
3023
3024    #[fuchsia::test]
3025    async fn test_truncate_zeroes_tail_block() {
3026        let (fs, object) = test_filesystem_and_object().await;
3027
3028        WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + 3).await.expect("truncate failed");
3029        WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + TEST_DATA.len() as u64)
3030            .await
3031            .expect("truncate failed");
3032
3033        let mut buf = object.allocate_buffer(fs.block_size() as usize).await;
3034        let offset = (TEST_DATA_OFFSET % fs.block_size()) as usize;
3035        object.read(TEST_DATA_OFFSET - offset as u64, buf.as_mut()).await.expect("read failed");
3036
3037        let mut expected = TEST_DATA.to_vec();
3038        expected[3..].fill(0);
3039        assert_eq!(&buf.as_slice()[offset..offset + expected.len()], &expected);
3040    }
3041
    /// End-to-end test of trimming via the graveyard.  A post-commit hook runs after every
    /// transaction; it fscks the live filesystem, snapshots the device, and replays the snapshot
    /// to check that an interrupted truncate gets trimmed (pass 1) and that a graveyard object
    /// gets tombstoned (pass 2) on remount.
    #[fuchsia::test]
    async fn test_trim() {
        // Format a new filesystem.
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let block_size = fs.block_size();
        root_volume(fs.clone())
            .await
            .expect("root_volume failed")
            .new_volume("test", NewChildStoreOptions::default())
            .await
            .expect("volume failed");
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        // To test trim, we open the filesystem and set up a post commit hook that runs after every
        // transaction.  When the hook triggers, we can fsck the volume, take a snapshot of the
        // device and check that it gets replayed correctly on the snapshot.  We can check that the
        // graveyard trims the file as expected.
        #[derive(Default)]
        struct Context {
            // The test volume's store; set once the volume is opened below.
            store: Option<Arc<ObjectStore>>,
            // Set in pass 2 once the object has been moved into the graveyard; tells the hook to
            // expect tombstoning instead of trimming.
            object_id: Option<u64>,
        }
        let shared_context = Arc::new(Mutex::new(Context::default()));

        // Sized so that deallocating the every-other-block extents written below exceeds
        // TRANSACTION_MUTATION_THRESHOLD, forcing truncate to span multiple transactions.
        let object_size = (TRANSACTION_MUTATION_THRESHOLD as u64 + 10) * 2 * block_size;

        // Wait for an object to get tombstoned by the graveyard.
        async fn expect_tombstoned(store: &Arc<ObjectStore>, object_id: u64) {
            loop {
                if let Err(e) =
                    ObjectStore::open_object(store, object_id, HandleOptions::default(), None).await
                {
                    assert!(
                        FxfsError::NotFound.matches(&e),
                        "open_object didn't fail with NotFound: {:?}",
                        e
                    );
                    break;
                }
                // The graveyard should eventually tombstone the object.
                fasync::Timer::new(std::time::Duration::from_millis(100)).await;
            }
        }

        // Checks to see if the object needs to be trimmed.
        async fn needs_trim(store: &Arc<ObjectStore>) -> Option<DataObjectHandle<ObjectStore>> {
            let root_directory = Directory::open(store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let oid = root_directory.lookup("foo").await.expect("lookup failed");
            if let Some((oid, _, _)) = oid {
                let object = ObjectStore::open_object(store, oid, HandleOptions::default(), None)
                    .await
                    .expect("open_object failed");
                let props = object.get_properties().await.expect("get_properties failed");
                // Allocated bytes with zero content size means a truncate was interrupted and
                // the object still needs trimming.
                if props.allocated_size > 0 && props.data_attribute_size == 0 {
                    Some(object)
                } else {
                    None
                }
            } else {
                None
            }
        }

        let shared_context_clone = shared_context.clone();
        let post_commit = move || {
            let store = shared_context_clone.lock().store.as_ref().cloned().unwrap();
            let shared_context = shared_context_clone.clone();
            async move {
                // First run fsck on the current filesystem.
                let options = FsckOptions {
                    fail_on_warning: true,
                    no_lock: true,
                    on_error: Box::new(|err| println!("fsck error: {:?}", err)),
                    ..Default::default()
                };
                let fs = store.filesystem();

                fsck_with_options(fs.clone(), &options).await.expect("fsck_with_options failed");
                fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
                    .await
                    .expect("fsck_volume_with_options failed");

                // Now check that we can replay this correctly.
                fs.sync(SyncOptions { flush_device: true, ..Default::default() })
                    .await
                    .expect("sync failed");
                let device = fs.device().snapshot().expect("snapshot failed");

                let object_id = shared_context.lock().object_id.clone();

                // Skip the initial reap in pass 1 so the snapshot preserves the un-trimmed state
                // and we can drive the trim explicitly below.
                let fs2 = FxFilesystemBuilder::new()
                    .skip_initial_reap(object_id.is_none())
                    .open(device)
                    .await
                    .expect("open failed");

                // If the "foo" file exists check that allocated size matches content size.
                let root_vol = root_volume(fs2.clone()).await.expect("root_volume failed");
                let store =
                    root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");

                if let Some(oid) = object_id {
                    // For the second pass, the object should get tombstoned.
                    expect_tombstoned(&store, oid).await;
                } else if let Some(object) = needs_trim(&store).await {
                    // Extend the file and make sure that it is correctly trimmed.
                    object.truncate(object_size).await.expect("truncate failed");
                    let mut buf = object.allocate_buffer(block_size as usize).await;
                    object
                        .read(object_size - block_size * 2, buf.as_mut())
                        .await
                        .expect("read failed");
                    assert_eq!(buf.as_slice(), &vec![0; block_size as usize]);

                    // Remount, this time with the graveyard performing an initial reap and the
                    // object should get trimmed.
                    let fs = FxFilesystem::open(fs.device().snapshot().expect("snapshot failed"))
                        .await
                        .expect("open failed");
                    let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
                    let store = root_vol
                        .volume("test", StoreOptions::default())
                        .await
                        .expect("volume failed");
                    while needs_trim(&store).await.is_some() {
                        // The object has been truncated, but still has some data allocated to
                        // it.  The graveyard should trim the object eventually.
                        fasync::Timer::new(std::time::Duration::from_millis(100)).await;
                    }

                    // Run fsck.
                    fsck_with_options(fs.clone(), &options)
                        .await
                        .expect("fsck_with_options failed");
                    fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
                        .await
                        .expect("fsck_volume_with_options failed");
                    fs.close().await.expect("close failed");
                }

                // Run fsck on fs2.
                fsck_with_options(fs2.clone(), &options).await.expect("fsck_with_options failed");
                fsck_volume_with_options(fs2.as_ref(), &options, store.store_object_id(), None)
                    .await
                    .expect("fsck_volume_with_options failed");
                fs2.close().await.expect("close failed");
            }
            .boxed()
        };

        let fs = FxFilesystemBuilder::new()
            .post_commit_hook(post_commit)
            .open(device)
            .await
            .expect("open failed");

        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");

        shared_context.lock().store = Some(store.clone());

        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");

        let object;
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        object = root_directory
            .create_child_file(&mut transaction, "foo")
            .await
            .expect("create_object failed");
        transaction.commit().await.expect("commit failed");

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        // Two passes: first with a regular object, and then with that object moved into the
        // graveyard.
        let mut pass = 0;
        loop {
            // Create enough extents in it such that when we truncate the object it will require
            // more than one transaction.
            let mut buf = object.allocate_buffer(5).await;
            buf.as_mut_slice().fill(1);
            // Write every other block.
            for offset in (0..object_size).into_iter().step_by(2 * block_size as usize) {
                object
                    .txn_write(&mut transaction, offset, buf.as_ref())
                    .await
                    .expect("write failed");
            }
            transaction.commit().await.expect("commit failed");
            // This should take up more than one transaction.
            WriteObjectHandle::truncate(&object, 0).await.expect("truncate failed");

            if pass == 1 {
                break;
            }

            // Store the object ID so that we can make sure the object is always tombstoned
            // after remount (see above).
            shared_context.lock().object_id = Some(object.object_id());

            transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(store.store_object_id(), store.root_directory_object_id()),
                        LockKey::object(store.store_object_id(), object.object_id()),
                    ],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");

            // Move the object into the graveyard.
            replace_child(&mut transaction, None, (&root_directory, "foo"))
                .await
                .expect("replace_child failed");
            store.add_to_graveyard(&mut transaction, object.object_id());

            pass += 1;
        }

        fs.close().await.expect("Close failed");
    }
3288
3289    #[fuchsia::test]
3290    async fn test_adjust_refs() {
3291        let (fs, object) = test_filesystem_and_object().await;
3292        let store = object.owner();
3293        let mut transaction = fs
3294            .clone()
3295            .new_transaction(
3296                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
3297                Options::default(),
3298            )
3299            .await
3300            .expect("new_transaction failed");
3301        assert_eq!(
3302            store
3303                .adjust_refs(&mut transaction, object.object_id(), 1)
3304                .await
3305                .expect("adjust_refs failed"),
3306            false
3307        );
3308        transaction.commit().await.expect("commit failed");
3309
3310        let allocator = fs.allocator();
3311        let allocated_before = allocator.get_allocated_bytes();
3312        let mut transaction = fs
3313            .clone()
3314            .new_transaction(
3315                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
3316                Options::default(),
3317            )
3318            .await
3319            .expect("new_transaction failed");
3320        assert_eq!(
3321            store
3322                .adjust_refs(&mut transaction, object.object_id(), -2)
3323                .await
3324                .expect("adjust_refs failed"),
3325            true
3326        );
3327        transaction.commit().await.expect("commit failed");
3328
3329        assert_eq!(allocator.get_allocated_bytes(), allocated_before);
3330
3331        store
3332            .tombstone_object(
3333                object.object_id(),
3334                Options { borrow_metadata_space: true, ..Default::default() },
3335            )
3336            .await
3337            .expect("purge failed");
3338
3339        assert_eq!(allocated_before - allocator.get_allocated_bytes(), fs.block_size() as u64);
3340
3341        // We need to remove the directory entry, too, otherwise fsck will complain
3342        {
3343            let mut transaction = fs
3344                .clone()
3345                .new_transaction(
3346                    lock_keys![LockKey::object(
3347                        store.store_object_id(),
3348                        store.root_directory_object_id()
3349                    )],
3350                    Options::default(),
3351                )
3352                .await
3353                .expect("new_transaction failed");
3354            let root_directory = Directory::open(&store, store.root_directory_object_id())
3355                .await
3356                .expect("open failed");
3357            transaction.add(
3358                store.store_object_id(),
3359                Mutation::replace_or_insert_object(
3360                    ObjectKey::child(root_directory.object_id(), TEST_OBJECT_NAME, false),
3361                    ObjectValue::None,
3362                ),
3363            );
3364            transaction.commit().await.expect("commit failed");
3365        }
3366
3367        fsck_with_options(
3368            fs.clone(),
3369            &FsckOptions {
3370                fail_on_warning: true,
3371                on_error: Box::new(|err| println!("fsck error: {:?}", err)),
3372                ..Default::default()
3373            },
3374        )
3375        .await
3376        .expect("fsck_with_options failed");
3377
3378        fs.close().await.expect("Close failed");
3379    }
3380
    /// Checks transaction locking on a single object: a second transaction must block until the
    /// first commits, while plain reads proceed without blocking.  Oneshot channels sequence the
    /// three futures so the interleaving under test is deterministic.
    #[fuchsia::test]
    async fn test_locks() {
        let (fs, object) = test_filesystem_and_object().await;
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let (send3, recv3) = channel();
        // NOTE(review): `done` is never set to true anywhere in this test, so the assert below
        // always observes false; it appears intended as a guard that the third future hasn't
        // committed yet — confirm.
        let done = Mutex::new(false);
        let mut futures = FuturesUnordered::new();
        // Future 1: holds a transaction (and therefore the object lock) across a write.
        futures.push(
            async {
                let mut t = object.new_transaction().await.expect("new_transaction failed");
                send1.send(()).unwrap(); // Tell the next future to continue.
                send3.send(()).unwrap(); // Tell the last future to continue.
                recv2.await.unwrap();
                let mut buf = object.allocate_buffer(5).await;
                buf.as_mut_slice().copy_from_slice(b"hello");
                object.txn_write(&mut t, 0, buf.as_ref()).await.expect("write failed");
                // This is a halting problem so all we can do is sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
                t.commit().await.expect("commit failed");
            }
            .boxed(),
        );
        // Future 2: runs while future 1 holds its transaction; reads must not block on it.
        futures.push(
            async {
                recv1.await.unwrap();
                // Reads should not block.
                let offset = TEST_DATA_OFFSET as usize;
                let align = offset % fs.block_size() as usize;
                let len = TEST_DATA.len();
                let mut buf = object.allocate_buffer(align + len).await;
                assert_eq!(
                    object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
                    align + TEST_DATA.len()
                );
                assert_eq!(&buf.as_slice()[align..], TEST_DATA);
                // Tell the first future to continue.
                send2.send(()).unwrap();
            }
            .boxed(),
        );
        // Future 3: takes a second transaction, which must wait for future 1's commit, after
        // which it should observe future 1's write.
        futures.push(
            async {
                // This should block until the first future has completed.
                recv3.await.unwrap();
                let _t = object.new_transaction().await.expect("new_transaction failed");
                let mut buf = object.allocate_buffer(5).await;
                assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 5);
                assert_eq!(buf.as_slice(), b"hello");
            }
            .boxed(),
        );
        while let Some(()) = futures.next().await {}
        fs.close().await.expect("Close failed");
    }
3437
3438    #[fuchsia::test(threads = 10)]
3439    async fn test_racy_reads() {
3440        let fs = test_filesystem().await;
3441        let object;
3442        let mut transaction = fs
3443            .clone()
3444            .new_transaction(lock_keys![], Options::default())
3445            .await
3446            .expect("new_transaction failed");
3447        let store = fs.root_store();
3448        object = Arc::new(
3449            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3450                .await
3451                .expect("create_object failed"),
3452        );
3453        transaction.commit().await.expect("commit failed");
3454        for _ in 0..100 {
3455            let cloned_object = object.clone();
3456            let writer = fasync::Task::spawn(async move {
3457                let mut buf = cloned_object.allocate_buffer(10).await;
3458                buf.as_mut_slice().fill(123);
3459                cloned_object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3460            });
3461            let cloned_object = object.clone();
3462            let reader = fasync::Task::spawn(async move {
3463                let wait_time = rand::random_range(0..5);
3464                fasync::Timer::new(Duration::from_millis(wait_time)).await;
3465                let mut buf = cloned_object.allocate_buffer(10).await;
3466                buf.as_mut_slice().fill(23);
3467                let amount = cloned_object.read(0, buf.as_mut()).await.expect("write failed");
3468                // If we succeed in reading data, it must include the write; i.e. if we see the size
3469                // change, we should see the data too.  For this to succeed it requires locking on
3470                // the read size to ensure that when we read the size, we get the extents changed in
3471                // that same transaction.
3472                if amount != 0 {
3473                    assert_eq!(amount, 10);
3474                    assert_eq!(buf.as_slice(), &[123; 10]);
3475                }
3476            });
3477            writer.await;
3478            reader.await;
3479            object.truncate(0).await.expect("truncate failed");
3480        }
3481        fs.close().await.expect("Close failed");
3482    }
3483
3484    #[fuchsia::test]
3485    async fn test_allocated_size() {
3486        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
3487
3488        let before = object.get_properties().await.expect("get_properties failed").allocated_size;
3489        let mut buf = object.allocate_buffer(5).await;
3490        buf.as_mut_slice().copy_from_slice(b"hello");
3491        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3492        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3493        assert_eq!(after, before + fs.block_size() as u64);
3494
3495        // Do the same write again and there should be no change.
3496        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3497        assert_eq!(
3498            object.get_properties().await.expect("get_properties failed").allocated_size,
3499            after
3500        );
3501
3502        // extend...
3503        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3504        let offset = 1000 * fs.block_size() as u64;
3505        let before = after;
3506        object
3507            .extend(&mut transaction, offset..offset + fs.block_size() as u64)
3508            .await
3509            .expect("extend failed");
3510        transaction.commit().await.expect("commit failed");
3511        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3512        assert_eq!(after, before + fs.block_size() as u64);
3513
3514        // truncate...
3515        let before = after;
3516        let size = object.get_size();
3517        object.truncate(size - fs.block_size() as u64).await.expect("extend failed");
3518        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3519        assert_eq!(after, before - fs.block_size() as u64);
3520
3521        // preallocate_range...
3522        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3523        let before = after;
3524        let mut file_range = offset..offset + fs.block_size() as u64;
3525        object.preallocate_range(&mut transaction, &mut file_range).await.expect("extend failed");
3526        transaction.commit().await.expect("commit failed");
3527        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3528        assert_eq!(after, before + fs.block_size() as u64);
3529        fs.close().await.expect("Close failed");
3530    }
3531
3532    #[fuchsia::test(threads = 10)]
3533    async fn test_zero() {
3534        let (fs, object) = test_filesystem_and_object().await;
3535        let expected_size = object.get_size();
3536        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3537        object.zero(&mut transaction, 0..fs.block_size() as u64 * 10).await.expect("zero failed");
3538        transaction.commit().await.expect("commit failed");
3539        assert_eq!(object.get_size(), expected_size);
3540        let mut buf = object.allocate_buffer(fs.block_size() as usize * 10).await;
3541        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed") as u64, expected_size);
3542        assert_eq!(
3543            &buf.as_slice()[0..expected_size as usize],
3544            vec![0u8; expected_size as usize].as_slice()
3545        );
3546        fs.close().await.expect("Close failed");
3547    }
3548
3549    #[fuchsia::test]
3550    async fn test_properties() {
3551        let (fs, object) = test_filesystem_and_object().await;
3552        const CRTIME: Timestamp = Timestamp::from_nanos(1234);
3553        const MTIME: Timestamp = Timestamp::from_nanos(5678);
3554        const CTIME: Timestamp = Timestamp::from_nanos(8765);
3555
3556        // ObjectProperties can be updated through `update_attributes`.
3557        // `get_properties` should reflect the latest changes.
3558        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3559        object
3560            .update_attributes(
3561                &mut transaction,
3562                Some(&fio::MutableNodeAttributes {
3563                    creation_time: Some(CRTIME.as_nanos()),
3564                    modification_time: Some(MTIME.as_nanos()),
3565                    mode: Some(111),
3566                    gid: Some(222),
3567                    ..Default::default()
3568                }),
3569                None,
3570            )
3571            .await
3572            .expect("update_attributes failed");
3573        const MTIME_NEW: Timestamp = Timestamp::from_nanos(12345678);
3574        object
3575            .update_attributes(
3576                &mut transaction,
3577                Some(&fio::MutableNodeAttributes {
3578                    modification_time: Some(MTIME_NEW.as_nanos()),
3579                    gid: Some(333),
3580                    rdev: Some(444),
3581                    ..Default::default()
3582                }),
3583                Some(CTIME),
3584            )
3585            .await
3586            .expect("update_timestamps failed");
3587        transaction.commit().await.expect("commit failed");
3588
3589        let properties = object.get_properties().await.expect("get_properties failed");
3590        assert_matches!(
3591            properties,
3592            ObjectProperties {
3593                refs: 1u64,
3594                allocated_size: TEST_OBJECT_ALLOCATED_SIZE,
3595                data_attribute_size: TEST_OBJECT_SIZE,
3596                creation_time: CRTIME,
3597                modification_time: MTIME_NEW,
3598                posix_attributes: Some(PosixAttributes { mode: 111, gid: 333, rdev: 444, .. }),
3599                change_time: CTIME,
3600                ..
3601            }
3602        );
3603        fs.close().await.expect("Close failed");
3604    }
3605
3606    #[fuchsia::test]
3607    async fn test_is_allocated() {
3608        let (fs, object) = test_filesystem_and_object().await;
3609
3610        // `test_filesystem_and_object()` wrote the buffer `TEST_DATA` to the device at offset
3611        // `TEST_DATA_OFFSET` where the length and offset are aligned to the block size.
3612        let aligned_offset = round_down(TEST_DATA_OFFSET, fs.block_size());
3613        let aligned_length = round_up(TEST_DATA.len() as u64, fs.block_size()).unwrap();
3614
3615        // Check for the case where where we have the following extent layout
3616        //       [ unallocated ][ `TEST_DATA` ]
3617        // The extents before `aligned_offset` should not be allocated
3618        let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
3619        assert_eq!(count, aligned_offset);
3620        assert_eq!(allocated, false);
3621
3622        let (allocated, count) =
3623            object.is_allocated(aligned_offset).await.expect("is_allocated failed");
3624        assert_eq!(count, aligned_length);
3625        assert_eq!(allocated, true);
3626
3627        // Check for the case where where we query out of range
3628        let end = aligned_offset + aligned_length;
3629        object
3630            .is_allocated(end)
3631            .await
3632            .expect_err("is_allocated should have returned ERR_OUT_OF_RANGE");
3633
3634        // Check for the case where where we start querying for allocation starting from
3635        // an allocated range to the end of the device
3636        let size = 50 * fs.block_size() as u64;
3637        object.truncate(size).await.expect("extend failed");
3638
3639        let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
3640        assert_eq!(count, size - end);
3641        assert_eq!(allocated, false);
3642
3643        // Check for the case where where we have the following extent layout
3644        //      [ unallocated ][ `buf` ][ `buf` ]
3645        let buf_length = 5 * fs.block_size();
3646        let mut buf = object.allocate_buffer(buf_length as usize).await;
3647        buf.as_mut_slice().fill(123);
3648        let new_offset = end + 20 * fs.block_size() as u64;
3649        object.write_or_append(Some(new_offset), buf.as_ref()).await.expect("write failed");
3650        object
3651            .write_or_append(Some(new_offset + buf_length), buf.as_ref())
3652            .await
3653            .expect("write failed");
3654
3655        let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
3656        assert_eq!(count, new_offset - end);
3657        assert_eq!(allocated, false);
3658
3659        let (allocated, count) =
3660            object.is_allocated(new_offset).await.expect("is_allocated failed");
3661        assert_eq!(count, 2 * buf_length);
3662        assert_eq!(allocated, true);
3663
3664        // Check the case where we query from the middle of an extent
3665        let (allocated, count) = object
3666            .is_allocated(new_offset + 4 * fs.block_size())
3667            .await
3668            .expect("is_allocated failed");
3669        assert_eq!(count, 2 * buf_length - 4 * fs.block_size());
3670        assert_eq!(allocated, true);
3671
3672        // Now, write buffer to a location already written to.
3673        // Check for the case when we the following extent layout
3674        //      [ unallocated ][ `other_buf` ][ (part of) `buf` ][ `buf` ]
3675        let other_buf_length = 3 * fs.block_size();
3676        let mut other_buf = object.allocate_buffer(other_buf_length as usize).await;
3677        other_buf.as_mut_slice().fill(231);
3678        object.write_or_append(Some(new_offset), other_buf.as_ref()).await.expect("write failed");
3679
3680        // We still expect that `is_allocated(..)` will return that  there are 2*`buf_length bytes`
3681        // allocated from `new_offset`
3682        let (allocated, count) =
3683            object.is_allocated(new_offset).await.expect("is_allocated failed");
3684        assert_eq!(count, 2 * buf_length);
3685        assert_eq!(allocated, true);
3686
3687        // Check for the case when we the following extent layout
3688        //   [ unallocated ][ deleted ][ unallocated ][ deleted ][ allocated ]
3689        // Mark TEST_DATA as deleted
3690        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3691        object
3692            .zero(&mut transaction, aligned_offset..aligned_offset + aligned_length)
3693            .await
3694            .expect("zero failed");
3695        // Mark `other_buf` as deleted
3696        object
3697            .zero(&mut transaction, new_offset..new_offset + buf_length)
3698            .await
3699            .expect("zero failed");
3700        transaction.commit().await.expect("commit transaction failed");
3701
3702        let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
3703        assert_eq!(count, new_offset + buf_length);
3704        assert_eq!(allocated, false);
3705
3706        let (allocated, count) =
3707            object.is_allocated(new_offset + buf_length).await.expect("is_allocated failed");
3708        assert_eq!(count, buf_length);
3709        assert_eq!(allocated, true);
3710
3711        let new_end = new_offset + buf_length + count;
3712
3713        // Check for the case where there are objects with different keys.
3714        // Case that we're checking for:
3715        //      [ unallocated ][ extent (object with different key) ][ unallocated ]
3716        let store = object.owner();
3717        let mut transaction = fs
3718            .clone()
3719            .new_transaction(lock_keys![], Options::default())
3720            .await
3721            .expect("new_transaction failed");
3722        let object2 =
3723            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3724                .await
3725                .expect("create_object failed");
3726        transaction.commit().await.expect("commit failed");
3727
3728        object2
3729            .write_or_append(Some(new_end + fs.block_size()), buf.as_ref())
3730            .await
3731            .expect("write failed");
3732
3733        // Expecting that the extent with a different key is treated like unallocated extent
3734        let (allocated, count) = object.is_allocated(new_end).await.expect("is_allocated failed");
3735        assert_eq!(count, size - new_end);
3736        assert_eq!(allocated, false);
3737
3738        fs.close().await.expect("close failed");
3739    }
3740
3741    #[fuchsia::test(threads = 10)]
3742    async fn test_read_write_attr() {
3743        let (_fs, object) = test_filesystem_and_object().await;
3744        let data = [0xffu8; 16_384];
3745        object.write_attr(20, &data).await.expect("write_attr failed");
3746        let rdata =
3747            object.read_attr(20).await.expect("read_attr failed").expect("no attribute data found");
3748        assert_eq!(&data[..], &rdata[..]);
3749
3750        assert_eq!(object.read_attr(21).await.expect("read_attr failed"), None);
3751    }
3752
3753    #[fuchsia::test(threads = 10)]
3754    async fn test_allocate_basic() {
3755        let (fs, object) = test_filesystem_and_empty_object().await;
3756        let block_size = fs.block_size();
3757        let file_size = block_size * 10;
3758        object.truncate(file_size).await.unwrap();
3759
3760        let small_buf_size = 1024;
3761        let large_buf_aligned_size = block_size as usize * 2;
3762        let large_buf_size = block_size as usize * 2 + 1024;
3763
3764        let mut small_buf = object.allocate_buffer(small_buf_size).await;
3765        let mut large_buf_aligned = object.allocate_buffer(large_buf_aligned_size).await;
3766        let mut large_buf = object.allocate_buffer(large_buf_size).await;
3767
3768        assert_eq!(object.read(0, small_buf.as_mut()).await.unwrap(), small_buf_size);
3769        assert_eq!(small_buf.as_slice(), &vec![0; small_buf_size]);
3770        assert_eq!(object.read(0, large_buf.as_mut()).await.unwrap(), large_buf_size);
3771        assert_eq!(large_buf.as_slice(), &vec![0; large_buf_size]);
3772        assert_eq!(
3773            object.read(0, large_buf_aligned.as_mut()).await.unwrap(),
3774            large_buf_aligned_size
3775        );
3776        assert_eq!(large_buf_aligned.as_slice(), &vec![0; large_buf_aligned_size]);
3777
3778        // Allocation succeeds, and without any writes to the location it shows up as zero.
3779        object.allocate(block_size..block_size * 3).await.unwrap();
3780
3781        // Test starting before, inside, and after the allocated section with every sized buffer.
3782        for (buf_index, buf) in [small_buf, large_buf, large_buf_aligned].iter_mut().enumerate() {
3783            for offset in 0..4 {
3784                assert_eq!(
3785                    object.read(block_size * offset, buf.as_mut()).await.unwrap(),
3786                    buf.len(),
3787                    "buf_index: {}, read offset: {}",
3788                    buf_index,
3789                    offset,
3790                );
3791                assert_eq!(
3792                    buf.as_slice(),
3793                    &vec![0; buf.len()],
3794                    "buf_index: {}, read offset: {}",
3795                    buf_index,
3796                    offset,
3797                );
3798            }
3799        }
3800
3801        fs.close().await.expect("close failed");
3802    }
3803
3804    #[fuchsia::test(threads = 10)]
3805    async fn test_allocate_extends_file() {
3806        const BUF_SIZE: usize = 1024;
3807        let (fs, object) = test_filesystem_and_empty_object().await;
3808        let mut buf = object.allocate_buffer(BUF_SIZE).await;
3809        let block_size = fs.block_size();
3810
3811        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
3812        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3813
3814        assert!(TEST_OBJECT_SIZE < block_size * 4);
3815        // Allocation succeeds, and without any writes to the location it shows up as zero.
3816        object.allocate(0..block_size * 4).await.unwrap();
3817        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
3818        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3819        assert_eq!(object.read(block_size, buf.as_mut()).await.unwrap(), buf.len());
3820        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3821        assert_eq!(object.read(block_size * 3, buf.as_mut()).await.unwrap(), buf.len());
3822        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3823
3824        fs.close().await.expect("close failed");
3825    }
3826
3827    #[fuchsia::test(threads = 10)]
3828    async fn test_allocate_past_end() {
3829        const BUF_SIZE: usize = 1024;
3830        let (fs, object) = test_filesystem_and_empty_object().await;
3831        let mut buf = object.allocate_buffer(BUF_SIZE).await;
3832        let block_size = fs.block_size();
3833
3834        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
3835        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3836
3837        assert!(TEST_OBJECT_SIZE < block_size * 4);
3838        // Allocation succeeds, and without any writes to the location it shows up as zero.
3839        object.allocate(block_size * 4..block_size * 6).await.unwrap();
3840        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
3841        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3842        assert_eq!(object.read(block_size * 4, buf.as_mut()).await.unwrap(), buf.len());
3843        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3844        assert_eq!(object.read(block_size * 5, buf.as_mut()).await.unwrap(), buf.len());
3845        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3846
3847        fs.close().await.expect("close failed");
3848    }
3849
3850    #[fuchsia::test(threads = 10)]
3851    async fn test_allocate_read_attr() {
3852        let (fs, object) = test_filesystem_and_empty_object().await;
3853        let block_size = fs.block_size();
3854        let file_size = block_size * 4;
3855        object.truncate(file_size).await.unwrap();
3856
3857        let content = object
3858            .read_attr(object.attribute_id())
3859            .await
3860            .expect("failed to read attr")
3861            .expect("attr returned none");
3862        assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
3863
3864        object.allocate(block_size..block_size * 3).await.unwrap();
3865
3866        let content = object
3867            .read_attr(object.attribute_id())
3868            .await
3869            .expect("failed to read attr")
3870            .expect("attr returned none");
3871        assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
3872
3873        fs.close().await.expect("close failed");
3874    }
3875
3876    #[fuchsia::test(threads = 10)]
3877    async fn test_allocate_existing_data() {
3878        struct Case {
3879            written_ranges: Vec<Range<usize>>,
3880            allocate_range: Range<u64>,
3881        }
3882        let cases = [
3883            Case { written_ranges: vec![4..7], allocate_range: 4..7 },
3884            Case { written_ranges: vec![4..7], allocate_range: 3..8 },
3885            Case { written_ranges: vec![4..7], allocate_range: 5..6 },
3886            Case { written_ranges: vec![4..7], allocate_range: 5..8 },
3887            Case { written_ranges: vec![4..7], allocate_range: 3..5 },
3888            Case { written_ranges: vec![0..1, 2..3, 4..5, 6..7, 8..9], allocate_range: 0..10 },
3889            Case { written_ranges: vec![0..2, 4..6, 7..10], allocate_range: 1..8 },
3890        ];
3891
3892        for case in cases {
3893            let (fs, object) = test_filesystem_and_empty_object().await;
3894            let block_size = fs.block_size();
3895            let file_size = block_size * 10;
3896            object.truncate(file_size).await.unwrap();
3897
3898            for write in &case.written_ranges {
3899                let write_len = (write.end - write.start) * block_size as usize;
3900                let mut write_buf = object.allocate_buffer(write_len).await;
3901                write_buf.as_mut_slice().fill(0xff);
3902                assert_eq!(
3903                    object
3904                        .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
3905                        .await
3906                        .unwrap(),
3907                    file_size
3908                );
3909            }
3910
3911            let mut expected_buf = object.allocate_buffer(file_size as usize).await;
3912            assert_eq!(object.read(0, expected_buf.as_mut()).await.unwrap(), expected_buf.len());
3913
3914            object
3915                .allocate(
3916                    case.allocate_range.start * block_size..case.allocate_range.end * block_size,
3917                )
3918                .await
3919                .unwrap();
3920
3921            let mut read_buf = object.allocate_buffer(file_size as usize).await;
3922            assert_eq!(object.read(0, read_buf.as_mut()).await.unwrap(), read_buf.len());
3923            assert_eq!(read_buf.as_slice(), expected_buf.as_slice());
3924
3925            fs.close().await.expect("close failed");
3926        }
3927    }
3928
    /// Walks the extent records of attribute 0 on `obj` that overlap `search_range`, returning
    /// each overlapping sub-range (clipped to `search_range`) paired with its `ExtentMode`.
    /// Panics if, while part of the range is still uncovered, the iterator yields anything other
    /// than a matching extent record — i.e. the range is expected to be fully covered.
    async fn get_modes(
        obj: &DataObjectHandle<ObjectStore>,
        mut search_range: Range<u64>,
    ) -> Vec<(Range<u64>, ExtentMode)> {
        let mut modes = Vec::new();
        let store = obj.store();
        let tree = store.tree();
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        // Seek to the first extent record that could overlap the start of the search range.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                obj.object_id(),
                0,
                AttributeKey::Extent(ExtentKey::search_key_from_offset(search_range.start)),
            )))
            .await
            .unwrap();
        loop {
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(
                                    attribute_id,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                        },
                    value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
                    ..
                }) if *object_id == obj.object_id() && *attribute_id == 0 => {
                    // Records are ordered by offset, so an extent starting at or beyond the end
                    // of the search range means we're done.
                    if search_range.end <= range.start {
                        break;
                    }
                    // Clip the extent to the search range and advance the uncovered start.
                    let found_range = std::cmp::max(search_range.start, range.start)
                        ..std::cmp::min(search_range.end, range.end);
                    search_range.start = found_range.end;
                    modes.push((found_range, mode.clone()));
                    if search_range.start == search_range.end {
                        break;
                    }
                    iter.advance().await.unwrap();
                }
                // A gap, a record for another object/attribute, or end-of-items while the range
                // is still uncovered is a test failure.
                x => panic!("looking for extent record, found this {:?}", x),
            }
        }
        modes
    }
3978
3979    async fn assert_all_overwrite(
3980        obj: &DataObjectHandle<ObjectStore>,
3981        mut search_range: Range<u64>,
3982    ) {
3983        let modes = get_modes(obj, search_range.clone()).await;
3984        for mode in modes {
3985            assert_eq!(
3986                mode.0.start, search_range.start,
3987                "missing mode in range {}..{}",
3988                search_range.start, mode.0.start
3989            );
3990            match mode.1 {
3991                ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => (),
3992                m => panic!("mode at range {:?} was not overwrite, instead found {:?}", mode.0, m),
3993            }
3994            assert!(
3995                mode.0.end <= search_range.end,
3996                "mode ends beyond search range (bug in test) - search_range: {:?}, mode: {:?}",
3997                search_range,
3998                mode,
3999            );
4000            search_range.start = mode.0.end;
4001        }
4002        assert_eq!(
4003            search_range.start, search_range.end,
4004            "missing mode in range {:?}",
4005            search_range
4006        );
4007    }
4008
4009    #[fuchsia::test(threads = 10)]
4010    async fn test_multi_overwrite() {
4011        #[derive(Debug)]
4012        struct Case {
4013            pre_writes: Vec<Range<usize>>,
4014            allocate_ranges: Vec<Range<u64>>,
4015            overwrites: Vec<Vec<Range<u64>>>,
4016        }
4017        let cases = [
4018            Case {
4019                pre_writes: Vec::new(),
4020                allocate_ranges: vec![1..3],
4021                overwrites: vec![vec![1..3]],
4022            },
4023            Case {
4024                pre_writes: Vec::new(),
4025                allocate_ranges: vec![0..1, 1..2, 2..3, 3..4],
4026                overwrites: vec![vec![0..4]],
4027            },
4028            Case {
4029                pre_writes: Vec::new(),
4030                allocate_ranges: vec![0..4],
4031                overwrites: vec![vec![0..1], vec![1..2], vec![3..4]],
4032            },
4033            Case {
4034                pre_writes: Vec::new(),
4035                allocate_ranges: vec![0..4],
4036                overwrites: vec![vec![3..4]],
4037            },
4038            Case {
4039                pre_writes: Vec::new(),
4040                allocate_ranges: vec![0..4],
4041                overwrites: vec![vec![3..4], vec![2..3], vec![1..2]],
4042            },
4043            Case {
4044                pre_writes: Vec::new(),
4045                allocate_ranges: vec![1..2, 5..6, 7..8],
4046                overwrites: vec![vec![5..6]],
4047            },
4048            Case {
4049                pre_writes: Vec::new(),
4050                allocate_ranges: vec![1..3],
4051                overwrites: vec![
4052                    vec![1..3],
4053                    vec![1..3],
4054                    vec![1..3],
4055                    vec![1..3],
4056                    vec![1..3],
4057                    vec![1..3],
4058                    vec![1..3],
4059                    vec![1..3],
4060                ],
4061            },
4062            Case {
4063                pre_writes: Vec::new(),
4064                allocate_ranges: vec![0..5],
4065                overwrites: vec![
4066                    vec![1..3],
4067                    vec![1..3],
4068                    vec![1..3],
4069                    vec![1..3],
4070                    vec![1..3],
4071                    vec![1..3],
4072                    vec![1..3],
4073                    vec![1..3],
4074                ],
4075            },
4076            Case {
4077                pre_writes: Vec::new(),
4078                allocate_ranges: vec![0..5],
4079                overwrites: vec![vec![0..2, 2..4, 4..5]],
4080            },
4081            Case {
4082                pre_writes: Vec::new(),
4083                allocate_ranges: vec![0..5, 5..10],
4084                overwrites: vec![vec![1..2, 2..3, 4..7, 7..8]],
4085            },
4086            Case {
4087                pre_writes: Vec::new(),
4088                allocate_ranges: vec![0..4, 6..10],
4089                overwrites: vec![vec![2..3, 7..9]],
4090            },
4091            Case {
4092                pre_writes: Vec::new(),
4093                allocate_ranges: vec![0..10],
4094                overwrites: vec![vec![1..2, 5..10], vec![0..1, 5..10], vec![0..5, 5..10]],
4095            },
4096            Case {
4097                pre_writes: Vec::new(),
4098                allocate_ranges: vec![0..10],
4099                overwrites: vec![vec![0..2, 2..4, 4..6, 6..8, 8..10], vec![0..5, 5..10]],
4100            },
4101            Case {
4102                pre_writes: vec![1..3],
4103                allocate_ranges: vec![1..3],
4104                overwrites: vec![vec![1..3]],
4105            },
4106            Case {
4107                pre_writes: vec![1..3],
4108                allocate_ranges: vec![4..6],
4109                overwrites: vec![vec![5..6]],
4110            },
4111            Case {
4112                pre_writes: vec![1..3],
4113                allocate_ranges: vec![0..4],
4114                overwrites: vec![vec![0..4]],
4115            },
4116            Case {
4117                pre_writes: vec![1..3],
4118                allocate_ranges: vec![2..4],
4119                overwrites: vec![vec![2..4]],
4120            },
4121            Case {
4122                pre_writes: vec![3..5],
4123                allocate_ranges: vec![1..3, 6..7],
4124                overwrites: vec![vec![1..3, 6..7]],
4125            },
4126            Case {
4127                pre_writes: vec![1..3, 5..7, 8..9],
4128                allocate_ranges: vec![0..5],
4129                overwrites: vec![vec![0..2, 2..5], vec![0..5]],
4130            },
4131            Case {
4132                pre_writes: Vec::new(),
4133                allocate_ranges: vec![0..10, 4..6],
4134                overwrites: Vec::new(),
4135            },
4136            Case {
4137                pre_writes: Vec::new(),
4138                allocate_ranges: vec![3..8, 5..10],
4139                overwrites: Vec::new(),
4140            },
4141            Case {
4142                pre_writes: Vec::new(),
4143                allocate_ranges: vec![5..10, 3..8],
4144                overwrites: Vec::new(),
4145            },
4146        ];
4147
4148        for (i, case) in cases.into_iter().enumerate() {
4149            log::info!("running case {} - {:?}", i, case);
4150            let (fs, object) = test_filesystem_and_empty_object().await;
4151            let block_size = fs.block_size();
4152            let file_size = block_size * 10;
4153            object.truncate(file_size).await.unwrap();
4154
4155            for write in case.pre_writes {
4156                let write_len = (write.end - write.start) * block_size as usize;
4157                let mut write_buf = object.allocate_buffer(write_len).await;
4158                write_buf.as_mut_slice().fill(0xff);
4159                assert_eq!(
4160                    object
4161                        .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
4162                        .await
4163                        .unwrap(),
4164                    file_size
4165                );
4166            }
4167
4168            for allocate_range in &case.allocate_ranges {
4169                object
4170                    .allocate(allocate_range.start * block_size..allocate_range.end * block_size)
4171                    .await
4172                    .unwrap();
4173            }
4174
4175            for allocate_range in case.allocate_ranges {
4176                assert_all_overwrite(
4177                    &object,
4178                    allocate_range.start * block_size..allocate_range.end * block_size,
4179                )
4180                .await;
4181            }
4182
4183            for overwrite in case.overwrites {
4184                let mut write_len = 0;
4185                let overwrite = overwrite
4186                    .into_iter()
4187                    .map(|r| {
4188                        write_len += (r.end - r.start) * block_size;
4189                        r.start * block_size..r.end * block_size
4190                    })
4191                    .collect::<Vec<_>>();
4192                let mut write_buf = object.allocate_buffer(write_len as usize).await;
4193                let data = (0..20).cycle().take(write_len as usize).collect::<Vec<_>>();
4194                write_buf.as_mut_slice().copy_from_slice(&data);
4195
4196                let mut expected_buf = object.allocate_buffer(file_size as usize).await;
4197                assert_eq!(
4198                    object.read(0, expected_buf.as_mut()).await.unwrap(),
4199                    expected_buf.len()
4200                );
4201                let expected_buf_slice = expected_buf.as_mut_slice();
4202                let mut data_slice = data.as_slice();
4203                for r in &overwrite {
4204                    let len = r.length().unwrap() as usize;
4205                    let (copy_from, rest) = data_slice.split_at(len);
4206                    expected_buf_slice[r.start as usize..r.end as usize]
4207                        .copy_from_slice(&copy_from);
4208                    data_slice = rest;
4209                }
4210
4211                let mut transaction = object.new_transaction().await.unwrap();
4212                object
4213                    .multi_overwrite(&mut transaction, 0, &overwrite, write_buf.as_mut())
4214                    .await
4215                    .unwrap_or_else(|_| panic!("multi_overwrite error on case {}", i));
4216                // Double check the emitted checksums. We should have one u64 checksum for every
4217                // block we wrote to disk.
4218                let mut checksummed_range_length = 0;
4219                let mut num_checksums = 0;
4220                for (device_range, checksums, _) in transaction.checksums() {
4221                    let range_len = device_range.end - device_range.start;
4222                    let checksums_len = checksums.len() as u64;
4223                    assert_eq!(range_len / checksums_len, block_size);
4224                    checksummed_range_length += range_len;
4225                    num_checksums += checksums_len;
4226                }
4227                assert_eq!(checksummed_range_length, write_len);
4228                assert_eq!(num_checksums, write_len / block_size);
4229                transaction.commit().await.unwrap();
4230
4231                let mut buf = object.allocate_buffer(file_size as usize).await;
4232                assert_eq!(
4233                    object.read(0, buf.as_mut()).await.unwrap(),
4234                    buf.len(),
4235                    "failed length check on case {}",
4236                    i,
4237                );
4238                assert_eq!(buf.as_slice(), expected_buf.as_slice(), "failed on case {}", i);
4239            }
4240
4241            fsck_volume(&fs, object.store().store_object_id(), None).await.expect("fsck failed");
4242            fs.close().await.expect("close failed");
4243        }
4244    }
4245
4246    #[fuchsia::test(threads = 10)]
4247    async fn test_multi_overwrite_mode_updates() {
4248        let (fs, object) = test_filesystem_and_empty_object().await;
4249        let block_size = fs.block_size();
4250        let file_size = block_size * 10;
4251        object.truncate(file_size).await.unwrap();
4252
4253        let mut expected_bitmap = BitVec::from_elem(10, false);
4254
4255        object.allocate(0..10 * block_size).await.unwrap();
4256        assert_eq!(
4257            get_modes(&object, 0..10 * block_size).await,
4258            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4259        );
4260
4261        let mut write_buf = object.allocate_buffer(2 * block_size as usize).await;
4262        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4263        write_buf.as_mut_slice().copy_from_slice(&data);
4264        let mut transaction = object.new_transaction().await.unwrap();
4265        object
4266            .multi_overwrite(
4267                &mut transaction,
4268                0,
4269                &[2 * block_size..4 * block_size],
4270                write_buf.as_mut(),
4271            )
4272            .await
4273            .unwrap();
4274        transaction.commit().await.unwrap();
4275
4276        expected_bitmap.set(2, true);
4277        expected_bitmap.set(3, true);
4278        assert_eq!(
4279            get_modes(&object, 0..10 * block_size).await,
4280            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4281        );
4282
4283        let mut write_buf = object.allocate_buffer(3 * block_size as usize).await;
4284        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4285        write_buf.as_mut_slice().copy_from_slice(&data);
4286        let mut transaction = object.new_transaction().await.unwrap();
4287        object
4288            .multi_overwrite(
4289                &mut transaction,
4290                0,
4291                &[3 * block_size..5 * block_size, 6 * block_size..7 * block_size],
4292                write_buf.as_mut(),
4293            )
4294            .await
4295            .unwrap();
4296        transaction.commit().await.unwrap();
4297
4298        expected_bitmap.set(4, true);
4299        expected_bitmap.set(6, true);
4300        assert_eq!(
4301            get_modes(&object, 0..10 * block_size).await,
4302            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4303        );
4304
4305        let mut write_buf = object.allocate_buffer(6 * block_size as usize).await;
4306        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4307        write_buf.as_mut_slice().copy_from_slice(&data);
4308        let mut transaction = object.new_transaction().await.unwrap();
4309        object
4310            .multi_overwrite(
4311                &mut transaction,
4312                0,
4313                &[
4314                    0..2 * block_size,
4315                    5 * block_size..6 * block_size,
4316                    7 * block_size..10 * block_size,
4317                ],
4318                write_buf.as_mut(),
4319            )
4320            .await
4321            .unwrap();
4322        transaction.commit().await.unwrap();
4323
4324        assert_eq!(
4325            get_modes(&object, 0..10 * block_size).await,
4326            vec![(0..10 * block_size, ExtentMode::Overwrite)]
4327        );
4328
4329        fs.close().await.expect("close failed");
4330    }
4331}