Skip to main content

fxfs/object_store/
store_object_handle.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::checksum::{Checksum, Checksums, fletcher64};
6use crate::errors::FxfsError;
7use crate::log::*;
8use crate::lsm_tree::Query;
9use crate::lsm_tree::merge::MergerIterator;
10use crate::lsm_tree::types::{Item, ItemRef, LayerIterator};
11use crate::object_handle::ObjectHandle;
12use crate::object_store::extent_record::{ExtentKey, ExtentMode, ExtentValue};
13use crate::object_store::object_manager::ObjectManager;
14use crate::object_store::object_record::{
15    AttributeKey, ExtendedAttributeValue, ObjectAttributes, ObjectItem, ObjectKey, ObjectKeyData,
16    ObjectValue, Timestamp,
17};
18use crate::object_store::transaction::{
19    AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Options, ReadGuard,
20    Transaction, lock_keys,
21};
22use crate::object_store::{
23    HandleOptions, HandleOwner, ObjectStore, TrimMode, TrimResult, VOLUME_DATA_KEY_ID,
24};
25use crate::range::RangeExt;
26use crate::round::{round_down, round_up};
27use anyhow::{Context, Error, anyhow, bail, ensure};
28use assert_matches::assert_matches;
29use bit_vec::BitVec;
30use futures::stream::{FuturesOrdered, FuturesUnordered};
31use futures::{TryStreamExt, try_join};
32use fxfs_crypto::{
33    Cipher, CipherHolder, CipherSet, EncryptionKey, FindKeyResult, FxfsCipher, KeyPurpose,
34};
35use fxfs_trace::{TraceFutureExt, trace, trace_future_args};
36use static_assertions::const_assert;
37use std::cmp::min;
38use std::future::Future;
39use std::ops::Range;
40use std::sync::Arc;
41use std::sync::atomic::{self, AtomicBool, Ordering};
42use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
43use storage_device::{InlineCryptoOptions, ReadOptions, WriteOptions};
44
45use fidl_fuchsia_io as fio;
46use fuchsia_async as fasync;
47
/// Maximum size for an extended attribute name.
pub const MAX_XATTR_NAME_SIZE: usize = 255;
/// Maximum size an extended attribute can be before it's stored in an object attribute instead of
/// inside the record directly.
pub const MAX_INLINE_XATTR_SIZE: usize = 256;
/// Maximum size for an extended attribute value. NB: the maximum size for an extended attribute is
/// 64kB, which we rely on for correctness when deleting attributes, so ensure it's always
/// enforced.
pub const MAX_XATTR_VALUE_SIZE: usize = 64000;
/// The range of fxfs attribute ids which are reserved for extended attribute values. Whenever a
/// new attribute is needed, the first unused id will be chosen from this range. It's technically
/// safe to change these values, but it has potential consequences - they are only used during id
/// selection, so any existing extended attributes keep their ids, which means any past or present
/// selected range here could potentially have used attributes unless they are explicitly migrated,
/// which isn't currently done.
pub const EXTENDED_ATTRIBUTE_RANGE_START: u64 = 64;
/// End of the reserved attribute-id range; see [`EXTENDED_ATTRIBUTE_RANGE_START`].
pub const EXTENDED_ATTRIBUTE_RANGE_END: u64 = 512;
65
66/// Zeroes blocks in 'buffer' based on `bitmap`, one bit per block from start of buffer.
67fn apply_bitmap_zeroing(
68    block_size: usize,
69    bitmap: &bit_vec::BitVec,
70    mut buffer: MutableBufferRef<'_>,
71) {
72    let buf = buffer.as_mut_slice();
73    debug_assert_eq!(bitmap.len() * block_size, buf.len());
74    for (i, block) in bitmap.iter().enumerate() {
75        if !block {
76            let start = i * block_size;
77            buf[start..start + block_size].fill(0);
78        }
79    }
80}
81
/// When writing, often the logic should be generic over whether or not checksums are generated.
/// This provides that and a handy way to convert to the more general ExtentMode that eventually
/// stores it on disk.
#[derive(Debug, Clone, PartialEq)]
pub enum MaybeChecksums {
    /// No checksums are tracked for the write.
    None,
    /// One fletcher64 checksum per block written.
    Fletcher(Vec<Checksum>),
}
90
91impl MaybeChecksums {
92    pub fn maybe_as_ref(&self) -> Option<&[Checksum]> {
93        match self {
94            Self::None => None,
95            Self::Fletcher(sums) => Some(&sums),
96        }
97    }
98
99    pub fn split_off(&mut self, at: usize) -> Self {
100        match self {
101            Self::None => Self::None,
102            Self::Fletcher(sums) => Self::Fletcher(sums.split_off(at)),
103        }
104    }
105
106    pub fn to_mode(self) -> ExtentMode {
107        match self {
108            Self::None => ExtentMode::Raw,
109            Self::Fletcher(sums) => ExtentMode::Cow(Checksums::fletcher(sums)),
110        }
111    }
112
113    pub fn into_option(self) -> Option<Vec<Checksum>> {
114        match self {
115            Self::None => None,
116            Self::Fletcher(sums) => Some(sums),
117        }
118    }
119}
120
/// The mode of operation when setting extended attributes. This is the same as the fidl definition
/// but is replicated here so we don't have fuchsia.io structures in the api, so this can be used
/// on host.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SetExtendedAttributeMode {
    /// Create the extended attribute if it doesn't exist, replace the value if it does.
    Set,
    /// Create the extended attribute if it doesn't exist, fail if it does.
    Create,
    /// Replace the extended attribute value if it exists, fail if it doesn't.
    Replace,
}
133
impl From<fio::SetExtendedAttributeMode> for SetExtendedAttributeMode {
    // Direct 1:1 mapping from the fuchsia.io FIDL enum to the local replica.
    fn from(other: fio::SetExtendedAttributeMode) -> SetExtendedAttributeMode {
        match other {
            fio::SetExtendedAttributeMode::Set => SetExtendedAttributeMode::Set,
            fio::SetExtendedAttributeMode::Create => SetExtendedAttributeMode::Create,
            fio::SetExtendedAttributeMode::Replace => SetExtendedAttributeMode::Replace,
        }
    }
}
143
/// How (if at all) an object's data is encrypted; used by `StoreObjectHandle` to decide how keys
/// are fetched.
enum Encryption {
    /// The object doesn't use encryption.
    None,

    /// The object has keys that are cached (which means unwrapping occurs on-demand) with
    /// KeyManager.
    CachedKeys,

    /// The object has permanent keys registered with KeyManager.
    PermanentKeys,
}
155
/// Per-block state tracked while writing to an overwrite extent. `None` when the extent has no
/// bitmap (i.e. no per-block tracking is needed).
#[derive(PartialEq, Debug)]
enum OverwriteBitmaps {
    None,
    Some {
        /// The block bitmap of a partial overwrite extent in the tree.
        extent_bitmap: BitVec,
        /// A bitmap of the blocks written to by the current overwrite.
        write_bitmap: BitVec,
        /// BitVec doesn't have a slice equivalent, so for a particular section of the write we
        /// keep track of an offset in the bitmaps to operate on.
        bitmap_offset: usize,
    },
}
169
170impl OverwriteBitmaps {
171    fn new(extent_bitmap: BitVec) -> Self {
172        OverwriteBitmaps::Some {
173            write_bitmap: BitVec::from_elem(extent_bitmap.len(), false),
174            extent_bitmap,
175            bitmap_offset: 0,
176        }
177    }
178
179    fn is_none(&self) -> bool {
180        *self == OverwriteBitmaps::None
181    }
182
183    fn set_offset(&mut self, new_offset: usize) {
184        match self {
185            OverwriteBitmaps::None => (),
186            OverwriteBitmaps::Some { bitmap_offset, .. } => *bitmap_offset = new_offset,
187        }
188    }
189
190    fn get_from_extent_bitmap(&self, i: usize) -> Option<bool> {
191        match self {
192            OverwriteBitmaps::None => None,
193            OverwriteBitmaps::Some { extent_bitmap, bitmap_offset, .. } => {
194                extent_bitmap.get(*bitmap_offset + i)
195            }
196        }
197    }
198
199    fn set_in_write_bitmap(&mut self, i: usize, x: bool) {
200        match self {
201            OverwriteBitmaps::None => (),
202            OverwriteBitmaps::Some { write_bitmap, bitmap_offset, .. } => {
203                write_bitmap.set(*bitmap_offset + i, x)
204            }
205        }
206    }
207
208    fn take_bitmaps(self) -> Option<(BitVec, BitVec)> {
209        match self {
210            OverwriteBitmaps::None => None,
211            OverwriteBitmaps::Some { extent_bitmap, write_bitmap, .. } => {
212                Some((extent_bitmap, write_bitmap))
213            }
214        }
215    }
216}
217
/// When writing to Overwrite ranges, we need to emit whether a set of checksums for a device range
/// is the first write to that region or not. This tracks one such range so we can use it after the
/// write to break up the returned checksum list.
#[derive(PartialEq, Debug)]
struct ChecksumRangeChunk {
    // Indices into the per-block checksum list covered by this chunk.
    checksum_range: Range<usize>,
    // The device byte range the chunk's checksums cover.
    device_range: Range<u64>,
    // True if every block in `device_range` is being written for the first time.
    is_first_write: bool,
}
227
impl ChecksumRangeChunk {
    /// Splits `write_device_range` into maximal runs of blocks that share the same "first write"
    /// status (per `bitmaps`' extent bitmap), marking every block as written in the write bitmap
    /// along the way. `write_device_range` must be a multiple of `block_size` long.
    fn group_first_write_ranges(
        bitmaps: &mut OverwriteBitmaps,
        block_size: u64,
        write_device_range: Range<u64>,
    ) -> Vec<ChecksumRangeChunk> {
        let write_block_len = (write_device_range.length().unwrap() / block_size) as usize;
        if bitmaps.is_none() {
            // If there is no bitmap, then the overwrite range is fully written to. However, we
            // could still be within the journal flush window where one of the blocks was written
            // to for the first time to put it in this state, so we still need to emit the
            // checksums in case replay needs them.
            vec![ChecksumRangeChunk {
                checksum_range: 0..write_block_len,
                device_range: write_device_range,
                is_first_write: false,
            }]
        } else {
            // Seed with an empty chunk whose status matches block 0; the loop below either grows
            // the current chunk or starts a new one whenever the status flips.
            let mut checksum_ranges = vec![ChecksumRangeChunk {
                checksum_range: 0..0,
                device_range: write_device_range.start..write_device_range.start,
                is_first_write: !bitmaps.get_from_extent_bitmap(0).unwrap(),
            }];
            let mut working_range = checksum_ranges.last_mut().unwrap();
            for i in 0..write_block_len {
                bitmaps.set_in_write_bitmap(i, true);

                // bitmap.get returning true means the block is initialized and therefore has been
                // written to before.
                if working_range.is_first_write != bitmaps.get_from_extent_bitmap(i).unwrap() {
                    // is_first_write is tracking opposite of what comes back from the bitmap, so
                    // if they are still opposites we continue our current range.
                    working_range.checksum_range.end += 1;
                    working_range.device_range.end += block_size;
                } else {
                    // If they are the same, then we need to make a new chunk.
                    let new_chunk = ChecksumRangeChunk {
                        checksum_range: working_range.checksum_range.end
                            ..working_range.checksum_range.end + 1,
                        device_range: working_range.device_range.end
                            ..working_range.device_range.end + block_size,
                        is_first_write: !working_range.is_first_write,
                    };
                    checksum_ranges.push(new_chunk);
                    working_range = checksum_ranges.last_mut().unwrap();
                }
            }
            checksum_ranges
        }
    }
}
279
/// StoreObjectHandle is the lowest-level, untyped handle to an object with the id [`object_id`] in
/// a particular store, [`owner`]. It provides functionality shared across all objects, such as
/// reading and writing attributes and managing encryption keys.
///
/// Since it's untyped, it doesn't do any object kind validation, and is generally meant to
/// implement higher-level typed handles.
///
/// For file-like objects with a data attribute, DataObjectHandle implements traits and helpers for
/// doing more complex extent management and caches the content size.
///
/// For directory-like objects, Directory knows how to add and remove child objects and enumerate
/// its children.
pub struct StoreObjectHandle<S: HandleOwner> {
    // The owner through which the object store is reached.
    owner: Arc<S>,
    // The id of the object this handle refers to.
    object_id: u64,
    // Handle-level options (e.g. skipping journal checks or checksums).
    options: HandleOptions,
    // When true, verbose per-operation logging is emitted.
    trace: AtomicBool,
    // How (if at all) this object's data is encrypted.
    encryption: Encryption,
}
299
300impl<S: HandleOwner> ObjectHandle for StoreObjectHandle<S> {
301    fn set_trace(&self, v: bool) {
302        info!(store_id = self.store().store_object_id, oid = self.object_id(), trace = v; "trace");
303        self.trace.store(v, atomic::Ordering::Relaxed);
304    }
305
306    fn object_id(&self) -> u64 {
307        return self.object_id;
308    }
309
310    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
311        self.store().device.allocate_buffer(size)
312    }
313
314    fn block_size(&self) -> u64 {
315        self.store().block_size()
316    }
317}
318
/// A background task that periodically invokes a callback; used to log warnings when a device
/// read or write appears to have stalled.
struct Watchdog {
    // Held to keep the watchdog task alive for the lifetime of this struct.
    _task: fasync::Task<()>,
}
322
323impl Watchdog {
324    fn new(increment_seconds: u64, cb: impl Fn(u64) + Send + 'static) -> Self {
325        Self {
326            _task: fasync::Task::spawn(
327                async move {
328                    let increment = increment_seconds.try_into().unwrap();
329                    let mut fired_counter = 0;
330                    let mut next_wake = fasync::MonotonicInstant::now();
331                    loop {
332                        next_wake += std::time::Duration::from_secs(increment).into();
333                        // If this isn't being scheduled this will purposely result in fast looping
334                        // when it does. This will be insightful about the state of the thread and
335                        // task scheduling.
336                        if fasync::MonotonicInstant::now() < next_wake {
337                            fasync::Timer::new(next_wake).await;
338                        }
339                        fired_counter += 1;
340                        cb(fired_counter);
341                    }
342                }
343                .trace(trace_future_args!("StoreObjectHandle::Watchdog")),
344            ),
345        }
346    }
347}
348
349impl<S: HandleOwner> StoreObjectHandle<S> {
350    /// Make a new StoreObjectHandle for the object with id [`object_id`] in store [`owner`].
351    pub fn new(
352        owner: Arc<S>,
353        object_id: u64,
354        permanent_keys: bool,
355        options: HandleOptions,
356        trace: bool,
357    ) -> Self {
358        let encryption = if permanent_keys {
359            Encryption::PermanentKeys
360        } else if owner.as_ref().as_ref().is_encrypted() {
361            Encryption::CachedKeys
362        } else {
363            Encryption::None
364        };
365        Self { owner, object_id, encryption, options, trace: AtomicBool::new(trace) }
366    }
367
    /// Returns the owner of this handle's store.
    pub fn owner(&self) -> &Arc<S> {
        &self.owner
    }
371
    /// Returns the object store this object lives in.
    pub fn store(&self) -> &ObjectStore {
        self.owner.as_ref().as_ref()
    }
375
    /// Returns whether verbose tracing is enabled for this handle.
    pub fn trace(&self) -> bool {
        self.trace.load(atomic::Ordering::Relaxed)
    }
379
    /// Returns true if the object uses encryption (either cached or permanent keys).
    pub fn is_encrypted(&self) -> bool {
        !matches!(self.encryption, Encryption::None)
    }
383
    /// Get the default set of transaction options for this object. This is mostly the overall
    /// default, modified by any [`HandleOptions`] held by this handle.
    pub fn default_transaction_options<'b>(&self) -> Options<'b> {
        Options { skip_journal_checks: self.options.skip_journal_checks, ..Default::default() }
    }
389
390    pub async fn new_transaction_with_options<'b>(
391        &self,
392        attribute_id: u64,
393        options: Options<'b>,
394    ) -> Result<Transaction<'b>, Error> {
395        Ok(self
396            .store()
397            .filesystem()
398            .new_transaction(
399                lock_keys![
400                    LockKey::object_attribute(
401                        self.store().store_object_id(),
402                        self.object_id(),
403                        attribute_id,
404                    ),
405                    LockKey::object(self.store().store_object_id(), self.object_id()),
406                ],
407                options,
408            )
409            .await?)
410    }
411
    /// Creates a new transaction for `attribute_id` using this handle's default transaction
    /// options. See [`Self::new_transaction_with_options`].
    pub async fn new_transaction<'b>(&self, attribute_id: u64) -> Result<Transaction<'b>, Error> {
        self.new_transaction_with_options(attribute_id, self.default_transaction_options()).await
    }
415
    // If |transaction| has an impending mutation for the underlying object, returns that.
    // Otherwise, looks up the object from the tree.
    async fn txn_get_object_mutation(
        &self,
        transaction: &Transaction<'_>,
    ) -> Result<ObjectStoreMutation, Error> {
        self.store().txn_get_object_mutation(transaction, self.object_id()).await
    }
424
    // Deallocates the device extents backing `range` of `attribute_id` and returns the number of
    // bytes deallocated. `range` must be block-aligned. Iteration stops at the first extent that
    // no longer overlaps `range` or belongs to a different object/attribute.
    async fn deallocate_old_extents(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: u64,
        range: Range<u64>,
    ) -> Result<u64, Error> {
        let block_size = self.block_size();
        assert_eq!(range.start % block_size, 0);
        assert_eq!(range.end % block_size, 0);
        if range.start == range.end {
            return Ok(0);
        }
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let key = ExtentKey { range };
        let lower_bound = ObjectKey::attribute(
            self.object_id(),
            attribute_id,
            AttributeKey::Extent(key.search_key()),
        );
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullRange(&lower_bound)).await?;
        let allocator = self.store().allocator();
        let mut deallocated = 0;
        let trace = self.trace();
        while let Some(ItemRef {
            key:
                ObjectKey {
                    object_id,
                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                },
            value: ObjectValue::Extent(value),
            ..
        }) = iter.get()
        {
            // Stop once the iterator moves past this object/attribute.
            if *object_id != self.object_id() || *attr_id != attribute_id {
                break;
            }
            if let ExtentValue::Some { device_offset, .. } = value {
                if let Some(overlap) = key.overlap(extent_key) {
                    // Map the overlapping file range onto the extent's device range.
                    let range = device_offset + overlap.start - extent_key.range.start
                        ..device_offset + overlap.end - extent_key.range.start;
                    ensure!(range.is_aligned(block_size), FxfsError::Inconsistent);
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = range,
                            len = range.end - range.start,
                            extent_key:?;
                            "D",
                        );
                    }
                    allocator
                        .deallocate(transaction, self.store().store_object_id(), range)
                        .await?;
                    deallocated += overlap.end - overlap.start;
                } else {
                    break;
                }
            }
            iter.advance().await?;
        }
        Ok(deallocated)
    }
491
492    // Writes aligned data (that should already be encrypted) to the given offset and computes
493    // checksums if requested. The aligned data must be from a single logical file range.
494    async fn write_aligned(
495        &self,
496        buf: BufferRef<'_>,
497        device_offset: u64,
498        crypt_ctx: Option<(u32, u8)>,
499    ) -> Result<MaybeChecksums, Error> {
500        if self.trace() {
501            info!(
502                store_id = self.store().store_object_id(),
503                oid = self.object_id(),
504                device_range:? = (device_offset..device_offset + buf.len() as u64),
505                len = buf.len();
506                "W",
507            );
508        }
509        let store = self.store();
510        store.device_write_ops.fetch_add(1, Ordering::Relaxed);
511        let mut checksums = Vec::new();
512        let _watchdog = Watchdog::new(10, |count| {
513            warn!("Write has been stalled for {} seconds", count * 10);
514        });
515
516        match crypt_ctx {
517            Some((dun, slot)) => {
518                if !store.filesystem().options().barriers_enabled {
519                    return Err(anyhow!(FxfsError::InvalidArgs)
520                        .context("Barriers must be enabled for inline encrypted writes."));
521                }
522                store
523                    .device
524                    .write_with_opts(
525                        device_offset as u64,
526                        buf,
527                        WriteOptions {
528                            inline_crypto: InlineCryptoOptions::enabled(slot, dun),
529                            ..Default::default()
530                        },
531                    )
532                    .await?;
533                Ok(MaybeChecksums::None)
534            }
535            None => {
536                if self.options.skip_checksums {
537                    store
538                        .device
539                        .write_with_opts(device_offset as u64, buf, WriteOptions::default())
540                        .await?;
541                    Ok(MaybeChecksums::None)
542                } else {
543                    try_join!(store.device.write(device_offset, buf), async {
544                        let block_size = self.block_size();
545                        for chunk in buf.as_slice().chunks_exact(block_size as usize) {
546                            checksums.push(fletcher64(chunk, 0));
547                        }
548                        Ok(())
549                    })?;
550                    Ok(MaybeChecksums::Fletcher(checksums))
551                }
552            }
553        }
554    }
555
    /// Flushes the underlying device.  This is expensive and should be used sparingly.
    pub async fn flush_device(&self) -> Result<(), Error> {
        self.store().device().flush().await
    }
560
    /// Adjusts the object's recorded allocated size by `allocated - deallocated` bytes, and
    /// updates the project usage accounting if the object has a non-zero project id. Fails with
    /// `FxfsError::Inconsistent` if the size would overflow/underflow or if the object record is
    /// corrupt.
    pub async fn update_allocated_size(
        &self,
        transaction: &mut Transaction<'_>,
        allocated: u64,
        deallocated: u64,
    ) -> Result<(), Error> {
        if allocated == deallocated {
            return Ok(());
        }
        let mut mutation = self.txn_get_object_mutation(transaction).await?;
        if let ObjectValue::Object {
            attributes: ObjectAttributes { project_id, allocated_size, .. },
            ..
        } = &mut mutation.item.value
        {
            // The only way for these to fail are if the volume is inconsistent.
            *allocated_size = allocated_size
                .checked_add(allocated)
                .ok_or_else(|| anyhow!(FxfsError::Inconsistent).context("Allocated size overflow"))?
                .checked_sub(deallocated)
                .ok_or_else(|| {
                    anyhow!(FxfsError::Inconsistent).context("Allocated size underflow")
                })?;

            if *project_id != 0 {
                // The allocated and deallocated shouldn't exceed the max size of the file which is
                // bound within i64.
                let diff = i64::try_from(allocated).unwrap() - i64::try_from(deallocated).unwrap();
                transaction.add(
                    self.store().store_object_id(),
                    Mutation::merge_object(
                        ObjectKey::project_usage(
                            self.store().root_directory_object_id(),
                            *project_id,
                        ),
                        ObjectValue::BytesAndNodes { bytes: diff, nodes: 0 },
                    ),
                );
            }
        } else {
            // This can occur when the object mutation is created from an object in the tree which
            // was corrupt.
            bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
        }
        transaction.add(self.store().store_object_id, Mutation::ObjectStore(mutation));
        Ok(())
    }
608
    /// Updates the object's mutable node attributes and, optionally, its change time. A SELinux
    /// context, if supplied, must use the `data` member and is stored as the "security.selinux"
    /// extended attribute; otherwise this fails with `FxfsError::InvalidArgs`.
    pub async fn update_attributes<'a>(
        &self,
        transaction: &mut Transaction<'a>,
        node_attributes: Option<&fio::MutableNodeAttributes>,
        change_time: Option<Timestamp>,
    ) -> Result<(), Error> {
        if let Some(&fio::MutableNodeAttributes { selinux_context: Some(ref context), .. }) =
            node_attributes
        {
            if let fio::SelinuxContext::Data(context) = context {
                self.set_extended_attribute_impl(
                    "security.selinux".into(),
                    context.clone(),
                    SetExtendedAttributeMode::Set,
                    transaction,
                )
                .await?;
            } else {
                return Err(anyhow!(FxfsError::InvalidArgs)
                    .context("Only set SELinux context with `data` member."));
            }
        }
        self.store()
            .update_attributes(transaction, self.object_id, node_attributes, change_time)
            .await
    }
635
    /// Zeroes the given range by deallocating any backing extents and, when anything was
    /// deallocated, merging in a deleted-extent record over the range.  The range must be
    /// block-aligned.
    pub async fn zero(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: u64,
        range: Range<u64>,
    ) -> Result<(), Error> {
        let deallocated =
            self.deallocate_old_extents(transaction, attribute_id, range.clone()).await?;
        if deallocated > 0 {
            self.update_allocated_size(transaction, 0, deallocated).await?;
            transaction.add(
                self.store().store_object_id,
                Mutation::merge_object(
                    ObjectKey::extent(self.object_id(), attribute_id, range),
                    ObjectValue::Extent(ExtentValue::deleted_extent()),
                ),
            );
        }
        Ok(())
    }
657
    // Returns a new aligned buffer (reading the head and tail blocks if necessary) with a copy of
    // the data from `buf`, along with the block-aligned range the buffer covers.
    pub async fn align_buffer(
        &self,
        attribute_id: u64,
        offset: u64,
        buf: BufferRef<'_>,
    ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
        let block_size = self.block_size();
        let end = offset + buf.len() as u64;
        let aligned =
            round_down(offset, block_size)..round_up(end, block_size).ok_or(FxfsError::TooBig)?;

        let mut aligned_buf =
            self.store().device.allocate_buffer((aligned.end - aligned.start) as usize).await;

        // Deal with head alignment: read the first block and zero anything past what was read.
        if aligned.start < offset {
            let mut head_block = aligned_buf.subslice_mut(..block_size as usize);
            let read = self.read(attribute_id, aligned.start, head_block.reborrow()).await?;
            head_block.as_mut_slice()[read..].fill(0);
        }

        // Deal with tail alignment.
        if aligned.end > end {
            let end_block_offset = aligned.end - block_size;
            // There's no need to read the tail block if we read it as part of the head block.
            if offset <= end_block_offset {
                let mut tail_block =
                    aligned_buf.subslice_mut(aligned_buf.len() - block_size as usize..);
                let read = self.read(attribute_id, end_block_offset, tail_block.reborrow()).await?;
                tail_block.as_mut_slice()[read..].fill(0);
            }
        }

        // Copy the caller's data into place over whatever was read.
        aligned_buf.as_mut_slice()
            [(offset - aligned.start) as usize..(end - aligned.start) as usize]
            .copy_from_slice(buf.as_slice());

        Ok((aligned, aligned_buf))
    }
699
    /// Trim an attribute's extents, potentially adding a graveyard trim entry if more trimming is
    /// needed, so the transaction can be committed without worrying about leaking data.
    ///
    /// This doesn't update the size stored in the attribute value - the caller is responsible for
    /// doing that to keep the size up to date.
    pub async fn shrink(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: u64,
        size: u64,
    ) -> Result<NeedsTrim, Error> {
        let store = self.store();
        let needs_trim = matches!(
            store
                .trim_some(transaction, self.object_id(), attribute_id, TrimMode::FromOffset(size))
                .await?,
            TrimResult::Incomplete
        );
        if needs_trim {
            // Add the object to the graveyard in case the following transactions don't get
            // replayed.
            let graveyard_id = store.graveyard_directory_object_id();
            match store
                .tree
                .find(&ObjectKey::graveyard_entry(graveyard_id, self.object_id()))
                .await?
            {
                Some(ObjectItem { value: ObjectValue::Some, .. })
                | Some(ObjectItem { value: ObjectValue::Trim, .. }) => {
                    // This object is already in the graveyard so we don't need to do anything.
                }
                _ => {
                    transaction.add(
                        store.store_object_id,
                        Mutation::replace_or_insert_object(
                            ObjectKey::graveyard_entry(graveyard_id, self.object_id()),
                            ObjectValue::Trim,
                        ),
                    );
                }
            }
        }
        Ok(NeedsTrim(needs_trim))
    }
744
    /// Reads and decrypts a singular logical range.
    ///
    /// `device_offset` and `file_offset` locate the range on the device and within the file
    /// respectively. `key_id` selects the key; when the key provides an inline crypt context the
    /// device performs decryption, otherwise the data is decrypted in software after the read.
    /// When there is no key (unencrypted), the data is read as-is.
    pub async fn read_and_decrypt(
        &self,
        device_offset: u64,
        file_offset: u64,
        mut buffer: MutableBufferRef<'_>,
        key_id: u64,
    ) -> Result<(), Error> {
        let store = self.store();
        store.device_read_ops.fetch_add(1, Ordering::Relaxed);

        // Logs a warning if the device read stalls for a long time.
        let _watchdog = Watchdog::new(10, |count| {
            warn!("Read has been stalled for {} seconds", count * 10);
        });

        let (_key_id, key) = self.get_key(Some(key_id)).await?;
        if let Some(key) = key {
            if let Some((dun, slot)) = key.crypt_ctx(self.object_id, file_offset) {
                store
                    .device
                    .read_with_opts(
                        device_offset as u64,
                        buffer.reborrow(),
                        ReadOptions { inline_crypto: InlineCryptoOptions::enabled(slot, dun) },
                    )
                    .await?;
            } else {
                store.device.read(device_offset, buffer.reborrow()).await?;
                key.decrypt(self.object_id, device_offset, file_offset, buffer.as_mut_slice())?;
            }
        } else {
            store.device.read(device_offset, buffer.reborrow()).await?;
        }

        Ok(())
    }
781
    /// Returns the specified key. If `key_id` is None, it will try and return the fscrypt key if
    /// it is present, or the volume key if it isn't. If the fscrypt key is present, but the key
    /// cannot be unwrapped, then this will return `FxfsError::NoKey`. If the volume is not
    /// encrypted, this returns None.
    ///
    /// Returns a `(key_id, cipher)` pair; the cipher is `None` only when the object is
    /// unencrypted.
    pub async fn get_key(
        &self,
        key_id: Option<u64>,
    ) -> Result<(u64, Option<Arc<dyn Cipher>>), Error> {
        let store = self.store();
        let result = match self.encryption {
            // Unencrypted object: no cipher; report the volume data key id for consistency.
            Encryption::None => (VOLUME_DATA_KEY_ID, None),
            Encryption::CachedKeys => {
                if let Some(key_id) = key_id {
                    // A specific key was requested: fetch it via the key manager, which may
                    // need crypt to unwrap it, falling back to reading the key records from
                    // the store if it's not cached.
                    (
                        key_id,
                        Some(
                            store
                                .key_manager
                                .get_key(
                                    self.object_id,
                                    store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
                                    async || store.get_keys(self.object_id).await,
                                    key_id,
                                )
                                .await?,
                        ),
                    )
                } else {
                    // No specific key requested: prefer the fscrypt key when present
                    // (otherwise the volume data key); the key manager tells us which id
                    // it resolved to.
                    let (key_id, key) = store
                        .key_manager
                        .get_fscrypt_key_if_present(
                            self.object_id,
                            store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
                            async || store.get_keys(self.object_id).await,
                        )
                        .await?;
                    (key_id, Some(key))
                }
            }
            // Permanent keys are kept resident in the key manager, so a plain lookup must
            // succeed here (hence the unwrap).
            Encryption::PermanentKeys => {
                (VOLUME_DATA_KEY_ID, Some(store.key_manager.get(self.object_id).await?.unwrap()))
            }
        };

        // If the key uses inline encryption (it has a crypt context), barriers must be enabled;
        // reject the configuration otherwise.
        if let Some(ref key) = result.1 {
            if key.crypt_ctx(self.object_id, 0).is_some() {
                if !store.filesystem().options().barriers_enabled {
                    return Err(anyhow!(FxfsError::InvalidArgs)
                        .context("Barriers must be enabled for inline encrypted writes."));
                }
            }
        }

        Ok(result)
    }
838
    /// This will only work for a non-permanent volume data key. This is designed to be used with
    /// extended attributes where we'll only create the key on demand for directories and encrypted
    /// files.
    ///
    /// Returns the cipher for `VOLUME_DATA_KEY_ID`, creating and persisting a new key via
    /// `transaction` if one doesn't already exist.
    async fn get_or_create_key(
        &self,
        transaction: &mut Transaction<'_>,
    ) -> Result<Arc<dyn Cipher>, Error> {
        let store = self.store();

        // Fast path: try and get keys from the cache.
        if let Some(key) = store.key_manager.get(self.object_id).await.context("get failed")? {
            return Ok(key);
        }

        let crypt = store.crypt().ok_or_else(|| anyhow!("No crypt!"))?;

        // Next, see if the keys are already created.
        let (mut encryption_keys, mut cipher_set) = if let Some(item) =
            store.tree.find(&ObjectKey::keys(self.object_id)).await.context("find failed")?
        {
            if let ObjectValue::Keys(encryption_keys) = item.value {
                // Unwrap the existing keys so we can check whether the volume data key is
                // already among them.
                let cipher_set = store
                    .key_manager
                    .get_keys(
                        self.object_id,
                        crypt.as_ref(),
                        &mut Some(async || Ok(encryption_keys.clone())),
                        /* permanent= */ false,
                        /* force= */ false,
                    )
                    .await
                    .context("get_keys failed")?;
                match cipher_set.find_key(VOLUME_DATA_KEY_ID) {
                    // A keys record exists but has no volume data key yet; fall through and
                    // create one below.
                    FindKeyResult::NotFound => {}
                    FindKeyResult::Unavailable => return Err(FxfsError::NoKey.into()),
                    FindKeyResult::Key(key) => return Ok(key),
                }
                (encryption_keys, (*cipher_set).clone())
            } else {
                // A keys record must hold `ObjectValue::Keys`; anything else is corruption.
                return Err(anyhow!(FxfsError::Inconsistent));
            }
        } else {
            // No keys record at all yet; start from empty key/cipher sets.
            Default::default()
        };

        // Proceed to create the key.  The transaction holds the required locks.
        let (key, unwrapped_key) = crypt.create_key(self.object_id, KeyPurpose::Data).await?;
        let cipher: Arc<dyn Cipher> = Arc::new(FxfsCipher::new(&unwrapped_key));

        // Add new cipher to cloned cipher set. This will replace existing one
        // if transaction is successful.
        cipher_set.add_key(VOLUME_DATA_KEY_ID, CipherHolder::Cipher(cipher.clone()));
        let cipher_set = Arc::new(cipher_set);

        // Arrange for the CipherSet to be added to the cache when (and if) the transaction
        // commits.
        struct UnwrappedKeys {
            object_id: u64,
            new_keys: Arc<CipherSet>,
        }

        impl AssociatedObject for UnwrappedKeys {
            // Called when the mutation is applied, i.e. at commit time; this is what inserts
            // the new cipher set into the key manager cache.
            fn will_apply_mutation(
                &self,
                _mutation: &Mutation,
                object_id: u64,
                manager: &ObjectManager,
            ) {
                manager.store(object_id).unwrap().key_manager.insert(
                    self.object_id,
                    self.new_keys.clone(),
                    /* permanent= */ false,
                );
            }
        }

        // Record the new wrapped key alongside any pre-existing ones.
        encryption_keys.insert(VOLUME_DATA_KEY_ID, EncryptionKey::Fxfs(key).into());

        transaction.add_with_object(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::keys(self.object_id),
                ObjectValue::keys(encryption_keys),
            ),
            AssocObj::Owned(Box::new(UnwrappedKeys {
                object_id: self.object_id,
                new_keys: cipher_set,
            })),
        );

        Ok(cipher)
    }
931
932    pub async fn read(
933        &self,
934        attribute_id: u64,
935        offset: u64,
936        mut buf: MutableBufferRef<'_>,
937    ) -> Result<usize, Error> {
938        let fs = self.store().filesystem();
939        let guard = fs
940            .lock_manager()
941            .read_lock(lock_keys![LockKey::object_attribute(
942                self.store().store_object_id(),
943                self.object_id(),
944                attribute_id,
945            )])
946            .await;
947
948        let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
949        let item = self.store().tree().find(&key).await?;
950        let size = match item {
951            Some(item) if item.key == key => match item.value {
952                ObjectValue::Attribute { size, .. } => size,
953                _ => bail!(FxfsError::Inconsistent),
954            },
955            _ => return Ok(0),
956        };
957        if offset >= size {
958            return Ok(0);
959        }
960        let length = min(buf.len() as u64, size - offset) as usize;
961        buf = buf.subslice_mut(0..length);
962        self.read_unchecked(attribute_id, offset, buf, &guard).await?;
963        Ok(length)
964    }
965
    /// Read `buf.len()` bytes from the attribute `attribute_id`, starting at `offset`, into `buf`.
    /// It's required that a read lock on this attribute id is taken before this is called.
    ///
    /// This function doesn't do any size checking - any portion of `buf` past the end of the file
    /// will be filled with zeros. The caller is responsible for enforcing the file size on reads.
    /// This is because, just looking at the extents, we can't tell the difference between the file
    /// actually ending and there just being a section at the end with no data (since attributes
    /// are sparse).
    pub(super) async fn read_unchecked(
        &self,
        attribute_id: u64,
        mut offset: u64,
        mut buf: MutableBufferRef<'_>,
        _guard: &ReadGuard<'_>,
    ) -> Result<(), Error> {
        if buf.len() == 0 {
            return Ok(());
        }
        let end_offset = offset + buf.len() as u64;

        self.store().logical_read_ops.fetch_add(1, Ordering::Relaxed);

        // Whilst the read offset must be aligned to the filesystem block size, the buffer need only
        // be aligned to the device's block size.
        let block_size = self.block_size() as u64;
        let device_block_size = self.store().device.block_size() as u64;
        assert_eq!(offset % block_size, 0);
        assert_eq!(buf.range().start as u64 % device_block_size, 0);
        // Iterate over all extents overlapping [offset, end_offset).
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger
            .query(Query::LimitedRange(&ObjectKey::extent(
                self.object_id(),
                attribute_id,
                offset..end_offset,
            )))
            .await?;
        // Number of bytes by which the read end is unaligned; that tail is handled separately
        // via a one-block alignment buffer below.
        let end_align = ((offset + buf.len() as u64) % block_size) as usize;
        let trace = self.trace();
        // Device reads for each extent are issued concurrently and awaited at the end.
        let reads = FuturesUnordered::new();
        while let Some(ItemRef {
            key:
                ObjectKey {
                    object_id,
                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                },
            value: ObjectValue::Extent(extent_value),
            ..
        }) = iter.get()
        {
            // Stop as soon as we run off the end of this object/attribute's extents.
            if *object_id != self.object_id() || *attr_id != attribute_id {
                break;
            }
            ensure!(
                extent_key.range.is_valid() && extent_key.range.is_aligned(block_size),
                FxfsError::Inconsistent
            );
            if extent_key.range.start > offset {
                // Zero everything up to the start of the extent (sparse hole).
                let to_zero = min(extent_key.range.start - offset, buf.len() as u64) as usize;
                buf.as_mut_slice()[..to_zero].fill(0);
                buf = buf.subslice_mut(to_zero..);
                if buf.is_empty() {
                    break;
                }
                offset += to_zero as u64;
            }

            if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
                // Offset into the device for our current position within this extent.
                let mut device_offset = device_offset + (offset - extent_key.range.start);
                let key_id = *key_id;

                // Whole-block portion of this extent we can read directly into `buf`.
                let to_copy = min(buf.len() - end_align, (extent_key.range.end - offset) as usize);
                if to_copy > 0 {
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = (device_offset..device_offset + to_copy as u64),
                            offset,
                            range:? = extent_key.range,
                            block_size;
                            "R",
                        );
                    }
                    let (mut head, tail) = buf.split_at_mut(to_copy);
                    // For partially-written overwrite extents, slice out the portion of the
                    // block bitmap covering [offset, offset + to_copy) so uninitialized blocks
                    // can be zeroed after the read.
                    let maybe_bitmap = match mode {
                        ExtentMode::OverwritePartial(bitmap) => {
                            let mut read_bitmap = bitmap.clone().split_off(
                                ((offset - extent_key.range.start) / block_size) as usize,
                            );
                            read_bitmap.truncate(to_copy / block_size as usize);
                            Some(read_bitmap)
                        }
                        _ => None,
                    };
                    reads.push(async move {
                        self.read_and_decrypt(device_offset, offset, head.reborrow(), key_id)
                            .await?;
                        if let Some(bitmap) = maybe_bitmap {
                            // Zero any blocks the bitmap marks as uninitialized.
                            apply_bitmap_zeroing(self.block_size() as usize, &bitmap, head);
                        }
                        Ok::<(), Error>(())
                    });
                    buf = tail;
                    if buf.is_empty() {
                        break;
                    }
                    offset += to_copy as u64;
                    device_offset += to_copy as u64;
                }

                // Deal with end alignment by reading the existing contents into an alignment
                // buffer.
                if offset < extent_key.range.end && end_align > 0 {
                    if let ExtentMode::OverwritePartial(bitmap) = mode {
                        let bitmap_offset = (offset - extent_key.range.start) / block_size;
                        if !bitmap.get(bitmap_offset as usize).ok_or(FxfsError::Inconsistent)? {
                            // If this block isn't actually initialized, skip it.
                            break;
                        }
                    }
                    let mut align_buf =
                        self.store().device.allocate_buffer(block_size as usize).await;
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = (device_offset..device_offset + align_buf.len() as u64);
                            "RT",
                        );
                    }
                    // Read the whole block, then copy just the bytes we need into the tail.
                    self.read_and_decrypt(device_offset, offset, align_buf.as_mut(), key_id)
                        .await?;
                    buf.as_mut_slice().copy_from_slice(&align_buf.as_slice()[..end_align]);
                    buf = buf.subslice_mut(0..0);
                    break;
                }
            } else if extent_key.range.end >= offset + buf.len() as u64 {
                // Deleted extent covers remainder, so we're done.
                break;
            }

            iter.advance().await?;
        }
        // Wait for all the issued device reads to finish.
        reads.try_collect::<()>().await?;
        // Anything not covered by an extent (trailing sparse region) reads as zeros.
        buf.as_mut_slice().fill(0);
        Ok(())
    }
1116
1117    /// Reads an entire attribute.
1118    pub async fn read_attr(&self, attribute_id: u64) -> Result<Option<Box<[u8]>>, Error> {
1119        let store = self.store();
1120        let tree = &store.tree;
1121        let layer_set = tree.layer_set();
1122        let mut merger = layer_set.merger();
1123        let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
1124        let iter = merger.query(Query::FullRange(&key)).await?;
1125        match iter.get() {
1126            Some(item) if item.key == &key => match item.value {
1127                ObjectValue::Attribute { .. } => Ok(Some(self.read_attr_from_iter(iter).await?)),
1128                // Attribute was deleted.
1129                ObjectValue::None => Ok(None),
1130                _ => Err(FxfsError::Inconsistent.into()),
1131            },
1132            _ => Ok(None),
1133        }
1134    }
1135
    /// Reads an entire attribute pointed to by `iter`. `iter` must be pointing to the
    /// `AttributeKey::Attribute` of the attribute.
    ///
    /// Returns the attribute contents, exactly `size` bytes long, with any sparse regions
    /// filled with zeros.
    pub async fn read_attr_from_iter(
        &self,
        mut iter: MergerIterator<'_, '_, ObjectKey, ObjectValue>,
    ) -> Result<Box<[u8]>, Error> {
        // Read the attribute record to learn the size, and allocate a block-aligned buffer
        // big enough to hold the whole thing.
        let (mut buffer, size, attribute_id) = match iter.get() {
            Some(ItemRef {
                key:
                    ObjectKey {
                        object_id,
                        data: ObjectKeyData::Attribute(attribute_id, AttributeKey::Attribute),
                    },
                value: ObjectValue::Attribute { size, .. },
                ..
            }) if *object_id == self.object_id => {
                // TODO(https://fxbug.dev/42073113): size > max buffer size
                (
                    self.store()
                        .device
                        .allocate_buffer(round_up(*size, self.block_size()).unwrap() as usize)
                        .await,
                    *size as usize,
                    *attribute_id,
                )
            }
            // The iterator wasn't positioned on this object's attribute record.
            _ => bail!(FxfsError::InvalidArgs),
        };

        self.store().logical_read_ops.fetch_add(1, Ordering::Relaxed);
        // Walk the attribute's extents in order, reading each one into the buffer and zeroing
        // the gaps (sparse regions) between them.
        let mut last_offset = 0;
        loop {
            iter.advance().await?;
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) if *object_id == self.object_id() && *attr_id == attribute_id => {
                    if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
                        let offset = extent_key.range.start as usize;
                        // Zero the gap between the previous extent and this one.
                        buffer.as_mut_slice()[last_offset..offset].fill(0);
                        let end = std::cmp::min(extent_key.range.end as usize, buffer.len());
                        let maybe_bitmap = match mode {
                            ExtentMode::OverwritePartial(bitmap) => {
                                // The caller has to adjust the bitmap if necessary, but we always
                                // start from the beginning of any extent, so we only truncate.
                                let mut read_bitmap = bitmap.clone();
                                read_bitmap.truncate(
                                    (end - extent_key.range.start as usize)
                                        / self.block_size() as usize,
                                );
                                Some(read_bitmap)
                            }
                            _ => None,
                        };
                        self.read_and_decrypt(
                            *device_offset,
                            extent_key.range.start,
                            buffer.subslice_mut(offset..end as usize),
                            *key_id,
                        )
                        .await?;
                        if let Some(bitmap) = maybe_bitmap {
                            // Zero any blocks the bitmap marks as uninitialized.
                            apply_bitmap_zeroing(
                                self.block_size() as usize,
                                &bitmap,
                                buffer.subslice_mut(offset..end as usize),
                            );
                        }
                        last_offset = end;
                        // Stop early once we've covered the attribute's logical size.
                        if last_offset >= size {
                            break;
                        }
                    }
                }
                // Ran off the end of this attribute's extents.
                _ => break,
            }
        }
        // Zero everything past the last extent read (trailing sparse region / padding).
        buffer.as_mut_slice()[std::cmp::min(last_offset, size)..].fill(0);
        Ok(buffer.as_slice()[..size].into())
    }
1223
    /// Writes potentially unaligned data at `device_offset` and returns checksums if requested.
    /// The data will be encrypted if necessary.  `buf` is mutable as an optimization, since the
    /// write may require encryption, we can encrypt the buffer in-place rather than copying to
    /// another buffer if the write is already aligned.
    ///
    /// NOTE: This will not create keys if they are missing (it will fail with an error if that
    /// happens to be the case).
    pub async fn write_at(
        &self,
        attribute_id: u64,
        offset: u64,
        buf: MutableBufferRef<'_>,
        key_id: Option<u64>,
        mut device_offset: u64,
    ) -> Result<MaybeChecksums, Error> {
        let mut transfer_buf;
        let block_size = self.block_size();
        let (range, mut transfer_buf_ref) =
            if offset % block_size == 0 && buf.len() as u64 % block_size == 0 {
                // Fully block-aligned: write straight out of the caller's buffer.
                (offset..offset + buf.len() as u64, buf)
            } else {
                // Unaligned: copy into a block-aligned transfer buffer (see `align_buffer`),
                // and pull `device_offset` back by the same amount the start of the range
                // moved so it still points at the right device location.
                let (range, buf) = self.align_buffer(attribute_id, offset, buf.as_ref()).await?;
                transfer_buf = buf;
                device_offset -= offset - range.start;
                (range, transfer_buf.as_mut())
            };

        // If the key supports inline encryption we pass the crypt context through to the device
        // write; otherwise encrypt the buffer in place in software first.
        let mut crypt_ctx = None;
        if let (_, Some(key)) = self.get_key(key_id).await? {
            if let Some(ctx) = key.crypt_ctx(self.object_id, range.start) {
                crypt_ctx = Some(ctx);
            } else {
                key.encrypt(
                    self.object_id,
                    device_offset,
                    range.start,
                    transfer_buf_ref.as_mut_slice(),
                )?;
            }
        }
        self.write_aligned(transfer_buf_ref.as_ref(), device_offset, crypt_ctx).await
    }
1266
1267    /// Writes to multiple ranges with data provided in `buf`. This function is specifically
1268    /// designed for migration purposes, allowing raw writes to the device without updating
1269    /// object metadata like allocated size or mtime. It's essential for scenarios where
1270    /// data needs to be transferred directly without triggering standard filesystem operations.
1271    #[cfg(feature = "migration")]
1272    pub async fn raw_multi_write(
1273        &self,
1274        transaction: &mut Transaction<'_>,
1275        attribute_id: u64,
1276        key_id: Option<u64>,
1277        ranges: &[Range<u64>],
1278        buf: MutableBufferRef<'_>,
1279    ) -> Result<(), Error> {
1280        self.multi_write_internal(transaction, attribute_id, key_id, ranges, buf).await?;
1281        Ok(())
1282    }
1283
    /// This is a low-level write function that writes to multiple ranges. Users should generally
    /// use `multi_write` instead of this function as this does not update the object's allocated
    /// size, mtime, atime, etc.
    ///
    /// Returns (allocated, deallocated) bytes on success.
    async fn multi_write_internal(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: u64,
        key_id: Option<u64>,
        ranges: &[Range<u64>],
        mut buf: MutableBufferRef<'_>,
    ) -> Result<(u64, u64), Error> {
        if buf.is_empty() {
            return Ok((0, 0));
        }
        let block_size = self.block_size();
        let store = self.store();
        let store_id = store.store_object_id();

        // The only key we allow to be created on-the-fly is a non permanent key wrapped with the
        // volume data key.
        let (key_id, key) = if key_id == Some(VOLUME_DATA_KEY_ID)
            && matches!(self.encryption, Encryption::CachedKeys)
        {
            (
                VOLUME_DATA_KEY_ID,
                Some(
                    self.get_or_create_key(transaction)
                        .await
                        .context("get_or_create_key failed")?,
                ),
            )
        } else {
            self.get_key(key_id).await?
        };
        // Without inline encryption support, encrypt the data in place up front, one logical
        // range at a time (inline-encrypted data is instead encrypted by the device at write
        // time, using the crypt context passed to `write_aligned` below).
        if let Some(key) = &key {
            if !key.supports_inline_encryption() {
                let mut slice = buf.as_mut_slice();
                for r in ranges {
                    let l = r.end - r.start;
                    let (head, tail) = slice.split_at_mut(l as usize);
                    key.encrypt(
                        self.object_id,
                        0, /* TODO(https://fxbug.dev/421269588): plumb through device_offset. */
                        r.start,
                        head,
                    )?;
                    slice = tail;
                }
            }
        }

        let mut allocated = 0;
        let allocator = store.allocator();
        let trace = self.trace();
        // Device writes are issued as futures and completed in order below.
        let mut writes = FuturesOrdered::new();

        // Tracks our position within the caller's logical ranges; used to derive the crypt
        // context for inline encryption.  NOTE: assumes `ranges` covers all of `buf`.
        let mut logical_ranges = ranges.iter();
        let mut current_range = logical_ranges.next().unwrap().clone();

        // Allocate device space and issue writes until the whole buffer is consumed.
        while !buf.is_empty() {
            let mut device_range = allocator
                .allocate(transaction, store_id, buf.len() as u64)
                .await
                .context("allocation failed")?;
            if trace {
                info!(
                    store_id,
                    oid = self.object_id(),
                    device_range:?,
                    len = device_range.end - device_range.start;
                    "A",
                );
            }
            let mut device_range_len = device_range.end - device_range.start;
            allocated += device_range_len;
            // If inline encryption is NOT supported, this loop should only happen once.
            while device_range_len > 0 {
                // Advance to the next logical range once the current one is exhausted.
                if current_range.end <= current_range.start {
                    current_range = logical_ranges.next().unwrap().clone();
                }
                // For inline encryption the crypt context depends on the logical offset, so we
                // must split the write at logical-range boundaries; otherwise write the whole
                // allocated range in one go.
                let (crypt_ctx, split) = if let Some(key) = &key {
                    if key.supports_inline_encryption() {
                        let split = std::cmp::min(
                            current_range.end - current_range.start,
                            device_range_len,
                        );
                        let crypt_ctx = key.crypt_ctx(self.object_id, current_range.start);
                        current_range.start += split;
                        (crypt_ctx, split)
                    } else {
                        (None, device_range_len)
                    }
                } else {
                    (None, device_range_len)
                };

                let (head, tail) = buf.split_at_mut(split as usize);
                buf = tail;

                // Each write future resolves to (device start, length, checksums).
                writes.push_back(async move {
                    let len = head.len() as u64;
                    Result::<_, Error>::Ok((
                        device_range.start,
                        len,
                        self.write_aligned(head.as_ref(), device_range.start, crypt_ctx).await?,
                    ))
                });
                device_range.start += split;
                device_range_len = device_range.end - device_range.start;
            }
        }

        self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
        // Concurrently: (1) drain the completed writes, chopping each one at logical-range
        // boundaries into extent mutations and per-range checksums, and (2) deallocate any old
        // extents that these ranges replace.
        let ((mutations, checksums), deallocated) = try_join!(
            async {
                let mut current_range = 0..0;
                let mut mutations = Vec::new();
                let mut out_checksums = Vec::new();
                let mut ranges = ranges.iter();
                while let Some((mut device_offset, mut len, mut checksums)) =
                    writes.try_next().await?
                {
                    while len > 0 {
                        if current_range.end <= current_range.start {
                            current_range = ranges.next().unwrap().clone();
                        }
                        let chunk_len = std::cmp::min(len, current_range.end - current_range.start);
                        // Split off the checksums belonging to the rest of this write.
                        let tail = checksums.split_off((chunk_len / block_size) as usize);
                        if let Some(checksums) = checksums.maybe_as_ref() {
                            out_checksums.push((
                                device_offset..device_offset + chunk_len,
                                checksums.to_owned(),
                            ));
                        }
                        mutations.push(Mutation::merge_object(
                            ObjectKey::extent(
                                self.object_id(),
                                attribute_id,
                                current_range.start..current_range.start + chunk_len,
                            ),
                            ObjectValue::Extent(ExtentValue::new(
                                device_offset,
                                checksums.to_mode(),
                                key_id,
                            )),
                        ));
                        checksums = tail;
                        device_offset += chunk_len;
                        len -= chunk_len;
                        current_range.start += chunk_len;
                    }
                }
                Result::<_, Error>::Ok((mutations, out_checksums))
            },
            async {
                let mut deallocated = 0;
                for r in ranges {
                    deallocated +=
                        self.deallocate_old_extents(transaction, attribute_id, r.clone()).await?;
                }
                Result::<_, Error>::Ok(deallocated)
            }
        )?;

        for m in mutations {
            transaction.add(store_id, m);
        }

        // Only store checksums in the journal if barriers are not enabled.
        if !store.filesystem().options().barriers_enabled {
            for (r, c) in checksums {
                transaction.add_checksum(r, c, true);
            }
        }
        Ok((allocated, deallocated))
    }
1462
1463    /// Writes to multiple ranges with data provided in `buf`.  The buffer can be modified in place
1464    /// if encryption takes place.  The ranges must all be aligned and no change to content size is
1465    /// applied; the caller is responsible for updating size if required.  If `key_id` is None, it
1466    /// means pick the default key for the object which is the fscrypt key if present, or the volume
1467    /// data key, or no key if it's an unencrypted file.
1468    pub async fn multi_write(
1469        &self,
1470        transaction: &mut Transaction<'_>,
1471        attribute_id: u64,
1472        key_id: Option<u64>,
1473        ranges: &[Range<u64>],
1474        buf: MutableBufferRef<'_>,
1475    ) -> Result<(), Error> {
1476        let (allocated, deallocated) =
1477            self.multi_write_internal(transaction, attribute_id, key_id, ranges, buf).await?;
1478        if allocated == 0 && deallocated == 0 {
1479            return Ok(());
1480        }
1481        self.update_allocated_size(transaction, allocated, deallocated).await
1482    }
1483
    /// Write data to overwrite extents with the provided set of ranges. This makes a strong
    /// assumption that the ranges are actually going to be already allocated overwrite extents and
    /// will error out or do something wrong if they aren't. It also assumes the ranges passed to
    /// it are sorted.
    ///
    /// `buf` must hold exactly the concatenated contents for `ranges`, in order, and may be
    /// encrypted in place.  Extents covering the ranges must be in `Overwrite` or
    /// `OverwritePartial` mode; anything else is reported as filesystem inconsistency.
    pub async fn multi_overwrite<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        attr_id: u64,
        ranges: &[Range<u64>],
        mut buf: MutableBufferRef<'_>,
    ) -> Result<(), Error> {
        if buf.is_empty() {
            return Ok(());
        }
        let block_size = self.block_size();
        let store = self.store();
        let tree = store.tree();
        let store_id = store.store_object_id();

        // If the key doesn't support inline (hardware) encryption, encrypt the whole buffer up
        // front, one range at a time, using each range's logical start offset.
        let (key_id, key) = self.get_key(None).await?;
        if let Some(key) = &key {
            if !key.supports_inline_encryption() {
                let mut slice = buf.as_mut_slice();
                for r in ranges {
                    let l = r.end - r.start;
                    let (head, tail) = slice.split_at_mut(l as usize);
                    key.encrypt(
                        self.object_id,
                        0, /* TODO(https://fxbug.dev/421269588): plumb through device_offset. */
                        r.start,
                        head,
                    )?;
                    slice = tail;
                }
            }
        }

        let mut range_iter = ranges.iter();
        // There should be at least one range if the buffer has data in it
        let mut target_range = range_iter.next().unwrap().clone();
        // Extent mutations to apply once all writes have been issued.
        let mut mutations = Vec::new();
        // Device writes run concurrently; each yields (device_range, checksums, is_first_write)
        // tuples to be recorded in the transaction afterwards.
        let writes = FuturesUnordered::new();

        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                attr_id,
                AttributeKey::Extent(ExtentKey::search_key_from_offset(target_range.start)),
            )))
            .await?;

        // Walk the extent records, carving each target range up against the extents that cover it
        // and issuing device writes as we go.
        loop {
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(
                                    attribute_id,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) if *object_id == self.object_id() && *attribute_id == attr_id => {
                    // If this extent ends before the target range starts (not possible on the
                    // first loop because of the query parameters but possible on further loops),
                    // advance until we find the next one we care about.
                    if range.end <= target_range.start {
                        iter.advance().await?;
                        continue;
                    }
                    let (device_offset, mode) = match extent_value {
                        ExtentValue::None => {
                            return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                                format!(
                                    "multi_overwrite failed: target_range ({}, {}) overlaps with \
                                deleted extent found at ({}, {})",
                                    target_range.start, target_range.end, range.start, range.end,
                                )
                            });
                        }
                        ExtentValue::Some { device_offset, mode, .. } => (device_offset, mode),
                    };
                    // The ranges passed to this function should already be allocated, so
                    // extent records should exist for them.
                    if range.start > target_range.start {
                        return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                            format!(
                                "multi_overwrite failed: target range ({}, {}) starts before first \
                            extent found at ({}, {})",
                                target_range.start, target_range.end, range.start, range.end,
                            )
                        });
                    }
                    // `OverwritePartial` extents carry a bitmap of which blocks have been written;
                    // fully written (`Overwrite`) extents need no bitmap.  Any other mode means
                    // the caller's assumption about pre-allocated overwrite extents is violated.
                    let mut bitmap = match mode {
                        ExtentMode::Raw | ExtentMode::Cow(_) => {
                            return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                                format!(
                                    "multi_overwrite failed: \
                            extent from ({}, {}) which overlaps target range ({}, {}) had the \
                            wrong extent mode",
                                    range.start, range.end, target_range.start, target_range.end,
                                )
                            });
                        }
                        ExtentMode::OverwritePartial(bitmap) => {
                            OverwriteBitmaps::new(bitmap.clone())
                        }
                        ExtentMode::Overwrite => OverwriteBitmaps::None,
                    };
                    // Inner loop: consume as much of the current extent as possible, potentially
                    // spanning several target ranges that fall within it.
                    loop {
                        let offset_within_extent = target_range.start - range.start;
                        let bitmap_offset = offset_within_extent / block_size;
                        let write_device_offset = *device_offset + offset_within_extent;
                        // Write up to the end of this extent or this target range, whichever
                        // comes first.
                        let write_end = min(range.end, target_range.end);
                        let write_len = write_end - target_range.start;
                        let write_device_range =
                            write_device_offset..write_device_offset + write_len;
                        let (current_buf, remaining_buf) = buf.split_at_mut(write_len as usize);

                        // Group the device range into chunks by whether each block is being
                        // written for the first time (per the bitmap), so checksums can be
                        // tagged accordingly.
                        bitmap.set_offset(bitmap_offset as usize);
                        let checksum_ranges = ChecksumRangeChunk::group_first_write_ranges(
                            &mut bitmap,
                            block_size,
                            write_device_range,
                        );

                        let crypt_ctx = if let Some(key) = &key {
                            key.crypt_ctx(self.object_id, target_range.start)
                        } else {
                            None
                        };

                        writes.push(async move {
                            let maybe_checksums = self
                                .write_aligned(current_buf.as_ref(), write_device_offset, crypt_ctx)
                                .await?;
                            Ok::<_, Error>(match maybe_checksums {
                                MaybeChecksums::None => Vec::new(),
                                MaybeChecksums::Fletcher(checksums) => checksum_ranges
                                    .into_iter()
                                    .map(
                                        |ChecksumRangeChunk {
                                             checksum_range,
                                             device_range,
                                             is_first_write,
                                         }| {
                                            (
                                                device_range,
                                                checksums[checksum_range].to_vec(),
                                                is_first_write,
                                            )
                                        },
                                    )
                                    .collect(),
                            })
                        });
                        buf = remaining_buf;
                        target_range.start += write_len;
                        // Finished this target range; move to the next, or stop if none remain.
                        if target_range.start == target_range.end {
                            match range_iter.next() {
                                None => break,
                                Some(next_range) => target_range = next_range.clone(),
                            }
                        }
                        // The rest of the work lies in a later extent.
                        if range.end <= target_range.start {
                            break;
                        }
                    }
                    // If any previously-unwritten blocks were written, merge the write bitmap in
                    // and persist the updated extent mode (upgrading to `Overwrite` once every
                    // block has been written).
                    if let Some((mut bitmap, write_bitmap)) = bitmap.take_bitmaps() {
                        if bitmap.or(&write_bitmap) {
                            let mode = if bitmap.all() {
                                ExtentMode::Overwrite
                            } else {
                                ExtentMode::OverwritePartial(bitmap)
                            };
                            mutations.push(Mutation::merge_object(
                                ObjectKey::extent(self.object_id(), attr_id, range.clone()),
                                ObjectValue::Extent(ExtentValue::new(*device_offset, mode, key_id)),
                            ))
                        }
                    }
                    if target_range.start == target_range.end {
                        break;
                    }
                    iter.advance().await?;
                }
                // We've either run past the end of the existing extents or something is wrong with
                // the tree. The main section should break if it finishes the ranges, so either
                // case, this is an error.
                _ => bail!(anyhow!(FxfsError::Internal).context(
                    "found a non-extent object record while there were still ranges to process"
                )),
            }
        }

        // Wait for all device writes to complete before recording anything in the transaction.
        let checksums = writes.try_collect::<Vec<_>>().await?;
        // Only store checksums in the journal if barriers are not enabled.
        if !store.filesystem().options().barriers_enabled {
            for (r, c, first_write) in checksums.into_iter().flatten() {
                transaction.add_checksum(r, c, first_write);
            }
        }

        for m in mutations {
            transaction.add(store_id, m);
        }

        Ok(())
    }
1698
    /// Writes an attribute that should not already exist and therefore does not require trimming.
    /// Breaks up the write into multiple transactions if `data.len()` is larger than `batch_size`.
    /// If writing the attribute requires multiple transactions, adds the attribute to the
    /// graveyard. The caller is responsible for removing the attribute from the graveyard when it
    /// commits the last transaction.  This always writes using a key wrapped with the volume data
    /// key.
    #[trace]
    pub async fn write_new_attr_in_batches<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        attribute_id: u64,
        data: &[u8],
        batch_size: usize,
    ) -> Result<(), Error> {
        // Record the attribute's logical size up front (with no overwrite extents).
        transaction.add(
            self.store().store_object_id,
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
                ObjectValue::attribute(data.len() as u64, false),
            ),
        );
        let chunks = data.chunks(batch_size);
        let num_chunks = chunks.len();
        // A multi-transaction write can be interrupted between commits, so register the attribute
        // in the graveyard first; the caller removes the entry when committing the last
        // transaction.
        if num_chunks > 1 {
            transaction.add(
                self.store().store_object_id,
                Mutation::replace_or_insert_object(
                    ObjectKey::graveyard_attribute_entry(
                        self.store().graveyard_directory_object_id(),
                        self.object_id(),
                        attribute_id,
                    ),
                    ObjectValue::Some,
                ),
            );
        }
        let mut start_offset = 0;
        for (i, chunk) in chunks.enumerate() {
            // Writes must be block-aligned; zero-pad the tail of the last block.
            let rounded_len = round_up(chunk.len() as u64, self.block_size()).unwrap();
            let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
            let slice = buffer.as_mut_slice();
            slice[..chunk.len()].copy_from_slice(chunk);
            slice[chunk.len()..].fill(0);
            self.multi_write(
                transaction,
                attribute_id,
                Some(VOLUME_DATA_KEY_ID),
                &[start_offset..start_offset + rounded_len],
                buffer.as_mut(),
            )
            .await?;
            start_offset += rounded_len;
            // Do not commit the last chunk.
            if i < num_chunks - 1 {
                transaction.commit_and_continue().await?;
            }
        }
        Ok(())
    }
1758
    /// Writes an entire attribute. Returns whether or not the attribute needs to continue being
    /// trimmed - if the new data is shorter than the old data, this will trim any extents beyond
    /// the end of the new size, but if there were too many for a single transaction, a commit
    /// needs to be made before trimming again, so the responsibility is left to the caller so as
    /// to not accidentally split the transaction when it's not in a consistent state.  This will
    /// write using the volume data key; the fscrypt key is not supported.
    pub async fn write_attr(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: u64,
        data: &[u8],
    ) -> Result<NeedsTrim, Error> {
        let rounded_len = round_up(data.len() as u64, self.block_size()).unwrap();
        let store = self.store();
        let tree = store.tree();
        // Check whether the attribute already exists and, if so, whether the new data is shorter
        // than the current value (in which case trailing extents must be trimmed afterwards).
        let should_trim = if let Some(item) = tree
            .find(&ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute))
            .await?
        {
            match item.value {
                // Attributes with overwrite extents are not supported by this path.
                ObjectValue::Attribute { size: _, has_overwrite_extents: true } => {
                    bail!(
                        anyhow!(FxfsError::Inconsistent)
                            .context("write_attr on an attribute with overwrite extents")
                    )
                }
                ObjectValue::Attribute { size, .. } => (data.len() as u64) < size,
                _ => bail!(FxfsError::Inconsistent),
            }
        } else {
            false
        };
        // Copy the data into a block-aligned buffer, zero-padding the tail of the last block.
        let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
        let slice = buffer.as_mut_slice();
        slice[..data.len()].copy_from_slice(data);
        slice[data.len()..].fill(0);
        self.multi_write(
            transaction,
            attribute_id,
            Some(VOLUME_DATA_KEY_ID),
            &[0..rounded_len],
            buffer.as_mut(),
        )
        .await?;
        // Record the attribute's new logical size (with no overwrite extents).
        transaction.add(
            self.store().store_object_id,
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
                ObjectValue::attribute(data.len() as u64, false),
            ),
        );
        if should_trim {
            // `shrink` reports whether more trimming is still required in later transactions.
            self.shrink(transaction, attribute_id, data.len() as u64).await
        } else {
            Ok(NeedsTrim(false))
        }
    }
1816
1817    pub async fn list_extended_attributes(&self) -> Result<Vec<Vec<u8>>, Error> {
1818        let layer_set = self.store().tree().layer_set();
1819        let mut merger = layer_set.merger();
1820        // Seek to the first extended attribute key for this object.
1821        let mut iter = merger
1822            .query(Query::FullRange(&ObjectKey::extended_attribute(self.object_id(), Vec::new())))
1823            .await?;
1824        let mut out = Vec::new();
1825        while let Some(item) = iter.get() {
1826            // Skip deleted extended attributes.
1827            if item.value != &ObjectValue::None {
1828                match item.key {
1829                    ObjectKey { object_id, data: ObjectKeyData::ExtendedAttribute { name } }
1830                        if *object_id == self.object_id() =>
1831                    {
1832                        out.push(name.clone());
1833                    }
1834                    // Once we hit a record belonging to another object, or one that is not an
1835                    // extended attribute key, we have reached the end of this object's extended
1836                    // attributes. Subsequent objects' records will start with lower variants
1837                    // (e.g. ObjectKeyData::Object) which trigger this break.
1838                    _ => break,
1839                }
1840            }
1841            iter.advance().await?;
1842        }
1843        Ok(out)
1844    }
1845
1846    /// Looks up the values for the extended attribute `fio::SELINUX_CONTEXT_NAME`, returning it
1847    /// if it is found inline. If it is not inline, it will request use of the
1848    /// `get_extended_attributes` method. If the entry doesn't exist at all, returns None.
1849    pub async fn get_inline_selinux_context(&self) -> Result<Option<fio::SelinuxContext>, Error> {
1850        // This optimization is only useful as long as the attribute is smaller than inline sizes.
1851        // Avoid reading the data out of the attributes.
1852        const_assert!(fio::MAX_SELINUX_CONTEXT_ATTRIBUTE_LEN as usize <= MAX_INLINE_XATTR_SIZE);
1853        let item = match self
1854            .store()
1855            .tree()
1856            .find(&ObjectKey::extended_attribute(
1857                self.object_id(),
1858                fio::SELINUX_CONTEXT_NAME.into(),
1859            ))
1860            .await?
1861        {
1862            Some(item) => item,
1863            None => return Ok(None),
1864        };
1865        match item.value {
1866            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => {
1867                Ok(Some(fio::SelinuxContext::Data(value)))
1868            }
1869            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(_)) => {
1870                Ok(Some(fio::SelinuxContext::UseExtendedAttributes(fio::EmptyStruct {})))
1871            }
1872            _ => {
1873                bail!(
1874                    anyhow!(FxfsError::Inconsistent)
1875                        .context("get_inline_extended_attribute: Expected ExtendedAttribute value")
1876                )
1877            }
1878        }
1879    }
1880
1881    pub async fn get_extended_attribute(&self, name: Vec<u8>) -> Result<Vec<u8>, Error> {
1882        let item = self
1883            .store()
1884            .tree()
1885            .find(&ObjectKey::extended_attribute(self.object_id(), name))
1886            .await?
1887            .ok_or(FxfsError::NotFound)?;
1888        match item.value {
1889            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => Ok(value),
1890            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
1891                Ok(self.read_attr(id).await?.ok_or(FxfsError::Inconsistent)?.into_vec())
1892            }
1893            _ => {
1894                bail!(
1895                    anyhow!(FxfsError::Inconsistent)
1896                        .context("get_extended_attribute: Expected ExtendedAttribute value")
1897                )
1898            }
1899        }
1900    }
1901
1902    pub async fn set_extended_attribute(
1903        &self,
1904        name: Vec<u8>,
1905        value: Vec<u8>,
1906        mode: SetExtendedAttributeMode,
1907    ) -> Result<(), Error> {
1908        let store = self.store();
1909        let fs = store.filesystem();
1910        // NB: We need to take this lock before we potentially look up the value to prevent racing
1911        // with another set.
1912        let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
1913        let mut transaction = fs.new_transaction(keys, Options::default()).await?;
1914        self.set_extended_attribute_impl(name, value, mode, &mut transaction).await?;
1915        transaction.commit().await?;
1916        Ok(())
1917    }
1918
    /// Sets extended attribute `name` to `value` within `transaction`, enforcing size limits and
    /// the create/replace semantics of `mode`.  Values up to `MAX_INLINE_XATTR_SIZE` that don't
    /// already have an attribute allocated are stored inline; larger values are written to a
    /// dedicated fxfs attribute in the extended attribute id range.
    async fn set_extended_attribute_impl(
        &self,
        name: Vec<u8>,
        value: Vec<u8>,
        mode: SetExtendedAttributeMode,
        transaction: &mut Transaction<'_>,
    ) -> Result<(), Error> {
        ensure!(name.len() <= MAX_XATTR_NAME_SIZE, FxfsError::TooBig);
        ensure!(value.len() <= MAX_XATTR_VALUE_SIZE, FxfsError::TooBig);
        let tree = self.store().tree();
        let object_key = ObjectKey::extended_attribute(self.object_id(), name);

        // Look up any existing record to learn (a) whether it exists at all, for `mode`
        // enforcement, and (b) whether a backing attribute id is already allocated for it.
        let existing_attribute_id = {
            let (found, existing_attribute_id) = match tree.find(&object_key).await? {
                None => (false, None),
                Some(Item { value, .. }) => (
                    true,
                    match value {
                        ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
                        ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
                            Some(id)
                        }
                        _ => bail!(
                            anyhow!(FxfsError::Inconsistent)
                                .context("expected extended attribute value")
                        ),
                    },
                ),
            };
            match mode {
                SetExtendedAttributeMode::Create if found => {
                    bail!(FxfsError::AlreadyExists)
                }
                SetExtendedAttributeMode::Replace if !found => {
                    bail!(FxfsError::NotFound)
                }
                _ => (),
            }
            existing_attribute_id
        };

        if let Some(attribute_id) = existing_attribute_id {
            // If we already have an attribute id allocated for this extended attribute, we always
            // use it, even if the value has shrunk enough to be stored inline. We don't need to
            // worry about trimming here for the same reason we don't need to worry about it when
            // we delete xattrs - they simply aren't large enough to ever need more than one
            // transaction.
            let _ = self.write_attr(transaction, attribute_id, &value).await?;
        } else if value.len() <= MAX_INLINE_XATTR_SIZE {
            transaction.add(
                self.store().store_object_id(),
                Mutation::replace_or_insert_object(
                    object_key,
                    ObjectValue::inline_extended_attribute(value),
                ),
            );
        } else {
            // If there isn't an existing attribute id and we are going to store the value in
            // an attribute, find the next empty attribute id in the range. We search for fxfs
            // attribute records specifically, instead of the extended attribute records, because
            // even if the extended attribute record is removed the attribute may not be fully
            // trimmed yet.
            let mut attribute_id = EXTENDED_ATTRIBUTE_RANGE_START;
            let layer_set = tree.layer_set();
            let mut merger = layer_set.merger();
            let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
            let mut iter = merger.query(Query::FullRange(&key)).await?;
            loop {
                match iter.get() {
                    // None means the key passed to seek wasn't found. That means the first
                    // attribute is available and we can just stop right away.
                    None => break,
                    Some(ItemRef {
                        key: ObjectKey { object_id, data: ObjectKeyData::Attribute(attr_id, _) },
                        value,
                        ..
                    }) if *object_id == self.object_id() => {
                        if matches!(value, ObjectValue::None) {
                            // This attribute was once used but is now deleted, so it's safe to use
                            // again.
                            break;
                        }
                        if attribute_id < *attr_id {
                            // We found a gap - use it.
                            break;
                        } else if attribute_id == *attr_id {
                            // This attribute id is in use, try the next one.
                            attribute_id += 1;
                            if attribute_id == EXTENDED_ATTRIBUTE_RANGE_END {
                                bail!(FxfsError::NoSpace);
                            }
                        }
                        // If we don't hit either of those cases, we are still moving through the
                        // extent keys for the current attribute, so just keep advancing until the
                        // attribute id changes.
                    }
                    // As we are working our way through the iterator, if we hit anything that
                    // doesn't have our object id or attribute key data, we've gone past the end of
                    // this section and can stop.
                    _ => break,
                }
                iter.advance().await?;
            }

            // We know this won't need trimming because it's a new attribute.
            let _ = self.write_attr(transaction, attribute_id, &value).await?;
            // Point the extended attribute record at the newly chosen attribute id.
            transaction.add(
                self.store().store_object_id(),
                Mutation::replace_or_insert_object(
                    object_key,
                    ObjectValue::extended_attribute(attribute_id),
                ),
            );
        }

        Ok(())
    }
2036
    /// Removes extended attribute `name`, returning NotFound if it doesn't exist.  If the value
    /// was stored out-of-line in an fxfs attribute, that attribute's extents are also deallocated
    /// within the same transaction.
    pub async fn remove_extended_attribute(&self, name: Vec<u8>) -> Result<(), Error> {
        let store = self.store();
        let tree = store.tree();
        let object_key = ObjectKey::extended_attribute(self.object_id(), name);

        // NB: The API says we have to return an error if the attribute doesn't exist, so we have
        // to look it up first to make sure we have a record of it before we delete it. Make sure
        // we take a lock and make a transaction before we do so we don't race with other
        // operations.
        let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
        let mut transaction = store.filesystem().new_transaction(keys, Options::default()).await?;

        // Out-of-line values record a backing attribute id that must also be cleaned up.
        let attribute_to_delete =
            match tree.find(&object_key).await?.ok_or(FxfsError::NotFound)?.value {
                ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => Some(id),
                ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
                _ => {
                    bail!(
                        anyhow!(FxfsError::Inconsistent)
                            .context("remove_extended_attribute: Expected ExtendedAttribute value")
                    )
                }
            };

        // Tombstone the extended attribute record itself.
        transaction.add(
            store.store_object_id(),
            Mutation::replace_or_insert_object(object_key, ObjectValue::None),
        );

        // If the attribute wasn't stored inline, we need to deallocate all the extents too. This
        // would normally need to interact with the graveyard for correctness - if there are too
        // many extents to delete to fit in a single transaction then we could potentially have
        // consistency issues. However, the maximum size of an extended attribute is small enough
        // that it will never come close to that limit even in the worst case, so we just delete
        // everything in one shot.
        if let Some(attribute_id) = attribute_to_delete {
            let trim_result = store
                .trim_some(
                    &mut transaction,
                    self.object_id(),
                    attribute_id,
                    TrimMode::FromOffset(0),
                )
                .await?;
            // In case you didn't read the comment above - this should not be used to delete
            // arbitrary attributes!
            assert_matches!(trim_result, TrimResult::Done(_));
            transaction.add(
                store.store_object_id(),
                Mutation::replace_or_insert_object(
                    ObjectKey::attribute(self.object_id, attribute_id, AttributeKey::Attribute),
                    ObjectValue::None,
                ),
            );
        }

        transaction.commit().await?;
        Ok(())
    }
2096
2097    /// Returns a future that will pre-fetches the keys so as to avoid paying the performance
2098    /// penalty later. Must ensure that the object is not removed before the future completes.
2099    pub fn pre_fetch_keys(&self) -> Option<impl Future<Output = ()> + use<S>> {
2100        if let Encryption::CachedKeys = self.encryption {
2101            let owner = self.owner.clone();
2102            let object_id = self.object_id;
2103            Some(async move {
2104                let store = owner.as_ref().as_ref();
2105                if let Some(crypt) = store.crypt() {
2106                    let _ = store
2107                        .key_manager
2108                        .get_keys(
2109                            object_id,
2110                            crypt.as_ref(),
2111                            &mut Some(async || store.get_keys(object_id).await),
2112                            /* permanent= */ false,
2113                            /* force= */ false,
2114                        )
2115                        .await;
2116                }
2117            })
2118        } else {
2119            None
2120        }
2121    }
2122}
2123
2124impl<S: HandleOwner> Drop for StoreObjectHandle<S> {
2125    fn drop(&mut self) {
2126        if self.is_encrypted() {
2127            let _ = self.store().key_manager.remove(self.object_id);
2128        }
2129    }
2130}
2131
/// When truncating an object, sometimes it might not be possible to complete the operation in a
/// single transaction, in which case the caller needs to finish trimming the object in subsequent
/// transactions (by calling ObjectStore::trim). The wrapped bool indicates whether further
/// trimming is required.
#[must_use]
pub struct NeedsTrim(pub bool);
2137
2138#[cfg(test)]
2139mod tests {
2140    use super::{ChecksumRangeChunk, OverwriteBitmaps};
2141    use crate::errors::FxfsError;
2142    use crate::filesystem::{FxFilesystem, OpenFxFilesystem};
2143    use crate::object_handle::ObjectHandle;
2144    use crate::object_store::data_object_handle::WRITE_ATTR_BATCH_SIZE;
2145    use crate::object_store::transaction::{Mutation, Options, lock_keys};
2146    use crate::object_store::{
2147        AttributeKey, DataObjectHandle, Directory, FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions,
2148        LockKey, ObjectKey, ObjectStore, ObjectValue, SetExtendedAttributeMode, StoreObjectHandle,
2149    };
2150    use bit_vec::BitVec;
2151    use fuchsia_async as fasync;
2152    use futures::join;
2153    use std::sync::Arc;
2154    use storage_device::DeviceHolder;
2155    use storage_device::fake_device::FakeDevice;
2156
    // Block size used for the FakeDevice backing the test filesystem.
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
    // Name under which the test object is linked into the root directory.
    const TEST_OBJECT_NAME: &str = "foo";
2159
2160    fn is_error(actual: anyhow::Error, expected: FxfsError) {
2161        assert_eq!(*actual.root_cause().downcast_ref::<FxfsError>().unwrap(), expected)
2162    }
2163
2164    async fn test_filesystem() -> OpenFxFilesystem {
2165        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
2166        FxFilesystem::new_empty(device).await.expect("new_empty failed")
2167    }
2168
    /// Sets up a test filesystem containing a single empty object linked into the root directory
    /// as `TEST_OBJECT_NAME`, returning handles to both.
    async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
    {
        let fs = test_filesystem().await;
        let store = fs.root_store();

        // Lock the root directory: we add a child entry to it below.
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        let object =
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed");

        // Link the new object into the root directory so it is reachable by name.
        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        root_directory
            .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
            .await
            .expect("add_child_file failed");

        transaction.commit().await.expect("commit failed");

        (fs, object)
    }
2202
    #[fuchsia::test(threads = 3)]
    async fn extended_attribute_double_remove() {
        // This test is intended to trip a potential race condition in remove. Removing an
        // attribute that doesn't exist is an error, so we need to check before we remove, but if
        // we aren't careful, two parallel removes might both succeed in the check and then both
        // remove the value.
        let (fs, object) = test_filesystem_and_empty_object().await;
        // Wrap a bare StoreObjectHandle so the xattr paths under test are exercised directly.
        let basic = Arc::new(StoreObjectHandle::new(
            object.owner().clone(),
            object.object_id(),
            /* permanent_keys: */ false,
            HandleOptions::default(),
            false,
        ));
        let basic_a = basic.clone();
        let basic_b = basic.clone();

        basic
            .set_extended_attribute(
                b"security.selinux".to_vec(),
                b"bar".to_vec(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .expect("failed to set attribute");

        // Try to remove the attribute twice at the same time. One should succeed in the race and
        // return Ok, and the other should fail the race and return NOT_FOUND.
        let a_task = fasync::Task::spawn(async move {
            basic_a.remove_extended_attribute(b"security.selinux".to_vec()).await
        });
        let b_task = fasync::Task::spawn(async move {
            basic_b.remove_extended_attribute(b"security.selinux".to_vec()).await
        });
        match join!(a_task, b_task) {
            (Ok(()), Ok(())) => panic!("both remove calls succeeded"),
            (Err(_), Err(_)) => panic!("both remove calls failed"),

            // Exactly one winner is the expected outcome, regardless of which task wins.
            (Ok(()), Err(e)) => is_error(e, FxfsError::NotFound),
            (Err(e), Ok(())) => is_error(e, FxfsError::NotFound),
        }

        fs.close().await.expect("Close failed");
    }
2247
    #[fuchsia::test(threads = 3)]
    async fn extended_attribute_double_create() {
        // This test is intended to trip a potential race in set when using the create flag,
        // similar to above. If the create mode is set, we need to check that the attribute isn't
        // already created, but if two parallel creates both succeed in that check, and we aren't
        // careful with locking, they will both succeed and one will overwrite the other.
        let (fs, object) = test_filesystem_and_empty_object().await;
        let basic = Arc::new(StoreObjectHandle::new(
            object.owner().clone(),
            object.object_id(),
            /* permanent_keys: */ false,
            HandleOptions::default(),
            false,
        ));
        let basic_a = basic.clone();
        let basic_b = basic.clone();

        // Try to set the attribute twice at the same time. One should succeed in the race and
        // return Ok, and the other should fail the race and return ALREADY_EXISTS.
        let a_task = fasync::Task::spawn(async move {
            basic_a
                .set_extended_attribute(
                    b"security.selinux".to_vec(),
                    b"one".to_vec(),
                    SetExtendedAttributeMode::Create,
                )
                .await
        });
        let b_task = fasync::Task::spawn(async move {
            basic_b
                .set_extended_attribute(
                    b"security.selinux".to_vec(),
                    b"two".to_vec(),
                    SetExtendedAttributeMode::Create,
                )
                .await
        });
        match join!(a_task, b_task) {
            (Ok(()), Ok(())) => panic!("both set calls succeeded"),
            (Err(_), Err(_)) => panic!("both set calls failed"),

            // Whichever task won must have left its value in place; the loser must not have
            // overwritten it.
            (Ok(()), Err(e)) => {
                assert_eq!(
                    basic
                        .get_extended_attribute(b"security.selinux".to_vec())
                        .await
                        .expect("failed to get xattr"),
                    b"one"
                );
                is_error(e, FxfsError::AlreadyExists);
            }
            (Err(e), Ok(())) => {
                assert_eq!(
                    basic
                        .get_extended_attribute(b"security.selinux".to_vec())
                        .await
                        .expect("failed to get xattr"),
                    b"two"
                );
                is_error(e, FxfsError::AlreadyExists);
            }
        }

        fs.close().await.expect("Close failed");
    }
2313
2314    struct TestAttr {
2315        name: Vec<u8>,
2316        value: Vec<u8>,
2317    }
2318
2319    impl TestAttr {
2320        fn new(name: impl AsRef<[u8]>, value: impl AsRef<[u8]>) -> Self {
2321            Self { name: name.as_ref().to_vec(), value: value.as_ref().to_vec() }
2322        }
2323        fn name(&self) -> Vec<u8> {
2324            self.name.clone()
2325        }
2326        fn value(&self) -> Vec<u8> {
2327            self.value.clone()
2328        }
2329    }
2330
    /// Basic round-trip for a small (inline) extended attribute: set, list, get, remove — twice,
    /// to make sure removal leaves the object in a state where the attribute can be re-set.
    #[fuchsia::test]
    async fn extended_attributes() {
        let (fs, object) = test_filesystem_and_empty_object().await;

        let test_attr = TestAttr::new(b"security.selinux", b"foo");

        // A fresh object has no attributes and getting one is NotFound.
        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
        is_error(
            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
            FxfsError::NotFound,
        );

        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
        assert_eq!(
            object.get_extended_attribute(test_attr.name()).await.unwrap(),
            test_attr.value()
        );

        object.remove_extended_attribute(test_attr.name()).await.unwrap();
        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
        is_error(
            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
            FxfsError::NotFound,
        );

        // Make sure we can handle the same attribute being set again after removal.
        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
        assert_eq!(
            object.get_extended_attribute(test_attr.name()).await.unwrap(),
            test_attr.value()
        );

        object.remove_extended_attribute(test_attr.name()).await.unwrap();
        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
        is_error(
            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
            FxfsError::NotFound,
        );

        fs.close().await.expect("close failed");
    }
2388
    /// Round-trip for an extended attribute large enough to be stored in its own fxfs attribute
    /// rather than inline in the object record.
    #[fuchsia::test]
    async fn large_extended_attribute() {
        let (fs, object) = test_filesystem_and_empty_object().await;

        // 300 bytes is over the inline threshold, forcing an fxfs-attribute-backed xattr.
        let test_attr = TestAttr::new(b"security.selinux", vec![3u8; 300]);

        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(
            object.get_extended_attribute(test_attr.name()).await.unwrap(),
            test_attr.value()
        );

        // Probe the fxfs attributes to make sure it did the expected thing. This relies on inside
        // knowledge of how the attribute id is chosen.
        assert_eq!(
            object
                .read_attr(64)
                .await
                .expect("read_attr failed")
                .expect("read_attr returned none")
                .into_vec(),
            test_attr.value()
        );

        object.remove_extended_attribute(test_attr.name()).await.unwrap();
        is_error(
            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
            FxfsError::NotFound,
        );

        // Make sure we can handle the same attribute being set again after removal.
        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(
            object.get_extended_attribute(test_attr.name()).await.unwrap(),
            test_attr.value()
        );
        object.remove_extended_attribute(test_attr.name()).await.unwrap();
        is_error(
            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
            FxfsError::NotFound,
        );

        fs.close().await.expect("close failed");
    }
2447
2448    #[fuchsia::test]
2449    async fn multiple_extended_attributes() {
2450        let (fs, object) = test_filesystem_and_empty_object().await;
2451
2452        let attrs = [
2453            TestAttr::new(b"security.selinux", b"foo"),
2454            TestAttr::new(b"large.attribute", vec![3u8; 300]),
2455            TestAttr::new(b"an.attribute", b"asdf"),
2456            TestAttr::new(b"user.big", vec![5u8; 288]),
2457            TestAttr::new(b"user.tiny", b"smol"),
2458            TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
2459            TestAttr::new(b"also big", vec![7u8; 500]),
2460            TestAttr::new(b"all.ones", vec![1u8; 11111]),
2461        ];
2462
2463        for i in 0..attrs.len() {
2464            object
2465                .set_extended_attribute(
2466                    attrs[i].name(),
2467                    attrs[i].value(),
2468                    SetExtendedAttributeMode::Set,
2469                )
2470                .await
2471                .unwrap();
2472            assert_eq!(
2473                object.get_extended_attribute(attrs[i].name()).await.unwrap(),
2474                attrs[i].value()
2475            );
2476        }
2477
2478        for i in 0..attrs.len() {
2479            // Make sure expected attributes are still available.
2480            let mut found_attrs = object.list_extended_attributes().await.unwrap();
2481            let mut expected_attrs: Vec<Vec<u8>> = attrs.iter().skip(i).map(|a| a.name()).collect();
2482            found_attrs.sort();
2483            expected_attrs.sort();
2484            assert_eq!(found_attrs, expected_attrs);
2485            for j in i..attrs.len() {
2486                assert_eq!(
2487                    object.get_extended_attribute(attrs[j].name()).await.unwrap(),
2488                    attrs[j].value()
2489                );
2490            }
2491
2492            object.remove_extended_attribute(attrs[i].name()).await.expect("failed to remove");
2493            is_error(
2494                object.get_extended_attribute(attrs[i].name()).await.unwrap_err(),
2495                FxfsError::NotFound,
2496            );
2497        }
2498
2499        fs.close().await.expect("close failed");
2500    }
2501
    /// Sets many attributes and then deletes the whole object (unlink + tombstone), relying on
    /// fsck to confirm no attribute records or extents were leaked.
    #[fuchsia::test]
    async fn multiple_extended_attributes_delete() {
        let (fs, object) = test_filesystem_and_empty_object().await;
        let store = object.owner().clone();

        let attrs = [
            TestAttr::new(b"security.selinux", b"foo"),
            TestAttr::new(b"large.attribute", vec![3u8; 300]),
            TestAttr::new(b"an.attribute", b"asdf"),
            TestAttr::new(b"user.big", vec![5u8; 288]),
            TestAttr::new(b"user.tiny", b"smol"),
            TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
            TestAttr::new(b"also big", vec![7u8; 500]),
            TestAttr::new(b"all.ones", vec![1u8; 11111]),
        ];

        for i in 0..attrs.len() {
            object
                .set_extended_attribute(
                    attrs[i].name(),
                    attrs[i].value(),
                    SetExtendedAttributeMode::Set,
                )
                .await
                .unwrap();
            assert_eq!(
                object.get_extended_attribute(attrs[i].name()).await.unwrap(),
                attrs[i].value()
            );
        }

        // Unlink the file
        let root_directory =
            Directory::open(object.owner(), object.store().root_directory_object_id())
                .await
                .expect("open failed");
        // Both the directory (losing a child) and the object (being unlinked) must be locked.
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![
                    LockKey::object(store.store_object_id(), store.root_directory_object_id()),
                    LockKey::object(store.store_object_id(), object.object_id()),
                ],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        crate::object_store::directory::replace_child(
            &mut transaction,
            None,
            (&root_directory, TEST_OBJECT_NAME),
        )
        .await
        .expect("replace_child failed");
        transaction.commit().await.unwrap();
        // Tombstoning reclaims the now-unreachable object and its attributes.
        store.tombstone_object(object.object_id(), Options::default()).await.unwrap();

        // fsck verifies nothing was left behind.
        crate::fsck::fsck(fs.clone()).await.unwrap();

        fs.close().await.expect("close failed");
    }
2563
    /// Exercises transitions between inline and fxfs-attribute-backed storage as a value for the
    /// same xattr name grows past the threshold and shrinks back, running fsck at each step.
    #[fuchsia::test]
    async fn extended_attribute_changing_sizes() {
        let (fs, object) = test_filesystem_and_empty_object().await;

        let test_name = b"security.selinux";
        let test_small_attr = TestAttr::new(test_name, b"smol");
        let test_large_attr = TestAttr::new(test_name, vec![3u8; 300]);

        object
            .set_extended_attribute(
                test_small_attr.name(),
                test_small_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(
            object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
            test_small_attr.value()
        );

        // With a small attribute, we don't expect it to write to an fxfs attribute.
        assert!(object.read_attr(64).await.expect("read_attr failed").is_none());

        crate::fsck::fsck(fs.clone()).await.unwrap();

        object
            .set_extended_attribute(
                test_large_attr.name(),
                test_large_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(
            object.get_extended_attribute(test_large_attr.name()).await.unwrap(),
            test_large_attr.value()
        );

        // Once the value is above the threshold, we expect it to get upgraded to an fxfs
        // attribute.
        assert_eq!(
            object
                .read_attr(64)
                .await
                .expect("read_attr failed")
                .expect("read_attr returned none")
                .into_vec(),
            test_large_attr.value()
        );

        crate::fsck::fsck(fs.clone()).await.unwrap();

        object
            .set_extended_attribute(
                test_small_attr.name(),
                test_small_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(
            object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
            test_small_attr.value()
        );

        // Even though we are back under the threshold, we still expect it to be stored in an fxfs
        // attribute, because we don't downgrade to inline once we've allocated one.
        assert_eq!(
            object
                .read_attr(64)
                .await
                .expect("read_attr failed")
                .expect("read_attr returned none")
                .into_vec(),
            test_small_attr.value()
        );

        crate::fsck::fsck(fs.clone()).await.unwrap();

        object.remove_extended_attribute(test_small_attr.name()).await.expect("failed to remove");

        crate::fsck::fsck(fs.clone()).await.unwrap();

        fs.close().await.expect("close failed");
    }
2650
    /// Verifies that an attribute with the maximum allowed name and value sizes can be set, read
    /// back, listed, and removed.
    #[fuchsia::test]
    async fn extended_attribute_max_size() {
        let (fs, object) = test_filesystem_and_empty_object().await;

        // Largest name and value the xattr API accepts.
        let test_attr = TestAttr::new(
            vec![3u8; super::MAX_XATTR_NAME_SIZE],
            vec![1u8; super::MAX_XATTR_VALUE_SIZE],
        );

        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(
            object.get_extended_attribute(test_attr.name()).await.unwrap(),
            test_attr.value()
        );
        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
        object.remove_extended_attribute(test_attr.name()).await.unwrap();

        fs.close().await.expect("close failed");
    }
2677
    /// Create, remove, then re-create the same max-size attribute, with a journal compaction
    /// between the first create and the remove so the original records are flushed to layer
    /// files before the remove/re-create cycle.
    #[fuchsia::test]
    async fn extended_attribute_remove_then_create() {
        let (fs, object) = test_filesystem_and_empty_object().await;

        let test_attr = TestAttr::new(
            vec![3u8; super::MAX_XATTR_NAME_SIZE],
            vec![1u8; super::MAX_XATTR_VALUE_SIZE],
        );

        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Create,
            )
            .await
            .unwrap();
        fs.journal().force_compact().await.unwrap();
        object.remove_extended_attribute(test_attr.name()).await.unwrap();
        // Create (not Set) must succeed again now that the attribute is gone.
        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Create,
            )
            .await
            .unwrap();

        assert_eq!(
            object.get_extended_attribute(test_attr.name()).await.unwrap(),
            test_attr.value()
        );

        fs.close().await.expect("close failed");
    }
2713
    /// Fills the entire attribute-id range reserved for large extended attributes, then checks
    /// the behavior at and around the limit: new large xattrs fail with NoSpace, inline xattrs
    /// and updates to existing ones still work, and removal frees an id for reuse.
    #[fuchsia::test]
    async fn large_extended_attribute_max_number() {
        let (fs, object) = test_filesystem_and_empty_object().await;

        // One attribute id per large xattr, so the range size bounds how many can exist.
        let max_xattrs =
            super::EXTENDED_ATTRIBUTE_RANGE_END - super::EXTENDED_ATTRIBUTE_RANGE_START;
        for i in 0..max_xattrs {
            let test_attr = TestAttr::new(format!("{}", i).as_bytes(), vec![0x3; 300]);
            object
                .set_extended_attribute(
                    test_attr.name(),
                    test_attr.value(),
                    SetExtendedAttributeMode::Set,
                )
                .await
                .unwrap_or_else(|_| panic!("failed to set xattr number {}", i));
        }

        // That should have taken up all the attributes we've allocated to extended attributes, so
        // this one should return ERR_NO_SPACE.
        match object
            .set_extended_attribute(
                b"one.too.many".to_vec(),
                vec![0x3; 300],
                SetExtendedAttributeMode::Set,
            )
            .await
        {
            Ok(()) => panic!("set should not succeed"),
            Err(e) => is_error(e, FxfsError::NoSpace),
        }

        // But inline attributes don't need an attribute number, so it should work fine.
        object
            .set_extended_attribute(
                b"this.is.okay".to_vec(),
                b"small value".to_vec(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();

        // And updating existing ones should be okay.
        object
            .set_extended_attribute(b"11".to_vec(), vec![0x4; 300], SetExtendedAttributeMode::Set)
            .await
            .unwrap();
        object
            .set_extended_attribute(
                b"12".to_vec(),
                vec![0x1; 300],
                SetExtendedAttributeMode::Replace,
            )
            .await
            .unwrap();

        // And we should be able to remove an attribute and set another one.
        object.remove_extended_attribute(b"5".to_vec()).await.unwrap();
        object
            .set_extended_attribute(
                b"new attr".to_vec(),
                vec![0x3; 300],
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();

        fs.close().await.expect("close failed");
    }
2783
    /// Regression-style test: write_attr must trim extents beyond the attribute's new, smaller
    /// size, leaving the filesystem fsck-clean.
    #[fuchsia::test]
    async fn write_attr_trims_beyond_new_end() {
        // When writing, multi_write will deallocate old extents that overlap with the new data,
        // but it doesn't trim anything beyond that, since it doesn't know what the total size will
        // be. write_attr does know, because it writes the whole attribute at once, so we need to
        // make sure it cleans up properly.
        let (fs, object) = test_filesystem_and_empty_object().await;

        let block_size = fs.block_size();
        let buf_size = block_size * 2;
        let attribute_id = 10;

        let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
        let mut buffer = object.allocate_buffer(buf_size as usize).await;
        buffer.as_mut_slice().fill(3);
        // Writing two separate ranges, even if they are contiguous, forces them to be separate
        // extent records.
        object
            .multi_write(
                &mut transaction,
                attribute_id,
                &[0..block_size, block_size..block_size * 2],
                buffer.as_mut(),
            )
            .await
            .unwrap();
        // Record the attribute's size (two blocks) so fsck sees a consistent attribute.
        transaction.add(
            object.store().store_object_id,
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(object.object_id(), attribute_id, AttributeKey::Attribute),
                ObjectValue::attribute(block_size * 2, false),
            ),
        );
        transaction.commit().await.unwrap();

        crate::fsck::fsck(fs.clone()).await.unwrap();

        // Rewrite the attribute at half the size; write_attr should trim the second block.
        let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
        let needs_trim = (*object)
            .write_attr(&mut transaction, attribute_id, &vec![3u8; block_size as usize])
            .await
            .unwrap();
        assert!(!needs_trim.0);
        transaction.commit().await.unwrap();

        crate::fsck::fsck(fs.clone()).await.unwrap();

        fs.close().await.expect("close failed");
    }
2833
    /// Writes an attribute three batches long via write_new_attr_in_batches and verifies the
    /// whole value reads back intact.
    #[fuchsia::test]
    async fn write_new_attr_in_batches_multiple_txns() {
        let (fs, object) = test_filesystem_and_empty_object().await;
        // Three batches' worth of data forces the write to span multiple transactions.
        let merkle_tree = vec![1; 3 * WRITE_ATTR_BATCH_SIZE];
        let mut transaction =
            (*object).new_transaction(FSVERITY_MERKLE_ATTRIBUTE_ID).await.unwrap();
        object
            .write_new_attr_in_batches(
                &mut transaction,
                FSVERITY_MERKLE_ATTRIBUTE_ID,
                &merkle_tree,
                WRITE_ATTR_BATCH_SIZE,
            )
            .await
            .expect("failed to write merkle attribute");

        // Clear the graveyard entry for this attribute (presumably registered by
        // write_new_attr_in_batches for crash cleanup — verify against its implementation) so the
        // completed attribute isn't reclaimed.
        transaction.add(
            object.store().store_object_id,
            Mutation::replace_or_insert_object(
                ObjectKey::graveyard_attribute_entry(
                    object.store().graveyard_directory_object_id(),
                    object.object_id(),
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                ),
                ObjectValue::None,
            ),
        );
        transaction.commit().await.unwrap();
        assert_eq!(
            object.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
            Some(merkle_tree.into())
        );

        fs.close().await.expect("close failed");
    }
2869
    // Running on target only, to use fake time features in the executor.
    #[cfg(target_os = "fuchsia")]
    #[fuchsia::test(allow_stalls = false)]
    async fn test_watchdog() {
        use super::Watchdog;
        use fuchsia_async::{MonotonicDuration, MonotonicInstant, TestExecutor};
        use std::sync::mpsc::channel;

        // Pin fake time to t=0 so all subsequent advances are deterministic.
        TestExecutor::advance_to(make_time(0)).await;
        let (sender, receiver) = channel();

        fn make_time(time_secs: i64) -> MonotonicInstant {
            MonotonicInstant::from_nanos(0) + MonotonicDuration::from_seconds(time_secs)
        }

        {
            // Watchdog with a 10-second period; each firing sends its count on the channel.
            let _watchdog = Watchdog::new(10, move |count| {
                sender.send(count).expect("Sending value");
            });

            // Too early.
            TestExecutor::advance_to(make_time(5)).await;
            receiver.try_recv().expect_err("Should not have message");

            // First message.
            TestExecutor::advance_to(make_time(10)).await;
            assert_eq!(1, receiver.recv().expect("Receiving"));

            // Too early for the next.
            TestExecutor::advance_to(make_time(15)).await;
            receiver.try_recv().expect_err("Should not have message");

            // Missed one. They'll be spooled up.
            TestExecutor::advance_to(make_time(30)).await;
            assert_eq!(2, receiver.recv().expect("Receiving"));
            assert_eq!(3, receiver.recv().expect("Receiving"));
        }

        // Watchdog is dropped, nothing should trigger.
        TestExecutor::advance_to(make_time(100)).await;
        receiver.recv().expect_err("Watchdog should be gone");
    }
2912
2913    #[fuchsia::test]
2914    fn test_checksum_range_chunk() {
2915        let block_size = 4096;
2916
2917        // No bitmap means one chunk that covers the whole range
2918        assert_eq!(
2919            ChecksumRangeChunk::group_first_write_ranges(
2920                &mut OverwriteBitmaps::None,
2921                block_size,
2922                block_size * 2..block_size * 5,
2923            ),
2924            vec![ChecksumRangeChunk {
2925                checksum_range: 0..3,
2926                device_range: block_size * 2..block_size * 5,
2927                is_first_write: false,
2928            }],
2929        );
2930
2931        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2932        assert_eq!(
2933            ChecksumRangeChunk::group_first_write_ranges(
2934                &mut bitmaps,
2935                block_size,
2936                block_size * 2..block_size * 5,
2937            ),
2938            vec![ChecksumRangeChunk {
2939                checksum_range: 0..3,
2940                device_range: block_size * 2..block_size * 5,
2941                is_first_write: false,
2942            }],
2943        );
2944        assert_eq!(
2945            bitmaps.take_bitmaps(),
2946            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b11100000])))
2947        );
2948
2949        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2950        bitmaps.set_offset(2);
2951        assert_eq!(
2952            ChecksumRangeChunk::group_first_write_ranges(
2953                &mut bitmaps,
2954                block_size,
2955                block_size * 2..block_size * 5,
2956            ),
2957            vec![
2958                ChecksumRangeChunk {
2959                    checksum_range: 0..2,
2960                    device_range: block_size * 2..block_size * 4,
2961                    is_first_write: false,
2962                },
2963                ChecksumRangeChunk {
2964                    checksum_range: 2..3,
2965                    device_range: block_size * 4..block_size * 5,
2966                    is_first_write: true,
2967                },
2968            ],
2969        );
2970        assert_eq!(
2971            bitmaps.take_bitmaps(),
2972            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00111000])))
2973        );
2974
2975        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2976        bitmaps.set_offset(4);
2977        assert_eq!(
2978            ChecksumRangeChunk::group_first_write_ranges(
2979                &mut bitmaps,
2980                block_size,
2981                block_size * 2..block_size * 5,
2982            ),
2983            vec![ChecksumRangeChunk {
2984                checksum_range: 0..3,
2985                device_range: block_size * 2..block_size * 5,
2986                is_first_write: true,
2987            }],
2988        );
2989        assert_eq!(
2990            bitmaps.take_bitmaps(),
2991            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00001110])))
2992        );
2993
2994        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b01010101]));
2995        assert_eq!(
2996            ChecksumRangeChunk::group_first_write_ranges(
2997                &mut bitmaps,
2998                block_size,
2999                block_size * 2..block_size * 10,
3000            ),
3001            vec![
3002                ChecksumRangeChunk {
3003                    checksum_range: 0..1,
3004                    device_range: block_size * 2..block_size * 3,
3005                    is_first_write: true,
3006                },
3007                ChecksumRangeChunk {
3008                    checksum_range: 1..2,
3009                    device_range: block_size * 3..block_size * 4,
3010                    is_first_write: false,
3011                },
3012                ChecksumRangeChunk {
3013                    checksum_range: 2..3,
3014                    device_range: block_size * 4..block_size * 5,
3015                    is_first_write: true,
3016                },
3017                ChecksumRangeChunk {
3018                    checksum_range: 3..4,
3019                    device_range: block_size * 5..block_size * 6,
3020                    is_first_write: false,
3021                },
3022                ChecksumRangeChunk {
3023                    checksum_range: 4..5,
3024                    device_range: block_size * 6..block_size * 7,
3025                    is_first_write: true,
3026                },
3027                ChecksumRangeChunk {
3028                    checksum_range: 5..6,
3029                    device_range: block_size * 7..block_size * 8,
3030                    is_first_write: false,
3031                },
3032                ChecksumRangeChunk {
3033                    checksum_range: 6..7,
3034                    device_range: block_size * 8..block_size * 9,
3035                    is_first_write: true,
3036                },
3037                ChecksumRangeChunk {
3038                    checksum_range: 7..8,
3039                    device_range: block_size * 9..block_size * 10,
3040                    is_first_write: false,
3041                },
3042            ],
3043        );
3044        assert_eq!(
3045            bitmaps.take_bitmaps(),
3046            Some((BitVec::from_bytes(&[0b01010101]), BitVec::from_bytes(&[0b11111111])))
3047        );
3048    }
3049}