fxfs/object_store/store_object_handle.rs

// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::checksum::{fletcher64, Checksum, Checksums};
use crate::errors::FxfsError;
use crate::log::*;
use crate::lsm_tree::types::{Item, ItemRef, LayerIterator};
use crate::lsm_tree::Query;
use crate::object_handle::ObjectHandle;
use crate::object_store::extent_record::{ExtentKey, ExtentMode, ExtentValue};
use crate::object_store::object_manager::ObjectManager;
use crate::object_store::object_record::{
    AttributeKey, EncryptionKeys, ExtendedAttributeValue, ObjectAttributes, ObjectItem, ObjectKey,
    ObjectKeyData, ObjectValue, Timestamp,
};
use crate::object_store::transaction::{
    lock_keys, AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Options,
    ReadGuard, Transaction,
};
use crate::object_store::{
    HandleOptions, HandleOwner, ObjectStore, TrimMode, TrimResult, VOLUME_DATA_KEY_ID,
};
use crate::range::RangeExt;
use crate::round::{round_down, round_up};
use anyhow::{anyhow, bail, ensure, Context, Error};
use assert_matches::assert_matches;
use bit_vec::BitVec;
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{try_join, TryStreamExt};
use fxfs_crypto::{Cipher, CipherSet, FindKeyResult, Key, KeyPurpose};
use fxfs_trace::trace;
use static_assertions::const_assert;
use std::cmp::min;
use std::future::Future;
use std::ops::Range;
use std::sync::atomic::{self, AtomicBool, Ordering};
use std::sync::Arc;
use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
use {fidl_fuchsia_io as fio, fuchsia_async as fasync};

/// Maximum size for an extended attribute name.
pub const MAX_XATTR_NAME_SIZE: usize = 255;
/// Maximum size an extended attribute can be before it's stored in an object attribute instead of
/// inside the record directly.
pub const MAX_INLINE_XATTR_SIZE: usize = 256;
/// Maximum size for an extended attribute value. NB: the maximum size for an extended attribute is
/// 64kB, which we rely on for correctness when deleting attributes, so ensure it's always
/// enforced.
pub const MAX_XATTR_VALUE_SIZE: usize = 64000;
/// The range of fxfs attribute ids which are reserved for extended attribute values. Whenever a
/// new attribute is needed, the first unused id will be chosen from this range. It's technically
/// safe to change these values, but it has potential consequences - they are only used during id
/// selection, so any existing extended attributes keep their ids, which means any past or present
/// selected range here could potentially have used attributes unless they are explicitly migrated,
/// which isn't currently done.
pub const EXTENDED_ATTRIBUTE_RANGE_START: u64 = 64;
pub const EXTENDED_ATTRIBUTE_RANGE_END: u64 = 512;
/// When writing, the logic should often be generic over whether or not checksums are generated.
/// This type provides that, along with a convenient conversion to the more general ExtentMode
/// that eventually stores it on disk.
#[derive(Debug, Clone, PartialEq)]
pub enum MaybeChecksums {
    None,
    Fletcher(Vec<Checksum>),
}

impl MaybeChecksums {
    pub fn maybe_as_ref(&self) -> Option<&[Checksum]> {
        match self {
            Self::None => None,
            Self::Fletcher(sums) => Some(&sums),
        }
    }

    pub fn split_off(&mut self, at: usize) -> Self {
        match self {
            Self::None => Self::None,
            Self::Fletcher(sums) => Self::Fletcher(sums.split_off(at)),
        }
    }

    pub fn to_mode(self) -> ExtentMode {
        match self {
            Self::None => ExtentMode::Raw,
            Self::Fletcher(sums) => ExtentMode::Cow(Checksums::fletcher(sums)),
        }
    }

    pub fn into_option(self) -> Option<Vec<Checksum>> {
        match self {
            Self::None => None,
            Self::Fletcher(sums) => Some(sums),
        }
    }
}
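
// Illustrative sketch (not part of the original source): how a write path might split checksums
// when a write is broken into chunks. `sum0` and `sum1` are stand-in values for two per-block
// checksums.
//
//     let mut sums = MaybeChecksums::Fletcher(vec![sum0, sum1]);
//     let tail = sums.split_off(1);  // `sums` now covers block 0, `tail` covers block 1.
//     let mode = tail.to_mode();     // ExtentMode::Cow(Checksums::fletcher(vec![sum1]))
//
// With `MaybeChecksums::None` the same calls are no-ops and `to_mode` yields `ExtentMode::Raw`,
// which is what lets the write logic stay generic over checksumming.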

/// The mode of operation when setting extended attributes. This mirrors the fidl definition, but
/// is replicated here to keep fuchsia.io structures out of the api so that it can be used on host.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SetExtendedAttributeMode {
    /// Create the extended attribute if it doesn't exist, replace the value if it does.
    Set,
    /// Create the extended attribute if it doesn't exist, fail if it does.
    Create,
    /// Replace the extended attribute value if it exists, fail if it doesn't.
    Replace,
}

impl From<fio::SetExtendedAttributeMode> for SetExtendedAttributeMode {
    fn from(other: fio::SetExtendedAttributeMode) -> SetExtendedAttributeMode {
        match other {
            fio::SetExtendedAttributeMode::Set => SetExtendedAttributeMode::Set,
            fio::SetExtendedAttributeMode::Create => SetExtendedAttributeMode::Create,
            fio::SetExtendedAttributeMode::Replace => SetExtendedAttributeMode::Replace,
        }
    }
}

enum Encryption {
    /// The object doesn't use encryption.
    None,

    /// The object has keys that are cached (which means unwrapping occurs on-demand) with
    /// KeyManager.
    CachedKeys,

    /// The object has permanent keys registered with KeyManager.
    PermanentKeys,
}

#[derive(PartialEq, Debug)]
enum OverwriteBitmaps {
    None,
    Some {
        /// The block bitmap of a partial overwrite extent in the tree.
        extent_bitmap: BitVec,
        /// A bitmap of the blocks written to by the current overwrite.
        write_bitmap: BitVec,
        /// BitVec doesn't have a slice equivalent, so for a particular section of the write we
        /// keep track of an offset in the bitmaps to operate on.
        bitmap_offset: usize,
    },
}

impl OverwriteBitmaps {
    fn new(extent_bitmap: BitVec) -> Self {
        OverwriteBitmaps::Some {
            write_bitmap: BitVec::from_elem(extent_bitmap.len(), false),
            extent_bitmap,
            bitmap_offset: 0,
        }
    }

    fn is_none(&self) -> bool {
        *self == OverwriteBitmaps::None
    }

    fn set_offset(&mut self, new_offset: usize) {
        match self {
            OverwriteBitmaps::None => (),
            OverwriteBitmaps::Some { bitmap_offset, .. } => *bitmap_offset = new_offset,
        }
    }

    fn get_from_extent_bitmap(&self, i: usize) -> Option<bool> {
        match self {
            OverwriteBitmaps::None => None,
            OverwriteBitmaps::Some { extent_bitmap, bitmap_offset, .. } => {
                extent_bitmap.get(*bitmap_offset + i)
            }
        }
    }

    fn set_in_write_bitmap(&mut self, i: usize, x: bool) {
        match self {
            OverwriteBitmaps::None => (),
            OverwriteBitmaps::Some { write_bitmap, bitmap_offset, .. } => {
                write_bitmap.set(*bitmap_offset + i, x)
            }
        }
    }

    fn take_bitmaps(self) -> Option<(BitVec, BitVec)> {
        match self {
            OverwriteBitmaps::None => None,
            OverwriteBitmaps::Some { extent_bitmap, write_bitmap, .. } => {
                Some((extent_bitmap, write_bitmap))
            }
        }
    }
}
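
// Illustrative sketch (not part of the original source): walking a write in windows with
// `set_offset`, assuming a four-block extent bitmap with nothing yet initialized.
//
//     let mut bitmaps = OverwriteBitmaps::new(BitVec::from_elem(4, false));
//     bitmaps.set_offset(2);                 // Operate on the tail two blocks.
//     bitmaps.set_in_write_bitmap(0, true);  // Actually sets bit 2 of the write bitmap.
//     assert_eq!(bitmaps.get_from_extent_bitmap(0), Some(false));  // Reads extent bit 2.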

/// When writing to Overwrite ranges, we need to emit whether a set of checksums for a device range
/// is the first write to that region or not. This tracks one such range so we can use it after the
/// write to break up the returned checksum list.
#[derive(PartialEq, Debug)]
struct ChecksumRangeChunk {
    checksum_range: Range<usize>,
    device_range: Range<u64>,
    is_first_write: bool,
}

impl ChecksumRangeChunk {
    fn group_first_write_ranges(
        bitmaps: &mut OverwriteBitmaps,
        block_size: u64,
        write_device_range: Range<u64>,
    ) -> Vec<ChecksumRangeChunk> {
        let write_block_len = (write_device_range.length().unwrap() / block_size) as usize;
        if bitmaps.is_none() {
            // If there is no bitmap, then the overwrite range is fully written to. However, we
            // could still be within the journal flush window where one of the blocks was written
            // to for the first time to put it in this state, so we still need to emit the
            // checksums in case replay needs them.
            vec![ChecksumRangeChunk {
                checksum_range: 0..write_block_len,
                device_range: write_device_range,
                is_first_write: false,
            }]
        } else {
            let mut checksum_ranges = vec![ChecksumRangeChunk {
                checksum_range: 0..0,
                device_range: write_device_range.start..write_device_range.start,
                is_first_write: !bitmaps.get_from_extent_bitmap(0).unwrap(),
            }];
            let mut working_range = checksum_ranges.last_mut().unwrap();
            for i in 0..write_block_len {
                bitmaps.set_in_write_bitmap(i, true);

                // bitmap.get returning true means the block is initialized and therefore has been
                // written to before.
                if working_range.is_first_write != bitmaps.get_from_extent_bitmap(i).unwrap() {
                    // is_first_write tracks the opposite of what comes back from the bitmap, so
                    // if they are still opposites we continue our current range.
                    working_range.checksum_range.end += 1;
                    working_range.device_range.end += block_size;
                } else {
                    // If they are the same, then we need to make a new chunk.
                    let new_chunk = ChecksumRangeChunk {
                        checksum_range: working_range.checksum_range.end
                            ..working_range.checksum_range.end + 1,
                        device_range: working_range.device_range.end
                            ..working_range.device_range.end + block_size,
                        is_first_write: !working_range.is_first_write,
                    };
                    checksum_ranges.push(new_chunk);
                    working_range = checksum_ranges.last_mut().unwrap();
                }
            }
            checksum_ranges
        }
    }
}
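
// Illustrative sketch (not part of the original source): for a four-block write at device range
// 0..16384 with block_size 4096, where the extent bitmap says blocks 0 and 2 are already
// initialized (1010), `group_first_write_ranges` would produce:
//
//     [ChecksumRangeChunk { checksum_range: 0..1, device_range: 0..4096, is_first_write: false },
//      ChecksumRangeChunk { checksum_range: 1..2, device_range: 4096..8192, is_first_write: true },
//      ChecksumRangeChunk { checksum_range: 2..3, device_range: 8192..12288, is_first_write: false },
//      ChecksumRangeChunk { checksum_range: 3..4, device_range: 12288..16384, is_first_write: true }]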

/// StoreObjectHandle is the lowest-level, untyped handle to an object with the id [`object_id`] in
/// a particular store, [`owner`]. It provides functionality shared across all objects, such as
/// reading and writing attributes and managing encryption keys.
///
/// Since it's untyped, it doesn't do any object kind validation, and is generally meant to
/// implement higher-level typed handles.
///
/// For file-like objects with a data attribute, DataObjectHandle implements traits and helpers for
/// doing more complex extent management and caches the content size.
///
/// For directory-like objects, Directory knows how to add and remove child objects and enumerate
/// its children.
pub struct StoreObjectHandle<S: HandleOwner> {
    owner: Arc<S>,
    object_id: u64,
    options: HandleOptions,
    trace: AtomicBool,
    encryption: Encryption,
}

impl<S: HandleOwner> ObjectHandle for StoreObjectHandle<S> {
    fn set_trace(&self, v: bool) {
        info!(store_id = self.store().store_object_id, oid = self.object_id(), trace = v; "trace");
        self.trace.store(v, atomic::Ordering::Relaxed);
    }

    fn object_id(&self) -> u64 {
        self.object_id
    }

    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        self.store().device.allocate_buffer(size)
    }

    fn block_size(&self) -> u64 {
        self.store().block_size()
    }
}

struct Watchdog {
    _task: fasync::Task<()>,
}

impl Watchdog {
    fn new(increment_seconds: u64, cb: impl Fn(u64) + Send + 'static) -> Self {
        Self {
            _task: fasync::Task::spawn(async move {
                let increment = increment_seconds.try_into().unwrap();
                let mut fired_counter = 0;
                let mut next_wake = fasync::MonotonicInstant::now();
                loop {
                    next_wake += std::time::Duration::from_secs(increment).into();
                    // If the timer isn't being scheduled promptly, this will purposely fire in
                    // rapid succession once it runs again, which gives insight into the state of
                    // thread and task scheduling.
                    if fasync::MonotonicInstant::now() < next_wake {
                        fasync::Timer::new(next_wake).await;
                    }
                    fired_counter += 1;
                    cb(fired_counter);
                }
            }),
        }
    }
}
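
// Illustrative sketch (not part of the original source): the watchdog is armed around a
// potentially slow await and cancelled when it's dropped on exit, as the read and write paths
// below do. `slow_device_io` is a stand-in name.
//
//     let _watchdog = Watchdog::new(10, |count| {
//         warn!("Operation has been stalled for {} seconds", count * 10);
//     });
//     slow_device_io().await;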

impl<S: HandleOwner> StoreObjectHandle<S> {
    /// Make a new StoreObjectHandle for the object with id [`object_id`] in store [`owner`].
    pub fn new(
        owner: Arc<S>,
        object_id: u64,
        permanent_keys: bool,
        options: HandleOptions,
        trace: bool,
    ) -> Self {
        let encryption = if permanent_keys {
            Encryption::PermanentKeys
        } else if owner.as_ref().as_ref().is_encrypted() {
            Encryption::CachedKeys
        } else {
            Encryption::None
        };
        Self { owner, object_id, encryption, options, trace: AtomicBool::new(trace) }
    }

    pub fn owner(&self) -> &Arc<S> {
        &self.owner
    }

    pub fn store(&self) -> &ObjectStore {
        self.owner.as_ref().as_ref()
    }

    pub fn trace(&self) -> bool {
        self.trace.load(atomic::Ordering::Relaxed)
    }

    pub fn is_encrypted(&self) -> bool {
        !matches!(self.encryption, Encryption::None)
    }

    /// Get the default set of transaction options for this object. This is mostly the overall
    /// default, modified by any [`HandleOptions`] held by this handle.
    pub fn default_transaction_options<'b>(&self) -> Options<'b> {
        Options { skip_journal_checks: self.options.skip_journal_checks, ..Default::default() }
    }

    pub async fn new_transaction_with_options<'b>(
        &self,
        attribute_id: u64,
        options: Options<'b>,
    ) -> Result<Transaction<'b>, Error> {
        Ok(self
            .store()
            .filesystem()
            .new_transaction(
                lock_keys![
                    LockKey::object_attribute(
                        self.store().store_object_id(),
                        self.object_id(),
                        attribute_id,
                    ),
                    LockKey::object(self.store().store_object_id(), self.object_id()),
                ],
                options,
            )
            .await?)
    }

    pub async fn new_transaction<'b>(&self, attribute_id: u64) -> Result<Transaction<'b>, Error> {
        self.new_transaction_with_options(attribute_id, self.default_transaction_options()).await
    }

    // If |transaction| has an impending mutation for the underlying object, returns that.
    // Otherwise, looks up the object from the tree.
    async fn txn_get_object_mutation(
        &self,
        transaction: &Transaction<'_>,
    ) -> Result<ObjectStoreMutation, Error> {
        self.store().txn_get_object_mutation(transaction, self.object_id()).await
    }

    // Returns the amount deallocated.
    async fn deallocate_old_extents(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: u64,
        range: Range<u64>,
    ) -> Result<u64, Error> {
        let block_size = self.block_size();
        assert_eq!(range.start % block_size, 0);
        assert_eq!(range.end % block_size, 0);
        if range.start == range.end {
            return Ok(0);
        }
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let key = ExtentKey { range };
        let lower_bound = ObjectKey::attribute(
            self.object_id(),
            attribute_id,
            AttributeKey::Extent(key.search_key()),
        );
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullRange(&lower_bound)).await?;
        let allocator = self.store().allocator();
        let mut deallocated = 0;
        let trace = self.trace();
        while let Some(ItemRef {
            key:
                ObjectKey {
                    object_id,
                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                },
            value: ObjectValue::Extent(value),
            ..
        }) = iter.get()
        {
            if *object_id != self.object_id() || *attr_id != attribute_id {
                break;
            }
            if let ExtentValue::Some { device_offset, .. } = value {
                if let Some(overlap) = key.overlap(extent_key) {
                    let range = device_offset + overlap.start - extent_key.range.start
                        ..device_offset + overlap.end - extent_key.range.start;
                    ensure!(range.is_aligned(block_size), FxfsError::Inconsistent);
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = range,
                            len = range.end - range.start,
                            extent_key:?;
                            "D",
                        );
                    }
                    allocator
                        .deallocate(transaction, self.store().store_object_id(), range)
                        .await?;
                    deallocated += overlap.end - overlap.start;
                } else {
                    break;
                }
            }
            iter.advance().await?;
        }
        Ok(deallocated)
    }

    // Writes aligned data (that should already be encrypted) to the given offset and computes
    // checksums if requested.
    async fn write_aligned(
        &self,
        buf: BufferRef<'_>,
        device_offset: u64,
    ) -> Result<MaybeChecksums, Error> {
        if self.trace() {
            info!(
                store_id = self.store().store_object_id(),
                oid = self.object_id(),
                device_range:? = (device_offset..device_offset + buf.len() as u64),
                len = buf.len();
                "W",
            );
        }
        let store = self.store();
        store.device_write_ops.fetch_add(1, Ordering::Relaxed);
        let mut checksums = Vec::new();
        let _watchdog = Watchdog::new(10, |count| {
            warn!("Write has been stalled for {} seconds", count * 10);
        });
        try_join!(store.device.write(device_offset, buf), async {
            if !self.options.skip_checksums {
                let block_size = self.block_size();
                for chunk in buf.as_slice().chunks_exact(block_size as usize) {
                    checksums.push(fletcher64(chunk, 0));
                }
            }
            Ok(())
        })?;
        Ok(if !self.options.skip_checksums {
            MaybeChecksums::Fletcher(checksums)
        } else {
            MaybeChecksums::None
        })
    }

    /// Flushes the underlying device.  This is expensive and should be used sparingly.
    pub async fn flush_device(&self) -> Result<(), Error> {
        self.store().device().flush().await
    }

    pub async fn update_allocated_size(
        &self,
        transaction: &mut Transaction<'_>,
        allocated: u64,
        deallocated: u64,
    ) -> Result<(), Error> {
        if allocated == deallocated {
            return Ok(());
        }
        let mut mutation = self.txn_get_object_mutation(transaction).await?;
        if let ObjectValue::Object {
            attributes: ObjectAttributes { project_id, allocated_size, .. },
            ..
        } = &mut mutation.item.value
        {
            // The only way for these to fail is if the volume is inconsistent.
            *allocated_size = allocated_size
                .checked_add(allocated)
                .ok_or_else(|| anyhow!(FxfsError::Inconsistent).context("Allocated size overflow"))?
                .checked_sub(deallocated)
                .ok_or_else(|| {
                    anyhow!(FxfsError::Inconsistent).context("Allocated size underflow")
                })?;

            if *project_id != 0 {
                // The allocated and deallocated amounts shouldn't exceed the max size of the
                // file, which is bounded within i64.
                let diff = i64::try_from(allocated).unwrap() - i64::try_from(deallocated).unwrap();
                transaction.add(
                    self.store().store_object_id(),
                    Mutation::merge_object(
                        ObjectKey::project_usage(
                            self.store().root_directory_object_id(),
                            *project_id,
                        ),
                        ObjectValue::BytesAndNodes { bytes: diff, nodes: 0 },
                    ),
                );
            }
        } else {
            // This can occur when the object mutation is created from an object in the tree which
            // was corrupt.
            bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
        }
        transaction.add(self.store().store_object_id, Mutation::ObjectStore(mutation));
        Ok(())
    }

    pub async fn update_attributes<'a>(
        &self,
        transaction: &mut Transaction<'a>,
        node_attributes: Option<&fio::MutableNodeAttributes>,
        change_time: Option<Timestamp>,
    ) -> Result<(), Error> {
        if let Some(&fio::MutableNodeAttributes { selinux_context: Some(ref context), .. }) =
            node_attributes
        {
            if let fio::SelinuxContext::Data(context) = context {
                self.set_extended_attribute_impl(
                    "security.selinux".into(),
                    context.clone(),
                    SetExtendedAttributeMode::Set,
                    transaction,
                )
                .await?;
            } else {
                return Err(anyhow!(FxfsError::InvalidArgs)
                    .context("Only set SELinux context with `data` member."));
            }
        }
        self.store()
            .update_attributes(transaction, self.object_id, node_attributes, change_time)
            .await
    }

    /// Zeroes the given range.  The range must be aligned.
    pub async fn zero(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: u64,
        range: Range<u64>,
    ) -> Result<(), Error> {
        let deallocated =
            self.deallocate_old_extents(transaction, attribute_id, range.clone()).await?;
        if deallocated > 0 {
            self.update_allocated_size(transaction, 0, deallocated).await?;
            transaction.add(
                self.store().store_object_id,
                Mutation::merge_object(
                    ObjectKey::extent(self.object_id(), attribute_id, range),
                    ObjectValue::Extent(ExtentValue::deleted_extent()),
                ),
            );
        }
        Ok(())
    }

    // Returns a new aligned buffer (reading the head and tail blocks if necessary) with a copy of
    // the data from `buf`.
    pub async fn align_buffer(
        &self,
        attribute_id: u64,
        offset: u64,
        buf: BufferRef<'_>,
    ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
        let block_size = self.block_size();
        let end = offset + buf.len() as u64;
        let aligned =
            round_down(offset, block_size)..round_up(end, block_size).ok_or(FxfsError::TooBig)?;

        let mut aligned_buf =
            self.store().device.allocate_buffer((aligned.end - aligned.start) as usize).await;

        // Deal with head alignment.
        if aligned.start < offset {
            let mut head_block = aligned_buf.subslice_mut(..block_size as usize);
            let read = self.read(attribute_id, aligned.start, head_block.reborrow()).await?;
            head_block.as_mut_slice()[read..].fill(0);
        }

        // Deal with tail alignment.
        if aligned.end > end {
            let end_block_offset = aligned.end - block_size;
            // There's no need to read the tail block if we read it as part of the head block.
            if offset <= end_block_offset {
                let mut tail_block =
                    aligned_buf.subslice_mut(aligned_buf.len() - block_size as usize..);
                let read = self.read(attribute_id, end_block_offset, tail_block.reborrow()).await?;
                tail_block.as_mut_slice()[read..].fill(0);
            }
        }

        aligned_buf.as_mut_slice()
            [(offset - aligned.start) as usize..(end - aligned.start) as usize]
            .copy_from_slice(buf.as_slice());

        Ok((aligned, aligned_buf))
    }
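
    // Illustrative sketch (not part of the original source): with block_size = 4096, copying 100
    // bytes at offset 4000 straddles two blocks, so `align_buffer` returns the range 0..8192 with
    // the surrounding bytes read from the existing attribute contents (or zeroed past EOF).
    // `handle` and `attr_id` are stand-in names.
    //
    //     let (range, aligned) = handle.align_buffer(attr_id, 4000, buf.as_ref()).await?;
    //     assert_eq!(range, 0..8192);
    //     // Bytes 4000..4100 of `aligned` hold the new data; the rest mirrors what's on disk.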

    /// Trim an attribute's extents, potentially adding a graveyard trim entry if more trimming is
    /// needed, so the transaction can be committed without worrying about leaking data.
    ///
    /// This doesn't update the size stored in the attribute value - the caller is responsible for
    /// doing that to keep the size up to date.
    pub async fn shrink(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: u64,
        size: u64,
    ) -> Result<NeedsTrim, Error> {
        let store = self.store();
        let needs_trim = matches!(
            store
                .trim_some(transaction, self.object_id(), attribute_id, TrimMode::FromOffset(size))
                .await?,
            TrimResult::Incomplete
        );
        if needs_trim {
            // Add the object to the graveyard in case the following transactions don't get
            // replayed.
            let graveyard_id = store.graveyard_directory_object_id();
            match store
                .tree
                .find(&ObjectKey::graveyard_entry(graveyard_id, self.object_id()))
                .await?
            {
                Some(ObjectItem { value: ObjectValue::Some, .. })
                | Some(ObjectItem { value: ObjectValue::Trim, .. }) => {
                    // This object is already in the graveyard so we don't need to do anything.
                }
                _ => {
                    transaction.add(
                        store.store_object_id,
                        Mutation::replace_or_insert_object(
                            ObjectKey::graveyard_entry(graveyard_id, self.object_id()),
                            ObjectValue::Trim,
                        ),
                    );
                }
            }
        }
        Ok(NeedsTrim(needs_trim))
    }

    pub async fn read_and_decrypt(
        &self,
        device_offset: u64,
        file_offset: u64,
        mut buffer: MutableBufferRef<'_>,
        key_id: u64,
        // If provided, blocks in the bitmap that are zero will have their contents zeroed out. The
        // bitmap should be exactly the size of the buffer and aligned to the offset in the extent
        // the read is starting at.
        block_bitmap: Option<bit_vec::BitVec>,
    ) -> Result<(), Error> {
        let store = self.store();
        store.device_read_ops.fetch_add(1, Ordering::Relaxed);
        let ((), key) = {
            let _watchdog = Watchdog::new(10, |count| {
                warn!("Read has been stalled for {} seconds", count * 10);
            });
            futures::future::try_join(
                store.device.read(device_offset, buffer.reborrow()),
                self.get_key(Some(key_id)),
            )
            .await?
        };
        if let Some(key) = key {
            key.decrypt(file_offset, buffer.as_mut_slice())?;
        }
        if let Some(bitmap) = block_bitmap {
            let block_size = self.block_size() as usize;
            let buf = buffer.as_mut_slice();
            debug_assert_eq!(bitmap.len() * block_size, buf.len());
            for (i, block) in bitmap.iter().enumerate() {
                if !block {
                    let start = i * block_size;
                    buf[start..start + block_size].fill(0);
                }
            }
        }
        Ok(())
    }

    /// Returns the specified key. If `key_id` is None, it will try and return the fscrypt key if
    /// it is present, or the volume key if it isn't. If the fscrypt key is present, but the key
    /// cannot be unwrapped, then this will return `FxfsError::NoKey`. If the volume is not
    /// encrypted, this returns None.
    pub async fn get_key(&self, key_id: Option<u64>) -> Result<Option<Key>, Error> {
        let store = self.store();
        Ok(match self.encryption {
            Encryption::None => None,
            Encryption::CachedKeys => {
                if let Some(key_id) = key_id {
                    Some(
                        store
                            .key_manager
                            .get_key(
                                self.object_id,
                                store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
                                async || store.get_keys(self.object_id).await,
                                key_id,
                            )
                            .await?
                            .ok_or(FxfsError::NotFound)?,
                    )
                } else {
                    Some(
                        store
                            .key_manager
                            .get_fscrypt_key_if_present(
                                self.object_id,
                                store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
                                async || store.get_keys(self.object_id).await,
                            )
                            .await?,
                    )
                }
            }
            Encryption::PermanentKeys => {
                Some(store.key_manager.get(self.object_id).await?.unwrap())
            }
        })
    }

    /// This will only work for a non-permanent volume data key. This is designed to be used with
    /// extended attributes where we'll only create the key on demand for directories and encrypted
    /// files.
    async fn get_or_create_key(&self, transaction: &mut Transaction<'_>) -> Result<Key, Error> {
        let store = self.store();

        // Fast path: try and get keys from the cache.
        if let Some(key) = store.key_manager.get(self.object_id).await.context("get failed")? {
            return Ok(key);
        }

        let crypt = store.crypt().ok_or_else(|| anyhow!("No crypt!"))?;

        // Next, see if the keys are already created.
        let (mut wrapped_keys, mut unwrapped_keys) = if let Some(item) =
            store.tree.find(&ObjectKey::keys(self.object_id)).await.context("find failed")?
        {
            if let ObjectValue::Keys(EncryptionKeys::AES256XTS(wrapped_keys)) = item.value {
                let unwrapped_keys = store
                    .key_manager
                    .get_keys(
                        self.object_id,
                        crypt.as_ref(),
                        &mut Some(async || Ok(wrapped_keys.clone())),
                        /* permanent= */ false,
                        /* force= */ false,
                    )
                    .await
                    .context("get_keys failed")?;
                match unwrapped_keys.find_key(VOLUME_DATA_KEY_ID) {
                    FindKeyResult::NotFound => {}
                    FindKeyResult::Unavailable => return Err(FxfsError::NoKey.into()),
                    FindKeyResult::Key(key) => return Ok(key),
                }
                (wrapped_keys, unwrapped_keys.ciphers().to_vec())
            } else {
                return Err(anyhow!(FxfsError::Inconsistent));
            }
        } else {
            Default::default()
        };

        // Proceed to create the key.  The transaction holds the required locks.
        let (key, unwrapped_key) = crypt.create_key(self.object_id, KeyPurpose::Data).await?;

        // Merge in unwrapped_key.
        unwrapped_keys.push(Cipher::new(VOLUME_DATA_KEY_ID, &unwrapped_key));
        let unwrapped_keys = Arc::new(CipherSet::from(unwrapped_keys));

        // Arrange for the key to be added to the cache when (and if) the transaction
        // commits.
        struct UnwrappedKeys {
            object_id: u64,
            new_keys: Arc<CipherSet>,
        }

        impl AssociatedObject for UnwrappedKeys {
            fn will_apply_mutation(
                &self,
                _mutation: &Mutation,
                object_id: u64,
                manager: &ObjectManager,
            ) {
                manager.store(object_id).unwrap().key_manager.insert(
                    self.object_id,
                    self.new_keys.clone(),
                    /* permanent= */ false,
                );
            }
        }

        wrapped_keys.push((VOLUME_DATA_KEY_ID, key));

        transaction.add_with_object(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::keys(self.object_id),
                ObjectValue::keys(EncryptionKeys::AES256XTS(wrapped_keys)),
            ),
            AssocObj::Owned(Box::new(UnwrappedKeys {
                object_id: self.object_id,
                new_keys: unwrapped_keys.clone(),
            })),
        );

        let FindKeyResult::Key(key) = unwrapped_keys.find_key(VOLUME_DATA_KEY_ID) else {
            unreachable!()
        };
        Ok(key)
    }

    pub async fn read(
        &self,
        attribute_id: u64,
        offset: u64,
        mut buf: MutableBufferRef<'_>,
    ) -> Result<usize, Error> {
        let fs = self.store().filesystem();
        let guard = fs
            .lock_manager()
            .read_lock(lock_keys![LockKey::object_attribute(
                self.store().store_object_id(),
                self.object_id(),
                attribute_id,
            )])
            .await;

        let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
        let item = self.store().tree().find(&key).await?;
        let size = match item {
            Some(item) if item.key == key => match item.value {
                ObjectValue::Attribute { size, .. } => size,
                _ => bail!(FxfsError::Inconsistent),
            },
            _ => return Ok(0),
        };
        if offset >= size {
            return Ok(0);
        }
        let length = min(buf.len() as u64, size - offset) as usize;
        buf = buf.subslice_mut(0..length);
        self.read_unchecked(attribute_id, offset, buf, &guard).await?;
        Ok(length)
    }

    /// Read `buf.len()` bytes from the attribute `attribute_id`, starting at `offset`, into `buf`.
    /// It's required that a read lock on this attribute id is taken before this is called.
    ///
    /// This function doesn't do any size checking - any portion of `buf` past the end of the file
    /// will be filled with zeros. The caller is responsible for enforcing the file size on reads.
    /// This is because, just looking at the extents, we can't tell the difference between the file
    /// actually ending and there just being a section at the end with no data (since attributes
    /// are sparse).
    pub(super) async fn read_unchecked(
        &self,
        attribute_id: u64,
        mut offset: u64,
        mut buf: MutableBufferRef<'_>,
        _guard: &ReadGuard<'_>,
    ) -> Result<(), Error> {
        if buf.len() == 0 {
            return Ok(());
        }
        let end_offset = offset + buf.len() as u64;

        self.store().logical_read_ops.fetch_add(1, Ordering::Relaxed);

        // Whilst the read offset must be aligned to the filesystem block size, the buffer need only
        // be aligned to the device's block size.
        let block_size = self.block_size() as u64;
        let device_block_size = self.store().device.block_size() as u64;
        assert_eq!(offset % block_size, 0);
        assert_eq!(buf.range().start as u64 % device_block_size, 0);
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger
            .query(Query::LimitedRange(&ObjectKey::extent(
                self.object_id(),
                attribute_id,
                offset..end_offset,
            )))
            .await?;
        let end_align = ((offset + buf.len() as u64) % block_size) as usize;
        let trace = self.trace();
        let reads = FuturesUnordered::new();
        while let Some(ItemRef {
            key:
                ObjectKey {
                    object_id,
                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                },
            value: ObjectValue::Extent(extent_value),
            ..
        }) = iter.get()
        {
            if *object_id != self.object_id() || *attr_id != attribute_id {
                break;
            }
            ensure!(
                extent_key.range.is_valid() && extent_key.range.is_aligned(block_size),
                FxfsError::Inconsistent
            );
            if extent_key.range.start > offset {
                // Zero everything up to the start of the extent.
                let to_zero = min(extent_key.range.start - offset, buf.len() as u64) as usize;
                for i in &mut buf.as_mut_slice()[..to_zero] {
                    *i = 0;
                }
                buf = buf.subslice_mut(to_zero..);
                if buf.is_empty() {
                    break;
                }
                offset += to_zero as u64;
            }

            if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
                let mut device_offset = device_offset + (offset - extent_key.range.start);
                let key_id = *key_id;

                let to_copy = min(buf.len() - end_align, (extent_key.range.end - offset) as usize);
                if to_copy > 0 {
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = (device_offset..device_offset + to_copy as u64),
                            offset,
                            range:? = extent_key.range,
                            block_size;
                            "R",
                        );
                    }
                    let (head, tail) = buf.split_at_mut(to_copy);
                    let maybe_bitmap = match mode {
                        ExtentMode::OverwritePartial(bitmap) => {
                            let mut read_bitmap = bitmap.clone().split_off(
                                ((offset - extent_key.range.start) / block_size) as usize,
                            );
                            read_bitmap.truncate(to_copy / block_size as usize);
                            Some(read_bitmap)
                        }
                        _ => None,
                    };
                    reads.push(self.read_and_decrypt(
                        device_offset,
                        offset,
                        head,
                        key_id,
                        maybe_bitmap,
                    ));
                    buf = tail;
                    if buf.is_empty() {
                        break;
                    }
                    offset += to_copy as u64;
                    device_offset += to_copy as u64;
                }

                // Deal with end alignment by reading the existing contents into an alignment
                // buffer.
                if offset < extent_key.range.end && end_align > 0 {
                    if let ExtentMode::OverwritePartial(bitmap) = mode {
                        let bitmap_offset = (offset - extent_key.range.start) / block_size;
                        if !bitmap.get(bitmap_offset as usize).ok_or(FxfsError::Inconsistent)? {
                            // If this block isn't actually initialized, skip it.
                            break;
                        }
                    }
                    let mut align_buf =
                        self.store().device.allocate_buffer(block_size as usize).await;
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = (device_offset..device_offset + align_buf.len() as u64);
                            "RT",
                        );
                    }
                    self.read_and_decrypt(device_offset, offset, align_buf.as_mut(), key_id, None)
                        .await?;
                    buf.as_mut_slice().copy_from_slice(&align_buf.as_slice()[..end_align]);
                    buf = buf.subslice_mut(0..0);
                    break;
                }
            } else if extent_key.range.end >= offset + buf.len() as u64 {
                // Deleted extent covers remainder, so we're done.
                break;
            }

            iter.advance().await?;
        }
        reads.try_collect::<()>().await?;
        buf.as_mut_slice().fill(0);
        Ok(())
    }
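
    // Illustrative note (not part of the original source): because attributes are sparse, a read
    // past the last extent simply zero-fills. For a single extent at 0..4096 with block_size 4096,
    // a `read_unchecked` of 8192 bytes at offset 0 copies that extent into the first block and
    // zeroes the second; callers like `read` are responsible for clamping to the attribute size
    // first.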

    /// Reads an entire attribute.
    pub async fn read_attr(&self, attribute_id: u64) -> Result<Option<Box<[u8]>>, Error> {
        let store = self.store();
        let tree = &store.tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
        let mut iter = merger.query(Query::FullRange(&key)).await?;
        let (mut buffer, size) = match iter.get() {
            Some(item) if item.key == &key => match item.value {
                ObjectValue::Attribute { size, .. } => {
                    // TODO(https://fxbug.dev/42073113): size > max buffer size
                    (
                        store
                            .device
                            .allocate_buffer(round_up(*size, self.block_size()).unwrap() as usize)
                            .await,
                        *size as usize,
                    )
                }
                // Attribute was deleted.
                ObjectValue::None => return Ok(None),
                _ => bail!(FxfsError::Inconsistent),
            },
            _ => return Ok(None),
        };
        store.logical_read_ops.fetch_add(1, Ordering::Relaxed);
        let mut last_offset = 0;
        loop {
            iter.advance().await?;
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) if *object_id == self.object_id() && *attr_id == attribute_id => {
                    if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
                        let offset = extent_key.range.start as usize;
                        buffer.as_mut_slice()[last_offset..offset].fill(0);
                        let end = std::cmp::min(extent_key.range.end as usize, buffer.len());
                        let maybe_bitmap = match mode {
                            ExtentMode::OverwritePartial(bitmap) => {
                                // The caller has to adjust the bitmap if necessary, but we always
                                // start from the beginning of any extent, so we only truncate.
                                let mut read_bitmap = bitmap.clone();
                                read_bitmap.truncate(
                                    (end - extent_key.range.start as usize)
                                        / self.block_size() as usize,
                                );
                                Some(read_bitmap)
                            }
                            _ => None,
                        };
                        self.read_and_decrypt(
                            *device_offset,
                            extent_key.range.start,
                            buffer.subslice_mut(offset..end as usize),
                            *key_id,
                            maybe_bitmap,
                        )
                        .await?;
                        last_offset = end;
                        if last_offset >= size {
                            break;
                        }
                    }
                }
                _ => break,
            }
        }
        buffer.as_mut_slice()[std::cmp::min(last_offset, size)..].fill(0);
        Ok(Some(buffer.as_slice()[..size].into()))
    }

    /// Writes potentially unaligned data at `device_offset` and returns checksums if requested.
    /// The data will be encrypted if necessary.  `buf` is mutable as an optimization: since the
    /// write may require encryption, we can encrypt the buffer in-place rather than copying to
    /// another buffer if the write is already aligned.
    ///
    /// NOTE: This will not create keys if they are missing (it will fail with an error if that
    /// happens to be the case).
    pub async fn write_at(
        &self,
        attribute_id: u64,
        offset: u64,
        buf: MutableBufferRef<'_>,
        key_id: Option<u64>,
        device_offset: u64,
    ) -> Result<MaybeChecksums, Error> {
        let mut transfer_buf;
        let block_size = self.block_size();
        let (range, mut transfer_buf_ref) =
            if offset % block_size == 0 && buf.len() as u64 % block_size == 0 {
                (offset..offset + buf.len() as u64, buf)
            } else {
                let (range, buf) = self.align_buffer(attribute_id, offset, buf.as_ref()).await?;
                transfer_buf = buf;
                (range, transfer_buf.as_mut())
            };

        if let Some(key) = self.get_key(key_id).await? {
            key.encrypt(range.start, transfer_buf_ref.as_mut_slice())?;
        }

        self.write_aligned(transfer_buf_ref.as_ref(), device_offset - (offset - range.start)).await
    }

    /// Writes to multiple ranges with data provided in `buf`.  The buffer can be modified in place
    /// if encryption takes place.  The ranges must all be aligned and no change to content size is
    /// applied; the caller is responsible for updating size if required.  If `key_id` is None, it
    /// means pick the default key for the object which is the fscrypt key if present, or the volume
    /// data key, or no key if it's an unencrypted file.
    pub async fn multi_write(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: u64,
        key_id: Option<u64>,
        ranges: &[Range<u64>],
        mut buf: MutableBufferRef<'_>,
    ) -> Result<(), Error> {
        if buf.is_empty() {
            return Ok(());
        }
        let block_size = self.block_size();
        let store = self.store();
        let store_id = store.store_object_id();

        // The only key we allow to be created on-the-fly is a non-permanent key wrapped with the
        // volume data key.
        let key = if key_id == Some(VOLUME_DATA_KEY_ID)
            && matches!(self.encryption, Encryption::CachedKeys)
        {
            Some(self.get_or_create_key(transaction).await.context("get_or_create_key failed")?)
        } else {
            self.get_key(key_id).await?
        };
        let key_id = if let Some(key) = key {
            let mut slice = buf.as_mut_slice();
            for r in ranges {
                let l = r.end - r.start;
                let (head, tail) = slice.split_at_mut(l as usize);
                key.encrypt(r.start, head)?;
                slice = tail;
            }
            key.key_id()
        } else {
            0
        };

        let mut allocated = 0;
        let allocator = store.allocator();
        let trace = self.trace();
        let mut writes = FuturesOrdered::new();
        while !buf.is_empty() {
            let device_range = allocator
                .allocate(transaction, store_id, buf.len() as u64)
                .await
                .context("allocation failed")?;
            if trace {
                info!(
                    store_id,
                    oid = self.object_id(),
                    device_range:?,
                    len = device_range.end - device_range.start;
                    "A",
                );
            }
            let device_range_len = device_range.end - device_range.start;
            allocated += device_range_len;

            let (head, tail) = buf.split_at_mut(device_range_len as usize);
            buf = tail;

            writes.push_back(async move {
                let len = head.len() as u64;
                Result::<_, Error>::Ok((
                    device_range.start,
                    len,
                    self.write_aligned(head.as_ref(), device_range.start).await?,
                ))
            });
        }

        self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
        let ((mutations, checksums), deallocated) = try_join!(
            async {
                let mut current_range = 0..0;
                let mut mutations = Vec::new();
                let mut out_checksums = Vec::new();
                let mut ranges = ranges.iter();
                while let Some((mut device_offset, mut len, mut checksums)) =
                    writes.try_next().await?
                {
                    while len > 0 {
                        if current_range.end <= current_range.start {
                            current_range = ranges.next().unwrap().clone();
                        }
                        let chunk_len = std::cmp::min(len, current_range.end - current_range.start);
                        let tail = checksums.split_off((chunk_len / block_size) as usize);
                        if let Some(checksums) = checksums.maybe_as_ref() {
                            out_checksums.push((
                                device_offset..device_offset + chunk_len,
                                checksums.to_owned(),
                            ));
                        }
                        mutations.push(Mutation::merge_object(
                            ObjectKey::extent(
                                self.object_id(),
                                attribute_id,
                                current_range.start..current_range.start + chunk_len,
                            ),
                            ObjectValue::Extent(ExtentValue::new(
                                device_offset,
                                checksums.to_mode(),
                                key_id,
                            )),
                        ));
                        checksums = tail;
                        device_offset += chunk_len;
                        len -= chunk_len;
                        current_range.start += chunk_len;
                    }
                }
                Result::<_, Error>::Ok((mutations, out_checksums))
            },
            async {
                let mut deallocated = 0;
                for r in ranges {
                    deallocated +=
                        self.deallocate_old_extents(transaction, attribute_id, r.clone()).await?;
                }
                Result::<_, Error>::Ok(deallocated)
            }
        )?;
        for m in mutations {
            transaction.add(store_id, m);
        }
        for (r, c) in checksums {
            transaction.add_checksum(r, c, true);
        }
        self.update_allocated_size(transaction, allocated, deallocated).await
    }
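
    // Illustrative sketch (not part of the original source): the caller supplies one contiguous
    // buffer whose length equals the sum of the (aligned, sorted) range lengths. `handle`,
    // `txn`, `attr_id`, and `data` are stand-in names.
    //
    //     let ranges = [0..8192, 16384..20480];  // 12 KiB in total.
    //     let mut buf = handle.allocate_buffer(12288).await;
    //     buf.as_mut_slice().copy_from_slice(&data);
    //     handle.multi_write(&mut txn, attr_id, None, &ranges, buf.as_mut()).await?;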

    /// Write data to overwrite extents with the provided set of ranges. This makes a strong
    /// assumption that the ranges are actually going to be already allocated overwrite extents and
    /// will error out or do something wrong if they aren't. It also assumes the ranges passed to
    /// it are sorted.
    pub async fn multi_overwrite<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        attr_id: u64,
        ranges: &[Range<u64>],
        mut buf: MutableBufferRef<'_>,
    ) -> Result<(), Error> {
        if buf.is_empty() {
            return Ok(());
        }
        let block_size = self.block_size();
        let store = self.store();
        let tree = store.tree();
        let store_id = store.store_object_id();

        let key = self.get_key(None).await?;
        let key_id = if let Some(key) = &key {
            let mut slice = buf.as_mut_slice();
            for r in ranges {
                let l = r.end - r.start;
                let (head, tail) = slice.split_at_mut(l as usize);
                key.encrypt(r.start, head)?;
                slice = tail;
            }
            key.key_id()
        } else {
            0
        };

        let mut range_iter = ranges.into_iter();
        // There should be at least one range if the buffer has data in it
        let mut target_range = range_iter.next().unwrap().clone();
        let mut mutations = Vec::new();
        let writes = FuturesUnordered::new();

        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                attr_id,
                AttributeKey::Extent(ExtentKey::search_key_from_offset(target_range.start)),
            )))
            .await?;

        loop {
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(
                                    attribute_id,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) if *object_id == self.object_id() && *attribute_id == attr_id => {
1364                    // If this extent ends before the target range starts (not possible on the
1365                    // first loop because of the query parameters but possible on further loops),
1366                    // advance until we find the next one we care about.
1367                    if range.end <= target_range.start {
1368                        iter.advance().await?;
1369                        continue;
1370                    }
1371                    let (device_offset, mode) = match extent_value {
1372                        ExtentValue::None => {
1373                            return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
1374                                format!(
1375                                    "multi_overwrite failed: target_range ({}, {}) overlaps with \
1376                                deleted extent found at ({}, {})",
1377                                    target_range.start, target_range.end, range.start, range.end,
1378                                )
1379                            })
1380                        }
1381                        ExtentValue::Some { device_offset, mode, .. } => (device_offset, mode),
1382                    };
1383                    // The ranges passed to this function should already be allocated, so
1384                    // extent records should exist for them.
1385                    if range.start > target_range.start {
1386                        return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
1387                            format!(
1388                            "multi_overwrite failed: target range ({}, {}) starts before first \
1389                            extent found at ({}, {})",
1390                            target_range.start,
1391                            target_range.end,
1392                            range.start,
1393                            range.end,
1394                        )
1395                        });
1396                    }
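                    // Only extents already in overwrite mode are usable here; Raw or Cow extents
                    // mean the caller's precondition (preallocated overwrite ranges) was violated.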
1397                    let mut bitmap = match mode {
1398                        ExtentMode::Raw | ExtentMode::Cow(_) => {
1399                            return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
1400                                format!(
1401                                    "multi_overwrite failed: \
1402                            extent from ({}, {}) which overlaps target range ({}, {}) had the \
1403                            wrong extent mode",
1404                                    range.start, range.end, target_range.start, target_range.end,
1405                                )
1406                            })
1407                        }
1408                        ExtentMode::OverwritePartial(bitmap) => {
1409                            OverwriteBitmaps::new(bitmap.clone())
1410                        }
1411                        ExtentMode::Overwrite => OverwriteBitmaps::None,
1412                    };
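                    // Walk every chunk of the target ranges that overlaps this extent, issuing
                    // one device write per chunk.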
1413                    loop {
1414                        let offset_within_extent = target_range.start - range.start;
1415                        let bitmap_offset = offset_within_extent / block_size;
1416                        let write_device_offset = *device_offset + offset_within_extent;
1417                        let write_end = min(range.end, target_range.end);
1418                        let write_len = write_end - target_range.start;
1419                        let write_device_range =
1420                            write_device_offset..write_device_offset + write_len;
1421                        let (current_buf, remaining_buf) = buf.split_at_mut(write_len as usize);
1422
1423                        bitmap.set_offset(bitmap_offset as usize);
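                        // Group this write into contiguous runs of blocks that share the same
                        // first-write status (per the bitmap), so each run's checksums are
                        // recorded along with whether this is the first write to those blocks.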
1424                        let checksum_ranges = ChecksumRangeChunk::group_first_write_ranges(
1425                            &mut bitmap,
1426                            block_size,
1427                            write_device_range,
1428                        );
1429                        writes.push(async move {
1430                            let maybe_checksums = self
1431                                .write_aligned(current_buf.as_ref(), write_device_offset)
1432                                .await?;
1433                            Ok::<_, Error>(match maybe_checksums {
1434                                MaybeChecksums::None => Vec::new(),
1435                                MaybeChecksums::Fletcher(checksums) => checksum_ranges
1436                                    .into_iter()
1437                                    .map(
1438                                        |ChecksumRangeChunk {
1439                                             checksum_range,
1440                                             device_range,
1441                                             is_first_write,
1442                                         }| {
1443                                            (
1444                                                device_range,
1445                                                checksums[checksum_range].to_vec(),
1446                                                is_first_write,
1447                                            )
1448                                        },
1449                                    )
1450                                    .collect(),
1451                            })
1452                        });
1453                        buf = remaining_buf;
1454                        target_range.start += write_len;
1455                        if target_range.start == target_range.end {
1456                            match range_iter.next() {
1457                                None => break,
1458                                Some(next_range) => target_range = next_range.clone(),
1459                            }
1460                        }
1461                        if range.end <= target_range.start {
1462                            break;
1463                        }
1464                    }
1465                    if let Some((mut bitmap, write_bitmap)) = bitmap.take_bitmaps() {
1466                        if bitmap.or(&write_bitmap) {
1467                            let mode = if bitmap.all() {
1468                                ExtentMode::Overwrite
1469                            } else {
1470                                ExtentMode::OverwritePartial(bitmap)
1471                            };
1472                            mutations.push(Mutation::merge_object(
1473                                ObjectKey::extent(self.object_id(), attr_id, range.clone()),
1474                                ObjectValue::Extent(ExtentValue::new(*device_offset, mode, key_id)),
1475                            ))
1476                        }
1477                    }
1478                    if target_range.start == target_range.end {
1479                        break;
1480                    }
1481                    iter.advance().await?;
1482                }
1483                // We've either run past the end of the existing extents or something is wrong with
1484                // the tree. The main section should break if it finishes the ranges, so in
1485                // either case, this is an error.
1486                _ => bail!(anyhow!(FxfsError::Internal).context(
1487                    "found a non-extent object record while there were still ranges to process"
1488                )),
1489            }
1490        }
1491
1492        let checksums = writes.try_collect::<Vec<_>>().await?;
1493        for (r, c, first_write) in checksums.into_iter().flatten() {
1494            transaction.add_checksum(r, c, first_write);
1495        }
1496
1497        for m in mutations {
1498            transaction.add(store_id, m);
1499        }
1500
1501        Ok(())
1502    }
1503
1504    /// Writes an attribute that should not already exist and therefore does not require trimming.
1505    /// Breaks up the write into multiple transactions if `data.len()` is larger than `batch_size`.
1506    /// If writing the attribute requires multiple transactions, adds the attribute to the
1507    /// graveyard. The caller is responsible for removing the attribute from the graveyard when it
1508    /// commits the last transaction.  This always writes using a key wrapped with the volume data
1509    /// key.
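    ///
    /// A sketch of the expected call pattern (mirroring the test in this file; `handle`,
    /// `txn` and `attr_id` are illustrative names):
    ///
    /// ```ignore
    /// handle.write_new_attr_in_batches(&mut txn, attr_id, &data, batch_size).await?;
    /// // The final batch is left uncommitted; clear the graveyard entry in that same
    /// // transaction so a crash can't leave a partially written attribute behind.
    /// txn.add(
    ///     store.store_object_id(),
    ///     Mutation::replace_or_insert_object(
    ///         ObjectKey::graveyard_attribute_entry(
    ///             store.graveyard_directory_object_id(),
    ///             handle.object_id(),
    ///             attr_id,
    ///         ),
    ///         ObjectValue::None,
    ///     ),
    /// );
    /// txn.commit().await?;
    /// ```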
1510    #[trace]
1511    pub async fn write_new_attr_in_batches<'a>(
1512        &'a self,
1513        transaction: &mut Transaction<'a>,
1514        attribute_id: u64,
1515        data: &[u8],
1516        batch_size: usize,
1517    ) -> Result<(), Error> {
1518        transaction.add(
1519            self.store().store_object_id,
1520            Mutation::replace_or_insert_object(
1521                ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
1522                ObjectValue::attribute(data.len() as u64, false),
1523            ),
1524        );
1525        let chunks = data.chunks(batch_size);
1526        let num_chunks = chunks.len();
1527        if num_chunks > 1 {
1528            transaction.add(
1529                self.store().store_object_id,
1530                Mutation::replace_or_insert_object(
1531                    ObjectKey::graveyard_attribute_entry(
1532                        self.store().graveyard_directory_object_id(),
1533                        self.object_id(),
1534                        attribute_id,
1535                    ),
1536                    ObjectValue::Some,
1537                ),
1538            );
1539        }
1540        let mut start_offset = 0;
1541        for (i, chunk) in chunks.enumerate() {
1542            let rounded_len = round_up(chunk.len() as u64, self.block_size()).unwrap();
1543            let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
1544            let slice = buffer.as_mut_slice();
1545            slice[..chunk.len()].copy_from_slice(chunk);
1546            slice[chunk.len()..].fill(0);
1547            self.multi_write(
1548                transaction,
1549                attribute_id,
1550                Some(VOLUME_DATA_KEY_ID),
1551                &[start_offset..start_offset + rounded_len],
1552                buffer.as_mut(),
1553            )
1554            .await?;
1555            start_offset += rounded_len;
1556            // Do not commit the last chunk.
1557            if i < num_chunks - 1 {
1558                transaction.commit_and_continue().await?;
1559            }
1560        }
1561        Ok(())
1562    }
1563
1564    /// Writes an entire attribute. Returns whether or not the attribute needs to continue being
1565    /// trimmed - if the new data is shorter than the old data, this will trim any extents beyond
1566    /// the end of the new size, but if there were too many for a single transaction, a commit
1567    /// needs to be made before trimming again, so the responsibility is left to the caller so as
1568    /// to not accidentally split the transaction when it's not in a consistent state.  This will
1569    /// write using the volume data key; the fscrypt key is not supported.
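    ///
    /// A sketch of how a caller might honour the returned [`NeedsTrim`] (assuming an
    /// `ObjectStore::trim`-style continuation, as described on [`NeedsTrim`]):
    ///
    /// ```ignore
    /// let needs_trim = handle.write_attr(&mut txn, attr_id, &data).await?;
    /// txn.commit().await?;
    /// if needs_trim.0 {
    ///     // Finish trimming the leftover extents in one or more follow-up transactions.
    ///     store.trim(handle.object_id()).await?;
    /// }
    /// ```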
1570    pub async fn write_attr(
1571        &self,
1572        transaction: &mut Transaction<'_>,
1573        attribute_id: u64,
1574        data: &[u8],
1575    ) -> Result<NeedsTrim, Error> {
1576        let rounded_len = round_up(data.len() as u64, self.block_size()).unwrap();
1577        let store = self.store();
1578        let tree = store.tree();
1579        let should_trim = if let Some(item) = tree
1580            .find(&ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute))
1581            .await?
1582        {
1583            match item.value {
1584                ObjectValue::Attribute { size: _, has_overwrite_extents: true } => {
1585                    bail!(anyhow!(FxfsError::Inconsistent)
1586                        .context("write_attr on an attribute with overwrite extents"))
1587                }
1588                ObjectValue::Attribute { size, .. } => (data.len() as u64) < size,
1589                _ => bail!(FxfsError::Inconsistent),
1590            }
1591        } else {
1592            false
1593        };
1594        let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
1595        let slice = buffer.as_mut_slice();
1596        slice[..data.len()].copy_from_slice(data);
1597        slice[data.len()..].fill(0);
1598        self.multi_write(
1599            transaction,
1600            attribute_id,
1601            Some(VOLUME_DATA_KEY_ID),
1602            &[0..rounded_len],
1603            buffer.as_mut(),
1604        )
1605        .await?;
1606        transaction.add(
1607            self.store().store_object_id,
1608            Mutation::replace_or_insert_object(
1609                ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
1610                ObjectValue::attribute(data.len() as u64, false),
1611            ),
1612        );
1613        if should_trim {
1614            self.shrink(transaction, attribute_id, data.len() as u64).await
1615        } else {
1616            Ok(NeedsTrim(false))
1617        }
1618    }
1619
1620    pub async fn list_extended_attributes(&self) -> Result<Vec<Vec<u8>>, Error> {
1621        let layer_set = self.store().tree().layer_set();
1622        let mut merger = layer_set.merger();
1623        // Seek to the first extended attribute key for this object.
1624        let mut iter = merger
1625            .query(Query::FullRange(&ObjectKey::extended_attribute(self.object_id(), Vec::new())))
1626            .await?;
1627        let mut out = Vec::new();
1628        while let Some(item) = iter.get() {
1629            // Skip deleted extended attributes.
1630            if item.value != &ObjectValue::None {
1631                match item.key {
1632                    ObjectKey { object_id, data: ObjectKeyData::ExtendedAttribute { name } } => {
1633                        if self.object_id() != *object_id {
1634                            bail!(anyhow!(FxfsError::Inconsistent)
1635                                .context("list_extended_attributes: wrong object id"))
1636                        }
1637                        out.push(name.clone());
1638                    }
1639                    // Once we hit something that isn't an extended attribute key, we've gotten to
1640                    // the end.
1641                    _ => break,
1642                }
1643            }
1644            iter.advance().await?;
1645        }
1646        Ok(out)
1647    }
1648
1649    /// Looks up the value for the extended attribute `fio::SELINUX_CONTEXT_NAME`, returning it
1650    /// if it is found inline. If it is not inline, it will request use of the
1651    /// `get_extended_attribute` method. If the entry doesn't exist at all, returns None.
1652    pub async fn get_inline_selinux_context(&self) -> Result<Option<fio::SelinuxContext>, Error> {
1653        // This optimization is only useful as long as the attribute is smaller than inline sizes.
1654        // Avoid reading the data out of the attributes.
1655        const_assert!(fio::MAX_SELINUX_CONTEXT_ATTRIBUTE_LEN as usize <= MAX_INLINE_XATTR_SIZE);
1656        let item = match self
1657            .store()
1658            .tree()
1659            .find(&ObjectKey::extended_attribute(
1660                self.object_id(),
1661                fio::SELINUX_CONTEXT_NAME.into(),
1662            ))
1663            .await?
1664        {
1665            Some(item) => item,
1666            None => return Ok(None),
1667        };
1668        match item.value {
1669            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => {
1670                Ok(Some(fio::SelinuxContext::Data(value)))
1671            }
1672            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(_)) => {
1673                Ok(Some(fio::SelinuxContext::UseExtendedAttributes(fio::EmptyStruct {})))
1674            }
1675            _ => {
1676                bail!(anyhow!(FxfsError::Inconsistent)
1677                    .context("get_inline_extended_attribute: Expected ExtendedAttribute value"))
1678            }
1679        }
1680    }
1681
1682    pub async fn get_extended_attribute(&self, name: Vec<u8>) -> Result<Vec<u8>, Error> {
1683        let item = self
1684            .store()
1685            .tree()
1686            .find(&ObjectKey::extended_attribute(self.object_id(), name))
1687            .await?
1688            .ok_or(FxfsError::NotFound)?;
1689        match item.value {
1690            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => Ok(value),
1691            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
1692                Ok(self.read_attr(id).await?.ok_or(FxfsError::Inconsistent)?.into_vec())
1693            }
1694            _ => {
1695                bail!(anyhow!(FxfsError::Inconsistent)
1696                    .context("get_extended_attribute: Expected ExtendedAttribute value"))
1697            }
1698        }
1699    }
1700
1701    pub async fn set_extended_attribute(
1702        &self,
1703        name: Vec<u8>,
1704        value: Vec<u8>,
1705        mode: SetExtendedAttributeMode,
1706    ) -> Result<(), Error> {
1707        let store = self.store();
1708        let fs = store.filesystem();
1709        // NB: We need to take this lock before we potentially look up the value to prevent racing
1710        // with another set.
1711        let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
1712        let mut transaction = fs.new_transaction(keys, Options::default()).await?;
1713        self.set_extended_attribute_impl(name, value, mode, &mut transaction).await?;
1714        transaction.commit().await?;
1715        Ok(())
1716    }
1717
1718    async fn set_extended_attribute_impl(
1719        &self,
1720        name: Vec<u8>,
1721        value: Vec<u8>,
1722        mode: SetExtendedAttributeMode,
1723        transaction: &mut Transaction<'_>,
1724    ) -> Result<(), Error> {
1725        ensure!(name.len() <= MAX_XATTR_NAME_SIZE, FxfsError::TooBig);
1726        ensure!(value.len() <= MAX_XATTR_VALUE_SIZE, FxfsError::TooBig);
1727        let tree = self.store().tree();
1728        let object_key = ObjectKey::extended_attribute(self.object_id(), name);
1729
1730        let existing_attribute_id = {
1731            let (found, existing_attribute_id) = match tree.find(&object_key).await? {
1732                None => (false, None),
1733                Some(Item { value, .. }) => (
1734                    true,
1735                    match value {
1736                        ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
1737                        ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
1738                            Some(id)
1739                        }
1740                        _ => bail!(anyhow!(FxfsError::Inconsistent)
1741                            .context("expected extended attribute value")),
1742                    },
1743                ),
1744            };
1745            match mode {
1746                SetExtendedAttributeMode::Create if found => {
1747                    bail!(FxfsError::AlreadyExists)
1748                }
1749                SetExtendedAttributeMode::Replace if !found => {
1750                    bail!(FxfsError::NotFound)
1751                }
1752                _ => (),
1753            }
1754            existing_attribute_id
1755        };
1756
1757        if let Some(attribute_id) = existing_attribute_id {
1758            // If we already have an attribute id allocated for this extended attribute, we always
1759            // use it, even if the value has shrunk enough to be stored inline. We don't need to
1760            // worry about trimming here for the same reason we don't need to worry about it when
1761            // we delete xattrs - they simply aren't large enough to ever need more than one
1762            // transaction.
1763            let _ = self.write_attr(transaction, attribute_id, &value).await?;
1764        } else if value.len() <= MAX_INLINE_XATTR_SIZE {
1765            transaction.add(
1766                self.store().store_object_id(),
1767                Mutation::replace_or_insert_object(
1768                    object_key,
1769                    ObjectValue::inline_extended_attribute(value),
1770                ),
1771            );
1772        } else {
1773            // If there isn't an existing attribute id and we are going to store the value in
1774            // an attribute, find the next empty attribute id in the range. We search for fxfs
1775            // attribute records specifically, instead of the extended attribute records, because
1776            // even if the extended attribute record is removed the attribute may not be fully
1777            // trimmed yet.
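            // For example (hypothetical ids): if 64, 65 and 67 are in use, the scan below
            // settles on 66; if nothing in the range is in use, it stops at 64 immediately.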
1778            let mut attribute_id = EXTENDED_ATTRIBUTE_RANGE_START;
1779            let layer_set = tree.layer_set();
1780            let mut merger = layer_set.merger();
1781            let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
1782            let mut iter = merger.query(Query::FullRange(&key)).await?;
1783            loop {
1784                match iter.get() {
1785                    // None means there is nothing at or after the key we sought to, so the
1786                    // first attribute id is available and we can just stop right away.
1787                    None => break,
1788                    Some(ItemRef {
1789                        key: ObjectKey { object_id, data: ObjectKeyData::Attribute(attr_id, _) },
1790                        value,
1791                        ..
1792                    }) if *object_id == self.object_id() => {
1793                        if matches!(value, ObjectValue::None) {
1794                            // This attribute was once used but is now deleted, so it's safe to use
1795                            // again.
1796                            break;
1797                        }
1798                        if attribute_id < *attr_id {
1799                            // We found a gap - use it.
1800                            break;
1801                        } else if attribute_id == *attr_id {
1802                            // This attribute id is in use, try the next one.
1803                            attribute_id += 1;
1804                            if attribute_id == EXTENDED_ATTRIBUTE_RANGE_END {
1805                                bail!(FxfsError::NoSpace);
1806                            }
1807                        }
1808                        // If we don't hit either of those cases, we are still moving through the
1809                        // extent keys for the current attribute, so just keep advancing until the
1810                        // attribute id changes.
1811                    }
1812                    // As we are working our way through the iterator, if we hit anything that
1813                    // doesn't have our object id or attribute key data, we've gone past the end of
1814                    // this section and can stop.
1815                    _ => break,
1816                }
1817                iter.advance().await?;
1818            }
1819
1820            // We know this won't need trimming because it's a new attribute.
1821            let _ = self.write_attr(transaction, attribute_id, &value).await?;
1822            transaction.add(
1823                self.store().store_object_id(),
1824                Mutation::replace_or_insert_object(
1825                    object_key,
1826                    ObjectValue::extended_attribute(attribute_id),
1827                ),
1828            );
1829        }
1830
1831        Ok(())
1832    }
1833
1834    pub async fn remove_extended_attribute(&self, name: Vec<u8>) -> Result<(), Error> {
1835        let store = self.store();
1836        let tree = store.tree();
1837        let object_key = ObjectKey::extended_attribute(self.object_id(), name);
1838
1839        // NB: The API says we have to return an error if the attribute doesn't exist, so we have
1840        // to look it up first to make sure we have a record of it before we delete it. Make sure
1841        // we take a lock and make a transaction before we do so we don't race with other
1842        // operations.
1843        let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
1844        let mut transaction = store.filesystem().new_transaction(keys, Options::default()).await?;
1845
1846        let attribute_to_delete =
1847            match tree.find(&object_key).await?.ok_or(FxfsError::NotFound)?.value {
1848                ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => Some(id),
1849                ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
1850                _ => {
1851                    bail!(anyhow!(FxfsError::Inconsistent)
1852                        .context("remove_extended_attribute: Expected ExtendedAttribute value"))
1853                }
1854            };
1855
1856        transaction.add(
1857            store.store_object_id(),
1858            Mutation::replace_or_insert_object(object_key, ObjectValue::None),
1859        );
1860
1861        // If the attribute wasn't stored inline, we need to deallocate all the extents too. This
1862        // would normally need to interact with the graveyard for correctness - if there are too
1863        // many extents to delete to fit in a single transaction then we could potentially have
1864        // consistency issues. However, the maximum size of an extended attribute is small enough
1865        // that it will never come close to that limit even in the worst case, so we just delete
1866        // everything in one shot.
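        // Concretely: with the 64kB value cap, even at a 512 byte block size an attribute
        // covers at most 125 blocks, so the worst-case number of extents stays small.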
1867        if let Some(attribute_id) = attribute_to_delete {
1868            let trim_result = store
1869                .trim_some(
1870                    &mut transaction,
1871                    self.object_id(),
1872                    attribute_id,
1873                    TrimMode::FromOffset(0),
1874                )
1875                .await?;
1876            // In case you didn't read the comment above - this should not be used to delete
1877            // arbitrary attributes!
1878            assert_matches!(trim_result, TrimResult::Done(_));
1879            transaction.add(
1880                store.store_object_id(),
1881                Mutation::replace_or_insert_object(
1882                    ObjectKey::attribute(self.object_id, attribute_id, AttributeKey::Attribute),
1883                    ObjectValue::None,
1884                ),
1885            );
1886        }
1887
1888        transaction.commit().await?;
1889        Ok(())
1890    }
1891
1892    /// Returns a future that will pre-fetch the keys so as to avoid paying the performance
1893    /// penalty later. The caller must ensure the object is not removed before the future completes.
1894    pub fn pre_fetch_keys(&self) -> Option<impl Future<Output = ()>> {
1895        if let Encryption::CachedKeys = self.encryption {
1896            let owner = self.owner.clone();
1897            let object_id = self.object_id;
1898            Some(async move {
1899                let store = owner.as_ref().as_ref();
1900                if let Some(crypt) = store.crypt() {
1901                    let _ = store
1902                        .key_manager
1903                        .get_keys(
1904                            object_id,
1905                            crypt.as_ref(),
1906                            &mut Some(async || store.get_keys(object_id).await),
1907                            /* permanent= */ false,
1908                            /* force= */ false,
1909                        )
1910                        .await;
1911                }
1912            })
1913        } else {
1914            None
1915        }
1916    }
1917}
1918
1919impl<S: HandleOwner> Drop for StoreObjectHandle<S> {
1920    fn drop(&mut self) {
1921        if self.is_encrypted() {
1922            let _ = self.store().key_manager.remove(self.object_id);
1923        }
1924    }
1925}
1926
1927/// When truncating an object, sometimes it might not be possible to complete the truncation in a
1928/// single transaction, in which case the caller needs to finish trimming the object in subsequent
1929/// transactions (by calling ObjectStore::trim).
1930#[must_use]
1931pub struct NeedsTrim(pub bool);
1932
1933#[cfg(test)]
1934mod tests {
1935    use super::{ChecksumRangeChunk, OverwriteBitmaps};
1936    use crate::errors::FxfsError;
1937    use crate::filesystem::{FxFilesystem, OpenFxFilesystem};
1938    use crate::object_handle::ObjectHandle;
1939    use crate::object_store::data_object_handle::WRITE_ATTR_BATCH_SIZE;
1940    use crate::object_store::transaction::{lock_keys, Mutation, Options};
1941    use crate::object_store::{
1942        AttributeKey, DataObjectHandle, Directory, HandleOptions, LockKey, ObjectKey, ObjectStore,
1943        ObjectValue, SetExtendedAttributeMode, StoreObjectHandle, FSVERITY_MERKLE_ATTRIBUTE_ID,
1944    };
1945    use bit_vec::BitVec;
1946    use fuchsia_async as fasync;
1947    use futures::join;
1948    use std::sync::Arc;
1949    use storage_device::fake_device::FakeDevice;
1950    use storage_device::DeviceHolder;
1951
1952    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
1953    const TEST_OBJECT_NAME: &str = "foo";
1954
1955    fn is_error(actual: anyhow::Error, expected: FxfsError) {
1956        assert_eq!(*actual.root_cause().downcast_ref::<FxfsError>().unwrap(), expected)
1957    }
1958
1959    async fn test_filesystem() -> OpenFxFilesystem {
1960        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
1961        FxFilesystem::new_empty(device).await.expect("new_empty failed")
1962    }
1963
1964    async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
1965    {
1966        let fs = test_filesystem().await;
1967        let store = fs.root_store();
1968
1969        let mut transaction = fs
1970            .clone()
1971            .new_transaction(
1972                lock_keys![LockKey::object(
1973                    store.store_object_id(),
1974                    store.root_directory_object_id()
1975                )],
1976                Options::default(),
1977            )
1978            .await
1979            .expect("new_transaction failed");
1980
1981        let object =
1982            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
1983                .await
1984                .expect("create_object failed");
1985
1986        let root_directory =
1987            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
1988        root_directory
1989            .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
1990            .await
1991            .expect("add_child_file failed");
1992
1993        transaction.commit().await.expect("commit failed");
1994
1995        (fs, object)
1996    }
1997
1998    #[fuchsia::test(threads = 3)]
1999    async fn extended_attribute_double_remove() {
2000        // This test is intended to trip a potential race condition in remove. Removing an
2001        // attribute that doesn't exist is an error, so we need to check before we remove, but if
2002        // we aren't careful, two parallel removes might both succeed in the check and then both
2003        // remove the value.
2004        let (fs, object) = test_filesystem_and_empty_object().await;
2005        let basic = Arc::new(StoreObjectHandle::new(
2006            object.owner().clone(),
2007            object.object_id(),
2008            /* permanent_keys: */ false,
2009            HandleOptions::default(),
2010            false,
2011        ));
2012        let basic_a = basic.clone();
2013        let basic_b = basic.clone();
2014
2015        basic
2016            .set_extended_attribute(
2017                b"security.selinux".to_vec(),
2018                b"bar".to_vec(),
2019                SetExtendedAttributeMode::Set,
2020            )
2021            .await
2022            .expect("failed to set attribute");
2023
2024        // Try to remove the attribute twice at the same time. One should succeed in the race and
2025        // return Ok, and the other should fail the race and return NOT_FOUND.
2026        let a_task = fasync::Task::spawn(async move {
2027            basic_a.remove_extended_attribute(b"security.selinux".to_vec()).await
2028        });
2029        let b_task = fasync::Task::spawn(async move {
2030            basic_b.remove_extended_attribute(b"security.selinux".to_vec()).await
2031        });
2032        match join!(a_task, b_task) {
2033            (Ok(()), Ok(())) => panic!("both remove calls succeeded"),
2034            (Err(_), Err(_)) => panic!("both remove calls failed"),
2035
2036            (Ok(()), Err(e)) => is_error(e, FxfsError::NotFound),
2037            (Err(e), Ok(())) => is_error(e, FxfsError::NotFound),
2038        }
2039
2040        fs.close().await.expect("Close failed");
2041    }
2042
2043    #[fuchsia::test(threads = 3)]
2044    async fn extended_attribute_double_create() {
2045        // This test is intended to trip a potential race in set when using the create flag,
2046        // similar to above. If the create mode is set, we need to check that the attribute isn't
2047        // already created, but if two parallel creates both succeed in that check, and we aren't
2048        // careful with locking, they will both succeed and one will overwrite the other.
2049        let (fs, object) = test_filesystem_and_empty_object().await;
2050        let basic = Arc::new(StoreObjectHandle::new(
2051            object.owner().clone(),
2052            object.object_id(),
2053            /* permanent_keys: */ false,
2054            HandleOptions::default(),
2055            false,
2056        ));
2057        let basic_a = basic.clone();
2058        let basic_b = basic.clone();
2059
2060        // Try to set the attribute twice at the same time. One should succeed in the race and
2061        // return Ok, and the other should fail the race and return ALREADY_EXISTS.
2062        let a_task = fasync::Task::spawn(async move {
2063            basic_a
2064                .set_extended_attribute(
2065                    b"security.selinux".to_vec(),
2066                    b"one".to_vec(),
2067                    SetExtendedAttributeMode::Create,
2068                )
2069                .await
2070        });
2071        let b_task = fasync::Task::spawn(async move {
2072            basic_b
2073                .set_extended_attribute(
2074                    b"security.selinux".to_vec(),
2075                    b"two".to_vec(),
2076                    SetExtendedAttributeMode::Create,
2077                )
2078                .await
2079        });
2080        match join!(a_task, b_task) {
2081            (Ok(()), Ok(())) => panic!("both set calls succeeded"),
2082            (Err(_), Err(_)) => panic!("both set calls failed"),
2083
2084            (Ok(()), Err(e)) => {
2085                assert_eq!(
2086                    basic
2087                        .get_extended_attribute(b"security.selinux".to_vec())
2088                        .await
2089                        .expect("failed to get xattr"),
2090                    b"one"
2091                );
2092                is_error(e, FxfsError::AlreadyExists);
2093            }
2094            (Err(e), Ok(())) => {
2095                assert_eq!(
2096                    basic
2097                        .get_extended_attribute(b"security.selinux".to_vec())
2098                        .await
2099                        .expect("failed to get xattr"),
2100                    b"two"
2101                );
2102                is_error(e, FxfsError::AlreadyExists);
2103            }
2104        }
2105
2106        fs.close().await.expect("Close failed");
2107    }
2108
2109    struct TestAttr {
2110        name: Vec<u8>,
2111        value: Vec<u8>,
2112    }
2113
2114    impl TestAttr {
2115        fn new(name: impl AsRef<[u8]>, value: impl AsRef<[u8]>) -> Self {
2116            Self { name: name.as_ref().to_vec(), value: value.as_ref().to_vec() }
2117        }
2118        fn name(&self) -> Vec<u8> {
2119            self.name.clone()
2120        }
2121        fn value(&self) -> Vec<u8> {
2122            self.value.clone()
2123        }
2124    }
2125
2126    #[fuchsia::test]
2127    async fn extended_attributes() {
2128        let (fs, object) = test_filesystem_and_empty_object().await;
2129
2130        let test_attr = TestAttr::new(b"security.selinux", b"foo");
2131
2132        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
2133        is_error(
2134            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2135            FxfsError::NotFound,
2136        );
2137
2138        object
2139            .set_extended_attribute(
2140                test_attr.name(),
2141                test_attr.value(),
2142                SetExtendedAttributeMode::Set,
2143            )
2144            .await
2145            .unwrap();
2146        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2147        assert_eq!(
2148            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2149            test_attr.value()
2150        );
2151
2152        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2153        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
2154        is_error(
2155            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2156            FxfsError::NotFound,
2157        );
2158
2159        // Make sure we can handle the same attribute being set again.
2160        object
2161            .set_extended_attribute(
2162                test_attr.name(),
2163                test_attr.value(),
2164                SetExtendedAttributeMode::Set,
2165            )
2166            .await
2167            .unwrap();
2168        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2169        assert_eq!(
2170            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2171            test_attr.value()
2172        );
2173
2174        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2175        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
2176        is_error(
2177            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2178            FxfsError::NotFound,
2179        );
2180
2181        fs.close().await.expect("close failed");
2182    }
2183
2184    #[fuchsia::test]
2185    async fn large_extended_attribute() {
2186        let (fs, object) = test_filesystem_and_empty_object().await;
2187
2188        let test_attr = TestAttr::new(b"security.selinux", vec![3u8; 300]);
2189
2190        object
2191            .set_extended_attribute(
2192                test_attr.name(),
2193                test_attr.value(),
2194                SetExtendedAttributeMode::Set,
2195            )
2196            .await
2197            .unwrap();
2198        assert_eq!(
2199            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2200            test_attr.value()
2201        );
2202
2203        // Probe the fxfs attributes to make sure it did the expected thing. This relies on inside
2204        // knowledge of how the attribute id is chosen.
2205        assert_eq!(
2206            object
2207                .read_attr(64)
2208                .await
2209                .expect("read_attr failed")
2210                .expect("read_attr returned none")
2211                .into_vec(),
2212            test_attr.value()
2213        );
2214
2215        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2216        is_error(
2217            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2218            FxfsError::NotFound,
2219        );
2220
2221        // Make sure we can handle the same attribute being set again.
2222        object
2223            .set_extended_attribute(
2224                test_attr.name(),
2225                test_attr.value(),
2226                SetExtendedAttributeMode::Set,
2227            )
2228            .await
2229            .unwrap();
2230        assert_eq!(
2231            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2232            test_attr.value()
2233        );
2234        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2235        is_error(
2236            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2237            FxfsError::NotFound,
2238        );
2239
2240        fs.close().await.expect("close failed");
2241    }
2242
2243    #[fuchsia::test]
2244    async fn multiple_extended_attributes() {
2245        let (fs, object) = test_filesystem_and_empty_object().await;
2246
2247        let attrs = [
2248            TestAttr::new(b"security.selinux", b"foo"),
2249            TestAttr::new(b"large.attribute", vec![3u8; 300]),
2250            TestAttr::new(b"an.attribute", b"asdf"),
2251            TestAttr::new(b"user.big", vec![5u8; 288]),
2252            TestAttr::new(b"user.tiny", b"smol"),
2253            TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
2254            TestAttr::new(b"also big", vec![7u8; 500]),
2255            TestAttr::new(b"all.ones", vec![1u8; 11111]),
2256        ];
2257
2258        for i in 0..attrs.len() {
2259            object
2260                .set_extended_attribute(
2261                    attrs[i].name(),
2262                    attrs[i].value(),
2263                    SetExtendedAttributeMode::Set,
2264                )
2265                .await
2266                .unwrap();
2267            assert_eq!(
2268                object.get_extended_attribute(attrs[i].name()).await.unwrap(),
2269                attrs[i].value()
2270            );
2271        }
2272
2273        for i in 0..attrs.len() {
2274            // Make sure expected attributes are still available.
2275            let mut found_attrs = object.list_extended_attributes().await.unwrap();
2276            let mut expected_attrs: Vec<Vec<u8>> = attrs.iter().skip(i).map(|a| a.name()).collect();
2277            found_attrs.sort();
2278            expected_attrs.sort();
2279            assert_eq!(found_attrs, expected_attrs);
2280            for j in i..attrs.len() {
2281                assert_eq!(
2282                    object.get_extended_attribute(attrs[j].name()).await.unwrap(),
2283                    attrs[j].value()
2284                );
2285            }
2286
2287            object.remove_extended_attribute(attrs[i].name()).await.expect("failed to remove");
2288            is_error(
2289                object.get_extended_attribute(attrs[i].name()).await.unwrap_err(),
2290                FxfsError::NotFound,
2291            );
2292        }
2293
2294        fs.close().await.expect("close failed");
2295    }
2296
2297    #[fuchsia::test]
2298    async fn multiple_extended_attributes_delete() {
2299        let (fs, object) = test_filesystem_and_empty_object().await;
2300        let store = object.owner().clone();
2301
2302        let attrs = [
2303            TestAttr::new(b"security.selinux", b"foo"),
2304            TestAttr::new(b"large.attribute", vec![3u8; 300]),
2305            TestAttr::new(b"an.attribute", b"asdf"),
2306            TestAttr::new(b"user.big", vec![5u8; 288]),
2307            TestAttr::new(b"user.tiny", b"smol"),
2308            TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
2309            TestAttr::new(b"also big", vec![7u8; 500]),
2310            TestAttr::new(b"all.ones", vec![1u8; 11111]),
2311        ];
2312
2313        for i in 0..attrs.len() {
2314            object
2315                .set_extended_attribute(
2316                    attrs[i].name(),
2317                    attrs[i].value(),
2318                    SetExtendedAttributeMode::Set,
2319                )
2320                .await
2321                .unwrap();
2322            assert_eq!(
2323                object.get_extended_attribute(attrs[i].name()).await.unwrap(),
2324                attrs[i].value()
2325            );
2326        }
2327
2328        // Unlink the file
2329        let root_directory =
2330            Directory::open(object.owner(), object.store().root_directory_object_id())
2331                .await
2332                .expect("open failed");
2333        let mut transaction = fs
2334            .clone()
2335            .new_transaction(
2336                lock_keys![
2337                    LockKey::object(store.store_object_id(), store.root_directory_object_id()),
2338                    LockKey::object(store.store_object_id(), object.object_id()),
2339                ],
2340                Options::default(),
2341            )
2342            .await
2343            .expect("new_transaction failed");
2344        crate::object_store::directory::replace_child(
2345            &mut transaction,
2346            None,
2347            (&root_directory, TEST_OBJECT_NAME),
2348        )
2349        .await
2350        .expect("replace_child failed");
2351        transaction.commit().await.unwrap();
2352        store.tombstone_object(object.object_id(), Options::default()).await.unwrap();
2353
2354        crate::fsck::fsck(fs.clone()).await.unwrap();
2355
2356        fs.close().await.expect("close failed");
2357    }
2358
2359    #[fuchsia::test]
2360    async fn extended_attribute_changing_sizes() {
2361        let (fs, object) = test_filesystem_and_empty_object().await;
2362
2363        let test_name = b"security.selinux";
2364        let test_small_attr = TestAttr::new(test_name, b"smol");
2365        let test_large_attr = TestAttr::new(test_name, vec![3u8; 300]);
2366
2367        object
2368            .set_extended_attribute(
2369                test_small_attr.name(),
2370                test_small_attr.value(),
2371                SetExtendedAttributeMode::Set,
2372            )
2373            .await
2374            .unwrap();
2375        assert_eq!(
2376            object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
2377            test_small_attr.value()
2378        );
2379
2380        // With a small attribute, we don't expect it to write to an fxfs attribute.
2381        assert!(object.read_attr(64).await.expect("read_attr failed").is_none());
2382
2383        crate::fsck::fsck(fs.clone()).await.unwrap();
2384
2385        object
2386            .set_extended_attribute(
2387                test_large_attr.name(),
2388                test_large_attr.value(),
2389                SetExtendedAttributeMode::Set,
2390            )
2391            .await
2392            .unwrap();
2393        assert_eq!(
2394            object.get_extended_attribute(test_large_attr.name()).await.unwrap(),
2395            test_large_attr.value()
2396        );
2397
2398        // Once the value is above the threshold, we expect it to get upgraded to an fxfs
2399        // attribute.
2400        assert_eq!(
2401            object
2402                .read_attr(64)
2403                .await
2404                .expect("read_attr failed")
2405                .expect("read_attr returned none")
2406                .into_vec(),
2407            test_large_attr.value()
2408        );
2409
2410        crate::fsck::fsck(fs.clone()).await.unwrap();
2411
2412        object
2413            .set_extended_attribute(
2414                test_small_attr.name(),
2415                test_small_attr.value(),
2416                SetExtendedAttributeMode::Set,
2417            )
2418            .await
2419            .unwrap();
2420        assert_eq!(
2421            object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
2422            test_small_attr.value()
2423        );
2424
2425        // Even though we are back under the threshold, we still expect it to be stored in an fxfs
2426        // attribute, because we don't downgrade to inline once we've allocated one.
2427        assert_eq!(
2428            object
2429                .read_attr(64)
2430                .await
2431                .expect("read_attr failed")
2432                .expect("read_attr returned none")
2433                .into_vec(),
2434            test_small_attr.value()
2435        );
2436
2437        crate::fsck::fsck(fs.clone()).await.unwrap();
2438
2439        object.remove_extended_attribute(test_small_attr.name()).await.expect("failed to remove");
2440
2441        crate::fsck::fsck(fs.clone()).await.unwrap();
2442
2443        fs.close().await.expect("close failed");
2444    }
2445
2446    #[fuchsia::test]
2447    async fn extended_attribute_max_size() {
2448        let (fs, object) = test_filesystem_and_empty_object().await;
2449
2450        let test_attr = TestAttr::new(
2451            vec![3u8; super::MAX_XATTR_NAME_SIZE],
2452            vec![1u8; super::MAX_XATTR_VALUE_SIZE],
2453        );
2454
2455        object
2456            .set_extended_attribute(
2457                test_attr.name(),
2458                test_attr.value(),
2459                SetExtendedAttributeMode::Set,
2460            )
2461            .await
2462            .unwrap();
2463        assert_eq!(
2464            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2465            test_attr.value()
2466        );
2467        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2468        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2469
2470        fs.close().await.expect("close failed");
2471    }
2472
2473    #[fuchsia::test]
2474    async fn extended_attribute_remove_then_create() {
2475        let (fs, object) = test_filesystem_and_empty_object().await;
2476
2477        let test_attr = TestAttr::new(
2478            vec![3u8; super::MAX_XATTR_NAME_SIZE],
2479            vec![1u8; super::MAX_XATTR_VALUE_SIZE],
2480        );
2481
2482        object
2483            .set_extended_attribute(
2484                test_attr.name(),
2485                test_attr.value(),
2486                SetExtendedAttributeMode::Create,
2487            )
2488            .await
2489            .unwrap();
2490        fs.journal().compact().await.unwrap();
2491        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2492        object
2493            .set_extended_attribute(
2494                test_attr.name(),
2495                test_attr.value(),
2496                SetExtendedAttributeMode::Create,
2497            )
2498            .await
2499            .unwrap();
2500
2501        assert_eq!(
2502            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2503            test_attr.value()
2504        );
2505
2506        fs.close().await.expect("close failed");
2507    }
2508
2509    #[fuchsia::test]
2510    async fn large_extended_attribute_max_number() {
2511        let (fs, object) = test_filesystem_and_empty_object().await;
2512
2513        let max_xattrs =
2514            super::EXTENDED_ATTRIBUTE_RANGE_END - super::EXTENDED_ATTRIBUTE_RANGE_START;
2515        for i in 0..max_xattrs {
2516            let test_attr = TestAttr::new(format!("{}", i).as_bytes(), vec![0x3; 300]);
2517            object
2518                .set_extended_attribute(
2519                    test_attr.name(),
2520                    test_attr.value(),
2521                    SetExtendedAttributeMode::Set,
2522                )
2523                .await
2524                .unwrap_or_else(|_| panic!("failed to set xattr number {}", i));
2525        }
2526
2527        // That should have taken up all the attributes we've allocated to extended attributes, so
2528        // this one should return ERR_NO_SPACE.
2529        match object
2530            .set_extended_attribute(
2531                b"one.too.many".to_vec(),
2532                vec![0x3; 300],
2533                SetExtendedAttributeMode::Set,
2534            )
2535            .await
2536        {
2537            Ok(()) => panic!("set should not succeed"),
2538            Err(e) => is_error(e, FxfsError::NoSpace),
2539        }
2540
2541        // But inline attributes don't need an attribute number, so it should work fine.
2542        object
2543            .set_extended_attribute(
2544                b"this.is.okay".to_vec(),
2545                b"small value".to_vec(),
2546                SetExtendedAttributeMode::Set,
2547            )
2548            .await
2549            .unwrap();
2550
2551        // And updating existing ones should be okay.
2552        object
2553            .set_extended_attribute(b"11".to_vec(), vec![0x4; 300], SetExtendedAttributeMode::Set)
2554            .await
2555            .unwrap();
2556        object
2557            .set_extended_attribute(
2558                b"12".to_vec(),
2559                vec![0x1; 300],
2560                SetExtendedAttributeMode::Replace,
2561            )
2562            .await
2563            .unwrap();
2564
2565        // And we should be able to remove an attribute and set another one.
2566        object.remove_extended_attribute(b"5".to_vec()).await.unwrap();
2567        object
2568            .set_extended_attribute(
2569                b"new attr".to_vec(),
2570                vec![0x3; 300],
2571                SetExtendedAttributeMode::Set,
2572            )
2573            .await
2574            .unwrap();
2575
2576        fs.close().await.expect("close failed");
2577    }
2578
2579    #[fuchsia::test]
2580    async fn write_attr_trims_beyond_new_end() {
2581        // When writing, multi_write will deallocate old extents that overlap with the new data,
2582        // but it doesn't trim anything beyond that, since it doesn't know what the total size will
2583        // be. write_attr does know, because it writes the whole attribute at once, so we need to
2584        // make sure it cleans up properly.
2585        let (fs, object) = test_filesystem_and_empty_object().await;
2586
2587        let block_size = fs.block_size();
2588        let buf_size = block_size * 2;
2589        let attribute_id = 10;
2590
2591        let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
2592        let mut buffer = object.allocate_buffer(buf_size as usize).await;
2593        buffer.as_mut_slice().fill(3);
2594        // Writing two separate ranges, even if they are contiguous, forces them to be separate
2595        // extent records.
2596        object
2597            .multi_write(
2598                &mut transaction,
2599                attribute_id,
2600                &[0..block_size, block_size..block_size * 2],
2601                buffer.as_mut(),
2602            )
2603            .await
2604            .unwrap();
2605        transaction.add(
2606            object.store().store_object_id,
2607            Mutation::replace_or_insert_object(
2608                ObjectKey::attribute(object.object_id(), attribute_id, AttributeKey::Attribute),
2609                ObjectValue::attribute(block_size * 2, false),
2610            ),
2611        );
2612        transaction.commit().await.unwrap();
2613
2614        crate::fsck::fsck(fs.clone()).await.unwrap();
2615
2616        let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
2617        let needs_trim = (*object)
2618            .write_attr(&mut transaction, attribute_id, &vec![3u8; block_size as usize])
2619            .await
2620            .unwrap();
2621        assert!(!needs_trim.0);
2622        transaction.commit().await.unwrap();
2623
2624        crate::fsck::fsck(fs.clone()).await.unwrap();
2625
2626        fs.close().await.expect("close failed");
2627    }
2628
2629    #[fuchsia::test]
2630    async fn write_new_attr_in_batches_multiple_txns() {
2631        let (fs, object) = test_filesystem_and_empty_object().await;
2632        let merkle_tree = vec![1; 3 * WRITE_ATTR_BATCH_SIZE];
2633        let mut transaction =
2634            (*object).new_transaction(FSVERITY_MERKLE_ATTRIBUTE_ID).await.unwrap();
2635        object
2636            .write_new_attr_in_batches(
2637                &mut transaction,
2638                FSVERITY_MERKLE_ATTRIBUTE_ID,
2639                &merkle_tree,
2640                WRITE_ATTR_BATCH_SIZE,
2641            )
2642            .await
2643            .expect("failed to write merkle attribute");
2644
2645        transaction.add(
2646            object.store().store_object_id,
2647            Mutation::replace_or_insert_object(
2648                ObjectKey::graveyard_attribute_entry(
2649                    object.store().graveyard_directory_object_id(),
2650                    object.object_id(),
2651                    FSVERITY_MERKLE_ATTRIBUTE_ID,
2652                ),
2653                ObjectValue::None,
2654            ),
2655        );
2656        transaction.commit().await.unwrap();
2657        assert_eq!(
2658            object.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
2659            Some(merkle_tree.into())
2660        );
2661
2662        fs.close().await.expect("close failed");
2663    }
2664
2665    // Running on target only, to use fake time features in the executor.
2666    #[cfg(target_os = "fuchsia")]
2667    #[fuchsia::test(allow_stalls = false)]
2668    async fn test_watchdog() {
2669        use super::Watchdog;
2670        use fuchsia_async::{MonotonicDuration, MonotonicInstant, TestExecutor};
2671        use std::sync::mpsc::channel;
2672
2673        TestExecutor::advance_to(make_time(0)).await;
2674        let (sender, receiver) = channel();
2675
2676        fn make_time(time_secs: i64) -> MonotonicInstant {
2677            MonotonicInstant::from_nanos(0) + MonotonicDuration::from_seconds(time_secs)
2678        }
2679
2680        {
2681            let _watchdog = Watchdog::new(10, move |count| {
2682                sender.send(count).expect("Sending value");
2683            });
2684
2685            // Too early.
2686            TestExecutor::advance_to(make_time(5)).await;
2687            receiver.try_recv().expect_err("Should not have message");
2688
2689            // First message.
2690            TestExecutor::advance_to(make_time(10)).await;
2691            assert_eq!(1, receiver.recv().expect("Receiving"));
2692
2693            // Too early for the next.
2694            TestExecutor::advance_to(make_time(15)).await;
2695            receiver.try_recv().expect_err("Should not have message");
2696
2697            // Missed one. They'll be spooled up.
2698            TestExecutor::advance_to(make_time(30)).await;
2699            assert_eq!(2, receiver.recv().expect("Receiving"));
2700            assert_eq!(3, receiver.recv().expect("Receiving"));
2701        }
2702
2703        // Watchdog is dropped, nothing should trigger.
2704        TestExecutor::advance_to(make_time(100)).await;
2705        receiver.recv().expect_err("Watchdog should be gone");
2706    }
2707
2708    #[fuchsia::test]
2709    fn test_checksum_range_chunk() {
2710        let block_size = 4096;
2711
2712        // No bitmap means one chunk that covers the whole range
2713        assert_eq!(
2714            ChecksumRangeChunk::group_first_write_ranges(
2715                &mut OverwriteBitmaps::None,
2716                block_size,
2717                block_size * 2..block_size * 5,
2718            ),
2719            vec![ChecksumRangeChunk {
2720                checksum_range: 0..3,
2721                device_range: block_size * 2..block_size * 5,
2722                is_first_write: false,
2723            }],
2724        );
2725
2726        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2727        assert_eq!(
2728            ChecksumRangeChunk::group_first_write_ranges(
2729                &mut bitmaps,
2730                block_size,
2731                block_size * 2..block_size * 5,
2732            ),
2733            vec![ChecksumRangeChunk {
2734                checksum_range: 0..3,
2735                device_range: block_size * 2..block_size * 5,
2736                is_first_write: false,
2737            }],
2738        );
2739        assert_eq!(
2740            bitmaps.take_bitmaps(),
2741            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b11100000])))
2742        );
2743
2744        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2745        bitmaps.set_offset(2);
2746        assert_eq!(
2747            ChecksumRangeChunk::group_first_write_ranges(
2748                &mut bitmaps,
2749                block_size,
2750                block_size * 2..block_size * 5,
2751            ),
2752            vec![
2753                ChecksumRangeChunk {
2754                    checksum_range: 0..2,
2755                    device_range: block_size * 2..block_size * 4,
2756                    is_first_write: false,
2757                },
2758                ChecksumRangeChunk {
2759                    checksum_range: 2..3,
2760                    device_range: block_size * 4..block_size * 5,
2761                    is_first_write: true,
2762                },
2763            ],
2764        );
2765        assert_eq!(
2766            bitmaps.take_bitmaps(),
2767            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00111000])))
2768        );
2769
2770        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2771        bitmaps.set_offset(4);
2772        assert_eq!(
2773            ChecksumRangeChunk::group_first_write_ranges(
2774                &mut bitmaps,
2775                block_size,
2776                block_size * 2..block_size * 5,
2777            ),
2778            vec![ChecksumRangeChunk {
2779                checksum_range: 0..3,
2780                device_range: block_size * 2..block_size * 5,
2781                is_first_write: true,
2782            }],
2783        );
2784        assert_eq!(
2785            bitmaps.take_bitmaps(),
2786            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00001110])))
2787        );
2788
2789        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b01010101]));
2790        assert_eq!(
2791            ChecksumRangeChunk::group_first_write_ranges(
2792                &mut bitmaps,
2793                block_size,
2794                block_size * 2..block_size * 10,
2795            ),
2796            vec![
2797                ChecksumRangeChunk {
2798                    checksum_range: 0..1,
2799                    device_range: block_size * 2..block_size * 3,
2800                    is_first_write: true,
2801                },
2802                ChecksumRangeChunk {
2803                    checksum_range: 1..2,
2804                    device_range: block_size * 3..block_size * 4,
2805                    is_first_write: false,
2806                },
2807                ChecksumRangeChunk {
2808                    checksum_range: 2..3,
2809                    device_range: block_size * 4..block_size * 5,
2810                    is_first_write: true,
2811                },
2812                ChecksumRangeChunk {
2813                    checksum_range: 3..4,
2814                    device_range: block_size * 5..block_size * 6,
2815                    is_first_write: false,
2816                },
2817                ChecksumRangeChunk {
2818                    checksum_range: 4..5,
2819                    device_range: block_size * 6..block_size * 7,
2820                    is_first_write: true,
2821                },
2822                ChecksumRangeChunk {
2823                    checksum_range: 5..6,
2824                    device_range: block_size * 7..block_size * 8,
2825                    is_first_write: false,
2826                },
2827                ChecksumRangeChunk {
2828                    checksum_range: 6..7,
2829                    device_range: block_size * 8..block_size * 9,
2830                    is_first_write: true,
2831                },
2832                ChecksumRangeChunk {
2833                    checksum_range: 7..8,
2834                    device_range: block_size * 9..block_size * 10,
2835                    is_first_write: false,
2836                },
2837            ],
2838        );
2839        assert_eq!(
2840            bitmaps.take_bitmaps(),
2841            Some((BitVec::from_bytes(&[0b01010101]), BitVec::from_bytes(&[0b11111111])))
2842        );
2843    }
2844}