// fxfs/object_store/store_object_handle.rs
1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::checksum::{Checksum, Checksums, fletcher64};
6use crate::errors::FxfsError;
7use crate::log::*;
8use crate::lsm_tree::Query;
9use crate::lsm_tree::merge::MergerIterator;
10use crate::lsm_tree::types::{Item, ItemRef, LayerIterator};
11use crate::object_handle::ObjectHandle;
12use crate::object_store::extent_record::{ExtentKey, ExtentMode, ExtentValue};
13use crate::object_store::object_manager::ObjectManager;
14use crate::object_store::object_record::{
15    AttributeKey, ExtendedAttributeValue, ObjectAttributes, ObjectItem, ObjectKey, ObjectKeyData,
16    ObjectValue, Timestamp,
17};
18use crate::object_store::transaction::{
19    AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Options, ReadGuard,
20    Transaction, lock_keys,
21};
22use crate::object_store::{
23    HandleOptions, HandleOwner, ObjectStore, TrimMode, TrimResult, VOLUME_DATA_KEY_ID,
24};
25use crate::range::RangeExt;
26use crate::round::{round_down, round_up};
27use anyhow::{Context, Error, anyhow, bail, ensure};
28use assert_matches::assert_matches;
29use bit_vec::BitVec;
30use futures::stream::{FuturesOrdered, FuturesUnordered};
31use futures::{TryStreamExt, try_join};
32use fxfs_crypto::{
33    Cipher, CipherHolder, CipherSet, EncryptionKey, FindKeyResult, FxfsCipher, KeyPurpose,
34};
35use fxfs_trace::{TraceFutureExt, trace, trace_future_args};
36use static_assertions::const_assert;
37use std::cmp::min;
38use std::future::Future;
39use std::ops::Range;
40use std::sync::Arc;
41use std::sync::atomic::{self, AtomicBool, Ordering};
42use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
43use storage_device::{InlineCryptoOptions, ReadOptions, WriteOptions};
44
45use fidl_fuchsia_io as fio;
46use fuchsia_async as fasync;
47
/// Maximum size for an extended attribute name.
pub const MAX_XATTR_NAME_SIZE: usize = 255;
/// Maximum size an extended attribute can be before it's stored in an object attribute instead of
/// inside the record directly.
pub const MAX_INLINE_XATTR_SIZE: usize = 256;
/// Maximum size for an extended attribute value. NB: the maximum size for an extended attribute is
/// 64kB, which we rely on for correctness when deleting attributes, so ensure it's always
/// enforced.
pub const MAX_XATTR_VALUE_SIZE: usize = 64000;
/// The range of fxfs attribute ids which are reserved for extended attribute values. Whenever a
/// new attribute is needed, the first unused id will be chosen from this range. It's technically
/// safe to change these values, but it has potential consequences - they are only used during id
/// selection, so any existing extended attributes keep their ids, which means any past or present
/// selected range here could potentially have used attributes unless they are explicitly migrated,
/// which isn't currently done.
pub const EXTENDED_ATTRIBUTE_RANGE_START: u64 = 64;
/// End of the reserved extended attribute id range (see the note above).
pub const EXTENDED_ATTRIBUTE_RANGE_END: u64 = 512;
65
66/// Zeroes blocks in 'buffer' based on `bitmap`, one bit per block from start of buffer.
67fn apply_bitmap_zeroing(
68    block_size: usize,
69    bitmap: &bit_vec::BitVec,
70    mut buffer: MutableBufferRef<'_>,
71) {
72    let buf = buffer.as_mut_slice();
73    debug_assert_eq!(bitmap.len() * block_size, buf.len());
74    for (i, block) in bitmap.iter().enumerate() {
75        if !block {
76            let start = i * block_size;
77            buf[start..start + block_size].fill(0);
78        }
79    }
80}
81
/// When writing, often the logic should be generic over whether or not checksums are generated.
/// This provides that and a handy way to convert to the more general ExtentMode that eventually
/// stores it on disk.
#[derive(Debug, Clone, PartialEq)]
pub enum MaybeChecksums {
    /// No checksums are tracked for the write.
    None,
    /// One fletcher64 checksum per block written.
    Fletcher(Vec<Checksum>),
}
90
91impl MaybeChecksums {
92    pub fn maybe_as_ref(&self) -> Option<&[Checksum]> {
93        match self {
94            Self::None => None,
95            Self::Fletcher(sums) => Some(&sums),
96        }
97    }
98
99    pub fn split_off(&mut self, at: usize) -> Self {
100        match self {
101            Self::None => Self::None,
102            Self::Fletcher(sums) => Self::Fletcher(sums.split_off(at)),
103        }
104    }
105
106    pub fn to_mode(self) -> ExtentMode {
107        match self {
108            Self::None => ExtentMode::Raw,
109            Self::Fletcher(sums) => ExtentMode::Cow(Checksums::fletcher(sums)),
110        }
111    }
112
113    pub fn into_option(self) -> Option<Vec<Checksum>> {
114        match self {
115            Self::None => None,
116            Self::Fletcher(sums) => Some(sums),
117        }
118    }
119}
120
/// The mode of operation when setting extended attributes. This is the same as the fidl definition
/// but is replicated here so we don't have fuchsia.io structures in the api, so this can be used
/// on host.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SetExtendedAttributeMode {
    /// Create the extended attribute if it doesn't exist, replace the value if it does.
    Set,
    /// Create the extended attribute if it doesn't exist, fail if it does.
    Create,
    /// Replace the extended attribute value if it exists, fail if it doesn't.
    Replace,
}
133
134impl From<fio::SetExtendedAttributeMode> for SetExtendedAttributeMode {
135    fn from(other: fio::SetExtendedAttributeMode) -> SetExtendedAttributeMode {
136        match other {
137            fio::SetExtendedAttributeMode::Set => SetExtendedAttributeMode::Set,
138            fio::SetExtendedAttributeMode::Create => SetExtendedAttributeMode::Create,
139            fio::SetExtendedAttributeMode::Replace => SetExtendedAttributeMode::Replace,
140        }
141    }
142}
143
/// How encryption keys for an object are managed, if at all.
enum Encryption {
    /// The object doesn't use encryption.
    None,

    /// The object has keys that are cached (which means unwrapping occurs on-demand) with
    /// KeyManager.
    CachedKeys,

    /// The object has permanent keys registered with KeyManager.
    PermanentKeys,
}
155
/// Tracks the per-block state of a partial overwrite extent while a write is in progress.
#[derive(PartialEq, Debug)]
enum OverwriteBitmaps {
    /// No bitmap tracking is needed for this write.
    None,
    Some {
        /// The block bitmap of a partial overwrite extent in the tree.
        extent_bitmap: BitVec,
        /// A bitmap of the blocks written to by the current overwrite.
        write_bitmap: BitVec,
        /// BitVec doesn't have a slice equivalent, so for a particular section of the write we
        /// keep track of an offset in the bitmaps to operate on.
        bitmap_offset: usize,
    },
}
169
170impl OverwriteBitmaps {
171    fn new(extent_bitmap: BitVec) -> Self {
172        OverwriteBitmaps::Some {
173            write_bitmap: BitVec::from_elem(extent_bitmap.len(), false),
174            extent_bitmap,
175            bitmap_offset: 0,
176        }
177    }
178
179    fn is_none(&self) -> bool {
180        *self == OverwriteBitmaps::None
181    }
182
183    fn set_offset(&mut self, new_offset: usize) {
184        match self {
185            OverwriteBitmaps::None => (),
186            OverwriteBitmaps::Some { bitmap_offset, .. } => *bitmap_offset = new_offset,
187        }
188    }
189
190    fn get_from_extent_bitmap(&self, i: usize) -> Option<bool> {
191        match self {
192            OverwriteBitmaps::None => None,
193            OverwriteBitmaps::Some { extent_bitmap, bitmap_offset, .. } => {
194                extent_bitmap.get(*bitmap_offset + i)
195            }
196        }
197    }
198
199    fn set_in_write_bitmap(&mut self, i: usize, x: bool) {
200        match self {
201            OverwriteBitmaps::None => (),
202            OverwriteBitmaps::Some { write_bitmap, bitmap_offset, .. } => {
203                write_bitmap.set(*bitmap_offset + i, x)
204            }
205        }
206    }
207
208    fn take_bitmaps(self) -> Option<(BitVec, BitVec)> {
209        match self {
210            OverwriteBitmaps::None => None,
211            OverwriteBitmaps::Some { extent_bitmap, write_bitmap, .. } => {
212                Some((extent_bitmap, write_bitmap))
213            }
214        }
215    }
216}
217
/// When writing to Overwrite ranges, we need to emit whether a set of checksums for a device range
/// is the first write to that region or not. This tracks one such range so we can use it after the
/// write to break up the returned checksum list.
#[derive(PartialEq, Debug)]
struct ChecksumRangeChunk {
    /// Indexes into the per-block checksum list produced by the write.
    checksum_range: Range<usize>,
    /// The device byte range this chunk covers.
    device_range: Range<u64>,
    /// Whether this chunk is the first write to its device range.
    is_first_write: bool,
}
227
impl ChecksumRangeChunk {
    /// Partitions `write_device_range` into contiguous chunks whose blocks all share the same
    /// first-write status according to `bitmaps`.  Each chunk's `checksum_range` indexes the
    /// per-block checksum list for the write (one checksum per `block_size` bytes) and
    /// `device_range` is the matching device byte range.  As a side effect, every block covered
    /// by the write is marked in the write bitmap.
    fn group_first_write_ranges(
        bitmaps: &mut OverwriteBitmaps,
        block_size: u64,
        write_device_range: Range<u64>,
    ) -> Vec<ChecksumRangeChunk> {
        let write_block_len = (write_device_range.length().unwrap() / block_size) as usize;
        if bitmaps.is_none() {
            // If there is no bitmap, then the overwrite range is fully written to. However, we
            // could still be within the journal flush window where one of the blocks was written
            // to for the first time to put it in this state, so we still need to emit the
            // checksums in case replay needs them.
            vec![ChecksumRangeChunk {
                checksum_range: 0..write_block_len,
                device_range: write_device_range,
                is_first_write: false,
            }]
        } else {
            // Seed with an empty chunk whose first-write status matches block 0; each block
            // either grows the current chunk or starts a new one.
            let mut checksum_ranges = vec![ChecksumRangeChunk {
                checksum_range: 0..0,
                device_range: write_device_range.start..write_device_range.start,
                is_first_write: !bitmaps.get_from_extent_bitmap(0).unwrap(),
            }];
            let mut working_range = checksum_ranges.last_mut().unwrap();
            for i in 0..write_block_len {
                bitmaps.set_in_write_bitmap(i, true);

                // bitmap.get returning true means the block is initialized and therefore has been
                // written to before.
                if working_range.is_first_write != bitmaps.get_from_extent_bitmap(i).unwrap() {
                    // is_first_write is tracking the opposite of what comes back from the bitmap,
                    // so if they are still opposites we continue our current range.
                    working_range.checksum_range.end += 1;
                    working_range.device_range.end += block_size;
                } else {
                    // If they are the same, then we need to make a new chunk.
                    let new_chunk = ChecksumRangeChunk {
                        checksum_range: working_range.checksum_range.end
                            ..working_range.checksum_range.end + 1,
                        device_range: working_range.device_range.end
                            ..working_range.device_range.end + block_size,
                        is_first_write: !working_range.is_first_write,
                    };
                    checksum_ranges.push(new_chunk);
                    working_range = checksum_ranges.last_mut().unwrap();
                }
            }
            checksum_ranges
        }
    }
}
279
/// StoreObjectHandle is the lowest-level, untyped handle to an object with the id [`object_id`] in
/// a particular store, [`owner`]. It provides functionality shared across all objects, such as
/// reading and writing attributes and managing encryption keys.
///
/// Since it's untyped, it doesn't do any object kind validation, and is generally meant to
/// implement higher-level typed handles.
///
/// For file-like objects with a data attribute, DataObjectHandle implements traits and helpers for
/// doing more complex extent management and caches the content size.
///
/// For directory-like objects, Directory knows how to add and remove child objects and enumerate
/// its children.
pub struct StoreObjectHandle<S: HandleOwner> {
    /// The owner (store wrapper) this object lives in.
    owner: Arc<S>,
    /// The id of the object this handle refers to.
    object_id: u64,
    /// Per-handle behavior options (e.g. journal-check and checksum skipping).
    options: HandleOptions,
    /// When true, operations on this handle emit verbose trace logging.
    trace: AtomicBool,
    /// How encryption keys for this object are managed.
    encryption: Encryption,
}
299
300impl<S: HandleOwner> ObjectHandle for StoreObjectHandle<S> {
301    fn set_trace(&self, v: bool) {
302        info!(store_id = self.store().store_object_id, oid = self.object_id(), trace = v; "trace");
303        self.trace.store(v, atomic::Ordering::Relaxed);
304    }
305
306    fn object_id(&self) -> u64 {
307        return self.object_id;
308    }
309
310    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
311        self.store().device.allocate_buffer(size)
312    }
313
314    fn block_size(&self) -> u64 {
315        self.store().block_size()
316    }
317}
318
/// Runs a background task that periodically invokes a callback; used to log warnings when an I/O
/// operation appears to have stalled.  The loop runs for as long as the Watchdog is held.
struct Watchdog {
    // Holding the task handle keeps the periodic loop alive.
    _task: fasync::Task<()>,
}
322
impl Watchdog {
    /// Starts a watchdog that calls `cb` roughly every `increment_seconds`, passing the number of
    /// times it has fired so far (starting at 1).
    fn new(increment_seconds: u64, cb: impl Fn(u64) + Send + 'static) -> Self {
        Self {
            _task: fasync::Task::spawn(
                async move {
                    let increment = increment_seconds.try_into().unwrap();
                    let mut fired_counter = 0;
                    // Deadlines are advanced from the previous target, not from "now", so delays
                    // in scheduling aren't silently absorbed.
                    let mut next_wake = fasync::MonotonicInstant::now();
                    loop {
                        next_wake += std::time::Duration::from_secs(increment).into();
                        // If this isn't being scheduled this will purposely result in fast looping
                        // when it does. This will be insightful about the state of the thread and
                        // task scheduling.
                        if fasync::MonotonicInstant::now() < next_wake {
                            fasync::Timer::new(next_wake).await;
                        }
                        fired_counter += 1;
                        cb(fired_counter);
                    }
                }
                .trace(trace_future_args!("StoreObjectHandle::Watchdog")),
            ),
        }
    }
}
348
349impl<S: HandleOwner> StoreObjectHandle<S> {
350    /// Make a new StoreObjectHandle for the object with id [`object_id`] in store [`owner`].
351    pub fn new(
352        owner: Arc<S>,
353        object_id: u64,
354        permanent_keys: bool,
355        options: HandleOptions,
356        trace: bool,
357    ) -> Self {
358        let encryption = if permanent_keys {
359            Encryption::PermanentKeys
360        } else if owner.as_ref().as_ref().is_encrypted() {
361            Encryption::CachedKeys
362        } else {
363            Encryption::None
364        };
365        Self { owner, object_id, encryption, options, trace: AtomicBool::new(trace) }
366    }
367
    /// Returns the owner (store wrapper) this handle operates within.
    pub fn owner(&self) -> &Arc<S> {
        &self.owner
    }
371
    /// Returns the ObjectStore this handle's object lives in.
    pub fn store(&self) -> &ObjectStore {
        self.owner.as_ref().as_ref()
    }
375
    /// Returns whether verbose trace logging is currently enabled for this handle.
    pub fn trace(&self) -> bool {
        self.trace.load(atomic::Ordering::Relaxed)
    }
379
380    pub fn is_encrypted(&self) -> bool {
381        !matches!(self.encryption, Encryption::None)
382    }
383
    /// Get the default set of transaction options for this object. This is mostly the overall
    /// default, modified by any [`HandleOptions`] held by this handle.
    pub fn default_transaction_options<'b>(&self) -> Options<'b> {
        Options { skip_journal_checks: self.options.skip_journal_checks, ..Default::default() }
    }
389
390    pub async fn new_transaction_with_options<'b>(
391        &self,
392        attribute_id: u64,
393        options: Options<'b>,
394    ) -> Result<Transaction<'b>, Error> {
395        Ok(self
396            .store()
397            .filesystem()
398            .new_transaction(
399                lock_keys![
400                    LockKey::object_attribute(
401                        self.store().store_object_id(),
402                        self.object_id(),
403                        attribute_id,
404                    ),
405                    LockKey::object(self.store().store_object_id(), self.object_id()),
406                ],
407                options,
408            )
409            .await?)
410    }
411
    /// Begins a transaction using this handle's default transaction options, locking this object
    /// and the given attribute.
    pub async fn new_transaction<'b>(&self, attribute_id: u64) -> Result<Transaction<'b>, Error> {
        self.new_transaction_with_options(attribute_id, self.default_transaction_options()).await
    }
415
    // If |transaction| has an impending mutation for the underlying object, returns that.
    // Otherwise, looks up the object from the tree.  Delegates to the store, which owns the tree.
    async fn txn_get_object_mutation(
        &self,
        transaction: &Transaction<'_>,
    ) -> Result<ObjectStoreMutation, Error> {
        self.store().txn_get_object_mutation(transaction, self.object_id()).await
    }
424
    // Deallocates any extents of `attribute_id` overlapping `range` (which must be
    // block-aligned).  Returns the amount deallocated in bytes.
    async fn deallocate_old_extents(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: u64,
        range: Range<u64>,
    ) -> Result<u64, Error> {
        let block_size = self.block_size();
        assert_eq!(range.start % block_size, 0);
        assert_eq!(range.end % block_size, 0);
        if range.start == range.end {
            return Ok(0);
        }
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let key = ExtentKey { range };
        let lower_bound = ObjectKey::attribute(
            self.object_id(),
            attribute_id,
            AttributeKey::Extent(key.search_key()),
        );
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullRange(&lower_bound)).await?;
        let allocator = self.store().allocator();
        let mut deallocated = 0;
        let trace = self.trace();
        // Walk extent records from the search key forward; stop at the first record that belongs
        // to a different object/attribute or no longer overlaps `range`.
        while let Some(ItemRef {
            key:
                ObjectKey {
                    object_id,
                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                },
            value: ObjectValue::Extent(value),
            ..
        }) = iter.get()
        {
            if *object_id != self.object_id() || *attr_id != attribute_id {
                break;
            }
            if let ExtentValue::Some { device_offset, .. } = value {
                if let Some(overlap) = key.overlap(extent_key) {
                    // Translate the overlapping file range into the extent's device byte range.
                    let range = device_offset + overlap.start - extent_key.range.start
                        ..device_offset + overlap.end - extent_key.range.start;
                    ensure!(range.is_aligned(block_size), FxfsError::Inconsistent);
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = range,
                            len = range.end - range.start,
                            extent_key:?;
                            "D",
                        );
                    }
                    allocator
                        .deallocate(transaction, self.store().store_object_id(), range)
                        .await?;
                    deallocated += overlap.end - overlap.start;
                } else {
                    break;
                }
            }
            iter.advance().await?;
        }
        Ok(deallocated)
    }
491
492    // Writes aligned data (that should already be encrypted) to the given offset and computes
493    // checksums if requested. The aligned data must be from a single logical file range.
494    async fn write_aligned(
495        &self,
496        buf: BufferRef<'_>,
497        device_offset: u64,
498        crypt_ctx: Option<(u32, u8)>,
499    ) -> Result<MaybeChecksums, Error> {
500        if self.trace() {
501            info!(
502                store_id = self.store().store_object_id(),
503                oid = self.object_id(),
504                device_range:? = (device_offset..device_offset + buf.len() as u64),
505                len = buf.len();
506                "W",
507            );
508        }
509        let store = self.store();
510        store.device_write_ops.fetch_add(1, Ordering::Relaxed);
511        let mut checksums = Vec::new();
512        let _watchdog = Watchdog::new(10, |count| {
513            warn!("Write has been stalled for {} seconds", count * 10);
514        });
515
516        match crypt_ctx {
517            Some((dun, slot)) => {
518                if !store.filesystem().options().barriers_enabled {
519                    return Err(anyhow!(FxfsError::InvalidArgs)
520                        .context("Barriers must be enabled for inline encrypted writes."));
521                }
522                store
523                    .device
524                    .write_with_opts(
525                        device_offset as u64,
526                        buf,
527                        WriteOptions {
528                            inline_crypto: InlineCryptoOptions::enabled(slot, dun),
529                            ..Default::default()
530                        },
531                    )
532                    .await?;
533                Ok(MaybeChecksums::None)
534            }
535            None => {
536                if self.options.skip_checksums {
537                    store
538                        .device
539                        .write_with_opts(device_offset as u64, buf, WriteOptions::default())
540                        .await?;
541                    Ok(MaybeChecksums::None)
542                } else {
543                    try_join!(store.device.write(device_offset, buf), async {
544                        let block_size = self.block_size();
545                        for chunk in buf.as_slice().chunks_exact(block_size as usize) {
546                            checksums.push(fletcher64(chunk, 0));
547                        }
548                        Ok(())
549                    })?;
550                    Ok(MaybeChecksums::Fletcher(checksums))
551                }
552            }
553        }
554    }
555
    /// Flushes the underlying device.  This is expensive and should be used sparingly.
    pub async fn flush_device(&self) -> Result<(), Error> {
        self.store().device().flush().await
    }
560
    /// Adjusts this object's recorded allocated size by `allocated - deallocated` bytes within
    /// `transaction`.  When the object has a non-zero project id, the same byte delta is merged
    /// into the project usage record.  A no-op when `allocated == deallocated`.
    pub async fn update_allocated_size(
        &self,
        transaction: &mut Transaction<'_>,
        allocated: u64,
        deallocated: u64,
    ) -> Result<(), Error> {
        if allocated == deallocated {
            return Ok(());
        }
        let mut mutation = self.txn_get_object_mutation(transaction).await?;
        if let ObjectValue::Object {
            attributes: ObjectAttributes { project_id, allocated_size, .. },
            ..
        } = &mut mutation.item.value
        {
            // The only way for these to fail are if the volume is inconsistent.
            *allocated_size = allocated_size
                .checked_add(allocated)
                .ok_or_else(|| anyhow!(FxfsError::Inconsistent).context("Allocated size overflow"))?
                .checked_sub(deallocated)
                .ok_or_else(|| {
                    anyhow!(FxfsError::Inconsistent).context("Allocated size underflow")
                })?;

            if *project_id != 0 {
                // The allocated and deallocated shouldn't exceed the max size of the file which is
                // bound within i64.
                let diff = i64::try_from(allocated).unwrap() - i64::try_from(deallocated).unwrap();
                transaction.add(
                    self.store().store_object_id(),
                    Mutation::merge_object(
                        ObjectKey::project_usage(
                            self.store().root_directory_object_id(),
                            *project_id,
                        ),
                        ObjectValue::BytesAndNodes { bytes: diff, nodes: 0 },
                    ),
                );
            }
        } else {
            // This can occur when the object mutation is created from an object in the tree which
            // was corrupt.
            bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
        }
        transaction.add(self.store().store_object_id, Mutation::ObjectStore(mutation));
        Ok(())
    }
608
    /// Applies `node_attributes` and/or `change_time` to the object within `transaction`.  A
    /// SELinux context supplied via the `data` variant is stored as the "security.selinux"
    /// extended attribute; any other variant is rejected with `InvalidArgs`.  Remaining
    /// attributes are forwarded to the store.
    pub async fn update_attributes<'a>(
        &self,
        transaction: &mut Transaction<'a>,
        node_attributes: Option<&fio::MutableNodeAttributes>,
        change_time: Option<Timestamp>,
    ) -> Result<(), Error> {
        if let Some(&fio::MutableNodeAttributes { selinux_context: Some(ref context), .. }) =
            node_attributes
        {
            if let fio::SelinuxContext::Data(context) = context {
                self.set_extended_attribute_impl(
                    "security.selinux".into(),
                    context.clone(),
                    SetExtendedAttributeMode::Set,
                    transaction,
                )
                .await?;
            } else {
                return Err(anyhow!(FxfsError::InvalidArgs)
                    .context("Only set SELinux context with `data` member."));
            }
        }
        self.store()
            .update_attributes(transaction, self.object_id, node_attributes, change_time)
            .await
    }
635
    /// Zeroes the given range.  The range must be aligned.
    ///
    /// Any existing extents in the range are deallocated; if anything was deallocated, a deleted
    /// extent record is merged so reads return zeroes, and the object's allocated size is
    /// updated.  (Note: despite deallocating, this returns `()`, not the amount deallocated.)
    pub async fn zero(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: u64,
        range: Range<u64>,
    ) -> Result<(), Error> {
        let deallocated =
            self.deallocate_old_extents(transaction, attribute_id, range.clone()).await?;
        if deallocated > 0 {
            self.update_allocated_size(transaction, 0, deallocated).await?;
            transaction.add(
                self.store().store_object_id,
                Mutation::merge_object(
                    ObjectKey::extent(self.object_id(), attribute_id, range),
                    ObjectValue::Extent(ExtentValue::deleted_extent()),
                ),
            );
        }
        Ok(())
    }
657
    // Returns a new aligned buffer (reading the head and tail blocks if necessary) with a copy of
    // the data from `buf`, along with the aligned file range the buffer covers.
    pub async fn align_buffer(
        &self,
        attribute_id: u64,
        offset: u64,
        buf: BufferRef<'_>,
    ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
        let block_size = self.block_size();
        let end = offset + buf.len() as u64;
        let aligned =
            round_down(offset, block_size)..round_up(end, block_size).ok_or(FxfsError::TooBig)?;

        let mut aligned_buf =
            self.store().device.allocate_buffer((aligned.end - aligned.start) as usize).await;

        // Deal with head alignment.
        if aligned.start < offset {
            let mut head_block = aligned_buf.subslice_mut(..block_size as usize);
            let read = self.read(attribute_id, aligned.start, head_block.reborrow()).await?;
            // Anything past what the read returned must be zeroed.
            head_block.as_mut_slice()[read..].fill(0);
        }

        // Deal with tail alignment.
        if aligned.end > end {
            let end_block_offset = aligned.end - block_size;
            // There's no need to read the tail block if we read it as part of the head block.
            if offset <= end_block_offset {
                let mut tail_block =
                    aligned_buf.subslice_mut(aligned_buf.len() - block_size as usize..);
                let read = self.read(attribute_id, end_block_offset, tail_block.reborrow()).await?;
                tail_block.as_mut_slice()[read..].fill(0);
            }
        }

        // Copy the caller's data over the (possibly pre-read) aligned buffer.
        aligned_buf.as_mut_slice()
            [(offset - aligned.start) as usize..(end - aligned.start) as usize]
            .copy_from_slice(buf.as_slice());

        Ok((aligned, aligned_buf))
    }
699
    /// Trim an attribute's extents, potentially adding a graveyard trim entry if more trimming is
    /// needed, so the transaction can be committed without worrying about leaking data.
    ///
    /// This doesn't update the size stored in the attribute value - the caller is responsible for
    /// doing that to keep the size up to date.
    pub async fn shrink(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: u64,
        size: u64,
    ) -> Result<NeedsTrim, Error> {
        let store = self.store();
        // An Incomplete result means trim_some couldn't finish within this transaction.
        let needs_trim = matches!(
            store
                .trim_some(transaction, self.object_id(), attribute_id, TrimMode::FromOffset(size))
                .await?,
            TrimResult::Incomplete
        );
        if needs_trim {
            // Add the object to the graveyard in case the following transactions don't get
            // replayed.
            let graveyard_id = store.graveyard_directory_object_id();
            match store
                .tree
                .find(&ObjectKey::graveyard_entry(graveyard_id, self.object_id()))
                .await?
            {
                Some(ObjectItem { value: ObjectValue::Some, .. })
                | Some(ObjectItem { value: ObjectValue::Trim, .. }) => {
                    // This object is already in the graveyard so we don't need to do anything.
                }
                _ => {
                    transaction.add(
                        store.store_object_id,
                        Mutation::replace_or_insert_object(
                            ObjectKey::graveyard_entry(graveyard_id, self.object_id()),
                            ObjectValue::Trim,
                        ),
                    );
                }
            }
        }
        Ok(NeedsTrim(needs_trim))
    }
744
    /// Reads and decrypts a singular logical range.
    ///
    /// Reads `buffer.len()` bytes at `device_offset`.  If the object has a key for `key_id`,
    /// decryption happens either inline on the device (when the key yields a crypt context of
    /// key slot + DUN) or in software after the read; otherwise a plain read is performed.
    pub async fn read_and_decrypt(
        &self,
        device_offset: u64,
        file_offset: u64,
        mut buffer: MutableBufferRef<'_>,
        key_id: u64,
    ) -> Result<(), Error> {
        let store = self.store();
        store.device_read_ops.fetch_add(1, Ordering::Relaxed);

        let _watchdog = Watchdog::new(10, |count| {
            warn!("Read has been stalled for {} seconds", count * 10);
        });

        let (_key_id, key) = self.get_key(Some(key_id)).await?;
        if let Some(key) = key {
            if let Some((dun, slot)) = key.crypt_ctx(self.object_id, file_offset) {
                // The device decrypts inline using the given key slot and DUN.
                store
                    .device
                    .read_with_opts(
                        device_offset as u64,
                        buffer.reborrow(),
                        ReadOptions { inline_crypto: InlineCryptoOptions::enabled(slot, dun) },
                    )
                    .await?;
            } else {
                // Software path: read the ciphertext, then decrypt in place.
                store.device.read(device_offset, buffer.reborrow()).await?;
                key.decrypt(self.object_id, device_offset, file_offset, buffer.as_mut_slice())?;
            }
        } else {
            // Unencrypted object: plain read.
            store.device.read(device_offset, buffer.reborrow()).await?;
        }

        Ok(())
    }
781
782    /// Returns the specified key. If `key_id` is None, it will try and return the fscrypt key if
783    /// it is present, or the volume key if it isn't. If the fscrypt key is present, but the key
784    /// cannot be unwrapped, then this will return `FxfsError::NoKey`. If the volume is not
785    /// encrypted, this returns None.
786    pub async fn get_key(
787        &self,
788        key_id: Option<u64>,
789    ) -> Result<(u64, Option<Arc<dyn Cipher>>), Error> {
790        let store = self.store();
791        let result = match self.encryption {
792            Encryption::None => (VOLUME_DATA_KEY_ID, None),
793            Encryption::CachedKeys => {
794                if let Some(key_id) = key_id {
795                    (
796                        key_id,
797                        Some(
798                            store
799                                .key_manager
800                                .get_key(
801                                    self.object_id,
802                                    store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
803                                    async || store.get_keys(self.object_id).await,
804                                    key_id,
805                                )
806                                .await?,
807                        ),
808                    )
809                } else {
810                    let (key_id, key) = store
811                        .key_manager
812                        .get_fscrypt_key_if_present(
813                            self.object_id,
814                            store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
815                            async || store.get_keys(self.object_id).await,
816                        )
817                        .await?;
818                    (key_id, Some(key))
819                }
820            }
821            Encryption::PermanentKeys => {
822                (VOLUME_DATA_KEY_ID, Some(store.key_manager.get(self.object_id).await?.unwrap()))
823            }
824        };
825
826        // Ensure that if the key we receive uses inline encryption, barriers should be enabled.
827        if let Some(ref key) = result.1 {
828            if key.crypt_ctx(self.object_id, 0).is_some() {
829                if !store.filesystem().options().barriers_enabled {
830                    return Err(anyhow!(FxfsError::InvalidArgs)
831                        .context("Barriers must be enabled for inline encrypted writes."));
832                }
833            }
834        }
835
836        Ok(result)
837    }
838
    /// Returns the volume data key for this object, creating it (via a mutation added to
    /// `transaction`) if it does not already exist.
    ///
    /// This will only work for a non-permanent volume data key. This is designed to be used with
    /// extended attributes where we'll only create the key on demand for directories and encrypted
    /// files.
    ///
    /// Returns `FxfsError::NoKey` if the key exists on disk but is unavailable (cannot be
    /// unwrapped).
    async fn get_or_create_key(
        &self,
        transaction: &mut Transaction<'_>,
    ) -> Result<Arc<dyn Cipher>, Error> {
        let store = self.store();

        // Fast path: try and get keys from the cache.
        if let Some(key) = store.key_manager.get(self.object_id).await.context("get failed")? {
            return Ok(key);
        }

        let crypt = store.crypt().ok_or_else(|| anyhow!("No crypt!"))?;

        // Next, see if the keys are already created.
        let (mut encryption_keys, mut cipher_set) = if let Some(item) =
            store.tree.find(&ObjectKey::keys(self.object_id)).await.context("find failed")?
        {
            if let ObjectValue::Keys(encryption_keys) = item.value {
                // Unwrap whatever keys already exist so we can look for the volume data key.
                let cipher_set = store
                    .key_manager
                    .get_keys(
                        self.object_id,
                        crypt.as_ref(),
                        &mut Some(async || Ok(encryption_keys.clone())),
                        /* permanent= */ false,
                        /* force= */ false,
                    )
                    .await
                    .context("get_keys failed")?;
                match cipher_set.find_key(VOLUME_DATA_KEY_ID) {
                    // Not created yet; fall through and create it below.
                    FindKeyResult::NotFound => {}
                    FindKeyResult::Unavailable => return Err(FxfsError::NoKey.into()),
                    FindKeyResult::Key(key) => return Ok(key),
                }
                (encryption_keys, (*cipher_set).clone())
            } else {
                // A keys record exists but holds an unexpected value type.
                return Err(anyhow!(FxfsError::Inconsistent));
            }
        } else {
            Default::default()
        };

        // Proceed to create the key.  The transaction holds the required locks.
        let (key, unwrapped_key) = crypt.create_key(self.object_id, KeyPurpose::Data).await?;
        let cipher: Arc<dyn Cipher> = Arc::new(FxfsCipher::new(&unwrapped_key));

        // Add new cipher to cloned cipher set. This will replace existing one
        // if transaction is successful.
        cipher_set.add_key(VOLUME_DATA_KEY_ID, CipherHolder::Cipher(cipher.clone()));
        let cipher_set = Arc::new(cipher_set);

        // Arrange for the CipherSet to be added to the cache when (and if) the transaction
        // commits.
        struct UnwrappedKeys {
            object_id: u64,
            new_keys: Arc<CipherSet>,
        }

        impl AssociatedObject for UnwrappedKeys {
            fn will_apply_mutation(
                &self,
                _mutation: &Mutation,
                object_id: u64,
                manager: &ObjectManager,
            ) {
                manager.store(object_id).unwrap().key_manager.insert(
                    self.object_id,
                    self.new_keys.clone(),
                    /* permanent= */ false,
                );
            }
        }

        encryption_keys.insert(VOLUME_DATA_KEY_ID, EncryptionKey::Fxfs(key).into());

        // Persist the updated key record; the associated object caches the unwrapped keys when
        // the mutation is applied.
        transaction.add_with_object(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::keys(self.object_id),
                ObjectValue::keys(encryption_keys),
            ),
            AssocObj::Owned(Box::new(UnwrappedKeys {
                object_id: self.object_id,
                new_keys: cipher_set,
            })),
        );

        Ok(cipher)
    }
931
932    pub async fn read(
933        &self,
934        attribute_id: u64,
935        offset: u64,
936        mut buf: MutableBufferRef<'_>,
937    ) -> Result<usize, Error> {
938        let fs = self.store().filesystem();
939        let guard = fs
940            .lock_manager()
941            .read_lock(lock_keys![LockKey::object_attribute(
942                self.store().store_object_id(),
943                self.object_id(),
944                attribute_id,
945            )])
946            .await;
947
948        let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
949        let item = self.store().tree().find(&key).await?;
950        let size = match item {
951            Some(item) if item.key == key => match item.value {
952                ObjectValue::Attribute { size, .. } => size,
953                _ => bail!(FxfsError::Inconsistent),
954            },
955            _ => return Ok(0),
956        };
957        if offset >= size {
958            return Ok(0);
959        }
960        let length = min(buf.len() as u64, size - offset) as usize;
961        buf = buf.subslice_mut(0..length);
962        self.read_unchecked(attribute_id, offset, buf, &guard).await?;
963        Ok(length)
964    }
965
    /// Read `buf.len()` bytes from the attribute `attribute_id`, starting at `offset`, into `buf`.
    /// It's required that a read lock on this attribute id is taken before this is called.
    ///
    /// This function doesn't do any size checking - any portion of `buf` past the end of the file
    /// will be filled with zeros. The caller is responsible for enforcing the file size on reads.
    /// This is because, just looking at the extents, we can't tell the difference between the file
    /// actually ending and there just being a section at the end with no data (since attributes
    /// are sparse).
    pub(super) async fn read_unchecked(
        &self,
        attribute_id: u64,
        mut offset: u64,
        mut buf: MutableBufferRef<'_>,
        _guard: &ReadGuard<'_>,
    ) -> Result<(), Error> {
        if buf.len() == 0 {
            return Ok(());
        }
        let end_offset = offset + buf.len() as u64;

        self.store().logical_read_ops.fetch_add(1, Ordering::Relaxed);

        // Whilst the read offset must be aligned to the filesystem block size, the buffer need only
        // be aligned to the device's block size.
        let block_size = self.block_size() as u64;
        let device_block_size = self.store().device.block_size() as u64;
        assert_eq!(offset % block_size, 0);
        assert_eq!(buf.range().start as u64 % device_block_size, 0);
        // Iterate over all extents overlapping the requested logical range.
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger
            .query(Query::LimitedRange(&ObjectKey::extent(
                self.object_id(),
                attribute_id,
                offset..end_offset,
            )))
            .await?;
        // Number of bytes in the final, partial block of the request (0 if block-aligned); these
        // are handled via a separate bounce buffer below.
        let end_align = ((offset + buf.len() as u64) % block_size) as usize;
        let trace = self.trace();
        // Device reads are issued concurrently and awaited together at the end.
        let reads = FuturesUnordered::new();
        while let Some(ItemRef {
            key:
                ObjectKey {
                    object_id,
                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                },
            value: ObjectValue::Extent(extent_value),
            ..
        }) = iter.get()
        {
            if *object_id != self.object_id() || *attr_id != attribute_id {
                break;
            }
            ensure!(
                extent_key.range.is_valid() && extent_key.range.is_aligned(block_size),
                FxfsError::Inconsistent
            );
            if extent_key.range.start > offset {
                // Zero everything up to the start of the extent.
                let to_zero = min(extent_key.range.start - offset, buf.len() as u64) as usize;
                for i in &mut buf.as_mut_slice()[..to_zero] {
                    *i = 0;
                }
                buf = buf.subslice_mut(to_zero..);
                if buf.is_empty() {
                    break;
                }
                offset += to_zero as u64;
            }

            if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
                // Device location corresponding to the current logical `offset`.
                let mut device_offset = device_offset + (offset - extent_key.range.start);
                let key_id = *key_id;

                // Whole-block portion of this extent that overlaps the request.
                let to_copy = min(buf.len() - end_align, (extent_key.range.end - offset) as usize);
                if to_copy > 0 {
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = (device_offset..device_offset + to_copy as u64),
                            offset,
                            range:? = extent_key.range,
                            block_size;
                            "R",
                        );
                    }
                    let (mut head, tail) = buf.split_at_mut(to_copy);
                    // For partially-written overwrite extents, the bitmap records which blocks
                    // hold real data; slice it down to the portion this read covers.
                    let maybe_bitmap = match mode {
                        ExtentMode::OverwritePartial(bitmap) => {
                            let mut read_bitmap = bitmap.clone().split_off(
                                ((offset - extent_key.range.start) / block_size) as usize,
                            );
                            read_bitmap.truncate(to_copy / block_size as usize);
                            Some(read_bitmap)
                        }
                        _ => None,
                    };
                    reads.push(async move {
                        self.read_and_decrypt(device_offset, offset, head.reborrow(), key_id)
                            .await?;
                        if let Some(bitmap) = maybe_bitmap {
                            // Zero out blocks the bitmap marks as uninitialized.
                            apply_bitmap_zeroing(self.block_size() as usize, &bitmap, head);
                        }
                        Ok::<(), Error>(())
                    });
                    buf = tail;
                    if buf.is_empty() {
                        break;
                    }
                    offset += to_copy as u64;
                    device_offset += to_copy as u64;
                }

                // Deal with end alignment by reading the existing contents into an alignment
                // buffer.
                if offset < extent_key.range.end && end_align > 0 {
                    if let ExtentMode::OverwritePartial(bitmap) = mode {
                        let bitmap_offset = (offset - extent_key.range.start) / block_size;
                        if !bitmap.get(bitmap_offset as usize).ok_or(FxfsError::Inconsistent)? {
                            // If this block isn't actually initialized, skip it.
                            break;
                        }
                    }
                    let mut align_buf =
                        self.store().device.allocate_buffer(block_size as usize).await;
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = (device_offset..device_offset + align_buf.len() as u64);
                            "RT",
                        );
                    }
                    self.read_and_decrypt(device_offset, offset, align_buf.as_mut(), key_id)
                        .await?;
                    // Copy just the needed prefix of the bounce buffer into the request's tail.
                    buf.as_mut_slice().copy_from_slice(&align_buf.as_slice()[..end_align]);
                    buf = buf.subslice_mut(0..0);
                    break;
                }
            } else if extent_key.range.end >= offset + buf.len() as u64 {
                // Deleted extent covers remainder, so we're done.
                break;
            }

            iter.advance().await?;
        }
        // Wait for all outstanding device reads to complete.
        reads.try_collect::<()>().await?;
        // Zero whatever wasn't covered by any extent (the sparse tail).
        buf.as_mut_slice().fill(0);
        Ok(())
    }
1118
1119    /// Reads an entire attribute.
1120    pub async fn read_attr(&self, attribute_id: u64) -> Result<Option<Box<[u8]>>, Error> {
1121        let store = self.store();
1122        let tree = &store.tree;
1123        let layer_set = tree.layer_set();
1124        let mut merger = layer_set.merger();
1125        let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
1126        let iter = merger.query(Query::FullRange(&key)).await?;
1127        match iter.get() {
1128            Some(item) if item.key == &key => match item.value {
1129                ObjectValue::Attribute { .. } => Ok(Some(self.read_attr_from_iter(iter).await?)),
1130                // Attribute was deleted.
1131                ObjectValue::None => Ok(None),
1132                _ => Err(FxfsError::Inconsistent.into()),
1133            },
1134            _ => Ok(None),
1135        }
1136    }
1137
    /// Reads an entire attribute pointed to by `iter`. `iter` must be pointing to the
    /// `AttributeKey::Attribute` of the attribute.
    ///
    /// Returns the attribute contents, exactly `size` bytes long, with any sparse regions (gaps
    /// between extents) filled with zeros.
    pub async fn read_attr_from_iter(
        &self,
        mut iter: MergerIterator<'_, '_, ObjectKey, ObjectValue>,
    ) -> Result<Box<[u8]>, Error> {
        // Read the attribute's size from the record under the iterator and allocate a
        // block-aligned buffer large enough to hold it.
        let (mut buffer, size, attribute_id) = match iter.get() {
            Some(ItemRef {
                key:
                    ObjectKey {
                        object_id,
                        data: ObjectKeyData::Attribute(attribute_id, AttributeKey::Attribute),
                    },
                value: ObjectValue::Attribute { size, .. },
                ..
            }) if *object_id == self.object_id => {
                // TODO(https://fxbug.dev/42073113): size > max buffer size
                (
                    self.store()
                        .device
                        .allocate_buffer(round_up(*size, self.block_size()).unwrap() as usize)
                        .await,
                    *size as usize,
                    *attribute_id,
                )
            }
            // The iterator wasn't positioned at an attribute record for this object.
            _ => bail!(FxfsError::InvalidArgs),
        };

        self.store().logical_read_ops.fetch_add(1, Ordering::Relaxed);
        // Walk this attribute's extents in order, reading each into place and zeroing gaps.
        let mut last_offset = 0;
        loop {
            iter.advance().await?;
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) if *object_id == self.object_id() && *attr_id == attribute_id => {
                    if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
                        let offset = extent_key.range.start as usize;
                        // Zero the sparse gap between the previous extent and this one.
                        buffer.as_mut_slice()[last_offset..offset].fill(0);
                        let end = std::cmp::min(extent_key.range.end as usize, buffer.len());
                        let maybe_bitmap = match mode {
                            ExtentMode::OverwritePartial(bitmap) => {
                                // The caller has to adjust the bitmap if necessary, but we always
                                // start from the beginning of any extent, so we only truncate.
                                let mut read_bitmap = bitmap.clone();
                                read_bitmap.truncate(
                                    (end - extent_key.range.start as usize)
                                        / self.block_size() as usize,
                                );
                                Some(read_bitmap)
                            }
                            _ => None,
                        };
                        self.read_and_decrypt(
                            *device_offset,
                            extent_key.range.start,
                            buffer.subslice_mut(offset..end as usize),
                            *key_id,
                        )
                        .await?;
                        if let Some(bitmap) = maybe_bitmap {
                            // Zero blocks that the bitmap marks as uninitialized.
                            apply_bitmap_zeroing(
                                self.block_size() as usize,
                                &bitmap,
                                buffer.subslice_mut(offset..end as usize),
                            );
                        }
                        last_offset = end;
                        if last_offset >= size {
                            break;
                        }
                    }
                }
                // Ran off the end of this attribute's extents (or onto another object).
                _ => break,
            }
        }
        // Zero the sparse tail past the last extent, then trim to the logical size.
        buffer.as_mut_slice()[std::cmp::min(last_offset, size)..].fill(0);
        Ok(buffer.as_slice()[..size].into())
    }
1225
    /// Writes potentially unaligned data at `device_offset` and returns checksums if requested.
    /// The data will be encrypted if necessary.  `buf` is mutable as an optimization, since the
    /// write may require encryption, we can encrypt the buffer in-place rather than copying to
    /// another buffer if the write is already aligned.
    ///
    /// NOTE: This will not create keys if they are missing (it will fail with an error if that
    /// happens to be the case).
    pub async fn write_at(
        &self,
        attribute_id: u64,
        offset: u64,
        buf: MutableBufferRef<'_>,
        key_id: Option<u64>,
        mut device_offset: u64,
    ) -> Result<MaybeChecksums, Error> {
        let mut transfer_buf;
        let block_size = self.block_size();
        // If the write is block-aligned, `buf` can be used directly; otherwise build an aligned
        // transfer buffer via `align_buffer` and write that instead.
        let (range, mut transfer_buf_ref) =
            if offset % block_size == 0 && buf.len() as u64 % block_size == 0 {
                (offset..offset + buf.len() as u64, buf)
            } else {
                let (range, buf) = self.align_buffer(attribute_id, offset, buf.as_ref()).await?;
                transfer_buf = buf;
                // Shift the device offset back to match the rounded-down start of `range`.
                device_offset -= offset - range.start;
                (range, transfer_buf.as_mut())
            };

        // If the key supports inline encryption, pass its crypt context down to the device write;
        // otherwise encrypt the buffer in software before writing.
        let mut crypt_ctx = None;
        if let (_, Some(key)) = self.get_key(key_id).await? {
            if let Some(ctx) = key.crypt_ctx(self.object_id, range.start) {
                crypt_ctx = Some(ctx);
            } else {
                key.encrypt(
                    self.object_id,
                    device_offset,
                    range.start,
                    transfer_buf_ref.as_mut_slice(),
                )?;
            }
        }
        self.write_aligned(transfer_buf_ref.as_ref(), device_offset, crypt_ctx).await
    }
1268
1269    /// Writes to multiple ranges with data provided in `buf`. This function is specifically
1270    /// designed for migration purposes, allowing raw writes to the device without updating
1271    /// object metadata like allocated size or mtime. It's essential for scenarios where
1272    /// data needs to be transferred directly without triggering standard filesystem operations.
1273    #[cfg(feature = "migration")]
1274    pub async fn raw_multi_write(
1275        &self,
1276        transaction: &mut Transaction<'_>,
1277        attribute_id: u64,
1278        key_id: Option<u64>,
1279        ranges: &[Range<u64>],
1280        buf: MutableBufferRef<'_>,
1281    ) -> Result<(), Error> {
1282        self.multi_write_internal(transaction, attribute_id, key_id, ranges, buf).await?;
1283        Ok(())
1284    }
1285
    /// This is a low-level write function that writes to multiple ranges. Users should generally
    /// use `multi_write` instead of this function as this does not update the object's allocated
    /// size, mtime, atime, etc.
    ///
    /// Returns (allocated, deallocated) bytes on success.
    async fn multi_write_internal(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: u64,
        key_id: Option<u64>,
        ranges: &[Range<u64>],
        mut buf: MutableBufferRef<'_>,
    ) -> Result<(u64, u64), Error> {
        if buf.is_empty() {
            return Ok((0, 0));
        }
        let block_size = self.block_size();
        let store = self.store();
        let store_id = store.store_object_id();

        // The only key we allow to be created on-the-fly is a non permanent key wrapped with the
        // volume data key.
        let (key_id, key) = if key_id == Some(VOLUME_DATA_KEY_ID)
            && matches!(self.encryption, Encryption::CachedKeys)
        {
            (
                VOLUME_DATA_KEY_ID,
                Some(
                    self.get_or_create_key(transaction)
                        .await
                        .context("get_or_create_key failed")?,
                ),
            )
        } else {
            self.get_key(key_id).await?
        };
        // Without inline-encryption support, encrypt the whole buffer in software up front, one
        // logical range at a time (the logical offset is fed to the cipher).
        if let Some(key) = &key {
            if !key.supports_inline_encryption() {
                let mut slice = buf.as_mut_slice();
                for r in ranges {
                    let l = r.end - r.start;
                    let (head, tail) = slice.split_at_mut(l as usize);
                    key.encrypt(
                        self.object_id,
                        0, /* TODO(https://fxbug.dev/421269588): plumb through device_offset. */
                        r.start,
                        head,
                    )?;
                    slice = tail;
                }
            }
        }

        let mut allocated = 0;
        let allocator = store.allocator();
        let trace = self.trace();
        // Writes are issued as space is allocated; FuturesOrdered preserves issue order so the
        // results can be matched back up with the logical ranges below.
        let mut writes = FuturesOrdered::new();

        let mut logical_ranges = ranges.iter();
        let mut current_range = logical_ranges.next().unwrap().clone();

        while !buf.is_empty() {
            // Allocate a contiguous device range (up to the remaining buffer length).
            let mut device_range = allocator
                .allocate(transaction, store_id, buf.len() as u64)
                .await
                .context("allocation failed")?;
            if trace {
                info!(
                    store_id,
                    oid = self.object_id(),
                    device_range:?,
                    len = device_range.end - device_range.start;
                    "A",
                );
            }
            let mut device_range_len = device_range.end - device_range.start;
            allocated += device_range_len;
            // If inline encryption is NOT supported, this loop should only happen once.
            while device_range_len > 0 {
                if current_range.end <= current_range.start {
                    current_range = logical_ranges.next().unwrap().clone();
                }
                // With inline encryption, each write is split so it covers a single logical
                // range, because the crypt context is derived from the logical offset.
                let (crypt_ctx, split) = if let Some(key) = &key {
                    if key.supports_inline_encryption() {
                        let split = std::cmp::min(
                            current_range.end - current_range.start,
                            device_range_len,
                        );
                        let crypt_ctx = key.crypt_ctx(self.object_id, current_range.start);
                        current_range.start += split;
                        (crypt_ctx, split)
                    } else {
                        (None, device_range_len)
                    }
                } else {
                    (None, device_range_len)
                };

                let (head, tail) = buf.split_at_mut(split as usize);
                buf = tail;

                writes.push_back(async move {
                    let len = head.len() as u64;
                    Result::<_, Error>::Ok((
                        device_range.start,
                        len,
                        self.write_aligned(head.as_ref(), device_range.start, crypt_ctx).await?,
                    ))
                });
                device_range.start += split;
                device_range_len = device_range.end - device_range.start;
            }
        }

        self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
        // Concurrently: (a) drain the completed writes, turning each into extent mutations and
        // checksum records; (b) deallocate the old extents the new ranges replace.
        let ((mutations, checksums), deallocated) = try_join!(
            async {
                let mut current_range = 0..0;
                let mut mutations = Vec::new();
                let mut out_checksums = Vec::new();
                let mut ranges = ranges.iter();
                while let Some((mut device_offset, mut len, mut checksums)) =
                    writes.try_next().await?
                {
                    // A single device write may span several logical ranges; emit one extent
                    // mutation per (device chunk, logical range) pairing.
                    while len > 0 {
                        if current_range.end <= current_range.start {
                            current_range = ranges.next().unwrap().clone();
                        }
                        let chunk_len = std::cmp::min(len, current_range.end - current_range.start);
                        let tail = checksums.split_off((chunk_len / block_size) as usize);
                        if let Some(checksums) = checksums.maybe_as_ref() {
                            out_checksums.push((
                                device_offset..device_offset + chunk_len,
                                checksums.to_owned(),
                            ));
                        }
                        mutations.push(Mutation::merge_object(
                            ObjectKey::extent(
                                self.object_id(),
                                attribute_id,
                                current_range.start..current_range.start + chunk_len,
                            ),
                            ObjectValue::Extent(ExtentValue::new(
                                device_offset,
                                checksums.to_mode(),
                                key_id,
                            )),
                        ));
                        checksums = tail;
                        device_offset += chunk_len;
                        len -= chunk_len;
                        current_range.start += chunk_len;
                    }
                }
                Result::<_, Error>::Ok((mutations, out_checksums))
            },
            async {
                let mut deallocated = 0;
                for r in ranges {
                    deallocated +=
                        self.deallocate_old_extents(transaction, attribute_id, r.clone()).await?;
                }
                Result::<_, Error>::Ok(deallocated)
            }
        )?;

        for m in mutations {
            transaction.add(store_id, m);
        }

        // Only store checksums in the journal if barriers are not enabled.
        if !store.filesystem().options().barriers_enabled {
            for (r, c) in checksums {
                transaction.add_checksum(r, c, true);
            }
        }
        Ok((allocated, deallocated))
    }
1464
1465    /// Writes to multiple ranges with data provided in `buf`.  The buffer can be modified in place
1466    /// if encryption takes place.  The ranges must all be aligned and no change to content size is
1467    /// applied; the caller is responsible for updating size if required.  If `key_id` is None, it
1468    /// means pick the default key for the object which is the fscrypt key if present, or the volume
1469    /// data key, or no key if it's an unencrypted file.
1470    pub async fn multi_write(
1471        &self,
1472        transaction: &mut Transaction<'_>,
1473        attribute_id: u64,
1474        key_id: Option<u64>,
1475        ranges: &[Range<u64>],
1476        buf: MutableBufferRef<'_>,
1477    ) -> Result<(), Error> {
1478        let (allocated, deallocated) =
1479            self.multi_write_internal(transaction, attribute_id, key_id, ranges, buf).await?;
1480        if allocated == 0 && deallocated == 0 {
1481            return Ok(());
1482        }
1483        self.update_allocated_size(transaction, allocated, deallocated).await
1484    }
1485
1486    /// Write data to overwrite extents with the provided set of ranges. This makes a strong
1487    /// assumption that the ranges are actually going to be already allocated overwrite extents and
1488    /// will error out or do something wrong if they aren't. It also assumes the ranges passed to
1489    /// it are sorted.
    pub async fn multi_overwrite<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        attr_id: u64,
        ranges: &[Range<u64>],
        mut buf: MutableBufferRef<'_>,
    ) -> Result<(), Error> {
        if buf.is_empty() {
            return Ok(());
        }
        let block_size = self.block_size();
        let store = self.store();
        let tree = store.tree();
        let store_id = store.store_object_id();

        // Fetch the encryption key (if any).  Keys that don't support inline encryption are
        // applied here, encrypting `buf` in place one range at a time; otherwise a crypt context
        // is passed down to `write_aligned` below (presumably so encryption happens closer to the
        // device — see write_aligned).
        let (key_id, key) = self.get_key(None).await?;
        if let Some(key) = &key {
            if !key.supports_inline_encryption() {
                let mut slice = buf.as_mut_slice();
                for r in ranges {
                    let l = r.end - r.start;
                    let (head, tail) = slice.split_at_mut(l as usize);
                    key.encrypt(
                        self.object_id,
                        0, /* TODO(https://fxbug.dev/421269588): plumb through device_offset. */
                        r.start,
                        head,
                    )?;
                    slice = tail;
                }
            }
        }

        let mut range_iter = ranges.iter();
        // There should be at least one range if the buffer has data in it
        let mut target_range = range_iter.next().unwrap().clone();
        let mut mutations = Vec::new();
        // Device writes are issued concurrently; each future resolves to the checksum bookkeeping
        // for its chunk.
        let writes = FuturesUnordered::new();

        // Walk the extent records for this attribute starting at the first target offset.
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                attr_id,
                AttributeKey::Extent(ExtentKey::search_key_from_offset(target_range.start)),
            )))
            .await?;

        loop {
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(
                                    attribute_id,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) if *object_id == self.object_id() && *attribute_id == attr_id => {
                    // If this extent ends before the target range starts (not possible on the
                    // first loop because of the query parameters but possible on further loops),
                    // advance until we find the next one we care about.
                    if range.end <= target_range.start {
                        iter.advance().await?;
                        continue;
                    }
                    let (device_offset, mode) = match extent_value {
                        ExtentValue::None => {
                            return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                                format!(
                                    "multi_overwrite failed: target_range ({}, {}) overlaps with \
                                deleted extent found at ({}, {})",
                                    target_range.start, target_range.end, range.start, range.end,
                                )
                            });
                        }
                        ExtentValue::Some { device_offset, mode, .. } => (device_offset, mode),
                    };
                    // The ranges passed to this function should already be allocated, so
                    // extent records should exist for them.
                    if range.start > target_range.start {
                        return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                            format!(
                                "multi_overwrite failed: target range ({}, {}) starts before first \
                            extent found at ({}, {})",
                                target_range.start, target_range.end, range.start, range.end,
                            )
                        });
                    }
                    // Only overwrite-mode extents may be written this way.  A partial overwrite
                    // extent carries a per-block bitmap of which blocks have been written before;
                    // it feeds the is_first_write grouping below.
                    let mut bitmap = match mode {
                        ExtentMode::Raw | ExtentMode::Cow(_) => {
                            return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                                format!(
                                    "multi_overwrite failed: \
                            extent from ({}, {}) which overlaps target range ({}, {}) had the \
                            wrong extent mode",
                                    range.start, range.end, target_range.start, target_range.end,
                                )
                            });
                        }
                        ExtentMode::OverwritePartial(bitmap) => {
                            OverwriteBitmaps::new(bitmap.clone())
                        }
                        ExtentMode::Overwrite => OverwriteBitmaps::None,
                    };
                    // Write as much of the remaining target ranges as falls inside this extent;
                    // one extent can serve several of the caller's ranges.
                    loop {
                        let offset_within_extent = target_range.start - range.start;
                        let bitmap_offset = offset_within_extent / block_size;
                        let write_device_offset = *device_offset + offset_within_extent;
                        let write_end = min(range.end, target_range.end);
                        let write_len = write_end - target_range.start;
                        let write_device_range =
                            write_device_offset..write_device_offset + write_len;
                        // Peel off the part of the buffer that backs this write.
                        let (current_buf, remaining_buf) = buf.split_at_mut(write_len as usize);

                        bitmap.set_offset(bitmap_offset as usize);
                        let checksum_ranges = ChecksumRangeChunk::group_first_write_ranges(
                            &mut bitmap,
                            block_size,
                            write_device_range,
                        );

                        let crypt_ctx = if let Some(key) = &key {
                            key.crypt_ctx(self.object_id, target_range.start)
                        } else {
                            None
                        };

                        writes.push(async move {
                            let maybe_checksums = self
                                .write_aligned(current_buf.as_ref(), write_device_offset, crypt_ctx)
                                .await?;
                            // Pair each checksum chunk with its device range and whether this is
                            // the first write to those blocks.
                            Ok::<_, Error>(match maybe_checksums {
                                MaybeChecksums::None => Vec::new(),
                                MaybeChecksums::Fletcher(checksums) => checksum_ranges
                                    .into_iter()
                                    .map(
                                        |ChecksumRangeChunk {
                                             checksum_range,
                                             device_range,
                                             is_first_write,
                                         }| {
                                            (
                                                device_range,
                                                checksums[checksum_range].to_vec(),
                                                is_first_write,
                                            )
                                        },
                                    )
                                    .collect(),
                            })
                        });
                        buf = remaining_buf;
                        target_range.start += write_len;
                        // Finished the current caller range; move on to the next, or stop if
                        // there are none left.
                        if target_range.start == target_range.end {
                            match range_iter.next() {
                                None => break,
                                Some(next_range) => target_range = next_range.clone(),
                            }
                        }
                        // The next range starts beyond this extent; go find the extent for it.
                        if range.end <= target_range.start {
                            break;
                        }
                    }
                    // If the bitmap changed (some blocks were written for the first time),
                    // persist the updated extent record, upgrading to ExtentMode::Overwrite once
                    // every block has been written.
                    if let Some((mut bitmap, write_bitmap)) = bitmap.take_bitmaps() {
                        if bitmap.or(&write_bitmap) {
                            let mode = if bitmap.all() {
                                ExtentMode::Overwrite
                            } else {
                                ExtentMode::OverwritePartial(bitmap)
                            };
                            mutations.push(Mutation::merge_object(
                                ObjectKey::extent(self.object_id(), attr_id, range.clone()),
                                ObjectValue::Extent(ExtentValue::new(*device_offset, mode, key_id)),
                            ))
                        }
                    }
                    if target_range.start == target_range.end {
                        break;
                    }
                    iter.advance().await?;
                }
                // We've either run past the end of the existing extents or something is wrong with
                // the tree. The main section should break if it finishes the ranges, so either
                // case, this is an error.
                _ => bail!(anyhow!(FxfsError::Internal).context(
                    "found a non-extent object record while there were still ranges to process"
                )),
            }
        }

        // Wait for all device writes to complete and gather their checksum records.
        let checksums = writes.try_collect::<Vec<_>>().await?;
        // Only store checksums in the journal if barriers are not enabled.
        if !store.filesystem().options().barriers_enabled {
            for (r, c, first_write) in checksums.into_iter().flatten() {
                transaction.add_checksum(r, c, first_write);
            }
        }

        for m in mutations {
            transaction.add(store_id, m);
        }

        Ok(())
    }
1700
1701    /// Writes an attribute that should not already exist and therefore does not require trimming.
1702    /// Breaks up the write into multiple transactions if `data.len()` is larger than `batch_size`.
1703    /// If writing the attribute requires multiple transactions, adds the attribute to the
1704    /// graveyard. The caller is responsible for removing the attribute from the graveyard when it
1705    /// commits the last transaction.  This always writes using a key wrapped with the volume data
1706    /// key.
1707    #[trace]
1708    pub async fn write_new_attr_in_batches<'a>(
1709        &'a self,
1710        transaction: &mut Transaction<'a>,
1711        attribute_id: u64,
1712        data: &[u8],
1713        batch_size: usize,
1714    ) -> Result<(), Error> {
1715        transaction.add(
1716            self.store().store_object_id,
1717            Mutation::replace_or_insert_object(
1718                ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
1719                ObjectValue::attribute(data.len() as u64, false),
1720            ),
1721        );
1722        let chunks = data.chunks(batch_size);
1723        let num_chunks = chunks.len();
1724        if num_chunks > 1 {
1725            transaction.add(
1726                self.store().store_object_id,
1727                Mutation::replace_or_insert_object(
1728                    ObjectKey::graveyard_attribute_entry(
1729                        self.store().graveyard_directory_object_id(),
1730                        self.object_id(),
1731                        attribute_id,
1732                    ),
1733                    ObjectValue::Some,
1734                ),
1735            );
1736        }
1737        let mut start_offset = 0;
1738        for (i, chunk) in chunks.enumerate() {
1739            let rounded_len = round_up(chunk.len() as u64, self.block_size()).unwrap();
1740            let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
1741            let slice = buffer.as_mut_slice();
1742            slice[..chunk.len()].copy_from_slice(chunk);
1743            slice[chunk.len()..].fill(0);
1744            self.multi_write(
1745                transaction,
1746                attribute_id,
1747                Some(VOLUME_DATA_KEY_ID),
1748                &[start_offset..start_offset + rounded_len],
1749                buffer.as_mut(),
1750            )
1751            .await?;
1752            start_offset += rounded_len;
1753            // Do not commit the last chunk.
1754            if i < num_chunks - 1 {
1755                transaction.commit_and_continue().await?;
1756            }
1757        }
1758        Ok(())
1759    }
1760
1761    /// Writes an entire attribute. Returns whether or not the attribute needs to continue being
1762    /// trimmed - if the new data is shorter than the old data, this will trim any extents beyond
1763    /// the end of the new size, but if there were too many for a single transaction, a commit
1764    /// needs to be made before trimming again, so the responsibility is left to the caller so as
1765    /// to not accidentally split the transaction when it's not in a consistent state.  This will
1766    /// write using the volume data key; the fscrypt key is not supported.
1767    pub async fn write_attr(
1768        &self,
1769        transaction: &mut Transaction<'_>,
1770        attribute_id: u64,
1771        data: &[u8],
1772    ) -> Result<NeedsTrim, Error> {
1773        let rounded_len = round_up(data.len() as u64, self.block_size()).unwrap();
1774        let store = self.store();
1775        let tree = store.tree();
1776        let should_trim = if let Some(item) = tree
1777            .find(&ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute))
1778            .await?
1779        {
1780            match item.value {
1781                ObjectValue::Attribute { size: _, has_overwrite_extents: true } => {
1782                    bail!(
1783                        anyhow!(FxfsError::Inconsistent)
1784                            .context("write_attr on an attribute with overwrite extents")
1785                    )
1786                }
1787                ObjectValue::Attribute { size, .. } => (data.len() as u64) < size,
1788                _ => bail!(FxfsError::Inconsistent),
1789            }
1790        } else {
1791            false
1792        };
1793        let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
1794        let slice = buffer.as_mut_slice();
1795        slice[..data.len()].copy_from_slice(data);
1796        slice[data.len()..].fill(0);
1797        self.multi_write(
1798            transaction,
1799            attribute_id,
1800            Some(VOLUME_DATA_KEY_ID),
1801            &[0..rounded_len],
1802            buffer.as_mut(),
1803        )
1804        .await?;
1805        transaction.add(
1806            self.store().store_object_id,
1807            Mutation::replace_or_insert_object(
1808                ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
1809                ObjectValue::attribute(data.len() as u64, false),
1810            ),
1811        );
1812        if should_trim {
1813            self.shrink(transaction, attribute_id, data.len() as u64).await
1814        } else {
1815            Ok(NeedsTrim(false))
1816        }
1817    }
1818
1819    pub async fn list_extended_attributes(&self) -> Result<Vec<Vec<u8>>, Error> {
1820        let layer_set = self.store().tree().layer_set();
1821        let mut merger = layer_set.merger();
1822        // Seek to the first extended attribute key for this object.
1823        let mut iter = merger
1824            .query(Query::FullRange(&ObjectKey::extended_attribute(self.object_id(), Vec::new())))
1825            .await?;
1826        let mut out = Vec::new();
1827        while let Some(item) = iter.get() {
1828            // Skip deleted extended attributes.
1829            if item.value != &ObjectValue::None {
1830                match item.key {
1831                    ObjectKey { object_id, data: ObjectKeyData::ExtendedAttribute { name } } => {
1832                        if self.object_id() != *object_id {
1833                            bail!(
1834                                anyhow!(FxfsError::Inconsistent)
1835                                    .context("list_extended_attributes: wrong object id")
1836                            )
1837                        }
1838                        out.push(name.clone());
1839                    }
1840                    // Once we hit something that isn't an extended attribute key, we've gotten to
1841                    // the end.
1842                    _ => break,
1843                }
1844            }
1845            iter.advance().await?;
1846        }
1847        Ok(out)
1848    }
1849
1850    /// Looks up the values for the extended attribute `fio::SELINUX_CONTEXT_NAME`, returning it
1851    /// if it is found inline. If it is not inline, it will request use of the
1852    /// `get_extended_attributes` method. If the entry doesn't exist at all, returns None.
1853    pub async fn get_inline_selinux_context(&self) -> Result<Option<fio::SelinuxContext>, Error> {
1854        // This optimization is only useful as long as the attribute is smaller than inline sizes.
1855        // Avoid reading the data out of the attributes.
1856        const_assert!(fio::MAX_SELINUX_CONTEXT_ATTRIBUTE_LEN as usize <= MAX_INLINE_XATTR_SIZE);
1857        let item = match self
1858            .store()
1859            .tree()
1860            .find(&ObjectKey::extended_attribute(
1861                self.object_id(),
1862                fio::SELINUX_CONTEXT_NAME.into(),
1863            ))
1864            .await?
1865        {
1866            Some(item) => item,
1867            None => return Ok(None),
1868        };
1869        match item.value {
1870            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => {
1871                Ok(Some(fio::SelinuxContext::Data(value)))
1872            }
1873            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(_)) => {
1874                Ok(Some(fio::SelinuxContext::UseExtendedAttributes(fio::EmptyStruct {})))
1875            }
1876            _ => {
1877                bail!(
1878                    anyhow!(FxfsError::Inconsistent)
1879                        .context("get_inline_extended_attribute: Expected ExtendedAttribute value")
1880                )
1881            }
1882        }
1883    }
1884
1885    pub async fn get_extended_attribute(&self, name: Vec<u8>) -> Result<Vec<u8>, Error> {
1886        let item = self
1887            .store()
1888            .tree()
1889            .find(&ObjectKey::extended_attribute(self.object_id(), name))
1890            .await?
1891            .ok_or(FxfsError::NotFound)?;
1892        match item.value {
1893            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => Ok(value),
1894            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
1895                Ok(self.read_attr(id).await?.ok_or(FxfsError::Inconsistent)?.into_vec())
1896            }
1897            _ => {
1898                bail!(
1899                    anyhow!(FxfsError::Inconsistent)
1900                        .context("get_extended_attribute: Expected ExtendedAttribute value")
1901                )
1902            }
1903        }
1904    }
1905
1906    pub async fn set_extended_attribute(
1907        &self,
1908        name: Vec<u8>,
1909        value: Vec<u8>,
1910        mode: SetExtendedAttributeMode,
1911    ) -> Result<(), Error> {
1912        let store = self.store();
1913        let fs = store.filesystem();
1914        // NB: We need to take this lock before we potentially look up the value to prevent racing
1915        // with another set.
1916        let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
1917        let mut transaction = fs.new_transaction(keys, Options::default()).await?;
1918        self.set_extended_attribute_impl(name, value, mode, &mut transaction).await?;
1919        transaction.commit().await?;
1920        Ok(())
1921    }
1922
    /// Inner implementation of `set_extended_attribute`, run inside `transaction`.  The value is
    /// stored inline in the extended attribute record when small enough, otherwise it is written
    /// into a dedicated fxfs attribute in the extended attribute id range.
    async fn set_extended_attribute_impl(
        &self,
        name: Vec<u8>,
        value: Vec<u8>,
        mode: SetExtendedAttributeMode,
        transaction: &mut Transaction<'_>,
    ) -> Result<(), Error> {
        // Reject names and values over the fixed limits before doing any work.
        ensure!(name.len() <= MAX_XATTR_NAME_SIZE, FxfsError::TooBig);
        ensure!(value.len() <= MAX_XATTR_VALUE_SIZE, FxfsError::TooBig);
        let tree = self.store().tree();
        let object_key = ObjectKey::extended_attribute(self.object_id(), name);

        // Look up any existing record for this name: `found` says whether the xattr exists at
        // all, and `existing_attribute_id` is the fxfs attribute backing it when the value is not
        // stored inline.
        let existing_attribute_id = {
            let (found, existing_attribute_id) = match tree.find(&object_key).await? {
                None => (false, None),
                Some(Item { value, .. }) => (
                    true,
                    match value {
                        ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
                        ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
                            Some(id)
                        }
                        _ => bail!(
                            anyhow!(FxfsError::Inconsistent)
                                .context("expected extended attribute value")
                        ),
                    },
                ),
            };
            // Enforce the create/replace semantics the caller asked for.
            match mode {
                SetExtendedAttributeMode::Create if found => {
                    bail!(FxfsError::AlreadyExists)
                }
                SetExtendedAttributeMode::Replace if !found => {
                    bail!(FxfsError::NotFound)
                }
                _ => (),
            }
            existing_attribute_id
        };

        if let Some(attribute_id) = existing_attribute_id {
            // If we already have an attribute id allocated for this extended attribute, we always
            // use it, even if the value has shrunk enough to be stored inline. We don't need to
            // worry about trimming here for the same reason we don't need to worry about it when
            // we delete xattrs - they simply aren't large enough to ever need more than one
            // transaction.
            let _ = self.write_attr(transaction, attribute_id, &value).await?;
        } else if value.len() <= MAX_INLINE_XATTR_SIZE {
            // Small value and no pre-existing attribute: store it inline in the record itself.
            transaction.add(
                self.store().store_object_id(),
                Mutation::replace_or_insert_object(
                    object_key,
                    ObjectValue::inline_extended_attribute(value),
                ),
            );
        } else {
            // If there isn't an existing attribute id and we are going to store the value in
            // an attribute, find the next empty attribute id in the range. We search for fxfs
            // attribute records specifically, instead of the extended attribute records, because
            // even if the extended attribute record is removed the attribute may not be fully
            // trimmed yet.
            let mut attribute_id = EXTENDED_ATTRIBUTE_RANGE_START;
            let layer_set = tree.layer_set();
            let mut merger = layer_set.merger();
            let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
            let mut iter = merger.query(Query::FullRange(&key)).await?;
            loop {
                match iter.get() {
                    // None means the key passed to seek wasn't found. That means the first
                    // attribute is available and we can just stop right away.
                    None => break,
                    Some(ItemRef {
                        key: ObjectKey { object_id, data: ObjectKeyData::Attribute(attr_id, _) },
                        value,
                        ..
                    }) if *object_id == self.object_id() => {
                        if matches!(value, ObjectValue::None) {
                            // This attribute was once used but is now deleted, so it's safe to use
                            // again.
                            break;
                        }
                        if attribute_id < *attr_id {
                            // We found a gap - use it.
                            break;
                        } else if attribute_id == *attr_id {
                            // This attribute id is in use, try the next one.
                            attribute_id += 1;
                            if attribute_id == EXTENDED_ATTRIBUTE_RANGE_END {
                                bail!(FxfsError::NoSpace);
                            }
                        }
                        // If we don't hit either of those cases, we are still moving through the
                        // extent keys for the current attribute, so just keep advancing until the
                        // attribute id changes.
                    }
                    // As we are working our way through the iterator, if we hit anything that
                    // doesn't have our object id or attribute key data, we've gone past the end of
                    // this section and can stop.
                    _ => break,
                }
                iter.advance().await?;
            }

            // We know this won't need trimming because it's a new attribute.
            let _ = self.write_attr(transaction, attribute_id, &value).await?;
            // Link the extended attribute record to the attribute that now holds its value.
            transaction.add(
                self.store().store_object_id(),
                Mutation::replace_or_insert_object(
                    object_key,
                    ObjectValue::extended_attribute(attribute_id),
                ),
            );
        }

        Ok(())
    }
2040
2041    pub async fn remove_extended_attribute(&self, name: Vec<u8>) -> Result<(), Error> {
2042        let store = self.store();
2043        let tree = store.tree();
2044        let object_key = ObjectKey::extended_attribute(self.object_id(), name);
2045
2046        // NB: The API says we have to return an error if the attribute doesn't exist, so we have
2047        // to look it up first to make sure we have a record of it before we delete it. Make sure
2048        // we take a lock and make a transaction before we do so we don't race with other
2049        // operations.
2050        let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
2051        let mut transaction = store.filesystem().new_transaction(keys, Options::default()).await?;
2052
2053        let attribute_to_delete =
2054            match tree.find(&object_key).await?.ok_or(FxfsError::NotFound)?.value {
2055                ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => Some(id),
2056                ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
2057                _ => {
2058                    bail!(
2059                        anyhow!(FxfsError::Inconsistent)
2060                            .context("remove_extended_attribute: Expected ExtendedAttribute value")
2061                    )
2062                }
2063            };
2064
2065        transaction.add(
2066            store.store_object_id(),
2067            Mutation::replace_or_insert_object(object_key, ObjectValue::None),
2068        );
2069
2070        // If the attribute wasn't stored inline, we need to deallocate all the extents too. This
2071        // would normally need to interact with the graveyard for correctness - if there are too
2072        // many extents to delete to fit in a single transaction then we could potentially have
2073        // consistency issues. However, the maximum size of an extended attribute is small enough
2074        // that it will never come close to that limit even in the worst case, so we just delete
2075        // everything in one shot.
2076        if let Some(attribute_id) = attribute_to_delete {
2077            let trim_result = store
2078                .trim_some(
2079                    &mut transaction,
2080                    self.object_id(),
2081                    attribute_id,
2082                    TrimMode::FromOffset(0),
2083                )
2084                .await?;
2085            // In case you didn't read the comment above - this should not be used to delete
2086            // arbitrary attributes!
2087            assert_matches!(trim_result, TrimResult::Done(_));
2088            transaction.add(
2089                store.store_object_id(),
2090                Mutation::replace_or_insert_object(
2091                    ObjectKey::attribute(self.object_id, attribute_id, AttributeKey::Attribute),
2092                    ObjectValue::None,
2093                ),
2094            );
2095        }
2096
2097        transaction.commit().await?;
2098        Ok(())
2099    }
2100
2101    /// Returns a future that will pre-fetches the keys so as to avoid paying the performance
2102    /// penalty later. Must ensure that the object is not removed before the future completes.
2103    pub fn pre_fetch_keys(&self) -> Option<impl Future<Output = ()> + use<S>> {
2104        if let Encryption::CachedKeys = self.encryption {
2105            let owner = self.owner.clone();
2106            let object_id = self.object_id;
2107            Some(async move {
2108                let store = owner.as_ref().as_ref();
2109                if let Some(crypt) = store.crypt() {
2110                    let _ = store
2111                        .key_manager
2112                        .get_keys(
2113                            object_id,
2114                            crypt.as_ref(),
2115                            &mut Some(async || store.get_keys(object_id).await),
2116                            /* permanent= */ false,
2117                            /* force= */ false,
2118                        )
2119                        .await;
2120                }
2121            })
2122        } else {
2123            None
2124        }
2125    }
2126}
2127
2128impl<S: HandleOwner> Drop for StoreObjectHandle<S> {
2129    fn drop(&mut self) {
2130        if self.is_encrypted() {
2131            let _ = self.store().key_manager.remove(self.object_id);
2132        }
2133    }
2134}
2135
/// When truncating an object, sometimes it might not be possible to complete the truncation in a
/// single transaction, in which case the caller needs to finish trimming the object in subsequent
/// transactions (by calling ObjectStore::trim).
#[must_use]
pub struct NeedsTrim(pub bool);
2141
2142#[cfg(test)]
2143mod tests {
2144    use super::{ChecksumRangeChunk, OverwriteBitmaps};
2145    use crate::errors::FxfsError;
2146    use crate::filesystem::{FxFilesystem, OpenFxFilesystem};
2147    use crate::object_handle::ObjectHandle;
2148    use crate::object_store::data_object_handle::WRITE_ATTR_BATCH_SIZE;
2149    use crate::object_store::transaction::{Mutation, Options, lock_keys};
2150    use crate::object_store::{
2151        AttributeKey, DataObjectHandle, Directory, FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions,
2152        LockKey, ObjectKey, ObjectStore, ObjectValue, SetExtendedAttributeMode, StoreObjectHandle,
2153    };
2154    use bit_vec::BitVec;
2155    use fuchsia_async as fasync;
2156    use futures::join;
2157    use std::sync::Arc;
2158    use storage_device::DeviceHolder;
2159    use storage_device::fake_device::FakeDevice;
2160
    // Block size used for the fake device backing the test filesystems.
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
    // Name under which the test object is linked into the root directory.
    const TEST_OBJECT_NAME: &str = "foo";
2163
2164    fn is_error(actual: anyhow::Error, expected: FxfsError) {
2165        assert_eq!(*actual.root_cause().downcast_ref::<FxfsError>().unwrap(), expected)
2166    }
2167
2168    async fn test_filesystem() -> OpenFxFilesystem {
2169        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
2170        FxFilesystem::new_empty(device).await.expect("new_empty failed")
2171    }
2172
2173    async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
2174    {
2175        let fs = test_filesystem().await;
2176        let store = fs.root_store();
2177
2178        let mut transaction = fs
2179            .clone()
2180            .new_transaction(
2181                lock_keys![LockKey::object(
2182                    store.store_object_id(),
2183                    store.root_directory_object_id()
2184                )],
2185                Options::default(),
2186            )
2187            .await
2188            .expect("new_transaction failed");
2189
2190        let object =
2191            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2192                .await
2193                .expect("create_object failed");
2194
2195        let root_directory =
2196            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
2197        root_directory
2198            .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
2199            .await
2200            .expect("add_child_file failed");
2201
2202        transaction.commit().await.expect("commit failed");
2203
2204        (fs, object)
2205    }
2206
    #[fuchsia::test(threads = 3)]
    async fn extended_attribute_double_remove() {
        // This test is intended to trip a potential race condition in remove. Removing an
        // attribute that doesn't exist is an error, so we need to check before we remove, but if
        // we aren't careful, two parallel removes might both succeed in the check and then both
        // remove the value.
        let (fs, object) = test_filesystem_and_empty_object().await;
        // Wrap the object in a bare StoreObjectHandle so two clones of it can race.
        let basic = Arc::new(StoreObjectHandle::new(
            object.owner().clone(),
            object.object_id(),
            /* permanent_keys: */ false,
            HandleOptions::default(),
            false,
        ));
        let basic_a = basic.clone();
        let basic_b = basic.clone();

        basic
            .set_extended_attribute(
                b"security.selinux".to_vec(),
                b"bar".to_vec(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .expect("failed to set attribute");

        // Try to remove the attribute twice at the same time. One should succeed in the race and
        // return Ok, and the other should fail the race and return NOT_FOUND.
        let a_task = fasync::Task::spawn(async move {
            basic_a.remove_extended_attribute(b"security.selinux".to_vec()).await
        });
        let b_task = fasync::Task::spawn(async move {
            basic_b.remove_extended_attribute(b"security.selinux".to_vec()).await
        });
        // Exactly one of the two removals must win; any other combination is a bug.
        match join!(a_task, b_task) {
            (Ok(()), Ok(())) => panic!("both remove calls succeeded"),
            (Err(_), Err(_)) => panic!("both remove calls failed"),

            (Ok(()), Err(e)) => is_error(e, FxfsError::NotFound),
            (Err(e), Ok(())) => is_error(e, FxfsError::NotFound),
        }

        fs.close().await.expect("Close failed");
    }
2251
    #[fuchsia::test(threads = 3)]
    async fn extended_attribute_double_create() {
        // This test is intended to trip a potential race in set when using the create flag,
        // similar to above. If the create mode is set, we need to check that the attribute isn't
        // already created, but if two parallel creates both succeed in that check, and we aren't
        // careful with locking, they will both succeed and one will overwrite the other.
        let (fs, object) = test_filesystem_and_empty_object().await;
        // Wrap the object in a bare StoreObjectHandle so two clones of it can race.
        let basic = Arc::new(StoreObjectHandle::new(
            object.owner().clone(),
            object.object_id(),
            /* permanent_keys: */ false,
            HandleOptions::default(),
            false,
        ));
        let basic_a = basic.clone();
        let basic_b = basic.clone();

        // Try to set the attribute twice at the same time. One should succeed in the race and
        // return Ok, and the other should fail the race and return ALREADY_EXISTS.
        let a_task = fasync::Task::spawn(async move {
            basic_a
                .set_extended_attribute(
                    b"security.selinux".to_vec(),
                    b"one".to_vec(),
                    SetExtendedAttributeMode::Create,
                )
                .await
        });
        let b_task = fasync::Task::spawn(async move {
            basic_b
                .set_extended_attribute(
                    b"security.selinux".to_vec(),
                    b"two".to_vec(),
                    SetExtendedAttributeMode::Create,
                )
                .await
        });
        // Whichever create wins determines the stored value; the loser must observe
        // ALREADY_EXISTS and must not have overwritten the winner's value.
        match join!(a_task, b_task) {
            (Ok(()), Ok(())) => panic!("both set calls succeeded"),
            (Err(_), Err(_)) => panic!("both set calls failed"),

            (Ok(()), Err(e)) => {
                assert_eq!(
                    basic
                        .get_extended_attribute(b"security.selinux".to_vec())
                        .await
                        .expect("failed to get xattr"),
                    b"one"
                );
                is_error(e, FxfsError::AlreadyExists);
            }
            (Err(e), Ok(())) => {
                assert_eq!(
                    basic
                        .get_extended_attribute(b"security.selinux".to_vec())
                        .await
                        .expect("failed to get xattr"),
                    b"two"
                );
                is_error(e, FxfsError::AlreadyExists);
            }
        }

        fs.close().await.expect("Close failed");
    }
2317
2318    struct TestAttr {
2319        name: Vec<u8>,
2320        value: Vec<u8>,
2321    }
2322
2323    impl TestAttr {
2324        fn new(name: impl AsRef<[u8]>, value: impl AsRef<[u8]>) -> Self {
2325            Self { name: name.as_ref().to_vec(), value: value.as_ref().to_vec() }
2326        }
2327        fn name(&self) -> Vec<u8> {
2328            self.name.clone()
2329        }
2330        fn value(&self) -> Vec<u8> {
2331            self.value.clone()
2332        }
2333    }
2334
    // Exercises the basic extended attribute lifecycle twice over: list/get on an empty object,
    // then set, list, get, remove, and repeat to make sure removal leaves the object reusable.
    #[fuchsia::test]
    async fn extended_attributes() {
        let (fs, object) = test_filesystem_and_empty_object().await;

        let test_attr = TestAttr::new(b"security.selinux", b"foo");

        // A fresh object has no attributes, and get on a missing name is NotFound.
        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
        is_error(
            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
            FxfsError::NotFound,
        );

        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
        assert_eq!(
            object.get_extended_attribute(test_attr.name()).await.unwrap(),
            test_attr.value()
        );

        object.remove_extended_attribute(test_attr.name()).await.unwrap();
        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
        is_error(
            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
            FxfsError::NotFound,
        );

        // Make sure we can handle the same attribute being set again after removal.
        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
        assert_eq!(
            object.get_extended_attribute(test_attr.name()).await.unwrap(),
            test_attr.value()
        );

        object.remove_extended_attribute(test_attr.name()).await.unwrap();
        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
        is_error(
            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
            FxfsError::NotFound,
        );

        fs.close().await.expect("close failed");
    }
2392
    // An extended attribute value large enough to not be stored inline should be written to an
    // fxfs attribute instead, and removal should clean that attribute up so the name is reusable.
    #[fuchsia::test]
    async fn large_extended_attribute() {
        let (fs, object) = test_filesystem_and_empty_object().await;

        let test_attr = TestAttr::new(b"security.selinux", vec![3u8; 300]);

        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(
            object.get_extended_attribute(test_attr.name()).await.unwrap(),
            test_attr.value()
        );

        // Probe the fxfs attributes to make sure it did the expected thing. This relies on inside
        // knowledge of how the attribute id is chosen.
        assert_eq!(
            object
                .read_attr(64)
                .await
                .expect("read_attr failed")
                .expect("read_attr returned none")
                .into_vec(),
            test_attr.value()
        );

        object.remove_extended_attribute(test_attr.name()).await.unwrap();
        is_error(
            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
            FxfsError::NotFound,
        );

        // Make sure we can handle the same attribute being set again after removal.
        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(
            object.get_extended_attribute(test_attr.name()).await.unwrap(),
            test_attr.value()
        );
        object.remove_extended_attribute(test_attr.name()).await.unwrap();
        is_error(
            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
            FxfsError::NotFound,
        );

        fs.close().await.expect("close failed");
    }
2451
2452    #[fuchsia::test]
2453    async fn multiple_extended_attributes() {
2454        let (fs, object) = test_filesystem_and_empty_object().await;
2455
2456        let attrs = [
2457            TestAttr::new(b"security.selinux", b"foo"),
2458            TestAttr::new(b"large.attribute", vec![3u8; 300]),
2459            TestAttr::new(b"an.attribute", b"asdf"),
2460            TestAttr::new(b"user.big", vec![5u8; 288]),
2461            TestAttr::new(b"user.tiny", b"smol"),
2462            TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
2463            TestAttr::new(b"also big", vec![7u8; 500]),
2464            TestAttr::new(b"all.ones", vec![1u8; 11111]),
2465        ];
2466
2467        for i in 0..attrs.len() {
2468            object
2469                .set_extended_attribute(
2470                    attrs[i].name(),
2471                    attrs[i].value(),
2472                    SetExtendedAttributeMode::Set,
2473                )
2474                .await
2475                .unwrap();
2476            assert_eq!(
2477                object.get_extended_attribute(attrs[i].name()).await.unwrap(),
2478                attrs[i].value()
2479            );
2480        }
2481
2482        for i in 0..attrs.len() {
2483            // Make sure expected attributes are still available.
2484            let mut found_attrs = object.list_extended_attributes().await.unwrap();
2485            let mut expected_attrs: Vec<Vec<u8>> = attrs.iter().skip(i).map(|a| a.name()).collect();
2486            found_attrs.sort();
2487            expected_attrs.sort();
2488            assert_eq!(found_attrs, expected_attrs);
2489            for j in i..attrs.len() {
2490                assert_eq!(
2491                    object.get_extended_attribute(attrs[j].name()).await.unwrap(),
2492                    attrs[j].value()
2493                );
2494            }
2495
2496            object.remove_extended_attribute(attrs[i].name()).await.expect("failed to remove");
2497            is_error(
2498                object.get_extended_attribute(attrs[i].name()).await.unwrap_err(),
2499                FxfsError::NotFound,
2500            );
2501        }
2502
2503        fs.close().await.expect("close failed");
2504    }
2505
    // Sets a mix of inline and large extended attributes, then unlinks and tombstones the whole
    // file; fsck afterwards verifies nothing was left dangling by the cleanup.
    #[fuchsia::test]
    async fn multiple_extended_attributes_delete() {
        let (fs, object) = test_filesystem_and_empty_object().await;
        let store = object.owner().clone();

        let attrs = [
            TestAttr::new(b"security.selinux", b"foo"),
            TestAttr::new(b"large.attribute", vec![3u8; 300]),
            TestAttr::new(b"an.attribute", b"asdf"),
            TestAttr::new(b"user.big", vec![5u8; 288]),
            TestAttr::new(b"user.tiny", b"smol"),
            TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
            TestAttr::new(b"also big", vec![7u8; 500]),
            TestAttr::new(b"all.ones", vec![1u8; 11111]),
        ];

        for i in 0..attrs.len() {
            object
                .set_extended_attribute(
                    attrs[i].name(),
                    attrs[i].value(),
                    SetExtendedAttributeMode::Set,
                )
                .await
                .unwrap();
            assert_eq!(
                object.get_extended_attribute(attrs[i].name()).await.unwrap(),
                attrs[i].value()
            );
        }

        // Unlink the file
        let root_directory =
            Directory::open(object.owner(), object.store().root_directory_object_id())
                .await
                .expect("open failed");
        // Both the directory and the object need to be locked for replace_child.
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![
                    LockKey::object(store.store_object_id(), store.root_directory_object_id()),
                    LockKey::object(store.store_object_id(), object.object_id()),
                ],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        crate::object_store::directory::replace_child(
            &mut transaction,
            None,
            (&root_directory, TEST_OBJECT_NAME),
        )
        .await
        .expect("replace_child failed");
        transaction.commit().await.unwrap();
        // Tombstoning actually reclaims the now-unlinked object, including its attributes.
        store.tombstone_object(object.object_id(), Options::default()).await.unwrap();

        crate::fsck::fsck(fs.clone()).await.unwrap();

        fs.close().await.expect("close failed");
    }
2567
    // Verifies transitions between inline and fxfs-attribute storage as an extended attribute's
    // value crosses the size threshold, with fsck run after each step.
    #[fuchsia::test]
    async fn extended_attribute_changing_sizes() {
        let (fs, object) = test_filesystem_and_empty_object().await;

        let test_name = b"security.selinux";
        let test_small_attr = TestAttr::new(test_name, b"smol");
        let test_large_attr = TestAttr::new(test_name, vec![3u8; 300]);

        object
            .set_extended_attribute(
                test_small_attr.name(),
                test_small_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(
            object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
            test_small_attr.value()
        );

        // With a small attribute, we don't expect it to write to an fxfs attribute.
        assert!(object.read_attr(64).await.expect("read_attr failed").is_none());

        crate::fsck::fsck(fs.clone()).await.unwrap();

        object
            .set_extended_attribute(
                test_large_attr.name(),
                test_large_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(
            object.get_extended_attribute(test_large_attr.name()).await.unwrap(),
            test_large_attr.value()
        );

        // Once the value is above the threshold, we expect it to get upgraded to an fxfs
        // attribute.
        assert_eq!(
            object
                .read_attr(64)
                .await
                .expect("read_attr failed")
                .expect("read_attr returned none")
                .into_vec(),
            test_large_attr.value()
        );

        crate::fsck::fsck(fs.clone()).await.unwrap();

        object
            .set_extended_attribute(
                test_small_attr.name(),
                test_small_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(
            object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
            test_small_attr.value()
        );

        // Even though we are back under the threshold, we still expect it to be stored in an fxfs
        // attribute, because we don't downgrade to inline once we've allocated one.
        assert_eq!(
            object
                .read_attr(64)
                .await
                .expect("read_attr failed")
                .expect("read_attr returned none")
                .into_vec(),
            test_small_attr.value()
        );

        crate::fsck::fsck(fs.clone()).await.unwrap();

        object.remove_extended_attribute(test_small_attr.name()).await.expect("failed to remove");

        crate::fsck::fsck(fs.clone()).await.unwrap();

        fs.close().await.expect("close failed");
    }
2654
    // An attribute at the maximum supported name and value sizes should round-trip through
    // set, get, list, and remove without error.
    #[fuchsia::test]
    async fn extended_attribute_max_size() {
        let (fs, object) = test_filesystem_and_empty_object().await;

        let test_attr = TestAttr::new(
            vec![3u8; super::MAX_XATTR_NAME_SIZE],
            vec![1u8; super::MAX_XATTR_VALUE_SIZE],
        );

        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(
            object.get_extended_attribute(test_attr.name()).await.unwrap(),
            test_attr.value()
        );
        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
        object.remove_extended_attribute(test_attr.name()).await.unwrap();

        fs.close().await.expect("close failed");
    }
2681
    // Create, remove, and re-create a maximum-size attribute, with a journal compaction in
    // between — presumably to ensure the remove/re-create cycle works once the original create
    // has been flushed out of the journal (NOTE(review): confirm intent with the author).
    #[fuchsia::test]
    async fn extended_attribute_remove_then_create() {
        let (fs, object) = test_filesystem_and_empty_object().await;

        let test_attr = TestAttr::new(
            vec![3u8; super::MAX_XATTR_NAME_SIZE],
            vec![1u8; super::MAX_XATTR_VALUE_SIZE],
        );

        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Create,
            )
            .await
            .unwrap();
        fs.journal().force_compact().await.unwrap();
        object.remove_extended_attribute(test_attr.name()).await.unwrap();
        // Create mode must succeed again now that the attribute has been removed.
        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Create,
            )
            .await
            .unwrap();

        assert_eq!(
            object.get_extended_attribute(test_attr.name()).await.unwrap(),
            test_attr.value()
        );

        fs.close().await.expect("close failed");
    }
2717
    // Fills the entire attribute-id range reserved for large extended attributes, then checks
    // the failure mode for one-too-many and that inline values, updates, and remove-then-add
    // still work at capacity.
    #[fuchsia::test]
    async fn large_extended_attribute_max_number() {
        let (fs, object) = test_filesystem_and_empty_object().await;

        let max_xattrs =
            super::EXTENDED_ATTRIBUTE_RANGE_END - super::EXTENDED_ATTRIBUTE_RANGE_START;
        for i in 0..max_xattrs {
            let test_attr = TestAttr::new(format!("{}", i).as_bytes(), vec![0x3; 300]);
            object
                .set_extended_attribute(
                    test_attr.name(),
                    test_attr.value(),
                    SetExtendedAttributeMode::Set,
                )
                .await
                .unwrap_or_else(|_| panic!("failed to set xattr number {}", i));
        }

        // That should have taken up all the attributes we've allocated to extended attributes, so
        // this one should return ERR_NO_SPACE.
        match object
            .set_extended_attribute(
                b"one.too.many".to_vec(),
                vec![0x3; 300],
                SetExtendedAttributeMode::Set,
            )
            .await
        {
            Ok(()) => panic!("set should not succeed"),
            Err(e) => is_error(e, FxfsError::NoSpace),
        }

        // But inline attributes don't need an attribute number, so it should work fine.
        object
            .set_extended_attribute(
                b"this.is.okay".to_vec(),
                b"small value".to_vec(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();

        // And updating existing ones should be okay.
        object
            .set_extended_attribute(b"11".to_vec(), vec![0x4; 300], SetExtendedAttributeMode::Set)
            .await
            .unwrap();
        object
            .set_extended_attribute(
                b"12".to_vec(),
                vec![0x1; 300],
                SetExtendedAttributeMode::Replace,
            )
            .await
            .unwrap();

        // And we should be able to remove an attribute and set another one.
        object.remove_extended_attribute(b"5".to_vec()).await.unwrap();
        object
            .set_extended_attribute(
                b"new attr".to_vec(),
                vec![0x3; 300],
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();

        fs.close().await.expect("close failed");
    }
2787
    #[fuchsia::test]
    async fn write_attr_trims_beyond_new_end() {
        // When writing, multi_write will deallocate old extents that overlap with the new data,
        // but it doesn't trim anything beyond that, since it doesn't know what the total size will
        // be. write_attr does know, because it writes the whole attribute at once, so we need to
        // make sure it cleans up properly.
        let (fs, object) = test_filesystem_and_empty_object().await;

        let block_size = fs.block_size();
        let buf_size = block_size * 2;
        let attribute_id = 10;

        // First lay down a two-block attribute via multi_write plus an explicit attribute-size
        // mutation.
        let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
        let mut buffer = object.allocate_buffer(buf_size as usize).await;
        buffer.as_mut_slice().fill(3);
        // Writing two separate ranges, even if they are contiguous, forces them to be separate
        // extent records.
        object
            .multi_write(
                &mut transaction,
                attribute_id,
                &[0..block_size, block_size..block_size * 2],
                buffer.as_mut(),
            )
            .await
            .unwrap();
        transaction.add(
            object.store().store_object_id,
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(object.object_id(), attribute_id, AttributeKey::Attribute),
                ObjectValue::attribute(block_size * 2, false),
            ),
        );
        transaction.commit().await.unwrap();

        crate::fsck::fsck(fs.clone()).await.unwrap();

        // Now rewrite the attribute at half the size; write_attr should trim the second block
        // itself, so no follow-up trim is needed and fsck stays clean.
        let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
        let needs_trim = (*object)
            .write_attr(&mut transaction, attribute_id, &vec![3u8; block_size as usize])
            .await
            .unwrap();
        assert!(!needs_trim.0);
        transaction.commit().await.unwrap();

        crate::fsck::fsck(fs.clone()).await.unwrap();

        fs.close().await.expect("close failed");
    }
2837
    // Writes an attribute three batches long via write_new_attr_in_batches and verifies the
    // full contents read back after removing the graveyard entry and committing.
    #[fuchsia::test]
    async fn write_new_attr_in_batches_multiple_txns() {
        let (fs, object) = test_filesystem_and_empty_object().await;
        let merkle_tree = vec![1; 3 * WRITE_ATTR_BATCH_SIZE];
        let mut transaction =
            (*object).new_transaction(FSVERITY_MERKLE_ATTRIBUTE_ID).await.unwrap();
        object
            .write_new_attr_in_batches(
                &mut transaction,
                FSVERITY_MERKLE_ATTRIBUTE_ID,
                &merkle_tree,
                WRITE_ATTR_BATCH_SIZE,
            )
            .await
            .expect("failed to write merkle attribute");

        // Clear the graveyard entry for this attribute in the same (final) transaction —
        // presumably the batched write registers it there so a crash mid-way gets cleaned up
        // (NOTE(review): confirm against write_new_attr_in_batches).
        transaction.add(
            object.store().store_object_id,
            Mutation::replace_or_insert_object(
                ObjectKey::graveyard_attribute_entry(
                    object.store().graveyard_directory_object_id(),
                    object.object_id(),
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                ),
                ObjectValue::None,
            ),
        );
        transaction.commit().await.unwrap();
        assert_eq!(
            object.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
            Some(merkle_tree.into())
        );

        fs.close().await.expect("close failed");
    }
2873
    // Running on target only, to use fake time features in the executor.
    #[cfg(target_os = "fuchsia")]
    #[fuchsia::test(allow_stalls = false)]
    async fn test_watchdog() {
        use super::Watchdog;
        use fuchsia_async::{MonotonicDuration, MonotonicInstant, TestExecutor};
        use std::sync::mpsc::channel;

        TestExecutor::advance_to(make_time(0)).await;
        // The watchdog handler forwards its fire count over this channel so the test can
        // observe exactly when and how many times it triggered.
        let (sender, receiver) = channel();

        // Converts a second offset into an absolute fake-clock instant.
        fn make_time(time_secs: i64) -> MonotonicInstant {
            MonotonicInstant::from_nanos(0) + MonotonicDuration::from_seconds(time_secs)
        }

        {
            // Watchdog with a 10-second period; scoped so we can observe behavior after drop.
            let _watchdog = Watchdog::new(10, move |count| {
                sender.send(count).expect("Sending value");
            });

            // Too early.
            TestExecutor::advance_to(make_time(5)).await;
            receiver.try_recv().expect_err("Should not have message");

            // First message.
            TestExecutor::advance_to(make_time(10)).await;
            assert_eq!(1, receiver.recv().expect("Receiving"));

            // Too early for the next.
            TestExecutor::advance_to(make_time(15)).await;
            receiver.try_recv().expect_err("Should not have message");

            // Missed one. They'll be spooled up.
            TestExecutor::advance_to(make_time(30)).await;
            assert_eq!(2, receiver.recv().expect("Receiving"));
            assert_eq!(3, receiver.recv().expect("Receiving"));
        }

        // Watchdog is dropped, nothing should trigger.
        TestExecutor::advance_to(make_time(100)).await;
        receiver.recv().expect_err("Watchdog should be gone");
    }
2916
2917    #[fuchsia::test]
2918    fn test_checksum_range_chunk() {
2919        let block_size = 4096;
2920
2921        // No bitmap means one chunk that covers the whole range
2922        assert_eq!(
2923            ChecksumRangeChunk::group_first_write_ranges(
2924                &mut OverwriteBitmaps::None,
2925                block_size,
2926                block_size * 2..block_size * 5,
2927            ),
2928            vec![ChecksumRangeChunk {
2929                checksum_range: 0..3,
2930                device_range: block_size * 2..block_size * 5,
2931                is_first_write: false,
2932            }],
2933        );
2934
2935        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2936        assert_eq!(
2937            ChecksumRangeChunk::group_first_write_ranges(
2938                &mut bitmaps,
2939                block_size,
2940                block_size * 2..block_size * 5,
2941            ),
2942            vec![ChecksumRangeChunk {
2943                checksum_range: 0..3,
2944                device_range: block_size * 2..block_size * 5,
2945                is_first_write: false,
2946            }],
2947        );
2948        assert_eq!(
2949            bitmaps.take_bitmaps(),
2950            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b11100000])))
2951        );
2952
2953        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2954        bitmaps.set_offset(2);
2955        assert_eq!(
2956            ChecksumRangeChunk::group_first_write_ranges(
2957                &mut bitmaps,
2958                block_size,
2959                block_size * 2..block_size * 5,
2960            ),
2961            vec![
2962                ChecksumRangeChunk {
2963                    checksum_range: 0..2,
2964                    device_range: block_size * 2..block_size * 4,
2965                    is_first_write: false,
2966                },
2967                ChecksumRangeChunk {
2968                    checksum_range: 2..3,
2969                    device_range: block_size * 4..block_size * 5,
2970                    is_first_write: true,
2971                },
2972            ],
2973        );
2974        assert_eq!(
2975            bitmaps.take_bitmaps(),
2976            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00111000])))
2977        );
2978
2979        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2980        bitmaps.set_offset(4);
2981        assert_eq!(
2982            ChecksumRangeChunk::group_first_write_ranges(
2983                &mut bitmaps,
2984                block_size,
2985                block_size * 2..block_size * 5,
2986            ),
2987            vec![ChecksumRangeChunk {
2988                checksum_range: 0..3,
2989                device_range: block_size * 2..block_size * 5,
2990                is_first_write: true,
2991            }],
2992        );
2993        assert_eq!(
2994            bitmaps.take_bitmaps(),
2995            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00001110])))
2996        );
2997
2998        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b01010101]));
2999        assert_eq!(
3000            ChecksumRangeChunk::group_first_write_ranges(
3001                &mut bitmaps,
3002                block_size,
3003                block_size * 2..block_size * 10,
3004            ),
3005            vec![
3006                ChecksumRangeChunk {
3007                    checksum_range: 0..1,
3008                    device_range: block_size * 2..block_size * 3,
3009                    is_first_write: true,
3010                },
3011                ChecksumRangeChunk {
3012                    checksum_range: 1..2,
3013                    device_range: block_size * 3..block_size * 4,
3014                    is_first_write: false,
3015                },
3016                ChecksumRangeChunk {
3017                    checksum_range: 2..3,
3018                    device_range: block_size * 4..block_size * 5,
3019                    is_first_write: true,
3020                },
3021                ChecksumRangeChunk {
3022                    checksum_range: 3..4,
3023                    device_range: block_size * 5..block_size * 6,
3024                    is_first_write: false,
3025                },
3026                ChecksumRangeChunk {
3027                    checksum_range: 4..5,
3028                    device_range: block_size * 6..block_size * 7,
3029                    is_first_write: true,
3030                },
3031                ChecksumRangeChunk {
3032                    checksum_range: 5..6,
3033                    device_range: block_size * 7..block_size * 8,
3034                    is_first_write: false,
3035                },
3036                ChecksumRangeChunk {
3037                    checksum_range: 6..7,
3038                    device_range: block_size * 8..block_size * 9,
3039                    is_first_write: true,
3040                },
3041                ChecksumRangeChunk {
3042                    checksum_range: 7..8,
3043                    device_range: block_size * 9..block_size * 10,
3044                    is_first_write: false,
3045                },
3046            ],
3047        );
3048        assert_eq!(
3049            bitmaps.take_bitmaps(),
3050            Some((BitVec::from_bytes(&[0b01010101]), BitVec::from_bytes(&[0b11111111])))
3051        );
3052    }
3053}