fxfs/object_store/store_object_handle.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::checksum::{Checksum, Checksums, fletcher64};
6use crate::errors::FxfsError;
7use crate::log::*;
8use crate::lsm_tree::Query;
9use crate::lsm_tree::types::{Item, ItemRef, LayerIterator};
10use crate::object_handle::ObjectHandle;
11use crate::object_store::extent_record::{ExtentKey, ExtentMode, ExtentValue};
12use crate::object_store::object_manager::ObjectManager;
13use crate::object_store::object_record::{
14    AttributeKey, ExtendedAttributeValue, ObjectAttributes, ObjectItem, ObjectKey, ObjectKeyData,
15    ObjectValue, Timestamp,
16};
17use crate::object_store::transaction::{
18    AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Options, ReadGuard,
19    Transaction, lock_keys,
20};
21use crate::object_store::{
22    HandleOptions, HandleOwner, ObjectStore, TrimMode, TrimResult, VOLUME_DATA_KEY_ID,
23};
24use crate::range::RangeExt;
25use crate::round::{round_down, round_up};
26use anyhow::{Context, Error, anyhow, bail, ensure};
27use assert_matches::assert_matches;
28use bit_vec::BitVec;
29use futures::stream::{FuturesOrdered, FuturesUnordered};
30use futures::{TryStreamExt, try_join};
31use fxfs_crypto::{
32    Cipher, CipherHolder, CipherSet, EncryptionKey, FindKeyResult, FxfsCipher, KeyPurpose,
33};
34use fxfs_trace::trace;
35use static_assertions::const_assert;
36use std::cmp::min;
37use std::future::Future;
38use std::ops::Range;
39use std::sync::Arc;
40use std::sync::atomic::{self, AtomicBool, Ordering};
41use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
42use storage_device::{InlineCryptoOptions, ReadOptions, WriteOptions};
43
44use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
45
/// Longest permitted extended attribute name, in bytes.
pub const MAX_XATTR_NAME_SIZE: usize = 255;
/// Extended attribute values larger than this are kept in a separate object attribute rather than
/// directly inside the record.
pub const MAX_INLINE_XATTR_SIZE: usize = 256;
/// Largest permitted extended attribute value. NB: deleting attributes relies for correctness on
/// this 64kB ceiling, so it must always be enforced.
pub const MAX_XATTR_VALUE_SIZE: usize = 64000;
/// Bounds of the fxfs attribute id range set aside for extended attribute values. A new attribute
/// takes the first unused id in this range. Changing these values is technically safe but has
/// consequences: they only influence id selection, so existing extended attributes keep whatever
/// ids they were given. Any range ever selected here may therefore still hold attributes in use
/// unless they are explicitly migrated, which isn't currently done.
pub const EXTENDED_ATTRIBUTE_RANGE_START: u64 = 64;
pub const EXTENDED_ATTRIBUTE_RANGE_END: u64 = 512;
63
64/// Zeroes blocks in 'buffer' based on `bitmap`, one bit per block from start of buffer.
65fn apply_bitmap_zeroing(
66    block_size: usize,
67    bitmap: &bit_vec::BitVec,
68    mut buffer: MutableBufferRef<'_>,
69) {
70    let buf = buffer.as_mut_slice();
71    debug_assert_eq!(bitmap.len() * block_size, buf.len());
72    for (i, block) in bitmap.iter().enumerate() {
73        if !block {
74            let start = i * block_size;
75            buf[start..start + block_size].fill(0);
76        }
77    }
78}
79
80/// When writing, often the logic should be generic over whether or not checksums are generated.
81/// This provides that and a handy way to convert to the more general ExtentMode that eventually
82/// stores it on disk.
83#[derive(Debug, Clone, PartialEq)]
84pub enum MaybeChecksums {
85    None,
86    Fletcher(Vec<Checksum>),
87}
88
89impl MaybeChecksums {
90    pub fn maybe_as_ref(&self) -> Option<&[Checksum]> {
91        match self {
92            Self::None => None,
93            Self::Fletcher(sums) => Some(&sums),
94        }
95    }
96
97    pub fn split_off(&mut self, at: usize) -> Self {
98        match self {
99            Self::None => Self::None,
100            Self::Fletcher(sums) => Self::Fletcher(sums.split_off(at)),
101        }
102    }
103
104    pub fn to_mode(self) -> ExtentMode {
105        match self {
106            Self::None => ExtentMode::Raw,
107            Self::Fletcher(sums) => ExtentMode::Cow(Checksums::fletcher(sums)),
108        }
109    }
110
111    pub fn into_option(self) -> Option<Vec<Checksum>> {
112        match self {
113            Self::None => None,
114            Self::Fletcher(sums) => Some(sums),
115        }
116    }
117}
118
119/// The mode of operation when setting extended attributes. This is the same as the fidl definition
120/// but is replicated here so we don't have fuchsia.io structures in the api, so this can be used
121/// on host.
122#[derive(Debug, Clone, Copy, PartialEq, Eq)]
123pub enum SetExtendedAttributeMode {
124    /// Create the extended attribute if it doesn't exist, replace the value if it does.
125    Set,
126    /// Create the extended attribute if it doesn't exist, fail if it does.
127    Create,
128    /// Replace the extended attribute value if it exists, fail if it doesn't.
129    Replace,
130}
131
132impl From<fio::SetExtendedAttributeMode> for SetExtendedAttributeMode {
133    fn from(other: fio::SetExtendedAttributeMode) -> SetExtendedAttributeMode {
134        match other {
135            fio::SetExtendedAttributeMode::Set => SetExtendedAttributeMode::Set,
136            fio::SetExtendedAttributeMode::Create => SetExtendedAttributeMode::Create,
137            fio::SetExtendedAttributeMode::Replace => SetExtendedAttributeMode::Replace,
138        }
139    }
140}
141
/// Whether an object is encrypted, and if so where its keys are held.
enum Encryption {
    /// No encryption is used for the object.
    None,

    /// Keys are cached by the KeyManager and unwrapped on demand.
    CachedKeys,

    /// Keys are registered permanently with the KeyManager.
    PermanentKeys,
}
153
154#[derive(PartialEq, Debug)]
155enum OverwriteBitmaps {
156    None,
157    Some {
158        /// The block bitmap of a partial overwrite extent in the tree.
159        extent_bitmap: BitVec,
160        /// A bitmap of the blocks written to by the current overwrite.
161        write_bitmap: BitVec,
162        /// BitVec doesn't have a slice equivalent, so for a particular section of the write we
163        /// keep track of an offset in the bitmaps to operate on.
164        bitmap_offset: usize,
165    },
166}
167
168impl OverwriteBitmaps {
169    fn new(extent_bitmap: BitVec) -> Self {
170        OverwriteBitmaps::Some {
171            write_bitmap: BitVec::from_elem(extent_bitmap.len(), false),
172            extent_bitmap,
173            bitmap_offset: 0,
174        }
175    }
176
177    fn is_none(&self) -> bool {
178        *self == OverwriteBitmaps::None
179    }
180
181    fn set_offset(&mut self, new_offset: usize) {
182        match self {
183            OverwriteBitmaps::None => (),
184            OverwriteBitmaps::Some { bitmap_offset, .. } => *bitmap_offset = new_offset,
185        }
186    }
187
188    fn get_from_extent_bitmap(&self, i: usize) -> Option<bool> {
189        match self {
190            OverwriteBitmaps::None => None,
191            OverwriteBitmaps::Some { extent_bitmap, bitmap_offset, .. } => {
192                extent_bitmap.get(*bitmap_offset + i)
193            }
194        }
195    }
196
197    fn set_in_write_bitmap(&mut self, i: usize, x: bool) {
198        match self {
199            OverwriteBitmaps::None => (),
200            OverwriteBitmaps::Some { write_bitmap, bitmap_offset, .. } => {
201                write_bitmap.set(*bitmap_offset + i, x)
202            }
203        }
204    }
205
206    fn take_bitmaps(self) -> Option<(BitVec, BitVec)> {
207        match self {
208            OverwriteBitmaps::None => None,
209            OverwriteBitmaps::Some { extent_bitmap, write_bitmap, .. } => {
210                Some((extent_bitmap, write_bitmap))
211            }
212        }
213    }
214}
215
216/// When writing to Overwrite ranges, we need to emit whether a set of checksums for a device range
217/// is the first write to that region or not. This tracks one such range so we can use it after the
218/// write to break up the returned checksum list.
219#[derive(PartialEq, Debug)]
220struct ChecksumRangeChunk {
221    checksum_range: Range<usize>,
222    device_range: Range<u64>,
223    is_first_write: bool,
224}
225
226impl ChecksumRangeChunk {
227    fn group_first_write_ranges(
228        bitmaps: &mut OverwriteBitmaps,
229        block_size: u64,
230        write_device_range: Range<u64>,
231    ) -> Vec<ChecksumRangeChunk> {
232        let write_block_len = (write_device_range.length().unwrap() / block_size) as usize;
233        if bitmaps.is_none() {
234            // If there is no bitmap, then the overwrite range is fully written to. However, we
235            // could still be within the journal flush window where one of the blocks was written
236            // to for the first time to put it in this state, so we still need to emit the
237            // checksums in case replay needs them.
238            vec![ChecksumRangeChunk {
239                checksum_range: 0..write_block_len,
240                device_range: write_device_range,
241                is_first_write: false,
242            }]
243        } else {
244            let mut checksum_ranges = vec![ChecksumRangeChunk {
245                checksum_range: 0..0,
246                device_range: write_device_range.start..write_device_range.start,
247                is_first_write: !bitmaps.get_from_extent_bitmap(0).unwrap(),
248            }];
249            let mut working_range = checksum_ranges.last_mut().unwrap();
250            for i in 0..write_block_len {
251                bitmaps.set_in_write_bitmap(i, true);
252
253                // bitmap.get returning true means the block is initialized and therefore has been
254                // written to before.
255                if working_range.is_first_write != bitmaps.get_from_extent_bitmap(i).unwrap() {
256                    // is_first_write is tracking opposite of what comes back from the bitmap, so
257                    // if the are still opposites we continue our current range.
258                    working_range.checksum_range.end += 1;
259                    working_range.device_range.end += block_size;
260                } else {
261                    // If they are the same, then we need to make a new chunk.
262                    let new_chunk = ChecksumRangeChunk {
263                        checksum_range: working_range.checksum_range.end
264                            ..working_range.checksum_range.end + 1,
265                        device_range: working_range.device_range.end
266                            ..working_range.device_range.end + block_size,
267                        is_first_write: !working_range.is_first_write,
268                    };
269                    checksum_ranges.push(new_chunk);
270                    working_range = checksum_ranges.last_mut().unwrap();
271                }
272            }
273            checksum_ranges
274        }
275    }
276}
277
278/// StoreObjectHandle is the lowest-level, untyped handle to an object with the id [`object_id`] in
279/// a particular store, [`owner`]. It provides functionality shared across all objects, such as
280/// reading and writing attributes and managing encryption keys.
281///
282/// Since it's untyped, it doesn't do any object kind validation, and is generally meant to
283/// implement higher-level typed handles.
284///
285/// For file-like objects with a data attribute, DataObjectHandle implements traits and helpers for
286/// doing more complex extent management and caches the content size.
287///
288/// For directory-like objects, Directory knows how to add and remove child objects and enumerate
289/// its children.
290pub struct StoreObjectHandle<S: HandleOwner> {
291    owner: Arc<S>,
292    object_id: u64,
293    options: HandleOptions,
294    trace: AtomicBool,
295    encryption: Encryption,
296}
297
298impl<S: HandleOwner> ObjectHandle for StoreObjectHandle<S> {
299    fn set_trace(&self, v: bool) {
300        info!(store_id = self.store().store_object_id, oid = self.object_id(), trace = v; "trace");
301        self.trace.store(v, atomic::Ordering::Relaxed);
302    }
303
304    fn object_id(&self) -> u64 {
305        return self.object_id;
306    }
307
308    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
309        self.store().device.allocate_buffer(size)
310    }
311
312    fn block_size(&self) -> u64 {
313        self.store().block_size()
314    }
315}
316
317struct Watchdog {
318    _task: fasync::Task<()>,
319}
320
321impl Watchdog {
322    fn new(increment_seconds: u64, cb: impl Fn(u64) + Send + 'static) -> Self {
323        Self {
324            _task: fasync::Task::spawn(async move {
325                let increment = increment_seconds.try_into().unwrap();
326                let mut fired_counter = 0;
327                let mut next_wake = fasync::MonotonicInstant::now();
328                loop {
329                    next_wake += std::time::Duration::from_secs(increment).into();
330                    // If this isn't being scheduled this will purposely result in fast looping when
331                    // it does. This will be insightful about the state of the thread and task
332                    // scheduling.
333                    if fasync::MonotonicInstant::now() < next_wake {
334                        fasync::Timer::new(next_wake).await;
335                    }
336                    fired_counter += 1;
337                    cb(fired_counter);
338                }
339            }),
340        }
341    }
342}
343
344impl<S: HandleOwner> StoreObjectHandle<S> {
345    /// Make a new StoreObjectHandle for the object with id [`object_id`] in store [`owner`].
346    pub fn new(
347        owner: Arc<S>,
348        object_id: u64,
349        permanent_keys: bool,
350        mut options: HandleOptions,
351        trace: bool,
352    ) -> Self {
353        let encryption = if permanent_keys {
354            Encryption::PermanentKeys
355        } else if owner.as_ref().as_ref().is_encrypted() {
356            Encryption::CachedKeys
357        } else {
358            Encryption::None
359        };
360        let store: &ObjectStore = owner.as_ref().as_ref();
361        if store.is_encrypted() && store.filesystem().options().inline_crypto_enabled {
362            options.skip_checksums = true;
363        }
364        Self { owner, object_id, encryption, options, trace: AtomicBool::new(trace) }
365    }
366
367    pub fn owner(&self) -> &Arc<S> {
368        &self.owner
369    }
370
371    pub fn store(&self) -> &ObjectStore {
372        self.owner.as_ref().as_ref()
373    }
374
375    pub fn trace(&self) -> bool {
376        self.trace.load(atomic::Ordering::Relaxed)
377    }
378
379    pub fn is_encrypted(&self) -> bool {
380        !matches!(self.encryption, Encryption::None)
381    }
382
383    /// Get the default set of transaction options for this object. This is mostly the overall
384    /// default, modified by any [`HandleOptions`] held by this handle.
385    pub fn default_transaction_options<'b>(&self) -> Options<'b> {
386        Options { skip_journal_checks: self.options.skip_journal_checks, ..Default::default() }
387    }
388
389    pub async fn new_transaction_with_options<'b>(
390        &self,
391        attribute_id: u64,
392        options: Options<'b>,
393    ) -> Result<Transaction<'b>, Error> {
394        Ok(self
395            .store()
396            .filesystem()
397            .new_transaction(
398                lock_keys![
399                    LockKey::object_attribute(
400                        self.store().store_object_id(),
401                        self.object_id(),
402                        attribute_id,
403                    ),
404                    LockKey::object(self.store().store_object_id(), self.object_id()),
405                ],
406                options,
407            )
408            .await?)
409    }
410
411    pub async fn new_transaction<'b>(&self, attribute_id: u64) -> Result<Transaction<'b>, Error> {
412        self.new_transaction_with_options(attribute_id, self.default_transaction_options()).await
413    }
414
415    // If |transaction| has an impending mutation for the underlying object, returns that.
416    // Otherwise, looks up the object from the tree.
417    async fn txn_get_object_mutation(
418        &self,
419        transaction: &Transaction<'_>,
420    ) -> Result<ObjectStoreMutation, Error> {
421        self.store().txn_get_object_mutation(transaction, self.object_id()).await
422    }
423
    /// Deallocates, within `transaction`, the device ranges backing the parts of attribute
    /// `attribute_id` that overlap the logical `range`.
    ///
    /// Returns the number of logical bytes deallocated, i.e. the summed lengths of the overlaps
    /// with `ExtentValue::Some` extents. Other extent values (e.g. deleted extents) are skipped.
    /// This only deallocates; it adds no extent records itself.
    ///
    /// # Panics
    /// Panics if `range` is not block-aligned.
    ///
    /// # Errors
    /// `FxfsError::Inconsistent` if an overlapping extent maps to an unaligned device range, plus
    /// any error from the tree iterator or the allocator.
    async fn deallocate_old_extents(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: u64,
        range: Range<u64>,
    ) -> Result<u64, Error> {
        let block_size = self.block_size();
        assert_eq!(range.start % block_size, 0);
        assert_eq!(range.end % block_size, 0);
        if range.start == range.end {
            return Ok(0);
        }
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let key = ExtentKey { range };
        // NOTE(review): `search_key()` presumably positions the query at the first extent that
        // could overlap `range` — confirm against ExtentKey.
        let lower_bound = ObjectKey::attribute(
            self.object_id(),
            attribute_id,
            AttributeKey::Extent(key.search_key()),
        );
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullRange(&lower_bound)).await?;
        let allocator = self.store().allocator();
        let mut deallocated = 0;
        let trace = self.trace();
        // Walk extent records; the loop ends at the first record that isn't an extent.
        while let Some(ItemRef {
            key:
                ObjectKey {
                    object_id,
                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                },
            value: ObjectValue::Extent(value),
            ..
        }) = iter.get()
        {
            // Stop once we've moved past this object's attribute.
            if *object_id != self.object_id() || *attr_id != attribute_id {
                break;
            }
            // Only extents with device storage have anything to deallocate.
            if let ExtentValue::Some { device_offset, .. } = value {
                if let Some(overlap) = key.overlap(extent_key) {
                    // Translate the logical overlap into the matching device range.
                    let range = device_offset + overlap.start - extent_key.range.start
                        ..device_offset + overlap.end - extent_key.range.start;
                    ensure!(range.is_aligned(block_size), FxfsError::Inconsistent);
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = range,
                            len = range.end - range.start,
                            extent_key:?;
                            "D",
                        );
                    }
                    allocator
                        .deallocate(transaction, self.store().store_object_id(), range)
                        .await?;
                    deallocated += overlap.end - overlap.start;
                } else {
                    // A non-overlapping allocated extent is presumably past the end of `range`
                    // (given where the query started), so there is nothing more to do.
                    break;
                }
            }
            iter.advance().await?;
        }
        Ok(deallocated)
    }
490
491    // Writes aligned data (that should already be encrypted) to the given offset and computes
492    // checksums if requested. The aligned data must be from a single logical file range.
493    async fn write_aligned(
494        &self,
495        buf: BufferRef<'_>,
496        device_offset: u64,
497        crypt_ctx: Option<(u32, u8)>,
498    ) -> Result<MaybeChecksums, Error> {
499        if self.trace() {
500            info!(
501                store_id = self.store().store_object_id(),
502                oid = self.object_id(),
503                device_range:? = (device_offset..device_offset + buf.len() as u64),
504                len = buf.len();
505                "W",
506            );
507        }
508        let store = self.store();
509        store.device_write_ops.fetch_add(1, Ordering::Relaxed);
510        let mut checksums = Vec::new();
511        let _watchdog = Watchdog::new(10, |count| {
512            warn!("Write has been stalled for {} seconds", count * 10);
513        });
514
515        if self.options.skip_checksums {
516            store
517                .device
518                .write_with_opts(
519                    device_offset as u64,
520                    buf,
521                    crypt_ctx.map_or_else(
522                        || WriteOptions::default(),
523                        |(dun, slot)| WriteOptions {
524                            inline_crypto_options: InlineCryptoOptions { dun, slot },
525                            ..Default::default()
526                        },
527                    ),
528                )
529                .await?;
530            Ok(MaybeChecksums::None)
531        } else {
532            debug_assert!(crypt_ctx.is_none());
533            try_join!(store.device.write(device_offset, buf), async {
534                let block_size = self.block_size();
535                for chunk in buf.as_slice().chunks_exact(block_size as usize) {
536                    checksums.push(fletcher64(chunk, 0));
537                }
538                Ok(())
539            })?;
540            Ok(MaybeChecksums::Fletcher(checksums))
541        }
542    }
543
544    /// Flushes the underlying device.  This is expensive and should be used sparingly.
545    pub async fn flush_device(&self) -> Result<(), Error> {
546        self.store().device().flush().await
547    }
548
549    pub async fn update_allocated_size(
550        &self,
551        transaction: &mut Transaction<'_>,
552        allocated: u64,
553        deallocated: u64,
554    ) -> Result<(), Error> {
555        if allocated == deallocated {
556            return Ok(());
557        }
558        let mut mutation = self.txn_get_object_mutation(transaction).await?;
559        if let ObjectValue::Object {
560            attributes: ObjectAttributes { project_id, allocated_size, .. },
561            ..
562        } = &mut mutation.item.value
563        {
564            // The only way for these to fail are if the volume is inconsistent.
565            *allocated_size = allocated_size
566                .checked_add(allocated)
567                .ok_or_else(|| anyhow!(FxfsError::Inconsistent).context("Allocated size overflow"))?
568                .checked_sub(deallocated)
569                .ok_or_else(|| {
570                    anyhow!(FxfsError::Inconsistent).context("Allocated size underflow")
571                })?;
572
573            if *project_id != 0 {
574                // The allocated and deallocated shouldn't exceed the max size of the file which is
575                // bound within i64.
576                let diff = i64::try_from(allocated).unwrap() - i64::try_from(deallocated).unwrap();
577                transaction.add(
578                    self.store().store_object_id(),
579                    Mutation::merge_object(
580                        ObjectKey::project_usage(
581                            self.store().root_directory_object_id(),
582                            *project_id,
583                        ),
584                        ObjectValue::BytesAndNodes { bytes: diff, nodes: 0 },
585                    ),
586                );
587            }
588        } else {
589            // This can occur when the object mutation is created from an object in the tree which
590            // was corrupt.
591            bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
592        }
593        transaction.add(self.store().store_object_id, Mutation::ObjectStore(mutation));
594        Ok(())
595    }
596
597    pub async fn update_attributes<'a>(
598        &self,
599        transaction: &mut Transaction<'a>,
600        node_attributes: Option<&fio::MutableNodeAttributes>,
601        change_time: Option<Timestamp>,
602    ) -> Result<(), Error> {
603        if let Some(&fio::MutableNodeAttributes { selinux_context: Some(ref context), .. }) =
604            node_attributes
605        {
606            if let fio::SelinuxContext::Data(context) = context {
607                self.set_extended_attribute_impl(
608                    "security.selinux".into(),
609                    context.clone(),
610                    SetExtendedAttributeMode::Set,
611                    transaction,
612                )
613                .await?;
614            } else {
615                return Err(anyhow!(FxfsError::InvalidArgs)
616                    .context("Only set SELinux context with `data` member."));
617            }
618        }
619        self.store()
620            .update_attributes(transaction, self.object_id, node_attributes, change_time)
621            .await
622    }
623
624    /// Zeroes the given range.  The range must be aligned.  Returns the amount of data deallocated.
625    pub async fn zero(
626        &self,
627        transaction: &mut Transaction<'_>,
628        attribute_id: u64,
629        range: Range<u64>,
630    ) -> Result<(), Error> {
631        let deallocated =
632            self.deallocate_old_extents(transaction, attribute_id, range.clone()).await?;
633        if deallocated > 0 {
634            self.update_allocated_size(transaction, 0, deallocated).await?;
635            transaction.add(
636                self.store().store_object_id,
637                Mutation::merge_object(
638                    ObjectKey::extent(self.object_id(), attribute_id, range),
639                    ObjectValue::Extent(ExtentValue::deleted_extent()),
640                ),
641            );
642        }
643        Ok(())
644    }
645
646    // Returns a new aligned buffer (reading the head and tail blocks if necessary) with a copy of
647    // the data from `buf`.
648    pub async fn align_buffer(
649        &self,
650        attribute_id: u64,
651        offset: u64,
652        buf: BufferRef<'_>,
653    ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
654        let block_size = self.block_size();
655        let end = offset + buf.len() as u64;
656        let aligned =
657            round_down(offset, block_size)..round_up(end, block_size).ok_or(FxfsError::TooBig)?;
658
659        let mut aligned_buf =
660            self.store().device.allocate_buffer((aligned.end - aligned.start) as usize).await;
661
662        // Deal with head alignment.
663        if aligned.start < offset {
664            let mut head_block = aligned_buf.subslice_mut(..block_size as usize);
665            let read = self.read(attribute_id, aligned.start, head_block.reborrow()).await?;
666            head_block.as_mut_slice()[read..].fill(0);
667        }
668
669        // Deal with tail alignment.
670        if aligned.end > end {
671            let end_block_offset = aligned.end - block_size;
672            // There's no need to read the tail block if we read it as part of the head block.
673            if offset <= end_block_offset {
674                let mut tail_block =
675                    aligned_buf.subslice_mut(aligned_buf.len() - block_size as usize..);
676                let read = self.read(attribute_id, end_block_offset, tail_block.reborrow()).await?;
677                tail_block.as_mut_slice()[read..].fill(0);
678            }
679        }
680
681        aligned_buf.as_mut_slice()
682            [(offset - aligned.start) as usize..(end - aligned.start) as usize]
683            .copy_from_slice(buf.as_slice());
684
685        Ok((aligned, aligned_buf))
686    }
687
688    /// Trim an attribute's extents, potentially adding a graveyard trim entry if more trimming is
689    /// needed, so the transaction can be committed without worrying about leaking data.
690    ///
691    /// This doesn't update the size stored in the attribute value - the caller is responsible for
692    /// doing that to keep the size up to date.
693    pub async fn shrink(
694        &self,
695        transaction: &mut Transaction<'_>,
696        attribute_id: u64,
697        size: u64,
698    ) -> Result<NeedsTrim, Error> {
699        let store = self.store();
700        let needs_trim = matches!(
701            store
702                .trim_some(transaction, self.object_id(), attribute_id, TrimMode::FromOffset(size))
703                .await?,
704            TrimResult::Incomplete
705        );
706        if needs_trim {
707            // Add the object to the graveyard in case the following transactions don't get
708            // replayed.
709            let graveyard_id = store.graveyard_directory_object_id();
710            match store
711                .tree
712                .find(&ObjectKey::graveyard_entry(graveyard_id, self.object_id()))
713                .await?
714            {
715                Some(ObjectItem { value: ObjectValue::Some, .. })
716                | Some(ObjectItem { value: ObjectValue::Trim, .. }) => {
717                    // This object is already in the graveyard so we don't need to do anything.
718                }
719                _ => {
720                    transaction.add(
721                        store.store_object_id,
722                        Mutation::replace_or_insert_object(
723                            ObjectKey::graveyard_entry(graveyard_id, self.object_id()),
724                            ObjectValue::Trim,
725                        ),
726                    );
727                }
728            }
729        }
730        Ok(NeedsTrim(needs_trim))
731    }
732
733    /// Reads and decrypts a singular logical range.
734    pub async fn read_and_decrypt(
735        &self,
736        device_offset: u64,
737        file_offset: u64,
738        mut buffer: MutableBufferRef<'_>,
739        key_id: u64,
740    ) -> Result<(), Error> {
741        let store = self.store();
742        store.device_read_ops.fetch_add(1, Ordering::Relaxed);
743
744        let _watchdog = Watchdog::new(10, |count| {
745            warn!("Read has been stalled for {} seconds", count * 10);
746        });
747
748        let (_key_id, key) = self.get_key(Some(key_id)).await?;
749        if let Some(key) = key {
750            if let Some((dun, slot)) = key.crypt_ctx(self.object_id, file_offset) {
751                store
752                    .device
753                    .read_with_opts(
754                        device_offset as u64,
755                        buffer.reborrow(),
756                        ReadOptions { inline_crypto_options: InlineCryptoOptions { dun, slot } },
757                    )
758                    .await?;
759            } else {
760                store.device.read(device_offset, buffer.reborrow()).await?;
761                key.decrypt(self.object_id, device_offset, file_offset, buffer.as_mut_slice())?;
762            }
763        } else {
764            store.device.read(device_offset, buffer.reborrow()).await?;
765        }
766
767        Ok(())
768    }
769
    /// Returns the key to use for this object together with its key id.
    ///
    /// The result depends on how the object is encrypted:
    /// - Cached keys with `key_id` set to `Some(id)`: returns `(id, key)` via the key manager.
    /// - Cached keys with `key_id` set to `None`: tries to return the fscrypt key if it is
    ///   present, or the volume key if it isn't. If the fscrypt key is present but cannot be
    ///   unwrapped, this returns `FxfsError::NoKey`.
    /// - Permanent keys: `key_id` is ignored and the permanently registered key is returned under
    ///   `VOLUME_DATA_KEY_ID`.
    /// - Not encrypted: returns `(VOLUME_DATA_KEY_ID, None)`.
    ///
    /// # Errors
    /// Fails with "No crypt!" when cached-key encryption is in use but the store has no crypt
    /// service, and propagates key manager errors.
    ///
    /// # Panics
    /// For permanent keys, the key manager lookup result is `unwrap`ped. NOTE(review): this
    /// presumably panics if no permanent key is registered for the object — confirm.
    pub async fn get_key(
        &self,
        key_id: Option<u64>,
    ) -> Result<(u64, Option<Arc<dyn Cipher>>), Error> {
        let store = self.store();
        Ok(match self.encryption {
            Encryption::None => (VOLUME_DATA_KEY_ID, None),
            Encryption::CachedKeys => {
                if let Some(key_id) = key_id {
                    (
                        key_id,
                        Some(
                            store
                                .key_manager
                                .get_key(
                                    self.object_id,
                                    store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
                                    // Loader for the object's keys; presumably only invoked on a
                                    // cache miss.
                                    async || store.get_keys(self.object_id).await,
                                    key_id,
                                )
                                .await?,
                        ),
                    )
                } else {
                    // No explicit id: prefer the fscrypt key, falling back to the volume key.
                    let (key_id, key) = store
                        .key_manager
                        .get_fscrypt_key_if_present(
                            self.object_id,
                            store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
                            async || store.get_keys(self.object_id).await,
                        )
                        .await?;
                    (key_id, Some(key))
                }
            }
            Encryption::PermanentKeys => {
                (VOLUME_DATA_KEY_ID, Some(store.key_manager.get(self.object_id).await?.unwrap()))
            }
        })
    }
814
    /// This will only work for a non-permanent volume data key. This is designed to be used with
    /// extended attributes where we'll only create the key on demand for directories and encrypted
    /// files.
    ///
    /// Lookup proceeds in three steps:
    /// 1. The key manager's cache (`key_manager.get`); a hit is returned immediately.
    /// 2. The object's `Keys` record in the store's tree. If it already contains
    ///    `VOLUME_DATA_KEY_ID`, that key is returned, or `FxfsError::NoKey` if it is present but
    ///    unavailable. A record with a non-`Keys` value yields `FxfsError::Inconsistent`.
    /// 3. Otherwise a new key is created via `crypt.create_key`. A mutation that writes the
    ///    updated `Keys` record is added to `transaction`. The updated `CipherSet` is inserted into
    ///    the key manager only when that mutation is applied, i.e. when (and if) the transaction
    ///    commits.
    ///
    /// Returns an error if the store has no crypt service.
    async fn get_or_create_key(
        &self,
        transaction: &mut Transaction<'_>,
    ) -> Result<Arc<dyn Cipher>, Error> {
        let store = self.store();

        // Fast path: try and get keys from the cache.
        if let Some(key) = store.key_manager.get(self.object_id).await.context("get failed")? {
            return Ok(key);
        }

        let crypt = store.crypt().ok_or_else(|| anyhow!("No crypt!"))?;

        // Next, see if the keys are already created.
        let (mut encryption_keys, mut cipher_set) = if let Some(item) =
            store.tree.find(&ObjectKey::keys(self.object_id)).await.context("find failed")?
        {
            if let ObjectValue::Keys(encryption_keys) = item.value {
                // Unwrap the stored keys (non-permanently, without forcing) so we can check
                // whether the volume data key is already among them.
                let cipher_set = store
                    .key_manager
                    .get_keys(
                        self.object_id,
                        crypt.as_ref(),
                        &mut Some(async || Ok(encryption_keys.clone())),
                        /* permanent= */ false,
                        /* force= */ false,
                    )
                    .await
                    .context("get_keys failed")?;
                match cipher_set.find_key(VOLUME_DATA_KEY_ID) {
                    FindKeyResult::NotFound => {}
                    FindKeyResult::Unavailable(_) => return Err(FxfsError::NoKey.into()),
                    FindKeyResult::Key(key) => return Ok(key),
                }
                // The volume data key is missing: keep the existing keys so the new one can be
                // added alongside them.
                (encryption_keys, (*cipher_set).clone())
            } else {
                return Err(anyhow!(FxfsError::Inconsistent));
            }
        } else {
            // No keys record yet: start from empty key and cipher sets.
            Default::default()
        };

        // Proceed to create the key.  The transaction holds the required locks.
        let (key, unwrapped_key) = crypt.create_key(self.object_id, KeyPurpose::Data).await?;
        let cipher: Arc<dyn Cipher> = Arc::new(FxfsCipher::new(&unwrapped_key));

        // Add new cipher to cloned cipher set. This will replace existing one
        // if transaction is successful.
        cipher_set.add_key(VOLUME_DATA_KEY_ID, CipherHolder::Cipher(cipher.clone()));
        let cipher_set = Arc::new(cipher_set);

        // Arrange for the CipherSet to be added to the cache when (and if) the transaction
        // commits.
        struct UnwrappedKeys {
            object_id: u64,
            new_keys: Arc<CipherSet>,
        }

        impl AssociatedObject for UnwrappedKeys {
            fn will_apply_mutation(
                &self,
                _mutation: &Mutation,
                object_id: u64,
                manager: &ObjectManager,
            ) {
                manager.store(object_id).unwrap().key_manager.insert(
                    self.object_id,
                    self.new_keys.clone(),
                    /* permanent= */ false,
                );
            }
        }

        // Persist the wrapped form of the new key alongside any existing keys.
        encryption_keys.insert(VOLUME_DATA_KEY_ID, EncryptionKey::Fxfs(key).into());

        transaction.add_with_object(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::keys(self.object_id),
                ObjectValue::keys(encryption_keys),
            ),
            AssocObj::Owned(Box::new(UnwrappedKeys {
                object_id: self.object_id,
                new_keys: cipher_set,
            })),
        );

        Ok(cipher)
    }
907
908    pub async fn read(
909        &self,
910        attribute_id: u64,
911        offset: u64,
912        mut buf: MutableBufferRef<'_>,
913    ) -> Result<usize, Error> {
914        let fs = self.store().filesystem();
915        let guard = fs
916            .lock_manager()
917            .read_lock(lock_keys![LockKey::object_attribute(
918                self.store().store_object_id(),
919                self.object_id(),
920                attribute_id,
921            )])
922            .await;
923
924        let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
925        let item = self.store().tree().find(&key).await?;
926        let size = match item {
927            Some(item) if item.key == key => match item.value {
928                ObjectValue::Attribute { size, .. } => size,
929                _ => bail!(FxfsError::Inconsistent),
930            },
931            _ => return Ok(0),
932        };
933        if offset >= size {
934            return Ok(0);
935        }
936        let length = min(buf.len() as u64, size - offset) as usize;
937        buf = buf.subslice_mut(0..length);
938        self.read_unchecked(attribute_id, offset, buf, &guard).await?;
939        Ok(length)
940    }
941
    /// Read `buf.len()` bytes from the attribute `attribute_id`, starting at `offset`, into `buf`.
    /// It's required that a read lock on this attribute id is taken before this is called.
    ///
    /// This function doesn't do any size checking - any portion of `buf` past the end of the file
    /// will be filled with zeros. The caller is responsible for enforcing the file size on reads.
    /// This is because, just looking at the extents, we can't tell the difference between the file
    /// actually ending and there just being a section at the end with no data (since attributes
    /// are sparse).
    ///
    /// `offset` must be a multiple of the filesystem block size and `buf` must start at a
    /// device-block-aligned position (both are asserted). Whole-block chunks are queued as
    /// concurrent reads and awaited together after the extent walk. A trailing partial block is
    /// read synchronously into a block-sized bounce buffer and its prefix copied into `buf`.
    /// Malformed (invalid or unaligned) extents yield `FxfsError::Inconsistent`.
    pub(super) async fn read_unchecked(
        &self,
        attribute_id: u64,
        mut offset: u64,
        mut buf: MutableBufferRef<'_>,
        _guard: &ReadGuard<'_>,
    ) -> Result<(), Error> {
        if buf.len() == 0 {
            return Ok(());
        }
        let end_offset = offset + buf.len() as u64;

        self.store().logical_read_ops.fetch_add(1, Ordering::Relaxed);

        // Whilst the read offset must be aligned to the filesystem block size, the buffer need only
        // be aligned to the device's block size.
        let block_size = self.block_size() as u64;
        let device_block_size = self.store().device.block_size() as u64;
        assert_eq!(offset % block_size, 0);
        assert_eq!(buf.range().start as u64 % device_block_size, 0);
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger
            .query(Query::LimitedRange(&ObjectKey::extent(
                self.object_id(),
                attribute_id,
                offset..end_offset,
            )))
            .await?;
        // Number of bytes in the request's final, partial filesystem block (0 if the request ends
        // on a block boundary). Those bytes are handled separately via a bounce buffer.
        let end_align = ((offset + buf.len() as u64) % block_size) as usize;
        let trace = self.trace();
        let reads = FuturesUnordered::new();
        while let Some(ItemRef {
            key:
                ObjectKey {
                    object_id,
                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                },
            value: ObjectValue::Extent(extent_value),
            ..
        }) = iter.get()
        {
            if *object_id != self.object_id() || *attr_id != attribute_id {
                break;
            }
            ensure!(
                extent_key.range.is_valid() && extent_key.range.is_aligned(block_size),
                FxfsError::Inconsistent
            );
            if extent_key.range.start > offset {
                // Zero everything up to the start of the extent.
                let to_zero = min(extent_key.range.start - offset, buf.len() as u64) as usize;
                for i in &mut buf.as_mut_slice()[..to_zero] {
                    *i = 0;
                }
                buf = buf.subslice_mut(to_zero..);
                if buf.is_empty() {
                    break;
                }
                offset += to_zero as u64;
            }

            if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
                // Device location corresponding to the current logical `offset`.
                let mut device_offset = device_offset + (offset - extent_key.range.start);
                let key_id = *key_id;

                // Only whole blocks are read directly into `buf`; the trailing `end_align` bytes
                // are left for the bounce-buffer path below.
                let to_copy = min(buf.len() - end_align, (extent_key.range.end - offset) as usize);
                if to_copy > 0 {
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = (device_offset..device_offset + to_copy as u64),
                            offset,
                            range:? = extent_key.range,
                            block_size;
                            "R",
                        );
                    }
                    let (mut head, tail) = buf.split_at_mut(to_copy);
                    // For partially-written overwrite extents, take the slice of the bitmap
                    // that covers exactly the blocks being read into `head`.
                    let maybe_bitmap = match mode {
                        ExtentMode::OverwritePartial(bitmap) => {
                            let mut read_bitmap = bitmap.clone().split_off(
                                ((offset - extent_key.range.start) / block_size) as usize,
                            );
                            read_bitmap.truncate(to_copy / block_size as usize);
                            Some(read_bitmap)
                        }
                        _ => None,
                    };
                    // Queued, not awaited: every queued read targets a disjoint part of `buf`,
                    // and all of them are awaited together after the loop.
                    reads.push(async move {
                        self.read_and_decrypt(device_offset, offset, head.reborrow(), key_id)
                            .await?;
                        if let Some(bitmap) = maybe_bitmap {
                            apply_bitmap_zeroing(self.block_size() as usize, &bitmap, head);
                        }
                        Ok::<(), Error>(())
                    });
                    buf = tail;
                    if buf.is_empty() {
                        break;
                    }
                    offset += to_copy as u64;
                    device_offset += to_copy as u64;
                }

                // Deal with end alignment by reading the existing contents into an alignment
                // buffer.
                if offset < extent_key.range.end && end_align > 0 {
                    if let ExtentMode::OverwritePartial(bitmap) = mode {
                        let bitmap_offset = (offset - extent_key.range.start) / block_size;
                        if !bitmap.get(bitmap_offset as usize).ok_or(FxfsError::Inconsistent)? {
                            // If this block isn't actually initialized, skip it.
                            break;
                        }
                    }
                    let mut align_buf =
                        self.store().device.allocate_buffer(block_size as usize).await;
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = (device_offset..device_offset + align_buf.len() as u64);
                            "RT",
                        );
                    }
                    self.read_and_decrypt(device_offset, offset, align_buf.as_mut(), key_id)
                        .await?;
                    // At this point exactly `end_align` bytes of `buf` remain.
                    buf.as_mut_slice().copy_from_slice(&align_buf.as_slice()[..end_align]);
                    buf = buf.subslice_mut(0..0);
                    break;
                }
            } else if extent_key.range.end >= offset + buf.len() as u64 {
                // Deleted extent covers remainder, so we're done.
                break;
            }

            iter.advance().await?;
        }
        reads.try_collect::<()>().await?;
        // Whatever is left of `buf` was not backed by a readable extent; it reads as zeros.
        buf.as_mut_slice().fill(0);
        Ok(())
    }
1094
1095    /// Reads an entire attribute.
1096    pub async fn read_attr(&self, attribute_id: u64) -> Result<Option<Box<[u8]>>, Error> {
1097        let store = self.store();
1098        let tree = &store.tree;
1099        let layer_set = tree.layer_set();
1100        let mut merger = layer_set.merger();
1101        let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
1102        let mut iter = merger.query(Query::FullRange(&key)).await?;
1103        let (mut buffer, size) = match iter.get() {
1104            Some(item) if item.key == &key => match item.value {
1105                ObjectValue::Attribute { size, .. } => {
1106                    // TODO(https://fxbug.dev/42073113): size > max buffer size
1107                    (
1108                        store
1109                            .device
1110                            .allocate_buffer(round_up(*size, self.block_size()).unwrap() as usize)
1111                            .await,
1112                        *size as usize,
1113                    )
1114                }
1115                // Attribute was deleted.
1116                ObjectValue::None => return Ok(None),
1117                _ => bail!(FxfsError::Inconsistent),
1118            },
1119            _ => return Ok(None),
1120        };
1121        store.logical_read_ops.fetch_add(1, Ordering::Relaxed);
1122        let mut last_offset = 0;
1123        loop {
1124            iter.advance().await?;
1125            match iter.get() {
1126                Some(ItemRef {
1127                    key:
1128                        ObjectKey {
1129                            object_id,
1130                            data:
1131                                ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
1132                        },
1133                    value: ObjectValue::Extent(extent_value),
1134                    ..
1135                }) if *object_id == self.object_id() && *attr_id == attribute_id => {
1136                    if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
1137                        let offset = extent_key.range.start as usize;
1138                        buffer.as_mut_slice()[last_offset..offset].fill(0);
1139                        let end = std::cmp::min(extent_key.range.end as usize, buffer.len());
1140                        let maybe_bitmap = match mode {
1141                            ExtentMode::OverwritePartial(bitmap) => {
1142                                // The caller has to adjust the bitmap if necessary, but we always
1143                                // start from the beginning of any extent, so we only truncate.
1144                                let mut read_bitmap = bitmap.clone();
1145                                read_bitmap.truncate(
1146                                    (end - extent_key.range.start as usize)
1147                                        / self.block_size() as usize,
1148                                );
1149                                Some(read_bitmap)
1150                            }
1151                            _ => None,
1152                        };
1153                        self.read_and_decrypt(
1154                            *device_offset,
1155                            extent_key.range.start,
1156                            buffer.subslice_mut(offset..end as usize),
1157                            *key_id,
1158                        )
1159                        .await?;
1160                        if let Some(bitmap) = maybe_bitmap {
1161                            apply_bitmap_zeroing(
1162                                self.block_size() as usize,
1163                                &bitmap,
1164                                buffer.subslice_mut(offset..end as usize),
1165                            );
1166                        }
1167                        last_offset = end;
1168                        if last_offset >= size {
1169                            break;
1170                        }
1171                    }
1172                }
1173                _ => break,
1174            }
1175        }
1176        buffer.as_mut_slice()[std::cmp::min(last_offset, size)..].fill(0);
1177        Ok(Some(buffer.as_slice()[..size].into()))
1178    }
1179
1180    /// Writes potentially unaligned data at `device_offset` and returns checksums if requested.
1181    /// The data will be encrypted if necessary.  `buf` is mutable as an optimization, since the
1182    /// write may require encryption, we can encrypt the buffer in-place rather than copying to
1183    /// another buffer if the write is already aligned.
1184    ///
1185    /// NOTE: This will not create keys if they are missing (it will fail with an error if that
1186    /// happens to be the case).
1187    pub async fn write_at(
1188        &self,
1189        attribute_id: u64,
1190        offset: u64,
1191        buf: MutableBufferRef<'_>,
1192        key_id: Option<u64>,
1193        mut device_offset: u64,
1194    ) -> Result<MaybeChecksums, Error> {
1195        let mut transfer_buf;
1196        let block_size = self.block_size();
1197        let (range, mut transfer_buf_ref) =
1198            if offset % block_size == 0 && buf.len() as u64 % block_size == 0 {
1199                (offset..offset + buf.len() as u64, buf)
1200            } else {
1201                let (range, buf) = self.align_buffer(attribute_id, offset, buf.as_ref()).await?;
1202                transfer_buf = buf;
1203                device_offset -= offset - range.start;
1204                (range, transfer_buf.as_mut())
1205            };
1206
1207        let mut crypt_ctx = None;
1208        if let (_, Some(key)) = self.get_key(key_id).await? {
1209            if let Some(ctx) = key.crypt_ctx(self.object_id, range.start) {
1210                crypt_ctx = Some(ctx);
1211            } else {
1212                key.encrypt(
1213                    self.object_id,
1214                    device_offset,
1215                    range.start,
1216                    transfer_buf_ref.as_mut_slice(),
1217                )?;
1218            }
1219        }
1220        self.write_aligned(transfer_buf_ref.as_ref(), device_offset, crypt_ctx).await
1221    }
1222
1223    /// Writes to multiple ranges with data provided in `buf`. This function is specifically
1224    /// designed for migration purposes, allowing raw writes to the device without updating
1225    /// object metadata like allocated size or mtime. It's essential for scenarios where
1226    /// data needs to be transferred directly without triggering standard filesystem operations.
1227    #[cfg(feature = "migration")]
1228    pub async fn raw_multi_write(
1229        &self,
1230        transaction: &mut Transaction<'_>,
1231        attribute_id: u64,
1232        key_id: Option<u64>,
1233        ranges: &[Range<u64>],
1234        buf: MutableBufferRef<'_>,
1235    ) -> Result<(), Error> {
1236        self.multi_write_internal(transaction, attribute_id, key_id, ranges, buf).await?;
1237        Ok(())
1238    }
1239
1240    /// This is a low-level write function that writes to multiple ranges. Users should generally
1241    /// use `multi_write` instead of this function as this does not update the object's allocated
1242    /// size, mtime, atime, etc.
1243    ///
1244    /// Returns (allocated, deallocated) bytes on success.
1245    async fn multi_write_internal(
1246        &self,
1247        transaction: &mut Transaction<'_>,
1248        attribute_id: u64,
1249        key_id: Option<u64>,
1250        ranges: &[Range<u64>],
1251        mut buf: MutableBufferRef<'_>,
1252    ) -> Result<(u64, u64), Error> {
1253        if buf.is_empty() {
1254            return Ok((0, 0));
1255        }
1256        let block_size = self.block_size();
1257        let store = self.store();
1258        let store_id = store.store_object_id();
1259
1260        // The only key we allow to be created on-the-fly is a non permanent key wrapped with the
1261        // volume data key.
1262        let (key_id, key) = if key_id == Some(VOLUME_DATA_KEY_ID)
1263            && matches!(self.encryption, Encryption::CachedKeys)
1264        {
1265            (
1266                VOLUME_DATA_KEY_ID,
1267                Some(
1268                    self.get_or_create_key(transaction)
1269                        .await
1270                        .context("get_or_create_key failed")?,
1271                ),
1272            )
1273        } else {
1274            self.get_key(key_id).await?
1275        };
1276        if let Some(key) = &key {
1277            if !key.supports_inline_encryption() {
1278                let mut slice = buf.as_mut_slice();
1279                for r in ranges {
1280                    let l = r.end - r.start;
1281                    let (head, tail) = slice.split_at_mut(l as usize);
1282                    key.encrypt(
1283                        self.object_id,
1284                        0, /* TODO(https://fxbug.dev/421269588): plumb through device_offset. */
1285                        r.start,
1286                        head,
1287                    )?;
1288                    slice = tail;
1289                }
1290            }
1291        }
1292
1293        let mut allocated = 0;
1294        let allocator = store.allocator();
1295        let trace = self.trace();
1296        let mut writes = FuturesOrdered::new();
1297
1298        let mut logical_ranges = ranges.iter();
1299        let mut current_range = logical_ranges.next().unwrap().clone();
1300
1301        while !buf.is_empty() {
1302            let mut device_range = allocator
1303                .allocate(transaction, store_id, buf.len() as u64)
1304                .await
1305                .context("allocation failed")?;
1306            if trace {
1307                info!(
1308                    store_id,
1309                    oid = self.object_id(),
1310                    device_range:?,
1311                    len = device_range.end - device_range.start;
1312                    "A",
1313                );
1314            }
1315            let mut device_range_len = device_range.end - device_range.start;
1316            allocated += device_range_len;
1317            // If inline encryption is NOT supported, this loop should only happen once.
1318            while device_range_len > 0 {
1319                if current_range.end <= current_range.start {
1320                    current_range = logical_ranges.next().unwrap().clone();
1321                }
1322                let (crypt_ctx, split) = if let Some(key) = &key {
1323                    if key.supports_inline_encryption() {
1324                        let split = std::cmp::min(
1325                            current_range.end - current_range.start,
1326                            device_range_len,
1327                        );
1328                        current_range.start += split;
1329
1330                        (key.crypt_ctx(self.object_id, current_range.start), split)
1331                    } else {
1332                        (None, device_range_len)
1333                    }
1334                } else {
1335                    (None, device_range_len)
1336                };
1337
1338                let (head, tail) = buf.split_at_mut(split as usize);
1339                buf = tail;
1340
1341                writes.push_back(async move {
1342                    let len = head.len() as u64;
1343                    Result::<_, Error>::Ok((
1344                        device_range.start,
1345                        len,
1346                        self.write_aligned(head.as_ref(), device_range.start, crypt_ctx).await?,
1347                    ))
1348                });
1349                device_range.start += split;
1350                device_range_len = device_range.end - device_range.start;
1351            }
1352        }
1353
1354        self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
1355        let ((mutations, checksums), deallocated) = try_join!(
1356            async {
1357                let mut current_range = 0..0;
1358                let mut mutations = Vec::new();
1359                let mut out_checksums = Vec::new();
1360                let mut ranges = ranges.iter();
1361                while let Some((mut device_offset, mut len, mut checksums)) =
1362                    writes.try_next().await?
1363                {
1364                    while len > 0 {
1365                        if current_range.end <= current_range.start {
1366                            current_range = ranges.next().unwrap().clone();
1367                        }
1368                        let chunk_len = std::cmp::min(len, current_range.end - current_range.start);
1369                        let tail = checksums.split_off((chunk_len / block_size) as usize);
1370                        if let Some(checksums) = checksums.maybe_as_ref() {
1371                            out_checksums.push((
1372                                device_offset..device_offset + chunk_len,
1373                                checksums.to_owned(),
1374                            ));
1375                        }
1376                        mutations.push(Mutation::merge_object(
1377                            ObjectKey::extent(
1378                                self.object_id(),
1379                                attribute_id,
1380                                current_range.start..current_range.start + chunk_len,
1381                            ),
1382                            ObjectValue::Extent(ExtentValue::new(
1383                                device_offset,
1384                                checksums.to_mode(),
1385                                key_id,
1386                            )),
1387                        ));
1388                        checksums = tail;
1389                        device_offset += chunk_len;
1390                        len -= chunk_len;
1391                        current_range.start += chunk_len;
1392                    }
1393                }
1394                Result::<_, Error>::Ok((mutations, out_checksums))
1395            },
1396            async {
1397                let mut deallocated = 0;
1398                for r in ranges {
1399                    deallocated +=
1400                        self.deallocate_old_extents(transaction, attribute_id, r.clone()).await?;
1401                }
1402                Result::<_, Error>::Ok(deallocated)
1403            }
1404        )?;
1405
1406        for m in mutations {
1407            transaction.add(store_id, m);
1408        }
1409
1410        // Only store checksums in the journal if barriers are not enabled.
1411        if !store.filesystem().options().barriers_enabled {
1412            for (r, c) in checksums {
1413                transaction.add_checksum(r, c, true);
1414            }
1415        }
1416        Ok((allocated, deallocated))
1417    }
1418
1419    /// Writes to multiple ranges with data provided in `buf`.  The buffer can be modified in place
1420    /// if encryption takes place.  The ranges must all be aligned and no change to content size is
1421    /// applied; the caller is responsible for updating size if required.  If `key_id` is None, it
1422    /// means pick the default key for the object which is the fscrypt key if present, or the volume
1423    /// data key, or no key if it's an unencrypted file.
1424    pub async fn multi_write(
1425        &self,
1426        transaction: &mut Transaction<'_>,
1427        attribute_id: u64,
1428        key_id: Option<u64>,
1429        ranges: &[Range<u64>],
1430        buf: MutableBufferRef<'_>,
1431    ) -> Result<(), Error> {
1432        let (allocated, deallocated) =
1433            self.multi_write_internal(transaction, attribute_id, key_id, ranges, buf).await?;
1434        if allocated == 0 && deallocated == 0 {
1435            return Ok(());
1436        }
1437        self.update_allocated_size(transaction, allocated, deallocated).await
1438    }
1439
    /// Write data to overwrite extents with the provided set of ranges. This makes a strong
    /// assumption that the ranges are actually going to be already allocated overwrite extents and
    /// will error out or do something wrong if they aren't. It also assumes the ranges passed to
    /// it are sorted.
    ///
    /// `buf` supplies the data for every range in `ranges`, laid out back to back in the same
    /// order; it is consumed from the front as each piece is written. If `buf` is empty this is a
    /// no-op. When the object's key does not support inline encryption, `buf` is encrypted in
    /// place up front, one range at a time, passing each range's start as the offset. A crypt
    /// context obtained from `key.crypt_ctx` is passed to every `write_aligned` call.
    ///
    /// Each range is matched against the existing extent records for `attr_id`, and the data is
    /// written at the corresponding device offset. The write futures are accumulated in a
    /// `FuturesUnordered` and are only driven (via `try_collect`) after every range has been
    /// matched to an extent.
    ///
    /// For extents tracked with an overwrite bitmap, the written blocks are OR-ed into the
    /// extent's bitmap. If `or` returns true, a merge mutation carrying the updated mode is added
    /// to `transaction`; the mode is `Overwrite` once every bit is set. Checksums are added to
    /// `transaction` only when barriers are disabled.
    ///
    /// Errors:
    /// * `FxfsError::Inconsistent` if a range overlaps a deleted extent, is not covered by
    ///   existing extents, or overlaps an extent whose mode is `Raw` or `Cow`.
    /// * `FxfsError::Internal` if a non-extent record (or the end of the tree) is reached while
    ///   there are still ranges left to write.
    pub async fn multi_overwrite<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        attr_id: u64,
        ranges: &[Range<u64>],
        mut buf: MutableBufferRef<'_>,
    ) -> Result<(), Error> {
        if buf.is_empty() {
            return Ok(());
        }
        let block_size = self.block_size();
        let store = self.store();
        let tree = store.tree();
        let store_id = store.store_object_id();

        let (key_id, key) = self.get_key(None).await?;
        if let Some(key) = &key {
            // The key cannot encrypt inline as part of the device write, so encrypt `buf` in
            // place now, one range (and its matching slice of `buf`) at a time.
            if !key.supports_inline_encryption() {
                let mut slice = buf.as_mut_slice();
                for r in ranges {
                    let l = r.end - r.start;
                    let (head, tail) = slice.split_at_mut(l as usize);
                    key.encrypt(
                        self.object_id,
                        0, /* TODO(https://fxbug.dev/421269588): plumb through device_offset. */
                        r.start,
                        head,
                    )?;
                    slice = tail;
                }
            }
        }

        let mut range_iter = ranges.into_iter();
        // There should be at least one range if the buffer has data in it
        let mut target_range = range_iter.next().unwrap().clone();
        let mut mutations = Vec::new();
        // Futures are only collected here; they are driven by `try_collect` after the walk below.
        let writes = FuturesUnordered::new();

        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                attr_id,
                AttributeKey::Extent(ExtentKey::search_key_from_offset(target_range.start)),
            )))
            .await?;

        loop {
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(
                                    attribute_id,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) if *object_id == self.object_id() && *attribute_id == attr_id => {
                    // If this extent ends before the target range starts (not possible on the
                    // first loop because of the query parameters but possible on further loops),
                    // advance until we find the next one we care about.
                    if range.end <= target_range.start {
                        iter.advance().await?;
                        continue;
                    }
                    let (device_offset, mode) = match extent_value {
                        ExtentValue::None => {
                            return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                                format!(
                                    "multi_overwrite failed: target_range ({}, {}) overlaps with \
                                deleted extent found at ({}, {})",
                                    target_range.start, target_range.end, range.start, range.end,
                                )
                            });
                        }
                        ExtentValue::Some { device_offset, mode, .. } => (device_offset, mode),
                    };
                    // The ranges passed to this function should already be allocated, so
                    // extent records should exist for them.
                    if range.start > target_range.start {
                        return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                            format!(
                                "multi_overwrite failed: target range ({}, {}) starts before first \
                            extent found at ({}, {})",
                                target_range.start, target_range.end, range.start, range.end,
                            )
                        });
                    }
                    // Only overwrite-mode extents may be written through this path.
                    let mut bitmap = match mode {
                        ExtentMode::Raw | ExtentMode::Cow(_) => {
                            return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                                format!(
                                    "multi_overwrite failed: \
                            extent from ({}, {}) which overlaps target range ({}, {}) had the \
                            wrong extent mode",
                                    range.start, range.end, target_range.start, target_range.end,
                                )
                            });
                        }
                        ExtentMode::OverwritePartial(bitmap) => {
                            OverwriteBitmaps::new(bitmap.clone())
                        }
                        ExtentMode::Overwrite => OverwriteBitmaps::None,
                    };
                    // Write every target range (or the part of one) that falls inside this
                    // extent.
                    loop {
                        let offset_within_extent = target_range.start - range.start;
                        let bitmap_offset = offset_within_extent / block_size;
                        let write_device_offset = *device_offset + offset_within_extent;
                        let write_end = min(range.end, target_range.end);
                        let write_len = write_end - target_range.start;
                        let write_device_range =
                            write_device_offset..write_device_offset + write_len;
                        let (current_buf, remaining_buf) = buf.split_at_mut(write_len as usize);

                        // Position the bitmap at the first block of this write (in blocks from
                        // the start of the extent).
                        bitmap.set_offset(bitmap_offset as usize);
                        let checksum_ranges = ChecksumRangeChunk::group_first_write_ranges(
                            &mut bitmap,
                            block_size,
                            write_device_range,
                        );

                        let crypt_ctx = if let Some(key) = &key {
                            key.crypt_ctx(self.object_id, target_range.start)
                        } else {
                            None
                        };

                        writes.push(async move {
                            let maybe_checksums = self
                                .write_aligned(current_buf.as_ref(), write_device_offset, crypt_ctx)
                                .await?;
                            Ok::<_, Error>(match maybe_checksums {
                                MaybeChecksums::None => Vec::new(),
                                MaybeChecksums::Fletcher(checksums) => checksum_ranges
                                    .into_iter()
                                    .map(
                                        |ChecksumRangeChunk {
                                             checksum_range,
                                             device_range,
                                             is_first_write,
                                         }| {
                                            (
                                                device_range,
                                                checksums[checksum_range].to_vec(),
                                                is_first_write,
                                            )
                                        },
                                    )
                                    .collect(),
                            })
                        });
                        buf = remaining_buf;
                        target_range.start += write_len;
                        // Current target range finished: move on to the next one, or stop if
                        // there are none left.
                        if target_range.start == target_range.end {
                            match range_iter.next() {
                                None => break,
                                Some(next_range) => target_range = next_range.clone(),
                            }
                        }
                        // The (next) target range starts beyond this extent; go find its extent.
                        if range.end <= target_range.start {
                            break;
                        }
                    }
                    if let Some((mut bitmap, write_bitmap)) = bitmap.take_bitmaps() {
                        if bitmap.or(&write_bitmap) {
                            let mode = if bitmap.all() {
                                ExtentMode::Overwrite
                            } else {
                                ExtentMode::OverwritePartial(bitmap)
                            };
                            mutations.push(Mutation::merge_object(
                                ObjectKey::extent(self.object_id(), attr_id, range.clone()),
                                ObjectValue::Extent(ExtentValue::new(*device_offset, mode, key_id)),
                            ))
                        }
                    }
                    if target_range.start == target_range.end {
                        break;
                    }
                    iter.advance().await?;
                }
                // We've either run past the end of the existing extents or something is wrong with
                // the tree. The main section should break if it finishes the ranges, so either
                // case, this is an error.
                _ => bail!(anyhow!(FxfsError::Internal).context(
                    "found a non-extent object record while there were still ranges to process"
                )),
            }
        }

        let checksums = writes.try_collect::<Vec<_>>().await?;
        // Only store checksums in the journal if barriers are not enabled.
        if !store.filesystem().options().barriers_enabled {
            for (r, c, first_write) in checksums.into_iter().flatten() {
                transaction.add_checksum(r, c, first_write);
            }
        }

        for m in mutations {
            transaction.add(store_id, m);
        }

        Ok(())
    }
1654
1655    /// Writes an attribute that should not already exist and therefore does not require trimming.
1656    /// Breaks up the write into multiple transactions if `data.len()` is larger than `batch_size`.
1657    /// If writing the attribute requires multiple transactions, adds the attribute to the
1658    /// graveyard. The caller is responsible for removing the attribute from the graveyard when it
1659    /// commits the last transaction.  This always writes using a key wrapped with the volume data
1660    /// key.
1661    #[trace]
1662    pub async fn write_new_attr_in_batches<'a>(
1663        &'a self,
1664        transaction: &mut Transaction<'a>,
1665        attribute_id: u64,
1666        data: &[u8],
1667        batch_size: usize,
1668    ) -> Result<(), Error> {
1669        transaction.add(
1670            self.store().store_object_id,
1671            Mutation::replace_or_insert_object(
1672                ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
1673                ObjectValue::attribute(data.len() as u64, false),
1674            ),
1675        );
1676        let chunks = data.chunks(batch_size);
1677        let num_chunks = chunks.len();
1678        if num_chunks > 1 {
1679            transaction.add(
1680                self.store().store_object_id,
1681                Mutation::replace_or_insert_object(
1682                    ObjectKey::graveyard_attribute_entry(
1683                        self.store().graveyard_directory_object_id(),
1684                        self.object_id(),
1685                        attribute_id,
1686                    ),
1687                    ObjectValue::Some,
1688                ),
1689            );
1690        }
1691        let mut start_offset = 0;
1692        for (i, chunk) in chunks.enumerate() {
1693            let rounded_len = round_up(chunk.len() as u64, self.block_size()).unwrap();
1694            let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
1695            let slice = buffer.as_mut_slice();
1696            slice[..chunk.len()].copy_from_slice(chunk);
1697            slice[chunk.len()..].fill(0);
1698            self.multi_write(
1699                transaction,
1700                attribute_id,
1701                Some(VOLUME_DATA_KEY_ID),
1702                &[start_offset..start_offset + rounded_len],
1703                buffer.as_mut(),
1704            )
1705            .await?;
1706            start_offset += rounded_len;
1707            // Do not commit the last chunk.
1708            if i < num_chunks - 1 {
1709                transaction.commit_and_continue().await?;
1710            }
1711        }
1712        Ok(())
1713    }
1714
1715    /// Writes an entire attribute. Returns whether or not the attribute needs to continue being
1716    /// trimmed - if the new data is shorter than the old data, this will trim any extents beyond
1717    /// the end of the new size, but if there were too many for a single transaction, a commit
1718    /// needs to be made before trimming again, so the responsibility is left to the caller so as
1719    /// to not accidentally split the transaction when it's not in a consistent state.  This will
1720    /// write using the volume data key; the fscrypt key is not supported.
1721    pub async fn write_attr(
1722        &self,
1723        transaction: &mut Transaction<'_>,
1724        attribute_id: u64,
1725        data: &[u8],
1726    ) -> Result<NeedsTrim, Error> {
1727        let rounded_len = round_up(data.len() as u64, self.block_size()).unwrap();
1728        let store = self.store();
1729        let tree = store.tree();
1730        let should_trim = if let Some(item) = tree
1731            .find(&ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute))
1732            .await?
1733        {
1734            match item.value {
1735                ObjectValue::Attribute { size: _, has_overwrite_extents: true } => {
1736                    bail!(
1737                        anyhow!(FxfsError::Inconsistent)
1738                            .context("write_attr on an attribute with overwrite extents")
1739                    )
1740                }
1741                ObjectValue::Attribute { size, .. } => (data.len() as u64) < size,
1742                _ => bail!(FxfsError::Inconsistent),
1743            }
1744        } else {
1745            false
1746        };
1747        let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
1748        let slice = buffer.as_mut_slice();
1749        slice[..data.len()].copy_from_slice(data);
1750        slice[data.len()..].fill(0);
1751        self.multi_write(
1752            transaction,
1753            attribute_id,
1754            Some(VOLUME_DATA_KEY_ID),
1755            &[0..rounded_len],
1756            buffer.as_mut(),
1757        )
1758        .await?;
1759        transaction.add(
1760            self.store().store_object_id,
1761            Mutation::replace_or_insert_object(
1762                ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
1763                ObjectValue::attribute(data.len() as u64, false),
1764            ),
1765        );
1766        if should_trim {
1767            self.shrink(transaction, attribute_id, data.len() as u64).await
1768        } else {
1769            Ok(NeedsTrim(false))
1770        }
1771    }
1772
1773    pub async fn list_extended_attributes(&self) -> Result<Vec<Vec<u8>>, Error> {
1774        let layer_set = self.store().tree().layer_set();
1775        let mut merger = layer_set.merger();
1776        // Seek to the first extended attribute key for this object.
1777        let mut iter = merger
1778            .query(Query::FullRange(&ObjectKey::extended_attribute(self.object_id(), Vec::new())))
1779            .await?;
1780        let mut out = Vec::new();
1781        while let Some(item) = iter.get() {
1782            // Skip deleted extended attributes.
1783            if item.value != &ObjectValue::None {
1784                match item.key {
1785                    ObjectKey { object_id, data: ObjectKeyData::ExtendedAttribute { name } } => {
1786                        if self.object_id() != *object_id {
1787                            bail!(
1788                                anyhow!(FxfsError::Inconsistent)
1789                                    .context("list_extended_attributes: wrong object id")
1790                            )
1791                        }
1792                        out.push(name.clone());
1793                    }
1794                    // Once we hit something that isn't an extended attribute key, we've gotten to
1795                    // the end.
1796                    _ => break,
1797                }
1798            }
1799            iter.advance().await?;
1800        }
1801        Ok(out)
1802    }
1803
1804    /// Looks up the values for the extended attribute `fio::SELINUX_CONTEXT_NAME`, returning it
1805    /// if it is found inline. If it is not inline, it will request use of the
1806    /// `get_extended_attributes` method. If the entry doesn't exist at all, returns None.
1807    pub async fn get_inline_selinux_context(&self) -> Result<Option<fio::SelinuxContext>, Error> {
1808        // This optimization is only useful as long as the attribute is smaller than inline sizes.
1809        // Avoid reading the data out of the attributes.
1810        const_assert!(fio::MAX_SELINUX_CONTEXT_ATTRIBUTE_LEN as usize <= MAX_INLINE_XATTR_SIZE);
1811        let item = match self
1812            .store()
1813            .tree()
1814            .find(&ObjectKey::extended_attribute(
1815                self.object_id(),
1816                fio::SELINUX_CONTEXT_NAME.into(),
1817            ))
1818            .await?
1819        {
1820            Some(item) => item,
1821            None => return Ok(None),
1822        };
1823        match item.value {
1824            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => {
1825                Ok(Some(fio::SelinuxContext::Data(value)))
1826            }
1827            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(_)) => {
1828                Ok(Some(fio::SelinuxContext::UseExtendedAttributes(fio::EmptyStruct {})))
1829            }
1830            _ => {
1831                bail!(
1832                    anyhow!(FxfsError::Inconsistent)
1833                        .context("get_inline_extended_attribute: Expected ExtendedAttribute value")
1834                )
1835            }
1836        }
1837    }
1838
1839    pub async fn get_extended_attribute(&self, name: Vec<u8>) -> Result<Vec<u8>, Error> {
1840        let item = self
1841            .store()
1842            .tree()
1843            .find(&ObjectKey::extended_attribute(self.object_id(), name))
1844            .await?
1845            .ok_or(FxfsError::NotFound)?;
1846        match item.value {
1847            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => Ok(value),
1848            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
1849                Ok(self.read_attr(id).await?.ok_or(FxfsError::Inconsistent)?.into_vec())
1850            }
1851            _ => {
1852                bail!(
1853                    anyhow!(FxfsError::Inconsistent)
1854                        .context("get_extended_attribute: Expected ExtendedAttribute value")
1855                )
1856            }
1857        }
1858    }
1859
1860    pub async fn set_extended_attribute(
1861        &self,
1862        name: Vec<u8>,
1863        value: Vec<u8>,
1864        mode: SetExtendedAttributeMode,
1865    ) -> Result<(), Error> {
1866        let store = self.store();
1867        let fs = store.filesystem();
1868        // NB: We need to take this lock before we potentially look up the value to prevent racing
1869        // with another set.
1870        let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
1871        let mut transaction = fs.new_transaction(keys, Options::default()).await?;
1872        self.set_extended_attribute_impl(name, value, mode, &mut transaction).await?;
1873        transaction.commit().await?;
1874        Ok(())
1875    }
1876
    /// Sets extended attribute `name` to `value` within `transaction`, honouring `mode`.
    ///
    /// Callers should already hold the object lock, as `set_extended_attribute` does by creating
    /// the transaction with it. Otherwise the existence check below could race with another set.
    ///
    /// Where the value ends up:
    /// * If the xattr already has a backing attribute id, the value is always written to that
    ///   attribute, even if it would now fit inline.
    /// * Otherwise, values of at most `MAX_INLINE_XATTR_SIZE` bytes are stored inline.
    /// * Larger values are written to the first attribute id, scanning up from
    ///   `EXTENDED_ATTRIBUTE_RANGE_START`, that has no live attribute record. The xattr record is
    ///   then pointed at that id.
    ///
    /// Errors:
    /// * `FxfsError::TooBig` if `name` or `value` exceeds the size limits.
    /// * `FxfsError::AlreadyExists` for `Create` when the xattr exists.
    /// * `FxfsError::NotFound` for `Replace` when the xattr does not exist.
    /// * `FxfsError::NoSpace` if the scan reaches `EXTENDED_ATTRIBUTE_RANGE_END`.
    /// * `FxfsError::Inconsistent` if the existing record is not an extended attribute value.
    async fn set_extended_attribute_impl(
        &self,
        name: Vec<u8>,
        value: Vec<u8>,
        mode: SetExtendedAttributeMode,
        transaction: &mut Transaction<'_>,
    ) -> Result<(), Error> {
        ensure!(name.len() <= MAX_XATTR_NAME_SIZE, FxfsError::TooBig);
        ensure!(value.len() <= MAX_XATTR_VALUE_SIZE, FxfsError::TooBig);
        let tree = self.store().tree();
        let object_key = ObjectKey::extended_attribute(self.object_id(), name);

        // Look up any existing record, enforce `mode`, and remember the backing attribute id (if
        // the current value is stored out of line).
        let existing_attribute_id = {
            let (found, existing_attribute_id) = match tree.find(&object_key).await? {
                None => (false, None),
                Some(Item { value, .. }) => (
                    true,
                    match value {
                        ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
                        ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
                            Some(id)
                        }
                        _ => bail!(
                            anyhow!(FxfsError::Inconsistent)
                                .context("expected extended attribute value")
                        ),
                    },
                ),
            };
            match mode {
                SetExtendedAttributeMode::Create if found => {
                    bail!(FxfsError::AlreadyExists)
                }
                SetExtendedAttributeMode::Replace if !found => {
                    bail!(FxfsError::NotFound)
                }
                _ => (),
            }
            existing_attribute_id
        };

        if let Some(attribute_id) = existing_attribute_id {
            // If we already have an attribute id allocated for this extended attribute, we always
            // use it, even if the value has shrunk enough to be stored inline. We don't need to
            // worry about trimming here for the same reason we don't need to worry about it when
            // we delete xattrs - they simply aren't large enough to ever need more than one
            // transaction.
            let _ = self.write_attr(transaction, attribute_id, &value).await?;
        } else if value.len() <= MAX_INLINE_XATTR_SIZE {
            transaction.add(
                self.store().store_object_id(),
                Mutation::replace_or_insert_object(
                    object_key,
                    ObjectValue::inline_extended_attribute(value),
                ),
            );
        } else {
            // If there isn't an existing attribute id and we are going to store the value in
            // an attribute, find the next empty attribute id in the range. We search for fxfs
            // attribute records specifically, instead of the extended attribute records, because
            // even if the extended attribute record is removed the attribute may not be fully
            // trimmed yet.
            let mut attribute_id = EXTENDED_ATTRIBUTE_RANGE_START;
            let layer_set = tree.layer_set();
            let mut merger = layer_set.merger();
            let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
            let mut iter = merger.query(Query::FullRange(&key)).await?;
            loop {
                match iter.get() {
                    // None means the iterator is exhausted: no further records exist, so the
                    // current candidate `attribute_id` is free and we can stop right away.
                    None => break,
                    Some(ItemRef {
                        key: ObjectKey { object_id, data: ObjectKeyData::Attribute(attr_id, _) },
                        value,
                        ..
                    }) if *object_id == self.object_id() => {
                        if matches!(value, ObjectValue::None) {
                            // This attribute was once used but is now deleted, so it's safe to use
                            // again.
                            break;
                        }
                        if attribute_id < *attr_id {
                            // We found a gap - use it.
                            break;
                        } else if attribute_id == *attr_id {
                            // This attribute id is in use, try the next one.
                            attribute_id += 1;
                            if attribute_id == EXTENDED_ATTRIBUTE_RANGE_END {
                                bail!(FxfsError::NoSpace);
                            }
                        }
                        // If we don't hit either of those cases, we are still moving through the
                        // extent keys for the current attribute, so just keep advancing until the
                        // attribute id changes.
                    }
                    // As we are working our way through the iterator, if we hit anything that
                    // doesn't have our object id or attribute key data, we've gone past the end of
                    // this section and can stop.
                    _ => break,
                }
                iter.advance().await?;
            }

            // We know this won't need trimming because it's a new attribute.
            let _ = self.write_attr(transaction, attribute_id, &value).await?;
            transaction.add(
                self.store().store_object_id(),
                Mutation::replace_or_insert_object(
                    object_key,
                    ObjectValue::extended_attribute(attribute_id),
                ),
            );
        }

        Ok(())
    }
1994
1995    pub async fn remove_extended_attribute(&self, name: Vec<u8>) -> Result<(), Error> {
1996        let store = self.store();
1997        let tree = store.tree();
1998        let object_key = ObjectKey::extended_attribute(self.object_id(), name);
1999
2000        // NB: The API says we have to return an error if the attribute doesn't exist, so we have
2001        // to look it up first to make sure we have a record of it before we delete it. Make sure
2002        // we take a lock and make a transaction before we do so we don't race with other
2003        // operations.
2004        let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
2005        let mut transaction = store.filesystem().new_transaction(keys, Options::default()).await?;
2006
2007        let attribute_to_delete =
2008            match tree.find(&object_key).await?.ok_or(FxfsError::NotFound)?.value {
2009                ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => Some(id),
2010                ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
2011                _ => {
2012                    bail!(
2013                        anyhow!(FxfsError::Inconsistent)
2014                            .context("remove_extended_attribute: Expected ExtendedAttribute value")
2015                    )
2016                }
2017            };
2018
2019        transaction.add(
2020            store.store_object_id(),
2021            Mutation::replace_or_insert_object(object_key, ObjectValue::None),
2022        );
2023
2024        // If the attribute wasn't stored inline, we need to deallocate all the extents too. This
2025        // would normally need to interact with the graveyard for correctness - if there are too
2026        // many extents to delete to fit in a single transaction then we could potentially have
2027        // consistency issues. However, the maximum size of an extended attribute is small enough
2028        // that it will never come close to that limit even in the worst case, so we just delete
2029        // everything in one shot.
2030        if let Some(attribute_id) = attribute_to_delete {
2031            let trim_result = store
2032                .trim_some(
2033                    &mut transaction,
2034                    self.object_id(),
2035                    attribute_id,
2036                    TrimMode::FromOffset(0),
2037                )
2038                .await?;
2039            // In case you didn't read the comment above - this should not be used to delete
2040            // arbitrary attributes!
2041            assert_matches!(trim_result, TrimResult::Done(_));
2042            transaction.add(
2043                store.store_object_id(),
2044                Mutation::replace_or_insert_object(
2045                    ObjectKey::attribute(self.object_id, attribute_id, AttributeKey::Attribute),
2046                    ObjectValue::None,
2047                ),
2048            );
2049        }
2050
2051        transaction.commit().await?;
2052        Ok(())
2053    }
2054
    /// Returns a future that pre-fetches this object's keys so as to avoid paying the
    /// performance penalty later. The caller must ensure that the object is not removed before
    /// the future completes.
    ///
    /// Returns `None` unless the handle's encryption is `Encryption::CachedKeys`. The returned
    /// future does nothing if the store has no crypt service. Any error from fetching the keys
    /// is ignored.
    pub fn pre_fetch_keys(&self) -> Option<impl Future<Output = ()> + use<S>> {
        if let Encryption::CachedKeys = self.encryption {
            // Clone what the future needs so it does not borrow `self`; the return type only
            // captures `S`.
            let owner = self.owner.clone();
            let object_id = self.object_id;
            Some(async move {
                let store = owner.as_ref().as_ref();
                if let Some(crypt) = store.crypt() {
                    // The closure gives the key manager a way to load the keys from the store
                    // (presumably only invoked on a cache miss - TODO confirm).
                    let _ = store
                        .key_manager
                        .get_keys(
                            object_id,
                            crypt.as_ref(),
                            &mut Some(async || store.get_keys(object_id).await),
                            /* permanent= */ false,
                            /* force= */ false,
                        )
                        .await;
                }
            })
        } else {
            None
        }
    }
2080}
2081
2082impl<S: HandleOwner> Drop for StoreObjectHandle<S> {
2083    fn drop(&mut self) {
2084        if self.is_encrypted() {
2085            let _ = self.store().key_manager.remove(self.object_id);
2086        }
2087    }
2088}
2089
/// Returned by operations that may trim an object's extents (e.g. when truncating).
///
/// It is not always possible to finish trimming within a single transaction. `NeedsTrim(true)`
/// means the caller must finish trimming the object in subsequent transactions (by calling
/// `ObjectStore::trim`). `NeedsTrim(false)` means no further trimming is required.
#[must_use]
pub struct NeedsTrim(pub bool);
2095
2096#[cfg(test)]
2097mod tests {
2098    use super::{ChecksumRangeChunk, OverwriteBitmaps};
2099    use crate::errors::FxfsError;
2100    use crate::filesystem::{FxFilesystem, OpenFxFilesystem};
2101    use crate::object_handle::ObjectHandle;
2102    use crate::object_store::data_object_handle::WRITE_ATTR_BATCH_SIZE;
2103    use crate::object_store::transaction::{Mutation, Options, lock_keys};
2104    use crate::object_store::{
2105        AttributeKey, DataObjectHandle, Directory, FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions,
2106        LockKey, ObjectKey, ObjectStore, ObjectValue, SetExtendedAttributeMode, StoreObjectHandle,
2107    };
2108    use bit_vec::BitVec;
2109    use fuchsia_async as fasync;
2110    use futures::join;
2111    use std::sync::Arc;
2112    use storage_device::DeviceHolder;
2113    use storage_device::fake_device::FakeDevice;
2114
2115    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2116    const TEST_OBJECT_NAME: &str = "foo";
2117
2118    fn is_error(actual: anyhow::Error, expected: FxfsError) {
2119        assert_eq!(*actual.root_cause().downcast_ref::<FxfsError>().unwrap(), expected)
2120    }
2121
2122    async fn test_filesystem() -> OpenFxFilesystem {
2123        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
2124        FxFilesystem::new_empty(device).await.expect("new_empty failed")
2125    }
2126
2127    async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
2128    {
2129        let fs = test_filesystem().await;
2130        let store = fs.root_store();
2131
2132        let mut transaction = fs
2133            .clone()
2134            .new_transaction(
2135                lock_keys![LockKey::object(
2136                    store.store_object_id(),
2137                    store.root_directory_object_id()
2138                )],
2139                Options::default(),
2140            )
2141            .await
2142            .expect("new_transaction failed");
2143
2144        let object =
2145            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2146                .await
2147                .expect("create_object failed");
2148
2149        let root_directory =
2150            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
2151        root_directory
2152            .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
2153            .await
2154            .expect("add_child_file failed");
2155
2156        transaction.commit().await.expect("commit failed");
2157
2158        (fs, object)
2159    }
2160
    #[fuchsia::test(threads = 3)]
    async fn extended_attribute_double_remove() {
        // This test is intended to trip a potential race condition in remove. Removing an
        // attribute that doesn't exist is an error, so we need to check before we remove, but if
        // we aren't careful, two parallel removes might both succeed in the check and then both
        // remove the value.
        //
        // NOTE(review): `threads = 3` presumably gives the executor enough threads for the two
        // spawned tasks below to genuinely run in parallel.
        let (fs, object) = test_filesystem_and_empty_object().await;
        // A second, reference-counted handle to the same object, so that a clone of it can be
        // moved into each spawned task.
        let basic = Arc::new(StoreObjectHandle::new(
            object.owner().clone(),
            object.object_id(),
            /* permanent_keys: */ false,
            HandleOptions::default(),
            false,
        ));
        let basic_a = basic.clone();
        let basic_b = basic.clone();

        basic
            .set_extended_attribute(
                b"security.selinux".to_vec(),
                b"bar".to_vec(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .expect("failed to set attribute");

        // Try to remove the attribute twice at the same time. One should succeed in the race and
        // return Ok, and the other should fail the race and return NOT_FOUND.
        let a_task = fasync::Task::spawn(async move {
            basic_a.remove_extended_attribute(b"security.selinux".to_vec()).await
        });
        let b_task = fasync::Task::spawn(async move {
            basic_b.remove_extended_attribute(b"security.selinux".to_vec()).await
        });
        match join!(a_task, b_task) {
            (Ok(()), Ok(())) => panic!("both remove calls succeeded"),
            (Err(_), Err(_)) => panic!("both remove calls failed"),

            // Exactly one winner: the loser must report NotFound, not some other failure.
            (Ok(()), Err(e)) => is_error(e, FxfsError::NotFound),
            (Err(e), Ok(())) => is_error(e, FxfsError::NotFound),
        }

        fs.close().await.expect("Close failed");
    }
2205
    #[fuchsia::test(threads = 3)]
    async fn extended_attribute_double_create() {
        // This test is intended to trip a potential race in set when using the create flag,
        // similar to above. If the create mode is set, we need to check that the attribute isn't
        // already created, but if two parallel creates both succeed in that check, and we aren't
        // careful with locking, they will both succeed and one will overwrite the other.
        let (fs, object) = test_filesystem_and_empty_object().await;
        // A second, reference-counted handle to the same object, so that a clone of it can be
        // moved into each spawned task while `basic` stays available for the final read-back.
        let basic = Arc::new(StoreObjectHandle::new(
            object.owner().clone(),
            object.object_id(),
            /* permanent_keys: */ false,
            HandleOptions::default(),
            false,
        ));
        let basic_a = basic.clone();
        let basic_b = basic.clone();

        // Try to set the attribute twice at the same time. One should succeed in the race and
        // return Ok, and the other should fail the race and return ALREADY_EXISTS.
        let a_task = fasync::Task::spawn(async move {
            basic_a
                .set_extended_attribute(
                    b"security.selinux".to_vec(),
                    b"one".to_vec(),
                    SetExtendedAttributeMode::Create,
                )
                .await
        });
        let b_task = fasync::Task::spawn(async move {
            basic_b
                .set_extended_attribute(
                    b"security.selinux".to_vec(),
                    b"two".to_vec(),
                    SetExtendedAttributeMode::Create,
                )
                .await
        });
        match join!(a_task, b_task) {
            (Ok(()), Ok(())) => panic!("both set calls succeeded"),
            (Err(_), Err(_)) => panic!("both set calls failed"),

            // The stored value must be the winner's, proving the loser did not overwrite it.
            (Ok(()), Err(e)) => {
                assert_eq!(
                    basic
                        .get_extended_attribute(b"security.selinux".to_vec())
                        .await
                        .expect("failed to get xattr"),
                    b"one"
                );
                is_error(e, FxfsError::AlreadyExists);
            }
            (Err(e), Ok(())) => {
                assert_eq!(
                    basic
                        .get_extended_attribute(b"security.selinux".to_vec())
                        .await
                        .expect("failed to get xattr"),
                    b"two"
                );
                is_error(e, FxfsError::AlreadyExists);
            }
        }

        fs.close().await.expect("Close failed");
    }
2271
2272    struct TestAttr {
2273        name: Vec<u8>,
2274        value: Vec<u8>,
2275    }
2276
2277    impl TestAttr {
2278        fn new(name: impl AsRef<[u8]>, value: impl AsRef<[u8]>) -> Self {
2279            Self { name: name.as_ref().to_vec(), value: value.as_ref().to_vec() }
2280        }
2281        fn name(&self) -> Vec<u8> {
2282            self.name.clone()
2283        }
2284        fn value(&self) -> Vec<u8> {
2285            self.value.clone()
2286        }
2287    }
2288
    #[fuchsia::test]
    async fn extended_attributes() {
        // Basic set/list/get/remove round trip for a single small extended attribute.
        let (fs, object) = test_filesystem_and_empty_object().await;

        let test_attr = TestAttr::new(b"security.selinux", b"foo");

        // A freshly created object has no extended attributes.
        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
        is_error(
            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
            FxfsError::NotFound,
        );

        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
        assert_eq!(
            object.get_extended_attribute(test_attr.name()).await.unwrap(),
            test_attr.value()
        );

        object.remove_extended_attribute(test_attr.name()).await.unwrap();
        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
        is_error(
            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
            FxfsError::NotFound,
        );

        // Make sure we can handle the same attribute being set again.
        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
        assert_eq!(
            object.get_extended_attribute(test_attr.name()).await.unwrap(),
            test_attr.value()
        );

        object.remove_extended_attribute(test_attr.name()).await.unwrap();
        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
        is_error(
            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
            FxfsError::NotFound,
        );

        fs.close().await.expect("close failed");
    }
2346
    #[fuchsia::test]
    async fn large_extended_attribute() {
        // Same round trip as `extended_attributes`, but with a 300-byte value, which this test
        // expects to be stored in a separate fxfs attribute rather than inline.
        let (fs, object) = test_filesystem_and_empty_object().await;

        let test_attr = TestAttr::new(b"security.selinux", vec![3u8; 300]);

        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(
            object.get_extended_attribute(test_attr.name()).await.unwrap(),
            test_attr.value()
        );

        // Probe the fxfs attributes to make sure it did the expected thing. This relies on inside
        // knowledge of how the attribute id is chosen.
        // NOTE(review): 64 is presumably the first id of the extended-attribute range
        // (`EXTENDED_ATTRIBUTE_RANGE_START`); TODO confirm.
        assert_eq!(
            object
                .read_attr(64)
                .await
                .expect("read_attr failed")
                .expect("read_attr returned none")
                .into_vec(),
            test_attr.value()
        );

        object.remove_extended_attribute(test_attr.name()).await.unwrap();
        is_error(
            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
            FxfsError::NotFound,
        );

        // Make sure we can handle the same attribute being set again.
        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(
            object.get_extended_attribute(test_attr.name()).await.unwrap(),
            test_attr.value()
        );
        object.remove_extended_attribute(test_attr.name()).await.unwrap();
        is_error(
            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
            FxfsError::NotFound,
        );

        fs.close().await.expect("close failed");
    }
2405
2406    #[fuchsia::test]
2407    async fn multiple_extended_attributes() {
2408        let (fs, object) = test_filesystem_and_empty_object().await;
2409
2410        let attrs = [
2411            TestAttr::new(b"security.selinux", b"foo"),
2412            TestAttr::new(b"large.attribute", vec![3u8; 300]),
2413            TestAttr::new(b"an.attribute", b"asdf"),
2414            TestAttr::new(b"user.big", vec![5u8; 288]),
2415            TestAttr::new(b"user.tiny", b"smol"),
2416            TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
2417            TestAttr::new(b"also big", vec![7u8; 500]),
2418            TestAttr::new(b"all.ones", vec![1u8; 11111]),
2419        ];
2420
2421        for i in 0..attrs.len() {
2422            object
2423                .set_extended_attribute(
2424                    attrs[i].name(),
2425                    attrs[i].value(),
2426                    SetExtendedAttributeMode::Set,
2427                )
2428                .await
2429                .unwrap();
2430            assert_eq!(
2431                object.get_extended_attribute(attrs[i].name()).await.unwrap(),
2432                attrs[i].value()
2433            );
2434        }
2435
2436        for i in 0..attrs.len() {
2437            // Make sure expected attributes are still available.
2438            let mut found_attrs = object.list_extended_attributes().await.unwrap();
2439            let mut expected_attrs: Vec<Vec<u8>> = attrs.iter().skip(i).map(|a| a.name()).collect();
2440            found_attrs.sort();
2441            expected_attrs.sort();
2442            assert_eq!(found_attrs, expected_attrs);
2443            for j in i..attrs.len() {
2444                assert_eq!(
2445                    object.get_extended_attribute(attrs[j].name()).await.unwrap(),
2446                    attrs[j].value()
2447                );
2448            }
2449
2450            object.remove_extended_attribute(attrs[i].name()).await.expect("failed to remove");
2451            is_error(
2452                object.get_extended_attribute(attrs[i].name()).await.unwrap_err(),
2453                FxfsError::NotFound,
2454            );
2455        }
2456
2457        fs.close().await.expect("close failed");
2458    }
2459
2460    #[fuchsia::test]
2461    async fn multiple_extended_attributes_delete() {
2462        let (fs, object) = test_filesystem_and_empty_object().await;
2463        let store = object.owner().clone();
2464
2465        let attrs = [
2466            TestAttr::new(b"security.selinux", b"foo"),
2467            TestAttr::new(b"large.attribute", vec![3u8; 300]),
2468            TestAttr::new(b"an.attribute", b"asdf"),
2469            TestAttr::new(b"user.big", vec![5u8; 288]),
2470            TestAttr::new(b"user.tiny", b"smol"),
2471            TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
2472            TestAttr::new(b"also big", vec![7u8; 500]),
2473            TestAttr::new(b"all.ones", vec![1u8; 11111]),
2474        ];
2475
2476        for i in 0..attrs.len() {
2477            object
2478                .set_extended_attribute(
2479                    attrs[i].name(),
2480                    attrs[i].value(),
2481                    SetExtendedAttributeMode::Set,
2482                )
2483                .await
2484                .unwrap();
2485            assert_eq!(
2486                object.get_extended_attribute(attrs[i].name()).await.unwrap(),
2487                attrs[i].value()
2488            );
2489        }
2490
2491        // Unlink the file
2492        let root_directory =
2493            Directory::open(object.owner(), object.store().root_directory_object_id())
2494                .await
2495                .expect("open failed");
2496        let mut transaction = fs
2497            .clone()
2498            .new_transaction(
2499                lock_keys![
2500                    LockKey::object(store.store_object_id(), store.root_directory_object_id()),
2501                    LockKey::object(store.store_object_id(), object.object_id()),
2502                ],
2503                Options::default(),
2504            )
2505            .await
2506            .expect("new_transaction failed");
2507        crate::object_store::directory::replace_child(
2508            &mut transaction,
2509            None,
2510            (&root_directory, TEST_OBJECT_NAME),
2511        )
2512        .await
2513        .expect("replace_child failed");
2514        transaction.commit().await.unwrap();
2515        store.tombstone_object(object.object_id(), Options::default()).await.unwrap();
2516
2517        crate::fsck::fsck(fs.clone()).await.unwrap();
2518
2519        fs.close().await.expect("close failed");
2520    }
2521
    #[fuchsia::test]
    async fn extended_attribute_changing_sizes() {
        // Moves one attribute between a small and a large value and checks where it is stored
        // after each change, running fsck after every step.
        let (fs, object) = test_filesystem_and_empty_object().await;

        let test_name = b"security.selinux";
        let test_small_attr = TestAttr::new(test_name, b"smol");
        let test_large_attr = TestAttr::new(test_name, vec![3u8; 300]);

        object
            .set_extended_attribute(
                test_small_attr.name(),
                test_small_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(
            object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
            test_small_attr.value()
        );

        // With a small attribute, we don't expect it to write to an fxfs attribute.
        assert!(object.read_attr(64).await.expect("read_attr failed").is_none());

        crate::fsck::fsck(fs.clone()).await.unwrap();

        object
            .set_extended_attribute(
                test_large_attr.name(),
                test_large_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(
            object.get_extended_attribute(test_large_attr.name()).await.unwrap(),
            test_large_attr.value()
        );

        // Once the value is above the threshold, we expect it to get upgraded to an fxfs
        // attribute.
        assert_eq!(
            object
                .read_attr(64)
                .await
                .expect("read_attr failed")
                .expect("read_attr returned none")
                .into_vec(),
            test_large_attr.value()
        );

        crate::fsck::fsck(fs.clone()).await.unwrap();

        object
            .set_extended_attribute(
                test_small_attr.name(),
                test_small_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(
            object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
            test_small_attr.value()
        );

        // Even though we are back under the threshold, we still expect it to be stored in an fxfs
        // attribute, because we don't downgrade to inline once we've allocated one.
        assert_eq!(
            object
                .read_attr(64)
                .await
                .expect("read_attr failed")
                .expect("read_attr returned none")
                .into_vec(),
            test_small_attr.value()
        );

        crate::fsck::fsck(fs.clone()).await.unwrap();

        // Removing an attribute that lives in an fxfs attribute must also leave fsck clean.
        object.remove_extended_attribute(test_small_attr.name()).await.expect("failed to remove");

        crate::fsck::fsck(fs.clone()).await.unwrap();

        fs.close().await.expect("close failed");
    }
2608
    #[fuchsia::test]
    async fn extended_attribute_max_size() {
        // An attribute whose name and value are both at their maximum permitted sizes can be
        // set, read, listed and removed.
        let (fs, object) = test_filesystem_and_empty_object().await;

        let test_attr = TestAttr::new(
            vec![3u8; super::MAX_XATTR_NAME_SIZE],
            vec![1u8; super::MAX_XATTR_VALUE_SIZE],
        );

        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(
            object.get_extended_attribute(test_attr.name()).await.unwrap(),
            test_attr.value()
        );
        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
        object.remove_extended_attribute(test_attr.name()).await.unwrap();

        fs.close().await.expect("close failed");
    }
2635
    #[fuchsia::test]
    async fn extended_attribute_remove_then_create() {
        // After an attribute is removed, `SetExtendedAttributeMode::Create` must succeed again
        // for the same name, even when the original record was created before a journal
        // compaction.
        let (fs, object) = test_filesystem_and_empty_object().await;

        let test_attr = TestAttr::new(
            vec![3u8; super::MAX_XATTR_NAME_SIZE],
            vec![1u8; super::MAX_XATTR_VALUE_SIZE],
        );

        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Create,
            )
            .await
            .unwrap();
        // NOTE(review): compacting here presumably moves the created record out of the mutable
        // in-memory layer, so the remove/create below must work against already-flushed records.
        fs.journal().compact().await.unwrap();
        object.remove_extended_attribute(test_attr.name()).await.unwrap();
        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Create,
            )
            .await
            .unwrap();

        assert_eq!(
            object.get_extended_attribute(test_attr.name()).await.unwrap(),
            test_attr.value()
        );

        fs.close().await.expect("close failed");
    }
2671
2672    #[fuchsia::test]
2673    async fn large_extended_attribute_max_number() {
2674        let (fs, object) = test_filesystem_and_empty_object().await;
2675
2676        let max_xattrs =
2677            super::EXTENDED_ATTRIBUTE_RANGE_END - super::EXTENDED_ATTRIBUTE_RANGE_START;
2678        for i in 0..max_xattrs {
2679            let test_attr = TestAttr::new(format!("{}", i).as_bytes(), vec![0x3; 300]);
2680            object
2681                .set_extended_attribute(
2682                    test_attr.name(),
2683                    test_attr.value(),
2684                    SetExtendedAttributeMode::Set,
2685                )
2686                .await
2687                .unwrap_or_else(|_| panic!("failed to set xattr number {}", i));
2688        }
2689
2690        // That should have taken up all the attributes we've allocated to extended attributes, so
2691        // this one should return ERR_NO_SPACE.
2692        match object
2693            .set_extended_attribute(
2694                b"one.too.many".to_vec(),
2695                vec![0x3; 300],
2696                SetExtendedAttributeMode::Set,
2697            )
2698            .await
2699        {
2700            Ok(()) => panic!("set should not succeed"),
2701            Err(e) => is_error(e, FxfsError::NoSpace),
2702        }
2703
2704        // But inline attributes don't need an attribute number, so it should work fine.
2705        object
2706            .set_extended_attribute(
2707                b"this.is.okay".to_vec(),
2708                b"small value".to_vec(),
2709                SetExtendedAttributeMode::Set,
2710            )
2711            .await
2712            .unwrap();
2713
2714        // And updating existing ones should be okay.
2715        object
2716            .set_extended_attribute(b"11".to_vec(), vec![0x4; 300], SetExtendedAttributeMode::Set)
2717            .await
2718            .unwrap();
2719        object
2720            .set_extended_attribute(
2721                b"12".to_vec(),
2722                vec![0x1; 300],
2723                SetExtendedAttributeMode::Replace,
2724            )
2725            .await
2726            .unwrap();
2727
2728        // And we should be able to remove an attribute and set another one.
2729        object.remove_extended_attribute(b"5".to_vec()).await.unwrap();
2730        object
2731            .set_extended_attribute(
2732                b"new attr".to_vec(),
2733                vec![0x3; 300],
2734                SetExtendedAttributeMode::Set,
2735            )
2736            .await
2737            .unwrap();
2738
2739        fs.close().await.expect("close failed");
2740    }
2741
    #[fuchsia::test]
    async fn write_attr_trims_beyond_new_end() {
        // When writing, multi_write will deallocate old extents that overlap with the new data,
        // but it doesn't trim anything beyond that, since it doesn't know what the total size will
        // be. write_attr does know, because it writes the whole attribute at once, so we need to
        // make sure it cleans up properly.
        let (fs, object) = test_filesystem_and_empty_object().await;

        let block_size = fs.block_size();
        let buf_size = block_size * 2;
        let attribute_id = 10;

        // Step 1: create a two-block attribute made of two separate extents, and record its
        // size by hand.
        let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
        let mut buffer = object.allocate_buffer(buf_size as usize).await;
        buffer.as_mut_slice().fill(3);
        // Writing two separate ranges, even if they are contiguous, forces them to be separate
        // extent records.
        object
            .multi_write(
                &mut transaction,
                attribute_id,
                &[0..block_size, block_size..block_size * 2],
                buffer.as_mut(),
            )
            .await
            .unwrap();
        transaction.add(
            object.store().store_object_id,
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(object.object_id(), attribute_id, AttributeKey::Attribute),
                ObjectValue::attribute(block_size * 2, false),
            ),
        );
        transaction.commit().await.unwrap();

        crate::fsck::fsck(fs.clone()).await.unwrap();

        // Step 2: rewrite the attribute with a single block. The second extent lies wholly
        // beyond the new end, so write_attr must trim it itself; no follow-up trim is expected,
        // and fsck must stay clean.
        let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
        let needs_trim = (*object)
            .write_attr(&mut transaction, attribute_id, &vec![3u8; block_size as usize])
            .await
            .unwrap();
        assert!(!needs_trim.0);
        transaction.commit().await.unwrap();

        crate::fsck::fsck(fs.clone()).await.unwrap();

        fs.close().await.expect("close failed");
    }
2791
    #[fuchsia::test]
    async fn write_new_attr_in_batches_multiple_txns() {
        // Writes a merkle attribute three batches long via write_new_attr_in_batches and checks
        // that it reads back byte-for-byte.
        let (fs, object) = test_filesystem_and_empty_object().await;
        let merkle_tree = vec![1; 3 * WRITE_ATTR_BATCH_SIZE];
        let mut transaction =
            (*object).new_transaction(FSVERITY_MERKLE_ATTRIBUTE_ID).await.unwrap();
        object
            .write_new_attr_in_batches(
                &mut transaction,
                FSVERITY_MERKLE_ATTRIBUTE_ID,
                &merkle_tree,
                WRITE_ATTR_BATCH_SIZE,
            )
            .await
            .expect("failed to write merkle attribute");

        // NOTE(review): this appears to clear a graveyard entry for the attribute, which
        // write_new_attr_in_batches presumably adds so that a partially written attribute would
        // be cleaned up. Writing ObjectValue::None is assumed to act as the removal; TODO
        // confirm.
        transaction.add(
            object.store().store_object_id,
            Mutation::replace_or_insert_object(
                ObjectKey::graveyard_attribute_entry(
                    object.store().graveyard_directory_object_id(),
                    object.object_id(),
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                ),
                ObjectValue::None,
            ),
        );
        transaction.commit().await.unwrap();
        assert_eq!(
            object.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
            Some(merkle_tree.into())
        );

        fs.close().await.expect("close failed");
    }
2827
    // Running on target only, to use fake time features in the executor.
    // Checks that a Watchdog with a 10 second period calls its callback with an increasing
    // count (1, 2, 3, ...), catches up on missed periods, and stops firing once dropped.
    #[cfg(target_os = "fuchsia")]
    #[fuchsia::test(allow_stalls = false)]
    async fn test_watchdog() {
        use super::Watchdog;
        use fuchsia_async::{MonotonicDuration, MonotonicInstant, TestExecutor};
        use std::sync::mpsc::channel;

        // `make_time` is a nested fn item, so it is usable here, before its definition.
        TestExecutor::advance_to(make_time(0)).await;
        let (sender, receiver) = channel();

        fn make_time(time_secs: i64) -> MonotonicInstant {
            MonotonicInstant::from_nanos(0) + MonotonicDuration::from_seconds(time_secs)
        }

        {
            let _watchdog = Watchdog::new(10, move |count| {
                sender.send(count).expect("Sending value");
            });

            // Too early.
            TestExecutor::advance_to(make_time(5)).await;
            receiver.try_recv().expect_err("Should not have message");

            // First message.
            TestExecutor::advance_to(make_time(10)).await;
            assert_eq!(1, receiver.recv().expect("Receiving"));

            // Too early for the next.
            TestExecutor::advance_to(make_time(15)).await;
            receiver.try_recv().expect_err("Should not have message");

            // Missed one. They'll be spooled up.
            TestExecutor::advance_to(make_time(30)).await;
            assert_eq!(2, receiver.recv().expect("Receiving"));
            assert_eq!(3, receiver.recv().expect("Receiving"));
        }

        // Watchdog is dropped, nothing should trigger. Dropping it also drops the closure that
        // owns `sender`, so `recv` returns a disconnect error instead of blocking.
        TestExecutor::advance_to(make_time(100)).await;
        receiver.recv().expect_err("Watchdog should be gone");
    }
2870
2871    #[fuchsia::test]
2872    fn test_checksum_range_chunk() {
2873        let block_size = 4096;
2874
2875        // No bitmap means one chunk that covers the whole range
2876        assert_eq!(
2877            ChecksumRangeChunk::group_first_write_ranges(
2878                &mut OverwriteBitmaps::None,
2879                block_size,
2880                block_size * 2..block_size * 5,
2881            ),
2882            vec![ChecksumRangeChunk {
2883                checksum_range: 0..3,
2884                device_range: block_size * 2..block_size * 5,
2885                is_first_write: false,
2886            }],
2887        );
2888
2889        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2890        assert_eq!(
2891            ChecksumRangeChunk::group_first_write_ranges(
2892                &mut bitmaps,
2893                block_size,
2894                block_size * 2..block_size * 5,
2895            ),
2896            vec![ChecksumRangeChunk {
2897                checksum_range: 0..3,
2898                device_range: block_size * 2..block_size * 5,
2899                is_first_write: false,
2900            }],
2901        );
2902        assert_eq!(
2903            bitmaps.take_bitmaps(),
2904            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b11100000])))
2905        );
2906
2907        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2908        bitmaps.set_offset(2);
2909        assert_eq!(
2910            ChecksumRangeChunk::group_first_write_ranges(
2911                &mut bitmaps,
2912                block_size,
2913                block_size * 2..block_size * 5,
2914            ),
2915            vec![
2916                ChecksumRangeChunk {
2917                    checksum_range: 0..2,
2918                    device_range: block_size * 2..block_size * 4,
2919                    is_first_write: false,
2920                },
2921                ChecksumRangeChunk {
2922                    checksum_range: 2..3,
2923                    device_range: block_size * 4..block_size * 5,
2924                    is_first_write: true,
2925                },
2926            ],
2927        );
2928        assert_eq!(
2929            bitmaps.take_bitmaps(),
2930            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00111000])))
2931        );
2932
2933        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2934        bitmaps.set_offset(4);
2935        assert_eq!(
2936            ChecksumRangeChunk::group_first_write_ranges(
2937                &mut bitmaps,
2938                block_size,
2939                block_size * 2..block_size * 5,
2940            ),
2941            vec![ChecksumRangeChunk {
2942                checksum_range: 0..3,
2943                device_range: block_size * 2..block_size * 5,
2944                is_first_write: true,
2945            }],
2946        );
2947        assert_eq!(
2948            bitmaps.take_bitmaps(),
2949            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00001110])))
2950        );
2951
2952        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b01010101]));
2953        assert_eq!(
2954            ChecksumRangeChunk::group_first_write_ranges(
2955                &mut bitmaps,
2956                block_size,
2957                block_size * 2..block_size * 10,
2958            ),
2959            vec![
2960                ChecksumRangeChunk {
2961                    checksum_range: 0..1,
2962                    device_range: block_size * 2..block_size * 3,
2963                    is_first_write: true,
2964                },
2965                ChecksumRangeChunk {
2966                    checksum_range: 1..2,
2967                    device_range: block_size * 3..block_size * 4,
2968                    is_first_write: false,
2969                },
2970                ChecksumRangeChunk {
2971                    checksum_range: 2..3,
2972                    device_range: block_size * 4..block_size * 5,
2973                    is_first_write: true,
2974                },
2975                ChecksumRangeChunk {
2976                    checksum_range: 3..4,
2977                    device_range: block_size * 5..block_size * 6,
2978                    is_first_write: false,
2979                },
2980                ChecksumRangeChunk {
2981                    checksum_range: 4..5,
2982                    device_range: block_size * 6..block_size * 7,
2983                    is_first_write: true,
2984                },
2985                ChecksumRangeChunk {
2986                    checksum_range: 5..6,
2987                    device_range: block_size * 7..block_size * 8,
2988                    is_first_write: false,
2989                },
2990                ChecksumRangeChunk {
2991                    checksum_range: 6..7,
2992                    device_range: block_size * 8..block_size * 9,
2993                    is_first_write: true,
2994                },
2995                ChecksumRangeChunk {
2996                    checksum_range: 7..8,
2997                    device_range: block_size * 9..block_size * 10,
2998                    is_first_write: false,
2999                },
3000            ],
3001        );
3002        assert_eq!(
3003            bitmaps.take_bitmaps(),
3004            Some((BitVec::from_bytes(&[0b01010101]), BitVec::from_bytes(&[0b11111111])))
3005        );
3006    }
3007}