fxfs/object_store/
data_object_handle.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::log::*;
7use crate::lsm_tree::types::{ItemRef, LayerIterator};
8use crate::lsm_tree::Query;
9use crate::object_handle::{
10    ObjectHandle, ObjectProperties, ReadObjectHandle, WriteBytes, WriteObjectHandle,
11};
12use crate::object_store::extent_record::{ExtentKey, ExtentMode, ExtentValue};
13use crate::object_store::object_manager::ObjectManager;
14use crate::object_store::object_record::{
15    AttributeKey, FsverityMetadata, ObjectAttributes, ObjectItem, ObjectKey, ObjectKeyData,
16    ObjectKind, ObjectValue, Timestamp,
17};
18use crate::object_store::store_object_handle::{MaybeChecksums, NeedsTrim};
19use crate::object_store::transaction::{
20    self, lock_keys, AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Operation,
21    Options, Transaction,
22};
23use crate::object_store::{
24    HandleOptions, HandleOwner, RootDigest, StoreObjectHandle, TrimMode, TrimResult,
25    DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID, TRANSACTION_MUTATION_THRESHOLD,
26};
27use crate::range::RangeExt;
28use crate::round::{round_down, round_up};
29use anyhow::{anyhow, bail, ensure, Context, Error};
30use async_trait::async_trait;
31use fidl_fuchsia_io as fio;
32use fsverity_merkle::{FsVerityHasher, FsVerityHasherOptions, MerkleTreeBuilder};
33use fuchsia_sync::Mutex;
34use futures::stream::FuturesUnordered;
35use futures::TryStreamExt;
36use fxfs_trace::trace;
37use mundane::hash::{Digest, Hasher, Sha256, Sha512};
38use std::cmp::min;
39use std::ops::{Deref, DerefMut, Range};
40use std::sync::atomic::{self, AtomicU64, Ordering};
41use std::sync::Arc;
42use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
43
44mod allocated_ranges;
45pub use allocated_ranges::{AllocatedRanges, RangeType};
46
/// How much data each transaction will cover when writing an attribute across batches. Pulled from
/// `FLUSH_BATCH_SIZE` in paged_object_handle.rs.
pub const WRITE_ATTR_BATCH_SIZE: usize = 524_288; // 512 KiB
50
/// DataObjectHandle is a typed handle for file-like objects that store data in the default data
/// attribute. In addition to traditional files, this means things like the journal, superblocks,
/// and layer files.
///
/// It caches the content size of the data attribute it was configured for, and has helpers for
/// complex extent manipulation, as well as implementations of ReadObjectHandle and
/// WriteObjectHandle.
pub struct DataObjectHandle<S: HandleOwner> {
    // The untyped store handle this wraps; also reachable via `Deref`.
    handle: StoreObjectHandle<S>,
    // Id of the data attribute this handle reads and writes.
    attribute_id: u64,
    // Cached content size of the data attribute.
    content_size: AtomicU64,
    // Cached fsverity state; see `FsverityState` for the state machine.
    fsverity_state: Mutex<FsverityState>,
    // In-memory tracking of ranges allocated in overwrite mode; cleared once verity is enabled
    // (see `finalize_fsverity_state`).
    overwrite_ranges: AllocatedRanges,
}
65
/// State machine for enabling fsverity on a file. The transitions during `enable_verity` are
/// None -> Started -> Pending -> Some (see `set_fsverity_state_started`,
/// `set_fsverity_state_pending` and `finalize_fsverity_state`). `Some` is also set directly via
/// `set_fsverity_state_some` when opening an already-verified file.
#[derive(Debug)]
pub enum FsverityState {
    /// Not a verity-enabled file and no `enable_verity` in progress.
    None,
    /// `enable_verity` has begun but the merkle metadata has not yet been computed.
    Started,
    /// Metadata has been computed but the transaction recording it has not yet committed.
    Pending(FsverityStateInner),
    /// Verity is fully enabled; reads are verified against the stored merkle tree.
    Some(FsverityStateInner),
}
73
/// The fsverity metadata for a verity-enabled file, cached in memory.
#[derive(Debug)]
pub struct FsverityStateInner {
    // The descriptor (root digest and salt) stored with the data attribute on disk.
    descriptor: FsverityMetadata,
    // TODO(b/309656632): This should store the entire merkle tree and not just the leaf nodes.
    // Potentially store a pager-backed vmo instead of passing around a boxed array.
    merkle_tree: Box<[u8]>,
}
81
82impl FsverityStateInner {
83    pub fn new(descriptor: FsverityMetadata, merkle_tree: Box<[u8]>) -> Self {
84        FsverityStateInner { descriptor, merkle_tree }
85    }
86}
87
// Allow all `StoreObjectHandle` methods to be called directly on a `DataObjectHandle`.
impl<S: HandleOwner> Deref for DataObjectHandle<S> {
    type Target = StoreObjectHandle<S>;
    fn deref(&self) -> &Self::Target {
        &self.handle
    }
}
94
95impl<S: HandleOwner> DataObjectHandle<S> {
96    pub fn new(
97        owner: Arc<S>,
98        object_id: u64,
99        permanent_keys: bool,
100        attribute_id: u64,
101        size: u64,
102        fsverity_state: FsverityState,
103        options: HandleOptions,
104        trace: bool,
105        overwrite_ranges: Vec<Range<u64>>,
106    ) -> Self {
107        Self {
108            handle: StoreObjectHandle::new(owner, object_id, permanent_keys, options, trace),
109            attribute_id,
110            content_size: AtomicU64::new(size),
111            fsverity_state: Mutex::new(fsverity_state),
112            overwrite_ranges: AllocatedRanges::new(overwrite_ranges),
113        }
114    }
115
    /// Returns the id of the data attribute this handle operates on (fixed at construction).
    pub fn attribute_id(&self) -> u64 {
        self.attribute_id
    }
119
    /// Returns the in-memory tracking of ranges allocated in overwrite mode for this attribute.
    /// Only meaningful for writing; cleared when the file becomes verity-enabled.
    pub fn overwrite_ranges(&self) -> &AllocatedRanges {
        &self.overwrite_ranges
    }
123
124    pub fn is_verified_file(&self) -> bool {
125        matches!(*self.fsverity_state.lock(), FsverityState::Some(_))
126    }
127
    /// Sets `self.fsverity_state` to FsverityState::Started. Called at the top of `enable_verity`.
    /// If another caller has already started but not completed `enable_verity`, returns
    /// FxfsError::Unavailable. If another caller has already completed `enable_verity`, returns
    /// FxfsError::AlreadyExists.
    pub fn set_fsverity_state_started(&self) -> Result<(), Error> {
        let mut fsverity_guard = self.fsverity_state.lock();
        match *fsverity_guard {
            FsverityState::None => {
                *fsverity_guard = FsverityState::Started;
                Ok(())
            }
            FsverityState::Started | FsverityState::Pending(_) => {
                Err(anyhow!(FxfsError::Unavailable))
            }
            FsverityState::Some(_) => Err(anyhow!(FxfsError::AlreadyExists)),
        }
    }
145
146    /// Sets `self.fsverity_state` to Pending. Must be called before `finalize_fsverity_state()`.
147    /// Asserts that the prior state of `self.fsverity_state` was `FsverityState::Started`.
148    pub fn set_fsverity_state_pending(&self, descriptor: FsverityMetadata, merkle_tree: Box<[u8]>) {
149        let mut fsverity_guard = self.fsverity_state.lock();
150        assert!(matches!(*fsverity_guard, FsverityState::Started));
151        *fsverity_guard = FsverityState::Pending(FsverityStateInner { descriptor, merkle_tree });
152    }
153
154    /// Sets `self.fsverity_state` to Some. Panics if the prior state of `self.fsverity_state` was
155    /// not `FsverityState::Pending(_)`.
156    pub fn finalize_fsverity_state(&self) {
157        let mut fsverity_state_guard = self.fsverity_state.lock();
158        let mut_fsverity_state = fsverity_state_guard.deref_mut();
159        let fsverity_state = std::mem::replace(mut_fsverity_state, FsverityState::None);
160        match fsverity_state {
161            FsverityState::None => panic!("Cannot go from FsverityState::None to Some"),
162            FsverityState::Started => panic!("Cannot go from FsverityState::Started to Some"),
163            FsverityState::Pending(inner) => *mut_fsverity_state = FsverityState::Some(inner),
164            FsverityState::Some(_) => panic!("Fsverity state was already set to Some"),
165        }
166        // Once we finalize the fsverity state, the file is permanently read-only. The in-memory
167        // overwrite ranges tracking is only used for writing, so we don't need them anymore. This
168        // leaves any uninitialized, but allocated, overwrite regions if there are any, rather than
169        // converting them back to sparse regions.
170        self.overwrite_ranges.clear();
171    }
172
173    /// Sets `self.fsverity_state` directly to Some without going through the entire state machine.
174    /// Used to set `self.fsverity_state` on open of a verified file.
175    pub fn set_fsverity_state_some(&self, descriptor: FsverityMetadata, merkle_tree: Box<[u8]>) {
176        let mut fsverity_guard = self.fsverity_state.lock();
177        assert!(matches!(*fsverity_guard, FsverityState::None));
178        *fsverity_guard = FsverityState::Some(FsverityStateInner { descriptor, merkle_tree });
179    }
180
    /// Verifies contents of `buffer` against the corresponding hashes in the stored merkle tree.
    /// `offset` is the logical offset in the file that `buffer` starts at. `offset` must be
    /// block-aligned. Fails on non fsverity-enabled files.
    ///
    /// Returns `FxfsError::Inconsistent` if any block's hash does not match the stored leaf hash.
    async fn verify_data(&self, mut offset: usize, buffer: &[u8]) -> Result<(), Error> {
        let block_size = self.block_size() as usize;
        assert!(offset % block_size == 0);
        // The lock is held for the whole verification; there are no awaits below.
        let fsverity_state = self.fsverity_state.lock();
        match &*fsverity_state {
            FsverityState::None => {
                Err(anyhow!("Tried to verify read on a non verity-enabled file"))
            }
            FsverityState::Started | FsverityState::Pending(_) => Err(anyhow!(
                "Enable verity has not yet completed, fsverity state: {:?}",
                &*fsverity_state
            )),
            FsverityState::Some(metadata) => {
                // Recreate the hasher with the same algorithm and salt that were used when verity
                // was enabled; `digest_size` is the width of each leaf hash in `merkle_tree`.
                let (hasher, digest_size) = match metadata.descriptor.root_digest {
                    RootDigest::Sha256(_) => {
                        let hasher = FsVerityHasher::Sha256(FsVerityHasherOptions::new(
                            metadata.descriptor.salt.clone(),
                            block_size,
                        ));
                        (hasher, <Sha256 as Hasher>::Digest::DIGEST_LEN)
                    }
                    RootDigest::Sha512(_) => {
                        let hasher = FsVerityHasher::Sha512(FsVerityHasherOptions::new(
                            metadata.descriptor.salt.clone(),
                            block_size,
                        ));
                        (hasher, <Sha512 as Hasher>::Digest::DIGEST_LEN)
                    }
                };
                // The cached merkle tree stores leaf hashes back to back; split into one digest
                // per block.
                let leaf_nodes: Vec<&[u8]> = metadata.merkle_tree.chunks(digest_size).collect();
                fxfs_trace::duration!(c"fsverity-verify", "len" => buffer.len());
                // TODO(b/318880297): Consider parallelizing computation.
                // NOTE(review): assumes `offset`/`buffer` lie within the verified file, otherwise
                // the `leaf_nodes` index below would be out of bounds — confirm callers guarantee
                // this.
                for b in buffer.chunks(block_size) {
                    ensure!(
                        hasher.hash_block(b) == leaf_nodes[offset / block_size],
                        anyhow!(FxfsError::Inconsistent).context("Hash mismatch")
                    );
                    offset += block_size;
                }
                Ok(())
            }
        }
    }
227
    /// Extend the file with the given extent.  The only use case for this right now is for files
    /// that must exist at certain offsets on the device, such as super-blocks.
    ///
    /// `device_range` is marked allocated and appended at the (block-aligned) current end of the
    /// attribute; the content size grows by the length of `device_range`.
    pub async fn extend<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        device_range: Range<u64>,
    ) -> Result<(), Error> {
        // The new extent starts at the transaction-local size rounded up to a block boundary.
        let old_end =
            round_up(self.txn_get_size(transaction), self.block_size()).ok_or(FxfsError::TooBig)?;
        let new_size = old_end + device_range.end - device_range.start;
        self.store()
            .allocator()
            .mark_allocated(transaction, self.store().store_object_id(), device_range.clone())
            .await?;
        self.txn_update_size(transaction, new_size, None).await?;
        // Key id 0 is used when the object has no encryption key.
        let key_id = self.get_key(None).await?.map_or(0, |k| k.key_id());
        transaction.add(
            self.store().store_object_id,
            Mutation::merge_object(
                ObjectKey::extent(self.object_id(), self.attribute_id(), old_end..new_size),
                ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
            ),
        );
        self.update_allocated_size(transaction, device_range.end - device_range.start, 0).await
    }
253
254    // Returns a new aligned buffer (reading the head and tail blocks if necessary) with a copy of
255    // the data from `buf`.
256    async fn align_buffer(
257        &self,
258        offset: u64,
259        buf: BufferRef<'_>,
260    ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
261        self.handle.align_buffer(self.attribute_id(), offset, buf).await
262    }
263
264    // Writes potentially unaligned data at `device_offset` and returns checksums if requested. The
265    // data will be encrypted if necessary.
266    // `buf` is mutable as an optimization, since the write may require encryption, we can encrypt
267    // the buffer in-place rather than copying to another buffer if the write is already aligned.
268    async fn write_at(
269        &self,
270        offset: u64,
271        buf: MutableBufferRef<'_>,
272        device_offset: u64,
273    ) -> Result<MaybeChecksums, Error> {
274        self.handle.write_at(self.attribute_id(), offset, buf, None, device_offset).await
275    }
276
277    /// Zeroes the given range.  The range must be aligned.  Returns the amount of data deallocated.
278    pub async fn zero(
279        &self,
280        transaction: &mut Transaction<'_>,
281        range: Range<u64>,
282    ) -> Result<(), Error> {
283        self.handle.zero(transaction, self.attribute_id(), range).await
284    }
285
    /// The cached value for `self.fsverity_state` is set either in `open_object` or on
    /// `enable_verity`. If set, translates `self.fsverity_state.descriptor` into an
    /// fio::VerificationOptions instance and a root hash. Otherwise, returns None.
    ///
    /// Errors if `enable_verity` is still in flight (state Started or Pending).
    pub async fn get_descriptor(
        &self,
    ) -> Result<Option<(fio::VerificationOptions, Vec<u8>)>, Error> {
        let fsverity_state = self.fsverity_state.lock();
        match &*fsverity_state {
            FsverityState::None => Ok(None),
            FsverityState::Started | FsverityState::Pending(_) => Err(anyhow!(
                "Enable verity has not yet completed, fsverity state: {:?}",
                &*fsverity_state
            )),
            FsverityState::Some(metadata) => {
                let (options, root_hash) = match &metadata.descriptor.root_digest {
                    RootDigest::Sha256(root_hash) => {
                        let mut root_vec = root_hash.to_vec();
                        // Need to zero out the rest of the vector so that there's no garbage.
                        // NOTE(review): this pads the 32-byte SHA-256 root out to 64 bytes,
                        // presumably to match the fixed-width hash field consumers expect —
                        // confirm against the fuchsia.io protocol.
                        root_vec.extend_from_slice(&[0; 32]);
                        (
                            fio::VerificationOptions {
                                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                                salt: Some(metadata.descriptor.salt.clone()),
                                ..Default::default()
                            },
                            root_vec,
                        )
                    }
                    RootDigest::Sha512(root_hash) => (
                        fio::VerificationOptions {
                            hash_algorithm: Some(fio::HashAlgorithm::Sha512),
                            salt: Some(metadata.descriptor.salt.clone()),
                            ..Default::default()
                        },
                        root_hash.clone(),
                    ),
                };
                Ok(Some((options, root_hash)))
            }
        }
    }
327
328    /// Reads the data attribute and computes a merkle tree from the data. The values of the
329    /// parameters required to build the merkle tree are supplied by `descriptor` (i.e. salt,
330    /// hash_algorithm, etc.) Writes the leaf nodes of the merkle tree to an attribute with id
331    /// `FSVERITY_MERKLE_ATTRIBUTE_ID`. Updates the root_hash of the `descriptor` according to the
332    /// computed merkle tree and then replaces the ObjectValue of the data attribute with
333    /// ObjectValue::VerifiedAttribute, which stores the `descriptor` inline.
334    #[trace]
335    pub async fn enable_verity(&self, options: fio::VerificationOptions) -> Result<(), Error> {
336        self.set_fsverity_state_started()?;
337        // If the merkle attribute was tombstoned in the last attempt of `enable_verity`, flushing
338        // the graveyard should process the tombstone before we start rewriting the attribute.
339        if let Some(_) = self
340            .store()
341            .tree()
342            .find(&ObjectKey::graveyard_attribute_entry(
343                self.store().graveyard_directory_object_id(),
344                self.object_id(),
345                FSVERITY_MERKLE_ATTRIBUTE_ID,
346            ))
347            .await?
348        {
349            self.store().filesystem().graveyard().flush().await;
350        }
351        let mut transaction = self.new_transaction().await?;
352        let hash_alg =
353            options.hash_algorithm.ok_or_else(|| anyhow!("No hash algorithm provided"))?;
354        let salt = options.salt.ok_or_else(|| anyhow!("No salt provided"))?;
355        let (root_digest, merkle_tree) = match hash_alg {
356            fio::HashAlgorithm::Sha256 => {
357                let hasher = FsVerityHasher::Sha256(FsVerityHasherOptions::new(
358                    salt.clone(),
359                    self.block_size() as usize,
360                ));
361                let mut builder = MerkleTreeBuilder::new(hasher);
362                let mut offset = 0;
363                let size = self.get_size();
364                // TODO(b/314836822): Consider further tuning the buffer size to optimize
365                // performance. Experimentally, most verity-enabled files are <256K.
366                let mut buf = self.allocate_buffer(64 * self.block_size() as usize).await;
367                while offset < size {
368                    // TODO(b/314842875): Consider optimizations for sparse files.
369                    let read = self.read(offset, buf.as_mut()).await? as u64;
370                    assert!(offset + read <= size);
371                    builder.write(&buf.as_slice()[0..read as usize]);
372                    offset += read;
373                }
374                let tree = builder.finish();
375                let merkle_leaf_nodes: Vec<u8> =
376                    tree.as_ref()[0].iter().flat_map(|x| x.clone()).collect();
377                // TODO(b/314194485): Eventually want streaming writes.
378                // The merkle tree attribute should not require trimming because it should not
379                // exist.
380                self.handle
381                    .write_new_attr_in_batches(
382                        &mut transaction,
383                        FSVERITY_MERKLE_ATTRIBUTE_ID,
384                        &merkle_leaf_nodes,
385                        WRITE_ATTR_BATCH_SIZE,
386                    )
387                    .await?;
388                let root: [u8; 32] = tree.root().try_into().unwrap();
389                (RootDigest::Sha256(root), merkle_leaf_nodes)
390            }
391            fio::HashAlgorithm::Sha512 => {
392                let hasher = FsVerityHasher::Sha512(FsVerityHasherOptions::new(
393                    salt.clone(),
394                    self.block_size() as usize,
395                ));
396                let mut builder = MerkleTreeBuilder::new(hasher);
397                let mut offset = 0;
398                let size = self.get_size();
399                // TODO(b/314836822): Consider further tuning the buffer size to optimize
400                // performance. Experimentally, most verity-enabled files are <256K.
401                let mut buf = self.allocate_buffer(64 * self.block_size() as usize).await;
402                while offset < size {
403                    // TODO(b/314842875): Consider optimizations for sparse files.
404                    let read = self.read(offset, buf.as_mut()).await? as u64;
405                    assert!(offset + read <= size);
406                    builder.write(&buf.as_slice()[0..read as usize]);
407                    offset += read;
408                }
409                let tree = builder.finish();
410                let merkle_leaf_nodes: Vec<u8> =
411                    tree.as_ref()[0].iter().flat_map(|x| x.clone()).collect();
412                // TODO(b/314194485): Eventually want streaming writes.
413                // The merkle tree attribute should not require trimming because it should not
414                // exist.
415                self.handle
416                    .write_new_attr_in_batches(
417                        &mut transaction,
418                        FSVERITY_MERKLE_ATTRIBUTE_ID,
419                        &merkle_leaf_nodes,
420                        WRITE_ATTR_BATCH_SIZE,
421                    )
422                    .await?;
423                (RootDigest::Sha512(tree.root().to_vec()), merkle_leaf_nodes)
424            }
425            _ => {
426                bail!(anyhow!(FxfsError::NotSupported)
427                    .context(format!("hash algorithm not supported")));
428            }
429        };
430        if merkle_tree.len() > WRITE_ATTR_BATCH_SIZE {
431            transaction.add(
432                self.store().store_object_id,
433                Mutation::replace_or_insert_object(
434                    ObjectKey::graveyard_attribute_entry(
435                        self.store().graveyard_directory_object_id(),
436                        self.object_id(),
437                        FSVERITY_MERKLE_ATTRIBUTE_ID,
438                    ),
439                    ObjectValue::None,
440                ),
441            );
442        };
443        let descriptor = FsverityMetadata { root_digest, salt };
444        self.set_fsverity_state_pending(descriptor.clone(), merkle_tree.into());
445        transaction.add_with_object(
446            self.store().store_object_id(),
447            Mutation::replace_or_insert_object(
448                ObjectKey::attribute(
449                    self.object_id(),
450                    DEFAULT_DATA_ATTRIBUTE_ID,
451                    AttributeKey::Attribute,
452                ),
453                ObjectValue::verified_attribute(self.get_size(), descriptor),
454            ),
455            AssocObj::Borrowed(self),
456        );
457        transaction.commit().await?;
458        Ok(())
459    }
460
    /// Pre-allocate disk space for the given logical file range. If any part of the allocation
    /// range is beyond the end of the file, the file size is updated.
    ///
    /// Existing extents in the range are switched to overwrite mode where needed and holes are
    /// filled with freshly allocated overwrite extents. Large requests may be split across
    /// several committed transactions; the ordering below is chosen so that replaying only a
    /// prefix of those transactions still leaves the file in a valid state.
    pub async fn allocate(&self, range: Range<u64>) -> Result<(), Error> {
        debug_assert!(range.start < range.end);

        // It's not required that callers of allocate use block aligned ranges, but we need to make
        // the extents block aligned. Luckily, fallocate in posix is allowed to allocate more than
        // what was asked for for block alignment purposes. We just need to make sure that the size
        // of the file is still the non-block-aligned end of the range if the size was changed.
        let mut new_range = range.clone();
        new_range.start = round_down(new_range.start, self.block_size());
        // NB: FxfsError::TooBig turns into EFBIG when passed through starnix, which is the
        // required error code when the requested range is larger than the file size.
        new_range.end = round_up(new_range.end, self.block_size()).ok_or(FxfsError::TooBig)?;

        let mut transaction = self.new_transaction().await?;
        let mut to_allocate = Vec::new();
        let mut to_switch = Vec::new();
        // Encryption key id for the new extent records (0 when the file has no key).
        let key_id = self.get_key(None).await?.map_or(0, |k| k.key_id());

        // Walk the existing extent records overlapping `new_range`, partitioning it into
        // `to_switch` (already-allocated data extents to flip to overwrite mode) and
        // `to_allocate` (gaps that need fresh allocations).
        {
            let tree = &self.store().tree;
            let layer_set = tree.layer_set();
            let offset_key = ObjectKey::attribute(
                self.object_id(),
                self.attribute_id(),
                AttributeKey::Extent(ExtentKey::search_key_from_offset(new_range.start)),
            );
            let mut merger = layer_set.merger();
            let mut iter = merger.query(Query::FullRange(&offset_key)).await?;

            loop {
                match iter.get() {
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(extent_key),
                                    ),
                            },
                        value: ObjectValue::Extent(extent_value),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id() =>
                    {
                        // If the start of this extent is beyond the end of the range we are
                        // allocating, we don't have any more work to do.
                        if new_range.end <= extent_key.range.start {
                            break;
                        }
                        // Add any prefix we might need to allocate.
                        if new_range.start < extent_key.range.start {
                            to_allocate.push(new_range.start..extent_key.range.start);
                            new_range.start = extent_key.range.start;
                        }
                        let device_offset = match extent_value {
                            ExtentValue::None => {
                                // If the extent value is None, it indicates a deleted extent. In
                                // that case, we just skip it entirely. By keeping the new_range
                                // where it is, this section will get included in the new
                                // allocations.
                                iter.advance().await?;
                                continue;
                            }
                            ExtentValue::Some { mode: ExtentMode::OverwritePartial(_), .. }
                            | ExtentValue::Some { mode: ExtentMode::Overwrite, .. } => {
                                // If this extent is already in overwrite mode, we can skip it.
                                if extent_key.range.end < new_range.end {
                                    new_range.start = extent_key.range.end;
                                    iter.advance().await?;
                                    continue;
                                } else {
                                    new_range.start = new_range.end;
                                    break;
                                }
                            }
                            ExtentValue::Some { device_offset, .. } => *device_offset,
                        };

                        // Figure out how we have to break up the ranges.
                        let device_offset =
                            device_offset + (new_range.start - extent_key.range.start);
                        if extent_key.range.end < new_range.end {
                            to_switch.push((new_range.start..extent_key.range.end, device_offset));
                            new_range.start = extent_key.range.end;
                        } else {
                            to_switch.push((new_range.start..new_range.end, device_offset));
                            new_range.start = new_range.end;
                            break;
                        }
                    }
                    // The records are sorted so if we find something that isn't an extent or
                    // doesn't match the object id then there are no more extent records for this
                    // object.
                    _ => break,
                }
                iter.advance().await?;
            }
        }

        // Whatever is left of `new_range` after the walk is a trailing hole.
        if new_range.start < new_range.end {
            to_allocate.push(new_range.clone());
        }

        // We can update the size in the first transaction because even if subsequent transactions
        // don't get replayed, the data between the current and new end of the file will be zero
        // (either sparse zero or allocated zero). On the other hand, if we don't update the size
        // in the first transaction, overwrite extents may be written past the end of the file
        // which is an fsck error.
        //
        // The potential new size needs to be the non-block-aligned range end - we round up to the
        // nearest block size for the actual allocation, but shouldn't do that for the file size.
        let new_size = std::cmp::max(range.end, self.get_size());
        // Make sure the mutation that flips the has_overwrite_extents advisory flag is in the
        // first transaction, in case we split transactions. This makes it okay to only replay the
        // first transaction if power loss occurs - the file will be in an unusual state, but not
        // an invalid one, if only part of the allocate goes through.
        transaction.add_with_object(
            self.store().store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    self.object_id(),
                    self.attribute_id(),
                    AttributeKey::Attribute,
                ),
                ObjectValue::Attribute { size: new_size, has_overwrite_extents: true },
            ),
            AssocObj::Borrowed(self),
        );

        // The maximum number of mutations we are going to allow per transaction in allocate. This
        // is probably quite a bit lower than the actual limit, but it should be large enough to
        // handle most non-edge-case versions of allocate without splitting the transaction.
        const MAX_TRANSACTION_SIZE: usize = 256;
        for (switch_range, device_offset) in to_switch {
            transaction.add_with_object(
                self.store().store_object_id(),
                Mutation::merge_object(
                    ObjectKey::extent(self.object_id(), self.attribute_id(), switch_range),
                    ObjectValue::Extent(ExtentValue::initialized_overwrite_extent(
                        device_offset,
                        key_id,
                    )),
                ),
                AssocObj::Borrowed(self),
            );
            if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
                transaction.commit_and_continue().await?;
            }
        }

        let mut allocated = 0;
        let allocator = self.store().allocator();
        for mut allocate_range in to_allocate {
            // The allocator may return less than asked for, so loop until the hole is filled.
            while allocate_range.start < allocate_range.end {
                let device_range = allocator
                    .allocate(
                        &mut transaction,
                        self.store().store_object_id(),
                        allocate_range.end - allocate_range.start,
                    )
                    .await
                    .context("allocation failed")?;
                let device_range_len = device_range.end - device_range.start;

                transaction.add_with_object(
                    self.store().store_object_id(),
                    Mutation::merge_object(
                        ObjectKey::extent(
                            self.object_id(),
                            self.attribute_id(),
                            allocate_range.start..allocate_range.start + device_range_len,
                        ),
                        ObjectValue::Extent(ExtentValue::blank_overwrite_extent(
                            device_range.start,
                            (device_range_len / self.block_size()) as usize,
                            key_id,
                        )),
                    ),
                    AssocObj::Borrowed(self),
                );

                allocate_range.start += device_range_len;
                allocated += device_range_len;

                if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
                    self.update_allocated_size(&mut transaction, allocated, 0).await?;
                    transaction.commit_and_continue().await?;
                    allocated = 0;
                }
            }
        }

        self.update_allocated_size(&mut transaction, allocated, 0).await?;
        transaction.commit().await?;

        Ok(())
    }
662
    /// Return information on a contiguous set of extents that has the same allocation status,
    /// starting from `start_offset`. The information returned is if this set of extents are marked
    /// allocated/not allocated and also the size of this set (in bytes). This is used when
    /// querying slices for volumes.
    /// This function expects `start_offset` to be aligned to block size
    pub async fn is_allocated(&self, start_offset: u64) -> Result<(bool, u64), Error> {
        let block_size = self.block_size();
        assert_eq!(start_offset % block_size, 0);

        // Queries past the end of the attribute are out of range.
        if start_offset > self.get_size() {
            bail!(FxfsError::OutOfRange)
        }

        // Exactly at EOF: report a zero-length unallocated run.
        if start_offset == self.get_size() {
            return Ok((false, 0));
        }

        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let offset_key = ObjectKey::attribute(
            self.object_id(),
            self.attribute_id(),
            AttributeKey::Extent(ExtentKey::search_key_from_offset(start_offset)),
        );
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullRange(&offset_key)).await?;

        // `allocated` is the status of the run accumulated so far (None until the first
        // relevant record is seen); `end` is the exclusive end of that run.
        let mut allocated = None;
        let mut end = start_offset;

        loop {
            // Iterate through the extents, each time setting `end` as the end of the previous
            // extent
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) => {
                    // Equivalent of getting no extents back
                    if *object_id != self.object_id() || *attribute_id != self.attribute_id() {
                        if allocated == Some(false) || allocated.is_none() {
                            // The rest of the file (up to EOF) is unallocated.
                            end = self.get_size();
                            allocated = Some(false);
                        }
                        break;
                    }
                    ensure!(extent_key.range.is_aligned(block_size), FxfsError::Inconsistent);
                    if extent_key.range.start > end {
                        // If a previous extent has already been visited and we are tracking an
                        // allocated set, we are only interested in an extent where the range of the
                        // current extent follows immediately after the previous one.
                        if allocated == Some(true) {
                            break;
                        } else {
                            // The gap between the previous `end` and this extent is not allocated
                            end = extent_key.range.start;
                            allocated = Some(false);
                            // Continue this iteration, except now the `end` is set to the end of
                            // the "previous" extent which is this gap between the start_offset
                            // and the current extent
                        }
                    }

                    // We can assume that from here, the `end` points to the end of a previous
                    // extent.
                    match extent_value {
                        // The current extent has been allocated
                        ExtentValue::Some { .. } => {
                            // Stop searching if previous extent was marked deleted
                            if allocated == Some(false) {
                                break;
                            }
                            allocated = Some(true);
                        }
                        // This extent has been marked deleted
                        ExtentValue::None => {
                            // Stop searching if previous extent was marked allocated
                            if allocated == Some(true) {
                                break;
                            }
                            allocated = Some(false);
                        }
                    }
                    end = extent_key.range.end;
                }
                // This occurs when there are no extents left
                None => {
                    if allocated == Some(false) || allocated.is_none() {
                        end = self.get_size();
                        allocated = Some(false);
                    }
                    // Otherwise, we were monitoring extents that were allocated, so just exit.
                    break;
                }
                // Non-extent records (Object, Child, GraveyardEntry) are ignored.
                Some(_) => {}
            }
            iter.advance().await?;
        }

        // `allocated` is always Some here: every `break` path either set it or left it set;
        // the None/mismatch branches set it when it was still None.
        Ok((allocated.unwrap(), end - start_offset))
    }
771
772    pub async fn txn_write<'a>(
773        &'a self,
774        transaction: &mut Transaction<'a>,
775        offset: u64,
776        buf: BufferRef<'_>,
777    ) -> Result<(), Error> {
778        if buf.is_empty() {
779            return Ok(());
780        }
781        let (aligned, mut transfer_buf) = self.align_buffer(offset, buf).await?;
782        self.multi_write(
783            transaction,
784            self.attribute_id(),
785            &[aligned.clone()],
786            transfer_buf.as_mut(),
787        )
788        .await?;
789        if offset + buf.len() as u64 > self.txn_get_size(transaction) {
790            self.txn_update_size(transaction, offset + buf.len() as u64, None).await?;
791        }
792        Ok(())
793    }
794
    /// Writes to multiple ranges with data provided in `buf`.  The buffer can be modified in place
    /// if encryption takes place.  The ranges must all be aligned and no change to content size is
    /// applied; the caller is responsible for updating size if required.
    pub async fn multi_write<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        attribute_id: u64,
        ranges: &[Range<u64>],
        buf: MutableBufferRef<'_>,
    ) -> Result<(), Error> {
        // Delegates to the inner StoreObjectHandle.  NOTE(review): the `None` argument appears to
        // select default key handling — confirm against StoreObjectHandle::multi_write.
        self.handle.multi_write(transaction, attribute_id, None, ranges, buf).await
    }
807
    /// Overwrites existing extents in place at `offset` with the contents of `buf`.
    ///
    /// If `allow_allocations` is false, then all the extents for the range must have been
    /// preallocated using `preallocate_range` or from existing writes.
    ///
    /// `buf` is mutable as an optimization, since the write may require encryption, we can
    /// encrypt the buffer in-place rather than copying to another buffer if the write is
    /// already aligned.
    ///
    /// Note: in the event of power failure during an overwrite() call, it is possible that
    /// old data (which hasn't been overwritten with new bytes yet) may be exposed to the user.
    /// Since the old data should be encrypted, it is probably safe to expose, although not ideal.
    pub async fn overwrite(
        &self,
        mut offset: u64,
        mut buf: MutableBufferRef<'_>,
        allow_allocations: bool,
    ) -> Result<(), Error> {
        // The buffer must cover whole device blocks.
        assert_eq!((buf.len() as u32) % self.store().device.block_size(), 0);
        let end = offset + buf.len() as u64;

        // Falls back to key id 0 when there is no encryption key for this object.
        let key_id = self.get_key(None).await?.map_or(0, |k| k.key_id());

        // The transaction only ends up being used if allow_allocations is true
        let mut transaction =
            if allow_allocations { Some(self.new_transaction().await?) } else { None };

        // We build up a list of writes to perform later
        let writes = FuturesUnordered::new();

        // We create a new scope here, so that the merger iterator will get dropped before we try to
        // commit our transaction. Otherwise the transaction commit would block.
        {
            let store = self.store();
            let store_object_id = store.store_object_id;
            let allocator = store.allocator();
            let tree = &store.tree;
            let layer_set = tree.layer_set();
            let mut merger = layer_set.merger();
            let mut iter = merger
                .query(Query::FullRange(&ObjectKey::attribute(
                    self.object_id(),
                    self.attribute_id(),
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(offset)),
                )))
                .await?;
            let block_size = self.block_size();

            // Each iteration resolves one contiguous chunk of the write: where on the device it
            // goes, how many bytes fit there, and whether to advance the extent iterator.
            loop {
                let (device_offset, bytes_to_write, should_advance) = match iter.get() {
                    // Extent ends exactly at `offset` — it cannot contain the write; skip it.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { .. }),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.end == offset =>
                    {
                        iter.advance().await?;
                        continue;
                    }
                    // An extent that starts at or before `offset` — write into it if possible.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value,
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.start <= offset =>
                    {
                        match value {
                            // Only Raw-mode extents (no checksums) can be overwritten in place.
                            ObjectValue::Extent(ExtentValue::Some {
                                device_offset,
                                mode: ExtentMode::Raw,
                                ..
                            }) => {
                                ensure!(
                                    range.is_aligned(block_size) && device_offset % block_size == 0,
                                    FxfsError::Inconsistent
                                );
                                let offset_within_extent = offset - range.start;
                                let remaining_length_of_extent = (range
                                    .end
                                    .checked_sub(offset)
                                    .ok_or(FxfsError::Inconsistent)?)
                                    as usize;
                                // Yields (device_offset, bytes_to_write, should_advance)
                                (
                                    device_offset + offset_within_extent,
                                    min(buf.len(), remaining_length_of_extent),
                                    true,
                                )
                            }
                            ObjectValue::Extent(ExtentValue::Some { .. }) => {
                                // TODO(https://fxbug.dev/42066056): Maybe we should create
                                // a new extent without checksums?
                                bail!(
                                    "extent from ({},{}) which overlaps offset \
                                        {} has the wrong extent mode",
                                    range.start,
                                    range.end,
                                    offset
                                )
                            }
                            _ => {
                                bail!(
                                    "overwrite failed: extent overlapping offset {} has \
                                      unexpected ObjectValue",
                                    offset
                                )
                            }
                        }
                    }
                    // No extent covers `offset`: allocate a new one (if allowed).
                    maybe_item_ref => {
                        if let Some(transaction) = transaction.as_mut() {
                            assert_eq!(allow_allocations, true);
                            assert_eq!(offset % self.block_size(), 0);

                            // We are going to make a new extent, but let's check if there is an
                            // extent after us. If there is an extent after us, then we don't want
                            // our new extent to bump into it...
                            let mut bytes_to_allocate =
                                round_up(buf.len() as u64, self.block_size())
                                    .ok_or(FxfsError::TooBig)?;
                            if let Some(ItemRef {
                                key:
                                    ObjectKey {
                                        object_id,
                                        data:
                                            ObjectKeyData::Attribute(
                                                attribute_id,
                                                AttributeKey::Extent(ExtentKey { range }),
                                            ),
                                    },
                                ..
                            }) = maybe_item_ref
                            {
                                if *object_id == self.object_id()
                                    && *attribute_id == self.attribute_id()
                                    && offset < range.start
                                {
                                    // Cap the allocation so it stops where the next extent begins.
                                    let bytes_until_next_extent = range.start - offset;
                                    bytes_to_allocate =
                                        min(bytes_to_allocate, bytes_until_next_extent);
                                }
                            }

                            let device_range = allocator
                                .allocate(transaction, store_object_id, bytes_to_allocate)
                                .await?;
                            let device_range_len = device_range.end - device_range.start;
                            transaction.add(
                                store_object_id,
                                Mutation::insert_object(
                                    ObjectKey::extent(
                                        self.object_id(),
                                        self.attribute_id(),
                                        offset..offset + device_range_len,
                                    ),
                                    ObjectValue::Extent(ExtentValue::new_raw(
                                        device_range.start,
                                        key_id,
                                    )),
                                ),
                            );

                            self.update_allocated_size(transaction, device_range_len, 0).await?;

                            // Yields (device_offset, bytes_to_write, should_advance)
                            (device_range.start, min(buf.len(), device_range_len as usize), false)
                        } else {
                            bail!(
                                "no extent overlapping offset {}, \
                                and new allocations are not allowed",
                                offset
                            )
                        }
                    }
                };
                // Queue the device write for this chunk and move on to any remaining bytes.
                let (current_buf, remaining_buf) = buf.split_at_mut(bytes_to_write);
                writes.push(self.write_at(offset, current_buf, device_offset));
                if remaining_buf.len() == 0 {
                    break;
                } else {
                    buf = remaining_buf;
                    offset += bytes_to_write as u64;
                    if should_advance {
                        iter.advance().await?;
                    }
                }
            }
        }

        self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
        // The checksums are being ignored here, but we don't need to know them
        writes.try_collect::<Vec<MaybeChecksums>>().await?;

        // Commit any allocations; grow the file first if the write extended past EOF.
        if let Some(mut transaction) = transaction {
            assert_eq!(allow_allocations, true);
            if !transaction.is_empty() {
                if end > self.get_size() {
                    self.grow(&mut transaction, self.get_size(), end).await?;
                }
                transaction.commit().await?;
            }
        }

        Ok(())
    }
1030
1031    // Within a transaction, the size of the object might have changed, so get the size from there
1032    // if it exists, otherwise, fall back on the cached size.
1033    fn txn_get_size(&self, transaction: &Transaction<'_>) -> u64 {
1034        transaction
1035            .get_object_mutation(
1036                self.store().store_object_id,
1037                ObjectKey::attribute(
1038                    self.object_id(),
1039                    self.attribute_id(),
1040                    AttributeKey::Attribute,
1041                ),
1042            )
1043            .and_then(|m| {
1044                if let ObjectItem { value: ObjectValue::Attribute { size, .. }, .. } = m.item {
1045                    Some(size)
1046                } else {
1047                    None
1048                }
1049            })
1050            .unwrap_or_else(|| self.get_size())
1051    }
1052
1053    pub async fn txn_update_size<'a>(
1054        &'a self,
1055        transaction: &mut Transaction<'a>,
1056        new_size: u64,
1057        // Allow callers to update the has_overwrite_extents metadata if they want. If this is
1058        // Some it is set to the value, if None it is left unchanged.
1059        update_has_overwrite_extents: Option<bool>,
1060    ) -> Result<(), Error> {
1061        let key =
1062            ObjectKey::attribute(self.object_id(), self.attribute_id(), AttributeKey::Attribute);
1063        let mut mutation = if let Some(mutation) =
1064            transaction.get_object_mutation(self.store().store_object_id(), key.clone())
1065        {
1066            mutation.clone()
1067        } else {
1068            ObjectStoreMutation {
1069                item: self.store().tree().find(&key).await?.ok_or(FxfsError::NotFound)?,
1070                op: Operation::ReplaceOrInsert,
1071            }
1072        };
1073        if let ObjectValue::Attribute { size, has_overwrite_extents } = &mut mutation.item.value {
1074            *size = new_size;
1075            if let Some(update_has_overwrite_extents) = update_has_overwrite_extents {
1076                *has_overwrite_extents = update_has_overwrite_extents;
1077            }
1078        } else {
1079            bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
1080        }
1081        transaction.add_with_object(
1082            self.store().store_object_id(),
1083            Mutation::ObjectStore(mutation),
1084            AssocObj::Borrowed(self),
1085        );
1086        Ok(())
1087    }
1088
    /// Adjusts this object's allocated-size bookkeeping in `transaction` by `allocated` bytes
    /// gained and `deallocated` bytes released.  Thin delegation to the inner handle.
    async fn update_allocated_size(
        &self,
        transaction: &mut Transaction<'_>,
        allocated: u64,
        deallocated: u64,
    ) -> Result<(), Error> {
        self.handle.update_allocated_size(transaction, allocated, deallocated).await
    }
1097
1098    pub fn truncate_overwrite_ranges(&self, size: u64) -> Result<Option<bool>, Error> {
1099        if self
1100            .overwrite_ranges
1101            .truncate(round_up(size, self.block_size()).ok_or(FxfsError::TooBig)?)
1102        {
1103            // This returns true if there were ranges, but this truncate removed them all, which
1104            // indicates that we need to flip the has_overwrite_extents metadata flag to false.
1105            Ok(Some(false))
1106        } else {
1107            Ok(None)
1108        }
1109    }
1110
1111    pub async fn shrink<'a>(
1112        &'a self,
1113        transaction: &mut Transaction<'a>,
1114        size: u64,
1115        update_has_overwrite_extents: Option<bool>,
1116    ) -> Result<NeedsTrim, Error> {
1117        let needs_trim = self.handle.shrink(transaction, self.attribute_id(), size).await?;
1118        self.txn_update_size(transaction, size, update_has_overwrite_extents).await?;
1119        Ok(needs_trim)
1120    }
1121
    /// Grows the attribute from `old_size` to `size` within `transaction`.
    ///
    /// Any outstanding trim beyond `old_size` is completed first (committing intermediate
    /// transactions as needed), and if `old_size` was not block-aligned the tail of its last
    /// block is zeroed so stale bytes are not exposed by the growth.
    pub async fn grow<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        old_size: u64,
        size: u64,
    ) -> Result<(), Error> {
        // Before growing the file, we must make sure that a previous trim has completed.
        let store = self.store();
        while matches!(
            store
                .trim_some(
                    transaction,
                    self.object_id(),
                    self.attribute_id(),
                    TrimMode::FromOffset(old_size)
                )
                .await?,
            TrimResult::Incomplete
        ) {
            // Trimming may span multiple transactions; commit and keep trimming.
            transaction.commit_and_continue().await?;
        }
        // We might need to zero out the tail of the old last block.
        let block_size = self.block_size();
        if old_size % block_size != 0 {
            let layer_set = store.tree.layer_set();
            let mut merger = layer_set.merger();
            let aligned_old_size = round_down(old_size, block_size);
            // Look up the extent covering the (block-aligned) old last block, if any.
            let iter = merger
                .query(Query::FullRange(&ObjectKey::extent(
                    self.object_id(),
                    self.attribute_id(),
                    aligned_old_size..aligned_old_size + 1,
                )))
                .await?;
            if let Some(ItemRef {
                key:
                    ObjectKey {
                        object_id,
                        data:
                            ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
                    },
                value: ObjectValue::Extent(ExtentValue::Some { device_offset, key_id, .. }),
                ..
            }) = iter.get()
            {
                // Only proceed if the record actually belongs to this object and attribute.
                if *object_id == self.object_id() && *attribute_id == self.attribute_id() {
                    let device_offset = device_offset
                        .checked_add(aligned_old_size - extent_key.range.start)
                        .ok_or(FxfsError::Inconsistent)?;
                    ensure!(device_offset % block_size == 0, FxfsError::Inconsistent);
                    let mut buf = self.allocate_buffer(block_size as usize).await;
                    // In the case that this extent is in OverwritePartial mode, there is a
                    // possibility that the last block is allocated, but not initialized yet, in
                    // which case we don't actually need to bother zeroing out the tail. However,
                    // it's not strictly incorrect to change uninitialized data, so we skip the
                    // check and blindly do it to keep it simpler here.
                    self.read_and_decrypt(
                        device_offset,
                        aligned_old_size,
                        buf.as_mut(),
                        *key_id,
                        None,
                    )
                    .await?;
                    // Zero everything past the old EOF within the block, then write it back.
                    buf.as_mut_slice()[(old_size % block_size) as usize..].fill(0);
                    self.multi_write(
                        transaction,
                        *attribute_id,
                        &[aligned_old_size..aligned_old_size + block_size],
                        buf.as_mut(),
                    )
                    .await?;
                }
            }
        }
        self.txn_update_size(transaction, size, None).await?;
        Ok(())
    }
1200
1201    /// Attempts to pre-allocate a `file_range` of bytes for this object.
1202    /// Returns a set of device ranges (i.e. potentially multiple extents).
1203    ///
1204    /// It may not be possible to preallocate the entire requested range in one request
1205    /// due to limitations on transaction size. In such cases, we will preallocate as much as
1206    /// we can up to some (arbitrary, internal) limit on transaction size.
1207    ///
1208    /// `file_range.start` is modified to point at the end of the logical range
1209    /// that was preallocated such that repeated calls to `preallocate_range` with new
1210    /// transactions can be used to preallocate ranges of any size.
1211    ///
1212    /// Requested range must be a multiple of block size.
1213    pub async fn preallocate_range<'a>(
1214        &'a self,
1215        transaction: &mut Transaction<'a>,
1216        file_range: &mut Range<u64>,
1217    ) -> Result<Vec<Range<u64>>, Error> {
1218        let block_size = self.block_size();
1219        assert!(file_range.is_aligned(block_size));
1220        assert!(!self.handle.is_encrypted());
1221        let mut ranges = Vec::new();
1222        let tree = &self.store().tree;
1223        let layer_set = tree.layer_set();
1224        let mut merger = layer_set.merger();
1225        let mut iter = merger
1226            .query(Query::FullRange(&ObjectKey::attribute(
1227                self.object_id(),
1228                self.attribute_id(),
1229                AttributeKey::Extent(ExtentKey::search_key_from_offset(file_range.start)),
1230            )))
1231            .await?;
1232        let mut allocated = 0;
1233        let key_id = self.get_key(None).await?.map_or(0, |k| k.key_id());
1234        'outer: while file_range.start < file_range.end {
1235            let allocate_end = loop {
1236                match iter.get() {
1237                    // Case for allocated extents for the same object that overlap with file_range.
1238                    Some(ItemRef {
1239                        key:
1240                            ObjectKey {
1241                                object_id,
1242                                data:
1243                                    ObjectKeyData::Attribute(
1244                                        attribute_id,
1245                                        AttributeKey::Extent(ExtentKey { range }),
1246                                    ),
1247                            },
1248                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
1249                        ..
1250                    }) if *object_id == self.object_id()
1251                        && *attribute_id == self.attribute_id()
1252                        && range.start < file_range.end =>
1253                    {
1254                        ensure!(
1255                            range.is_valid()
1256                                && range.is_aligned(block_size)
1257                                && device_offset % block_size == 0,
1258                            FxfsError::Inconsistent
1259                        );
1260                        // If the start of the requested file_range overlaps with an existing extent...
1261                        if range.start <= file_range.start {
1262                            // Record the existing extent and move on.
1263                            let device_range = device_offset
1264                                .checked_add(file_range.start - range.start)
1265                                .ok_or(FxfsError::Inconsistent)?
1266                                ..device_offset
1267                                    .checked_add(min(range.end, file_range.end) - range.start)
1268                                    .ok_or(FxfsError::Inconsistent)?;
1269                            file_range.start += device_range.end - device_range.start;
1270                            ranges.push(device_range);
1271                            if file_range.start >= file_range.end {
1272                                break 'outer;
1273                            }
1274                            iter.advance().await?;
1275                            continue;
1276                        } else {
1277                            // There's nothing allocated between file_range.start and the beginning
1278                            // of this extent.
1279                            break range.start;
1280                        }
1281                    }
1282                    // Case for deleted extents eclipsed by file_range.
1283                    Some(ItemRef {
1284                        key:
1285                            ObjectKey {
1286                                object_id,
1287                                data:
1288                                    ObjectKeyData::Attribute(
1289                                        attribute_id,
1290                                        AttributeKey::Extent(ExtentKey { range }),
1291                                    ),
1292                            },
1293                        value: ObjectValue::Extent(ExtentValue::None),
1294                        ..
1295                    }) if *object_id == self.object_id()
1296                        && *attribute_id == self.attribute_id()
1297                        && range.end < file_range.end =>
1298                    {
1299                        iter.advance().await?;
1300                    }
1301                    _ => {
1302                        // We can just preallocate the rest.
1303                        break file_range.end;
1304                    }
1305                }
1306            };
1307            let device_range = self
1308                .store()
1309                .allocator()
1310                .allocate(
1311                    transaction,
1312                    self.store().store_object_id(),
1313                    allocate_end - file_range.start,
1314                )
1315                .await
1316                .context("Allocation failed")?;
1317            allocated += device_range.end - device_range.start;
1318            let this_file_range =
1319                file_range.start..file_range.start + device_range.end - device_range.start;
1320            file_range.start = this_file_range.end;
1321            transaction.add(
1322                self.store().store_object_id,
1323                Mutation::merge_object(
1324                    ObjectKey::extent(self.object_id(), self.attribute_id(), this_file_range),
1325                    ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
1326                ),
1327            );
1328            ranges.push(device_range);
1329            // If we didn't allocate all that we requested, we'll loop around and try again.
1330            // ... unless we have filled the transaction. The caller should check file_range.
1331            if transaction.mutations().len() > TRANSACTION_MUTATION_THRESHOLD {
1332                break;
1333            }
1334        }
1335        // Update the file size if it changed.
1336        if file_range.start > round_up(self.txn_get_size(transaction), block_size).unwrap() {
1337            self.txn_update_size(transaction, file_range.start, None).await?;
1338        }
1339        self.update_allocated_size(transaction, allocated, 0).await?;
1340        Ok(ranges)
1341    }
1342
1343    pub async fn update_attributes<'a>(
1344        &self,
1345        transaction: &mut Transaction<'a>,
1346        node_attributes: Option<&fio::MutableNodeAttributes>,
1347        change_time: Option<Timestamp>,
1348    ) -> Result<(), Error> {
1349        // This codepath is only called by files, whose wrapping key id users cannot directly set
1350        // as per fscrypt.
1351        ensure!(
1352            !matches!(
1353                node_attributes,
1354                Some(fio::MutableNodeAttributes { wrapping_key_id: Some(_), .. })
1355            ),
1356            FxfsError::BadPath
1357        );
1358        self.handle.update_attributes(transaction, node_attributes, change_time).await
1359    }
1360
    /// Get the default set of transaction options for this object. This is mostly the overall
    /// default, modified by any [`HandleOptions`] held by this handle.
    /// The returned options borrow nothing from `self`, hence the independent `'b` lifetime.
    pub fn default_transaction_options<'b>(&self) -> Options<'b> {
        self.handle.default_transaction_options()
    }
1366
    /// Creates a new transaction for this object's data attribute using the handle's
    /// default transaction options.
    pub async fn new_transaction<'b>(&self) -> Result<Transaction<'b>, Error> {
        self.new_transaction_with_options(self.default_transaction_options()).await
    }
1370
    /// Creates a new transaction for this object's data attribute with the supplied
    /// options (delegates to the inner handle, which takes the attribute lock).
    pub async fn new_transaction_with_options<'b>(
        &self,
        options: Options<'b>,
    ) -> Result<Transaction<'b>, Error> {
        self.handle.new_transaction_with_options(self.attribute_id(), options).await
    }
1377
    /// Flushes the underlying device.  This is expensive and should be used sparingly.
    /// Delegates directly to the inner handle.
    pub async fn flush_device(&self) -> Result<(), Error> {
        self.handle.flush_device().await
    }
1382
    /// Reads an entire attribute.
    /// Delegates to the inner handle; `None` presumably means the attribute does not exist —
    /// see `StoreObjectHandle::read_attr` for the authoritative contract.
    pub async fn read_attr(&self, attribute_id: u64) -> Result<Option<Box<[u8]>>, Error> {
        self.handle.read_attr(attribute_id).await
    }
1387
1388    /// Writes an entire attribute.  This *always* uses the volume data key.
1389    pub async fn write_attr(&self, attribute_id: u64, data: &[u8]) -> Result<(), Error> {
1390        // Must be different attribute otherwise cached size gets out of date.
1391        assert_ne!(attribute_id, self.attribute_id());
1392        let store = self.store();
1393        let mut transaction = self.new_transaction().await?;
1394        if self.handle.write_attr(&mut transaction, attribute_id, data).await?.0 {
1395            transaction.commit_and_continue().await?;
1396            while matches!(
1397                store
1398                    .trim_some(
1399                        &mut transaction,
1400                        self.object_id(),
1401                        attribute_id,
1402                        TrimMode::FromOffset(data.len() as u64),
1403                    )
1404                    .await?,
1405                TrimResult::Incomplete
1406            ) {
1407                transaction.commit_and_continue().await?;
1408            }
1409        }
1410        transaction.commit().await?;
1411        Ok(())
1412    }
1413
    /// Reads from `device_offset` into `buffer` and decrypts in place using the key
    /// identified by `key_id`; `file_offset` is the corresponding logical file offset.
    /// NOTE(review): `block_bitmap` presumably selects which blocks to process — see
    /// `StoreObjectHandle::read_and_decrypt` for the authoritative contract.
    async fn read_and_decrypt(
        &self,
        device_offset: u64,
        file_offset: u64,
        buffer: MutableBufferRef<'_>,
        key_id: u64,
        block_bitmap: Option<bit_vec::BitVec>,
    ) -> Result<(), Error> {
        self.handle.read_and_decrypt(device_offset, file_offset, buffer, key_id, block_bitmap).await
    }
1424
    /// Truncates a file to a given size (growing/shrinking as required).
    ///
    /// Nb: Most code will want to call truncate() instead. This method is used
    /// to update the super block -- a case where we must borrow metadata space.
    ///
    /// When shrinking, releasing the truncated extents can span several transactions; any
    /// failure during that trim phase is logged and swallowed because the size change itself
    /// has already been committed by that point.
    pub async fn truncate_with_options(
        &self,
        options: Options<'_>,
        size: u64,
    ) -> Result<(), Error> {
        let mut transaction = self.new_transaction_with_options(options).await?;
        let old_size = self.get_size();
        if size == old_size {
            return Ok(());
        }
        if size < old_size {
            let update_has_overwrite_ranges = self.truncate_overwrite_ranges(size)?;
            if self.shrink(&mut transaction, size, update_has_overwrite_ranges).await?.0 {
                // The file needs to be trimmed.
                transaction.commit_and_continue().await?;
                let store = self.store();
                while matches!(
                    store
                        .trim_some(
                            &mut transaction,
                            self.object_id(),
                            self.attribute_id(),
                            TrimMode::FromOffset(size)
                        )
                        .await?,
                    TrimResult::Incomplete
                ) {
                    // Trimming is best-effort from here on: the new size is already durable.
                    if let Err(error) = transaction.commit_and_continue().await {
                        warn!(error:?; "Failed to trim after truncate");
                        return Ok(());
                    }
                }
                if let Err(error) = transaction.commit().await {
                    warn!(error:?; "Failed to trim after truncate");
                }
                return Ok(());
            }
        } else {
            self.grow(&mut transaction, old_size, size).await?;
        }
        transaction.commit().await?;
        Ok(())
    }
1472
1473    pub async fn get_properties(&self) -> Result<ObjectProperties, Error> {
1474        // We don't take a read guard here since the object properties are contained in a single
1475        // object, which cannot be inconsistent with itself. The LSM tree does not return
1476        // intermediate states for a single object.
1477        let item = self
1478            .store()
1479            .tree
1480            .find(&ObjectKey::object(self.object_id()))
1481            .await?
1482            .expect("Unable to find object record");
1483        match item.value {
1484            ObjectValue::Object {
1485                kind: ObjectKind::File { refs, .. },
1486                attributes:
1487                    ObjectAttributes {
1488                        creation_time,
1489                        modification_time,
1490                        posix_attributes,
1491                        allocated_size,
1492                        access_time,
1493                        change_time,
1494                        ..
1495                    },
1496            } => Ok(ObjectProperties {
1497                refs,
1498                allocated_size,
1499                data_attribute_size: self.get_size(),
1500                creation_time,
1501                modification_time,
1502                access_time,
1503                change_time,
1504                sub_dirs: 0,
1505                posix_attributes,
1506                casefold: false,
1507                wrapping_key_id: None,
1508            }),
1509            _ => bail!(FxfsError::NotFile),
1510        }
1511    }
1512
1513    // Returns the contents of this object. This object must be < |limit| bytes in size.
1514    pub async fn contents(&self, limit: usize) -> Result<Box<[u8]>, Error> {
1515        let size = self.get_size();
1516        if size > limit as u64 {
1517            bail!("Object too big ({} > {})", size, limit);
1518        }
1519        let mut buf = self.allocate_buffer(size as usize).await;
1520        self.read(0u64, buf.as_mut()).await?;
1521        Ok(buf.as_slice().into())
1522    }
1523
    /// Returns the set of file_offset->extent mappings for this file.
    /// This operation is potentially expensive and should generally be avoided.
    /// Only allocated extents (`ExtentValue::Some`) are reported; iteration stops at the
    /// first record that is not such an extent for this object/attribute.
    pub async fn device_extents(&self) -> Result<Vec<(u64, Range<u64>)>, Error> {
        let mut extents = Vec::new();
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        // Start iterating from the first extent of this object's data attribute.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                self.attribute_id(),
                AttributeKey::Extent(ExtentKey::search_key_from_offset(0)),
            )))
            .await?;
        loop {
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(
                                    attribute_id,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                        },
                    value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
                    ..
                }) if *object_id == self.object_id() && *attribute_id == self.attribute_id() => {
                    // Record (file offset, device byte range) for this extent.
                    extents.push((
                        range.start,
                        *device_offset..*device_offset + range.length().unwrap(),
                    ))
                }
                _ => break,
            }
            iter.advance().await?;
        }
        Ok(extents)
    }
1564}
1565
impl<S: HandleOwner> AssociatedObject for DataObjectHandle<S> {
    // Observes mutations as they are applied so in-memory state (cached content size,
    // fsverity state, overwrite ranges) stays in sync with the committed object state.
    fn will_apply_mutation(&self, mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
        match mutation {
            // Data attribute size changed: refresh the cached content size.
            Mutation::ObjectStore(ObjectStoreMutation {
                item: ObjectItem { value: ObjectValue::Attribute { size, .. }, .. },
                ..
            }) => self.content_size.store(*size, atomic::Ordering::Relaxed),
            // Attribute became verified (fsverity): the size must not change, and the
            // in-memory fsverity state is finalized.
            Mutation::ObjectStore(ObjectStoreMutation {
                item: ObjectItem { value: ObjectValue::VerifiedAttribute { size, .. }, .. },
                ..
            }) => {
                debug_assert_eq!(self.get_size(), *size, "VerifiedAttribute size should be set when verity is enabled and should not change");
                self.finalize_fsverity_state()
            }
            // A new extent on this object's data attribute: track overwrite-mode ranges.
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    ObjectItem {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attr_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
                        ..
                    },
                ..
            }) if self.object_id() == *object_id && self.attribute_id() == *attr_id => match mode {
                ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => {
                    self.overwrite_ranges.apply_range(range.clone())
                }
                ExtentMode::Raw | ExtentMode::Cow(_) => (),
            },
            _ => {}
        }
    }
}
1606
impl<S: HandleOwner> ObjectHandle for DataObjectHandle<S> {
    // All of these simply delegate to the inner handle.
    fn set_trace(&self, v: bool) {
        self.handle.set_trace(v)
    }

    fn object_id(&self) -> u64 {
        self.handle.object_id()
    }

    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        self.handle.allocate_buffer(size)
    }

    fn block_size(&self) -> u64 {
        self.handle.block_size()
    }
}
1624
#[async_trait]
impl<S: HandleOwner> ReadObjectHandle for DataObjectHandle<S> {
    async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
        // Take a read lock on this object's data attribute so the read is consistent with
        // respect to concurrent mutations of the attribute.
        let fs = self.store().filesystem();
        let guard = fs
            .lock_manager()
            .read_lock(lock_keys![LockKey::object_attribute(
                self.store().store_object_id,
                self.object_id(),
                self.attribute_id(),
            )])
            .await;

        // Reads starting at or beyond EOF return 0 bytes; otherwise clamp to EOF.
        let size = self.get_size();
        if offset >= size {
            return Ok(0);
        }
        let length = min(buf.len() as u64, size - offset) as usize;
        buf = buf.subslice_mut(0..length);
        self.handle.read_unchecked(self.attribute_id(), offset, buf.reborrow(), &guard).await?;
        // For fsverity-enabled files, verify the bytes that were read.
        if self.is_verified_file() {
            self.verify_data(offset as usize, buf.as_slice()).await?;
        }
        Ok(length)
    }

    fn get_size(&self) -> u64 {
        // Cached content size, kept up to date by will_apply_mutation.
        self.content_size.load(atomic::Ordering::Relaxed)
    }
}
1655
impl<S: HandleOwner> WriteObjectHandle for DataObjectHandle<S> {
    // Writes `buf` at `offset` (or appends at the current EOF when `offset` is None) in a
    // single transaction, returning the object's size after the write.
    async fn write_or_append(&self, offset: Option<u64>, buf: BufferRef<'_>) -> Result<u64, Error> {
        let offset = offset.unwrap_or_else(|| self.get_size());
        let mut transaction = self.new_transaction().await?;
        self.txn_write(&mut transaction, offset, buf).await?;
        let new_size = self.txn_get_size(&transaction);
        transaction.commit().await?;
        Ok(new_size)
    }

    async fn truncate(&self, size: u64) -> Result<(), Error> {
        self.truncate_with_options(self.default_transaction_options(), size).await
    }

    // No-op: writes above already go through committed transactions.
    async fn flush(&self) -> Result<(), Error> {
        Ok(())
    }
}
1674
/// Like object_handle::Writer, but allows custom transaction options to be set, and makes every
/// write go directly to the handle in a transaction.
pub struct DirectWriter<'a, S: HandleOwner> {
    handle: &'a DataObjectHandle<S>,
    // Transaction options used for each flush.
    options: transaction::Options<'a>,
    // Staging buffer of BUFFER_SIZE bytes; bytes up to `buf_offset` are pending.
    buffer: Buffer<'a>,
    // File offset at which the next flush will write.
    offset: u64,
    // Number of bytes currently staged in `buffer`.
    buf_offset: usize,
}
1684
/// Size of DirectWriter's staging buffer (1 MiB).
const BUFFER_SIZE: usize = 1_048_576;
1686
impl<S: HandleOwner> Drop for DirectWriter<'_, S> {
    fn drop(&mut self) {
        // Any bytes still staged are lost on drop; callers must call complete() to persist.
        if self.buf_offset != 0 {
            warn!("DirectWriter: dropping data, did you forget to call complete?");
        }
    }
}
1694
1695impl<'a, S: HandleOwner> DirectWriter<'a, S> {
1696    pub async fn new(
1697        handle: &'a DataObjectHandle<S>,
1698        options: transaction::Options<'a>,
1699    ) -> DirectWriter<'a, S> {
1700        Self {
1701            handle,
1702            options,
1703            buffer: handle.allocate_buffer(BUFFER_SIZE).await,
1704            offset: 0,
1705            buf_offset: 0,
1706        }
1707    }
1708
1709    async fn flush(&mut self) -> Result<(), Error> {
1710        let mut transaction = self.handle.new_transaction_with_options(self.options).await?;
1711        self.handle
1712            .txn_write(&mut transaction, self.offset, self.buffer.subslice(..self.buf_offset))
1713            .await?;
1714        transaction.commit().await?;
1715        self.offset += self.buf_offset as u64;
1716        self.buf_offset = 0;
1717        Ok(())
1718    }
1719}
1720
1721impl<'a, S: HandleOwner> WriteBytes for DirectWriter<'a, S> {
1722    fn block_size(&self) -> u64 {
1723        self.handle.block_size()
1724    }
1725
1726    async fn write_bytes(&mut self, mut buf: &[u8]) -> Result<(), Error> {
1727        while buf.len() > 0 {
1728            let to_do = std::cmp::min(buf.len(), BUFFER_SIZE - self.buf_offset);
1729            self.buffer
1730                .subslice_mut(self.buf_offset..self.buf_offset + to_do)
1731                .as_mut_slice()
1732                .copy_from_slice(&buf[..to_do]);
1733            self.buf_offset += to_do;
1734            if self.buf_offset == BUFFER_SIZE {
1735                self.flush().await?;
1736            }
1737            buf = &buf[to_do..];
1738        }
1739        Ok(())
1740    }
1741
1742    async fn complete(&mut self) -> Result<(), Error> {
1743        self.flush().await?;
1744        Ok(())
1745    }
1746
1747    async fn skip(&mut self, amount: u64) -> Result<(), Error> {
1748        if (BUFFER_SIZE - self.buf_offset) as u64 > amount {
1749            self.buffer
1750                .subslice_mut(self.buf_offset..self.buf_offset + amount as usize)
1751                .as_mut_slice()
1752                .fill(0);
1753            self.buf_offset += amount as usize;
1754        } else {
1755            self.flush().await?;
1756            self.offset += amount;
1757        }
1758        Ok(())
1759    }
1760}
1761
1762#[cfg(test)]
1763mod tests {
1764    use crate::errors::FxfsError;
1765    use crate::filesystem::{
1766        FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem, SyncOptions,
1767    };
1768    use crate::fsck::{
1769        fsck, fsck_volume, fsck_volume_with_options, fsck_with_options, FsckOptions,
1770    };
1771    use crate::lsm_tree::types::{ItemRef, LayerIterator};
1772    use crate::lsm_tree::Query;
1773    use crate::object_handle::{
1774        ObjectHandle, ObjectProperties, ReadObjectHandle, WriteObjectHandle,
1775    };
1776    use crate::object_store::data_object_handle::WRITE_ATTR_BATCH_SIZE;
1777    use crate::object_store::directory::replace_child;
1778    use crate::object_store::object_record::{ObjectKey, ObjectValue, Timestamp};
1779    use crate::object_store::transaction::{lock_keys, Mutation, Options};
1780    use crate::object_store::volume::root_volume;
1781    use crate::object_store::{
1782        AttributeKey, DataObjectHandle, Directory, ExtentKey, ExtentMode, ExtentValue,
1783        HandleOptions, LockKey, ObjectKeyData, ObjectStore, PosixAttributes,
1784        FSVERITY_MERKLE_ATTRIBUTE_ID, NO_OWNER, TRANSACTION_MUTATION_THRESHOLD,
1785    };
1786    use crate::range::RangeExt;
1787    use crate::round::{round_down, round_up};
1788    use assert_matches::assert_matches;
1789    use bit_vec::BitVec;
1790    use fuchsia_sync::Mutex;
1791    use futures::channel::oneshot::channel;
1792    use futures::stream::{FuturesUnordered, StreamExt};
1793    use futures::FutureExt;
1794    use fxfs_crypto::{Crypt, KeyPurpose};
1795    use fxfs_insecure_crypto::InsecureCrypt;
1796    use mundane::hash::{Digest, Hasher, Sha256};
1797    use rand::Rng;
1798    use std::ops::Range;
1799    use std::sync::Arc;
1800    use std::time::Duration;
1801    use storage_device::fake_device::FakeDevice;
1802    use storage_device::DeviceHolder;
1803    use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
1804
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    // Some tests (the preallocate_range ones) currently assume that the data only occupies a single
    // device block.
    const TEST_DATA_OFFSET: u64 = 5000;
    const TEST_DATA: &[u8] = b"hello";
    // Logical size the test object is truncated to after creation.
    const TEST_OBJECT_SIZE: u64 = 5678;
    const TEST_OBJECT_ALLOCATED_SIZE: u64 = 4096;
    const TEST_OBJECT_NAME: &str = "foo";
1814
    // Creates a fresh FxFilesystem backed by an 8192-block fake device.
    async fn test_filesystem() -> OpenFxFilesystem {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        FxFilesystem::new_empty(device).await.expect("new_empty failed")
    }
1819
    // Creates a test filesystem plus a file named TEST_OBJECT_NAME in the root store
    // (encrypted when `crypt` is provided).  When `write_object_test_data` is set, TEST_DATA
    // is written at TEST_DATA_OFFSET.  The file is then truncated to TEST_OBJECT_SIZE.
    async fn test_filesystem_and_object_with_key(
        crypt: Option<&dyn Crypt>,
        write_object_test_data: bool,
    ) -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        let fs = test_filesystem().await;
        let store = fs.root_store();
        let object;

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        object = if let Some(crypt) = crypt {
            // Encrypted object: mint a data key for the new object id and create with it.
            let object_id = store.get_next_object_id(transaction.txn_guard()).await.unwrap();
            let (key, unwrapped_key) = crypt.create_key(object_id, KeyPurpose::Data).await.unwrap();
            ObjectStore::create_object_with_key(
                &store,
                &mut transaction,
                object_id,
                HandleOptions::default(),
                key,
                unwrapped_key,
            )
            .await
            .expect("create_object failed")
        } else {
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed")
        };

        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        root_directory
            .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
            .await
            .expect("add_child_file failed");

        if write_object_test_data {
            // TEST_DATA_OFFSET is unaligned, so stage the data at the matching alignment
            // within the buffer and write only the tail subslice.
            let align = TEST_DATA_OFFSET as usize % TEST_DEVICE_BLOCK_SIZE as usize;
            let mut buf = object.allocate_buffer(align + TEST_DATA.len()).await;
            buf.as_mut_slice()[align..].copy_from_slice(TEST_DATA);
            object
                .txn_write(&mut transaction, TEST_DATA_OFFSET, buf.subslice(align..))
                .await
                .expect("write failed");
        }
        transaction.commit().await.expect("commit failed");
        object.truncate(TEST_OBJECT_SIZE).await.expect("truncate failed");
        (fs, object)
    }
1879
    // Filesystem + encrypted test object pre-populated with TEST_DATA.
    async fn test_filesystem_and_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        test_filesystem_and_object_with_key(Some(&InsecureCrypt::new()), true).await
    }
1883
    // Filesystem + encrypted test object with no data written (still truncated to
    // TEST_OBJECT_SIZE).
    async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
    {
        test_filesystem_and_object_with_key(Some(&InsecureCrypt::new()), false).await
    }
1888
    // A read into a zero-length buffer should succeed and return 0 bytes.
    #[fuchsia::test]
    async fn test_zero_buf_len_read() {
        let (fs, object) = test_filesystem_and_object().await;
        let mut buf = object.allocate_buffer(0).await;
        assert_eq!(object.read(0u64, buf.as_mut()).await.expect("read failed"), 0);
        fs.close().await.expect("Close failed");
    }
1896
    // Reads straddling EOF are clamped: bytes up to EOF are returned (zero-filled where no
    // data exists) and the rest of the buffer is left untouched.
    #[fuchsia::test]
    async fn test_beyond_eof_read() {
        let (fs, object) = test_filesystem_and_object().await;
        let offset = TEST_OBJECT_SIZE as usize - 2;
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        let mut buf = object.allocate_buffer(align + len + 1).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(
            object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
            align + len
        );
        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
        fs.close().await.expect("Close failed");
    }
1913
    // Same as test_beyond_eof_read, but reading via the deref'd inner handle's
    // (attribute_id, offset, buffer) read; EOF clamping must behave identically.
    #[fuchsia::test]
    async fn test_beyond_eof_read_from() {
        let (fs, object) = test_filesystem_and_object().await;
        let handle = &*object;
        let offset = TEST_OBJECT_SIZE as usize - 2;
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        let mut buf = object.allocate_buffer(align + len + 1).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(
            handle.read(0, (offset - align) as u64, buf.as_mut()).await.expect("read failed"),
            align + len
        );
        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
        fs.close().await.expect("Close failed");
    }
1931
    // read_unchecked does not clamp at EOF: the whole buffer is filled, with zeroes where
    // the object has no data, even past the end of the object.
    #[fuchsia::test]
    async fn test_beyond_eof_read_unchecked() {
        let (fs, object) = test_filesystem_and_object().await;
        let offset = TEST_OBJECT_SIZE as usize - 2;
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        let mut buf = object.allocate_buffer(align + len + 1).await;
        buf.as_mut_slice().fill(123u8);
        // read_unchecked requires the caller to hold the attribute read lock.
        let guard = fs
            .lock_manager()
            .read_lock(lock_keys![LockKey::object_attribute(
                object.store().store_object_id,
                object.object_id(),
                0,
            )])
            .await;
        object
            .read_unchecked(0, (offset - align) as u64, buf.as_mut(), &guard)
            .await
            .expect("read failed");
        assert_eq!(&buf.as_slice()[align..], &vec![0u8; len + 1]);
        fs.close().await.expect("Close failed");
    }
1955
    // Regions of the file that were never written (holes) must read back as zeroes.
    #[fuchsia::test]
    async fn test_read_sparse() {
        let (fs, object) = test_filesystem_and_object().await;
        // Deliberately read not right to eof.
        let len = TEST_OBJECT_SIZE as usize - 1;
        let mut buf = object.allocate_buffer(len).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);
        let mut expected = vec![0; len];
        let offset = TEST_DATA_OFFSET as usize;
        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
        assert_eq!(buf.as_slice()[..len], expected[..]);
        fs.close().await.expect("Close failed");
    }
1970
    // Reads must see both data persisted before a store flush and data written after it.
    #[fuchsia::test]
    async fn test_read_after_writes_interspersed_with_flush() {
        let (fs, object) = test_filesystem_and_object().await;

        object.owner().flush().await.expect("flush failed");

        // Write more test data to the first block of the file.
        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
        buf.as_mut_slice().copy_from_slice(TEST_DATA);
        object.write_or_append(Some(0u64), buf.as_ref()).await.expect("write failed");

        let len = TEST_OBJECT_SIZE as usize - 1;
        let mut buf = object.allocate_buffer(len).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);

        let mut expected = vec![0u8; len];
        let offset = TEST_DATA_OFFSET as usize;
        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
        expected[..TEST_DATA.len()].copy_from_slice(TEST_DATA);
        assert_eq!(buf.as_slice(), &expected);
        fs.close().await.expect("Close failed");
    }
1994
    // Exercises reads over a layout of <extent><deleted-extent><extent> produced by a
    // write / truncate / extend sequence, including a read ending inside the deleted extent.
    #[fuchsia::test]
    async fn test_read_after_truncate_and_extend() {
        let (fs, object) = test_filesystem_and_object().await;

        // Arrange for there to be <extent><deleted-extent><extent>.
        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
        buf.as_mut_slice().copy_from_slice(TEST_DATA);
        // This adds an extent at 0..512.
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
        // This deletes 512..1024.
        object.truncate(3).await.expect("truncate failed");
        let data = b"foo";
        let offset = 1500u64;
        let align = (offset % fs.block_size() as u64) as usize;
        let mut buf = object.allocate_buffer(align + data.len()).await;
        buf.as_mut_slice()[align..].copy_from_slice(data);
        // This adds 1024..1536.
        object.write_or_append(Some(1500), buf.subslice(align..)).await.expect("write failed");

        const LEN1: usize = 1503;
        let mut buf = object.allocate_buffer(LEN1).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN1);
        let mut expected = [0; LEN1];
        expected[..3].copy_from_slice(&TEST_DATA[..3]);
        expected[1500..].copy_from_slice(b"foo");
        assert_eq!(buf.as_slice(), &expected);

        // Also test a read that ends midway through the deleted extent.
        const LEN2: usize = 601;
        let mut buf = object.allocate_buffer(LEN2).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN2);
        assert_eq!(buf.as_slice(), &expected[..LEN2]);
        fs.close().await.expect("Close failed");
    }
2031
    // Interleaves writes/truncate across two objects to check that extents of one object
    // never bleed into reads of the other.
    #[fuchsia::test]
    async fn test_read_whole_blocks_with_multiple_objects() {
        let (fs, object) = test_filesystem_and_object().await;
        let block_size = object.block_size() as usize;
        let mut buffer = object.allocate_buffer(block_size).await;
        buffer.as_mut_slice().fill(0xaf);
        object.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");

        let store = object.owner();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let object2 =
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed");
        transaction.commit().await.expect("commit failed");
        let mut ef_buffer = object.allocate_buffer(block_size).await;
        ef_buffer.as_mut_slice().fill(0xef);
        object2.write_or_append(Some(0), ef_buffer.as_ref()).await.expect("write failed");

        let mut buffer = object.allocate_buffer(block_size).await;
        buffer.as_mut_slice().fill(0xaf);
        object
            .write_or_append(Some(block_size as u64), buffer.as_ref())
            .await
            .expect("write failed");
        object.truncate(3 * block_size as u64).await.expect("truncate failed");
        object2
            .write_or_append(Some(block_size as u64), ef_buffer.as_ref())
            .await
            .expect("write failed");

        // object: two 0xaf blocks then a zeroed block (from the truncate-extend);
        // object2: two 0xef blocks.
        let mut buffer = object.allocate_buffer(4 * block_size).await;
        buffer.as_mut_slice().fill(123);
        assert_eq!(object.read(0, buffer.as_mut()).await.expect("read failed"), 3 * block_size);
        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xaf; 2 * block_size]);
        assert_eq!(&buffer.as_slice()[2 * block_size..3 * block_size], &vec![0; block_size]);
        assert_eq!(object2.read(0, buffer.as_mut()).await.expect("read failed"), 2 * block_size);
        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xef; 2 * block_size]);
        fs.close().await.expect("Close failed");
    }
2076
    #[fuchsia::test]
    async fn test_alignment() {
        let (fs, object) = test_filesystem_and_object().await;

        // Exercises writes at block-unaligned offsets and lengths by mirroring every
        // write into an in-memory Vec and checking that the object always reads back
        // identical to the mirror.
        struct AlignTest {
            // Fill byte for the next write; bumped before each write so every test()
            // call writes a distinguishable pattern.
            fill: u8,
            object: DataObjectHandle<ObjectStore>,
            // In-memory copy of the expected object contents.
            mirror: Vec<u8>,
        }

        impl AlignTest {
            // Snapshots the object's current contents to seed the mirror.
            async fn new(object: DataObjectHandle<ObjectStore>) -> Self {
                let mirror = {
                    let mut buf = object.allocate_buffer(object.get_size() as usize).await;
                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
                    buf.as_slice().to_vec()
                };
                Self { fill: 0, object, mirror }
            }

            // Fills |range| of self.object with a byte value (self.fill) and mirrors the same
            // operation to an in-memory copy of the object.
            // Each subsequent call bumps the value of fill.
            // It is expected that the object and its mirror maintain identical content.
            async fn test(&mut self, range: Range<u64>) {
                let mut buf = self.object.allocate_buffer((range.end - range.start) as usize).await;
                self.fill += 1;
                buf.as_mut_slice().fill(self.fill);
                self.object
                    .write_or_append(Some(range.start), buf.as_ref())
                    .await
                    .expect("write_or_append failed");
                // A write past the current EOF grows the mirror with zeroes, matching the
                // zero-fill expected of the object when it is extended.
                if range.end > self.mirror.len() as u64 {
                    self.mirror.resize(range.end as usize, 0);
                }
                self.mirror[range.start as usize..range.end as usize].fill(self.fill);
                // Read one byte more than the mirror's length to also verify the object
                // is not longer than expected.
                let mut buf = self.object.allocate_buffer(self.mirror.len() + 1).await;
                assert_eq!(
                    self.object.read(0, buf.as_mut()).await.expect("read failed"),
                    self.mirror.len()
                );
                assert_eq!(&buf.as_slice()[..self.mirror.len()], self.mirror.as_slice());
            }
        }

        let block_size = object.block_size() as u64;
        let mut align = AlignTest::new(object).await;

        // Fill the object to start with (with 1).
        align.test(0..2 * block_size + 1).await;

        // Unaligned head (fills with 2, overwrites that with 3).
        align.test(1..block_size).await;
        align.test(1..2 * block_size).await;

        // Unaligned tail (fills with 4 and 5).
        align.test(0..block_size - 1).await;
        align.test(0..2 * block_size - 1).await;

        // Both unaligned (fills with 6 and 7).
        align.test(1..block_size - 1).await;
        align.test(1..2 * block_size - 1).await;

        fs.close().await.expect("Close failed");
    }
2142
2143    async fn test_preallocate_common(fs: &FxFilesystem, object: DataObjectHandle<ObjectStore>) {
2144        let allocator = fs.allocator();
2145        let allocated_before = allocator.get_allocated_bytes();
2146        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2147        object
2148            .preallocate_range(&mut transaction, &mut (0..fs.block_size() as u64))
2149            .await
2150            .expect("preallocate_range failed");
2151        transaction.commit().await.expect("commit failed");
2152        assert!(object.get_size() < 1048576);
2153        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2154        object
2155            .preallocate_range(&mut transaction, &mut (0..1048576))
2156            .await
2157            .expect("preallocate_range failed");
2158        transaction.commit().await.expect("commit failed");
2159        assert_eq!(object.get_size(), 1048576);
2160        // Check that it didn't reallocate the space for the existing extent
2161        let allocated_after = allocator.get_allocated_bytes();
2162        assert_eq!(allocated_after - allocated_before, 1048576 - fs.block_size() as u64);
2163
2164        let mut buf = object
2165            .allocate_buffer(round_up(TEST_DATA_OFFSET, fs.block_size()).unwrap() as usize)
2166            .await;
2167        buf.as_mut_slice().fill(47);
2168        object
2169            .write_or_append(Some(0), buf.subslice(..TEST_DATA_OFFSET as usize))
2170            .await
2171            .expect("write failed");
2172        buf.as_mut_slice().fill(95);
2173        let offset = round_up(TEST_OBJECT_SIZE, fs.block_size()).unwrap();
2174        object.overwrite(offset, buf.as_mut(), false).await.expect("write failed");
2175
2176        // Make sure there were no more allocations.
2177        assert_eq!(allocator.get_allocated_bytes(), allocated_after);
2178
2179        // Read back the data and make sure it is what we expect.
2180        let mut buf = object.allocate_buffer(104876).await;
2181        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
2182        assert_eq!(&buf.as_slice()[..TEST_DATA_OFFSET as usize], &[47; TEST_DATA_OFFSET as usize]);
2183        assert_eq!(
2184            &buf.as_slice()[TEST_DATA_OFFSET as usize..TEST_DATA_OFFSET as usize + TEST_DATA.len()],
2185            TEST_DATA
2186        );
2187        assert_eq!(&buf.as_slice()[offset as usize..offset as usize + 2048], &[95; 2048]);
2188    }
2189
    #[fuchsia::test]
    async fn test_preallocate_range() {
        // Runs the shared preallocation checks against a freshly-created test object.
        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
        test_preallocate_common(&fs, object).await;
        fs.close().await.expect("Close failed");
    }
2196
    // This is identical to the previous test except that we flush so that extents end up in
    // different layers.
    #[fuchsia::test]
    async fn test_preallocate_succeeds_when_extents_are_in_different_layers() {
        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
        // Flushing the store persists the existing extents, so the subsequent
        // preallocation has to see extents across more than one layer.
        object.owner().flush().await.expect("flush failed");
        test_preallocate_common(&fs, object).await;
        fs.close().await.expect("Close failed");
    }
2206
2207    #[fuchsia::test]
2208    async fn test_already_preallocated() {
2209        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
2210        let allocator = fs.allocator();
2211        let allocated_before = allocator.get_allocated_bytes();
2212        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2213        let offset = TEST_DATA_OFFSET - TEST_DATA_OFFSET % fs.block_size() as u64;
2214        object
2215            .preallocate_range(&mut transaction, &mut (offset..offset + fs.block_size() as u64))
2216            .await
2217            .expect("preallocate_range failed");
2218        transaction.commit().await.expect("commit failed");
2219        // Check that it didn't reallocate any new space.
2220        assert_eq!(allocator.get_allocated_bytes(), allocated_before);
2221        fs.close().await.expect("Close failed");
2222    }
2223
2224    #[fuchsia::test]
2225    async fn test_overwrite_when_preallocated_at_start_of_file() {
2226        // The standard test data we put in the test object would cause an extent with checksums
2227        // to be created, which overwrite() doesn't support. So we create an empty object instead.
2228        let (fs, object) = test_filesystem_and_empty_object().await;
2229
2230        let object = ObjectStore::open_object(
2231            object.owner(),
2232            object.object_id(),
2233            HandleOptions::default(),
2234            None,
2235        )
2236        .await
2237        .expect("open_object failed");
2238
2239        assert_eq!(fs.block_size(), 4096);
2240
2241        let mut write_buf = object.allocate_buffer(4096).await;
2242        write_buf.as_mut_slice().fill(95);
2243
2244        // First try to overwrite without allowing allocations
2245        // We expect this to fail, since nothing is allocated yet
2246        object.overwrite(0, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
2247
2248        // Now preallocate some space (exactly one block)
2249        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2250        object
2251            .preallocate_range(&mut transaction, &mut (0..4096 as u64))
2252            .await
2253            .expect("preallocate_range failed");
2254        transaction.commit().await.expect("commit failed");
2255
2256        // Now try the same overwrite command as before, it should work this time,
2257        // even with allocations disabled...
2258        {
2259            let mut read_buf = object.allocate_buffer(4096).await;
2260            object.read(0, read_buf.as_mut()).await.expect("read failed");
2261            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2262        }
2263        object.overwrite(0, write_buf.as_mut(), false).await.expect("overwrite failed");
2264        {
2265            let mut read_buf = object.allocate_buffer(4096).await;
2266            object.read(0, read_buf.as_mut()).await.expect("read failed");
2267            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2268        }
2269
2270        // Now try to overwrite at offset 4096. We expect this to fail, since we only preallocated
2271        // one block earlier at offset 0
2272        object.overwrite(4096, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
2273
2274        // We can't assert anything about the existing bytes, because they haven't been allocated
2275        // yet and they could contain any values
2276        object.overwrite(4096, write_buf.as_mut(), true).await.expect("overwrite failed");
2277        {
2278            let mut read_buf = object.allocate_buffer(4096).await;
2279            object.read(4096, read_buf.as_mut()).await.expect("read failed");
2280            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2281        }
2282
2283        // Check that the overwrites haven't messed up the filesystem state
2284        let fsck_options = FsckOptions {
2285            fail_on_warning: true,
2286            no_lock: true,
2287            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2288            ..Default::default()
2289        };
2290        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2291
2292        fs.close().await.expect("Close failed");
2293    }
2294
2295    #[fuchsia::test]
2296    async fn test_overwrite_large_buffer_and_file_with_many_holes() {
2297        // The standard test data we put in the test object would cause an extent with checksums
2298        // to be created, which overwrite() doesn't support. So we create an empty object instead.
2299        let (fs, object) = test_filesystem_and_empty_object().await;
2300
2301        let object = ObjectStore::open_object(
2302            object.owner(),
2303            object.object_id(),
2304            HandleOptions::default(),
2305            None,
2306        )
2307        .await
2308        .expect("open_object failed");
2309
2310        assert_eq!(fs.block_size(), 4096);
2311        assert_eq!(object.get_size(), TEST_OBJECT_SIZE);
2312
2313        // Let's create some non-holes
2314        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2315        object
2316            .preallocate_range(&mut transaction, &mut (4096..8192 as u64))
2317            .await
2318            .expect("preallocate_range failed");
2319        object
2320            .preallocate_range(&mut transaction, &mut (16384..32768 as u64))
2321            .await
2322            .expect("preallocate_range failed");
2323        object
2324            .preallocate_range(&mut transaction, &mut (65536..131072 as u64))
2325            .await
2326            .expect("preallocate_range failed");
2327        object
2328            .preallocate_range(&mut transaction, &mut (262144..524288 as u64))
2329            .await
2330            .expect("preallocate_range failed");
2331        transaction.commit().await.expect("commit failed");
2332
2333        assert_eq!(object.get_size(), 524288);
2334
2335        let mut write_buf = object.allocate_buffer(4096).await;
2336        write_buf.as_mut_slice().fill(95);
2337
2338        // We shouldn't be able to overwrite in the holes if new allocations aren't enabled
2339        object.overwrite(0, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
2340        object.overwrite(8192, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
2341        object.overwrite(32768, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
2342        object.overwrite(131072, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
2343
2344        // But we should be able to overwrite in the prealloc'd areas without needing allocations
2345        {
2346            let mut read_buf = object.allocate_buffer(4096).await;
2347            object.read(4096, read_buf.as_mut()).await.expect("read failed");
2348            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2349        }
2350        object.overwrite(4096, write_buf.as_mut(), false).await.expect("overwrite failed");
2351        {
2352            let mut read_buf = object.allocate_buffer(4096).await;
2353            object.read(4096, read_buf.as_mut()).await.expect("read failed");
2354            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2355        }
2356        {
2357            let mut read_buf = object.allocate_buffer(4096).await;
2358            object.read(16384, read_buf.as_mut()).await.expect("read failed");
2359            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2360        }
2361        object.overwrite(16384, write_buf.as_mut(), false).await.expect("overwrite failed");
2362        {
2363            let mut read_buf = object.allocate_buffer(4096).await;
2364            object.read(16384, read_buf.as_mut()).await.expect("read failed");
2365            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2366        }
2367        {
2368            let mut read_buf = object.allocate_buffer(4096).await;
2369            object.read(65536, read_buf.as_mut()).await.expect("read failed");
2370            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2371        }
2372        object.overwrite(65536, write_buf.as_mut(), false).await.expect("overwrite failed");
2373        {
2374            let mut read_buf = object.allocate_buffer(4096).await;
2375            object.read(65536, read_buf.as_mut()).await.expect("read failed");
2376            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2377        }
2378        {
2379            let mut read_buf = object.allocate_buffer(4096).await;
2380            object.read(262144, read_buf.as_mut()).await.expect("read failed");
2381            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2382        }
2383        object.overwrite(262144, write_buf.as_mut(), false).await.expect("overwrite failed");
2384        {
2385            let mut read_buf = object.allocate_buffer(4096).await;
2386            object.read(262144, read_buf.as_mut()).await.expect("read failed");
2387            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2388        }
2389
2390        // Now let's try to do a huge overwrite, that spans over many holes and non-holes
2391        let mut huge_write_buf = object.allocate_buffer(524288).await;
2392        huge_write_buf.as_mut_slice().fill(96);
2393
2394        // With allocations disabled, the big overwrite should fail...
2395        object.overwrite(0, huge_write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
2396        // ... but it should work when allocations are enabled
2397        object.overwrite(0, huge_write_buf.as_mut(), true).await.expect("overwrite failed");
2398        {
2399            let mut read_buf = object.allocate_buffer(524288).await;
2400            object.read(0, read_buf.as_mut()).await.expect("read failed");
2401            assert_eq!(&read_buf.as_slice(), &[96; 524288]);
2402        }
2403
2404        // Check that the overwrites haven't messed up the filesystem state
2405        let fsck_options = FsckOptions {
2406            fail_on_warning: true,
2407            no_lock: true,
2408            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2409            ..Default::default()
2410        };
2411        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2412
2413        fs.close().await.expect("Close failed");
2414    }
2415
2416    #[fuchsia::test]
2417    async fn test_overwrite_when_unallocated_at_start_of_file() {
2418        // The standard test data we put in the test object would cause an extent with checksums
2419        // to be created, which overwrite() doesn't support. So we create an empty object instead.
2420        let (fs, object) = test_filesystem_and_empty_object().await;
2421
2422        let object = ObjectStore::open_object(
2423            object.owner(),
2424            object.object_id(),
2425            HandleOptions::default(),
2426            None,
2427        )
2428        .await
2429        .expect("open_object failed");
2430
2431        assert_eq!(fs.block_size(), 4096);
2432
2433        let mut write_buf = object.allocate_buffer(4096).await;
2434        write_buf.as_mut_slice().fill(95);
2435
2436        // First try to overwrite without allowing allocations
2437        // We expect this to fail, since nothing is allocated yet
2438        object.overwrite(0, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
2439
2440        // Now try the same overwrite command as before, but allow allocations
2441        object.overwrite(0, write_buf.as_mut(), true).await.expect("overwrite failed");
2442        {
2443            let mut read_buf = object.allocate_buffer(4096).await;
2444            object.read(0, read_buf.as_mut()).await.expect("read failed");
2445            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2446        }
2447
2448        // Now try to overwrite at the next block. This should fail if allocations are disabled
2449        object.overwrite(4096, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
2450
2451        // ... but it should work if allocations are enabled
2452        object.overwrite(4096, write_buf.as_mut(), true).await.expect("overwrite failed");
2453        {
2454            let mut read_buf = object.allocate_buffer(4096).await;
2455            object.read(4096, read_buf.as_mut()).await.expect("read failed");
2456            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2457        }
2458
2459        // Check that the overwrites haven't messed up the filesystem state
2460        let fsck_options = FsckOptions {
2461            fail_on_warning: true,
2462            no_lock: true,
2463            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2464            ..Default::default()
2465        };
2466        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2467
2468        fs.close().await.expect("Close failed");
2469    }
2470
2471    #[fuchsia::test]
2472    async fn test_overwrite_can_extend_a_file() {
2473        // The standard test data we put in the test object would cause an extent with checksums
2474        // to be created, which overwrite() doesn't support. So we create an empty object instead.
2475        let (fs, object) = test_filesystem_and_empty_object().await;
2476
2477        let object = ObjectStore::open_object(
2478            object.owner(),
2479            object.object_id(),
2480            HandleOptions::default(),
2481            None,
2482        )
2483        .await
2484        .expect("open_object failed");
2485
2486        assert_eq!(fs.block_size(), 4096);
2487        assert_eq!(object.get_size(), TEST_OBJECT_SIZE);
2488
2489        let mut write_buf = object.allocate_buffer(4096).await;
2490        write_buf.as_mut_slice().fill(95);
2491
2492        // Let's try to fill up the last block, and increase the file size in doing so
2493        let last_block_offset = round_down(TEST_OBJECT_SIZE, 4096 as u32);
2494
2495        // Expected to fail with allocations disabled
2496        object
2497            .overwrite(last_block_offset, write_buf.as_mut(), false)
2498            .await
2499            .expect_err("overwrite succeeded");
2500        // ... but expected to succeed with allocations enabled
2501        object
2502            .overwrite(last_block_offset, write_buf.as_mut(), true)
2503            .await
2504            .expect("overwrite failed");
2505        {
2506            let mut read_buf = object.allocate_buffer(4096).await;
2507            object.read(last_block_offset, read_buf.as_mut()).await.expect("read failed");
2508            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2509        }
2510
2511        assert_eq!(object.get_size(), 8192);
2512
2513        // Let's try to write at the next block, too
2514        let next_block_offset = round_up(TEST_OBJECT_SIZE, 4096 as u32).unwrap();
2515
2516        // Expected to fail with allocations disabled
2517        object
2518            .overwrite(next_block_offset, write_buf.as_mut(), false)
2519            .await
2520            .expect_err("overwrite succeeded");
2521        // ... but expected to succeed with allocations enabled
2522        object
2523            .overwrite(next_block_offset, write_buf.as_mut(), true)
2524            .await
2525            .expect("overwrite failed");
2526        {
2527            let mut read_buf = object.allocate_buffer(4096).await;
2528            object.read(next_block_offset, read_buf.as_mut()).await.expect("read failed");
2529            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2530        }
2531
2532        assert_eq!(object.get_size(), 12288);
2533
2534        // Check that the overwrites haven't messed up the filesystem state
2535        let fsck_options = FsckOptions {
2536            fail_on_warning: true,
2537            no_lock: true,
2538            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2539            ..Default::default()
2540        };
2541        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2542
2543        fs.close().await.expect("Close failed");
2544    }
2545
2546    #[fuchsia::test]
2547    async fn test_enable_verity() {
2548        let fs: OpenFxFilesystem = test_filesystem().await;
2549        let mut transaction = fs
2550            .clone()
2551            .new_transaction(lock_keys![], Options::default())
2552            .await
2553            .expect("new_transaction failed");
2554        let store = fs.root_store();
2555        let object = Arc::new(
2556            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2557                .await
2558                .expect("create_object failed"),
2559        );
2560
2561        transaction.commit().await.unwrap();
2562
2563        object
2564            .enable_verity(fio::VerificationOptions {
2565                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2566                salt: Some(vec![]),
2567                ..Default::default()
2568            })
2569            .await
2570            .expect("set verified file metadata failed");
2571
2572        let handle =
2573            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
2574                .await
2575                .expect("open_object failed");
2576
2577        assert!(handle.is_verified_file());
2578
2579        fs.close().await.expect("Close failed");
2580    }
2581
2582    #[fuchsia::test]
2583    async fn test_enable_verity_large_file() {
2584        // Need to make a large FakeDevice to create space for a 67 MB file.
2585        let device = DeviceHolder::new(FakeDevice::new(262144, TEST_DEVICE_BLOCK_SIZE));
2586        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2587        let root_store = fs.root_store();
2588        let mut transaction = fs
2589            .clone()
2590            .new_transaction(lock_keys![], Options::default())
2591            .await
2592            .expect("new_transaction failed");
2593
2594        let handle = ObjectStore::create_object(
2595            &root_store,
2596            &mut transaction,
2597            HandleOptions::default(),
2598            None,
2599        )
2600        .await
2601        .expect("failed to create object");
2602        transaction.commit().await.expect("commit failed");
2603        let mut offset = 0;
2604
2605        // Write a file big enough to trigger multiple transactions on enable_verity().
2606        let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2607        buf.as_mut_slice().fill(1);
2608        for _ in 0..130 {
2609            handle.write_or_append(Some(offset), buf.as_ref()).await.expect("write failed");
2610            offset += WRITE_ATTR_BATCH_SIZE as u64;
2611        }
2612
2613        handle
2614            .enable_verity(fio::VerificationOptions {
2615                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2616                salt: Some(vec![]),
2617                ..Default::default()
2618            })
2619            .await
2620            .expect("set verified file metadata failed");
2621
2622        let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2623        offset = 0;
2624        for _ in 0..130 {
2625            handle.read(offset, buf.as_mut()).await.expect("verification during read should fail");
2626            assert_eq!(buf.as_slice(), &[1; WRITE_ATTR_BATCH_SIZE]);
2627            offset += WRITE_ATTR_BATCH_SIZE as u64;
2628        }
2629
2630        fsck(fs.clone()).await.expect("fsck failed");
2631        fs.close().await.expect("Close failed");
2632    }
2633
    #[fuchsia::test]
    async fn test_retry_enable_verity_on_reboot() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        let object_id = {
            let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
            // Add a graveyard entry for the merkle attribute so that a partially-written
            // attribute gets cleaned up on a subsequent mount (via initial_reap below).
            transaction.add(
                root_store.store_object_id(),
                Mutation::replace_or_insert_object(
                    ObjectKey::graveyard_attribute_entry(
                        root_store.graveyard_directory_object_id(),
                        handle.object_id(),
                        FSVERITY_MERKLE_ATTRIBUTE_ID,
                    ),
                    ObjectValue::Some,
                ),
            );

            // This write should span three transactions. This test mimics the behavior when the
            // last transaction gets interrupted by a filesystem.close().
            handle
                .write_new_attr_in_batches(
                    &mut transaction,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    &vec![0; 2 * WRITE_ATTR_BATCH_SIZE],
                    WRITE_ATTR_BATCH_SIZE,
                )
                .await
                .expect("failed to write merkle attribute");

            handle.object_id()
            // Drop the transaction to simulate interrupting the merkle tree creation as well as to
            // release the transaction locks.
        };

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // A read-only mount must tolerate the half-written attribute: fsck should pass
        // even though the graveyard has not been processed yet.
        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // On open, the filesystem will call initial_reap which will call queue_tombstone().
        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_store = fs.root_store();
        let handle =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("open_object failed");
        // Retrying enable_verity() after the interrupted first attempt should succeed.
        handle
            .enable_verity(fio::VerificationOptions {
                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                salt: Some(vec![]),
                ..Default::default()
            })
            .await
            .expect("set verified file metadata failed");

        // `flush` will ensure that initial reap fully processes all the graveyard entries. This
        // isn't strictly necessary for the test to pass (the graveyard marker was already
        // processed during `enable_verity`), but it does help catch bugs, such as the attribute
        // graveyard entry not being removed upon processing.
        fs.graveyard().flush().await;
        // The rewritten merkle attribute should be exactly one digest's worth of zeroes.
        assert_eq!(
            handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
            Some(vec![0; <Sha256 as Hasher>::Digest::DIGEST_LEN].into())
        );
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("Close failed");
    }
2725
2726    #[fuchsia::test]
2727    async fn test_verify_data_corrupt_file() {
2728        let fs: OpenFxFilesystem = test_filesystem().await;
2729        let mut transaction = fs
2730            .clone()
2731            .new_transaction(lock_keys![], Options::default())
2732            .await
2733            .expect("new_transaction failed");
2734        let store = fs.root_store();
2735        let object = Arc::new(
2736            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2737                .await
2738                .expect("create_object failed"),
2739        );
2740
2741        transaction.commit().await.unwrap();
2742
2743        let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
2744        buf.as_mut_slice().fill(123);
2745        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2746
2747        object
2748            .enable_verity(fio::VerificationOptions {
2749                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2750                salt: Some(vec![]),
2751                ..Default::default()
2752            })
2753            .await
2754            .expect("set verified file metadata failed");
2755
2756        // Change file contents and ensure verification fails
2757        buf.as_mut_slice().fill(234);
2758        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2759        object.read(0, buf.as_mut()).await.expect_err("verification during read should fail");
2760
2761        fs.close().await.expect("Close failed");
2762    }
2763
2764    #[fuchsia::test]
2765    async fn test_extend() {
2766        let fs = test_filesystem().await;
2767        let handle;
2768        let mut transaction = fs
2769            .clone()
2770            .new_transaction(lock_keys![], Options::default())
2771            .await
2772            .expect("new_transaction failed");
2773        let store = fs.root_store();
2774        handle =
2775            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2776                .await
2777                .expect("create_object failed");
2778
2779        // As of writing, an empty filesystem has two 512kiB superblock extents and a little over
2780        // 256kiB of additional allocations (journal, etc) so we start use a 'magic' starting point
2781        // of 2MiB here.
2782        const START_OFFSET: u64 = 2048 * 1024;
2783        handle
2784            .extend(&mut transaction, START_OFFSET..START_OFFSET + 5 * fs.block_size() as u64)
2785            .await
2786            .expect("extend failed");
2787        transaction.commit().await.expect("commit failed");
2788        let mut buf = handle.allocate_buffer(5 * fs.block_size() as usize).await;
2789        buf.as_mut_slice().fill(123);
2790        handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2791        buf.as_mut_slice().fill(67);
2792        handle.read(0, buf.as_mut()).await.expect("read failed");
2793        assert_eq!(buf.as_slice(), &vec![123; 5 * fs.block_size() as usize]);
2794        fs.close().await.expect("Close failed");
2795    }
2796
2797    #[fuchsia::test]
2798    async fn test_truncate_deallocates_old_extents() {
2799        let (fs, object) = test_filesystem_and_object().await;
2800        let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
2801        buf.as_mut_slice().fill(0xaa);
2802        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2803
2804        let allocator = fs.allocator();
2805        let allocated_before = allocator.get_allocated_bytes();
2806        object.truncate(fs.block_size() as u64).await.expect("truncate failed");
2807        let allocated_after = allocator.get_allocated_bytes();
2808        assert!(
2809            allocated_after < allocated_before,
2810            "before = {} after = {}",
2811            allocated_before,
2812            allocated_after
2813        );
2814        fs.close().await.expect("Close failed");
2815    }
2816
2817    #[fuchsia::test]
2818    async fn test_truncate_zeroes_tail_block() {
2819        let (fs, object) = test_filesystem_and_object().await;
2820
2821        WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + 3).await.expect("truncate failed");
2822        WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + TEST_DATA.len() as u64)
2823            .await
2824            .expect("truncate failed");
2825
2826        let mut buf = object.allocate_buffer(fs.block_size() as usize).await;
2827        let offset = (TEST_DATA_OFFSET % fs.block_size()) as usize;
2828        object.read(TEST_DATA_OFFSET - offset as u64, buf.as_mut()).await.expect("read failed");
2829
2830        let mut expected = TEST_DATA.to_vec();
2831        expected[3..].fill(0);
2832        assert_eq!(&buf.as_slice()[offset..offset + expected.len()], &expected);
2833    }
2834
    #[fuchsia::test]
    async fn test_trim() {
        // Tests that partially-truncated ("trimmed") objects are correctly finished off by the
        // graveyard, both for live objects and for objects already moved into the graveyard,
        // and that journal replay of any intermediate state is consistent.

        // Format a new filesystem.
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let block_size = fs.block_size();
        root_volume(fs.clone())
            .await
            .expect("root_volume failed")
            .new_volume("test", NO_OWNER, None)
            .await
            .expect("volume failed");
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        // To test trim, we open the filesystem and set up a post commit hook that runs after every
        // transaction.  When the hook triggers, we can fsck the volume, take a snapshot of the
        // device and check that it gets replayed correctly on the snapshot.  We can check that the
        // graveyard trims the file as expected.
        //
        // `Context` is shared between the test body and the post-commit hook.
        #[derive(Default)]
        struct Context {
            store: Option<Arc<ObjectStore>>,
            // Set by the test body once the object has been moved into the graveyard; tells the
            // hook to expect tombstoning rather than trimming.
            object_id: Option<u64>,
        }
        let shared_context = Arc::new(Mutex::new(Context::default()));

        // Big enough that truncating the object exceeds TRANSACTION_MUTATION_THRESHOLD and so
        // requires multiple transactions.
        let object_size = (TRANSACTION_MUTATION_THRESHOLD as u64 + 10) * 2 * block_size;

        // Wait for an object to get tombstoned by the graveyard.
        async fn expect_tombstoned(store: &Arc<ObjectStore>, object_id: u64) {
            loop {
                if let Err(e) =
                    ObjectStore::open_object(store, object_id, HandleOptions::default(), None).await
                {
                    assert!(
                        FxfsError::NotFound.matches(&e),
                        "open_object didn't fail with NotFound: {:?}",
                        e
                    );
                    break;
                }
                // The graveyard should eventually tombstone the object.
                fasync::Timer::new(std::time::Duration::from_millis(100)).await;
            }
        }

        // Checks to see if the object needs to be trimmed.  Returns a handle to "foo" if it
        // still has allocated bytes despite a zero-length data attribute (i.e. a pending trim),
        // otherwise None.
        async fn needs_trim(store: &Arc<ObjectStore>) -> Option<DataObjectHandle<ObjectStore>> {
            let root_directory = Directory::open(store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let oid = root_directory.lookup("foo").await.expect("lookup failed");
            if let Some((oid, _)) = oid {
                let object = ObjectStore::open_object(store, oid, HandleOptions::default(), None)
                    .await
                    .expect("open_object failed");
                let props = object.get_properties().await.expect("get_properties failed");
                if props.allocated_size > 0 && props.data_attribute_size == 0 {
                    Some(object)
                } else {
                    None
                }
            } else {
                None
            }
        }

        let shared_context_clone = shared_context.clone();
        let post_commit = move || {
            let store = shared_context_clone.lock().store.as_ref().cloned().unwrap();
            let shared_context = shared_context_clone.clone();
            async move {
                // First run fsck on the current filesystem.
                let options = FsckOptions {
                    fail_on_warning: true,
                    // The post-commit hook runs with transaction locks held, so fsck must not
                    // take them again.
                    no_lock: true,
                    on_error: Box::new(|err| println!("fsck error: {:?}", err)),
                    ..Default::default()
                };
                let fs = store.filesystem();

                fsck_with_options(fs.clone(), &options).await.expect("fsck_with_options failed");
                fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
                    .await
                    .expect("fsck_volume_with_options failed");

                // Now check that we can replay this correctly.
                fs.sync(SyncOptions { flush_device: true, ..Default::default() })
                    .await
                    .expect("sync failed");
                let device = fs.device().snapshot().expect("snapshot failed");

                let object_id = shared_context.lock().object_id.clone();

                // In the first pass (object_id is None) we skip the initial reap so we can
                // observe the un-trimmed state; in the second pass we let it run.
                let fs2 = FxFilesystemBuilder::new()
                    .skip_initial_reap(object_id.is_none())
                    .open(device)
                    .await
                    .expect("open failed");

                // If the "foo" file exists check that allocated size matches content size.
                let root_vol = root_volume(fs2.clone()).await.expect("root_volume failed");
                let store = root_vol.volume("test", NO_OWNER, None).await.expect("volume failed");

                if let Some(oid) = object_id {
                    // For the second pass, the object should get tombstoned.
                    expect_tombstoned(&store, oid).await;
                } else if let Some(object) = needs_trim(&store).await {
                    // Extend the file and make sure that it is correctly trimmed.
                    object.truncate(object_size).await.expect("truncate failed");
                    let mut buf = object.allocate_buffer(block_size as usize).await;
                    object
                        .read(object_size - block_size * 2, buf.as_mut())
                        .await
                        .expect("read failed");
                    assert_eq!(buf.as_slice(), &vec![0; block_size as usize]);

                    // Remount, this time with the graveyard performing an initial reap and the
                    // object should get trimmed.
                    let fs = FxFilesystem::open(fs.device().snapshot().expect("snapshot failed"))
                        .await
                        .expect("open failed");
                    let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
                    let store =
                        root_vol.volume("test", NO_OWNER, None).await.expect("volume failed");
                    while needs_trim(&store).await.is_some() {
                        // The object has been truncated, but still has some data allocated to
                        // it.  The graveyard should trim the object eventually.
                        fasync::Timer::new(std::time::Duration::from_millis(100)).await;
                    }

                    // Run fsck.
                    fsck_with_options(fs.clone(), &options)
                        .await
                        .expect("fsck_with_options failed");
                    fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
                        .await
                        .expect("fsck_volume_with_options failed");
                    fs.close().await.expect("close failed");
                }

                // Run fsck on fs2.
                fsck_with_options(fs2.clone(), &options).await.expect("fsck_with_options failed");
                fsck_volume_with_options(fs2.as_ref(), &options, store.store_object_id(), None)
                    .await
                    .expect("fsck_volume_with_options failed");
                fs2.close().await.expect("close failed");
            }
            .boxed()
        };

        let fs = FxFilesystemBuilder::new()
            .post_commit_hook(post_commit)
            .open(device)
            .await
            .expect("open failed");

        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_vol.volume("test", NO_OWNER, None).await.expect("volume failed");

        shared_context.lock().store = Some(store.clone());

        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");

        let object;
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        object = root_directory
            .create_child_file(&mut transaction, "foo")
            .await
            .expect("create_object failed");
        transaction.commit().await.expect("commit failed");

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        // Two passes: first with a regular object, and then with that object moved into the
        // graveyard.
        let mut pass = 0;
        loop {
            // Create enough extents in it such that when we truncate the object it will require
            // more than one transaction.
            let mut buf = object.allocate_buffer(5).await;
            buf.as_mut_slice().fill(1);
            // Write every other block.
            for offset in (0..object_size).into_iter().step_by(2 * block_size as usize) {
                object
                    .txn_write(&mut transaction, offset, buf.as_ref())
                    .await
                    .expect("write failed");
            }
            transaction.commit().await.expect("commit failed");
            // This should take up more than one transaction.
            WriteObjectHandle::truncate(&object, 0).await.expect("truncate failed");

            if pass == 1 {
                break;
            }

            // Store the object ID so that we can make sure the object is always tombstoned
            // after remount (see above).
            shared_context.lock().object_id = Some(object.object_id());

            transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(store.store_object_id(), store.root_directory_object_id()),
                        LockKey::object(store.store_object_id(), object.object_id()),
                    ],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");

            // Move the object into the graveyard.
            replace_child(&mut transaction, None, (&root_directory, "foo"))
                .await
                .expect("replace_child failed");
            store.add_to_graveyard(&mut transaction, object.object_id());

            pass += 1;
        }

        fs.close().await.expect("Close failed");
    }
3078
    #[fuchsia::test]
    async fn test_adjust_refs() {
        // Tests refcount adjustment: incrementing keeps the object alive, decrementing to zero
        // marks it ready for deletion, and actual space reclamation only happens on tombstone.
        let (fs, object) = test_filesystem_and_object().await;
        let store = object.owner();
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        // Bump the refcount 1 -> 2; `false` means the object is not yet eligible for deletion.
        assert_eq!(
            store
                .adjust_refs(&mut transaction, object.object_id(), 1)
                .await
                .expect("adjust_refs failed"),
            false
        );
        transaction.commit().await.expect("commit failed");

        let allocator = fs.allocator();
        let allocated_before = allocator.get_allocated_bytes();
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        // Drop the refcount 2 -> 0; `true` means the object can now be deleted.
        assert_eq!(
            store
                .adjust_refs(&mut transaction, object.object_id(), -2)
                .await
                .expect("adjust_refs failed"),
            true
        );
        transaction.commit().await.expect("commit failed");

        // Reaching a zero refcount must not free any space by itself...
        assert_eq!(allocator.get_allocated_bytes(), allocated_before);

        store
            .tombstone_object(
                object.object_id(),
                Options { borrow_metadata_space: true, ..Default::default() },
            )
            .await
            .expect("purge failed");

        // ...only tombstoning reclaims the object's single data block.
        assert_eq!(allocated_before - allocator.get_allocated_bytes(), fs.block_size() as u64);

        // We need to remove the directory entry, too, otherwise fsck will complain
        {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            // Replace the child entry with None to delete it.
            transaction.add(
                store.store_object_id(),
                Mutation::replace_or_insert_object(
                    ObjectKey::child(root_directory.object_id(), TEST_OBJECT_NAME, false),
                    ObjectValue::None,
                ),
            );
            transaction.commit().await.expect("commit failed");
        }

        fsck_with_options(
            fs.clone(),
            &FsckOptions {
                fail_on_warning: true,
                on_error: Box::new(|err| println!("fsck error: {:?}", err)),
                ..Default::default()
            },
        )
        .await
        .expect("fsck_with_options failed");

        fs.close().await.expect("Close failed");
    }
3170
3171    #[fuchsia::test]
3172    async fn test_locks() {
3173        let (fs, object) = test_filesystem_and_object().await;
3174        let (send1, recv1) = channel();
3175        let (send2, recv2) = channel();
3176        let (send3, recv3) = channel();
3177        let done = Mutex::new(false);
3178        let mut futures = FuturesUnordered::new();
3179        futures.push(
3180            async {
3181                let mut t = object.new_transaction().await.expect("new_transaction failed");
3182                send1.send(()).unwrap(); // Tell the next future to continue.
3183                send3.send(()).unwrap(); // Tell the last future to continue.
3184                recv2.await.unwrap();
3185                let mut buf = object.allocate_buffer(5).await;
3186                buf.as_mut_slice().copy_from_slice(b"hello");
3187                object.txn_write(&mut t, 0, buf.as_ref()).await.expect("write failed");
3188                // This is a halting problem so all we can do is sleep.
3189                fasync::Timer::new(Duration::from_millis(100)).await;
3190                assert!(!*done.lock());
3191                t.commit().await.expect("commit failed");
3192            }
3193            .boxed(),
3194        );
3195        futures.push(
3196            async {
3197                recv1.await.unwrap();
3198                // Reads should not block.
3199                let offset = TEST_DATA_OFFSET as usize;
3200                let align = offset % fs.block_size() as usize;
3201                let len = TEST_DATA.len();
3202                let mut buf = object.allocate_buffer(align + len).await;
3203                assert_eq!(
3204                    object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
3205                    align + TEST_DATA.len()
3206                );
3207                assert_eq!(&buf.as_slice()[align..], TEST_DATA);
3208                // Tell the first future to continue.
3209                send2.send(()).unwrap();
3210            }
3211            .boxed(),
3212        );
3213        futures.push(
3214            async {
3215                // This should block until the first future has completed.
3216                recv3.await.unwrap();
3217                let _t = object.new_transaction().await.expect("new_transaction failed");
3218                let mut buf = object.allocate_buffer(5).await;
3219                assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 5);
3220                assert_eq!(buf.as_slice(), b"hello");
3221            }
3222            .boxed(),
3223        );
3224        while let Some(()) = futures.next().await {}
3225        fs.close().await.expect("Close failed");
3226    }
3227
3228    #[fuchsia::test(threads = 10)]
3229    async fn test_racy_reads() {
3230        let fs = test_filesystem().await;
3231        let object;
3232        let mut transaction = fs
3233            .clone()
3234            .new_transaction(lock_keys![], Options::default())
3235            .await
3236            .expect("new_transaction failed");
3237        let store = fs.root_store();
3238        object = Arc::new(
3239            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3240                .await
3241                .expect("create_object failed"),
3242        );
3243        transaction.commit().await.expect("commit failed");
3244        for _ in 0..100 {
3245            let cloned_object = object.clone();
3246            let writer = fasync::Task::spawn(async move {
3247                let mut buf = cloned_object.allocate_buffer(10).await;
3248                buf.as_mut_slice().fill(123);
3249                cloned_object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3250            });
3251            let cloned_object = object.clone();
3252            let reader = fasync::Task::spawn(async move {
3253                let wait_time = rand::thread_rng().gen_range(0..5);
3254                fasync::Timer::new(Duration::from_millis(wait_time)).await;
3255                let mut buf = cloned_object.allocate_buffer(10).await;
3256                buf.as_mut_slice().fill(23);
3257                let amount = cloned_object.read(0, buf.as_mut()).await.expect("write failed");
3258                // If we succeed in reading data, it must include the write; i.e. if we see the size
3259                // change, we should see the data too.  For this to succeed it requires locking on
3260                // the read size to ensure that when we read the size, we get the extents changed in
3261                // that same transaction.
3262                if amount != 0 {
3263                    assert_eq!(amount, 10);
3264                    assert_eq!(buf.as_slice(), &[123; 10]);
3265                }
3266            });
3267            writer.await;
3268            reader.await;
3269            object.truncate(0).await.expect("truncate failed");
3270        }
3271        fs.close().await.expect("Close failed");
3272    }
3273
3274    #[fuchsia::test]
3275    async fn test_allocated_size() {
3276        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
3277
3278        let before = object.get_properties().await.expect("get_properties failed").allocated_size;
3279        let mut buf = object.allocate_buffer(5).await;
3280        buf.as_mut_slice().copy_from_slice(b"hello");
3281        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3282        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3283        assert_eq!(after, before + fs.block_size() as u64);
3284
3285        // Do the same write again and there should be no change.
3286        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3287        assert_eq!(
3288            object.get_properties().await.expect("get_properties failed").allocated_size,
3289            after
3290        );
3291
3292        // extend...
3293        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3294        let offset = 1000 * fs.block_size() as u64;
3295        let before = after;
3296        object
3297            .extend(&mut transaction, offset..offset + fs.block_size() as u64)
3298            .await
3299            .expect("extend failed");
3300        transaction.commit().await.expect("commit failed");
3301        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3302        assert_eq!(after, before + fs.block_size() as u64);
3303
3304        // truncate...
3305        let before = after;
3306        let size = object.get_size();
3307        object.truncate(size - fs.block_size() as u64).await.expect("extend failed");
3308        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3309        assert_eq!(after, before - fs.block_size() as u64);
3310
3311        // preallocate_range...
3312        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3313        let before = after;
3314        let mut file_range = offset..offset + fs.block_size() as u64;
3315        object.preallocate_range(&mut transaction, &mut file_range).await.expect("extend failed");
3316        transaction.commit().await.expect("commit failed");
3317        let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3318        assert_eq!(after, before + fs.block_size() as u64);
3319        fs.close().await.expect("Close failed");
3320    }
3321
3322    #[fuchsia::test(threads = 10)]
3323    async fn test_zero() {
3324        let (fs, object) = test_filesystem_and_object().await;
3325        let expected_size = object.get_size();
3326        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3327        object.zero(&mut transaction, 0..fs.block_size() as u64 * 10).await.expect("zero failed");
3328        transaction.commit().await.expect("commit failed");
3329        assert_eq!(object.get_size(), expected_size);
3330        let mut buf = object.allocate_buffer(fs.block_size() as usize * 10).await;
3331        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed") as u64, expected_size);
3332        assert_eq!(
3333            &buf.as_slice()[0..expected_size as usize],
3334            vec![0u8; expected_size as usize].as_slice()
3335        );
3336        fs.close().await.expect("Close failed");
3337    }
3338
    #[fuchsia::test]
    async fn test_properties() {
        // Tests that two `update_attributes` calls within one transaction merge: later values
        // override earlier ones field-by-field, and untouched fields are preserved.
        let (fs, object) = test_filesystem_and_object().await;
        const CRTIME: Timestamp = Timestamp::from_nanos(1234);
        const MTIME: Timestamp = Timestamp::from_nanos(5678);
        const CTIME: Timestamp = Timestamp::from_nanos(8765);

        // ObjectProperties can be updated through `update_attributes`.
        // `get_properties` should reflect the latest changes.
        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        object
            .update_attributes(
                &mut transaction,
                Some(&fio::MutableNodeAttributes {
                    creation_time: Some(CRTIME.as_nanos()),
                    modification_time: Some(MTIME.as_nanos()),
                    mode: Some(111),
                    gid: Some(222),
                    ..Default::default()
                }),
                None,
            )
            .await
            .expect("update_attributes failed");
        const MTIME_NEW: Timestamp = Timestamp::from_nanos(12345678);
        // Second update in the same transaction: overrides modification_time and gid, and newly
        // sets rdev and (via the second argument) change_time.
        object
            .update_attributes(
                &mut transaction,
                Some(&fio::MutableNodeAttributes {
                    modification_time: Some(MTIME_NEW.as_nanos()),
                    gid: Some(333),
                    rdev: Some(444),
                    ..Default::default()
                }),
                Some(CTIME),
            )
            .await
            .expect("update_timestamps failed");
        transaction.commit().await.expect("commit failed");

        // Expect the merged result: crtime/mode from the first call; mtime/gid from the second
        // (overriding the first); rdev and change_time only set by the second.
        let properties = object.get_properties().await.expect("get_properties failed");
        assert_matches!(
            properties,
            ObjectProperties {
                refs: 1u64,
                allocated_size: TEST_OBJECT_ALLOCATED_SIZE,
                data_attribute_size: TEST_OBJECT_SIZE,
                creation_time: CRTIME,
                modification_time: MTIME_NEW,
                posix_attributes: Some(PosixAttributes { mode: 111, gid: 333, rdev: 444, .. }),
                change_time: CTIME,
                ..
            }
        );
        fs.close().await.expect("Close failed");
    }
3395
3396    #[fuchsia::test]
3397    async fn test_is_allocated() {
3398        let (fs, object) = test_filesystem_and_object().await;
3399
3400        // `test_filesystem_and_object()` wrote the buffer `TEST_DATA` to the device at offset
3401        // `TEST_DATA_OFFSET` where the length and offset are aligned to the block size.
3402        let aligned_offset = round_down(TEST_DATA_OFFSET, fs.block_size());
3403        let aligned_length = round_up(TEST_DATA.len() as u64, fs.block_size()).unwrap();
3404
        // Check for the case where we have the following extent layout
3406        //       [ unallocated ][ `TEST_DATA` ]
3407        // The extents before `aligned_offset` should not be allocated
3408        let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
3409        assert_eq!(count, aligned_offset);
3410        assert_eq!(allocated, false);
3411
3412        let (allocated, count) =
3413            object.is_allocated(aligned_offset).await.expect("is_allocated failed");
3414        assert_eq!(count, aligned_length);
3415        assert_eq!(allocated, true);
3416
3417        // Check for the case where where we query out of range
3418        let end = aligned_offset + aligned_length;
3419        object
3420            .is_allocated(end)
3421            .await
3422            .expect_err("is_allocated should have returned ERR_OUT_OF_RANGE");
3423
3424        // Check for the case where where we start querying for allocation starting from
3425        // an allocated range to the end of the device
3426        let size = 50 * fs.block_size() as u64;
3427        object.truncate(size).await.expect("extend failed");
3428
3429        let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
3430        assert_eq!(count, size - end);
3431        assert_eq!(allocated, false);
3432
3433        // Check for the case where where we have the following extent layout
3434        //      [ unallocated ][ `buf` ][ `buf` ]
3435        let buf_length = 5 * fs.block_size();
3436        let mut buf = object.allocate_buffer(buf_length as usize).await;
3437        buf.as_mut_slice().fill(123);
3438        let new_offset = end + 20 * fs.block_size() as u64;
3439        object.write_or_append(Some(new_offset), buf.as_ref()).await.expect("write failed");
3440        object
3441            .write_or_append(Some(new_offset + buf_length), buf.as_ref())
3442            .await
3443            .expect("write failed");
3444
3445        let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
3446        assert_eq!(count, new_offset - end);
3447        assert_eq!(allocated, false);
3448
3449        let (allocated, count) =
3450            object.is_allocated(new_offset).await.expect("is_allocated failed");
3451        assert_eq!(count, 2 * buf_length);
3452        assert_eq!(allocated, true);
3453
3454        // Check the case where we query from the middle of an extent
3455        let (allocated, count) = object
3456            .is_allocated(new_offset + 4 * fs.block_size())
3457            .await
3458            .expect("is_allocated failed");
3459        assert_eq!(count, 2 * buf_length - 4 * fs.block_size());
3460        assert_eq!(allocated, true);
3461
3462        // Now, write buffer to a location already written to.
3463        // Check for the case when we the following extent layout
3464        //      [ unallocated ][ `other_buf` ][ (part of) `buf` ][ `buf` ]
3465        let other_buf_length = 3 * fs.block_size();
3466        let mut other_buf = object.allocate_buffer(other_buf_length as usize).await;
3467        other_buf.as_mut_slice().fill(231);
3468        object.write_or_append(Some(new_offset), other_buf.as_ref()).await.expect("write failed");
3469
3470        // We still expect that `is_allocated(..)` will return that  there are 2*`buf_length bytes`
3471        // allocated from `new_offset`
3472        let (allocated, count) =
3473            object.is_allocated(new_offset).await.expect("is_allocated failed");
3474        assert_eq!(count, 2 * buf_length);
3475        assert_eq!(allocated, true);
3476
3477        // Check for the case when we the following extent layout
3478        //   [ unallocated ][ deleted ][ unallocated ][ deleted ][ allocated ]
3479        // Mark TEST_DATA as deleted
3480        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3481        object
3482            .zero(&mut transaction, aligned_offset..aligned_offset + aligned_length)
3483            .await
3484            .expect("zero failed");
3485        // Mark `other_buf` as deleted
3486        object
3487            .zero(&mut transaction, new_offset..new_offset + buf_length)
3488            .await
3489            .expect("zero failed");
3490        transaction.commit().await.expect("commit transaction failed");
3491
3492        let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
3493        assert_eq!(count, new_offset + buf_length);
3494        assert_eq!(allocated, false);
3495
3496        let (allocated, count) =
3497            object.is_allocated(new_offset + buf_length).await.expect("is_allocated failed");
3498        assert_eq!(count, buf_length);
3499        assert_eq!(allocated, true);
3500
3501        let new_end = new_offset + buf_length + count;
3502
3503        // Check for the case where there are objects with different keys.
3504        // Case that we're checking for:
3505        //      [ unallocated ][ extent (object with different key) ][ unallocated ]
3506        let store = object.owner();
3507        let mut transaction = fs
3508            .clone()
3509            .new_transaction(lock_keys![], Options::default())
3510            .await
3511            .expect("new_transaction failed");
3512        let object2 =
3513            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3514                .await
3515                .expect("create_object failed");
3516        transaction.commit().await.expect("commit failed");
3517
3518        object2
3519            .write_or_append(Some(new_end + fs.block_size()), buf.as_ref())
3520            .await
3521            .expect("write failed");
3522
3523        // Expecting that the extent with a different key is treated like unallocated extent
3524        let (allocated, count) = object.is_allocated(new_end).await.expect("is_allocated failed");
3525        assert_eq!(count, size - new_end);
3526        assert_eq!(allocated, false);
3527
3528        fs.close().await.expect("close failed");
3529    }
3530
3531    #[fuchsia::test(threads = 10)]
3532    async fn test_read_write_attr() {
3533        let (_fs, object) = test_filesystem_and_object().await;
3534        let data = [0xffu8; 16_384];
3535        object.write_attr(20, &data).await.expect("write_attr failed");
3536        let rdata =
3537            object.read_attr(20).await.expect("read_attr failed").expect("no attribute data found");
3538        assert_eq!(&data[..], &rdata[..]);
3539
3540        assert_eq!(object.read_attr(21).await.expect("read_attr failed"), None);
3541    }
3542
3543    #[fuchsia::test(threads = 10)]
3544    async fn test_allocate_basic() {
3545        let (fs, object) = test_filesystem_and_empty_object().await;
3546        let block_size = fs.block_size();
3547        let file_size = block_size * 10;
3548        object.truncate(file_size).await.unwrap();
3549
3550        let small_buf_size = 1024;
3551        let large_buf_aligned_size = block_size as usize * 2;
3552        let large_buf_size = block_size as usize * 2 + 1024;
3553
3554        let mut small_buf = object.allocate_buffer(small_buf_size).await;
3555        let mut large_buf_aligned = object.allocate_buffer(large_buf_aligned_size).await;
3556        let mut large_buf = object.allocate_buffer(large_buf_size).await;
3557
3558        assert_eq!(object.read(0, small_buf.as_mut()).await.unwrap(), small_buf_size);
3559        assert_eq!(small_buf.as_slice(), &vec![0; small_buf_size]);
3560        assert_eq!(object.read(0, large_buf.as_mut()).await.unwrap(), large_buf_size);
3561        assert_eq!(large_buf.as_slice(), &vec![0; large_buf_size]);
3562        assert_eq!(
3563            object.read(0, large_buf_aligned.as_mut()).await.unwrap(),
3564            large_buf_aligned_size
3565        );
3566        assert_eq!(large_buf_aligned.as_slice(), &vec![0; large_buf_aligned_size]);
3567
3568        // Allocation succeeds, and without any writes to the location it shows up as zero.
3569        object.allocate(block_size..block_size * 3).await.unwrap();
3570
3571        // Test starting before, inside, and after the allocated section with every sized buffer.
3572        for (buf_index, buf) in [small_buf, large_buf, large_buf_aligned].iter_mut().enumerate() {
3573            for offset in 0..4 {
3574                assert_eq!(
3575                    object.read(block_size * offset, buf.as_mut()).await.unwrap(),
3576                    buf.len(),
3577                    "buf_index: {}, read offset: {}",
3578                    buf_index,
3579                    offset,
3580                );
3581                assert_eq!(
3582                    buf.as_slice(),
3583                    &vec![0; buf.len()],
3584                    "buf_index: {}, read offset: {}",
3585                    buf_index,
3586                    offset,
3587                );
3588            }
3589        }
3590
3591        fs.close().await.expect("close failed");
3592    }
3593
3594    #[fuchsia::test(threads = 10)]
3595    async fn test_allocate_extends_file() {
3596        const BUF_SIZE: usize = 1024;
3597        let (fs, object) = test_filesystem_and_empty_object().await;
3598        let mut buf = object.allocate_buffer(BUF_SIZE).await;
3599        let block_size = fs.block_size();
3600
3601        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
3602        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3603
3604        assert!(TEST_OBJECT_SIZE < block_size * 4);
3605        // Allocation succeeds, and without any writes to the location it shows up as zero.
3606        object.allocate(0..block_size * 4).await.unwrap();
3607        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
3608        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3609        assert_eq!(object.read(block_size, buf.as_mut()).await.unwrap(), buf.len());
3610        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3611        assert_eq!(object.read(block_size * 3, buf.as_mut()).await.unwrap(), buf.len());
3612        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3613
3614        fs.close().await.expect("close failed");
3615    }
3616
3617    #[fuchsia::test(threads = 10)]
3618    async fn test_allocate_past_end() {
3619        const BUF_SIZE: usize = 1024;
3620        let (fs, object) = test_filesystem_and_empty_object().await;
3621        let mut buf = object.allocate_buffer(BUF_SIZE).await;
3622        let block_size = fs.block_size();
3623
3624        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
3625        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3626
3627        assert!(TEST_OBJECT_SIZE < block_size * 4);
3628        // Allocation succeeds, and without any writes to the location it shows up as zero.
3629        object.allocate(block_size * 4..block_size * 6).await.unwrap();
3630        assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
3631        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3632        assert_eq!(object.read(block_size * 4, buf.as_mut()).await.unwrap(), buf.len());
3633        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3634        assert_eq!(object.read(block_size * 5, buf.as_mut()).await.unwrap(), buf.len());
3635        assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3636
3637        fs.close().await.expect("close failed");
3638    }
3639
3640    #[fuchsia::test(threads = 10)]
3641    async fn test_allocate_read_attr() {
3642        let (fs, object) = test_filesystem_and_empty_object().await;
3643        let block_size = fs.block_size();
3644        let file_size = block_size * 4;
3645        object.truncate(file_size).await.unwrap();
3646
3647        let content = object
3648            .read_attr(object.attribute_id())
3649            .await
3650            .expect("failed to read attr")
3651            .expect("attr returned none");
3652        assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
3653
3654        object.allocate(block_size..block_size * 3).await.unwrap();
3655
3656        let content = object
3657            .read_attr(object.attribute_id())
3658            .await
3659            .expect("failed to read attr")
3660            .expect("attr returned none");
3661        assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
3662
3663        fs.close().await.expect("close failed");
3664    }
3665
3666    #[fuchsia::test(threads = 10)]
3667    async fn test_allocate_existing_data() {
3668        struct Case {
3669            written_ranges: Vec<Range<usize>>,
3670            allocate_range: Range<u64>,
3671        }
3672        let cases = [
3673            Case { written_ranges: vec![4..7], allocate_range: 4..7 },
3674            Case { written_ranges: vec![4..7], allocate_range: 3..8 },
3675            Case { written_ranges: vec![4..7], allocate_range: 5..6 },
3676            Case { written_ranges: vec![4..7], allocate_range: 5..8 },
3677            Case { written_ranges: vec![4..7], allocate_range: 3..5 },
3678            Case { written_ranges: vec![0..1, 2..3, 4..5, 6..7, 8..9], allocate_range: 0..10 },
3679            Case { written_ranges: vec![0..2, 4..6, 7..10], allocate_range: 1..8 },
3680        ];
3681
3682        for case in cases {
3683            let (fs, object) = test_filesystem_and_empty_object().await;
3684            let block_size = fs.block_size();
3685            let file_size = block_size * 10;
3686            object.truncate(file_size).await.unwrap();
3687
3688            for write in &case.written_ranges {
3689                let write_len = (write.end - write.start) * block_size as usize;
3690                let mut write_buf = object.allocate_buffer(write_len).await;
3691                write_buf.as_mut_slice().fill(0xff);
3692                assert_eq!(
3693                    object
3694                        .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
3695                        .await
3696                        .unwrap(),
3697                    file_size
3698                );
3699            }
3700
3701            let mut expected_buf = object.allocate_buffer(file_size as usize).await;
3702            assert_eq!(object.read(0, expected_buf.as_mut()).await.unwrap(), expected_buf.len());
3703
3704            object
3705                .allocate(
3706                    case.allocate_range.start * block_size..case.allocate_range.end * block_size,
3707                )
3708                .await
3709                .unwrap();
3710
3711            let mut read_buf = object.allocate_buffer(file_size as usize).await;
3712            assert_eq!(object.read(0, read_buf.as_mut()).await.unwrap(), read_buf.len());
3713            assert_eq!(read_buf.as_slice(), expected_buf.as_slice());
3714
3715            fs.close().await.expect("close failed");
3716        }
3717    }
3718
    /// Walks the object store's LSM tree and collects the `ExtentMode` of every extent record
    /// for attribute 0 of `obj` that overlaps `search_range`, with each returned range clipped
    /// to `search_range`.
    ///
    /// Panics if the iterator yields anything other than a matching extent record (including
    /// reaching the end of the iterator) before the search range is exhausted.
    async fn get_modes(
        obj: &DataObjectHandle<ObjectStore>,
        mut search_range: Range<u64>,
    ) -> Vec<(Range<u64>, ExtentMode)> {
        let mut modes = Vec::new();
        let store = obj.store();
        let tree = store.tree();
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        // Position the merger iterator at the extent containing (or following)
        // `search_range.start`.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                obj.object_id(),
                0,
                AttributeKey::Extent(ExtentKey::search_key_from_offset(search_range.start)),
            )))
            .await
            .unwrap();
        loop {
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(
                                    attribute_id,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                        },
                    value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
                    ..
                }) if *object_id == obj.object_id() && *attribute_id == 0 => {
                    // This extent starts beyond the range of interest; we're done.
                    if search_range.end <= range.start {
                        break;
                    }
                    // Clip the extent to the portion that overlaps `search_range`, then
                    // advance the search start past the recorded portion.
                    let found_range = std::cmp::max(search_range.start, range.start)
                        ..std::cmp::min(search_range.end, range.end);
                    search_range.start = found_range.end;
                    modes.push((found_range, mode.clone()));
                    if search_range.start == search_range.end {
                        break;
                    }
                    iter.advance().await.unwrap();
                }
                x => panic!("looking for extent record, found this {:?}", x),
            }
        }
        modes
    }
3768
3769    async fn assert_all_overwrite(
3770        obj: &DataObjectHandle<ObjectStore>,
3771        mut search_range: Range<u64>,
3772    ) {
3773        let modes = get_modes(obj, search_range.clone()).await;
3774        for mode in modes {
3775            assert_eq!(
3776                mode.0.start, search_range.start,
3777                "missing mode in range {}..{}",
3778                search_range.start, mode.0.start
3779            );
3780            match mode.1 {
3781                ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => (),
3782                m => panic!("mode at range {:?} was not overwrite, instead found {:?}", mode.0, m),
3783            }
3784            assert!(
3785                mode.0.end <= search_range.end,
3786                "mode ends beyond search range (bug in test) - search_range: {:?}, mode: {:?}",
3787                search_range,
3788                mode,
3789            );
3790            search_range.start = mode.0.end;
3791        }
3792        assert_eq!(
3793            search_range.start, search_range.end,
3794            "missing mode in range {:?}",
3795            search_range
3796        );
3797    }
3798
    /// Exercises `multi_overwrite` against files prepared with various combinations of
    /// regular (pre-written) extents and overwrite-mode (allocated) extents. All case
    /// ranges are in units of blocks; the file is 10 blocks long.
    #[fuchsia::test(threads = 10)]
    async fn test_multi_overwrite() {
        #[derive(Debug)]
        struct Case {
            // Ranges written with regular writes before any allocation.
            pre_writes: Vec<Range<usize>>,
            // Ranges handed to `allocate` to create overwrite-mode extents.
            allocate_ranges: Vec<Range<u64>>,
            // Each inner vec is the set of target ranges for one `multi_overwrite` call.
            overwrites: Vec<Vec<Range<u64>>>,
        }
        let cases = [
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![1..3],
                overwrites: vec![vec![1..3]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..1, 1..2, 2..3, 3..4],
                overwrites: vec![vec![0..4]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![0..1], vec![1..2], vec![3..4]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![3..4]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![3..4], vec![2..3], vec![1..2]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![1..2, 5..6, 7..8],
                overwrites: vec![vec![5..6]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![1..3],
                overwrites: vec![
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                ],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..5],
                overwrites: vec![
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                    vec![1..3],
                ],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..5],
                overwrites: vec![vec![0..2, 2..4, 4..5]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..5, 5..10],
                overwrites: vec![vec![1..2, 2..3, 4..7, 7..8]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..4, 6..10],
                overwrites: vec![vec![2..3, 7..9]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..10],
                overwrites: vec![vec![1..2, 5..10], vec![0..1, 5..10], vec![0..5, 5..10]],
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..10],
                overwrites: vec![vec![0..2, 2..4, 4..6, 6..8, 8..10], vec![0..5, 5..10]],
            },
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![1..3],
                overwrites: vec![vec![1..3]],
            },
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![4..6],
                overwrites: vec![vec![5..6]],
            },
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![0..4],
                overwrites: vec![vec![0..4]],
            },
            Case {
                pre_writes: vec![1..3],
                allocate_ranges: vec![2..4],
                overwrites: vec![vec![2..4]],
            },
            Case {
                pre_writes: vec![3..5],
                allocate_ranges: vec![1..3, 6..7],
                overwrites: vec![vec![1..3, 6..7]],
            },
            Case {
                pre_writes: vec![1..3, 5..7, 8..9],
                allocate_ranges: vec![0..5],
                overwrites: vec![vec![0..2, 2..5], vec![0..5]],
            },
            // The remaining cases exercise overlapping `allocate` calls with no
            // subsequent overwrites; only the resulting extent modes are checked.
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![0..10, 4..6],
                overwrites: Vec::new(),
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![3..8, 5..10],
                overwrites: Vec::new(),
            },
            Case {
                pre_writes: Vec::new(),
                allocate_ranges: vec![5..10, 3..8],
                overwrites: Vec::new(),
            },
        ];

        for (i, case) in cases.into_iter().enumerate() {
            log::info!("running case {} - {:?}", i, case);
            let (fs, object) = test_filesystem_and_empty_object().await;
            let block_size = fs.block_size();
            let file_size = block_size * 10;
            object.truncate(file_size).await.unwrap();

            // Lay down the regular (non-overwrite) extents first, filled with 0xff.
            for write in case.pre_writes {
                let write_len = (write.end - write.start) * block_size as usize;
                let mut write_buf = object.allocate_buffer(write_len).await;
                write_buf.as_mut_slice().fill(0xff);
                assert_eq!(
                    object
                        .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
                        .await
                        .unwrap(),
                    file_size
                );
            }

            for allocate_range in &case.allocate_ranges {
                object
                    .allocate(allocate_range.start * block_size..allocate_range.end * block_size)
                    .await
                    .unwrap();
            }

            // Every allocated range must now be covered by overwrite-mode extents.
            for allocate_range in case.allocate_ranges {
                assert_all_overwrite(
                    &object,
                    allocate_range.start * block_size..allocate_range.end * block_size,
                )
                .await;
            }

            for overwrite in case.overwrites {
                // Convert the block-unit ranges to byte ranges and total up the write length.
                let mut write_len = 0;
                let overwrite = overwrite
                    .into_iter()
                    .map(|r| {
                        write_len += (r.end - r.start) * block_size;
                        r.start * block_size..r.end * block_size
                    })
                    .collect::<Vec<_>>();
                let mut write_buf = object.allocate_buffer(write_len as usize).await;
                let data = (0..20).cycle().take(write_len as usize).collect::<Vec<_>>();
                write_buf.as_mut_slice().copy_from_slice(&data);

                // Compute the expected post-write contents by splicing the written data
                // into a snapshot of the current file contents.
                let mut expected_buf = object.allocate_buffer(file_size as usize).await;
                assert_eq!(
                    object.read(0, expected_buf.as_mut()).await.unwrap(),
                    expected_buf.len()
                );
                let expected_buf_slice = expected_buf.as_mut_slice();
                let mut data_slice = data.as_slice();
                for r in &overwrite {
                    let len = r.length().unwrap() as usize;
                    let (copy_from, rest) = data_slice.split_at(len);
                    expected_buf_slice[r.start as usize..r.end as usize]
                        .copy_from_slice(&copy_from);
                    data_slice = rest;
                }

                let mut transaction = object.new_transaction().await.unwrap();
                object
                    .multi_overwrite(&mut transaction, 0, &overwrite, write_buf.as_mut())
                    .await
                    .unwrap_or_else(|_| panic!("multi_overwrite error on case {}", i));
                // Double check the emitted checksums. We should have one u64 checksum for every
                // block we wrote to disk.
                let mut checksummed_range_length = 0;
                let mut num_checksums = 0;
                for (device_range, checksums, _) in transaction.checksums() {
                    let range_len = device_range.end - device_range.start;
                    let checksums_len = checksums.len() as u64;
                    assert_eq!(range_len / checksums_len, block_size);
                    checksummed_range_length += range_len;
                    num_checksums += checksums_len;
                }
                assert_eq!(checksummed_range_length, write_len);
                assert_eq!(num_checksums, write_len / block_size);
                transaction.commit().await.unwrap();

                // The file contents must now match the spliced expectation exactly.
                let mut buf = object.allocate_buffer(file_size as usize).await;
                assert_eq!(
                    object.read(0, buf.as_mut()).await.unwrap(),
                    buf.len(),
                    "failed length check on case {}",
                    i,
                );
                assert_eq!(buf.as_slice(), expected_buf.as_slice(), "failed on case {}", i);
            }

            fsck_volume(&fs, object.store().store_object_id(), None).await.expect("fsck failed");
            fs.close().await.expect("close failed");
        }
    }
4035
    /// Verifies that `multi_overwrite` flips bits in an overwrite extent's
    /// `OverwritePartial` bitmap as blocks are written, and that the extent's mode
    /// becomes `ExtentMode::Overwrite` once every block has been written at least once.
    #[fuchsia::test(threads = 10)]
    async fn test_multi_overwrite_mode_updates() {
        let (fs, object) = test_filesystem_and_empty_object().await;
        let block_size = fs.block_size();
        let file_size = block_size * 10;
        object.truncate(file_size).await.unwrap();

        // One bit per block of the 10-block extent; no blocks written yet.
        let mut expected_bitmap = BitVec::from_elem(10, false);

        object.allocate(0..10 * block_size).await.unwrap();
        assert_eq!(
            get_modes(&object, 0..10 * block_size).await,
            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
        );

        // Write blocks 2 and 3; their bits should be the only ones set afterwards.
        let mut write_buf = object.allocate_buffer(2 * block_size as usize).await;
        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
        write_buf.as_mut_slice().copy_from_slice(&data);
        let mut transaction = object.new_transaction().await.unwrap();
        object
            .multi_overwrite(
                &mut transaction,
                0,
                &[2 * block_size..4 * block_size],
                write_buf.as_mut(),
            )
            .await
            .unwrap();
        transaction.commit().await.unwrap();

        expected_bitmap.set(2, true);
        expected_bitmap.set(3, true);
        assert_eq!(
            get_modes(&object, 0..10 * block_size).await,
            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
        );

        // Write blocks 3..5 and 6 in one call; block 3 was already set, so only
        // bits 4 and 6 are new.
        let mut write_buf = object.allocate_buffer(3 * block_size as usize).await;
        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
        write_buf.as_mut_slice().copy_from_slice(&data);
        let mut transaction = object.new_transaction().await.unwrap();
        object
            .multi_overwrite(
                &mut transaction,
                0,
                &[3 * block_size..5 * block_size, 6 * block_size..7 * block_size],
                write_buf.as_mut(),
            )
            .await
            .unwrap();
        transaction.commit().await.unwrap();

        expected_bitmap.set(4, true);
        expected_bitmap.set(6, true);
        assert_eq!(
            get_modes(&object, 0..10 * block_size).await,
            vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
        );

        // Fill in the remaining unwritten blocks (0..2, 5, and 7..10).
        let mut write_buf = object.allocate_buffer(6 * block_size as usize).await;
        let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
        write_buf.as_mut_slice().copy_from_slice(&data);
        let mut transaction = object.new_transaction().await.unwrap();
        object
            .multi_overwrite(
                &mut transaction,
                0,
                &[
                    0..2 * block_size,
                    5 * block_size..6 * block_size,
                    7 * block_size..10 * block_size,
                ],
                write_buf.as_mut(),
            )
            .await
            .unwrap();
        transaction.commit().await.unwrap();

        // Every block has now been written, so the partial bitmap collapses to
        // plain `Overwrite` mode.
        assert_eq!(
            get_modes(&object, 0..10 * block_size).await,
            vec![(0..10 * block_size, ExtentMode::Overwrite)]
        );

        fs.close().await.expect("close failed");
    }
4121}