fxfs/object_store/
store_object_handle.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::checksum::{Checksum, Checksums, fletcher64};
6use crate::errors::FxfsError;
7use crate::log::*;
8use crate::lsm_tree::Query;
9use crate::lsm_tree::types::{Item, ItemRef, LayerIterator};
10use crate::object_handle::ObjectHandle;
11use crate::object_store::extent_record::{ExtentKey, ExtentMode, ExtentValue};
12use crate::object_store::object_manager::ObjectManager;
13use crate::object_store::object_record::{
14    AttributeKey, ExtendedAttributeValue, ObjectAttributes, ObjectItem, ObjectKey, ObjectKeyData,
15    ObjectValue, Timestamp,
16};
17use crate::object_store::transaction::{
18    AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Options, ReadGuard,
19    Transaction, lock_keys,
20};
21use crate::object_store::{
22    HandleOptions, HandleOwner, ObjectStore, TrimMode, TrimResult, VOLUME_DATA_KEY_ID,
23};
24use crate::range::RangeExt;
25use crate::round::{round_down, round_up};
26use anyhow::{Context, Error, anyhow, bail, ensure};
27use assert_matches::assert_matches;
28use bit_vec::BitVec;
29use futures::stream::{FuturesOrdered, FuturesUnordered};
30use futures::{TryStreamExt, try_join};
31use fxfs_crypto::{
32    Cipher, CipherHolder, CipherSet, EncryptionKey, FindKeyResult, FxfsCipher, KeyPurpose,
33};
34use fxfs_trace::trace;
35use static_assertions::const_assert;
36use std::cmp::min;
37use std::future::Future;
38use std::ops::Range;
39use std::sync::Arc;
40use std::sync::atomic::{self, AtomicBool, Ordering};
41use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
42use storage_device::{InlineCryptoOptions, ReadOptions, WriteOptions};
43
44use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
45
/// Upper bound on the size of an extended attribute name.
pub const MAX_XATTR_NAME_SIZE: usize = 255;

/// Extended attribute values larger than this are stored in a separate object attribute rather
/// than directly inside the record.
pub const MAX_INLINE_XATTR_SIZE: usize = 256;

/// Upper bound on the size of an extended attribute value (64 kB, i.e. 64,000). Deleting
/// attributes relies on this limit for correctness, so it must always be enforced.
pub const MAX_XATTR_VALUE_SIZE: usize = 64_000;

/// Bounds of the fxfs attribute-id range reserved for extended attribute values. When a new
/// attribute is needed, the first unused id in this range is selected.
///
/// Changing these is technically safe but has consequences: they only influence id selection,
/// so existing extended attributes keep the ids they were given. Any range that is, or ever was,
/// configured here may therefore contain in-use attributes unless they are explicitly migrated,
/// which is not currently done.
pub const EXTENDED_ATTRIBUTE_RANGE_START: u64 = 64;
/// Upper bound of the extended attribute id range; see [`EXTENDED_ATTRIBUTE_RANGE_START`].
pub const EXTENDED_ATTRIBUTE_RANGE_END: u64 = 512;
63
64/// Zeroes blocks in 'buffer' based on `bitmap`, one bit per block from start of buffer.
65fn apply_bitmap_zeroing(
66    block_size: usize,
67    bitmap: &bit_vec::BitVec,
68    mut buffer: MutableBufferRef<'_>,
69) {
70    let buf = buffer.as_mut_slice();
71    debug_assert_eq!(bitmap.len() * block_size, buf.len());
72    for (i, block) in bitmap.iter().enumerate() {
73        if !block {
74            let start = i * block_size;
75            buf[start..start + block_size].fill(0);
76        }
77    }
78}
79
80/// When writing, often the logic should be generic over whether or not checksums are generated.
81/// This provides that and a handy way to convert to the more general ExtentMode that eventually
82/// stores it on disk.
83#[derive(Debug, Clone, PartialEq)]
84pub enum MaybeChecksums {
85    None,
86    Fletcher(Vec<Checksum>),
87}
88
89impl MaybeChecksums {
90    pub fn maybe_as_ref(&self) -> Option<&[Checksum]> {
91        match self {
92            Self::None => None,
93            Self::Fletcher(sums) => Some(&sums),
94        }
95    }
96
97    pub fn split_off(&mut self, at: usize) -> Self {
98        match self {
99            Self::None => Self::None,
100            Self::Fletcher(sums) => Self::Fletcher(sums.split_off(at)),
101        }
102    }
103
104    pub fn to_mode(self) -> ExtentMode {
105        match self {
106            Self::None => ExtentMode::Raw,
107            Self::Fletcher(sums) => ExtentMode::Cow(Checksums::fletcher(sums)),
108        }
109    }
110
111    pub fn into_option(self) -> Option<Vec<Checksum>> {
112        match self {
113            Self::None => None,
114            Self::Fletcher(sums) => Some(sums),
115        }
116    }
117}
118
119/// The mode of operation when setting extended attributes. This is the same as the fidl definition
120/// but is replicated here so we don't have fuchsia.io structures in the api, so this can be used
121/// on host.
122#[derive(Debug, Clone, Copy, PartialEq, Eq)]
123pub enum SetExtendedAttributeMode {
124    /// Create the extended attribute if it doesn't exist, replace the value if it does.
125    Set,
126    /// Create the extended attribute if it doesn't exist, fail if it does.
127    Create,
128    /// Replace the extended attribute value if it exists, fail if it doesn't.
129    Replace,
130}
131
132impl From<fio::SetExtendedAttributeMode> for SetExtendedAttributeMode {
133    fn from(other: fio::SetExtendedAttributeMode) -> SetExtendedAttributeMode {
134        match other {
135            fio::SetExtendedAttributeMode::Set => SetExtendedAttributeMode::Set,
136            fio::SetExtendedAttributeMode::Create => SetExtendedAttributeMode::Create,
137            fio::SetExtendedAttributeMode::Replace => SetExtendedAttributeMode::Replace,
138        }
139    }
140}
141
/// Whether, and with which kind of keys, an object's data is encrypted. Chosen when a
/// `StoreObjectHandle` is constructed.
enum Encryption {
    /// No encryption is used for this object.
    None,

    /// The object's keys are cached by the KeyManager and unwrapped on demand.
    CachedKeys,

    /// The object has permanent keys registered with the KeyManager.
    PermanentKeys,
}
153
154#[derive(PartialEq, Debug)]
155enum OverwriteBitmaps {
156    None,
157    Some {
158        /// The block bitmap of a partial overwrite extent in the tree.
159        extent_bitmap: BitVec,
160        /// A bitmap of the blocks written to by the current overwrite.
161        write_bitmap: BitVec,
162        /// BitVec doesn't have a slice equivalent, so for a particular section of the write we
163        /// keep track of an offset in the bitmaps to operate on.
164        bitmap_offset: usize,
165    },
166}
167
168impl OverwriteBitmaps {
169    fn new(extent_bitmap: BitVec) -> Self {
170        OverwriteBitmaps::Some {
171            write_bitmap: BitVec::from_elem(extent_bitmap.len(), false),
172            extent_bitmap,
173            bitmap_offset: 0,
174        }
175    }
176
177    fn is_none(&self) -> bool {
178        *self == OverwriteBitmaps::None
179    }
180
181    fn set_offset(&mut self, new_offset: usize) {
182        match self {
183            OverwriteBitmaps::None => (),
184            OverwriteBitmaps::Some { bitmap_offset, .. } => *bitmap_offset = new_offset,
185        }
186    }
187
188    fn get_from_extent_bitmap(&self, i: usize) -> Option<bool> {
189        match self {
190            OverwriteBitmaps::None => None,
191            OverwriteBitmaps::Some { extent_bitmap, bitmap_offset, .. } => {
192                extent_bitmap.get(*bitmap_offset + i)
193            }
194        }
195    }
196
197    fn set_in_write_bitmap(&mut self, i: usize, x: bool) {
198        match self {
199            OverwriteBitmaps::None => (),
200            OverwriteBitmaps::Some { write_bitmap, bitmap_offset, .. } => {
201                write_bitmap.set(*bitmap_offset + i, x)
202            }
203        }
204    }
205
206    fn take_bitmaps(self) -> Option<(BitVec, BitVec)> {
207        match self {
208            OverwriteBitmaps::None => None,
209            OverwriteBitmaps::Some { extent_bitmap, write_bitmap, .. } => {
210                Some((extent_bitmap, write_bitmap))
211            }
212        }
213    }
214}
215
216/// When writing to Overwrite ranges, we need to emit whether a set of checksums for a device range
217/// is the first write to that region or not. This tracks one such range so we can use it after the
218/// write to break up the returned checksum list.
219#[derive(PartialEq, Debug)]
220struct ChecksumRangeChunk {
221    checksum_range: Range<usize>,
222    device_range: Range<u64>,
223    is_first_write: bool,
224}
225
226impl ChecksumRangeChunk {
227    fn group_first_write_ranges(
228        bitmaps: &mut OverwriteBitmaps,
229        block_size: u64,
230        write_device_range: Range<u64>,
231    ) -> Vec<ChecksumRangeChunk> {
232        let write_block_len = (write_device_range.length().unwrap() / block_size) as usize;
233        if bitmaps.is_none() {
234            // If there is no bitmap, then the overwrite range is fully written to. However, we
235            // could still be within the journal flush window where one of the blocks was written
236            // to for the first time to put it in this state, so we still need to emit the
237            // checksums in case replay needs them.
238            vec![ChecksumRangeChunk {
239                checksum_range: 0..write_block_len,
240                device_range: write_device_range,
241                is_first_write: false,
242            }]
243        } else {
244            let mut checksum_ranges = vec![ChecksumRangeChunk {
245                checksum_range: 0..0,
246                device_range: write_device_range.start..write_device_range.start,
247                is_first_write: !bitmaps.get_from_extent_bitmap(0).unwrap(),
248            }];
249            let mut working_range = checksum_ranges.last_mut().unwrap();
250            for i in 0..write_block_len {
251                bitmaps.set_in_write_bitmap(i, true);
252
253                // bitmap.get returning true means the block is initialized and therefore has been
254                // written to before.
255                if working_range.is_first_write != bitmaps.get_from_extent_bitmap(i).unwrap() {
256                    // is_first_write is tracking opposite of what comes back from the bitmap, so
257                    // if the are still opposites we continue our current range.
258                    working_range.checksum_range.end += 1;
259                    working_range.device_range.end += block_size;
260                } else {
261                    // If they are the same, then we need to make a new chunk.
262                    let new_chunk = ChecksumRangeChunk {
263                        checksum_range: working_range.checksum_range.end
264                            ..working_range.checksum_range.end + 1,
265                        device_range: working_range.device_range.end
266                            ..working_range.device_range.end + block_size,
267                        is_first_write: !working_range.is_first_write,
268                    };
269                    checksum_ranges.push(new_chunk);
270                    working_range = checksum_ranges.last_mut().unwrap();
271                }
272            }
273            checksum_ranges
274        }
275    }
276}
277
278/// StoreObjectHandle is the lowest-level, untyped handle to an object with the id [`object_id`] in
279/// a particular store, [`owner`]. It provides functionality shared across all objects, such as
280/// reading and writing attributes and managing encryption keys.
281///
282/// Since it's untyped, it doesn't do any object kind validation, and is generally meant to
283/// implement higher-level typed handles.
284///
285/// For file-like objects with a data attribute, DataObjectHandle implements traits and helpers for
286/// doing more complex extent management and caches the content size.
287///
288/// For directory-like objects, Directory knows how to add and remove child objects and enumerate
289/// its children.
290pub struct StoreObjectHandle<S: HandleOwner> {
291    owner: Arc<S>,
292    object_id: u64,
293    options: HandleOptions,
294    trace: AtomicBool,
295    encryption: Encryption,
296}
297
298impl<S: HandleOwner> ObjectHandle for StoreObjectHandle<S> {
299    fn set_trace(&self, v: bool) {
300        info!(store_id = self.store().store_object_id, oid = self.object_id(), trace = v; "trace");
301        self.trace.store(v, atomic::Ordering::Relaxed);
302    }
303
304    fn object_id(&self) -> u64 {
305        return self.object_id;
306    }
307
308    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
309        self.store().device.allocate_buffer(size)
310    }
311
312    fn block_size(&self) -> u64 {
313        self.store().block_size()
314    }
315}
316
317struct Watchdog {
318    _task: fasync::Task<()>,
319}
320
321impl Watchdog {
322    fn new(increment_seconds: u64, cb: impl Fn(u64) + Send + 'static) -> Self {
323        Self {
324            _task: fasync::Task::spawn(async move {
325                let increment = increment_seconds.try_into().unwrap();
326                let mut fired_counter = 0;
327                let mut next_wake = fasync::MonotonicInstant::now();
328                loop {
329                    next_wake += std::time::Duration::from_secs(increment).into();
330                    // If this isn't being scheduled this will purposely result in fast looping when
331                    // it does. This will be insightful about the state of the thread and task
332                    // scheduling.
333                    if fasync::MonotonicInstant::now() < next_wake {
334                        fasync::Timer::new(next_wake).await;
335                    }
336                    fired_counter += 1;
337                    cb(fired_counter);
338                }
339            }),
340        }
341    }
342}
343
344impl<S: HandleOwner> StoreObjectHandle<S> {
345    /// Make a new StoreObjectHandle for the object with id [`object_id`] in store [`owner`].
346    pub fn new(
347        owner: Arc<S>,
348        object_id: u64,
349        permanent_keys: bool,
350        options: HandleOptions,
351        trace: bool,
352    ) -> Self {
353        let encryption = if permanent_keys {
354            Encryption::PermanentKeys
355        } else if owner.as_ref().as_ref().is_encrypted() {
356            Encryption::CachedKeys
357        } else {
358            Encryption::None
359        };
360        Self { owner, object_id, encryption, options, trace: AtomicBool::new(trace) }
361    }
362
363    pub fn owner(&self) -> &Arc<S> {
364        &self.owner
365    }
366
367    pub fn store(&self) -> &ObjectStore {
368        self.owner.as_ref().as_ref()
369    }
370
371    pub fn trace(&self) -> bool {
372        self.trace.load(atomic::Ordering::Relaxed)
373    }
374
375    pub fn is_encrypted(&self) -> bool {
376        !matches!(self.encryption, Encryption::None)
377    }
378
379    /// Get the default set of transaction options for this object. This is mostly the overall
380    /// default, modified by any [`HandleOptions`] held by this handle.
381    pub fn default_transaction_options<'b>(&self) -> Options<'b> {
382        Options { skip_journal_checks: self.options.skip_journal_checks, ..Default::default() }
383    }
384
385    pub async fn new_transaction_with_options<'b>(
386        &self,
387        attribute_id: u64,
388        options: Options<'b>,
389    ) -> Result<Transaction<'b>, Error> {
390        Ok(self
391            .store()
392            .filesystem()
393            .new_transaction(
394                lock_keys![
395                    LockKey::object_attribute(
396                        self.store().store_object_id(),
397                        self.object_id(),
398                        attribute_id,
399                    ),
400                    LockKey::object(self.store().store_object_id(), self.object_id()),
401                ],
402                options,
403            )
404            .await?)
405    }
406
407    pub async fn new_transaction<'b>(&self, attribute_id: u64) -> Result<Transaction<'b>, Error> {
408        self.new_transaction_with_options(attribute_id, self.default_transaction_options()).await
409    }
410
411    // If |transaction| has an impending mutation for the underlying object, returns that.
412    // Otherwise, looks up the object from the tree.
413    async fn txn_get_object_mutation(
414        &self,
415        transaction: &Transaction<'_>,
416    ) -> Result<ObjectStoreMutation, Error> {
417        self.store().txn_get_object_mutation(transaction, self.object_id()).await
418    }
419
    // Returns the amount deallocated.
    //
    // Releases, via the allocator, the device space backing the part of attribute `attribute_id`
    // that overlaps `range`. The returned amount is the total length of those overlaps. This does
    // not record any extent or allocated-size changes for the object itself; callers such as
    // `zero` add those mutations.
    //
    // `range` must be block-aligned (asserted below). Fails with `FxfsError::Inconsistent` if an
    // overlapping device range is not block-aligned.
    async fn deallocate_old_extents(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: u64,
        range: Range<u64>,
    ) -> Result<u64, Error> {
        let block_size = self.block_size();
        assert_eq!(range.start % block_size, 0);
        assert_eq!(range.end % block_size, 0);
        if range.start == range.end {
            return Ok(0);
        }
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let key = ExtentKey { range };
        // Seek to the search key derived from `range`; presumably this positions the iterator at
        // the first extent that could overlap it.
        let lower_bound = ObjectKey::attribute(
            self.object_id(),
            attribute_id,
            AttributeKey::Extent(key.search_key()),
        );
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullRange(&lower_bound)).await?;
        let allocator = self.store().allocator();
        let mut deallocated = 0;
        let trace = self.trace();
        // The loop also ends at the first record that is not an extent record, because the
        // pattern then fails to match.
        while let Some(ItemRef {
            key:
                ObjectKey {
                    object_id,
                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                },
            value: ObjectValue::Extent(value),
            ..
        }) = iter.get()
        {
            // Stop once we've moved past this object's attribute.
            if *object_id != self.object_id() || *attr_id != attribute_id {
                break;
            }
            // Only extents with device space (`ExtentValue::Some`) need releasing; other extent
            // values are skipped.
            if let ExtentValue::Some { device_offset, .. } = value {
                if let Some(overlap) = key.overlap(extent_key) {
                    // Translate the overlapping logical range into the device range backing it.
                    let range = device_offset + overlap.start - extent_key.range.start
                        ..device_offset + overlap.end - extent_key.range.start;
                    ensure!(range.is_aligned(block_size), FxfsError::Inconsistent);
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = range,
                            len = range.end - range.start,
                            extent_key:?;
                            "D",
                        );
                    }
                    allocator
                        .deallocate(transaction, self.store().store_object_id(), range)
                        .await?;
                    deallocated += overlap.end - overlap.start;
                } else {
                    // An allocated extent that doesn't overlap `range`; presumably (extents being
                    // iterated in key order) we are past the end of `range`.
                    break;
                }
            }
            iter.advance().await?;
        }
        Ok(deallocated)
    }
486
487    // Writes aligned data (that should already be encrypted) to the given offset and computes
488    // checksums if requested. The aligned data must be from a single logical file range.
489    async fn write_aligned(
490        &self,
491        buf: BufferRef<'_>,
492        device_offset: u64,
493        crypt_ctx: Option<(u32, u8)>,
494    ) -> Result<MaybeChecksums, Error> {
495        if self.trace() {
496            info!(
497                store_id = self.store().store_object_id(),
498                oid = self.object_id(),
499                device_range:? = (device_offset..device_offset + buf.len() as u64),
500                len = buf.len();
501                "W",
502            );
503        }
504        let store = self.store();
505        store.device_write_ops.fetch_add(1, Ordering::Relaxed);
506        let mut checksums = Vec::new();
507        let _watchdog = Watchdog::new(10, |count| {
508            warn!("Write has been stalled for {} seconds", count * 10);
509        });
510
511        match crypt_ctx {
512            Some((dun, slot)) => {
513                if !store.filesystem().options().barriers_enabled {
514                    return Err(anyhow!(FxfsError::InvalidArgs)
515                        .context("Barriers must be enabled for inline encrypted writes."));
516                }
517                store
518                    .device
519                    .write_with_opts(
520                        device_offset as u64,
521                        buf,
522                        WriteOptions {
523                            inline_crypto: InlineCryptoOptions::enabled(slot, dun),
524                            ..Default::default()
525                        },
526                    )
527                    .await?;
528                Ok(MaybeChecksums::None)
529            }
530            None => {
531                if self.options.skip_checksums {
532                    store
533                        .device
534                        .write_with_opts(device_offset as u64, buf, WriteOptions::default())
535                        .await?;
536                    Ok(MaybeChecksums::None)
537                } else {
538                    try_join!(store.device.write(device_offset, buf), async {
539                        let block_size = self.block_size();
540                        for chunk in buf.as_slice().chunks_exact(block_size as usize) {
541                            checksums.push(fletcher64(chunk, 0));
542                        }
543                        Ok(())
544                    })?;
545                    Ok(MaybeChecksums::Fletcher(checksums))
546                }
547            }
548        }
549    }
550
551    /// Flushes the underlying device.  This is expensive and should be used sparingly.
552    pub async fn flush_device(&self) -> Result<(), Error> {
553        self.store().device().flush().await
554    }
555
556    pub async fn update_allocated_size(
557        &self,
558        transaction: &mut Transaction<'_>,
559        allocated: u64,
560        deallocated: u64,
561    ) -> Result<(), Error> {
562        if allocated == deallocated {
563            return Ok(());
564        }
565        let mut mutation = self.txn_get_object_mutation(transaction).await?;
566        if let ObjectValue::Object {
567            attributes: ObjectAttributes { project_id, allocated_size, .. },
568            ..
569        } = &mut mutation.item.value
570        {
571            // The only way for these to fail are if the volume is inconsistent.
572            *allocated_size = allocated_size
573                .checked_add(allocated)
574                .ok_or_else(|| anyhow!(FxfsError::Inconsistent).context("Allocated size overflow"))?
575                .checked_sub(deallocated)
576                .ok_or_else(|| {
577                    anyhow!(FxfsError::Inconsistent).context("Allocated size underflow")
578                })?;
579
580            if *project_id != 0 {
581                // The allocated and deallocated shouldn't exceed the max size of the file which is
582                // bound within i64.
583                let diff = i64::try_from(allocated).unwrap() - i64::try_from(deallocated).unwrap();
584                transaction.add(
585                    self.store().store_object_id(),
586                    Mutation::merge_object(
587                        ObjectKey::project_usage(
588                            self.store().root_directory_object_id(),
589                            *project_id,
590                        ),
591                        ObjectValue::BytesAndNodes { bytes: diff, nodes: 0 },
592                    ),
593                );
594            }
595        } else {
596            // This can occur when the object mutation is created from an object in the tree which
597            // was corrupt.
598            bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
599        }
600        transaction.add(self.store().store_object_id, Mutation::ObjectStore(mutation));
601        Ok(())
602    }
603
604    pub async fn update_attributes<'a>(
605        &self,
606        transaction: &mut Transaction<'a>,
607        node_attributes: Option<&fio::MutableNodeAttributes>,
608        change_time: Option<Timestamp>,
609    ) -> Result<(), Error> {
610        if let Some(&fio::MutableNodeAttributes { selinux_context: Some(ref context), .. }) =
611            node_attributes
612        {
613            if let fio::SelinuxContext::Data(context) = context {
614                self.set_extended_attribute_impl(
615                    "security.selinux".into(),
616                    context.clone(),
617                    SetExtendedAttributeMode::Set,
618                    transaction,
619                )
620                .await?;
621            } else {
622                return Err(anyhow!(FxfsError::InvalidArgs)
623                    .context("Only set SELinux context with `data` member."));
624            }
625        }
626        self.store()
627            .update_attributes(transaction, self.object_id, node_attributes, change_time)
628            .await
629    }
630
631    /// Zeroes the given range.  The range must be aligned.  Returns the amount of data deallocated.
632    pub async fn zero(
633        &self,
634        transaction: &mut Transaction<'_>,
635        attribute_id: u64,
636        range: Range<u64>,
637    ) -> Result<(), Error> {
638        let deallocated =
639            self.deallocate_old_extents(transaction, attribute_id, range.clone()).await?;
640        if deallocated > 0 {
641            self.update_allocated_size(transaction, 0, deallocated).await?;
642            transaction.add(
643                self.store().store_object_id,
644                Mutation::merge_object(
645                    ObjectKey::extent(self.object_id(), attribute_id, range),
646                    ObjectValue::Extent(ExtentValue::deleted_extent()),
647                ),
648            );
649        }
650        Ok(())
651    }
652
653    // Returns a new aligned buffer (reading the head and tail blocks if necessary) with a copy of
654    // the data from `buf`.
655    pub async fn align_buffer(
656        &self,
657        attribute_id: u64,
658        offset: u64,
659        buf: BufferRef<'_>,
660    ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
661        let block_size = self.block_size();
662        let end = offset + buf.len() as u64;
663        let aligned =
664            round_down(offset, block_size)..round_up(end, block_size).ok_or(FxfsError::TooBig)?;
665
666        let mut aligned_buf =
667            self.store().device.allocate_buffer((aligned.end - aligned.start) as usize).await;
668
669        // Deal with head alignment.
670        if aligned.start < offset {
671            let mut head_block = aligned_buf.subslice_mut(..block_size as usize);
672            let read = self.read(attribute_id, aligned.start, head_block.reborrow()).await?;
673            head_block.as_mut_slice()[read..].fill(0);
674        }
675
676        // Deal with tail alignment.
677        if aligned.end > end {
678            let end_block_offset = aligned.end - block_size;
679            // There's no need to read the tail block if we read it as part of the head block.
680            if offset <= end_block_offset {
681                let mut tail_block =
682                    aligned_buf.subslice_mut(aligned_buf.len() - block_size as usize..);
683                let read = self.read(attribute_id, end_block_offset, tail_block.reborrow()).await?;
684                tail_block.as_mut_slice()[read..].fill(0);
685            }
686        }
687
688        aligned_buf.as_mut_slice()
689            [(offset - aligned.start) as usize..(end - aligned.start) as usize]
690            .copy_from_slice(buf.as_slice());
691
692        Ok((aligned, aligned_buf))
693    }
694
695    /// Trim an attribute's extents, potentially adding a graveyard trim entry if more trimming is
696    /// needed, so the transaction can be committed without worrying about leaking data.
697    ///
698    /// This doesn't update the size stored in the attribute value - the caller is responsible for
699    /// doing that to keep the size up to date.
700    pub async fn shrink(
701        &self,
702        transaction: &mut Transaction<'_>,
703        attribute_id: u64,
704        size: u64,
705    ) -> Result<NeedsTrim, Error> {
706        let store = self.store();
707        let needs_trim = matches!(
708            store
709                .trim_some(transaction, self.object_id(), attribute_id, TrimMode::FromOffset(size))
710                .await?,
711            TrimResult::Incomplete
712        );
713        if needs_trim {
714            // Add the object to the graveyard in case the following transactions don't get
715            // replayed.
716            let graveyard_id = store.graveyard_directory_object_id();
717            match store
718                .tree
719                .find(&ObjectKey::graveyard_entry(graveyard_id, self.object_id()))
720                .await?
721            {
722                Some(ObjectItem { value: ObjectValue::Some, .. })
723                | Some(ObjectItem { value: ObjectValue::Trim, .. }) => {
724                    // This object is already in the graveyard so we don't need to do anything.
725                }
726                _ => {
727                    transaction.add(
728                        store.store_object_id,
729                        Mutation::replace_or_insert_object(
730                            ObjectKey::graveyard_entry(graveyard_id, self.object_id()),
731                            ObjectValue::Trim,
732                        ),
733                    );
734                }
735            }
736        }
737        Ok(NeedsTrim(needs_trim))
738    }
739
740    /// Reads and decrypts a singular logical range.
741    pub async fn read_and_decrypt(
742        &self,
743        device_offset: u64,
744        file_offset: u64,
745        mut buffer: MutableBufferRef<'_>,
746        key_id: u64,
747    ) -> Result<(), Error> {
748        let store = self.store();
749        store.device_read_ops.fetch_add(1, Ordering::Relaxed);
750
751        let _watchdog = Watchdog::new(10, |count| {
752            warn!("Read has been stalled for {} seconds", count * 10);
753        });
754
755        let (_key_id, key) = self.get_key(Some(key_id)).await?;
756        if let Some(key) = key {
757            if let Some((dun, slot)) = key.crypt_ctx(self.object_id, file_offset) {
758                store
759                    .device
760                    .read_with_opts(
761                        device_offset as u64,
762                        buffer.reborrow(),
763                        ReadOptions { inline_crypto: InlineCryptoOptions::enabled(slot, dun) },
764                    )
765                    .await?;
766            } else {
767                store.device.read(device_offset, buffer.reborrow()).await?;
768                key.decrypt(self.object_id, device_offset, file_offset, buffer.as_mut_slice())?;
769            }
770        } else {
771            store.device.read(device_offset, buffer.reborrow()).await?;
772        }
773
774        Ok(())
775    }
776
777    /// Returns the specified key. If `key_id` is None, it will try and return the fscrypt key if
778    /// it is present, or the volume key if it isn't. If the fscrypt key is present, but the key
779    /// cannot be unwrapped, then this will return `FxfsError::NoKey`. If the volume is not
780    /// encrypted, this returns None.
781    pub async fn get_key(
782        &self,
783        key_id: Option<u64>,
784    ) -> Result<(u64, Option<Arc<dyn Cipher>>), Error> {
785        let store = self.store();
786        let result = match self.encryption {
787            Encryption::None => (VOLUME_DATA_KEY_ID, None),
788            Encryption::CachedKeys => {
789                if let Some(key_id) = key_id {
790                    (
791                        key_id,
792                        Some(
793                            store
794                                .key_manager
795                                .get_key(
796                                    self.object_id,
797                                    store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
798                                    async || store.get_keys(self.object_id).await,
799                                    key_id,
800                                )
801                                .await?,
802                        ),
803                    )
804                } else {
805                    let (key_id, key) = store
806                        .key_manager
807                        .get_fscrypt_key_if_present(
808                            self.object_id,
809                            store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
810                            async || store.get_keys(self.object_id).await,
811                        )
812                        .await?;
813                    (key_id, Some(key))
814                }
815            }
816            Encryption::PermanentKeys => {
817                (VOLUME_DATA_KEY_ID, Some(store.key_manager.get(self.object_id).await?.unwrap()))
818            }
819        };
820
821        // Ensure that if the key we receive uses inline encryption, barriers should be enabled.
822        if let Some(ref key) = result.1 {
823            if key.crypt_ctx(self.object_id, 0).is_some() {
824                if !store.filesystem().options().barriers_enabled {
825                    return Err(anyhow!(FxfsError::InvalidArgs)
826                        .context("Barriers must be enabled for inline encrypted writes."));
827                }
828            }
829        }
830
831        Ok(result)
832    }
833
    /// This will only work for a non-permanent volume data key. This is designed to be used with
    /// extended attributes where we'll only create the key on demand for directories and encrypted
    /// files.
    ///
    /// Returns the cipher for `VOLUME_DATA_KEY_ID`. The lookup order is:
    /// 1. the key manager's cache;
    /// 2. the object's existing keys record in the tree;
    /// 3. otherwise a new key is created with `crypt.create_key(.., KeyPurpose::Data)`.
    ///
    /// When a key is created, the object's keys record (existing keys plus the new one) is written
    /// as part of `transaction`. The new cipher set is inserted into the key manager's cache only
    /// when that mutation is applied, via `UnwrappedKeys::will_apply_mutation`.
    ///
    /// # Errors
    ///
    /// * "No crypt!" if the store has no `crypt()` instance.
    /// * `FxfsError::NoKey` if the volume data key exists but is unavailable.
    /// * `FxfsError::Inconsistent` if the keys record holds something other than
    ///   `ObjectValue::Keys`.
    async fn get_or_create_key(
        &self,
        transaction: &mut Transaction<'_>,
    ) -> Result<Arc<dyn Cipher>, Error> {
        let store = self.store();

        // Fast path: try and get keys from the cache.
        if let Some(key) = store.key_manager.get(self.object_id).await.context("get failed")? {
            return Ok(key);
        }

        let crypt = store.crypt().ok_or_else(|| anyhow!("No crypt!"))?;

        // Next, see if the keys are already created.
        let (mut encryption_keys, mut cipher_set) = if let Some(item) =
            store.tree.find(&ObjectKey::keys(self.object_id)).await.context("find failed")?
        {
            if let ObjectValue::Keys(encryption_keys) = item.value {
                let cipher_set = store
                    .key_manager
                    .get_keys(
                        self.object_id,
                        crypt.as_ref(),
                        &mut Some(async || Ok(encryption_keys.clone())),
                        /* permanent= */ false,
                        /* force= */ false,
                    )
                    .await
                    .context("get_keys failed")?;
                match cipher_set.find_key(VOLUME_DATA_KEY_ID) {
                    // A keys record exists but has no volume data key: fall through and create
                    // one, keeping the existing keys alongside it.
                    FindKeyResult::NotFound => {}
                    FindKeyResult::Unavailable(_) => return Err(FxfsError::NoKey.into()),
                    FindKeyResult::Key(key) => return Ok(key),
                }
                (encryption_keys, (*cipher_set).clone())
            } else {
                return Err(anyhow!(FxfsError::Inconsistent));
            }
        } else {
            // No keys record yet: start from empty keys and an empty cipher set.
            Default::default()
        };

        // Proceed to create the key.  The transaction holds the required locks.
        let (key, unwrapped_key) = crypt.create_key(self.object_id, KeyPurpose::Data).await?;
        let cipher: Arc<dyn Cipher> = Arc::new(FxfsCipher::new(&unwrapped_key));

        // Add new cipher to cloned cipher set. This will replace existing one
        // if transaction is successful.
        cipher_set.add_key(VOLUME_DATA_KEY_ID, CipherHolder::Cipher(cipher.clone()));
        let cipher_set = Arc::new(cipher_set);

        // Arrange for the CipherSet to be added to the cache when (and if) the transaction
        // commits.
        struct UnwrappedKeys {
            object_id: u64,
            new_keys: Arc<CipherSet>,
        }

        impl AssociatedObject for UnwrappedKeys {
            fn will_apply_mutation(
                &self,
                _mutation: &Mutation,
                object_id: u64,
                manager: &ObjectManager,
            ) {
                manager.store(object_id).unwrap().key_manager.insert(
                    self.object_id,
                    self.new_keys.clone(),
                    /* permanent= */ false,
                );
            }
        }

        // Persist the wrapped form of the new key in the object's keys record.
        encryption_keys.insert(VOLUME_DATA_KEY_ID, EncryptionKey::Fxfs(key).into());

        transaction.add_with_object(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::keys(self.object_id),
                ObjectValue::keys(encryption_keys),
            ),
            AssocObj::Owned(Box::new(UnwrappedKeys {
                object_id: self.object_id,
                new_keys: cipher_set,
            })),
        );

        Ok(cipher)
    }
926
927    pub async fn read(
928        &self,
929        attribute_id: u64,
930        offset: u64,
931        mut buf: MutableBufferRef<'_>,
932    ) -> Result<usize, Error> {
933        let fs = self.store().filesystem();
934        let guard = fs
935            .lock_manager()
936            .read_lock(lock_keys![LockKey::object_attribute(
937                self.store().store_object_id(),
938                self.object_id(),
939                attribute_id,
940            )])
941            .await;
942
943        let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
944        let item = self.store().tree().find(&key).await?;
945        let size = match item {
946            Some(item) if item.key == key => match item.value {
947                ObjectValue::Attribute { size, .. } => size,
948                _ => bail!(FxfsError::Inconsistent),
949            },
950            _ => return Ok(0),
951        };
952        if offset >= size {
953            return Ok(0);
954        }
955        let length = min(buf.len() as u64, size - offset) as usize;
956        buf = buf.subslice_mut(0..length);
957        self.read_unchecked(attribute_id, offset, buf, &guard).await?;
958        Ok(length)
959    }
960
    /// Read `buf.len()` bytes from the attribute `attribute_id`, starting at `offset`, into `buf`.
    /// It's required that a read lock on this attribute id is taken before this is called.
    ///
    /// This function doesn't do any size checking - any portion of `buf` past the end of the file
    /// will be filled with zeros. The caller is responsible for enforcing the file size on reads.
    /// This is because, just looking at the extents, we can't tell the difference between the file
    /// actually ending and there just being a section at the end with no data (since attributes
    /// are sparse).
    ///
    /// Holes, deleted extents and blocks of `OverwritePartial` extents whose bitmap bit is unset
    /// all read back as zeros.
    ///
    /// # Panics
    ///
    /// Panics if `offset` is not a multiple of the filesystem block size, or if `buf` does not
    /// start at a multiple of the device block size.
    ///
    /// # Errors
    ///
    /// Returns `FxfsError::Inconsistent` if an extent's range is invalid or not block-aligned, or
    /// if an `OverwritePartial` bitmap is too short for the final partial block. Tree and device
    /// read errors are propagated.
    pub(super) async fn read_unchecked(
        &self,
        attribute_id: u64,
        mut offset: u64,
        mut buf: MutableBufferRef<'_>,
        _guard: &ReadGuard<'_>,
    ) -> Result<(), Error> {
        if buf.len() == 0 {
            return Ok(());
        }
        let end_offset = offset + buf.len() as u64;

        self.store().logical_read_ops.fetch_add(1, Ordering::Relaxed);

        // Whilst the read offset must be aligned to the filesystem block size, the buffer need only
        // be aligned to the device's block size.
        let block_size = self.block_size() as u64;
        let device_block_size = self.store().device.block_size() as u64;
        assert_eq!(offset % block_size, 0);
        assert_eq!(buf.range().start as u64 % device_block_size, 0);
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        // Iterate over this attribute's extent records within `offset..end_offset`.
        let mut iter = merger
            .query(Query::LimitedRange(&ObjectKey::extent(
                self.object_id(),
                attribute_id,
                offset..end_offset,
            )))
            .await?;
        // Number of bytes in the final, partial block of the read (0 if the read ends on a block
        // boundary). That tail is read separately through a block-sized bounce buffer below.
        let end_align = ((offset + buf.len() as u64) % block_size) as usize;
        let trace = self.trace();
        // Block-aligned device reads are queued here and only driven (concurrently) by the
        // `try_collect` after the extent scan.
        let reads = FuturesUnordered::new();
        while let Some(ItemRef {
            key:
                ObjectKey {
                    object_id,
                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                },
            value: ObjectValue::Extent(extent_value),
            ..
        }) = iter.get()
        {
            if *object_id != self.object_id() || *attr_id != attribute_id {
                break;
            }
            ensure!(
                extent_key.range.is_valid() && extent_key.range.is_aligned(block_size),
                FxfsError::Inconsistent
            );
            if extent_key.range.start > offset {
                // Zero everything up to the start of the extent.
                let to_zero = min(extent_key.range.start - offset, buf.len() as u64) as usize;
                for i in &mut buf.as_mut_slice()[..to_zero] {
                    *i = 0;
                }
                buf = buf.subslice_mut(to_zero..);
                if buf.is_empty() {
                    break;
                }
                offset += to_zero as u64;
            }

            if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
                // Device location corresponding to `offset` within this extent.
                let mut device_offset = device_offset + (offset - extent_key.range.start);
                let key_id = *key_id;

                // Copy whole blocks only: stop at the extent's end or before the partial tail.
                let to_copy = min(buf.len() - end_align, (extent_key.range.end - offset) as usize);
                if to_copy > 0 {
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = (device_offset..device_offset + to_copy as u64),
                            offset,
                            range:? = extent_key.range,
                            block_size;
                            "R",
                        );
                    }
                    let (mut head, tail) = buf.split_at_mut(to_copy);
                    // For partially-written overwrite extents, take the slice of the bitmap that
                    // covers the blocks being read so unwritten blocks can be zeroed afterwards.
                    let maybe_bitmap = match mode {
                        ExtentMode::OverwritePartial(bitmap) => {
                            let mut read_bitmap = bitmap.clone().split_off(
                                ((offset - extent_key.range.start) / block_size) as usize,
                            );
                            read_bitmap.truncate(to_copy / block_size as usize);
                            Some(read_bitmap)
                        }
                        _ => None,
                    };
                    reads.push(async move {
                        self.read_and_decrypt(device_offset, offset, head.reborrow(), key_id)
                            .await?;
                        if let Some(bitmap) = maybe_bitmap {
                            apply_bitmap_zeroing(self.block_size() as usize, &bitmap, head);
                        }
                        Ok::<(), Error>(())
                    });
                    buf = tail;
                    if buf.is_empty() {
                        break;
                    }
                    offset += to_copy as u64;
                    device_offset += to_copy as u64;
                }

                // Deal with end alignment by reading the existing contents into an alignment
                // buffer.
                if offset < extent_key.range.end && end_align > 0 {
                    if let ExtentMode::OverwritePartial(bitmap) = mode {
                        let bitmap_offset = (offset - extent_key.range.start) / block_size;
                        if !bitmap.get(bitmap_offset as usize).ok_or(FxfsError::Inconsistent)? {
                            // If this block isn't actually initialized, skip it.
                            // The remaining `end_align` bytes are zeroed by the fill after the
                            // loop.
                            break;
                        }
                    }
                    let mut align_buf =
                        self.store().device.allocate_buffer(block_size as usize).await;
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = (device_offset..device_offset + align_buf.len() as u64);
                            "RT",
                        );
                    }
                    self.read_and_decrypt(device_offset, offset, align_buf.as_mut(), key_id)
                        .await?;
                    // Only the first `end_align` bytes of the block belong to the caller's read.
                    buf.as_mut_slice().copy_from_slice(&align_buf.as_slice()[..end_align]);
                    buf = buf.subslice_mut(0..0);
                    break;
                }
            } else if extent_key.range.end >= offset + buf.len() as u64 {
                // Deleted extent covers remainder, so we're done.
                break;
            }
            // Otherwise a deleted extent leaves a gap that is zeroed when the next extent is
            // processed, or by the final fill below.

            iter.advance().await?;
        }
        reads.try_collect::<()>().await?;
        // Zero whatever part of `buf` no extent supplied data for.
        buf.as_mut_slice().fill(0);
        Ok(())
    }
1113
1114    /// Reads an entire attribute.
1115    pub async fn read_attr(&self, attribute_id: u64) -> Result<Option<Box<[u8]>>, Error> {
1116        let store = self.store();
1117        let tree = &store.tree;
1118        let layer_set = tree.layer_set();
1119        let mut merger = layer_set.merger();
1120        let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
1121        let mut iter = merger.query(Query::FullRange(&key)).await?;
1122        let (mut buffer, size) = match iter.get() {
1123            Some(item) if item.key == &key => match item.value {
1124                ObjectValue::Attribute { size, .. } => {
1125                    // TODO(https://fxbug.dev/42073113): size > max buffer size
1126                    (
1127                        store
1128                            .device
1129                            .allocate_buffer(round_up(*size, self.block_size()).unwrap() as usize)
1130                            .await,
1131                        *size as usize,
1132                    )
1133                }
1134                // Attribute was deleted.
1135                ObjectValue::None => return Ok(None),
1136                _ => bail!(FxfsError::Inconsistent),
1137            },
1138            _ => return Ok(None),
1139        };
1140        store.logical_read_ops.fetch_add(1, Ordering::Relaxed);
1141        let mut last_offset = 0;
1142        loop {
1143            iter.advance().await?;
1144            match iter.get() {
1145                Some(ItemRef {
1146                    key:
1147                        ObjectKey {
1148                            object_id,
1149                            data:
1150                                ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
1151                        },
1152                    value: ObjectValue::Extent(extent_value),
1153                    ..
1154                }) if *object_id == self.object_id() && *attr_id == attribute_id => {
1155                    if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
1156                        let offset = extent_key.range.start as usize;
1157                        buffer.as_mut_slice()[last_offset..offset].fill(0);
1158                        let end = std::cmp::min(extent_key.range.end as usize, buffer.len());
1159                        let maybe_bitmap = match mode {
1160                            ExtentMode::OverwritePartial(bitmap) => {
1161                                // The caller has to adjust the bitmap if necessary, but we always
1162                                // start from the beginning of any extent, so we only truncate.
1163                                let mut read_bitmap = bitmap.clone();
1164                                read_bitmap.truncate(
1165                                    (end - extent_key.range.start as usize)
1166                                        / self.block_size() as usize,
1167                                );
1168                                Some(read_bitmap)
1169                            }
1170                            _ => None,
1171                        };
1172                        self.read_and_decrypt(
1173                            *device_offset,
1174                            extent_key.range.start,
1175                            buffer.subslice_mut(offset..end as usize),
1176                            *key_id,
1177                        )
1178                        .await?;
1179                        if let Some(bitmap) = maybe_bitmap {
1180                            apply_bitmap_zeroing(
1181                                self.block_size() as usize,
1182                                &bitmap,
1183                                buffer.subslice_mut(offset..end as usize),
1184                            );
1185                        }
1186                        last_offset = end;
1187                        if last_offset >= size {
1188                            break;
1189                        }
1190                    }
1191                }
1192                _ => break,
1193            }
1194        }
1195        buffer.as_mut_slice()[std::cmp::min(last_offset, size)..].fill(0);
1196        Ok(Some(buffer.as_slice()[..size].into()))
1197    }
1198
1199    /// Writes potentially unaligned data at `device_offset` and returns checksums if requested.
1200    /// The data will be encrypted if necessary.  `buf` is mutable as an optimization, since the
1201    /// write may require encryption, we can encrypt the buffer in-place rather than copying to
1202    /// another buffer if the write is already aligned.
1203    ///
1204    /// NOTE: This will not create keys if they are missing (it will fail with an error if that
1205    /// happens to be the case).
1206    pub async fn write_at(
1207        &self,
1208        attribute_id: u64,
1209        offset: u64,
1210        buf: MutableBufferRef<'_>,
1211        key_id: Option<u64>,
1212        mut device_offset: u64,
1213    ) -> Result<MaybeChecksums, Error> {
1214        let mut transfer_buf;
1215        let block_size = self.block_size();
1216        let (range, mut transfer_buf_ref) =
1217            if offset % block_size == 0 && buf.len() as u64 % block_size == 0 {
1218                (offset..offset + buf.len() as u64, buf)
1219            } else {
1220                let (range, buf) = self.align_buffer(attribute_id, offset, buf.as_ref()).await?;
1221                transfer_buf = buf;
1222                device_offset -= offset - range.start;
1223                (range, transfer_buf.as_mut())
1224            };
1225
1226        let mut crypt_ctx = None;
1227        if let (_, Some(key)) = self.get_key(key_id).await? {
1228            if let Some(ctx) = key.crypt_ctx(self.object_id, range.start) {
1229                crypt_ctx = Some(ctx);
1230            } else {
1231                key.encrypt(
1232                    self.object_id,
1233                    device_offset,
1234                    range.start,
1235                    transfer_buf_ref.as_mut_slice(),
1236                )?;
1237            }
1238        }
1239        self.write_aligned(transfer_buf_ref.as_ref(), device_offset, crypt_ctx).await
1240    }
1241
1242    /// Writes to multiple ranges with data provided in `buf`. This function is specifically
1243    /// designed for migration purposes, allowing raw writes to the device without updating
1244    /// object metadata like allocated size or mtime. It's essential for scenarios where
1245    /// data needs to be transferred directly without triggering standard filesystem operations.
1246    #[cfg(feature = "migration")]
1247    pub async fn raw_multi_write(
1248        &self,
1249        transaction: &mut Transaction<'_>,
1250        attribute_id: u64,
1251        key_id: Option<u64>,
1252        ranges: &[Range<u64>],
1253        buf: MutableBufferRef<'_>,
1254    ) -> Result<(), Error> {
1255        self.multi_write_internal(transaction, attribute_id, key_id, ranges, buf).await?;
1256        Ok(())
1257    }
1258
    /// This is a low-level write function that writes to multiple ranges. Users should generally
    /// use `multi_write` instead of this function as this does not update the object's allocated
    /// size, mtime, atime, etc.
    ///
    /// `buf` is consumed in `ranges` order, so it is expected to hold the data for each range
    /// back-to-back. It may be encrypted in place. The function:
    /// * allocates new device space for the data;
    /// * issues the device writes;
    /// * adds merge mutations for the new extent records to `transaction`;
    /// * calls `deallocate_old_extents` for each range.
    ///
    /// Checksums are added to the transaction only when barriers are disabled.
    ///
    /// The caller must make sure the total length of `ranges` matches `buf.len()`. Several
    /// `unwrap`/`split_at_mut` calls below can panic otherwise.
    ///
    /// Returns (allocated, deallocated) bytes on success.
    async fn multi_write_internal(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: u64,
        key_id: Option<u64>,
        ranges: &[Range<u64>],
        mut buf: MutableBufferRef<'_>,
    ) -> Result<(u64, u64), Error> {
        if buf.is_empty() {
            return Ok((0, 0));
        }
        let block_size = self.block_size();
        let store = self.store();
        let store_id = store.store_object_id();

        // The only key we allow to be created on-the-fly is a non permanent key wrapped with the
        // volume data key.
        let (key_id, key) = if key_id == Some(VOLUME_DATA_KEY_ID)
            && matches!(self.encryption, Encryption::CachedKeys)
        {
            (
                VOLUME_DATA_KEY_ID,
                Some(
                    self.get_or_create_key(transaction)
                        .await
                        .context("get_or_create_key failed")?,
                ),
            )
        } else {
            self.get_key(key_id).await?
        };
        // Ciphers without inline-encryption support encrypt the data in software, in place. Each
        // range is encrypted separately using its logical start offset.
        if let Some(key) = &key {
            if !key.supports_inline_encryption() {
                let mut slice = buf.as_mut_slice();
                for r in ranges {
                    let l = r.end - r.start;
                    let (head, tail) = slice.split_at_mut(l as usize);
                    key.encrypt(
                        self.object_id,
                        0, /* TODO(https://fxbug.dev/421269588): plumb through device_offset. */
                        r.start,
                        head,
                    )?;
                    slice = tail;
                }
            }
        }

        let mut allocated = 0;
        let allocator = store.allocator();
        let trace = self.trace();
        let mut writes = FuturesOrdered::new();

        // Tracks the logical range the next chunk of `buf` belongs to. It is only advanced on the
        // inline-encryption path, where the crypt context depends on the logical offset.
        let mut logical_ranges = ranges.iter();
        let mut current_range = logical_ranges.next().unwrap().clone();

        // The allocator may hand back less than was requested, so keep allocating until every
        // byte of `buf` has been assigned device space.
        while !buf.is_empty() {
            let mut device_range = allocator
                .allocate(transaction, store_id, buf.len() as u64)
                .await
                .context("allocation failed")?;
            if trace {
                info!(
                    store_id,
                    oid = self.object_id(),
                    device_range:?,
                    len = device_range.end - device_range.start;
                    "A",
                );
            }
            let mut device_range_len = device_range.end - device_range.start;
            allocated += device_range_len;
            // If inline encryption is NOT supported, this loop should only happen once.
            while device_range_len > 0 {
                if current_range.end <= current_range.start {
                    current_range = logical_ranges.next().unwrap().clone();
                }
                // With inline encryption, split writes at logical range boundaries so each write
                // carries the crypt context for its own logical offset.
                let (crypt_ctx, split) = if let Some(key) = &key {
                    if key.supports_inline_encryption() {
                        let split = std::cmp::min(
                            current_range.end - current_range.start,
                            device_range_len,
                        );
                        let crypt_ctx = key.crypt_ctx(self.object_id, current_range.start);
                        current_range.start += split;
                        (crypt_ctx, split)
                    } else {
                        (None, device_range_len)
                    }
                } else {
                    (None, device_range_len)
                };

                let (head, tail) = buf.split_at_mut(split as usize);
                buf = tail;

                // `FuturesOrdered` yields results in push order. The mutation-building code below
                // relies on that to walk `ranges` in step with the writes.
                writes.push_back(async move {
                    let len = head.len() as u64;
                    Result::<_, Error>::Ok((
                        device_range.start,
                        len,
                        self.write_aligned(head.as_ref(), device_range.start, crypt_ctx).await?,
                    ))
                });
                device_range.start += split;
                device_range_len = device_range.end - device_range.start;
            }
        }

        self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
        // Build the extent mutations from the completed writes while, concurrently, calling
        // `deallocate_old_extents` for each range.
        let ((mutations, checksums), deallocated) = try_join!(
            async {
                let mut current_range = 0..0;
                let mut mutations = Vec::new();
                let mut out_checksums = Vec::new();
                let mut ranges = ranges.iter();
                // Carve each completed device write into extent records that don't cross logical
                // range boundaries, splitting its checksums to match.
                while let Some((mut device_offset, mut len, mut checksums)) =
                    writes.try_next().await?
                {
                    while len > 0 {
                        if current_range.end <= current_range.start {
                            current_range = ranges.next().unwrap().clone();
                        }
                        let chunk_len = std::cmp::min(len, current_range.end - current_range.start);
                        let tail = checksums.split_off((chunk_len / block_size) as usize);
                        if let Some(checksums) = checksums.maybe_as_ref() {
                            out_checksums.push((
                                device_offset..device_offset + chunk_len,
                                checksums.to_owned(),
                            ));
                        }
                        mutations.push(Mutation::merge_object(
                            ObjectKey::extent(
                                self.object_id(),
                                attribute_id,
                                current_range.start..current_range.start + chunk_len,
                            ),
                            ObjectValue::Extent(ExtentValue::new(
                                device_offset,
                                checksums.to_mode(),
                                key_id,
                            )),
                        ));
                        checksums = tail;
                        device_offset += chunk_len;
                        len -= chunk_len;
                        current_range.start += chunk_len;
                    }
                }
                Result::<_, Error>::Ok((mutations, out_checksums))
            },
            async {
                let mut deallocated = 0;
                for r in ranges {
                    deallocated +=
                        self.deallocate_old_extents(transaction, attribute_id, r.clone()).await?;
                }
                Result::<_, Error>::Ok(deallocated)
            }
        )?;

        for m in mutations {
            transaction.add(store_id, m);
        }

        // Only store checksums in the journal if barriers are not enabled.
        if !store.filesystem().options().barriers_enabled {
            for (r, c) in checksums {
                transaction.add_checksum(r, c, true);
            }
        }
        Ok((allocated, deallocated))
    }
1437
1438    /// Writes to multiple ranges with data provided in `buf`.  The buffer can be modified in place
1439    /// if encryption takes place.  The ranges must all be aligned and no change to content size is
1440    /// applied; the caller is responsible for updating size if required.  If `key_id` is None, it
1441    /// means pick the default key for the object which is the fscrypt key if present, or the volume
1442    /// data key, or no key if it's an unencrypted file.
1443    pub async fn multi_write(
1444        &self,
1445        transaction: &mut Transaction<'_>,
1446        attribute_id: u64,
1447        key_id: Option<u64>,
1448        ranges: &[Range<u64>],
1449        buf: MutableBufferRef<'_>,
1450    ) -> Result<(), Error> {
1451        let (allocated, deallocated) =
1452            self.multi_write_internal(transaction, attribute_id, key_id, ranges, buf).await?;
1453        if allocated == 0 && deallocated == 0 {
1454            return Ok(());
1455        }
1456        self.update_allocated_size(transaction, allocated, deallocated).await
1457    }
1458
    /// Write data to overwrite extents with the provided set of ranges. This makes a strong
    /// assumption that the ranges are actually going to be already allocated overwrite extents and
    /// will error out or do something wrong if they aren't. It also assumes the ranges passed to
    /// it are sorted.
    ///
    /// `buf` holds the data for `ranges` back to back: it is consumed sequentially, `r.end -
    /// r.start` bytes per range, so it is expected to be exactly as long as all the ranges
    /// combined.  An empty `buf` is a no-op.  If the object's key does not support inline
    /// encryption, `buf` is encrypted in place before any write is issued.
    ///
    /// The device writes run concurrently.  Once all of them have succeeded:
    ///   * checksums are added to the transaction, but only when barriers are disabled, and
    ///   * extents in `OverwritePartial` mode whose bitmap gained newly written blocks get a merge
    ///     mutation updating their mode (to `Overwrite` once every block has been written).
    ///
    /// Errors:
    ///   * `FxfsError::Inconsistent` if a target range overlaps a deleted extent, starts before
    ///     the extent that should cover it, or lands on an extent that is not in `Overwrite` or
    ///     `OverwritePartial` mode.
    ///   * `FxfsError::Internal` if the extent records for `attr_id` run out before every range
    ///     has been written.
    pub async fn multi_overwrite<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        attr_id: u64,
        ranges: &[Range<u64>],
        mut buf: MutableBufferRef<'_>,
    ) -> Result<(), Error> {
        if buf.is_empty() {
            return Ok(());
        }
        let block_size = self.block_size();
        let store = self.store();
        let tree = store.tree();
        let store_id = store.store_object_id();

        // `None` asks for the object's default key.
        let (key_id, key) = self.get_key(None).await?;
        if let Some(key) = &key {
            // Keys the device can't apply inline are applied here in software: each range's
            // slice of `buf` is encrypted in place, using the range's start offset.
            if !key.supports_inline_encryption() {
                let mut slice = buf.as_mut_slice();
                for r in ranges {
                    let l = r.end - r.start;
                    let (head, tail) = slice.split_at_mut(l as usize);
                    key.encrypt(
                        self.object_id,
                        0, /* TODO(https://fxbug.dev/421269588): plumb through device_offset. */
                        r.start,
                        head,
                    )?;
                    slice = tail;
                }
            }
        }

        let mut range_iter = ranges.into_iter();
        // There should be at least one range if the buffer has data in it
        let mut target_range = range_iter.next().unwrap().clone();
        // Extent record updates; added to the transaction only after every write succeeds.
        let mut mutations = Vec::new();
        // Device writes; they are polled (and so run concurrently) by `try_collect` below.
        let writes = FuturesUnordered::new();

        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                attr_id,
                AttributeKey::Extent(ExtentKey::search_key_from_offset(target_range.start)),
            )))
            .await?;

        // Walk the extent records of `attr_id` in key order, pairing each extent with the parts
        // of the (sorted) target ranges that fall inside it.
        loop {
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(
                                    attribute_id,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) if *object_id == self.object_id() && *attribute_id == attr_id => {
                    // If this extent ends before the target range starts (not possible on the
                    // first loop because of the query parameters but possible on further loops),
                    // advance until we find the next one we care about.
                    if range.end <= target_range.start {
                        iter.advance().await?;
                        continue;
                    }
                    let (device_offset, mode) = match extent_value {
                        ExtentValue::None => {
                            return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                                format!(
                                    "multi_overwrite failed: target_range ({}, {}) overlaps with \
                                deleted extent found at ({}, {})",
                                    target_range.start, target_range.end, range.start, range.end,
                                )
                            });
                        }
                        ExtentValue::Some { device_offset, mode, .. } => (device_offset, mode),
                    };
                    // The ranges passed to this function should already be allocated, so
                    // extent records should exist for them.  An extent starting after the target
                    // range means part of the target range has no extent (a hole).
                    if range.start > target_range.start {
                        return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                            format!(
                                "multi_overwrite failed: target range ({}, {}) starts before first \
                            extent found at ({}, {})",
                                target_range.start, target_range.end, range.start, range.end,
                            )
                        });
                    }
                    // Only overwrite-mode extents can be written in place.  Fully written extents
                    // carry no bitmap; partially written ones carry a per-block bitmap that the
                    // writes below update.
                    let mut bitmap = match mode {
                        ExtentMode::Raw | ExtentMode::Cow(_) => {
                            return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                                format!(
                                    "multi_overwrite failed: \
                            extent from ({}, {}) which overlaps target range ({}, {}) had the \
                            wrong extent mode",
                                    range.start, range.end, target_range.start, target_range.end,
                                )
                            });
                        }
                        ExtentMode::OverwritePartial(bitmap) => {
                            OverwriteBitmaps::new(bitmap.clone())
                        }
                        ExtentMode::Overwrite => OverwriteBitmaps::None,
                    };
                    // Issue one write per piece of a target range that lies inside this extent,
                    // moving on to the next target range whenever the current one is exhausted,
                    // until the ranges run out or the next piece lies past this extent.
                    loop {
                        let offset_within_extent = target_range.start - range.start;
                        let bitmap_offset = offset_within_extent / block_size;
                        let write_device_offset = *device_offset + offset_within_extent;
                        let write_end = min(range.end, target_range.end);
                        let write_len = write_end - target_range.start;
                        let write_device_range =
                            write_device_offset..write_device_offset + write_len;
                        let (current_buf, remaining_buf) = buf.split_at_mut(write_len as usize);

                        // Group the written blocks into checksum chunks, each flagged with
                        // whether it is a first write according to the extent's bitmap.
                        bitmap.set_offset(bitmap_offset as usize);
                        let checksum_ranges = ChecksumRangeChunk::group_first_write_ranges(
                            &mut bitmap,
                            block_size,
                            write_device_range,
                        );

                        let crypt_ctx = if let Some(key) = &key {
                            key.crypt_ctx(self.object_id, target_range.start)
                        } else {
                            None
                        };

                        writes.push(async move {
                            let maybe_checksums = self
                                .write_aligned(current_buf.as_ref(), write_device_offset, crypt_ctx)
                                .await?;
                            // Slice the returned checksums into the chunks computed above.
                            Ok::<_, Error>(match maybe_checksums {
                                MaybeChecksums::None => Vec::new(),
                                MaybeChecksums::Fletcher(checksums) => checksum_ranges
                                    .into_iter()
                                    .map(
                                        |ChecksumRangeChunk {
                                             checksum_range,
                                             device_range,
                                             is_first_write,
                                         }| {
                                            (
                                                device_range,
                                                checksums[checksum_range].to_vec(),
                                                is_first_write,
                                            )
                                        },
                                    )
                                    .collect(),
                            })
                        });
                        buf = remaining_buf;
                        target_range.start += write_len;
                        if target_range.start == target_range.end {
                            match range_iter.next() {
                                None => break,
                                Some(next_range) => target_range = next_range.clone(),
                            }
                        }
                        if range.end <= target_range.start {
                            break;
                        }
                    }
                    // Fold the blocks written above into the extent's bitmap.  If that changed
                    // it, record the new mode; a fully set bitmap promotes the extent to
                    // `Overwrite`.
                    if let Some((mut bitmap, write_bitmap)) = bitmap.take_bitmaps() {
                        if bitmap.or(&write_bitmap) {
                            let mode = if bitmap.all() {
                                ExtentMode::Overwrite
                            } else {
                                ExtentMode::OverwritePartial(bitmap)
                            };
                            mutations.push(Mutation::merge_object(
                                ObjectKey::extent(self.object_id(), attr_id, range.clone()),
                                ObjectValue::Extent(ExtentValue::new(*device_offset, mode, key_id)),
                            ))
                        }
                    }
                    // An empty target range here means the last range was fully written.
                    if target_range.start == target_range.end {
                        break;
                    }
                    iter.advance().await?;
                }
                // We've either run past the end of the existing extents or something is wrong with
                // the tree. The main section should break if it finishes the ranges, so either
                // case, this is an error.
                _ => bail!(anyhow!(FxfsError::Internal).context(
                    "found a non-extent object record while there were still ranges to process"
                )),
            }
        }

        let checksums = writes.try_collect::<Vec<_>>().await?;
        // Only store checksums in the journal if barriers are not enabled.
        if !store.filesystem().options().barriers_enabled {
            for (r, c, first_write) in checksums.into_iter().flatten() {
                transaction.add_checksum(r, c, first_write);
            }
        }

        for m in mutations {
            transaction.add(store_id, m);
        }

        Ok(())
    }
1673
1674    /// Writes an attribute that should not already exist and therefore does not require trimming.
1675    /// Breaks up the write into multiple transactions if `data.len()` is larger than `batch_size`.
1676    /// If writing the attribute requires multiple transactions, adds the attribute to the
1677    /// graveyard. The caller is responsible for removing the attribute from the graveyard when it
1678    /// commits the last transaction.  This always writes using a key wrapped with the volume data
1679    /// key.
1680    #[trace]
1681    pub async fn write_new_attr_in_batches<'a>(
1682        &'a self,
1683        transaction: &mut Transaction<'a>,
1684        attribute_id: u64,
1685        data: &[u8],
1686        batch_size: usize,
1687    ) -> Result<(), Error> {
1688        transaction.add(
1689            self.store().store_object_id,
1690            Mutation::replace_or_insert_object(
1691                ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
1692                ObjectValue::attribute(data.len() as u64, false),
1693            ),
1694        );
1695        let chunks = data.chunks(batch_size);
1696        let num_chunks = chunks.len();
1697        if num_chunks > 1 {
1698            transaction.add(
1699                self.store().store_object_id,
1700                Mutation::replace_or_insert_object(
1701                    ObjectKey::graveyard_attribute_entry(
1702                        self.store().graveyard_directory_object_id(),
1703                        self.object_id(),
1704                        attribute_id,
1705                    ),
1706                    ObjectValue::Some,
1707                ),
1708            );
1709        }
1710        let mut start_offset = 0;
1711        for (i, chunk) in chunks.enumerate() {
1712            let rounded_len = round_up(chunk.len() as u64, self.block_size()).unwrap();
1713            let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
1714            let slice = buffer.as_mut_slice();
1715            slice[..chunk.len()].copy_from_slice(chunk);
1716            slice[chunk.len()..].fill(0);
1717            self.multi_write(
1718                transaction,
1719                attribute_id,
1720                Some(VOLUME_DATA_KEY_ID),
1721                &[start_offset..start_offset + rounded_len],
1722                buffer.as_mut(),
1723            )
1724            .await?;
1725            start_offset += rounded_len;
1726            // Do not commit the last chunk.
1727            if i < num_chunks - 1 {
1728                transaction.commit_and_continue().await?;
1729            }
1730        }
1731        Ok(())
1732    }
1733
1734    /// Writes an entire attribute. Returns whether or not the attribute needs to continue being
1735    /// trimmed - if the new data is shorter than the old data, this will trim any extents beyond
1736    /// the end of the new size, but if there were too many for a single transaction, a commit
1737    /// needs to be made before trimming again, so the responsibility is left to the caller so as
1738    /// to not accidentally split the transaction when it's not in a consistent state.  This will
1739    /// write using the volume data key; the fscrypt key is not supported.
1740    pub async fn write_attr(
1741        &self,
1742        transaction: &mut Transaction<'_>,
1743        attribute_id: u64,
1744        data: &[u8],
1745    ) -> Result<NeedsTrim, Error> {
1746        let rounded_len = round_up(data.len() as u64, self.block_size()).unwrap();
1747        let store = self.store();
1748        let tree = store.tree();
1749        let should_trim = if let Some(item) = tree
1750            .find(&ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute))
1751            .await?
1752        {
1753            match item.value {
1754                ObjectValue::Attribute { size: _, has_overwrite_extents: true } => {
1755                    bail!(
1756                        anyhow!(FxfsError::Inconsistent)
1757                            .context("write_attr on an attribute with overwrite extents")
1758                    )
1759                }
1760                ObjectValue::Attribute { size, .. } => (data.len() as u64) < size,
1761                _ => bail!(FxfsError::Inconsistent),
1762            }
1763        } else {
1764            false
1765        };
1766        let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
1767        let slice = buffer.as_mut_slice();
1768        slice[..data.len()].copy_from_slice(data);
1769        slice[data.len()..].fill(0);
1770        self.multi_write(
1771            transaction,
1772            attribute_id,
1773            Some(VOLUME_DATA_KEY_ID),
1774            &[0..rounded_len],
1775            buffer.as_mut(),
1776        )
1777        .await?;
1778        transaction.add(
1779            self.store().store_object_id,
1780            Mutation::replace_or_insert_object(
1781                ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
1782                ObjectValue::attribute(data.len() as u64, false),
1783            ),
1784        );
1785        if should_trim {
1786            self.shrink(transaction, attribute_id, data.len() as u64).await
1787        } else {
1788            Ok(NeedsTrim(false))
1789        }
1790    }
1791
1792    pub async fn list_extended_attributes(&self) -> Result<Vec<Vec<u8>>, Error> {
1793        let layer_set = self.store().tree().layer_set();
1794        let mut merger = layer_set.merger();
1795        // Seek to the first extended attribute key for this object.
1796        let mut iter = merger
1797            .query(Query::FullRange(&ObjectKey::extended_attribute(self.object_id(), Vec::new())))
1798            .await?;
1799        let mut out = Vec::new();
1800        while let Some(item) = iter.get() {
1801            // Skip deleted extended attributes.
1802            if item.value != &ObjectValue::None {
1803                match item.key {
1804                    ObjectKey { object_id, data: ObjectKeyData::ExtendedAttribute { name } } => {
1805                        if self.object_id() != *object_id {
1806                            bail!(
1807                                anyhow!(FxfsError::Inconsistent)
1808                                    .context("list_extended_attributes: wrong object id")
1809                            )
1810                        }
1811                        out.push(name.clone());
1812                    }
1813                    // Once we hit something that isn't an extended attribute key, we've gotten to
1814                    // the end.
1815                    _ => break,
1816                }
1817            }
1818            iter.advance().await?;
1819        }
1820        Ok(out)
1821    }
1822
1823    /// Looks up the values for the extended attribute `fio::SELINUX_CONTEXT_NAME`, returning it
1824    /// if it is found inline. If it is not inline, it will request use of the
1825    /// `get_extended_attributes` method. If the entry doesn't exist at all, returns None.
1826    pub async fn get_inline_selinux_context(&self) -> Result<Option<fio::SelinuxContext>, Error> {
1827        // This optimization is only useful as long as the attribute is smaller than inline sizes.
1828        // Avoid reading the data out of the attributes.
1829        const_assert!(fio::MAX_SELINUX_CONTEXT_ATTRIBUTE_LEN as usize <= MAX_INLINE_XATTR_SIZE);
1830        let item = match self
1831            .store()
1832            .tree()
1833            .find(&ObjectKey::extended_attribute(
1834                self.object_id(),
1835                fio::SELINUX_CONTEXT_NAME.into(),
1836            ))
1837            .await?
1838        {
1839            Some(item) => item,
1840            None => return Ok(None),
1841        };
1842        match item.value {
1843            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => {
1844                Ok(Some(fio::SelinuxContext::Data(value)))
1845            }
1846            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(_)) => {
1847                Ok(Some(fio::SelinuxContext::UseExtendedAttributes(fio::EmptyStruct {})))
1848            }
1849            _ => {
1850                bail!(
1851                    anyhow!(FxfsError::Inconsistent)
1852                        .context("get_inline_extended_attribute: Expected ExtendedAttribute value")
1853                )
1854            }
1855        }
1856    }
1857
1858    pub async fn get_extended_attribute(&self, name: Vec<u8>) -> Result<Vec<u8>, Error> {
1859        let item = self
1860            .store()
1861            .tree()
1862            .find(&ObjectKey::extended_attribute(self.object_id(), name))
1863            .await?
1864            .ok_or(FxfsError::NotFound)?;
1865        match item.value {
1866            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => Ok(value),
1867            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
1868                Ok(self.read_attr(id).await?.ok_or(FxfsError::Inconsistent)?.into_vec())
1869            }
1870            _ => {
1871                bail!(
1872                    anyhow!(FxfsError::Inconsistent)
1873                        .context("get_extended_attribute: Expected ExtendedAttribute value")
1874                )
1875            }
1876        }
1877    }
1878
1879    pub async fn set_extended_attribute(
1880        &self,
1881        name: Vec<u8>,
1882        value: Vec<u8>,
1883        mode: SetExtendedAttributeMode,
1884    ) -> Result<(), Error> {
1885        let store = self.store();
1886        let fs = store.filesystem();
1887        // NB: We need to take this lock before we potentially look up the value to prevent racing
1888        // with another set.
1889        let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
1890        let mut transaction = fs.new_transaction(keys, Options::default()).await?;
1891        self.set_extended_attribute_impl(name, value, mode, &mut transaction).await?;
1892        transaction.commit().await?;
1893        Ok(())
1894    }
1895
1896    async fn set_extended_attribute_impl(
1897        &self,
1898        name: Vec<u8>,
1899        value: Vec<u8>,
1900        mode: SetExtendedAttributeMode,
1901        transaction: &mut Transaction<'_>,
1902    ) -> Result<(), Error> {
1903        ensure!(name.len() <= MAX_XATTR_NAME_SIZE, FxfsError::TooBig);
1904        ensure!(value.len() <= MAX_XATTR_VALUE_SIZE, FxfsError::TooBig);
1905        let tree = self.store().tree();
1906        let object_key = ObjectKey::extended_attribute(self.object_id(), name);
1907
1908        let existing_attribute_id = {
1909            let (found, existing_attribute_id) = match tree.find(&object_key).await? {
1910                None => (false, None),
1911                Some(Item { value, .. }) => (
1912                    true,
1913                    match value {
1914                        ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
1915                        ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
1916                            Some(id)
1917                        }
1918                        _ => bail!(
1919                            anyhow!(FxfsError::Inconsistent)
1920                                .context("expected extended attribute value")
1921                        ),
1922                    },
1923                ),
1924            };
1925            match mode {
1926                SetExtendedAttributeMode::Create if found => {
1927                    bail!(FxfsError::AlreadyExists)
1928                }
1929                SetExtendedAttributeMode::Replace if !found => {
1930                    bail!(FxfsError::NotFound)
1931                }
1932                _ => (),
1933            }
1934            existing_attribute_id
1935        };
1936
1937        if let Some(attribute_id) = existing_attribute_id {
1938            // If we already have an attribute id allocated for this extended attribute, we always
1939            // use it, even if the value has shrunk enough to be stored inline. We don't need to
1940            // worry about trimming here for the same reason we don't need to worry about it when
1941            // we delete xattrs - they simply aren't large enough to ever need more than one
1942            // transaction.
1943            let _ = self.write_attr(transaction, attribute_id, &value).await?;
1944        } else if value.len() <= MAX_INLINE_XATTR_SIZE {
1945            transaction.add(
1946                self.store().store_object_id(),
1947                Mutation::replace_or_insert_object(
1948                    object_key,
1949                    ObjectValue::inline_extended_attribute(value),
1950                ),
1951            );
1952        } else {
1953            // If there isn't an existing attribute id and we are going to store the value in
1954            // an attribute, find the next empty attribute id in the range. We search for fxfs
1955            // attribute records specifically, instead of the extended attribute records, because
1956            // even if the extended attribute record is removed the attribute may not be fully
1957            // trimmed yet.
1958            let mut attribute_id = EXTENDED_ATTRIBUTE_RANGE_START;
1959            let layer_set = tree.layer_set();
1960            let mut merger = layer_set.merger();
1961            let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
1962            let mut iter = merger.query(Query::FullRange(&key)).await?;
1963            loop {
1964                match iter.get() {
1965                    // None means the key passed to seek wasn't found. That means the first
1966                    // attribute is available and we can just stop right away.
1967                    None => break,
1968                    Some(ItemRef {
1969                        key: ObjectKey { object_id, data: ObjectKeyData::Attribute(attr_id, _) },
1970                        value,
1971                        ..
1972                    }) if *object_id == self.object_id() => {
1973                        if matches!(value, ObjectValue::None) {
1974                            // This attribute was once used but is now deleted, so it's safe to use
1975                            // again.
1976                            break;
1977                        }
1978                        if attribute_id < *attr_id {
1979                            // We found a gap - use it.
1980                            break;
1981                        } else if attribute_id == *attr_id {
1982                            // This attribute id is in use, try the next one.
1983                            attribute_id += 1;
1984                            if attribute_id == EXTENDED_ATTRIBUTE_RANGE_END {
1985                                bail!(FxfsError::NoSpace);
1986                            }
1987                        }
1988                        // If we don't hit either of those cases, we are still moving through the
1989                        // extent keys for the current attribute, so just keep advancing until the
1990                        // attribute id changes.
1991                    }
1992                    // As we are working our way through the iterator, if we hit anything that
1993                    // doesn't have our object id or attribute key data, we've gone past the end of
1994                    // this section and can stop.
1995                    _ => break,
1996                }
1997                iter.advance().await?;
1998            }
1999
2000            // We know this won't need trimming because it's a new attribute.
2001            let _ = self.write_attr(transaction, attribute_id, &value).await?;
2002            transaction.add(
2003                self.store().store_object_id(),
2004                Mutation::replace_or_insert_object(
2005                    object_key,
2006                    ObjectValue::extended_attribute(attribute_id),
2007                ),
2008            );
2009        }
2010
2011        Ok(())
2012    }
2013
2014    pub async fn remove_extended_attribute(&self, name: Vec<u8>) -> Result<(), Error> {
2015        let store = self.store();
2016        let tree = store.tree();
2017        let object_key = ObjectKey::extended_attribute(self.object_id(), name);
2018
2019        // NB: The API says we have to return an error if the attribute doesn't exist, so we have
2020        // to look it up first to make sure we have a record of it before we delete it. Make sure
2021        // we take a lock and make a transaction before we do so we don't race with other
2022        // operations.
2023        let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
2024        let mut transaction = store.filesystem().new_transaction(keys, Options::default()).await?;
2025
2026        let attribute_to_delete =
2027            match tree.find(&object_key).await?.ok_or(FxfsError::NotFound)?.value {
2028                ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => Some(id),
2029                ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
2030                _ => {
2031                    bail!(
2032                        anyhow!(FxfsError::Inconsistent)
2033                            .context("remove_extended_attribute: Expected ExtendedAttribute value")
2034                    )
2035                }
2036            };
2037
2038        transaction.add(
2039            store.store_object_id(),
2040            Mutation::replace_or_insert_object(object_key, ObjectValue::None),
2041        );
2042
2043        // If the attribute wasn't stored inline, we need to deallocate all the extents too. This
2044        // would normally need to interact with the graveyard for correctness - if there are too
2045        // many extents to delete to fit in a single transaction then we could potentially have
2046        // consistency issues. However, the maximum size of an extended attribute is small enough
2047        // that it will never come close to that limit even in the worst case, so we just delete
2048        // everything in one shot.
2049        if let Some(attribute_id) = attribute_to_delete {
2050            let trim_result = store
2051                .trim_some(
2052                    &mut transaction,
2053                    self.object_id(),
2054                    attribute_id,
2055                    TrimMode::FromOffset(0),
2056                )
2057                .await?;
2058            // In case you didn't read the comment above - this should not be used to delete
2059            // arbitrary attributes!
2060            assert_matches!(trim_result, TrimResult::Done(_));
2061            transaction.add(
2062                store.store_object_id(),
2063                Mutation::replace_or_insert_object(
2064                    ObjectKey::attribute(self.object_id, attribute_id, AttributeKey::Attribute),
2065                    ObjectValue::None,
2066                ),
2067            );
2068        }
2069
2070        transaction.commit().await?;
2071        Ok(())
2072    }
2073
2074    /// Returns a future that will pre-fetches the keys so as to avoid paying the performance
2075    /// penalty later. Must ensure that the object is not removed before the future completes.
2076    pub fn pre_fetch_keys(&self) -> Option<impl Future<Output = ()> + use<S>> {
2077        if let Encryption::CachedKeys = self.encryption {
2078            let owner = self.owner.clone();
2079            let object_id = self.object_id;
2080            Some(async move {
2081                let store = owner.as_ref().as_ref();
2082                if let Some(crypt) = store.crypt() {
2083                    let _ = store
2084                        .key_manager
2085                        .get_keys(
2086                            object_id,
2087                            crypt.as_ref(),
2088                            &mut Some(async || store.get_keys(object_id).await),
2089                            /* permanent= */ false,
2090                            /* force= */ false,
2091                        )
2092                        .await;
2093                }
2094            })
2095        } else {
2096            None
2097        }
2098    }
2099}
2100
2101impl<S: HandleOwner> Drop for StoreObjectHandle<S> {
2102    fn drop(&mut self) {
2103        if self.is_encrypted() {
2104            let _ = self.store().key_manager.remove(self.object_id);
2105        }
2106    }
2107}
2108
/// Result of an operation that truncates an object.
///
/// Trimming every extent past the new end might not fit in a single transaction. When the
/// wrapped value is `true`, the caller needs to commit and then finish trimming the object in
/// subsequent transactions (by calling `ObjectStore::trim`).
#[must_use]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NeedsTrim(pub bool);
2114
2115#[cfg(test)]
2116mod tests {
2117    use super::{ChecksumRangeChunk, OverwriteBitmaps};
2118    use crate::errors::FxfsError;
2119    use crate::filesystem::{FxFilesystem, OpenFxFilesystem};
2120    use crate::object_handle::ObjectHandle;
2121    use crate::object_store::data_object_handle::WRITE_ATTR_BATCH_SIZE;
2122    use crate::object_store::transaction::{Mutation, Options, lock_keys};
2123    use crate::object_store::{
2124        AttributeKey, DataObjectHandle, Directory, FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions,
2125        LockKey, ObjectKey, ObjectStore, ObjectValue, SetExtendedAttributeMode, StoreObjectHandle,
2126    };
2127    use bit_vec::BitVec;
2128    use fuchsia_async as fasync;
2129    use futures::join;
2130    use std::sync::Arc;
2131    use storage_device::DeviceHolder;
2132    use storage_device::fake_device::FakeDevice;
2133
    /// Block size of the `FakeDevice` created by `test_filesystem`.
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
    /// Name under which `test_filesystem_and_empty_object` links the test object into the root
    /// directory.
    const TEST_OBJECT_NAME: &str = "foo";
2136
2137    fn is_error(actual: anyhow::Error, expected: FxfsError) {
2138        assert_eq!(*actual.root_cause().downcast_ref::<FxfsError>().unwrap(), expected)
2139    }
2140
2141    async fn test_filesystem() -> OpenFxFilesystem {
2142        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
2143        FxFilesystem::new_empty(device).await.expect("new_empty failed")
2144    }
2145
2146    async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
2147    {
2148        let fs = test_filesystem().await;
2149        let store = fs.root_store();
2150
2151        let mut transaction = fs
2152            .clone()
2153            .new_transaction(
2154                lock_keys![LockKey::object(
2155                    store.store_object_id(),
2156                    store.root_directory_object_id()
2157                )],
2158                Options::default(),
2159            )
2160            .await
2161            .expect("new_transaction failed");
2162
2163        let object =
2164            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2165                .await
2166                .expect("create_object failed");
2167
2168        let root_directory =
2169            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
2170        root_directory
2171            .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
2172            .await
2173            .expect("add_child_file failed");
2174
2175        transaction.commit().await.expect("commit failed");
2176
2177        (fs, object)
2178    }
2179
2180    #[fuchsia::test(threads = 3)]
2181    async fn extended_attribute_double_remove() {
2182        // This test is intended to trip a potential race condition in remove. Removing an
2183        // attribute that doesn't exist is an error, so we need to check before we remove, but if
2184        // we aren't careful, two parallel removes might both succeed in the check and then both
2185        // remove the value.
2186        let (fs, object) = test_filesystem_and_empty_object().await;
2187        let basic = Arc::new(StoreObjectHandle::new(
2188            object.owner().clone(),
2189            object.object_id(),
2190            /* permanent_keys: */ false,
2191            HandleOptions::default(),
2192            false,
2193        ));
2194        let basic_a = basic.clone();
2195        let basic_b = basic.clone();
2196
2197        basic
2198            .set_extended_attribute(
2199                b"security.selinux".to_vec(),
2200                b"bar".to_vec(),
2201                SetExtendedAttributeMode::Set,
2202            )
2203            .await
2204            .expect("failed to set attribute");
2205
2206        // Try to remove the attribute twice at the same time. One should succeed in the race and
2207        // return Ok, and the other should fail the race and return NOT_FOUND.
2208        let a_task = fasync::Task::spawn(async move {
2209            basic_a.remove_extended_attribute(b"security.selinux".to_vec()).await
2210        });
2211        let b_task = fasync::Task::spawn(async move {
2212            basic_b.remove_extended_attribute(b"security.selinux".to_vec()).await
2213        });
2214        match join!(a_task, b_task) {
2215            (Ok(()), Ok(())) => panic!("both remove calls succeeded"),
2216            (Err(_), Err(_)) => panic!("both remove calls failed"),
2217
2218            (Ok(()), Err(e)) => is_error(e, FxfsError::NotFound),
2219            (Err(e), Ok(())) => is_error(e, FxfsError::NotFound),
2220        }
2221
2222        fs.close().await.expect("Close failed");
2223    }
2224
2225    #[fuchsia::test(threads = 3)]
2226    async fn extended_attribute_double_create() {
2227        // This test is intended to trip a potential race in set when using the create flag,
2228        // similar to above. If the create mode is set, we need to check that the attribute isn't
2229        // already created, but if two parallel creates both succeed in that check, and we aren't
2230        // careful with locking, they will both succeed and one will overwrite the other.
2231        let (fs, object) = test_filesystem_and_empty_object().await;
2232        let basic = Arc::new(StoreObjectHandle::new(
2233            object.owner().clone(),
2234            object.object_id(),
2235            /* permanent_keys: */ false,
2236            HandleOptions::default(),
2237            false,
2238        ));
2239        let basic_a = basic.clone();
2240        let basic_b = basic.clone();
2241
2242        // Try to set the attribute twice at the same time. One should succeed in the race and
2243        // return Ok, and the other should fail the race and return ALREADY_EXISTS.
2244        let a_task = fasync::Task::spawn(async move {
2245            basic_a
2246                .set_extended_attribute(
2247                    b"security.selinux".to_vec(),
2248                    b"one".to_vec(),
2249                    SetExtendedAttributeMode::Create,
2250                )
2251                .await
2252        });
2253        let b_task = fasync::Task::spawn(async move {
2254            basic_b
2255                .set_extended_attribute(
2256                    b"security.selinux".to_vec(),
2257                    b"two".to_vec(),
2258                    SetExtendedAttributeMode::Create,
2259                )
2260                .await
2261        });
2262        match join!(a_task, b_task) {
2263            (Ok(()), Ok(())) => panic!("both set calls succeeded"),
2264            (Err(_), Err(_)) => panic!("both set calls failed"),
2265
2266            (Ok(()), Err(e)) => {
2267                assert_eq!(
2268                    basic
2269                        .get_extended_attribute(b"security.selinux".to_vec())
2270                        .await
2271                        .expect("failed to get xattr"),
2272                    b"one"
2273                );
2274                is_error(e, FxfsError::AlreadyExists);
2275            }
2276            (Err(e), Ok(())) => {
2277                assert_eq!(
2278                    basic
2279                        .get_extended_attribute(b"security.selinux".to_vec())
2280                        .await
2281                        .expect("failed to get xattr"),
2282                    b"two"
2283                );
2284                is_error(e, FxfsError::AlreadyExists);
2285            }
2286        }
2287
2288        fs.close().await.expect("Close failed");
2289    }
2290
2291    struct TestAttr {
2292        name: Vec<u8>,
2293        value: Vec<u8>,
2294    }
2295
2296    impl TestAttr {
2297        fn new(name: impl AsRef<[u8]>, value: impl AsRef<[u8]>) -> Self {
2298            Self { name: name.as_ref().to_vec(), value: value.as_ref().to_vec() }
2299        }
2300        fn name(&self) -> Vec<u8> {
2301            self.name.clone()
2302        }
2303        fn value(&self) -> Vec<u8> {
2304            self.value.clone()
2305        }
2306    }
2307
2308    #[fuchsia::test]
2309    async fn extended_attributes() {
2310        let (fs, object) = test_filesystem_and_empty_object().await;
2311
2312        let test_attr = TestAttr::new(b"security.selinux", b"foo");
2313
2314        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
2315        is_error(
2316            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2317            FxfsError::NotFound,
2318        );
2319
2320        object
2321            .set_extended_attribute(
2322                test_attr.name(),
2323                test_attr.value(),
2324                SetExtendedAttributeMode::Set,
2325            )
2326            .await
2327            .unwrap();
2328        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2329        assert_eq!(
2330            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2331            test_attr.value()
2332        );
2333
2334        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2335        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
2336        is_error(
2337            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2338            FxfsError::NotFound,
2339        );
2340
2341        // Make sure we can object the same attribute being set again.
2342        object
2343            .set_extended_attribute(
2344                test_attr.name(),
2345                test_attr.value(),
2346                SetExtendedAttributeMode::Set,
2347            )
2348            .await
2349            .unwrap();
2350        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2351        assert_eq!(
2352            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2353            test_attr.value()
2354        );
2355
2356        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2357        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
2358        is_error(
2359            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2360            FxfsError::NotFound,
2361        );
2362
2363        fs.close().await.expect("close failed");
2364    }
2365
2366    #[fuchsia::test]
2367    async fn large_extended_attribute() {
2368        let (fs, object) = test_filesystem_and_empty_object().await;
2369
2370        let test_attr = TestAttr::new(b"security.selinux", vec![3u8; 300]);
2371
2372        object
2373            .set_extended_attribute(
2374                test_attr.name(),
2375                test_attr.value(),
2376                SetExtendedAttributeMode::Set,
2377            )
2378            .await
2379            .unwrap();
2380        assert_eq!(
2381            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2382            test_attr.value()
2383        );
2384
2385        // Probe the fxfs attributes to make sure it did the expected thing. This relies on inside
2386        // knowledge of how the attribute id is chosen.
2387        assert_eq!(
2388            object
2389                .read_attr(64)
2390                .await
2391                .expect("read_attr failed")
2392                .expect("read_attr returned none")
2393                .into_vec(),
2394            test_attr.value()
2395        );
2396
2397        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2398        is_error(
2399            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2400            FxfsError::NotFound,
2401        );
2402
2403        // Make sure we can object the same attribute being set again.
2404        object
2405            .set_extended_attribute(
2406                test_attr.name(),
2407                test_attr.value(),
2408                SetExtendedAttributeMode::Set,
2409            )
2410            .await
2411            .unwrap();
2412        assert_eq!(
2413            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2414            test_attr.value()
2415        );
2416        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2417        is_error(
2418            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2419            FxfsError::NotFound,
2420        );
2421
2422        fs.close().await.expect("close failed");
2423    }
2424
2425    #[fuchsia::test]
2426    async fn multiple_extended_attributes() {
2427        let (fs, object) = test_filesystem_and_empty_object().await;
2428
2429        let attrs = [
2430            TestAttr::new(b"security.selinux", b"foo"),
2431            TestAttr::new(b"large.attribute", vec![3u8; 300]),
2432            TestAttr::new(b"an.attribute", b"asdf"),
2433            TestAttr::new(b"user.big", vec![5u8; 288]),
2434            TestAttr::new(b"user.tiny", b"smol"),
2435            TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
2436            TestAttr::new(b"also big", vec![7u8; 500]),
2437            TestAttr::new(b"all.ones", vec![1u8; 11111]),
2438        ];
2439
2440        for i in 0..attrs.len() {
2441            object
2442                .set_extended_attribute(
2443                    attrs[i].name(),
2444                    attrs[i].value(),
2445                    SetExtendedAttributeMode::Set,
2446                )
2447                .await
2448                .unwrap();
2449            assert_eq!(
2450                object.get_extended_attribute(attrs[i].name()).await.unwrap(),
2451                attrs[i].value()
2452            );
2453        }
2454
2455        for i in 0..attrs.len() {
2456            // Make sure expected attributes are still available.
2457            let mut found_attrs = object.list_extended_attributes().await.unwrap();
2458            let mut expected_attrs: Vec<Vec<u8>> = attrs.iter().skip(i).map(|a| a.name()).collect();
2459            found_attrs.sort();
2460            expected_attrs.sort();
2461            assert_eq!(found_attrs, expected_attrs);
2462            for j in i..attrs.len() {
2463                assert_eq!(
2464                    object.get_extended_attribute(attrs[j].name()).await.unwrap(),
2465                    attrs[j].value()
2466                );
2467            }
2468
2469            object.remove_extended_attribute(attrs[i].name()).await.expect("failed to remove");
2470            is_error(
2471                object.get_extended_attribute(attrs[i].name()).await.unwrap_err(),
2472                FxfsError::NotFound,
2473            );
2474        }
2475
2476        fs.close().await.expect("close failed");
2477    }
2478
2479    #[fuchsia::test]
2480    async fn multiple_extended_attributes_delete() {
2481        let (fs, object) = test_filesystem_and_empty_object().await;
2482        let store = object.owner().clone();
2483
2484        let attrs = [
2485            TestAttr::new(b"security.selinux", b"foo"),
2486            TestAttr::new(b"large.attribute", vec![3u8; 300]),
2487            TestAttr::new(b"an.attribute", b"asdf"),
2488            TestAttr::new(b"user.big", vec![5u8; 288]),
2489            TestAttr::new(b"user.tiny", b"smol"),
2490            TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
2491            TestAttr::new(b"also big", vec![7u8; 500]),
2492            TestAttr::new(b"all.ones", vec![1u8; 11111]),
2493        ];
2494
2495        for i in 0..attrs.len() {
2496            object
2497                .set_extended_attribute(
2498                    attrs[i].name(),
2499                    attrs[i].value(),
2500                    SetExtendedAttributeMode::Set,
2501                )
2502                .await
2503                .unwrap();
2504            assert_eq!(
2505                object.get_extended_attribute(attrs[i].name()).await.unwrap(),
2506                attrs[i].value()
2507            );
2508        }
2509
2510        // Unlink the file
2511        let root_directory =
2512            Directory::open(object.owner(), object.store().root_directory_object_id())
2513                .await
2514                .expect("open failed");
2515        let mut transaction = fs
2516            .clone()
2517            .new_transaction(
2518                lock_keys![
2519                    LockKey::object(store.store_object_id(), store.root_directory_object_id()),
2520                    LockKey::object(store.store_object_id(), object.object_id()),
2521                ],
2522                Options::default(),
2523            )
2524            .await
2525            .expect("new_transaction failed");
2526        crate::object_store::directory::replace_child(
2527            &mut transaction,
2528            None,
2529            (&root_directory, TEST_OBJECT_NAME),
2530        )
2531        .await
2532        .expect("replace_child failed");
2533        transaction.commit().await.unwrap();
2534        store.tombstone_object(object.object_id(), Options::default()).await.unwrap();
2535
2536        crate::fsck::fsck(fs.clone()).await.unwrap();
2537
2538        fs.close().await.expect("close failed");
2539    }
2540
2541    #[fuchsia::test]
2542    async fn extended_attribute_changing_sizes() {
2543        let (fs, object) = test_filesystem_and_empty_object().await;
2544
2545        let test_name = b"security.selinux";
2546        let test_small_attr = TestAttr::new(test_name, b"smol");
2547        let test_large_attr = TestAttr::new(test_name, vec![3u8; 300]);
2548
2549        object
2550            .set_extended_attribute(
2551                test_small_attr.name(),
2552                test_small_attr.value(),
2553                SetExtendedAttributeMode::Set,
2554            )
2555            .await
2556            .unwrap();
2557        assert_eq!(
2558            object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
2559            test_small_attr.value()
2560        );
2561
2562        // With a small attribute, we don't expect it to write to an fxfs attribute.
2563        assert!(object.read_attr(64).await.expect("read_attr failed").is_none());
2564
2565        crate::fsck::fsck(fs.clone()).await.unwrap();
2566
2567        object
2568            .set_extended_attribute(
2569                test_large_attr.name(),
2570                test_large_attr.value(),
2571                SetExtendedAttributeMode::Set,
2572            )
2573            .await
2574            .unwrap();
2575        assert_eq!(
2576            object.get_extended_attribute(test_large_attr.name()).await.unwrap(),
2577            test_large_attr.value()
2578        );
2579
2580        // Once the value is above the threshold, we expect it to get upgraded to an fxfs
2581        // attribute.
2582        assert_eq!(
2583            object
2584                .read_attr(64)
2585                .await
2586                .expect("read_attr failed")
2587                .expect("read_attr returned none")
2588                .into_vec(),
2589            test_large_attr.value()
2590        );
2591
2592        crate::fsck::fsck(fs.clone()).await.unwrap();
2593
2594        object
2595            .set_extended_attribute(
2596                test_small_attr.name(),
2597                test_small_attr.value(),
2598                SetExtendedAttributeMode::Set,
2599            )
2600            .await
2601            .unwrap();
2602        assert_eq!(
2603            object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
2604            test_small_attr.value()
2605        );
2606
2607        // Even though we are back under the threshold, we still expect it to be stored in an fxfs
2608        // attribute, because we don't downgrade to inline once we've allocated one.
2609        assert_eq!(
2610            object
2611                .read_attr(64)
2612                .await
2613                .expect("read_attr failed")
2614                .expect("read_attr returned none")
2615                .into_vec(),
2616            test_small_attr.value()
2617        );
2618
2619        crate::fsck::fsck(fs.clone()).await.unwrap();
2620
2621        object.remove_extended_attribute(test_small_attr.name()).await.expect("failed to remove");
2622
2623        crate::fsck::fsck(fs.clone()).await.unwrap();
2624
2625        fs.close().await.expect("close failed");
2626    }
2627
2628    #[fuchsia::test]
2629    async fn extended_attribute_max_size() {
2630        let (fs, object) = test_filesystem_and_empty_object().await;
2631
2632        let test_attr = TestAttr::new(
2633            vec![3u8; super::MAX_XATTR_NAME_SIZE],
2634            vec![1u8; super::MAX_XATTR_VALUE_SIZE],
2635        );
2636
2637        object
2638            .set_extended_attribute(
2639                test_attr.name(),
2640                test_attr.value(),
2641                SetExtendedAttributeMode::Set,
2642            )
2643            .await
2644            .unwrap();
2645        assert_eq!(
2646            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2647            test_attr.value()
2648        );
2649        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2650        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2651
2652        fs.close().await.expect("close failed");
2653    }
2654
2655    #[fuchsia::test]
2656    async fn extended_attribute_remove_then_create() {
2657        let (fs, object) = test_filesystem_and_empty_object().await;
2658
2659        let test_attr = TestAttr::new(
2660            vec![3u8; super::MAX_XATTR_NAME_SIZE],
2661            vec![1u8; super::MAX_XATTR_VALUE_SIZE],
2662        );
2663
2664        object
2665            .set_extended_attribute(
2666                test_attr.name(),
2667                test_attr.value(),
2668                SetExtendedAttributeMode::Create,
2669            )
2670            .await
2671            .unwrap();
2672        fs.journal().compact().await.unwrap();
2673        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2674        object
2675            .set_extended_attribute(
2676                test_attr.name(),
2677                test_attr.value(),
2678                SetExtendedAttributeMode::Create,
2679            )
2680            .await
2681            .unwrap();
2682
2683        assert_eq!(
2684            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2685            test_attr.value()
2686        );
2687
2688        fs.close().await.expect("close failed");
2689    }
2690
2691    #[fuchsia::test]
2692    async fn large_extended_attribute_max_number() {
2693        let (fs, object) = test_filesystem_and_empty_object().await;
2694
2695        let max_xattrs =
2696            super::EXTENDED_ATTRIBUTE_RANGE_END - super::EXTENDED_ATTRIBUTE_RANGE_START;
2697        for i in 0..max_xattrs {
2698            let test_attr = TestAttr::new(format!("{}", i).as_bytes(), vec![0x3; 300]);
2699            object
2700                .set_extended_attribute(
2701                    test_attr.name(),
2702                    test_attr.value(),
2703                    SetExtendedAttributeMode::Set,
2704                )
2705                .await
2706                .unwrap_or_else(|_| panic!("failed to set xattr number {}", i));
2707        }
2708
2709        // That should have taken up all the attributes we've allocated to extended attributes, so
2710        // this one should return ERR_NO_SPACE.
2711        match object
2712            .set_extended_attribute(
2713                b"one.too.many".to_vec(),
2714                vec![0x3; 300],
2715                SetExtendedAttributeMode::Set,
2716            )
2717            .await
2718        {
2719            Ok(()) => panic!("set should not succeed"),
2720            Err(e) => is_error(e, FxfsError::NoSpace),
2721        }
2722
2723        // But inline attributes don't need an attribute number, so it should work fine.
2724        object
2725            .set_extended_attribute(
2726                b"this.is.okay".to_vec(),
2727                b"small value".to_vec(),
2728                SetExtendedAttributeMode::Set,
2729            )
2730            .await
2731            .unwrap();
2732
2733        // And updating existing ones should be okay.
2734        object
2735            .set_extended_attribute(b"11".to_vec(), vec![0x4; 300], SetExtendedAttributeMode::Set)
2736            .await
2737            .unwrap();
2738        object
2739            .set_extended_attribute(
2740                b"12".to_vec(),
2741                vec![0x1; 300],
2742                SetExtendedAttributeMode::Replace,
2743            )
2744            .await
2745            .unwrap();
2746
2747        // And we should be able to remove an attribute and set another one.
2748        object.remove_extended_attribute(b"5".to_vec()).await.unwrap();
2749        object
2750            .set_extended_attribute(
2751                b"new attr".to_vec(),
2752                vec![0x3; 300],
2753                SetExtendedAttributeMode::Set,
2754            )
2755            .await
2756            .unwrap();
2757
2758        fs.close().await.expect("close failed");
2759    }
2760
2761    #[fuchsia::test]
2762    async fn write_attr_trims_beyond_new_end() {
2763        // When writing, multi_write will deallocate old extents that overlap with the new data,
2764        // but it doesn't trim anything beyond that, since it doesn't know what the total size will
2765        // be. write_attr does know, because it writes the whole attribute at once, so we need to
2766        // make sure it cleans up properly.
2767        let (fs, object) = test_filesystem_and_empty_object().await;
2768
2769        let block_size = fs.block_size();
2770        let buf_size = block_size * 2;
2771        let attribute_id = 10;
2772
2773        let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
2774        let mut buffer = object.allocate_buffer(buf_size as usize).await;
2775        buffer.as_mut_slice().fill(3);
2776        // Writing two separate ranges, even if they are contiguous, forces them to be separate
2777        // extent records.
2778        object
2779            .multi_write(
2780                &mut transaction,
2781                attribute_id,
2782                &[0..block_size, block_size..block_size * 2],
2783                buffer.as_mut(),
2784            )
2785            .await
2786            .unwrap();
2787        transaction.add(
2788            object.store().store_object_id,
2789            Mutation::replace_or_insert_object(
2790                ObjectKey::attribute(object.object_id(), attribute_id, AttributeKey::Attribute),
2791                ObjectValue::attribute(block_size * 2, false),
2792            ),
2793        );
2794        transaction.commit().await.unwrap();
2795
2796        crate::fsck::fsck(fs.clone()).await.unwrap();
2797
2798        let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
2799        let needs_trim = (*object)
2800            .write_attr(&mut transaction, attribute_id, &vec![3u8; block_size as usize])
2801            .await
2802            .unwrap();
2803        assert!(!needs_trim.0);
2804        transaction.commit().await.unwrap();
2805
2806        crate::fsck::fsck(fs.clone()).await.unwrap();
2807
2808        fs.close().await.expect("close failed");
2809    }
2810
2811    #[fuchsia::test]
2812    async fn write_new_attr_in_batches_multiple_txns() {
2813        let (fs, object) = test_filesystem_and_empty_object().await;
2814        let merkle_tree = vec![1; 3 * WRITE_ATTR_BATCH_SIZE];
2815        let mut transaction =
2816            (*object).new_transaction(FSVERITY_MERKLE_ATTRIBUTE_ID).await.unwrap();
2817        object
2818            .write_new_attr_in_batches(
2819                &mut transaction,
2820                FSVERITY_MERKLE_ATTRIBUTE_ID,
2821                &merkle_tree,
2822                WRITE_ATTR_BATCH_SIZE,
2823            )
2824            .await
2825            .expect("failed to write merkle attribute");
2826
2827        transaction.add(
2828            object.store().store_object_id,
2829            Mutation::replace_or_insert_object(
2830                ObjectKey::graveyard_attribute_entry(
2831                    object.store().graveyard_directory_object_id(),
2832                    object.object_id(),
2833                    FSVERITY_MERKLE_ATTRIBUTE_ID,
2834                ),
2835                ObjectValue::None,
2836            ),
2837        );
2838        transaction.commit().await.unwrap();
2839        assert_eq!(
2840            object.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
2841            Some(merkle_tree.into())
2842        );
2843
2844        fs.close().await.expect("close failed");
2845    }
2846
2847    // Running on target only, to use fake time features in the executor.
2848    #[cfg(target_os = "fuchsia")]
2849    #[fuchsia::test(allow_stalls = false)]
2850    async fn test_watchdog() {
2851        use super::Watchdog;
2852        use fuchsia_async::{MonotonicDuration, MonotonicInstant, TestExecutor};
2853        use std::sync::mpsc::channel;
2854
2855        TestExecutor::advance_to(make_time(0)).await;
2856        let (sender, receiver) = channel();
2857
2858        fn make_time(time_secs: i64) -> MonotonicInstant {
2859            MonotonicInstant::from_nanos(0) + MonotonicDuration::from_seconds(time_secs)
2860        }
2861
2862        {
2863            let _watchdog = Watchdog::new(10, move |count| {
2864                sender.send(count).expect("Sending value");
2865            });
2866
2867            // Too early.
2868            TestExecutor::advance_to(make_time(5)).await;
2869            receiver.try_recv().expect_err("Should not have message");
2870
2871            // First message.
2872            TestExecutor::advance_to(make_time(10)).await;
2873            assert_eq!(1, receiver.recv().expect("Receiving"));
2874
2875            // Too early for the next.
2876            TestExecutor::advance_to(make_time(15)).await;
2877            receiver.try_recv().expect_err("Should not have message");
2878
2879            // Missed one. They'll be spooled up.
2880            TestExecutor::advance_to(make_time(30)).await;
2881            assert_eq!(2, receiver.recv().expect("Receiving"));
2882            assert_eq!(3, receiver.recv().expect("Receiving"));
2883        }
2884
2885        // Watchdog is dropped, nothing should trigger.
2886        TestExecutor::advance_to(make_time(100)).await;
2887        receiver.recv().expect_err("Watchdog should be gone");
2888    }
2889
2890    #[fuchsia::test]
2891    fn test_checksum_range_chunk() {
2892        let block_size = 4096;
2893
2894        // No bitmap means one chunk that covers the whole range
2895        assert_eq!(
2896            ChecksumRangeChunk::group_first_write_ranges(
2897                &mut OverwriteBitmaps::None,
2898                block_size,
2899                block_size * 2..block_size * 5,
2900            ),
2901            vec![ChecksumRangeChunk {
2902                checksum_range: 0..3,
2903                device_range: block_size * 2..block_size * 5,
2904                is_first_write: false,
2905            }],
2906        );
2907
2908        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2909        assert_eq!(
2910            ChecksumRangeChunk::group_first_write_ranges(
2911                &mut bitmaps,
2912                block_size,
2913                block_size * 2..block_size * 5,
2914            ),
2915            vec![ChecksumRangeChunk {
2916                checksum_range: 0..3,
2917                device_range: block_size * 2..block_size * 5,
2918                is_first_write: false,
2919            }],
2920        );
2921        assert_eq!(
2922            bitmaps.take_bitmaps(),
2923            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b11100000])))
2924        );
2925
2926        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2927        bitmaps.set_offset(2);
2928        assert_eq!(
2929            ChecksumRangeChunk::group_first_write_ranges(
2930                &mut bitmaps,
2931                block_size,
2932                block_size * 2..block_size * 5,
2933            ),
2934            vec![
2935                ChecksumRangeChunk {
2936                    checksum_range: 0..2,
2937                    device_range: block_size * 2..block_size * 4,
2938                    is_first_write: false,
2939                },
2940                ChecksumRangeChunk {
2941                    checksum_range: 2..3,
2942                    device_range: block_size * 4..block_size * 5,
2943                    is_first_write: true,
2944                },
2945            ],
2946        );
2947        assert_eq!(
2948            bitmaps.take_bitmaps(),
2949            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00111000])))
2950        );
2951
2952        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2953        bitmaps.set_offset(4);
2954        assert_eq!(
2955            ChecksumRangeChunk::group_first_write_ranges(
2956                &mut bitmaps,
2957                block_size,
2958                block_size * 2..block_size * 5,
2959            ),
2960            vec![ChecksumRangeChunk {
2961                checksum_range: 0..3,
2962                device_range: block_size * 2..block_size * 5,
2963                is_first_write: true,
2964            }],
2965        );
2966        assert_eq!(
2967            bitmaps.take_bitmaps(),
2968            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00001110])))
2969        );
2970
2971        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b01010101]));
2972        assert_eq!(
2973            ChecksumRangeChunk::group_first_write_ranges(
2974                &mut bitmaps,
2975                block_size,
2976                block_size * 2..block_size * 10,
2977            ),
2978            vec![
2979                ChecksumRangeChunk {
2980                    checksum_range: 0..1,
2981                    device_range: block_size * 2..block_size * 3,
2982                    is_first_write: true,
2983                },
2984                ChecksumRangeChunk {
2985                    checksum_range: 1..2,
2986                    device_range: block_size * 3..block_size * 4,
2987                    is_first_write: false,
2988                },
2989                ChecksumRangeChunk {
2990                    checksum_range: 2..3,
2991                    device_range: block_size * 4..block_size * 5,
2992                    is_first_write: true,
2993                },
2994                ChecksumRangeChunk {
2995                    checksum_range: 3..4,
2996                    device_range: block_size * 5..block_size * 6,
2997                    is_first_write: false,
2998                },
2999                ChecksumRangeChunk {
3000                    checksum_range: 4..5,
3001                    device_range: block_size * 6..block_size * 7,
3002                    is_first_write: true,
3003                },
3004                ChecksumRangeChunk {
3005                    checksum_range: 5..6,
3006                    device_range: block_size * 7..block_size * 8,
3007                    is_first_write: false,
3008                },
3009                ChecksumRangeChunk {
3010                    checksum_range: 6..7,
3011                    device_range: block_size * 8..block_size * 9,
3012                    is_first_write: true,
3013                },
3014                ChecksumRangeChunk {
3015                    checksum_range: 7..8,
3016                    device_range: block_size * 9..block_size * 10,
3017                    is_first_write: false,
3018                },
3019            ],
3020        );
3021        assert_eq!(
3022            bitmaps.take_bitmaps(),
3023            Some((BitVec::from_bytes(&[0b01010101]), BitVec::from_bytes(&[0b11111111])))
3024        );
3025    }
3026}