fxfs/object_store/
store_object_handle.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::checksum::{Checksum, Checksums, fletcher64};
6use crate::errors::FxfsError;
7use crate::log::*;
8use crate::lsm_tree::Query;
9use crate::lsm_tree::types::{Item, ItemRef, LayerIterator};
10use crate::object_handle::ObjectHandle;
11use crate::object_store::extent_record::{ExtentKey, ExtentMode, ExtentValue};
12use crate::object_store::object_manager::ObjectManager;
13use crate::object_store::object_record::{
14    AttributeKey, ExtendedAttributeValue, ObjectAttributes, ObjectItem, ObjectKey, ObjectKeyData,
15    ObjectValue, Timestamp,
16};
17use crate::object_store::transaction::{
18    AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Options, ReadGuard,
19    Transaction, lock_keys,
20};
21use crate::object_store::{
22    HandleOptions, HandleOwner, ObjectStore, TrimMode, TrimResult, VOLUME_DATA_KEY_ID,
23};
24use crate::range::RangeExt;
25use crate::round::{round_down, round_up};
26use anyhow::{Context, Error, anyhow, bail, ensure};
27use assert_matches::assert_matches;
28use bit_vec::BitVec;
29use futures::stream::{FuturesOrdered, FuturesUnordered};
30use futures::{TryStreamExt, try_join};
31use fxfs_crypto::{Cipher, CipherSet, EncryptionKey, FindKeyResult, FxfsCipher, KeyPurpose};
32use fxfs_trace::trace;
33use static_assertions::const_assert;
34use std::cmp::min;
35use std::future::Future;
36use std::ops::Range;
37use std::sync::Arc;
38use std::sync::atomic::{self, AtomicBool, Ordering};
39use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
40use storage_device::{InlineCryptoOptions, ReadOptions, WriteOptions};
41
42use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
43
44/// Maximum size for an extended attribute name.
45pub const MAX_XATTR_NAME_SIZE: usize = 255;
46/// Maximum size an extended attribute value can be before it's stored in an object attribute
47/// instead of inside the record directly.
48pub const MAX_INLINE_XATTR_SIZE: usize = 256;
49/// Maximum size for an extended attribute value. NB: the maximum size for an extended attribute is
50/// 64kB, which we rely on for correctness when deleting attributes, so ensure it's always
51/// enforced.
52pub const MAX_XATTR_VALUE_SIZE: usize = 64000;
53/// Start of the range of fxfs attribute ids which are reserved for extended attribute values.
54/// Whenever a new attribute is needed, the first unused id will be chosen from this range. It's
55/// technically safe to change these values, but it has potential consequences: they are only used
56/// during id selection, so any existing extended attributes keep their ids. That means any range
57/// that was ever selected here, past or present, may still contain in-use attributes unless they
58/// are explicitly migrated, which isn't currently done.
59pub const EXTENDED_ATTRIBUTE_RANGE_START: u64 = 64;
/// End of the attribute id range reserved for extended attribute values (see
/// `EXTENDED_ATTRIBUTE_RANGE_START`).
// NOTE(review): whether this bound is inclusive or exclusive is decided at the id-selection site,
// which is not visible here - confirm before relying on it.
60pub const EXTENDED_ATTRIBUTE_RANGE_END: u64 = 512;
61
62/// Zeroes blocks in 'buffer' based on `bitmap`, one bit per block from start of buffer.
63fn apply_bitmap_zeroing(
64    block_size: usize,
65    bitmap: &bit_vec::BitVec,
66    mut buffer: MutableBufferRef<'_>,
67) {
68    let buf = buffer.as_mut_slice();
69    debug_assert_eq!(bitmap.len() * block_size, buf.len());
70    for (i, block) in bitmap.iter().enumerate() {
71        if !block {
72            let start = i * block_size;
73            buf[start..start + block_size].fill(0);
74        }
75    }
76}
77
78/// When writing, often the logic should be generic over whether or not checksums are generated.
79/// This provides that and a handy way to convert to the more general ExtentMode that eventually
80/// stores it on disk.
81#[derive(Debug, Clone, PartialEq)]
82pub enum MaybeChecksums {
83    None,
84    Fletcher(Vec<Checksum>),
85}
86
87impl MaybeChecksums {
88    pub fn maybe_as_ref(&self) -> Option<&[Checksum]> {
89        match self {
90            Self::None => None,
91            Self::Fletcher(sums) => Some(&sums),
92        }
93    }
94
95    pub fn split_off(&mut self, at: usize) -> Self {
96        match self {
97            Self::None => Self::None,
98            Self::Fletcher(sums) => Self::Fletcher(sums.split_off(at)),
99        }
100    }
101
102    pub fn to_mode(self) -> ExtentMode {
103        match self {
104            Self::None => ExtentMode::Raw,
105            Self::Fletcher(sums) => ExtentMode::Cow(Checksums::fletcher(sums)),
106        }
107    }
108
109    pub fn into_option(self) -> Option<Vec<Checksum>> {
110        match self {
111            Self::None => None,
112            Self::Fletcher(sums) => Some(sums),
113        }
114    }
115}
116
117/// The mode of operation when setting extended attributes. This is the same as the fidl definition
118/// but is replicated here so we don't have fuchsia.io structures in the api, so this can be used
119/// on host.
120#[derive(Debug, Clone, Copy, PartialEq, Eq)]
121pub enum SetExtendedAttributeMode {
122    /// Create the extended attribute if it doesn't exist, replace the value if it does.
123    Set,
124    /// Create the extended attribute if it doesn't exist, fail if it does.
125    Create,
126    /// Replace the extended attribute value if it exists, fail if it doesn't.
127    Replace,
128}
129
130impl From<fio::SetExtendedAttributeMode> for SetExtendedAttributeMode {
131    fn from(other: fio::SetExtendedAttributeMode) -> SetExtendedAttributeMode {
132        match other {
133            fio::SetExtendedAttributeMode::Set => SetExtendedAttributeMode::Set,
134            fio::SetExtendedAttributeMode::Create => SetExtendedAttributeMode::Create,
135            fio::SetExtendedAttributeMode::Replace => SetExtendedAttributeMode::Replace,
136        }
137    }
138}
139
/// Where the keys for an object come from, if it is encrypted at all.  The variant is chosen
/// in `StoreObjectHandle::new` and consulted by `StoreObjectHandle::get_key`.
140enum Encryption {
141    /// The object doesn't use encryption.
142    None,
143
144    /// The object has keys that are cached (which means unwrapping occurs on-demand) with
145    /// KeyManager.
146    CachedKeys,
147
148    /// The object has permanent keys registered with KeyManager.
149    PermanentKeys,
150}
151
152#[derive(PartialEq, Debug)]
153enum OverwriteBitmaps {
154    None,
155    Some {
156        /// The block bitmap of a partial overwrite extent in the tree.
157        extent_bitmap: BitVec,
158        /// A bitmap of the blocks written to by the current overwrite.
159        write_bitmap: BitVec,
160        /// BitVec doesn't have a slice equivalent, so for a particular section of the write we
161        /// keep track of an offset in the bitmaps to operate on.
162        bitmap_offset: usize,
163    },
164}
165
166impl OverwriteBitmaps {
167    fn new(extent_bitmap: BitVec) -> Self {
168        OverwriteBitmaps::Some {
169            write_bitmap: BitVec::from_elem(extent_bitmap.len(), false),
170            extent_bitmap,
171            bitmap_offset: 0,
172        }
173    }
174
175    fn is_none(&self) -> bool {
176        *self == OverwriteBitmaps::None
177    }
178
179    fn set_offset(&mut self, new_offset: usize) {
180        match self {
181            OverwriteBitmaps::None => (),
182            OverwriteBitmaps::Some { bitmap_offset, .. } => *bitmap_offset = new_offset,
183        }
184    }
185
186    fn get_from_extent_bitmap(&self, i: usize) -> Option<bool> {
187        match self {
188            OverwriteBitmaps::None => None,
189            OverwriteBitmaps::Some { extent_bitmap, bitmap_offset, .. } => {
190                extent_bitmap.get(*bitmap_offset + i)
191            }
192        }
193    }
194
195    fn set_in_write_bitmap(&mut self, i: usize, x: bool) {
196        match self {
197            OverwriteBitmaps::None => (),
198            OverwriteBitmaps::Some { write_bitmap, bitmap_offset, .. } => {
199                write_bitmap.set(*bitmap_offset + i, x)
200            }
201        }
202    }
203
204    fn take_bitmaps(self) -> Option<(BitVec, BitVec)> {
205        match self {
206            OverwriteBitmaps::None => None,
207            OverwriteBitmaps::Some { extent_bitmap, write_bitmap, .. } => {
208                Some((extent_bitmap, write_bitmap))
209            }
210        }
211    }
212}
213
214/// When writing to Overwrite ranges, we need to emit whether a set of checksums for a device range
215/// is the first write to that region or not. This tracks one such range so we can use it after the
216/// write to break up the returned checksum list.
217#[derive(PartialEq, Debug)]
218struct ChecksumRangeChunk {
219    checksum_range: Range<usize>,
220    device_range: Range<u64>,
221    is_first_write: bool,
222}
223
224impl ChecksumRangeChunk {
225    fn group_first_write_ranges(
226        bitmaps: &mut OverwriteBitmaps,
227        block_size: u64,
228        write_device_range: Range<u64>,
229    ) -> Vec<ChecksumRangeChunk> {
230        let write_block_len = (write_device_range.length().unwrap() / block_size) as usize;
231        if bitmaps.is_none() {
232            // If there is no bitmap, then the overwrite range is fully written to. However, we
233            // could still be within the journal flush window where one of the blocks was written
234            // to for the first time to put it in this state, so we still need to emit the
235            // checksums in case replay needs them.
236            vec![ChecksumRangeChunk {
237                checksum_range: 0..write_block_len,
238                device_range: write_device_range,
239                is_first_write: false,
240            }]
241        } else {
242            let mut checksum_ranges = vec![ChecksumRangeChunk {
243                checksum_range: 0..0,
244                device_range: write_device_range.start..write_device_range.start,
245                is_first_write: !bitmaps.get_from_extent_bitmap(0).unwrap(),
246            }];
247            let mut working_range = checksum_ranges.last_mut().unwrap();
248            for i in 0..write_block_len {
249                bitmaps.set_in_write_bitmap(i, true);
250
251                // bitmap.get returning true means the block is initialized and therefore has been
252                // written to before.
253                if working_range.is_first_write != bitmaps.get_from_extent_bitmap(i).unwrap() {
254                    // is_first_write is tracking opposite of what comes back from the bitmap, so
255                    // if the are still opposites we continue our current range.
256                    working_range.checksum_range.end += 1;
257                    working_range.device_range.end += block_size;
258                } else {
259                    // If they are the same, then we need to make a new chunk.
260                    let new_chunk = ChecksumRangeChunk {
261                        checksum_range: working_range.checksum_range.end
262                            ..working_range.checksum_range.end + 1,
263                        device_range: working_range.device_range.end
264                            ..working_range.device_range.end + block_size,
265                        is_first_write: !working_range.is_first_write,
266                    };
267                    checksum_ranges.push(new_chunk);
268                    working_range = checksum_ranges.last_mut().unwrap();
269                }
270            }
271            checksum_ranges
272        }
273    }
274}
275
276/// StoreObjectHandle is the lowest-level, untyped handle to an object with the id [`object_id`] in
277/// a particular store, [`owner`]. It provides functionality shared across all objects, such as
278/// reading and writing attributes and managing encryption keys.
279///
280/// Since it's untyped, it doesn't do any object kind validation, and is generally meant to
281/// implement higher-level typed handles.
282///
283/// For file-like objects with a data attribute, DataObjectHandle implements traits and helpers for
284/// doing more complex extent management and caches the content size.
285///
286/// For directory-like objects, Directory knows how to add and remove child objects and enumerate
287/// its children.
288pub struct StoreObjectHandle<S: HandleOwner> {
    /// The owner of this handle; `store()` resolves the `ObjectStore` through it.
289    owner: Arc<S>,
    /// Id of the object this handle refers to.
290    object_id: u64,
    /// Per-handle options (e.g. `skip_checksums`, `skip_journal_checks`).
291    options: HandleOptions,
    /// Whether tracing is on for this handle; written by `set_trace`, read by `trace()`.
292    trace: AtomicBool,
    /// How keys are obtained for this object; fixed when the handle is created.
293    encryption: Encryption,
294}
295
296impl<S: HandleOwner> ObjectHandle for StoreObjectHandle<S> {
297    fn set_trace(&self, v: bool) {
298        info!(store_id = self.store().store_object_id, oid = self.object_id(), trace = v; "trace");
299        self.trace.store(v, atomic::Ordering::Relaxed);
300    }
301
302    fn object_id(&self) -> u64 {
303        return self.object_id;
304    }
305
306    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
307        self.store().device.allocate_buffer(size)
308    }
309
310    fn block_size(&self) -> u64 {
311        self.store().block_size()
312    }
313}
314
315struct Watchdog {
316    _task: fasync::Task<()>,
317}
318
319impl Watchdog {
320    fn new(increment_seconds: u64, cb: impl Fn(u64) + Send + 'static) -> Self {
321        Self {
322            _task: fasync::Task::spawn(async move {
323                let increment = increment_seconds.try_into().unwrap();
324                let mut fired_counter = 0;
325                let mut next_wake = fasync::MonotonicInstant::now();
326                loop {
327                    next_wake += std::time::Duration::from_secs(increment).into();
328                    // If this isn't being scheduled this will purposely result in fast looping when
329                    // it does. This will be insightful about the state of the thread and task
330                    // scheduling.
331                    if fasync::MonotonicInstant::now() < next_wake {
332                        fasync::Timer::new(next_wake).await;
333                    }
334                    fired_counter += 1;
335                    cb(fired_counter);
336                }
337            }),
338        }
339    }
340}
341
342impl<S: HandleOwner> StoreObjectHandle<S> {
343    /// Make a new StoreObjectHandle for the object with id [`object_id`] in store [`owner`].
344    pub fn new(
345        owner: Arc<S>,
346        object_id: u64,
347        permanent_keys: bool,
348        mut options: HandleOptions,
349        trace: bool,
350    ) -> Self {
351        let encryption = if permanent_keys {
352            Encryption::PermanentKeys
353        } else if owner.as_ref().as_ref().is_encrypted() {
354            Encryption::CachedKeys
355        } else {
356            Encryption::None
357        };
358        let store: &ObjectStore = owner.as_ref().as_ref();
359        if store.is_encrypted() && store.filesystem().options().inline_crypto_enabled {
360            options.skip_checksums = true;
361        }
362        Self { owner, object_id, encryption, options, trace: AtomicBool::new(trace) }
363    }
364
365    pub fn owner(&self) -> &Arc<S> {
366        &self.owner
367    }
368
369    pub fn store(&self) -> &ObjectStore {
370        self.owner.as_ref().as_ref()
371    }
372
373    pub fn trace(&self) -> bool {
374        self.trace.load(atomic::Ordering::Relaxed)
375    }
376
377    pub fn is_encrypted(&self) -> bool {
378        !matches!(self.encryption, Encryption::None)
379    }
380
381    /// Get the default set of transaction options for this object. This is mostly the overall
382    /// default, modified by any [`HandleOptions`] held by this handle.
383    pub fn default_transaction_options<'b>(&self) -> Options<'b> {
384        Options { skip_journal_checks: self.options.skip_journal_checks, ..Default::default() }
385    }
386
387    pub async fn new_transaction_with_options<'b>(
388        &self,
389        attribute_id: u64,
390        options: Options<'b>,
391    ) -> Result<Transaction<'b>, Error> {
392        Ok(self
393            .store()
394            .filesystem()
395            .new_transaction(
396                lock_keys![
397                    LockKey::object_attribute(
398                        self.store().store_object_id(),
399                        self.object_id(),
400                        attribute_id,
401                    ),
402                    LockKey::object(self.store().store_object_id(), self.object_id()),
403                ],
404                options,
405            )
406            .await?)
407    }
408
409    pub async fn new_transaction<'b>(&self, attribute_id: u64) -> Result<Transaction<'b>, Error> {
410        self.new_transaction_with_options(attribute_id, self.default_transaction_options()).await
411    }
412
413    // If |transaction| has an impending mutation for the underlying object, returns that.
414    // Otherwise, looks up the object from the tree.
415    async fn txn_get_object_mutation(
416        &self,
417        transaction: &Transaction<'_>,
418    ) -> Result<ObjectStoreMutation, Error> {
419        self.store().txn_get_object_mutation(transaction, self.object_id()).await
420    }
421
422    // Returns the amount deallocated.
423    async fn deallocate_old_extents(
424        &self,
425        transaction: &mut Transaction<'_>,
426        attribute_id: u64,
427        range: Range<u64>,
428    ) -> Result<u64, Error> {
429        let block_size = self.block_size();
430        assert_eq!(range.start % block_size, 0);
431        assert_eq!(range.end % block_size, 0);
432        if range.start == range.end {
433            return Ok(0);
434        }
435        let tree = &self.store().tree;
436        let layer_set = tree.layer_set();
437        let key = ExtentKey { range };
438        let lower_bound = ObjectKey::attribute(
439            self.object_id(),
440            attribute_id,
441            AttributeKey::Extent(key.search_key()),
442        );
443        let mut merger = layer_set.merger();
444        let mut iter = merger.query(Query::FullRange(&lower_bound)).await?;
445        let allocator = self.store().allocator();
446        let mut deallocated = 0;
447        let trace = self.trace();
448        while let Some(ItemRef {
449            key:
450                ObjectKey {
451                    object_id,
452                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
453                },
454            value: ObjectValue::Extent(value),
455            ..
456        }) = iter.get()
457        {
458            if *object_id != self.object_id() || *attr_id != attribute_id {
459                break;
460            }
461            if let ExtentValue::Some { device_offset, .. } = value {
462                if let Some(overlap) = key.overlap(extent_key) {
463                    let range = device_offset + overlap.start - extent_key.range.start
464                        ..device_offset + overlap.end - extent_key.range.start;
465                    ensure!(range.is_aligned(block_size), FxfsError::Inconsistent);
466                    if trace {
467                        info!(
468                            store_id = self.store().store_object_id(),
469                            oid = self.object_id(),
470                            device_range:? = range,
471                            len = range.end - range.start,
472                            extent_key:?;
473                            "D",
474                        );
475                    }
476                    allocator
477                        .deallocate(transaction, self.store().store_object_id(), range)
478                        .await?;
479                    deallocated += overlap.end - overlap.start;
480                } else {
481                    break;
482                }
483            }
484            iter.advance().await?;
485        }
486        Ok(deallocated)
487    }
488
489    // Writes aligned data (that should already be encrypted) to the given offset and computes
490    // checksums if requested. The aligned data must be from a single logical file range.
491    async fn write_aligned(
492        &self,
493        buf: BufferRef<'_>,
494        device_offset: u64,
495        crypt_ctx: Option<(u32, u8)>,
496    ) -> Result<MaybeChecksums, Error> {
497        if self.trace() {
498            info!(
499                store_id = self.store().store_object_id(),
500                oid = self.object_id(),
501                device_range:? = (device_offset..device_offset + buf.len() as u64),
502                len = buf.len();
503                "W",
504            );
505        }
506        let store = self.store();
507        store.device_write_ops.fetch_add(1, Ordering::Relaxed);
508        let mut checksums = Vec::new();
509        let _watchdog = Watchdog::new(10, |count| {
510            warn!("Write has been stalled for {} seconds", count * 10);
511        });
512
513        if self.options.skip_checksums {
514            store
515                .device
516                .write_with_opts(
517                    device_offset as u64,
518                    buf,
519                    crypt_ctx.map_or_else(
520                        || WriteOptions::default(),
521                        |(dun, slot)| WriteOptions {
522                            inline_crypto_options: InlineCryptoOptions { dun, slot },
523                            ..Default::default()
524                        },
525                    ),
526                )
527                .await?;
528            Ok(MaybeChecksums::None)
529        } else {
530            debug_assert!(crypt_ctx.is_none());
531            try_join!(store.device.write(device_offset, buf), async {
532                let block_size = self.block_size();
533                for chunk in buf.as_slice().chunks_exact(block_size as usize) {
534                    checksums.push(fletcher64(chunk, 0));
535                }
536                Ok(())
537            })?;
538            Ok(MaybeChecksums::Fletcher(checksums))
539        }
540    }
541
542    /// Flushes the underlying device.  This is expensive and should be used sparingly.
543    pub async fn flush_device(&self) -> Result<(), Error> {
544        self.store().device().flush().await
545    }
546
547    pub async fn update_allocated_size(
548        &self,
549        transaction: &mut Transaction<'_>,
550        allocated: u64,
551        deallocated: u64,
552    ) -> Result<(), Error> {
553        if allocated == deallocated {
554            return Ok(());
555        }
556        let mut mutation = self.txn_get_object_mutation(transaction).await?;
557        if let ObjectValue::Object {
558            attributes: ObjectAttributes { project_id, allocated_size, .. },
559            ..
560        } = &mut mutation.item.value
561        {
562            // The only way for these to fail are if the volume is inconsistent.
563            *allocated_size = allocated_size
564                .checked_add(allocated)
565                .ok_or_else(|| anyhow!(FxfsError::Inconsistent).context("Allocated size overflow"))?
566                .checked_sub(deallocated)
567                .ok_or_else(|| {
568                    anyhow!(FxfsError::Inconsistent).context("Allocated size underflow")
569                })?;
570
571            if *project_id != 0 {
572                // The allocated and deallocated shouldn't exceed the max size of the file which is
573                // bound within i64.
574                let diff = i64::try_from(allocated).unwrap() - i64::try_from(deallocated).unwrap();
575                transaction.add(
576                    self.store().store_object_id(),
577                    Mutation::merge_object(
578                        ObjectKey::project_usage(
579                            self.store().root_directory_object_id(),
580                            *project_id,
581                        ),
582                        ObjectValue::BytesAndNodes { bytes: diff, nodes: 0 },
583                    ),
584                );
585            }
586        } else {
587            // This can occur when the object mutation is created from an object in the tree which
588            // was corrupt.
589            bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
590        }
591        transaction.add(self.store().store_object_id, Mutation::ObjectStore(mutation));
592        Ok(())
593    }
594
595    pub async fn update_attributes<'a>(
596        &self,
597        transaction: &mut Transaction<'a>,
598        node_attributes: Option<&fio::MutableNodeAttributes>,
599        change_time: Option<Timestamp>,
600    ) -> Result<(), Error> {
601        if let Some(&fio::MutableNodeAttributes { selinux_context: Some(ref context), .. }) =
602            node_attributes
603        {
604            if let fio::SelinuxContext::Data(context) = context {
605                self.set_extended_attribute_impl(
606                    "security.selinux".into(),
607                    context.clone(),
608                    SetExtendedAttributeMode::Set,
609                    transaction,
610                )
611                .await?;
612            } else {
613                return Err(anyhow!(FxfsError::InvalidArgs)
614                    .context("Only set SELinux context with `data` member."));
615            }
616        }
617        self.store()
618            .update_attributes(transaction, self.object_id, node_attributes, change_time)
619            .await
620    }
621
622    /// Zeroes the given range of attribute `attribute_id` by deallocating the extents that
    /// overlap it.  The range must be block aligned (`deallocate_old_extents` asserts this).
    ///
    /// Returns only success or failure; the amount deallocated is not returned, but is applied
    /// to the object's allocated size within `transaction`.
623    pub async fn zero(
624        &self,
625        transaction: &mut Transaction<'_>,
626        attribute_id: u64,
627        range: Range<u64>,
628    ) -> Result<(), Error> {
629        let deallocated =
630            self.deallocate_old_extents(transaction, attribute_id, range.clone()).await?;
        // Only touch the object and record a deleted extent if something was actually freed.
631        if deallocated > 0 {
632            self.update_allocated_size(transaction, 0, deallocated).await?;
633            transaction.add(
634                self.store().store_object_id,
635                Mutation::merge_object(
636                    ObjectKey::extent(self.object_id(), attribute_id, range),
637                    ObjectValue::Extent(ExtentValue::deleted_extent()),
638                ),
639            );
640        }
641        Ok(())
642    }
643
644    // Returns a new aligned buffer (reading the head and tail blocks if necessary) with a copy of
645    // the data from `buf`.
646    pub async fn align_buffer(
647        &self,
648        attribute_id: u64,
649        offset: u64,
650        buf: BufferRef<'_>,
651    ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
652        let block_size = self.block_size();
653        let end = offset + buf.len() as u64;
654        let aligned =
655            round_down(offset, block_size)..round_up(end, block_size).ok_or(FxfsError::TooBig)?;
656
657        let mut aligned_buf =
658            self.store().device.allocate_buffer((aligned.end - aligned.start) as usize).await;
659
660        // Deal with head alignment.
661        if aligned.start < offset {
662            let mut head_block = aligned_buf.subslice_mut(..block_size as usize);
663            let read = self.read(attribute_id, aligned.start, head_block.reborrow()).await?;
664            head_block.as_mut_slice()[read..].fill(0);
665        }
666
667        // Deal with tail alignment.
668        if aligned.end > end {
669            let end_block_offset = aligned.end - block_size;
670            // There's no need to read the tail block if we read it as part of the head block.
671            if offset <= end_block_offset {
672                let mut tail_block =
673                    aligned_buf.subslice_mut(aligned_buf.len() - block_size as usize..);
674                let read = self.read(attribute_id, end_block_offset, tail_block.reborrow()).await?;
675                tail_block.as_mut_slice()[read..].fill(0);
676            }
677        }
678
679        aligned_buf.as_mut_slice()
680            [(offset - aligned.start) as usize..(end - aligned.start) as usize]
681            .copy_from_slice(buf.as_slice());
682
683        Ok((aligned, aligned_buf))
684    }
685
686    /// Trim an attribute's extents, potentially adding a graveyard trim entry if more trimming is
687    /// needed, so the transaction can be committed without worrying about leaking data.
688    ///
689    /// This doesn't update the size stored in the attribute value - the caller is responsible for
690    /// doing that to keep the size up to date.
691    pub async fn shrink(
692        &self,
693        transaction: &mut Transaction<'_>,
694        attribute_id: u64,
695        size: u64,
696    ) -> Result<NeedsTrim, Error> {
697        let store = self.store();
698        let needs_trim = matches!(
699            store
700                .trim_some(transaction, self.object_id(), attribute_id, TrimMode::FromOffset(size))
701                .await?,
702            TrimResult::Incomplete
703        );
704        if needs_trim {
705            // Add the object to the graveyard in case the following transactions don't get
706            // replayed.
707            let graveyard_id = store.graveyard_directory_object_id();
708            match store
709                .tree
710                .find(&ObjectKey::graveyard_entry(graveyard_id, self.object_id()))
711                .await?
712            {
713                Some(ObjectItem { value: ObjectValue::Some, .. })
714                | Some(ObjectItem { value: ObjectValue::Trim, .. }) => {
715                    // This object is already in the graveyard so we don't need to do anything.
716                }
717                _ => {
718                    transaction.add(
719                        store.store_object_id,
720                        Mutation::replace_or_insert_object(
721                            ObjectKey::graveyard_entry(graveyard_id, self.object_id()),
722                            ObjectValue::Trim,
723                        ),
724                    );
725                }
726            }
727        }
728        Ok(NeedsTrim(needs_trim))
729    }
730
731    /// Reads and decrypts a singular logical range.
732    pub async fn read_and_decrypt(
733        &self,
734        device_offset: u64,
735        file_offset: u64,
736        mut buffer: MutableBufferRef<'_>,
737        key_id: u64,
738    ) -> Result<(), Error> {
739        let store = self.store();
740        store.device_read_ops.fetch_add(1, Ordering::Relaxed);
741
742        let _watchdog = Watchdog::new(10, |count| {
743            warn!("Read has been stalled for {} seconds", count * 10);
744        });
745
746        let (_key_id, key) = self.get_key(Some(key_id)).await?;
747        if let Some(key) = key {
748            if let Some((dun, slot)) = key.crypt_ctx(self.object_id, file_offset) {
749                store
750                    .device
751                    .read_with_opts(
752                        device_offset as u64,
753                        buffer.reborrow(),
754                        ReadOptions { inline_crypto_options: InlineCryptoOptions { dun, slot } },
755                    )
756                    .await?;
757            } else {
758                store.device.read(device_offset, buffer.reborrow()).await?;
759                key.decrypt(self.object_id, device_offset, file_offset, buffer.as_mut_slice())?;
760            }
761        } else {
762            store.device.read(device_offset, buffer.reborrow()).await?;
763        }
764
765        Ok(())
766    }
767
768    /// Returns the specified key. If `key_id` is None, it will try and return the fscrypt key if
769    /// it is present, or the volume key if it isn't. If the fscrypt key is present, but the key
770    /// cannot be unwrapped, then this will return `FxfsError::NoKey`. If the volume is not
771    /// encrypted, this returns None.
772    pub async fn get_key(
773        &self,
774        key_id: Option<u64>,
775    ) -> Result<(u64, Option<Arc<dyn Cipher>>), Error> {
776        let store = self.store();
777        Ok(match self.encryption {
778            Encryption::None => (VOLUME_DATA_KEY_ID, None),
779            Encryption::CachedKeys => {
780                if let Some(key_id) = key_id {
781                    (
782                        key_id,
783                        Some(
784                            store
785                                .key_manager
786                                .get_key(
787                                    self.object_id,
788                                    store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
789                                    async || store.get_keys(self.object_id).await,
790                                    key_id,
791                                )
792                                .await?,
793                        ),
794                    )
795                } else {
796                    let (key_id, key) = store
797                        .key_manager
798                        .get_fscrypt_key_if_present(
799                            self.object_id,
800                            store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
801                            async || store.get_keys(self.object_id).await,
802                        )
803                        .await?;
804                    (key_id, Some(key))
805                }
806            }
807            Encryption::PermanentKeys => {
808                (VOLUME_DATA_KEY_ID, Some(store.key_manager.get(self.object_id).await?.unwrap()))
809            }
810        })
811    }
812
813    /// This will only work for a non-permanent volume data key. This is designed to be used with
814    /// extended attributes where we'll only create the key on demand for directories and encrypted
815    /// files.
816    async fn get_or_create_key(
817        &self,
818        transaction: &mut Transaction<'_>,
819    ) -> Result<Arc<dyn Cipher>, Error> {
820        let store = self.store();
821
822        // Fast path: try and get keys from the cache.
823        if let Some(key) = store.key_manager.get(self.object_id).await.context("get failed")? {
824            return Ok(key);
825        }
826
827        let crypt = store.crypt().ok_or_else(|| anyhow!("No crypt!"))?;
828
829        // Next, see if the keys are already created.
830        let (mut encryption_keys, mut cipher_set) = if let Some(item) =
831            store.tree.find(&ObjectKey::keys(self.object_id)).await.context("find failed")?
832        {
833            if let ObjectValue::Keys(encryption_keys) = item.value {
834                let cipher_set = store
835                    .key_manager
836                    .get_keys(
837                        self.object_id,
838                        crypt.as_ref(),
839                        &mut Some(async || Ok(encryption_keys.clone())),
840                        /* permanent= */ false,
841                        /* force= */ false,
842                    )
843                    .await
844                    .context("get_keys failed")?;
845                match cipher_set.find_key(VOLUME_DATA_KEY_ID) {
846                    FindKeyResult::NotFound => {}
847                    FindKeyResult::Unavailable => return Err(FxfsError::NoKey.into()),
848                    FindKeyResult::Key(key) => return Ok(key),
849                }
850                (encryption_keys, (*cipher_set).clone())
851            } else {
852                return Err(anyhow!(FxfsError::Inconsistent));
853            }
854        } else {
855            Default::default()
856        };
857
858        // Proceed to create the key.  The transaction holds the required locks.
859        let (key, unwrapped_key) = crypt.create_key(self.object_id, KeyPurpose::Data).await?;
860        let cipher: Arc<dyn Cipher> = Arc::new(FxfsCipher::new(&unwrapped_key));
861
862        // Add new cipher to cloned cipher set. This will replace existing one
863        // if transaction is successful.
864        cipher_set.add_key(VOLUME_DATA_KEY_ID, Some(cipher.clone()));
865        let cipher_set = Arc::new(cipher_set);
866
867        // Arrange for the CipherSet to be added to the cache when (and if) the transaction
868        // commits.
869        struct UnwrappedKeys {
870            object_id: u64,
871            new_keys: Arc<CipherSet>,
872        }
873
874        impl AssociatedObject for UnwrappedKeys {
875            fn will_apply_mutation(
876                &self,
877                _mutation: &Mutation,
878                object_id: u64,
879                manager: &ObjectManager,
880            ) {
881                manager.store(object_id).unwrap().key_manager.insert(
882                    self.object_id,
883                    self.new_keys.clone(),
884                    /* permanent= */ false,
885                );
886            }
887        }
888
889        encryption_keys.insert(VOLUME_DATA_KEY_ID, EncryptionKey::Fxfs(key).into());
890
891        transaction.add_with_object(
892            store.store_object_id(),
893            Mutation::replace_or_insert_object(
894                ObjectKey::keys(self.object_id),
895                ObjectValue::keys(encryption_keys),
896            ),
897            AssocObj::Owned(Box::new(UnwrappedKeys {
898                object_id: self.object_id,
899                new_keys: cipher_set,
900            })),
901        );
902
903        Ok(cipher)
904    }
905
906    pub async fn read(
907        &self,
908        attribute_id: u64,
909        offset: u64,
910        mut buf: MutableBufferRef<'_>,
911    ) -> Result<usize, Error> {
912        let fs = self.store().filesystem();
913        let guard = fs
914            .lock_manager()
915            .read_lock(lock_keys![LockKey::object_attribute(
916                self.store().store_object_id(),
917                self.object_id(),
918                attribute_id,
919            )])
920            .await;
921
922        let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
923        let item = self.store().tree().find(&key).await?;
924        let size = match item {
925            Some(item) if item.key == key => match item.value {
926                ObjectValue::Attribute { size, .. } => size,
927                _ => bail!(FxfsError::Inconsistent),
928            },
929            _ => return Ok(0),
930        };
931        if offset >= size {
932            return Ok(0);
933        }
934        let length = min(buf.len() as u64, size - offset) as usize;
935        buf = buf.subslice_mut(0..length);
936        self.read_unchecked(attribute_id, offset, buf, &guard).await?;
937        Ok(length)
938    }
939
940    /// Read `buf.len()` bytes from the attribute `attribute_id`, starting at `offset`, into `buf`.
941    /// It's required that a read lock on this attribute id is taken before this is called.
942    ///
943    /// This function doesn't do any size checking - any portion of `buf` past the end of the file
944    /// will be filled with zeros. The caller is responsible for enforcing the file size on reads.
945    /// This is because, just looking at the extents, we can't tell the difference between the file
946    /// actually ending and there just being a section at the end with no data (since attributes
947    /// are sparse).
948    pub(super) async fn read_unchecked(
949        &self,
950        attribute_id: u64,
951        mut offset: u64,
952        mut buf: MutableBufferRef<'_>,
953        _guard: &ReadGuard<'_>,
954    ) -> Result<(), Error> {
955        if buf.len() == 0 {
956            return Ok(());
957        }
958        let end_offset = offset + buf.len() as u64;
959
960        self.store().logical_read_ops.fetch_add(1, Ordering::Relaxed);
961
962        // Whilst the read offset must be aligned to the filesystem block size, the buffer need only
963        // be aligned to the device's block size.
964        let block_size = self.block_size() as u64;
965        let device_block_size = self.store().device.block_size() as u64;
966        assert_eq!(offset % block_size, 0);
967        assert_eq!(buf.range().start as u64 % device_block_size, 0);
968        let tree = &self.store().tree;
969        let layer_set = tree.layer_set();
970        let mut merger = layer_set.merger();
971        let mut iter = merger
972            .query(Query::LimitedRange(&ObjectKey::extent(
973                self.object_id(),
974                attribute_id,
975                offset..end_offset,
976            )))
977            .await?;
978        let end_align = ((offset + buf.len() as u64) % block_size) as usize;
979        let trace = self.trace();
980        let reads = FuturesUnordered::new();
981        while let Some(ItemRef {
982            key:
983                ObjectKey {
984                    object_id,
985                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
986                },
987            value: ObjectValue::Extent(extent_value),
988            ..
989        }) = iter.get()
990        {
991            if *object_id != self.object_id() || *attr_id != attribute_id {
992                break;
993            }
994            ensure!(
995                extent_key.range.is_valid() && extent_key.range.is_aligned(block_size),
996                FxfsError::Inconsistent
997            );
998            if extent_key.range.start > offset {
999                // Zero everything up to the start of the extent.
1000                let to_zero = min(extent_key.range.start - offset, buf.len() as u64) as usize;
1001                for i in &mut buf.as_mut_slice()[..to_zero] {
1002                    *i = 0;
1003                }
1004                buf = buf.subslice_mut(to_zero..);
1005                if buf.is_empty() {
1006                    break;
1007                }
1008                offset += to_zero as u64;
1009            }
1010
1011            if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
1012                let mut device_offset = device_offset + (offset - extent_key.range.start);
1013                let key_id = *key_id;
1014
1015                let to_copy = min(buf.len() - end_align, (extent_key.range.end - offset) as usize);
1016                if to_copy > 0 {
1017                    if trace {
1018                        info!(
1019                            store_id = self.store().store_object_id(),
1020                            oid = self.object_id(),
1021                            device_range:? = (device_offset..device_offset + to_copy as u64),
1022                            offset,
1023                            range:? = extent_key.range,
1024                            block_size;
1025                            "R",
1026                        );
1027                    }
1028                    let (mut head, tail) = buf.split_at_mut(to_copy);
1029                    let maybe_bitmap = match mode {
1030                        ExtentMode::OverwritePartial(bitmap) => {
1031                            let mut read_bitmap = bitmap.clone().split_off(
1032                                ((offset - extent_key.range.start) / block_size) as usize,
1033                            );
1034                            read_bitmap.truncate(to_copy / block_size as usize);
1035                            Some(read_bitmap)
1036                        }
1037                        _ => None,
1038                    };
1039                    reads.push(async move {
1040                        self.read_and_decrypt(device_offset, offset, head.reborrow(), key_id)
1041                            .await?;
1042                        if let Some(bitmap) = maybe_bitmap {
1043                            apply_bitmap_zeroing(self.block_size() as usize, &bitmap, head);
1044                        }
1045                        Ok::<(), Error>(())
1046                    });
1047                    buf = tail;
1048                    if buf.is_empty() {
1049                        break;
1050                    }
1051                    offset += to_copy as u64;
1052                    device_offset += to_copy as u64;
1053                }
1054
1055                // Deal with end alignment by reading the existing contents into an alignment
1056                // buffer.
1057                if offset < extent_key.range.end && end_align > 0 {
1058                    if let ExtentMode::OverwritePartial(bitmap) = mode {
1059                        let bitmap_offset = (offset - extent_key.range.start) / block_size;
1060                        if !bitmap.get(bitmap_offset as usize).ok_or(FxfsError::Inconsistent)? {
1061                            // If this block isn't actually initialized, skip it.
1062                            break;
1063                        }
1064                    }
1065                    let mut align_buf =
1066                        self.store().device.allocate_buffer(block_size as usize).await;
1067                    if trace {
1068                        info!(
1069                            store_id = self.store().store_object_id(),
1070                            oid = self.object_id(),
1071                            device_range:? = (device_offset..device_offset + align_buf.len() as u64);
1072                            "RT",
1073                        );
1074                    }
1075                    self.read_and_decrypt(device_offset, offset, align_buf.as_mut(), key_id)
1076                        .await?;
1077                    buf.as_mut_slice().copy_from_slice(&align_buf.as_slice()[..end_align]);
1078                    buf = buf.subslice_mut(0..0);
1079                    break;
1080                }
1081            } else if extent_key.range.end >= offset + buf.len() as u64 {
1082                // Deleted extent covers remainder, so we're done.
1083                break;
1084            }
1085
1086            iter.advance().await?;
1087        }
1088        reads.try_collect::<()>().await?;
1089        buf.as_mut_slice().fill(0);
1090        Ok(())
1091    }
1092
1093    /// Reads an entire attribute.
1094    pub async fn read_attr(&self, attribute_id: u64) -> Result<Option<Box<[u8]>>, Error> {
1095        let store = self.store();
1096        let tree = &store.tree;
1097        let layer_set = tree.layer_set();
1098        let mut merger = layer_set.merger();
1099        let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
1100        let mut iter = merger.query(Query::FullRange(&key)).await?;
1101        let (mut buffer, size) = match iter.get() {
1102            Some(item) if item.key == &key => match item.value {
1103                ObjectValue::Attribute { size, .. } => {
1104                    // TODO(https://fxbug.dev/42073113): size > max buffer size
1105                    (
1106                        store
1107                            .device
1108                            .allocate_buffer(round_up(*size, self.block_size()).unwrap() as usize)
1109                            .await,
1110                        *size as usize,
1111                    )
1112                }
1113                // Attribute was deleted.
1114                ObjectValue::None => return Ok(None),
1115                _ => bail!(FxfsError::Inconsistent),
1116            },
1117            _ => return Ok(None),
1118        };
1119        store.logical_read_ops.fetch_add(1, Ordering::Relaxed);
1120        let mut last_offset = 0;
1121        loop {
1122            iter.advance().await?;
1123            match iter.get() {
1124                Some(ItemRef {
1125                    key:
1126                        ObjectKey {
1127                            object_id,
1128                            data:
1129                                ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
1130                        },
1131                    value: ObjectValue::Extent(extent_value),
1132                    ..
1133                }) if *object_id == self.object_id() && *attr_id == attribute_id => {
1134                    if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
1135                        let offset = extent_key.range.start as usize;
1136                        buffer.as_mut_slice()[last_offset..offset].fill(0);
1137                        let end = std::cmp::min(extent_key.range.end as usize, buffer.len());
1138                        let maybe_bitmap = match mode {
1139                            ExtentMode::OverwritePartial(bitmap) => {
1140                                // The caller has to adjust the bitmap if necessary, but we always
1141                                // start from the beginning of any extent, so we only truncate.
1142                                let mut read_bitmap = bitmap.clone();
1143                                read_bitmap.truncate(
1144                                    (end - extent_key.range.start as usize)
1145                                        / self.block_size() as usize,
1146                                );
1147                                Some(read_bitmap)
1148                            }
1149                            _ => None,
1150                        };
1151                        self.read_and_decrypt(
1152                            *device_offset,
1153                            extent_key.range.start,
1154                            buffer.subslice_mut(offset..end as usize),
1155                            *key_id,
1156                        )
1157                        .await?;
1158                        if let Some(bitmap) = maybe_bitmap {
1159                            apply_bitmap_zeroing(
1160                                self.block_size() as usize,
1161                                &bitmap,
1162                                buffer.subslice_mut(offset..end as usize),
1163                            );
1164                        }
1165                        last_offset = end;
1166                        if last_offset >= size {
1167                            break;
1168                        }
1169                    }
1170                }
1171                _ => break,
1172            }
1173        }
1174        buffer.as_mut_slice()[std::cmp::min(last_offset, size)..].fill(0);
1175        Ok(Some(buffer.as_slice()[..size].into()))
1176    }
1177
1178    /// Writes potentially unaligned data at `device_offset` and returns checksums if requested.
1179    /// The data will be encrypted if necessary.  `buf` is mutable as an optimization, since the
1180    /// write may require encryption, we can encrypt the buffer in-place rather than copying to
1181    /// another buffer if the write is already aligned.
1182    ///
1183    /// NOTE: This will not create keys if they are missing (it will fail with an error if that
1184    /// happens to be the case).
1185    pub async fn write_at(
1186        &self,
1187        attribute_id: u64,
1188        offset: u64,
1189        buf: MutableBufferRef<'_>,
1190        key_id: Option<u64>,
1191        mut device_offset: u64,
1192    ) -> Result<MaybeChecksums, Error> {
1193        let mut transfer_buf;
1194        let block_size = self.block_size();
1195        let (range, mut transfer_buf_ref) =
1196            if offset % block_size == 0 && buf.len() as u64 % block_size == 0 {
1197                (offset..offset + buf.len() as u64, buf)
1198            } else {
1199                let (range, buf) = self.align_buffer(attribute_id, offset, buf.as_ref()).await?;
1200                transfer_buf = buf;
1201                device_offset -= offset - range.start;
1202                (range, transfer_buf.as_mut())
1203            };
1204
1205        let mut crypt_ctx = None;
1206        if let (_, Some(key)) = self.get_key(key_id).await? {
1207            if let Some(ctx) = key.crypt_ctx(self.object_id, range.start) {
1208                crypt_ctx = Some(ctx);
1209            } else {
1210                key.encrypt(
1211                    self.object_id,
1212                    device_offset,
1213                    range.start,
1214                    transfer_buf_ref.as_mut_slice(),
1215                )?;
1216            }
1217        }
1218        self.write_aligned(transfer_buf_ref.as_ref(), device_offset, crypt_ctx).await
1219    }
1220
/// Writes the contents of `buf` to the given `ranges` without touching object metadata.
///
/// Intended for migration: the data goes to the device via `multi_write_internal`, but the
/// object's allocated size, mtime and similar metadata are left alone, so data can be
/// transferred directly without triggering the usual filesystem bookkeeping.
#[cfg(feature = "migration")]
pub async fn raw_multi_write(
    &self,
    transaction: &mut Transaction<'_>,
    attribute_id: u64,
    key_id: Option<u64>,
    ranges: &[Range<u64>],
    buf: MutableBufferRef<'_>,
) -> Result<(), Error> {
    // The (allocated, deallocated) byte counts are deliberately dropped.
    self.multi_write_internal(transaction, attribute_id, key_id, ranges, buf)
        .await
        .map(|_| ())
}
1237
1238    /// This is a low-level write function that writes to multiple ranges. Users should generally
1239    /// use `multi_write` instead of this function as this does not update the object's allocated
1240    /// size, mtime, atime, etc.
1241    ///
1242    /// Returns (allocated, deallocated) bytes on success.
1243    async fn multi_write_internal(
1244        &self,
1245        transaction: &mut Transaction<'_>,
1246        attribute_id: u64,
1247        key_id: Option<u64>,
1248        ranges: &[Range<u64>],
1249        mut buf: MutableBufferRef<'_>,
1250    ) -> Result<(u64, u64), Error> {
1251        if buf.is_empty() {
1252            return Ok((0, 0));
1253        }
1254        let block_size = self.block_size();
1255        let store = self.store();
1256        let store_id = store.store_object_id();
1257
1258        // The only key we allow to be created on-the-fly is a non permanent key wrapped with the
1259        // volume data key.
1260        let (key_id, key) = if key_id == Some(VOLUME_DATA_KEY_ID)
1261            && matches!(self.encryption, Encryption::CachedKeys)
1262        {
1263            (
1264                VOLUME_DATA_KEY_ID,
1265                Some(
1266                    self.get_or_create_key(transaction)
1267                        .await
1268                        .context("get_or_create_key failed")?,
1269                ),
1270            )
1271        } else {
1272            self.get_key(key_id).await?
1273        };
1274        if let Some(key) = &key {
1275            if !key.supports_inline_encryption() {
1276                let mut slice = buf.as_mut_slice();
1277                for r in ranges {
1278                    let l = r.end - r.start;
1279                    let (head, tail) = slice.split_at_mut(l as usize);
1280                    key.encrypt(
1281                        self.object_id,
1282                        0, /* TODO(https://fxbug.dev/421269588): plumb through device_offset. */
1283                        r.start,
1284                        head,
1285                    )?;
1286                    slice = tail;
1287                }
1288            }
1289        }
1290
1291        let mut allocated = 0;
1292        let allocator = store.allocator();
1293        let trace = self.trace();
1294        let mut writes = FuturesOrdered::new();
1295
1296        let mut logical_ranges = ranges.iter();
1297        let mut current_range = logical_ranges.next().unwrap().clone();
1298
1299        while !buf.is_empty() {
1300            let mut device_range = allocator
1301                .allocate(transaction, store_id, buf.len() as u64)
1302                .await
1303                .context("allocation failed")?;
1304            if trace {
1305                info!(
1306                    store_id,
1307                    oid = self.object_id(),
1308                    device_range:?,
1309                    len = device_range.end - device_range.start;
1310                    "A",
1311                );
1312            }
1313            let mut device_range_len = device_range.end - device_range.start;
1314            allocated += device_range_len;
1315            // If inline encryption is NOT supported, this loop should only happen once.
1316            while device_range_len > 0 {
1317                if current_range.end <= current_range.start {
1318                    current_range = logical_ranges.next().unwrap().clone();
1319                }
1320                let (crypt_ctx, split) = if let Some(key) = &key {
1321                    if key.supports_inline_encryption() {
1322                        let split = std::cmp::min(
1323                            current_range.end - current_range.start,
1324                            device_range_len,
1325                        );
1326                        current_range.start += split;
1327
1328                        (key.crypt_ctx(self.object_id, current_range.start), split)
1329                    } else {
1330                        (None, device_range_len)
1331                    }
1332                } else {
1333                    (None, device_range_len)
1334                };
1335
1336                let (head, tail) = buf.split_at_mut(split as usize);
1337                buf = tail;
1338
1339                writes.push_back(async move {
1340                    let len = head.len() as u64;
1341                    Result::<_, Error>::Ok((
1342                        device_range.start,
1343                        len,
1344                        self.write_aligned(head.as_ref(), device_range.start, crypt_ctx).await?,
1345                    ))
1346                });
1347                device_range.start += split;
1348                device_range_len = device_range.end - device_range.start;
1349            }
1350        }
1351
1352        self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
1353        let ((mutations, checksums), deallocated) = try_join!(
1354            async {
1355                let mut current_range = 0..0;
1356                let mut mutations = Vec::new();
1357                let mut out_checksums = Vec::new();
1358                let mut ranges = ranges.iter();
1359                while let Some((mut device_offset, mut len, mut checksums)) =
1360                    writes.try_next().await?
1361                {
1362                    while len > 0 {
1363                        if current_range.end <= current_range.start {
1364                            current_range = ranges.next().unwrap().clone();
1365                        }
1366                        let chunk_len = std::cmp::min(len, current_range.end - current_range.start);
1367                        let tail = checksums.split_off((chunk_len / block_size) as usize);
1368                        if let Some(checksums) = checksums.maybe_as_ref() {
1369                            out_checksums.push((
1370                                device_offset..device_offset + chunk_len,
1371                                checksums.to_owned(),
1372                            ));
1373                        }
1374                        mutations.push(Mutation::merge_object(
1375                            ObjectKey::extent(
1376                                self.object_id(),
1377                                attribute_id,
1378                                current_range.start..current_range.start + chunk_len,
1379                            ),
1380                            ObjectValue::Extent(ExtentValue::new(
1381                                device_offset,
1382                                checksums.to_mode(),
1383                                key_id,
1384                            )),
1385                        ));
1386                        checksums = tail;
1387                        device_offset += chunk_len;
1388                        len -= chunk_len;
1389                        current_range.start += chunk_len;
1390                    }
1391                }
1392                Result::<_, Error>::Ok((mutations, out_checksums))
1393            },
1394            async {
1395                let mut deallocated = 0;
1396                for r in ranges {
1397                    deallocated +=
1398                        self.deallocate_old_extents(transaction, attribute_id, r.clone()).await?;
1399                }
1400                Result::<_, Error>::Ok(deallocated)
1401            }
1402        )?;
1403
1404        for m in mutations {
1405            transaction.add(store_id, m);
1406        }
1407
1408        // Only store checksums in the journal if barriers are not enabled.
1409        if !store.filesystem().options().barriers_enabled {
1410            for (r, c) in checksums {
1411                transaction.add_checksum(r, c, true);
1412            }
1413        }
1414        Ok((allocated, deallocated))
1415    }
1416
1417    /// Writes to multiple ranges with data provided in `buf`.  The buffer can be modified in place
1418    /// if encryption takes place.  The ranges must all be aligned and no change to content size is
1419    /// applied; the caller is responsible for updating size if required.  If `key_id` is None, it
1420    /// means pick the default key for the object which is the fscrypt key if present, or the volume
1421    /// data key, or no key if it's an unencrypted file.
1422    pub async fn multi_write(
1423        &self,
1424        transaction: &mut Transaction<'_>,
1425        attribute_id: u64,
1426        key_id: Option<u64>,
1427        ranges: &[Range<u64>],
1428        buf: MutableBufferRef<'_>,
1429    ) -> Result<(), Error> {
1430        let (allocated, deallocated) =
1431            self.multi_write_internal(transaction, attribute_id, key_id, ranges, buf).await?;
1432        if allocated == 0 && deallocated == 0 {
1433            return Ok(());
1434        }
1435        self.update_allocated_size(transaction, allocated, deallocated).await
1436    }
1437
    /// Write data to overwrite extents with the provided set of ranges. This makes a strong
    /// assumption that the ranges are actually going to be already allocated overwrite extents and
    /// will error out or do something wrong if they aren't. It also assumes the ranges passed to
    /// it are sorted.
    ///
    /// `buf` holds the data for all of `ranges` back to back, in the same order as `ranges`. If
    /// the object has a key that does not support inline encryption, `buf` is encrypted in place
    /// before anything is written.
    ///
    /// The device writes are queued up while the extent records are walked and are only awaited
    /// once every range has been matched to an extent. Afterwards, checksums (only when barriers
    /// are disabled) and any extent-mode updates are added to `transaction`; nothing is committed
    /// here.
    ///
    /// # Errors
    ///
    /// * `FxfsError::Inconsistent` if a target range overlaps a deleted extent, starts before the
    ///   extent that was found for it, or overlaps an extent whose mode is `Raw` or `Cow`.
    /// * `FxfsError::Internal` if the extent records run out (or a non-extent record is found)
    ///   while there are still ranges left to process.
    /// * Any error returned by key lookup, encryption, the tree iterator or the device writes.
    ///
    /// # Panics
    ///
    /// Panics if `buf` is non-empty but `ranges` is empty.
    // NOTE(review): `buf` is also presumably expected to be at least as long as the sum of the
    // range lengths (the `split_at_mut` calls below depend on it) - confirm against callers.
    pub async fn multi_overwrite<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        attr_id: u64,
        ranges: &[Range<u64>],
        mut buf: MutableBufferRef<'_>,
    ) -> Result<(), Error> {
        // Nothing to write.
        if buf.is_empty() {
            return Ok(());
        }
        let block_size = self.block_size();
        let store = self.store();
        let tree = store.tree();
        let store_id = store.store_object_id();

        let (key_id, key) = self.get_key(None).await?;
        if let Some(key) = &key {
            // Keys without inline encryption support require the data to be encrypted here, one
            // range at a time, consuming the buffer front to back.
            if !key.supports_inline_encryption() {
                let mut slice = buf.as_mut_slice();
                for r in ranges {
                    let l = r.end - r.start;
                    let (head, tail) = slice.split_at_mut(l as usize);
                    key.encrypt(
                        self.object_id,
                        0, /* TODO(https://fxbug.dev/421269588): plumb through device_offset. */
                        r.start,
                        head,
                    )?;
                    slice = tail;
                }
            }
        }

        let mut range_iter = ranges.into_iter();
        // There should be at least one range if the buffer has data in it
        let mut target_range = range_iter.next().unwrap().clone();
        // Extent-mode updates to add to the transaction once all the writes have completed.
        let mut mutations = Vec::new();
        // Pending device writes; they are driven to completion after the loop below.
        let writes = FuturesUnordered::new();

        // Start iterating at the extent record covering the start of the first target range.
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                attr_id,
                AttributeKey::Extent(ExtentKey::search_key_from_offset(target_range.start)),
            )))
            .await?;

        // Outer loop: one iteration per extent record. Inner loop (below): one iteration per
        // piece of a target range that falls inside the current extent.
        loop {
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(
                                    attribute_id,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) if *object_id == self.object_id() && *attribute_id == attr_id => {
                    // If this extent ends before the target range starts (not possible on the
                    // first loop because of the query parameters but possible on further loops),
                    // advance until we find the next one we care about.
                    if range.end <= target_range.start {
                        iter.advance().await?;
                        continue;
                    }
                    let (device_offset, mode) = match extent_value {
                        ExtentValue::None => {
                            return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                                format!(
                                    "multi_overwrite failed: target_range ({}, {}) overlaps with \
                                deleted extent found at ({}, {})",
                                    target_range.start, target_range.end, range.start, range.end,
                                )
                            });
                        }
                        ExtentValue::Some { device_offset, mode, .. } => (device_offset, mode),
                    };
                    // The ranges passed to this function should already be allocated, so
                    // extent records should exist for them.
                    if range.start > target_range.start {
                        return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                            format!(
                                "multi_overwrite failed: target range ({}, {}) starts before first \
                            extent found at ({}, {})",
                                target_range.start, target_range.end, range.start, range.end,
                            )
                        });
                    }
                    // Only overwrite-mode extents are acceptable targets. A partially written
                    // extent carries a bitmap, which is cloned so it can be updated locally.
                    let mut bitmap = match mode {
                        ExtentMode::Raw | ExtentMode::Cow(_) => {
                            return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                                format!(
                                    "multi_overwrite failed: \
                            extent from ({}, {}) which overlaps target range ({}, {}) had the \
                            wrong extent mode",
                                    range.start, range.end, target_range.start, target_range.end,
                                )
                            });
                        }
                        ExtentMode::OverwritePartial(bitmap) => {
                            OverwriteBitmaps::new(bitmap.clone())
                        }
                        ExtentMode::Overwrite => OverwriteBitmaps::None,
                    };
                    loop {
                        // Work out which part of the current target range lands in this extent
                        // and where that part lives on the device.
                        let offset_within_extent = target_range.start - range.start;
                        let bitmap_offset = offset_within_extent / block_size;
                        let write_device_offset = *device_offset + offset_within_extent;
                        let write_end = min(range.end, target_range.end);
                        let write_len = write_end - target_range.start;
                        let write_device_range =
                            write_device_offset..write_device_offset + write_len;
                        // Peel the data for this write off the front of the buffer.
                        let (current_buf, remaining_buf) = buf.split_at_mut(write_len as usize);

                        bitmap.set_offset(bitmap_offset as usize);
                        let checksum_ranges = ChecksumRangeChunk::group_first_write_ranges(
                            &mut bitmap,
                            block_size,
                            write_device_range,
                        );

                        let crypt_ctx = if let Some(key) = &key {
                            key.crypt_ctx(self.object_id, target_range.start)
                        } else {
                            None
                        };

                        // Queue the write. When it yields checksums, they are split up according
                        // to `checksum_ranges` into (device range, checksums, is_first_write)
                        // tuples.
                        writes.push(async move {
                            let maybe_checksums = self
                                .write_aligned(current_buf.as_ref(), write_device_offset, crypt_ctx)
                                .await?;
                            Ok::<_, Error>(match maybe_checksums {
                                MaybeChecksums::None => Vec::new(),
                                MaybeChecksums::Fletcher(checksums) => checksum_ranges
                                    .into_iter()
                                    .map(
                                        |ChecksumRangeChunk {
                                             checksum_range,
                                             device_range,
                                             is_first_write,
                                         }| {
                                            (
                                                device_range,
                                                checksums[checksum_range].to_vec(),
                                                is_first_write,
                                            )
                                        },
                                    )
                                    .collect(),
                            })
                        });
                        buf = remaining_buf;
                        target_range.start += write_len;
                        // The current target range is finished; move on to the next one, or stop
                        // if that was the last.
                        if target_range.start == target_range.end {
                            match range_iter.next() {
                                None => break,
                                Some(next_range) => target_range = next_range.clone(),
                            }
                        }
                        // The (possibly new) target range starts beyond this extent, so the next
                        // extent record is needed.
                        if range.end <= target_range.start {
                            break;
                        }
                    }
                    // If the writes touched blocks that were not already recorded in the extent's
                    // bitmap, record the updated mode; an extent with every bit set becomes a
                    // plain `Overwrite` extent.
                    // NOTE(review): this relies on `or` returning true only when the stored
                    // bitmap actually changed - confirm against the bitmap type's documentation.
                    if let Some((mut bitmap, write_bitmap)) = bitmap.take_bitmaps() {
                        if bitmap.or(&write_bitmap) {
                            let mode = if bitmap.all() {
                                ExtentMode::Overwrite
                            } else {
                                ExtentMode::OverwritePartial(bitmap)
                            };
                            mutations.push(Mutation::merge_object(
                                ObjectKey::extent(self.object_id(), attr_id, range.clone()),
                                ObjectValue::Extent(ExtentValue::new(*device_offset, mode, key_id)),
                            ))
                        }
                    }
                    // An exhausted target range at this point means the inner loop ran out of
                    // ranges, so everything has been queued.
                    if target_range.start == target_range.end {
                        break;
                    }
                    iter.advance().await?;
                }
                // We've either run past the end of the existing extents or something is wrong with
                // the tree. The main section should break if it finishes the ranges, so either
                // case, this is an error.
                _ => bail!(anyhow!(FxfsError::Internal).context(
                    "found a non-extent object record while there were still ranges to process"
                )),
            }
        }

        // Wait for every queued write to finish before touching the transaction.
        let checksums = writes.try_collect::<Vec<_>>().await?;
        // Only store checksums in the journal if barriers are not enabled.
        if !store.filesystem().options().barriers_enabled {
            for (r, c, first_write) in checksums.into_iter().flatten() {
                transaction.add_checksum(r, c, first_write);
            }
        }

        for m in mutations {
            transaction.add(store_id, m);
        }

        Ok(())
    }
1652
1653    /// Writes an attribute that should not already exist and therefore does not require trimming.
1654    /// Breaks up the write into multiple transactions if `data.len()` is larger than `batch_size`.
1655    /// If writing the attribute requires multiple transactions, adds the attribute to the
1656    /// graveyard. The caller is responsible for removing the attribute from the graveyard when it
1657    /// commits the last transaction.  This always writes using a key wrapped with the volume data
1658    /// key.
1659    #[trace]
1660    pub async fn write_new_attr_in_batches<'a>(
1661        &'a self,
1662        transaction: &mut Transaction<'a>,
1663        attribute_id: u64,
1664        data: &[u8],
1665        batch_size: usize,
1666    ) -> Result<(), Error> {
1667        transaction.add(
1668            self.store().store_object_id,
1669            Mutation::replace_or_insert_object(
1670                ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
1671                ObjectValue::attribute(data.len() as u64, false),
1672            ),
1673        );
1674        let chunks = data.chunks(batch_size);
1675        let num_chunks = chunks.len();
1676        if num_chunks > 1 {
1677            transaction.add(
1678                self.store().store_object_id,
1679                Mutation::replace_or_insert_object(
1680                    ObjectKey::graveyard_attribute_entry(
1681                        self.store().graveyard_directory_object_id(),
1682                        self.object_id(),
1683                        attribute_id,
1684                    ),
1685                    ObjectValue::Some,
1686                ),
1687            );
1688        }
1689        let mut start_offset = 0;
1690        for (i, chunk) in chunks.enumerate() {
1691            let rounded_len = round_up(chunk.len() as u64, self.block_size()).unwrap();
1692            let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
1693            let slice = buffer.as_mut_slice();
1694            slice[..chunk.len()].copy_from_slice(chunk);
1695            slice[chunk.len()..].fill(0);
1696            self.multi_write(
1697                transaction,
1698                attribute_id,
1699                Some(VOLUME_DATA_KEY_ID),
1700                &[start_offset..start_offset + rounded_len],
1701                buffer.as_mut(),
1702            )
1703            .await?;
1704            start_offset += rounded_len;
1705            // Do not commit the last chunk.
1706            if i < num_chunks - 1 {
1707                transaction.commit_and_continue().await?;
1708            }
1709        }
1710        Ok(())
1711    }
1712
1713    /// Writes an entire attribute. Returns whether or not the attribute needs to continue being
1714    /// trimmed - if the new data is shorter than the old data, this will trim any extents beyond
1715    /// the end of the new size, but if there were too many for a single transaction, a commit
1716    /// needs to be made before trimming again, so the responsibility is left to the caller so as
1717    /// to not accidentally split the transaction when it's not in a consistent state.  This will
1718    /// write using the volume data key; the fscrypt key is not supported.
1719    pub async fn write_attr(
1720        &self,
1721        transaction: &mut Transaction<'_>,
1722        attribute_id: u64,
1723        data: &[u8],
1724    ) -> Result<NeedsTrim, Error> {
1725        let rounded_len = round_up(data.len() as u64, self.block_size()).unwrap();
1726        let store = self.store();
1727        let tree = store.tree();
1728        let should_trim = if let Some(item) = tree
1729            .find(&ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute))
1730            .await?
1731        {
1732            match item.value {
1733                ObjectValue::Attribute { size: _, has_overwrite_extents: true } => {
1734                    bail!(
1735                        anyhow!(FxfsError::Inconsistent)
1736                            .context("write_attr on an attribute with overwrite extents")
1737                    )
1738                }
1739                ObjectValue::Attribute { size, .. } => (data.len() as u64) < size,
1740                _ => bail!(FxfsError::Inconsistent),
1741            }
1742        } else {
1743            false
1744        };
1745        let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
1746        let slice = buffer.as_mut_slice();
1747        slice[..data.len()].copy_from_slice(data);
1748        slice[data.len()..].fill(0);
1749        self.multi_write(
1750            transaction,
1751            attribute_id,
1752            Some(VOLUME_DATA_KEY_ID),
1753            &[0..rounded_len],
1754            buffer.as_mut(),
1755        )
1756        .await?;
1757        transaction.add(
1758            self.store().store_object_id,
1759            Mutation::replace_or_insert_object(
1760                ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
1761                ObjectValue::attribute(data.len() as u64, false),
1762            ),
1763        );
1764        if should_trim {
1765            self.shrink(transaction, attribute_id, data.len() as u64).await
1766        } else {
1767            Ok(NeedsTrim(false))
1768        }
1769    }
1770
1771    pub async fn list_extended_attributes(&self) -> Result<Vec<Vec<u8>>, Error> {
1772        let layer_set = self.store().tree().layer_set();
1773        let mut merger = layer_set.merger();
1774        // Seek to the first extended attribute key for this object.
1775        let mut iter = merger
1776            .query(Query::FullRange(&ObjectKey::extended_attribute(self.object_id(), Vec::new())))
1777            .await?;
1778        let mut out = Vec::new();
1779        while let Some(item) = iter.get() {
1780            // Skip deleted extended attributes.
1781            if item.value != &ObjectValue::None {
1782                match item.key {
1783                    ObjectKey { object_id, data: ObjectKeyData::ExtendedAttribute { name } } => {
1784                        if self.object_id() != *object_id {
1785                            bail!(
1786                                anyhow!(FxfsError::Inconsistent)
1787                                    .context("list_extended_attributes: wrong object id")
1788                            )
1789                        }
1790                        out.push(name.clone());
1791                    }
1792                    // Once we hit something that isn't an extended attribute key, we've gotten to
1793                    // the end.
1794                    _ => break,
1795                }
1796            }
1797            iter.advance().await?;
1798        }
1799        Ok(out)
1800    }
1801
1802    /// Looks up the values for the extended attribute `fio::SELINUX_CONTEXT_NAME`, returning it
1803    /// if it is found inline. If it is not inline, it will request use of the
1804    /// `get_extended_attributes` method. If the entry doesn't exist at all, returns None.
1805    pub async fn get_inline_selinux_context(&self) -> Result<Option<fio::SelinuxContext>, Error> {
1806        // This optimization is only useful as long as the attribute is smaller than inline sizes.
1807        // Avoid reading the data out of the attributes.
1808        const_assert!(fio::MAX_SELINUX_CONTEXT_ATTRIBUTE_LEN as usize <= MAX_INLINE_XATTR_SIZE);
1809        let item = match self
1810            .store()
1811            .tree()
1812            .find(&ObjectKey::extended_attribute(
1813                self.object_id(),
1814                fio::SELINUX_CONTEXT_NAME.into(),
1815            ))
1816            .await?
1817        {
1818            Some(item) => item,
1819            None => return Ok(None),
1820        };
1821        match item.value {
1822            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => {
1823                Ok(Some(fio::SelinuxContext::Data(value)))
1824            }
1825            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(_)) => {
1826                Ok(Some(fio::SelinuxContext::UseExtendedAttributes(fio::EmptyStruct {})))
1827            }
1828            _ => {
1829                bail!(
1830                    anyhow!(FxfsError::Inconsistent)
1831                        .context("get_inline_extended_attribute: Expected ExtendedAttribute value")
1832                )
1833            }
1834        }
1835    }
1836
1837    pub async fn get_extended_attribute(&self, name: Vec<u8>) -> Result<Vec<u8>, Error> {
1838        let item = self
1839            .store()
1840            .tree()
1841            .find(&ObjectKey::extended_attribute(self.object_id(), name))
1842            .await?
1843            .ok_or(FxfsError::NotFound)?;
1844        match item.value {
1845            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => Ok(value),
1846            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
1847                Ok(self.read_attr(id).await?.ok_or(FxfsError::Inconsistent)?.into_vec())
1848            }
1849            _ => {
1850                bail!(
1851                    anyhow!(FxfsError::Inconsistent)
1852                        .context("get_extended_attribute: Expected ExtendedAttribute value")
1853                )
1854            }
1855        }
1856    }
1857
1858    pub async fn set_extended_attribute(
1859        &self,
1860        name: Vec<u8>,
1861        value: Vec<u8>,
1862        mode: SetExtendedAttributeMode,
1863    ) -> Result<(), Error> {
1864        let store = self.store();
1865        let fs = store.filesystem();
1866        // NB: We need to take this lock before we potentially look up the value to prevent racing
1867        // with another set.
1868        let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
1869        let mut transaction = fs.new_transaction(keys, Options::default()).await?;
1870        self.set_extended_attribute_impl(name, value, mode, &mut transaction).await?;
1871        transaction.commit().await?;
1872        Ok(())
1873    }
1874
1875    async fn set_extended_attribute_impl(
1876        &self,
1877        name: Vec<u8>,
1878        value: Vec<u8>,
1879        mode: SetExtendedAttributeMode,
1880        transaction: &mut Transaction<'_>,
1881    ) -> Result<(), Error> {
1882        ensure!(name.len() <= MAX_XATTR_NAME_SIZE, FxfsError::TooBig);
1883        ensure!(value.len() <= MAX_XATTR_VALUE_SIZE, FxfsError::TooBig);
1884        let tree = self.store().tree();
1885        let object_key = ObjectKey::extended_attribute(self.object_id(), name);
1886
1887        let existing_attribute_id = {
1888            let (found, existing_attribute_id) = match tree.find(&object_key).await? {
1889                None => (false, None),
1890                Some(Item { value, .. }) => (
1891                    true,
1892                    match value {
1893                        ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
1894                        ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
1895                            Some(id)
1896                        }
1897                        _ => bail!(
1898                            anyhow!(FxfsError::Inconsistent)
1899                                .context("expected extended attribute value")
1900                        ),
1901                    },
1902                ),
1903            };
1904            match mode {
1905                SetExtendedAttributeMode::Create if found => {
1906                    bail!(FxfsError::AlreadyExists)
1907                }
1908                SetExtendedAttributeMode::Replace if !found => {
1909                    bail!(FxfsError::NotFound)
1910                }
1911                _ => (),
1912            }
1913            existing_attribute_id
1914        };
1915
1916        if let Some(attribute_id) = existing_attribute_id {
1917            // If we already have an attribute id allocated for this extended attribute, we always
1918            // use it, even if the value has shrunk enough to be stored inline. We don't need to
1919            // worry about trimming here for the same reason we don't need to worry about it when
1920            // we delete xattrs - they simply aren't large enough to ever need more than one
1921            // transaction.
1922            let _ = self.write_attr(transaction, attribute_id, &value).await?;
1923        } else if value.len() <= MAX_INLINE_XATTR_SIZE {
1924            transaction.add(
1925                self.store().store_object_id(),
1926                Mutation::replace_or_insert_object(
1927                    object_key,
1928                    ObjectValue::inline_extended_attribute(value),
1929                ),
1930            );
1931        } else {
1932            // If there isn't an existing attribute id and we are going to store the value in
1933            // an attribute, find the next empty attribute id in the range. We search for fxfs
1934            // attribute records specifically, instead of the extended attribute records, because
1935            // even if the extended attribute record is removed the attribute may not be fully
1936            // trimmed yet.
1937            let mut attribute_id = EXTENDED_ATTRIBUTE_RANGE_START;
1938            let layer_set = tree.layer_set();
1939            let mut merger = layer_set.merger();
1940            let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
1941            let mut iter = merger.query(Query::FullRange(&key)).await?;
1942            loop {
1943                match iter.get() {
1944                    // None means the key passed to seek wasn't found. That means the first
1945                    // attribute is available and we can just stop right away.
1946                    None => break,
1947                    Some(ItemRef {
1948                        key: ObjectKey { object_id, data: ObjectKeyData::Attribute(attr_id, _) },
1949                        value,
1950                        ..
1951                    }) if *object_id == self.object_id() => {
1952                        if matches!(value, ObjectValue::None) {
1953                            // This attribute was once used but is now deleted, so it's safe to use
1954                            // again.
1955                            break;
1956                        }
1957                        if attribute_id < *attr_id {
1958                            // We found a gap - use it.
1959                            break;
1960                        } else if attribute_id == *attr_id {
1961                            // This attribute id is in use, try the next one.
1962                            attribute_id += 1;
1963                            if attribute_id == EXTENDED_ATTRIBUTE_RANGE_END {
1964                                bail!(FxfsError::NoSpace);
1965                            }
1966                        }
1967                        // If we don't hit either of those cases, we are still moving through the
1968                        // extent keys for the current attribute, so just keep advancing until the
1969                        // attribute id changes.
1970                    }
1971                    // As we are working our way through the iterator, if we hit anything that
1972                    // doesn't have our object id or attribute key data, we've gone past the end of
1973                    // this section and can stop.
1974                    _ => break,
1975                }
1976                iter.advance().await?;
1977            }
1978
1979            // We know this won't need trimming because it's a new attribute.
1980            let _ = self.write_attr(transaction, attribute_id, &value).await?;
1981            transaction.add(
1982                self.store().store_object_id(),
1983                Mutation::replace_or_insert_object(
1984                    object_key,
1985                    ObjectValue::extended_attribute(attribute_id),
1986                ),
1987            );
1988        }
1989
1990        Ok(())
1991    }
1992
1993    pub async fn remove_extended_attribute(&self, name: Vec<u8>) -> Result<(), Error> {
1994        let store = self.store();
1995        let tree = store.tree();
1996        let object_key = ObjectKey::extended_attribute(self.object_id(), name);
1997
1998        // NB: The API says we have to return an error if the attribute doesn't exist, so we have
1999        // to look it up first to make sure we have a record of it before we delete it. Make sure
2000        // we take a lock and make a transaction before we do so we don't race with other
2001        // operations.
2002        let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
2003        let mut transaction = store.filesystem().new_transaction(keys, Options::default()).await?;
2004
2005        let attribute_to_delete =
2006            match tree.find(&object_key).await?.ok_or(FxfsError::NotFound)?.value {
2007                ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => Some(id),
2008                ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
2009                _ => {
2010                    bail!(
2011                        anyhow!(FxfsError::Inconsistent)
2012                            .context("remove_extended_attribute: Expected ExtendedAttribute value")
2013                    )
2014                }
2015            };
2016
2017        transaction.add(
2018            store.store_object_id(),
2019            Mutation::replace_or_insert_object(object_key, ObjectValue::None),
2020        );
2021
2022        // If the attribute wasn't stored inline, we need to deallocate all the extents too. This
2023        // would normally need to interact with the graveyard for correctness - if there are too
2024        // many extents to delete to fit in a single transaction then we could potentially have
2025        // consistency issues. However, the maximum size of an extended attribute is small enough
2026        // that it will never come close to that limit even in the worst case, so we just delete
2027        // everything in one shot.
2028        if let Some(attribute_id) = attribute_to_delete {
2029            let trim_result = store
2030                .trim_some(
2031                    &mut transaction,
2032                    self.object_id(),
2033                    attribute_id,
2034                    TrimMode::FromOffset(0),
2035                )
2036                .await?;
2037            // In case you didn't read the comment above - this should not be used to delete
2038            // arbitrary attributes!
2039            assert_matches!(trim_result, TrimResult::Done(_));
2040            transaction.add(
2041                store.store_object_id(),
2042                Mutation::replace_or_insert_object(
2043                    ObjectKey::attribute(self.object_id, attribute_id, AttributeKey::Attribute),
2044                    ObjectValue::None,
2045                ),
2046            );
2047        }
2048
2049        transaction.commit().await?;
2050        Ok(())
2051    }
2052
2053    /// Returns a future that will pre-fetches the keys so as to avoid paying the performance
2054    /// penalty later. Must ensure that the object is not removed before the future completes.
2055    pub fn pre_fetch_keys(&self) -> Option<impl Future<Output = ()> + use<S>> {
2056        if let Encryption::CachedKeys = self.encryption {
2057            let owner = self.owner.clone();
2058            let object_id = self.object_id;
2059            Some(async move {
2060                let store = owner.as_ref().as_ref();
2061                if let Some(crypt) = store.crypt() {
2062                    let _ = store
2063                        .key_manager
2064                        .get_keys(
2065                            object_id,
2066                            crypt.as_ref(),
2067                            &mut Some(async || store.get_keys(object_id).await),
2068                            /* permanent= */ false,
2069                            /* force= */ false,
2070                        )
2071                        .await;
2072                }
2073            })
2074        } else {
2075            None
2076        }
2077    }
2078}
2079
2080impl<S: HandleOwner> Drop for StoreObjectHandle<S> {
2081    fn drop(&mut self) {
2082        if self.is_encrypted() {
2083            let _ = self.store().key_manager.remove(self.object_id);
2084        }
2085    }
2086}
2087
/// When truncating an object, sometimes it might not be possible to finish the work in a single
/// transaction, in which case the caller needs to finish trimming the object in subsequent
/// transactions (by calling ObjectStore::trim).
///
/// The wrapped flag is `false` when no further trimming is required. The type is `#[must_use]`
/// so that callers cannot drop the result without at least acknowledging it.
#[must_use]
pub struct NeedsTrim(pub bool);
2093
2094#[cfg(test)]
2095mod tests {
2096    use super::{ChecksumRangeChunk, OverwriteBitmaps};
2097    use crate::errors::FxfsError;
2098    use crate::filesystem::{FxFilesystem, OpenFxFilesystem};
2099    use crate::object_handle::ObjectHandle;
2100    use crate::object_store::data_object_handle::WRITE_ATTR_BATCH_SIZE;
2101    use crate::object_store::transaction::{Mutation, Options, lock_keys};
2102    use crate::object_store::{
2103        AttributeKey, DataObjectHandle, Directory, FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions,
2104        LockKey, ObjectKey, ObjectStore, ObjectValue, SetExtendedAttributeMode, StoreObjectHandle,
2105    };
2106    use bit_vec::BitVec;
2107    use fuchsia_async as fasync;
2108    use futures::join;
2109    use std::sync::Arc;
2110    use storage_device::DeviceHolder;
2111    use storage_device::fake_device::FakeDevice;
2112
2113    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2114    const TEST_OBJECT_NAME: &str = "foo";
2115
2116    fn is_error(actual: anyhow::Error, expected: FxfsError) {
2117        assert_eq!(*actual.root_cause().downcast_ref::<FxfsError>().unwrap(), expected)
2118    }
2119
2120    async fn test_filesystem() -> OpenFxFilesystem {
2121        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
2122        FxFilesystem::new_empty(device).await.expect("new_empty failed")
2123    }
2124
2125    async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
2126    {
2127        let fs = test_filesystem().await;
2128        let store = fs.root_store();
2129
2130        let mut transaction = fs
2131            .clone()
2132            .new_transaction(
2133                lock_keys![LockKey::object(
2134                    store.store_object_id(),
2135                    store.root_directory_object_id()
2136                )],
2137                Options::default(),
2138            )
2139            .await
2140            .expect("new_transaction failed");
2141
2142        let object =
2143            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2144                .await
2145                .expect("create_object failed");
2146
2147        let root_directory =
2148            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
2149        root_directory
2150            .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
2151            .await
2152            .expect("add_child_file failed");
2153
2154        transaction.commit().await.expect("commit failed");
2155
2156        (fs, object)
2157    }
2158
2159    #[fuchsia::test(threads = 3)]
2160    async fn extended_attribute_double_remove() {
2161        // This test is intended to trip a potential race condition in remove. Removing an
2162        // attribute that doesn't exist is an error, so we need to check before we remove, but if
2163        // we aren't careful, two parallel removes might both succeed in the check and then both
2164        // remove the value.
2165        let (fs, object) = test_filesystem_and_empty_object().await;
2166        let basic = Arc::new(StoreObjectHandle::new(
2167            object.owner().clone(),
2168            object.object_id(),
2169            /* permanent_keys: */ false,
2170            HandleOptions::default(),
2171            false,
2172        ));
2173        let basic_a = basic.clone();
2174        let basic_b = basic.clone();
2175
2176        basic
2177            .set_extended_attribute(
2178                b"security.selinux".to_vec(),
2179                b"bar".to_vec(),
2180                SetExtendedAttributeMode::Set,
2181            )
2182            .await
2183            .expect("failed to set attribute");
2184
2185        // Try to remove the attribute twice at the same time. One should succeed in the race and
2186        // return Ok, and the other should fail the race and return NOT_FOUND.
2187        let a_task = fasync::Task::spawn(async move {
2188            basic_a.remove_extended_attribute(b"security.selinux".to_vec()).await
2189        });
2190        let b_task = fasync::Task::spawn(async move {
2191            basic_b.remove_extended_attribute(b"security.selinux".to_vec()).await
2192        });
2193        match join!(a_task, b_task) {
2194            (Ok(()), Ok(())) => panic!("both remove calls succeeded"),
2195            (Err(_), Err(_)) => panic!("both remove calls failed"),
2196
2197            (Ok(()), Err(e)) => is_error(e, FxfsError::NotFound),
2198            (Err(e), Ok(())) => is_error(e, FxfsError::NotFound),
2199        }
2200
2201        fs.close().await.expect("Close failed");
2202    }
2203
2204    #[fuchsia::test(threads = 3)]
2205    async fn extended_attribute_double_create() {
2206        // This test is intended to trip a potential race in set when using the create flag,
2207        // similar to above. If the create mode is set, we need to check that the attribute isn't
2208        // already created, but if two parallel creates both succeed in that check, and we aren't
2209        // careful with locking, they will both succeed and one will overwrite the other.
2210        let (fs, object) = test_filesystem_and_empty_object().await;
2211        let basic = Arc::new(StoreObjectHandle::new(
2212            object.owner().clone(),
2213            object.object_id(),
2214            /* permanent_keys: */ false,
2215            HandleOptions::default(),
2216            false,
2217        ));
2218        let basic_a = basic.clone();
2219        let basic_b = basic.clone();
2220
2221        // Try to set the attribute twice at the same time. One should succeed in the race and
2222        // return Ok, and the other should fail the race and return ALREADY_EXISTS.
2223        let a_task = fasync::Task::spawn(async move {
2224            basic_a
2225                .set_extended_attribute(
2226                    b"security.selinux".to_vec(),
2227                    b"one".to_vec(),
2228                    SetExtendedAttributeMode::Create,
2229                )
2230                .await
2231        });
2232        let b_task = fasync::Task::spawn(async move {
2233            basic_b
2234                .set_extended_attribute(
2235                    b"security.selinux".to_vec(),
2236                    b"two".to_vec(),
2237                    SetExtendedAttributeMode::Create,
2238                )
2239                .await
2240        });
2241        match join!(a_task, b_task) {
2242            (Ok(()), Ok(())) => panic!("both set calls succeeded"),
2243            (Err(_), Err(_)) => panic!("both set calls failed"),
2244
2245            (Ok(()), Err(e)) => {
2246                assert_eq!(
2247                    basic
2248                        .get_extended_attribute(b"security.selinux".to_vec())
2249                        .await
2250                        .expect("failed to get xattr"),
2251                    b"one"
2252                );
2253                is_error(e, FxfsError::AlreadyExists);
2254            }
2255            (Err(e), Ok(())) => {
2256                assert_eq!(
2257                    basic
2258                        .get_extended_attribute(b"security.selinux".to_vec())
2259                        .await
2260                        .expect("failed to get xattr"),
2261                    b"two"
2262                );
2263                is_error(e, FxfsError::AlreadyExists);
2264            }
2265        }
2266
2267        fs.close().await.expect("Close failed");
2268    }
2269
2270    struct TestAttr {
2271        name: Vec<u8>,
2272        value: Vec<u8>,
2273    }
2274
2275    impl TestAttr {
2276        fn new(name: impl AsRef<[u8]>, value: impl AsRef<[u8]>) -> Self {
2277            Self { name: name.as_ref().to_vec(), value: value.as_ref().to_vec() }
2278        }
2279        fn name(&self) -> Vec<u8> {
2280            self.name.clone()
2281        }
2282        fn value(&self) -> Vec<u8> {
2283            self.value.clone()
2284        }
2285    }
2286
2287    #[fuchsia::test]
2288    async fn extended_attributes() {
2289        let (fs, object) = test_filesystem_and_empty_object().await;
2290
2291        let test_attr = TestAttr::new(b"security.selinux", b"foo");
2292
2293        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
2294        is_error(
2295            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2296            FxfsError::NotFound,
2297        );
2298
2299        object
2300            .set_extended_attribute(
2301                test_attr.name(),
2302                test_attr.value(),
2303                SetExtendedAttributeMode::Set,
2304            )
2305            .await
2306            .unwrap();
2307        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2308        assert_eq!(
2309            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2310            test_attr.value()
2311        );
2312
2313        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2314        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
2315        is_error(
2316            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2317            FxfsError::NotFound,
2318        );
2319
2320        // Make sure we can object the same attribute being set again.
2321        object
2322            .set_extended_attribute(
2323                test_attr.name(),
2324                test_attr.value(),
2325                SetExtendedAttributeMode::Set,
2326            )
2327            .await
2328            .unwrap();
2329        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2330        assert_eq!(
2331            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2332            test_attr.value()
2333        );
2334
2335        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2336        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
2337        is_error(
2338            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2339            FxfsError::NotFound,
2340        );
2341
2342        fs.close().await.expect("close failed");
2343    }
2344
2345    #[fuchsia::test]
2346    async fn large_extended_attribute() {
2347        let (fs, object) = test_filesystem_and_empty_object().await;
2348
2349        let test_attr = TestAttr::new(b"security.selinux", vec![3u8; 300]);
2350
2351        object
2352            .set_extended_attribute(
2353                test_attr.name(),
2354                test_attr.value(),
2355                SetExtendedAttributeMode::Set,
2356            )
2357            .await
2358            .unwrap();
2359        assert_eq!(
2360            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2361            test_attr.value()
2362        );
2363
2364        // Probe the fxfs attributes to make sure it did the expected thing. This relies on inside
2365        // knowledge of how the attribute id is chosen.
2366        assert_eq!(
2367            object
2368                .read_attr(64)
2369                .await
2370                .expect("read_attr failed")
2371                .expect("read_attr returned none")
2372                .into_vec(),
2373            test_attr.value()
2374        );
2375
2376        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2377        is_error(
2378            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2379            FxfsError::NotFound,
2380        );
2381
2382        // Make sure we can object the same attribute being set again.
2383        object
2384            .set_extended_attribute(
2385                test_attr.name(),
2386                test_attr.value(),
2387                SetExtendedAttributeMode::Set,
2388            )
2389            .await
2390            .unwrap();
2391        assert_eq!(
2392            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2393            test_attr.value()
2394        );
2395        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2396        is_error(
2397            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2398            FxfsError::NotFound,
2399        );
2400
2401        fs.close().await.expect("close failed");
2402    }
2403
2404    #[fuchsia::test]
2405    async fn multiple_extended_attributes() {
2406        let (fs, object) = test_filesystem_and_empty_object().await;
2407
2408        let attrs = [
2409            TestAttr::new(b"security.selinux", b"foo"),
2410            TestAttr::new(b"large.attribute", vec![3u8; 300]),
2411            TestAttr::new(b"an.attribute", b"asdf"),
2412            TestAttr::new(b"user.big", vec![5u8; 288]),
2413            TestAttr::new(b"user.tiny", b"smol"),
2414            TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
2415            TestAttr::new(b"also big", vec![7u8; 500]),
2416            TestAttr::new(b"all.ones", vec![1u8; 11111]),
2417        ];
2418
2419        for i in 0..attrs.len() {
2420            object
2421                .set_extended_attribute(
2422                    attrs[i].name(),
2423                    attrs[i].value(),
2424                    SetExtendedAttributeMode::Set,
2425                )
2426                .await
2427                .unwrap();
2428            assert_eq!(
2429                object.get_extended_attribute(attrs[i].name()).await.unwrap(),
2430                attrs[i].value()
2431            );
2432        }
2433
2434        for i in 0..attrs.len() {
2435            // Make sure expected attributes are still available.
2436            let mut found_attrs = object.list_extended_attributes().await.unwrap();
2437            let mut expected_attrs: Vec<Vec<u8>> = attrs.iter().skip(i).map(|a| a.name()).collect();
2438            found_attrs.sort();
2439            expected_attrs.sort();
2440            assert_eq!(found_attrs, expected_attrs);
2441            for j in i..attrs.len() {
2442                assert_eq!(
2443                    object.get_extended_attribute(attrs[j].name()).await.unwrap(),
2444                    attrs[j].value()
2445                );
2446            }
2447
2448            object.remove_extended_attribute(attrs[i].name()).await.expect("failed to remove");
2449            is_error(
2450                object.get_extended_attribute(attrs[i].name()).await.unwrap_err(),
2451                FxfsError::NotFound,
2452            );
2453        }
2454
2455        fs.close().await.expect("close failed");
2456    }
2457
2458    #[fuchsia::test]
2459    async fn multiple_extended_attributes_delete() {
2460        let (fs, object) = test_filesystem_and_empty_object().await;
2461        let store = object.owner().clone();
2462
2463        let attrs = [
2464            TestAttr::new(b"security.selinux", b"foo"),
2465            TestAttr::new(b"large.attribute", vec![3u8; 300]),
2466            TestAttr::new(b"an.attribute", b"asdf"),
2467            TestAttr::new(b"user.big", vec![5u8; 288]),
2468            TestAttr::new(b"user.tiny", b"smol"),
2469            TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
2470            TestAttr::new(b"also big", vec![7u8; 500]),
2471            TestAttr::new(b"all.ones", vec![1u8; 11111]),
2472        ];
2473
2474        for i in 0..attrs.len() {
2475            object
2476                .set_extended_attribute(
2477                    attrs[i].name(),
2478                    attrs[i].value(),
2479                    SetExtendedAttributeMode::Set,
2480                )
2481                .await
2482                .unwrap();
2483            assert_eq!(
2484                object.get_extended_attribute(attrs[i].name()).await.unwrap(),
2485                attrs[i].value()
2486            );
2487        }
2488
2489        // Unlink the file
2490        let root_directory =
2491            Directory::open(object.owner(), object.store().root_directory_object_id())
2492                .await
2493                .expect("open failed");
2494        let mut transaction = fs
2495            .clone()
2496            .new_transaction(
2497                lock_keys![
2498                    LockKey::object(store.store_object_id(), store.root_directory_object_id()),
2499                    LockKey::object(store.store_object_id(), object.object_id()),
2500                ],
2501                Options::default(),
2502            )
2503            .await
2504            .expect("new_transaction failed");
2505        crate::object_store::directory::replace_child(
2506            &mut transaction,
2507            None,
2508            (&root_directory, TEST_OBJECT_NAME),
2509        )
2510        .await
2511        .expect("replace_child failed");
2512        transaction.commit().await.unwrap();
2513        store.tombstone_object(object.object_id(), Options::default()).await.unwrap();
2514
2515        crate::fsck::fsck(fs.clone()).await.unwrap();
2516
2517        fs.close().await.expect("close failed");
2518    }
2519
2520    #[fuchsia::test]
2521    async fn extended_attribute_changing_sizes() {
2522        let (fs, object) = test_filesystem_and_empty_object().await;
2523
2524        let test_name = b"security.selinux";
2525        let test_small_attr = TestAttr::new(test_name, b"smol");
2526        let test_large_attr = TestAttr::new(test_name, vec![3u8; 300]);
2527
2528        object
2529            .set_extended_attribute(
2530                test_small_attr.name(),
2531                test_small_attr.value(),
2532                SetExtendedAttributeMode::Set,
2533            )
2534            .await
2535            .unwrap();
2536        assert_eq!(
2537            object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
2538            test_small_attr.value()
2539        );
2540
2541        // With a small attribute, we don't expect it to write to an fxfs attribute.
2542        assert!(object.read_attr(64).await.expect("read_attr failed").is_none());
2543
2544        crate::fsck::fsck(fs.clone()).await.unwrap();
2545
2546        object
2547            .set_extended_attribute(
2548                test_large_attr.name(),
2549                test_large_attr.value(),
2550                SetExtendedAttributeMode::Set,
2551            )
2552            .await
2553            .unwrap();
2554        assert_eq!(
2555            object.get_extended_attribute(test_large_attr.name()).await.unwrap(),
2556            test_large_attr.value()
2557        );
2558
2559        // Once the value is above the threshold, we expect it to get upgraded to an fxfs
2560        // attribute.
2561        assert_eq!(
2562            object
2563                .read_attr(64)
2564                .await
2565                .expect("read_attr failed")
2566                .expect("read_attr returned none")
2567                .into_vec(),
2568            test_large_attr.value()
2569        );
2570
2571        crate::fsck::fsck(fs.clone()).await.unwrap();
2572
2573        object
2574            .set_extended_attribute(
2575                test_small_attr.name(),
2576                test_small_attr.value(),
2577                SetExtendedAttributeMode::Set,
2578            )
2579            .await
2580            .unwrap();
2581        assert_eq!(
2582            object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
2583            test_small_attr.value()
2584        );
2585
2586        // Even though we are back under the threshold, we still expect it to be stored in an fxfs
2587        // attribute, because we don't downgrade to inline once we've allocated one.
2588        assert_eq!(
2589            object
2590                .read_attr(64)
2591                .await
2592                .expect("read_attr failed")
2593                .expect("read_attr returned none")
2594                .into_vec(),
2595            test_small_attr.value()
2596        );
2597
2598        crate::fsck::fsck(fs.clone()).await.unwrap();
2599
2600        object.remove_extended_attribute(test_small_attr.name()).await.expect("failed to remove");
2601
2602        crate::fsck::fsck(fs.clone()).await.unwrap();
2603
2604        fs.close().await.expect("close failed");
2605    }
2606
2607    #[fuchsia::test]
2608    async fn extended_attribute_max_size() {
2609        let (fs, object) = test_filesystem_and_empty_object().await;
2610
2611        let test_attr = TestAttr::new(
2612            vec![3u8; super::MAX_XATTR_NAME_SIZE],
2613            vec![1u8; super::MAX_XATTR_VALUE_SIZE],
2614        );
2615
2616        object
2617            .set_extended_attribute(
2618                test_attr.name(),
2619                test_attr.value(),
2620                SetExtendedAttributeMode::Set,
2621            )
2622            .await
2623            .unwrap();
2624        assert_eq!(
2625            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2626            test_attr.value()
2627        );
2628        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2629        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2630
2631        fs.close().await.expect("close failed");
2632    }
2633
2634    #[fuchsia::test]
2635    async fn extended_attribute_remove_then_create() {
2636        let (fs, object) = test_filesystem_and_empty_object().await;
2637
2638        let test_attr = TestAttr::new(
2639            vec![3u8; super::MAX_XATTR_NAME_SIZE],
2640            vec![1u8; super::MAX_XATTR_VALUE_SIZE],
2641        );
2642
2643        object
2644            .set_extended_attribute(
2645                test_attr.name(),
2646                test_attr.value(),
2647                SetExtendedAttributeMode::Create,
2648            )
2649            .await
2650            .unwrap();
2651        fs.journal().compact().await.unwrap();
2652        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2653        object
2654            .set_extended_attribute(
2655                test_attr.name(),
2656                test_attr.value(),
2657                SetExtendedAttributeMode::Create,
2658            )
2659            .await
2660            .unwrap();
2661
2662        assert_eq!(
2663            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2664            test_attr.value()
2665        );
2666
2667        fs.close().await.expect("close failed");
2668    }
2669
2670    #[fuchsia::test]
2671    async fn large_extended_attribute_max_number() {
2672        let (fs, object) = test_filesystem_and_empty_object().await;
2673
2674        let max_xattrs =
2675            super::EXTENDED_ATTRIBUTE_RANGE_END - super::EXTENDED_ATTRIBUTE_RANGE_START;
2676        for i in 0..max_xattrs {
2677            let test_attr = TestAttr::new(format!("{}", i).as_bytes(), vec![0x3; 300]);
2678            object
2679                .set_extended_attribute(
2680                    test_attr.name(),
2681                    test_attr.value(),
2682                    SetExtendedAttributeMode::Set,
2683                )
2684                .await
2685                .unwrap_or_else(|_| panic!("failed to set xattr number {}", i));
2686        }
2687
2688        // That should have taken up all the attributes we've allocated to extended attributes, so
2689        // this one should return ERR_NO_SPACE.
2690        match object
2691            .set_extended_attribute(
2692                b"one.too.many".to_vec(),
2693                vec![0x3; 300],
2694                SetExtendedAttributeMode::Set,
2695            )
2696            .await
2697        {
2698            Ok(()) => panic!("set should not succeed"),
2699            Err(e) => is_error(e, FxfsError::NoSpace),
2700        }
2701
2702        // But inline attributes don't need an attribute number, so it should work fine.
2703        object
2704            .set_extended_attribute(
2705                b"this.is.okay".to_vec(),
2706                b"small value".to_vec(),
2707                SetExtendedAttributeMode::Set,
2708            )
2709            .await
2710            .unwrap();
2711
2712        // And updating existing ones should be okay.
2713        object
2714            .set_extended_attribute(b"11".to_vec(), vec![0x4; 300], SetExtendedAttributeMode::Set)
2715            .await
2716            .unwrap();
2717        object
2718            .set_extended_attribute(
2719                b"12".to_vec(),
2720                vec![0x1; 300],
2721                SetExtendedAttributeMode::Replace,
2722            )
2723            .await
2724            .unwrap();
2725
2726        // And we should be able to remove an attribute and set another one.
2727        object.remove_extended_attribute(b"5".to_vec()).await.unwrap();
2728        object
2729            .set_extended_attribute(
2730                b"new attr".to_vec(),
2731                vec![0x3; 300],
2732                SetExtendedAttributeMode::Set,
2733            )
2734            .await
2735            .unwrap();
2736
2737        fs.close().await.expect("close failed");
2738    }
2739
2740    #[fuchsia::test]
2741    async fn write_attr_trims_beyond_new_end() {
2742        // When writing, multi_write will deallocate old extents that overlap with the new data,
2743        // but it doesn't trim anything beyond that, since it doesn't know what the total size will
2744        // be. write_attr does know, because it writes the whole attribute at once, so we need to
2745        // make sure it cleans up properly.
2746        let (fs, object) = test_filesystem_and_empty_object().await;
2747
2748        let block_size = fs.block_size();
2749        let buf_size = block_size * 2;
2750        let attribute_id = 10;
2751
2752        let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
2753        let mut buffer = object.allocate_buffer(buf_size as usize).await;
2754        buffer.as_mut_slice().fill(3);
2755        // Writing two separate ranges, even if they are contiguous, forces them to be separate
2756        // extent records.
2757        object
2758            .multi_write(
2759                &mut transaction,
2760                attribute_id,
2761                &[0..block_size, block_size..block_size * 2],
2762                buffer.as_mut(),
2763            )
2764            .await
2765            .unwrap();
2766        transaction.add(
2767            object.store().store_object_id,
2768            Mutation::replace_or_insert_object(
2769                ObjectKey::attribute(object.object_id(), attribute_id, AttributeKey::Attribute),
2770                ObjectValue::attribute(block_size * 2, false),
2771            ),
2772        );
2773        transaction.commit().await.unwrap();
2774
2775        crate::fsck::fsck(fs.clone()).await.unwrap();
2776
2777        let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
2778        let needs_trim = (*object)
2779            .write_attr(&mut transaction, attribute_id, &vec![3u8; block_size as usize])
2780            .await
2781            .unwrap();
2782        assert!(!needs_trim.0);
2783        transaction.commit().await.unwrap();
2784
2785        crate::fsck::fsck(fs.clone()).await.unwrap();
2786
2787        fs.close().await.expect("close failed");
2788    }
2789
2790    #[fuchsia::test]
2791    async fn write_new_attr_in_batches_multiple_txns() {
2792        let (fs, object) = test_filesystem_and_empty_object().await;
2793        let merkle_tree = vec![1; 3 * WRITE_ATTR_BATCH_SIZE];
2794        let mut transaction =
2795            (*object).new_transaction(FSVERITY_MERKLE_ATTRIBUTE_ID).await.unwrap();
2796        object
2797            .write_new_attr_in_batches(
2798                &mut transaction,
2799                FSVERITY_MERKLE_ATTRIBUTE_ID,
2800                &merkle_tree,
2801                WRITE_ATTR_BATCH_SIZE,
2802            )
2803            .await
2804            .expect("failed to write merkle attribute");
2805
2806        transaction.add(
2807            object.store().store_object_id,
2808            Mutation::replace_or_insert_object(
2809                ObjectKey::graveyard_attribute_entry(
2810                    object.store().graveyard_directory_object_id(),
2811                    object.object_id(),
2812                    FSVERITY_MERKLE_ATTRIBUTE_ID,
2813                ),
2814                ObjectValue::None,
2815            ),
2816        );
2817        transaction.commit().await.unwrap();
2818        assert_eq!(
2819            object.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
2820            Some(merkle_tree.into())
2821        );
2822
2823        fs.close().await.expect("close failed");
2824    }
2825
2826    // Running on target only, to use fake time features in the executor.
2827    #[cfg(target_os = "fuchsia")]
2828    #[fuchsia::test(allow_stalls = false)]
2829    async fn test_watchdog() {
2830        use super::Watchdog;
2831        use fuchsia_async::{MonotonicDuration, MonotonicInstant, TestExecutor};
2832        use std::sync::mpsc::channel;
2833
2834        TestExecutor::advance_to(make_time(0)).await;
2835        let (sender, receiver) = channel();
2836
2837        fn make_time(time_secs: i64) -> MonotonicInstant {
2838            MonotonicInstant::from_nanos(0) + MonotonicDuration::from_seconds(time_secs)
2839        }
2840
2841        {
2842            let _watchdog = Watchdog::new(10, move |count| {
2843                sender.send(count).expect("Sending value");
2844            });
2845
2846            // Too early.
2847            TestExecutor::advance_to(make_time(5)).await;
2848            receiver.try_recv().expect_err("Should not have message");
2849
2850            // First message.
2851            TestExecutor::advance_to(make_time(10)).await;
2852            assert_eq!(1, receiver.recv().expect("Receiving"));
2853
2854            // Too early for the next.
2855            TestExecutor::advance_to(make_time(15)).await;
2856            receiver.try_recv().expect_err("Should not have message");
2857
2858            // Missed one. They'll be spooled up.
2859            TestExecutor::advance_to(make_time(30)).await;
2860            assert_eq!(2, receiver.recv().expect("Receiving"));
2861            assert_eq!(3, receiver.recv().expect("Receiving"));
2862        }
2863
2864        // Watchdog is dropped, nothing should trigger.
2865        TestExecutor::advance_to(make_time(100)).await;
2866        receiver.recv().expect_err("Watchdog should be gone");
2867    }
2868
2869    #[fuchsia::test]
2870    fn test_checksum_range_chunk() {
2871        let block_size = 4096;
2872
2873        // No bitmap means one chunk that covers the whole range
2874        assert_eq!(
2875            ChecksumRangeChunk::group_first_write_ranges(
2876                &mut OverwriteBitmaps::None,
2877                block_size,
2878                block_size * 2..block_size * 5,
2879            ),
2880            vec![ChecksumRangeChunk {
2881                checksum_range: 0..3,
2882                device_range: block_size * 2..block_size * 5,
2883                is_first_write: false,
2884            }],
2885        );
2886
2887        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2888        assert_eq!(
2889            ChecksumRangeChunk::group_first_write_ranges(
2890                &mut bitmaps,
2891                block_size,
2892                block_size * 2..block_size * 5,
2893            ),
2894            vec![ChecksumRangeChunk {
2895                checksum_range: 0..3,
2896                device_range: block_size * 2..block_size * 5,
2897                is_first_write: false,
2898            }],
2899        );
2900        assert_eq!(
2901            bitmaps.take_bitmaps(),
2902            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b11100000])))
2903        );
2904
2905        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2906        bitmaps.set_offset(2);
2907        assert_eq!(
2908            ChecksumRangeChunk::group_first_write_ranges(
2909                &mut bitmaps,
2910                block_size,
2911                block_size * 2..block_size * 5,
2912            ),
2913            vec![
2914                ChecksumRangeChunk {
2915                    checksum_range: 0..2,
2916                    device_range: block_size * 2..block_size * 4,
2917                    is_first_write: false,
2918                },
2919                ChecksumRangeChunk {
2920                    checksum_range: 2..3,
2921                    device_range: block_size * 4..block_size * 5,
2922                    is_first_write: true,
2923                },
2924            ],
2925        );
2926        assert_eq!(
2927            bitmaps.take_bitmaps(),
2928            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00111000])))
2929        );
2930
2931        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2932        bitmaps.set_offset(4);
2933        assert_eq!(
2934            ChecksumRangeChunk::group_first_write_ranges(
2935                &mut bitmaps,
2936                block_size,
2937                block_size * 2..block_size * 5,
2938            ),
2939            vec![ChecksumRangeChunk {
2940                checksum_range: 0..3,
2941                device_range: block_size * 2..block_size * 5,
2942                is_first_write: true,
2943            }],
2944        );
2945        assert_eq!(
2946            bitmaps.take_bitmaps(),
2947            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00001110])))
2948        );
2949
2950        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b01010101]));
2951        assert_eq!(
2952            ChecksumRangeChunk::group_first_write_ranges(
2953                &mut bitmaps,
2954                block_size,
2955                block_size * 2..block_size * 10,
2956            ),
2957            vec![
2958                ChecksumRangeChunk {
2959                    checksum_range: 0..1,
2960                    device_range: block_size * 2..block_size * 3,
2961                    is_first_write: true,
2962                },
2963                ChecksumRangeChunk {
2964                    checksum_range: 1..2,
2965                    device_range: block_size * 3..block_size * 4,
2966                    is_first_write: false,
2967                },
2968                ChecksumRangeChunk {
2969                    checksum_range: 2..3,
2970                    device_range: block_size * 4..block_size * 5,
2971                    is_first_write: true,
2972                },
2973                ChecksumRangeChunk {
2974                    checksum_range: 3..4,
2975                    device_range: block_size * 5..block_size * 6,
2976                    is_first_write: false,
2977                },
2978                ChecksumRangeChunk {
2979                    checksum_range: 4..5,
2980                    device_range: block_size * 6..block_size * 7,
2981                    is_first_write: true,
2982                },
2983                ChecksumRangeChunk {
2984                    checksum_range: 5..6,
2985                    device_range: block_size * 7..block_size * 8,
2986                    is_first_write: false,
2987                },
2988                ChecksumRangeChunk {
2989                    checksum_range: 6..7,
2990                    device_range: block_size * 8..block_size * 9,
2991                    is_first_write: true,
2992                },
2993                ChecksumRangeChunk {
2994                    checksum_range: 7..8,
2995                    device_range: block_size * 9..block_size * 10,
2996                    is_first_write: false,
2997                },
2998            ],
2999        );
3000        assert_eq!(
3001            bitmaps.take_bitmaps(),
3002            Some((BitVec::from_bytes(&[0b01010101]), BitVec::from_bytes(&[0b11111111])))
3003        );
3004    }
3005}