Skip to main content

fxfs/object_store/
store_object_handle.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::checksum::{Checksum, Checksums, fletcher64};
6use crate::errors::FxfsError;
7use crate::log::*;
8use crate::lsm_tree::Query;
9use crate::lsm_tree::merge::MergerIterator;
10use crate::lsm_tree::types::{Item, ItemRef, LayerIterator};
11use crate::object_handle::ObjectHandle;
12use crate::object_store::extent_record::{ExtentMode, ExtentValue};
13use crate::object_store::object_manager::ObjectManager;
14use crate::object_store::object_record::{
15    AttributeKey, ExtendedAttributeValue, ObjectAttributes, ObjectItem, ObjectKey, ObjectKeyData,
16    ObjectValue, Timestamp,
17};
18use crate::object_store::transaction::{
19    AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Options, ReadGuard,
20    Transaction, lock_keys,
21};
22use crate::object_store::{
23    AttributeId, Extent, HandleOptions, HandleOwner, ObjectStore, TrimMode, TrimResult,
24    VOLUME_DATA_KEY_ID,
25};
26use crate::range::RangeExt;
27use crate::round::{round_down, round_up};
28use anyhow::{Context, Error, anyhow, bail, ensure};
29use assert_matches::assert_matches;
30use bit_vec::BitVec;
31use futures::stream::{FuturesOrdered, FuturesUnordered};
32use futures::{TryStreamExt, try_join};
33use fxfs_crypto::{
34    Cipher, CipherHolder, CipherSet, EncryptionKey, FindKeyResult, FxfsCipher, KeyPurpose,
35};
36use fxfs_trace::{TraceFutureExt, trace, trace_future_args};
37use static_assertions::const_assert;
38use std::cmp::min;
39use std::future::Future;
40use std::ops::Range;
41use std::sync::Arc;
42use std::sync::atomic::{self, AtomicBool, Ordering};
43use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
44use storage_device::{InlineCryptoOptions, ReadOptions, WriteOptions};
45
46use fidl_fuchsia_io as fio;
47use fuchsia_async as fasync;
48
/// Longest name, in bytes, accepted for an extended attribute.
pub const MAX_XATTR_NAME_SIZE: usize = 255;
/// Largest extended-attribute value that is kept inline in the attribute's record; values bigger
/// than this are stored in a separate object attribute instead.
pub const MAX_INLINE_XATTR_SIZE: usize = 256;
/// Largest value, in bytes, accepted for an extended attribute (64,000 bytes).
///
/// NB: deleting extended attributes relies on this limit for correctness, so it must always be
/// enforced.
pub const MAX_XATTR_VALUE_SIZE: usize = 64_000;
58
59/// Zeroes blocks in 'buffer' based on `bitmap`, one bit per block from start of buffer.
60fn apply_bitmap_zeroing(
61    block_size: usize,
62    bitmap: &bit_vec::BitVec,
63    mut buffer: MutableBufferRef<'_>,
64) {
65    let buf = buffer.as_mut_slice();
66    debug_assert_eq!(bitmap.len() * block_size, buf.len());
67    for (i, block) in bitmap.iter().enumerate() {
68        if !block {
69            let start = i * block_size;
70            buf[start..start + block_size].fill(0);
71        }
72    }
73}
74
75/// When writing, often the logic should be generic over whether or not checksums are generated.
76/// This provides that and a handy way to convert to the more general ExtentMode that eventually
77/// stores it on disk.
78#[derive(Debug, Clone, PartialEq)]
79pub enum MaybeChecksums {
80    None,
81    Fletcher(Vec<Checksum>),
82}
83
84impl MaybeChecksums {
85    pub fn maybe_as_ref(&self) -> Option<&[Checksum]> {
86        match self {
87            Self::None => None,
88            Self::Fletcher(sums) => Some(&sums),
89        }
90    }
91
92    pub fn split_off(&mut self, at: usize) -> Self {
93        match self {
94            Self::None => Self::None,
95            Self::Fletcher(sums) => Self::Fletcher(sums.split_off(at)),
96        }
97    }
98
99    pub fn to_mode(self) -> ExtentMode {
100        match self {
101            Self::None => ExtentMode::Raw,
102            Self::Fletcher(sums) => ExtentMode::Cow(Checksums::fletcher(sums)),
103        }
104    }
105
106    pub fn into_option(self) -> Option<Vec<Checksum>> {
107        match self {
108            Self::None => None,
109            Self::Fletcher(sums) => Some(sums),
110        }
111    }
112}
113
114/// The mode of operation when setting extended attributes. This is the same as the fidl definition
115/// but is replicated here so we don't have fuchsia.io structures in the api, so this can be used
116/// on host.
117#[derive(Debug, Clone, Copy, PartialEq, Eq)]
118pub enum SetExtendedAttributeMode {
119    /// Create the extended attribute if it doesn't exist, replace the value if it does.
120    Set,
121    /// Create the extended attribute if it doesn't exist, fail if it does.
122    Create,
123    /// Replace the extended attribute value if it exists, fail if it doesn't.
124    Replace,
125}
126
127impl From<fio::SetExtendedAttributeMode> for SetExtendedAttributeMode {
128    fn from(other: fio::SetExtendedAttributeMode) -> SetExtendedAttributeMode {
129        match other {
130            fio::SetExtendedAttributeMode::Set => SetExtendedAttributeMode::Set,
131            fio::SetExtendedAttributeMode::Create => SetExtendedAttributeMode::Create,
132            fio::SetExtendedAttributeMode::Replace => SetExtendedAttributeMode::Replace,
133        }
134    }
135}
136
/// Where the keys for an object's data come from, if the object is encrypted at all.
enum Encryption {
    /// The object is not encrypted; no keys are involved.
    None,

    /// The object's keys are cached by the KeyManager and unwrapped on demand.
    CachedKeys,

    /// The object's keys are registered permanently with the KeyManager.
    PermanentKeys,
}
148
149#[derive(PartialEq, Debug)]
150enum OverwriteBitmaps {
151    None,
152    Some {
153        /// The block bitmap of a partial overwrite extent in the tree.
154        extent_bitmap: BitVec,
155        /// A bitmap of the blocks written to by the current overwrite.
156        write_bitmap: BitVec,
157        /// BitVec doesn't have a slice equivalent, so for a particular section of the write we
158        /// keep track of an offset in the bitmaps to operate on.
159        bitmap_offset: usize,
160    },
161}
162
163impl OverwriteBitmaps {
164    fn new(extent_bitmap: BitVec) -> Self {
165        OverwriteBitmaps::Some {
166            write_bitmap: BitVec::from_elem(extent_bitmap.len(), false),
167            extent_bitmap,
168            bitmap_offset: 0,
169        }
170    }
171
172    fn is_none(&self) -> bool {
173        *self == OverwriteBitmaps::None
174    }
175
176    fn set_offset(&mut self, new_offset: usize) {
177        match self {
178            OverwriteBitmaps::None => (),
179            OverwriteBitmaps::Some { bitmap_offset, .. } => *bitmap_offset = new_offset,
180        }
181    }
182
183    fn get_from_extent_bitmap(&self, i: usize) -> Option<bool> {
184        match self {
185            OverwriteBitmaps::None => None,
186            OverwriteBitmaps::Some { extent_bitmap, bitmap_offset, .. } => {
187                extent_bitmap.get(*bitmap_offset + i)
188            }
189        }
190    }
191
192    fn set_in_write_bitmap(&mut self, i: usize, x: bool) {
193        match self {
194            OverwriteBitmaps::None => (),
195            OverwriteBitmaps::Some { write_bitmap, bitmap_offset, .. } => {
196                write_bitmap.set(*bitmap_offset + i, x)
197            }
198        }
199    }
200
201    fn take_bitmaps(self) -> Option<(BitVec, BitVec)> {
202        match self {
203            OverwriteBitmaps::None => None,
204            OverwriteBitmaps::Some { extent_bitmap, write_bitmap, .. } => {
205                Some((extent_bitmap, write_bitmap))
206            }
207        }
208    }
209}
210
/// When writing to Overwrite ranges, we need to emit whether a set of checksums for a device range
/// is the first write to that region or not. This tracks one such range so we can use it after the
/// write to break up the returned checksum list.
#[derive(PartialEq, Debug)]
struct ChecksumRangeChunk {
    /// Indices into the write's per-block checksum list that this chunk covers.
    checksum_range: Range<usize>,
    /// Device byte range this chunk covers; it spans as many blocks as `checksum_range` has
    /// entries.
    device_range: Range<u64>,
    /// True if the blocks in this chunk had not been written before (their extent-bitmap bits
    /// were clear).
    is_first_write: bool,
}

impl ChecksumRangeChunk {
    /// Splits the write covering `write_device_range` into runs of consecutive blocks that share
    /// the same first-write status. Returns one chunk per run, in device order.
    ///
    /// With bitmaps present, each block's status comes from the extent bitmap (relative to the
    /// bitmaps' current offset), and every block of the write is also marked in the write bitmap.
    /// Without bitmaps, a single chunk covering the whole write is returned with
    /// `is_first_write: false`.
    ///
    /// `write_device_range` is assumed to be a whole number of `block_size` blocks; any remainder
    /// is dropped by the integer division that computes `write_block_len`.
    ///
    /// # Panics
    ///
    /// With bitmaps present, panics if the extent bitmap has no bit at index 0 or at any block of
    /// the write (relative to the current offset). This holds even for an empty write, because
    /// bit 0 is always read.
    fn group_first_write_ranges(
        bitmaps: &mut OverwriteBitmaps,
        block_size: u64,
        write_device_range: Range<u64>,
    ) -> Vec<ChecksumRangeChunk> {
        let write_block_len = (write_device_range.length().unwrap() / block_size) as usize;
        if bitmaps.is_none() {
            // If there is no bitmap, then the overwrite range is fully written to. However, we
            // could still be within the journal flush window where one of the blocks was written
            // to for the first time to put it in this state, so we still need to emit the
            // checksums in case replay needs them.
            vec![ChecksumRangeChunk {
                checksum_range: 0..write_block_len,
                device_range: write_device_range,
                is_first_write: false,
            }]
        } else {
            // Seed an empty chunk whose status matches the first block, so the first loop
            // iteration always extends it rather than starting a new chunk.
            let mut checksum_ranges = vec![ChecksumRangeChunk {
                checksum_range: 0..0,
                device_range: write_device_range.start..write_device_range.start,
                is_first_write: !bitmaps.get_from_extent_bitmap(0).unwrap(),
            }];
            let mut working_range = checksum_ranges.last_mut().unwrap();
            for i in 0..write_block_len {
                bitmaps.set_in_write_bitmap(i, true);

                // bitmap.get returning true means the block is initialized and therefore has been
                // written to before.
                if working_range.is_first_write != bitmaps.get_from_extent_bitmap(i).unwrap() {
                    // is_first_write is tracking opposite of what comes back from the bitmap, so
                    // if they are still opposites we continue our current range.
                    working_range.checksum_range.end += 1;
                    working_range.device_range.end += block_size;
                } else {
                    // If they are the same, then we need to make a new chunk.
                    let new_chunk = ChecksumRangeChunk {
                        checksum_range: working_range.checksum_range.end
                            ..working_range.checksum_range.end + 1,
                        device_range: working_range.device_range.end
                            ..working_range.device_range.end + block_size,
                        is_first_write: !working_range.is_first_write,
                    };
                    checksum_ranges.push(new_chunk);
                    working_range = checksum_ranges.last_mut().unwrap();
                }
            }
            checksum_ranges
        }
    }
}
272
273/// StoreObjectHandle is the lowest-level, untyped handle to an object with the id [`object_id`] in
274/// a particular store, [`owner`]. It provides functionality shared across all objects, such as
275/// reading and writing attributes and managing encryption keys.
276///
277/// Since it's untyped, it doesn't do any object kind validation, and is generally meant to
278/// implement higher-level typed handles.
279///
280/// For file-like objects with a data attribute, DataObjectHandle implements traits and helpers for
281/// doing more complex extent management and caches the content size.
282///
283/// For directory-like objects, Directory knows how to add and remove child objects and enumerate
284/// its children.
285pub struct StoreObjectHandle<S: HandleOwner> {
286    owner: Arc<S>,
287    object_id: u64,
288    options: HandleOptions,
289    trace: AtomicBool,
290    encryption: Encryption,
291}
292
293impl<S: HandleOwner> ObjectHandle for StoreObjectHandle<S> {
294    fn set_trace(&self, v: bool) {
295        info!(store_id = self.store().store_object_id, oid = self.object_id(), trace = v; "trace");
296        self.trace.store(v, atomic::Ordering::Relaxed);
297    }
298
299    fn object_id(&self) -> u64 {
300        return self.object_id;
301    }
302
303    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
304        self.store().device.allocate_buffer(size)
305    }
306
307    fn block_size(&self) -> u64 {
308        self.store().block_size()
309    }
310}
311
312struct Watchdog {
313    _task: fasync::Task<()>,
314}
315
316impl Watchdog {
317    fn new(increment_seconds: u64, cb: impl Fn(u64) + Send + 'static) -> Self {
318        Self {
319            _task: fasync::Task::spawn(
320                async move {
321                    let increment = increment_seconds.try_into().unwrap();
322                    let mut fired_counter = 0;
323                    let mut next_wake = fasync::MonotonicInstant::now();
324                    loop {
325                        next_wake += std::time::Duration::from_secs(increment).into();
326                        // If this isn't being scheduled this will purposely result in fast looping
327                        // when it does. This will be insightful about the state of the thread and
328                        // task scheduling.
329                        if fasync::MonotonicInstant::now() < next_wake {
330                            fasync::Timer::new(next_wake).await;
331                        }
332                        fired_counter += 1;
333                        cb(fired_counter);
334                    }
335                }
336                .trace(trace_future_args!("StoreObjectHandle::Watchdog")),
337            ),
338        }
339    }
340}
341
342impl<S: HandleOwner> StoreObjectHandle<S> {
343    /// Make a new StoreObjectHandle for the object with id [`object_id`] in store [`owner`].
344    pub fn new(
345        owner: Arc<S>,
346        object_id: u64,
347        permanent_keys: bool,
348        options: HandleOptions,
349        trace: bool,
350    ) -> Self {
351        let encryption = if permanent_keys {
352            Encryption::PermanentKeys
353        } else if owner.as_ref().as_ref().is_encrypted() {
354            Encryption::CachedKeys
355        } else {
356            Encryption::None
357        };
358        Self { owner, object_id, encryption, options, trace: AtomicBool::new(trace) }
359    }
360
361    pub fn owner(&self) -> &Arc<S> {
362        &self.owner
363    }
364
365    pub fn store(&self) -> &ObjectStore {
366        self.owner.as_ref().as_ref()
367    }
368
369    pub fn trace(&self) -> bool {
370        self.trace.load(atomic::Ordering::Relaxed)
371    }
372
373    pub fn is_encrypted(&self) -> bool {
374        !matches!(self.encryption, Encryption::None)
375    }
376
377    /// Get the default set of transaction options for this object. This is mostly the overall
378    /// default, modified by any [`HandleOptions`] held by this handle.
379    pub fn default_transaction_options<'b>(&self) -> Options<'b> {
380        Options { skip_journal_checks: self.options.skip_journal_checks, ..Default::default() }
381    }
382
383    pub async fn new_transaction_with_options<'b>(
384        &self,
385        attribute_id: AttributeId,
386        options: Options<'b>,
387    ) -> Result<Transaction<'b>, Error> {
388        Ok(self
389            .store()
390            .new_transaction(
391                lock_keys![
392                    LockKey::object_attribute(
393                        self.store().store_object_id(),
394                        self.object_id(),
395                        attribute_id,
396                    ),
397                    LockKey::object(self.store().store_object_id(), self.object_id()),
398                ],
399                options,
400            )
401            .await?)
402    }
403
404    pub async fn new_transaction<'b>(
405        &self,
406        attribute_id: AttributeId,
407    ) -> Result<Transaction<'b>, Error> {
408        self.new_transaction_with_options(attribute_id, self.default_transaction_options()).await
409    }
410
411    // If |transaction| has an impending mutation for the underlying object, returns that.
412    // Otherwise, looks up the object from the tree.
413    async fn txn_get_object_mutation(
414        &self,
415        transaction: &Transaction<'_>,
416    ) -> Result<ObjectStoreMutation, Error> {
417        self.store().txn_get_object_mutation(transaction, self.object_id()).await
418    }
419
420    // Returns the amount deallocated.
421    async fn deallocate_old_extents(
422        &self,
423        transaction: &mut Transaction<'_>,
424        attribute_id: AttributeId,
425        range: Range<u64>,
426    ) -> Result<u64, Error> {
427        let block_size = self.block_size();
428        assert_eq!(range.start % block_size, 0);
429        assert_eq!(range.end % block_size, 0);
430        if range.start == range.end {
431            return Ok(0);
432        }
433        let tree = &self.store().tree;
434        let layer_set = tree.layer_set();
435        let key = Extent(range);
436        let lower_bound = ObjectKey::attribute(
437            self.object_id(),
438            attribute_id,
439            AttributeKey::Extent(key.search_key()),
440        );
441        let mut merger = layer_set.merger();
442        let mut iter = merger.query(Query::FullRange(&lower_bound)).await?;
443        let allocator = self.store().allocator();
444        let mut deallocated = 0;
445        let trace = self.trace();
446        while let Some(ItemRef {
447            key:
448                ObjectKey {
449                    object_id,
450                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
451                },
452            value: ObjectValue::Extent(value),
453            ..
454        }) = iter.get()
455        {
456            if *object_id != self.object_id() || *attr_id != attribute_id {
457                break;
458            }
459            if let ExtentValue::Some { device_offset, .. } = value {
460                if let Some(overlap) = key.overlap(extent_key) {
461                    let range = device_offset + overlap.start - extent_key.start
462                        ..device_offset + overlap.end - extent_key.start;
463                    ensure!(range.is_aligned(block_size), FxfsError::Inconsistent);
464                    if trace {
465                        info!(
466                            store_id = self.store().store_object_id(),
467                            oid = self.object_id(),
468                            device_range:? = range,
469                            len = range.end - range.start,
470                            extent_key:?;
471                            "D",
472                        );
473                    }
474                    allocator
475                        .deallocate(transaction, self.store().store_object_id(), range)
476                        .await?;
477                    deallocated += overlap.end - overlap.start;
478                } else {
479                    break;
480                }
481            }
482            iter.advance().await?;
483        }
484        Ok(deallocated)
485    }
486
487    // Writes aligned data (that should already be encrypted) to the given offset and computes
488    // checksums if requested. The aligned data must be from a single logical file range.
489    async fn write_aligned(
490        &self,
491        buf: BufferRef<'_>,
492        device_offset: u64,
493        crypt_ctx: Option<(u32, u8)>,
494    ) -> Result<MaybeChecksums, Error> {
495        if self.trace() {
496            info!(
497                store_id = self.store().store_object_id(),
498                oid = self.object_id(),
499                device_range:? = (device_offset..device_offset + buf.len() as u64),
500                len = buf.len();
501                "W",
502            );
503        }
504        let store = self.store();
505        store.device_write_ops.fetch_add(1, Ordering::Relaxed);
506        let mut checksums = Vec::new();
507        let _watchdog = Watchdog::new(10, |count| {
508            warn!("Write has been stalled for {} seconds", count * 10);
509        });
510
511        match crypt_ctx {
512            Some((dun, slot)) => {
513                if !store.filesystem().options().barriers_enabled {
514                    return Err(anyhow!(FxfsError::InvalidArgs)
515                        .context("Barriers must be enabled for inline encrypted writes."));
516                }
517                store
518                    .device
519                    .write_with_opts(
520                        device_offset as u64,
521                        buf,
522                        WriteOptions {
523                            inline_crypto: InlineCryptoOptions::enabled(slot, dun),
524                            ..Default::default()
525                        },
526                    )
527                    .await?;
528                Ok(MaybeChecksums::None)
529            }
530            None => {
531                if self.options.skip_checksums {
532                    store
533                        .device
534                        .write_with_opts(device_offset as u64, buf, WriteOptions::default())
535                        .await?;
536                    Ok(MaybeChecksums::None)
537                } else {
538                    try_join!(store.device.write(device_offset, buf), async {
539                        let block_size = self.block_size();
540                        for chunk in buf.as_slice().chunks_exact(block_size as usize) {
541                            checksums.push(fletcher64(chunk, 0));
542                        }
543                        Ok(())
544                    })?;
545                    Ok(MaybeChecksums::Fletcher(checksums))
546                }
547            }
548        }
549    }
550
551    /// Flushes the underlying device.  This is expensive and should be used sparingly.
552    pub async fn flush_device(&self) -> Result<(), Error> {
553        self.store().device().flush().await
554    }
555
556    pub async fn update_allocated_size(
557        &self,
558        transaction: &mut Transaction<'_>,
559        allocated: u64,
560        deallocated: u64,
561    ) -> Result<(), Error> {
562        if allocated == deallocated {
563            return Ok(());
564        }
565        let mut mutation = self.txn_get_object_mutation(transaction).await?;
566        if let ObjectValue::Object {
567            attributes: ObjectAttributes { project_id, allocated_size, .. },
568            ..
569        } = &mut mutation.item.value
570        {
571            // The only way for these to fail are if the volume is inconsistent.
572            *allocated_size = allocated_size
573                .checked_add(allocated)
574                .ok_or_else(|| anyhow!(FxfsError::Inconsistent).context("Allocated size overflow"))?
575                .checked_sub(deallocated)
576                .ok_or_else(|| {
577                    anyhow!(FxfsError::Inconsistent).context("Allocated size underflow")
578                })?;
579
580            if let Some(project_id) = project_id {
581                // The allocated and deallocated shouldn't exceed the max size of the file which is
582                // bound within i64.
583                let diff = i64::try_from(allocated).unwrap() - i64::try_from(deallocated).unwrap();
584                transaction.add(
585                    self.store().store_object_id(),
586                    Mutation::merge_object(
587                        ObjectKey::project_usage(
588                            self.store().root_directory_object_id(),
589                            *project_id,
590                        ),
591                        ObjectValue::BytesAndNodes { bytes: diff, nodes: 0 },
592                    ),
593                );
594            }
595        } else {
596            // This can occur when the object mutation is created from an object in the tree which
597            // was corrupt.
598            bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
599        }
600        transaction.add(self.store().store_object_id, Mutation::ObjectStore(mutation));
601        Ok(())
602    }
603
604    pub async fn update_attributes<'a>(
605        &self,
606        transaction: &mut Transaction<'a>,
607        node_attributes: Option<&fio::MutableNodeAttributes>,
608        change_time: Option<Timestamp>,
609    ) -> Result<(), Error> {
610        if let Some(&fio::MutableNodeAttributes { selinux_context: Some(ref context), .. }) =
611            node_attributes
612        {
613            if let fio::SelinuxContext::Data(context) = context {
614                self.set_extended_attribute_impl(
615                    "security.selinux".into(),
616                    context.clone(),
617                    SetExtendedAttributeMode::Set,
618                    transaction,
619                )
620                .await?;
621            } else {
622                return Err(anyhow!(FxfsError::InvalidArgs)
623                    .context("Only set SELinux context with `data` member."));
624            }
625        }
626        self.store()
627            .update_attributes(transaction, self.object_id, node_attributes, change_time)
628            .await
629    }
630
631    /// Zeroes the given range.  The range must be aligned.  Returns the amount of data deallocated.
632    pub async fn zero(
633        &self,
634        transaction: &mut Transaction<'_>,
635        attribute_id: AttributeId,
636        range: Range<u64>,
637    ) -> Result<(), Error> {
638        let deallocated =
639            self.deallocate_old_extents(transaction, attribute_id, range.clone()).await?;
640        if deallocated > 0 {
641            self.update_allocated_size(transaction, 0, deallocated).await?;
642            transaction.add(
643                self.store().store_object_id,
644                Mutation::merge_object(
645                    ObjectKey::extent(self.object_id(), attribute_id, range),
646                    ObjectValue::Extent(ExtentValue::deleted_extent()),
647                ),
648            );
649        }
650        Ok(())
651    }
652
653    // Returns a new aligned buffer (reading the head and tail blocks if necessary) with a copy of
654    // the data from `buf`.
655    pub async fn align_buffer(
656        &self,
657        attribute_id: AttributeId,
658        offset: u64,
659        buf: BufferRef<'_>,
660    ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
661        let block_size = self.block_size();
662        let end = offset + buf.len() as u64;
663        let aligned =
664            round_down(offset, block_size)..round_up(end, block_size).ok_or(FxfsError::TooBig)?;
665
666        let mut aligned_buf =
667            self.store().device.allocate_buffer((aligned.end - aligned.start) as usize).await;
668
669        // Deal with head alignment.
670        if aligned.start < offset {
671            let mut head_block = aligned_buf.subslice_mut(..block_size as usize);
672            let read = self.read(attribute_id, aligned.start, head_block.reborrow()).await?;
673            head_block.as_mut_slice()[read..].fill(0);
674        }
675
676        // Deal with tail alignment.
677        if aligned.end > end {
678            let end_block_offset = aligned.end - block_size;
679            // There's no need to read the tail block if we read it as part of the head block.
680            if offset <= end_block_offset {
681                let mut tail_block =
682                    aligned_buf.subslice_mut(aligned_buf.len() - block_size as usize..);
683                let read = self.read(attribute_id, end_block_offset, tail_block.reborrow()).await?;
684                tail_block.as_mut_slice()[read..].fill(0);
685            }
686        }
687
688        aligned_buf.as_mut_slice()
689            [(offset - aligned.start) as usize..(end - aligned.start) as usize]
690            .copy_from_slice(buf.as_slice());
691
692        Ok((aligned, aligned_buf))
693    }
694
695    /// Trim an attribute's extents, potentially adding a graveyard trim entry if more trimming is
696    /// needed, so the transaction can be committed without worrying about leaking data.
697    ///
698    /// This doesn't update the size stored in the attribute value - the caller is responsible for
699    /// doing that to keep the size up to date.
700    pub async fn shrink(
701        &self,
702        transaction: &mut Transaction<'_>,
703        attribute_id: AttributeId,
704        size: u64,
705    ) -> Result<NeedsTrim, Error> {
706        let store = self.store();
707        let needs_trim = matches!(
708            store
709                .trim_some(transaction, self.object_id(), attribute_id, TrimMode::FromOffset(size))
710                .await?,
711            TrimResult::Incomplete
712        );
713        if needs_trim {
714            // Add the object to the graveyard in case the following transactions don't get
715            // replayed.
716            let graveyard_id = store.graveyard_directory_object_id();
717            match store
718                .tree
719                .find(&ObjectKey::graveyard_entry(graveyard_id, self.object_id()))
720                .await?
721            {
722                Some(ObjectItem { value: ObjectValue::Some, .. })
723                | Some(ObjectItem { value: ObjectValue::Trim, .. }) => {
724                    // This object is already in the graveyard so we don't need to do anything.
725                }
726                _ => {
727                    transaction.add(
728                        store.store_object_id,
729                        Mutation::replace_or_insert_object(
730                            ObjectKey::graveyard_entry(graveyard_id, self.object_id()),
731                            ObjectValue::Trim,
732                        ),
733                    );
734                }
735            }
736        }
737        Ok(NeedsTrim(needs_trim))
738    }
739
740    /// Reads and decrypts a singular logical range.
741    pub async fn read_and_decrypt(
742        &self,
743        device_offset: u64,
744        file_offset: u64,
745        mut buffer: MutableBufferRef<'_>,
746        key_id: u64,
747    ) -> Result<(), Error> {
748        let store = self.store();
749        store.device_read_ops.fetch_add(1, Ordering::Relaxed);
750
751        let _watchdog = Watchdog::new(10, |count| {
752            warn!("Read has been stalled for {} seconds", count * 10);
753        });
754
755        let (_key_id, key) = self.get_key(Some(key_id)).await?;
756        if let Some(key) = key {
757            if let Some((dun, slot)) = key.crypt_ctx(self.object_id, file_offset) {
758                store
759                    .device
760                    .read_with_opts(
761                        device_offset as u64,
762                        buffer.reborrow(),
763                        ReadOptions { inline_crypto: InlineCryptoOptions::enabled(slot, dun) },
764                    )
765                    .await?;
766            } else {
767                store.device.read(device_offset, buffer.reborrow()).await?;
768                key.decrypt(self.object_id, device_offset, file_offset, buffer.as_mut_slice())?;
769            }
770        } else {
771            store.device.read(device_offset, buffer.reborrow()).await?;
772        }
773
774        Ok(())
775    }
776
777    /// Returns the specified key. If `key_id` is None, it will try and return the fscrypt key if
778    /// it is present, or the volume key if it isn't. If the fscrypt key is present, but the key
779    /// cannot be unwrapped, then this will return `FxfsError::NoKey`. If the volume is not
780    /// encrypted, this returns None.
781    pub async fn get_key(
782        &self,
783        key_id: Option<u64>,
784    ) -> Result<(u64, Option<Arc<dyn Cipher>>), Error> {
785        let store = self.store();
786        let result = match self.encryption {
787            Encryption::None => (VOLUME_DATA_KEY_ID, None),
788            Encryption::CachedKeys => {
789                if let Some(key_id) = key_id {
790                    (
791                        key_id,
792                        Some(
793                            store
794                                .key_manager
795                                .get_key(
796                                    self.object_id,
797                                    store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
798                                    async || store.get_keys(self.object_id).await,
799                                    key_id,
800                                )
801                                .await?,
802                        ),
803                    )
804                } else {
805                    let (key_id, key) = store
806                        .key_manager
807                        .get_fscrypt_key_if_present(
808                            self.object_id,
809                            store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
810                            async || store.get_keys(self.object_id).await,
811                        )
812                        .await?;
813                    (key_id, Some(key))
814                }
815            }
816            Encryption::PermanentKeys => {
817                (VOLUME_DATA_KEY_ID, Some(store.key_manager.get(self.object_id).await?.unwrap()))
818            }
819        };
820
821        // Ensure that if the key we receive uses inline encryption, barriers should be enabled.
822        if let Some(ref key) = result.1 {
823            if key.crypt_ctx(self.object_id, 0).is_some() {
824                if !store.filesystem().options().barriers_enabled {
825                    return Err(anyhow!(FxfsError::InvalidArgs)
826                        .context("Barriers must be enabled for inline encrypted writes."));
827                }
828            }
829        }
830
831        Ok(result)
832    }
833
    /// Returns the cipher for this object's volume data key (`VOLUME_DATA_KEY_ID`), creating the
    /// key if it does not exist yet.
    ///
    /// This will only work for a non-permanent volume data key. This is designed to be used with
    /// extended attributes where we'll only create the key on demand for directories and encrypted
    /// files.
    ///
    /// Lookup order:
    /// 1. The key manager's cache.
    /// 2. The object's keys record in the tree. If it already holds the volume data key, that key
    ///    is resolved through the key manager and returned.
    /// 3. Otherwise a new data key is created with the store's crypt service. A mutation that
    ///    writes the updated keys record is added to `transaction`. The new cipher set is only
    ///    inserted into the key manager when that mutation is applied.
    ///
    /// Errors: `FxfsError::NoKey` if the volume data key exists but is unavailable,
    /// `FxfsError::Inconsistent` if the keys record holds an unexpected value, and an error if
    /// the store has no crypt service.
    async fn get_or_create_key(
        &self,
        transaction: &mut Transaction<'_>,
    ) -> Result<Arc<dyn Cipher>, Error> {
        let store = self.store();

        // Fast path: try and get keys from the cache.
        if let Some(key) = store.key_manager.get(self.object_id).await.context("get failed")? {
            return Ok(key);
        }

        let crypt = store.crypt().ok_or_else(|| anyhow!("No crypt!"))?;

        // Next, see if the keys are already created.
        // If a keys record exists but lacks the volume data key, keep both the stored keys and a
        // copy of the resolved cipher set so that the new key can be added alongside them.
        let (mut encryption_keys, mut cipher_set) = if let Some(item) =
            store.tree.find(&ObjectKey::keys(self.object_id)).await.context("find failed")?
        {
            if let ObjectValue::Keys(encryption_keys) = item.value {
                let cipher_set = store
                    .key_manager
                    .get_keys(
                        self.object_id,
                        crypt.as_ref(),
                        &mut Some(async || Ok(encryption_keys.clone())),
                        /* permanent= */ false,
                        /* force= */ false,
                    )
                    .await
                    .context("get_keys failed")?;
                match cipher_set.find_key(VOLUME_DATA_KEY_ID) {
                    FindKeyResult::NotFound => {}
                    FindKeyResult::Unavailable => return Err(FxfsError::NoKey.into()),
                    FindKeyResult::Key(key) => return Ok(key),
                }
                (encryption_keys, (*cipher_set).clone())
            } else {
                return Err(anyhow!(FxfsError::Inconsistent));
            }
        } else {
            // No keys record yet: start from empty key and cipher sets.
            Default::default()
        };

        // Proceed to create the key.  The transaction holds the required locks.
        let (key, unwrapped_key) = crypt.create_key(self.object_id, KeyPurpose::Data).await?;
        let cipher: Arc<dyn Cipher> = Arc::new(FxfsCipher::new(&unwrapped_key));

        // Add new cipher to cloned cipher set. This will replace existing one
        // if transaction is successful.
        cipher_set.add_key(VOLUME_DATA_KEY_ID, CipherHolder::Cipher(cipher.clone()));
        let cipher_set = Arc::new(cipher_set);

        // Arrange for the CipherSet to be added to the cache when (and if) the transaction
        // commits.
        struct UnwrappedKeys {
            object_id: u64,
            new_keys: Arc<CipherSet>,
        }

        impl AssociatedObject for UnwrappedKeys {
            // Called as the associated mutation is applied; publishes the new cipher set to the
            // owning store's key manager as a non-permanent entry.
            fn will_apply_mutation(
                &self,
                _mutation: &Mutation,
                object_id: u64,
                manager: &ObjectManager,
            ) {
                manager.store(object_id).unwrap().key_manager.insert(
                    self.object_id,
                    self.new_keys.clone(),
                    /* permanent= */ false,
                );
            }
        }

        // Persist the wrapped form of the new key next to any existing keys.
        encryption_keys.insert(VOLUME_DATA_KEY_ID, EncryptionKey::Fxfs(key).into());

        transaction.add_with_object(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::keys(self.object_id),
                ObjectValue::keys(encryption_keys),
            ),
            AssocObj::Owned(Box::new(UnwrappedKeys {
                object_id: self.object_id,
                new_keys: cipher_set,
            })),
        );

        Ok(cipher)
    }
926
927    pub async fn read(
928        &self,
929        attribute_id: AttributeId,
930        offset: u64,
931        mut buf: MutableBufferRef<'_>,
932    ) -> Result<usize, Error> {
933        let fs = self.store().filesystem();
934        let guard = fs
935            .lock_manager()
936            .read_lock(lock_keys![LockKey::object_attribute(
937                self.store().store_object_id(),
938                self.object_id(),
939                attribute_id,
940            )])
941            .await;
942
943        let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
944        let item = self.store().tree().find(&key).await?;
945        let size = match item {
946            Some(item) if item.key == key => match item.value {
947                ObjectValue::Attribute { size, .. } => size,
948                _ => bail!(FxfsError::Inconsistent),
949            },
950            _ => return Ok(0),
951        };
952        if offset >= size {
953            return Ok(0);
954        }
955        let length = min(buf.len() as u64, size - offset) as usize;
956        buf = buf.subslice_mut(0..length);
957        self.read_unchecked(attribute_id, offset, buf, &guard).await?;
958        Ok(length)
959    }
960
    /// Read `buf.len()` bytes from the attribute `attribute_id`, starting at `offset`, into `buf`.
    /// It's required that a read lock on this attribute id is taken before this is called.
    ///
    /// This function doesn't do any size checking - any portion of `buf` past the end of the file
    /// will be filled with zeros. The caller is responsible for enforcing the file size on reads.
    /// This is because, just looking at the extents, we can't tell the difference between the file
    /// actually ending and there just being a section at the end with no data (since attributes
    /// are sparse).
    ///
    /// Holes (gaps between extents, deleted extents) read back as zeros.
    ///
    /// # Errors
    ///
    /// Returns `FxfsError::Inconsistent` if an extent is invalid or not block aligned, or if an
    /// overwrite bitmap is too short. Device and decryption errors are propagated.
    ///
    /// # Panics
    ///
    /// Panics if `offset` is not a multiple of the filesystem block size, or if `buf` does not
    /// start on a device block boundary.
    pub async fn read_unchecked(
        &self,
        attribute_id: AttributeId,
        mut offset: u64,
        mut buf: MutableBufferRef<'_>,
        _guard: &ReadGuard<'_>,
    ) -> Result<(), Error> {
        if buf.len() == 0 {
            return Ok(());
        }
        let end_offset = offset + buf.len() as u64;

        self.store().logical_read_ops.fetch_add(1, Ordering::Relaxed);

        // Whilst the read offset must be aligned to the filesystem block size, the buffer need only
        // be aligned to the device's block size.
        let block_size = self.block_size() as u64;
        let device_block_size = self.store().device.block_size() as u64;
        assert_eq!(offset % block_size, 0);
        assert_eq!(buf.range().start as u64 % device_block_size, 0);
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger
            .query(Query::LimitedRange(&ObjectKey::extent(
                self.object_id(),
                attribute_id,
                offset..end_offset,
            )))
            .await?;
        // Number of bytes in the trailing partial filesystem block (0 if the read ends on a block
        // boundary). That block is read through a separate block-sized alignment buffer below.
        let end_align = ((offset + buf.len() as u64) % block_size) as usize;
        let trace = self.trace();
        // Whole-block reads are only queued here. They are driven, concurrently, by the
        // `try_collect` after the loop.
        let reads = FuturesUnordered::new();
        // Walk the extents overlapping [offset, end_offset). `offset` and `buf` advance together,
        // so `buf` always covers the part of the request that has not been handled yet.
        while let Some(ItemRef {
            key:
                ObjectKey {
                    object_id,
                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                },
            value: ObjectValue::Extent(extent_value),
            ..
        }) = iter.get()
        {
            if *object_id != self.object_id() || *attr_id != attribute_id {
                break;
            }
            ensure!(
                extent_key.is_valid() && extent_key.is_aligned(block_size),
                FxfsError::Inconsistent
            );
            if extent_key.start > offset {
                // Zero everything up to the start of the extent.
                let to_zero = min(extent_key.start - offset, buf.len() as u64) as usize;
                buf.as_mut_slice()[..to_zero].fill(0);
                buf = buf.subslice_mut(to_zero..);
                if buf.is_empty() {
                    break;
                }
                offset += to_zero as u64;
            }

            if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
                // Translate the current logical offset into the device offset within this extent.
                let mut device_offset = device_offset + (offset - extent_key.start);
                let key_id = *key_id;

                // Whole blocks only: stop before the trailing partial block and at the extent end.
                let to_copy = min(buf.len() - end_align, (extent_key.end - offset) as usize);
                if to_copy > 0 {
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = (device_offset..device_offset + to_copy as u64),
                            offset,
                            range:? = **extent_key,
                            block_size;
                            "R",
                        );
                    }
                    let (mut head, tail) = buf.split_at_mut(to_copy);
                    // For partially-written overwrite extents, narrow the bitmap to the blocks
                    // covered by this read. `apply_bitmap_zeroing` then handles blocks that are
                    // not marked as initialized.
                    let maybe_bitmap = match mode {
                        ExtentMode::OverwritePartial(bitmap) => {
                            let mut read_bitmap = bitmap
                                .clone()
                                .split_off(((offset - extent_key.start) / block_size) as usize);
                            read_bitmap.truncate(to_copy / block_size as usize);
                            Some(read_bitmap)
                        }
                        _ => None,
                    };
                    reads.push(async move {
                        self.read_and_decrypt(device_offset, offset, head.reborrow(), key_id)
                            .await?;
                        if let Some(bitmap) = maybe_bitmap {
                            apply_bitmap_zeroing(self.block_size() as usize, &bitmap, head);
                        }
                        Ok::<(), Error>(())
                    });
                    buf = tail;
                    if buf.is_empty() {
                        break;
                    }
                    offset += to_copy as u64;
                    device_offset += to_copy as u64;
                }

                // Deal with end alignment by reading the existing contents into an alignment
                // buffer.
                if offset < extent_key.end && end_align > 0 {
                    if let ExtentMode::OverwritePartial(bitmap) = mode {
                        let bitmap_offset = (offset - extent_key.start) / block_size;
                        if !bitmap.get(bitmap_offset as usize).ok_or(FxfsError::Inconsistent)? {
                            // If this block isn't actually initialized, skip it.
                            // The remaining `end_align` bytes get zeroed after the loop.
                            break;
                        }
                    }
                    let mut align_buf =
                        self.store().device.allocate_buffer(block_size as usize).await;
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = (device_offset..device_offset + align_buf.len() as u64);
                            "RT",
                        );
                    }
                    self.read_and_decrypt(device_offset, offset, align_buf.as_mut(), key_id)
                        .await?;
                    buf.as_mut_slice().copy_from_slice(&align_buf.as_slice()[..end_align]);
                    buf = buf.subslice_mut(0..0);
                    break;
                }
            } else if extent_key.end >= offset + buf.len() as u64 {
                // Deleted extent covers remainder, so we're done.
                break;
            }
            // Otherwise a deleted extent is a hole. It is zeroed either by the next extent's
            // leading-gap handling above or by the final fill after the loop.

            iter.advance().await?;
        }
        // Drive all queued reads to completion, then zero whatever part of `buf` no extent
        // supplied (a trailing hole or an uninitialized final block).
        reads.try_collect::<()>().await?;
        buf.as_mut_slice().fill(0);
        Ok(())
    }
1111
1112    /// Reads an entire attribute.
1113    pub async fn read_attr(&self, attribute_id: AttributeId) -> Result<Option<Box<[u8]>>, Error> {
1114        let store = self.store();
1115        let tree = &store.tree;
1116        let layer_set = tree.layer_set();
1117        let mut merger = layer_set.merger();
1118        let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
1119        let iter = merger.query(Query::FullRange(&key)).await?;
1120        match iter.get() {
1121            Some(item) if item.key == &key => match item.value {
1122                ObjectValue::Attribute { .. } => Ok(Some(self.read_attr_from_iter(iter).await?)),
1123                // Attribute was deleted.
1124                ObjectValue::None => Ok(None),
1125                _ => Err(FxfsError::Inconsistent.into()),
1126            },
1127            _ => Ok(None),
1128        }
1129    }
1130
1131    /// Reads an entire attribute pointed to by `iter`. `iter` must be pointing to the
1132    /// `AttributeKey::Attribute` of the attribute.
1133    pub async fn read_attr_from_iter(
1134        &self,
1135        mut iter: MergerIterator<'_, '_, ObjectKey, ObjectValue>,
1136    ) -> Result<Box<[u8]>, Error> {
1137        let (mut buffer, size, attribute_id) = match iter.get() {
1138            Some(ItemRef {
1139                key:
1140                    ObjectKey {
1141                        object_id,
1142                        data: ObjectKeyData::Attribute(attribute_id, AttributeKey::Attribute),
1143                    },
1144                value: ObjectValue::Attribute { size, .. },
1145                ..
1146            }) if *object_id == self.object_id => {
1147                // TODO(https://fxbug.dev/42073113): size > max buffer size
1148                (
1149                    self.store()
1150                        .device
1151                        .allocate_buffer(round_up(*size, self.block_size()).unwrap() as usize)
1152                        .await,
1153                    *size as usize,
1154                    *attribute_id,
1155                )
1156            }
1157            _ => bail!(FxfsError::InvalidArgs),
1158        };
1159
1160        self.store().logical_read_ops.fetch_add(1, Ordering::Relaxed);
1161        let mut last_offset = 0;
1162        loop {
1163            iter.advance().await?;
1164            match iter.get() {
1165                Some(ItemRef {
1166                    key:
1167                        ObjectKey {
1168                            object_id,
1169                            data:
1170                                ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
1171                        },
1172                    value: ObjectValue::Extent(extent_value),
1173                    ..
1174                }) if *object_id == self.object_id() && *attr_id == attribute_id => {
1175                    if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
1176                        let offset = extent_key.start as usize;
1177                        buffer.as_mut_slice()[last_offset..offset].fill(0);
1178                        let end = std::cmp::min(extent_key.end as usize, buffer.len());
1179                        let maybe_bitmap = match mode {
1180                            ExtentMode::OverwritePartial(bitmap) => {
1181                                // The caller has to adjust the bitmap if necessary, but we always
1182                                // start from the beginning of any extent, so we only truncate.
1183                                let mut read_bitmap = bitmap.clone();
1184                                read_bitmap.truncate(
1185                                    (end - extent_key.start as usize) / self.block_size() as usize,
1186                                );
1187                                Some(read_bitmap)
1188                            }
1189                            _ => None,
1190                        };
1191                        self.read_and_decrypt(
1192                            *device_offset,
1193                            extent_key.start,
1194                            buffer.subslice_mut(offset..end as usize),
1195                            *key_id,
1196                        )
1197                        .await?;
1198                        if let Some(bitmap) = maybe_bitmap {
1199                            apply_bitmap_zeroing(
1200                                self.block_size() as usize,
1201                                &bitmap,
1202                                buffer.subslice_mut(offset..end as usize),
1203                            );
1204                        }
1205                        last_offset = end;
1206                        if last_offset >= size {
1207                            break;
1208                        }
1209                    }
1210                }
1211                _ => break,
1212            }
1213        }
1214        buffer.as_mut_slice()[std::cmp::min(last_offset, size)..].fill(0);
1215        Ok(buffer.as_slice()[..size].into())
1216    }
1217
1218    /// Writes potentially unaligned data at `device_offset` and returns checksums if requested.
1219    /// The data will be encrypted if necessary.  `buf` is mutable as an optimization, since the
1220    /// write may require encryption, we can encrypt the buffer in-place rather than copying to
1221    /// another buffer if the write is already aligned.
1222    ///
1223    /// NOTE: This will not create keys if they are missing (it will fail with an error if that
1224    /// happens to be the case).
1225    pub async fn write_at(
1226        &self,
1227        attribute_id: AttributeId,
1228        offset: u64,
1229        buf: MutableBufferRef<'_>,
1230        key_id: Option<u64>,
1231        mut device_offset: u64,
1232    ) -> Result<MaybeChecksums, Error> {
1233        let mut transfer_buf;
1234        let block_size = self.block_size();
1235        let (range, mut transfer_buf_ref) =
1236            if offset % block_size == 0 && buf.len() as u64 % block_size == 0 {
1237                (offset..offset + buf.len() as u64, buf)
1238            } else {
1239                let (range, buf) = self.align_buffer(attribute_id, offset, buf.as_ref()).await?;
1240                transfer_buf = buf;
1241                device_offset -= offset - range.start;
1242                (range, transfer_buf.as_mut())
1243            };
1244
1245        let mut crypt_ctx = None;
1246        if let (_, Some(key)) = self.get_key(key_id).await? {
1247            if let Some(ctx) = key.crypt_ctx(self.object_id, range.start) {
1248                crypt_ctx = Some(ctx);
1249            } else {
1250                key.encrypt(
1251                    self.object_id,
1252                    device_offset,
1253                    range.start,
1254                    transfer_buf_ref.as_mut_slice(),
1255                )?;
1256            }
1257        }
1258        self.write_aligned(transfer_buf_ref.as_ref(), device_offset, crypt_ctx).await
1259    }
1260
1261    /// Writes to multiple ranges with data provided in `buf`. This function is specifically
1262    /// designed for migration purposes, allowing raw writes to the device without updating
1263    /// object metadata like allocated size or mtime. It's essential for scenarios where
1264    /// data needs to be transferred directly without triggering standard filesystem operations.
1265    #[cfg(feature = "migration")]
1266    pub async fn raw_multi_write(
1267        &self,
1268        transaction: &mut Transaction<'_>,
1269        attribute_id: AttributeId,
1270        key_id: Option<u64>,
1271        ranges: &[Range<u64>],
1272        buf: MutableBufferRef<'_>,
1273    ) -> Result<(), Error> {
1274        self.multi_write_internal(transaction, attribute_id, key_id, ranges, buf).await?;
1275        Ok(())
1276    }
1277
1278    /// This is a low-level write function that writes to multiple ranges. Users should generally
1279    /// use `multi_write` instead of this function as this does not update the object's allocated
1280    /// size, mtime, atime, etc.
1281    ///
1282    /// Returns (allocated, deallocated) bytes on success.
1283    async fn multi_write_internal(
1284        &self,
1285        transaction: &mut Transaction<'_>,
1286        attribute_id: AttributeId,
1287        key_id: Option<u64>,
1288        ranges: &[Range<u64>],
1289        mut buf: MutableBufferRef<'_>,
1290    ) -> Result<(u64, u64), Error> {
1291        if buf.is_empty() {
1292            return Ok((0, 0));
1293        }
1294        let block_size = self.block_size();
1295        let store = self.store();
1296        let store_id = store.store_object_id();
1297
1298        // The only key we allow to be created on-the-fly is a non permanent key wrapped with the
1299        // volume data key.
1300        let (key_id, key) = if key_id == Some(VOLUME_DATA_KEY_ID)
1301            && matches!(self.encryption, Encryption::CachedKeys)
1302        {
1303            (
1304                VOLUME_DATA_KEY_ID,
1305                Some(
1306                    self.get_or_create_key(transaction)
1307                        .await
1308                        .context("get_or_create_key failed")?,
1309                ),
1310            )
1311        } else {
1312            self.get_key(key_id).await?
1313        };
1314        if let Some(key) = &key {
1315            if !key.supports_inline_encryption() {
1316                let mut slice = buf.as_mut_slice();
1317                for r in ranges {
1318                    let l = r.end - r.start;
1319                    let (head, tail) = slice.split_at_mut(l as usize);
1320                    key.encrypt(
1321                        self.object_id,
1322                        0, /* TODO(https://fxbug.dev/421269588): plumb through device_offset. */
1323                        r.start,
1324                        head,
1325                    )?;
1326                    slice = tail;
1327                }
1328            }
1329        }
1330
1331        let mut allocated = 0;
1332        let allocator = store.allocator();
1333        let trace = self.trace();
1334        let mut writes = FuturesOrdered::new();
1335
1336        let mut logical_ranges = ranges.iter();
1337        let mut current_range = logical_ranges.next().unwrap().clone();
1338
1339        while !buf.is_empty() {
1340            let mut device_range = allocator
1341                .allocate(transaction, store_id, buf.len() as u64)
1342                .await
1343                .context("allocation failed")?;
1344            if trace {
1345                info!(
1346                    store_id,
1347                    oid = self.object_id(),
1348                    device_range:?,
1349                    len = device_range.end - device_range.start;
1350                    "A",
1351                );
1352            }
1353            let mut device_range_len = device_range.end - device_range.start;
1354            allocated += device_range_len;
1355            // If inline encryption is NOT supported, this loop should only happen once.
1356            while device_range_len > 0 {
1357                if current_range.end <= current_range.start {
1358                    current_range = logical_ranges.next().unwrap().clone();
1359                }
1360                let (crypt_ctx, split) = if let Some(key) = &key {
1361                    if key.supports_inline_encryption() {
1362                        let split = std::cmp::min(
1363                            current_range.end - current_range.start,
1364                            device_range_len,
1365                        );
1366                        let crypt_ctx = key.crypt_ctx(self.object_id, current_range.start);
1367                        current_range.start += split;
1368                        (crypt_ctx, split)
1369                    } else {
1370                        (None, device_range_len)
1371                    }
1372                } else {
1373                    (None, device_range_len)
1374                };
1375
1376                let (head, tail) = buf.split_at_mut(split as usize);
1377                buf = tail;
1378
1379                writes.push_back(async move {
1380                    let len = head.len() as u64;
1381                    Result::<_, Error>::Ok((
1382                        device_range.start,
1383                        len,
1384                        self.write_aligned(head.as_ref(), device_range.start, crypt_ctx).await?,
1385                    ))
1386                });
1387                device_range.start += split;
1388                device_range_len = device_range.end - device_range.start;
1389            }
1390        }
1391
1392        self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
1393        let ((mutations, checksums), deallocated) = try_join!(
1394            async {
1395                let mut current_range = 0..0;
1396                let mut mutations = Vec::new();
1397                let mut out_checksums = Vec::new();
1398                let mut ranges = ranges.iter();
1399                while let Some((mut device_offset, mut len, mut checksums)) =
1400                    writes.try_next().await?
1401                {
1402                    while len > 0 {
1403                        if current_range.end <= current_range.start {
1404                            current_range = ranges.next().unwrap().clone();
1405                        }
1406                        let chunk_len = std::cmp::min(len, current_range.end - current_range.start);
1407                        let tail = checksums.split_off((chunk_len / block_size) as usize);
1408                        if let Some(checksums) = checksums.maybe_as_ref() {
1409                            out_checksums.push((
1410                                device_offset..device_offset + chunk_len,
1411                                checksums.to_owned(),
1412                            ));
1413                        }
1414                        mutations.push(Mutation::merge_object(
1415                            ObjectKey::extent(
1416                                self.object_id(),
1417                                attribute_id,
1418                                current_range.start..current_range.start + chunk_len,
1419                            ),
1420                            ObjectValue::Extent(ExtentValue::new(
1421                                device_offset,
1422                                checksums.to_mode(),
1423                                key_id,
1424                            )),
1425                        ));
1426                        checksums = tail;
1427                        device_offset += chunk_len;
1428                        len -= chunk_len;
1429                        current_range.start += chunk_len;
1430                    }
1431                }
1432                Result::<_, Error>::Ok((mutations, out_checksums))
1433            },
1434            async {
1435                let mut deallocated = 0;
1436                for r in ranges {
1437                    deallocated +=
1438                        self.deallocate_old_extents(transaction, attribute_id, r.clone()).await?;
1439                }
1440                Result::<_, Error>::Ok(deallocated)
1441            }
1442        )?;
1443
1444        for m in mutations {
1445            transaction.add(store_id, m);
1446        }
1447
1448        // Only store checksums in the journal if barriers are not enabled.
1449        if !store.filesystem().options().barriers_enabled {
1450            for (r, c) in checksums {
1451                transaction.add_checksum(r, c, true);
1452            }
1453        }
1454        Ok((allocated, deallocated))
1455    }
1456
1457    /// Writes to multiple ranges with data provided in `buf`.  The buffer can be modified in place
1458    /// if encryption takes place.  The ranges must all be aligned and no change to content size is
1459    /// applied; the caller is responsible for updating size if required.  If `key_id` is None, it
1460    /// means pick the default key for the object which is the fscrypt key if present, or the volume
1461    /// data key, or no key if it's an unencrypted file.
1462    pub async fn multi_write(
1463        &self,
1464        transaction: &mut Transaction<'_>,
1465        attribute_id: AttributeId,
1466        key_id: Option<u64>,
1467        ranges: &[Range<u64>],
1468        buf: MutableBufferRef<'_>,
1469    ) -> Result<(), Error> {
1470        let (allocated, deallocated) =
1471            self.multi_write_internal(transaction, attribute_id, key_id, ranges, buf).await?;
1472        if allocated == 0 && deallocated == 0 {
1473            return Ok(());
1474        }
1475        self.update_allocated_size(transaction, allocated, deallocated).await
1476    }
1477
1478    /// Write data to overwrite extents with the provided set of ranges. This makes a strong
1479    /// assumption that the ranges are actually going to be already allocated overwrite extents and
1480    /// will error out or do something wrong if they aren't. It also assumes the ranges passed to
1481    /// it are sorted.
1482    pub async fn multi_overwrite<'a>(
1483        &'a self,
1484        transaction: &mut Transaction<'a>,
1485        attr_id: AttributeId,
1486        ranges: &[Range<u64>],
1487        mut buf: MutableBufferRef<'_>,
1488    ) -> Result<(), Error> {
1489        if buf.is_empty() {
1490            return Ok(());
1491        }
1492        let block_size = self.block_size();
1493        let store = self.store();
1494        let tree = store.tree();
1495        let store_id = store.store_object_id();
1496
1497        let (key_id, key) = self.get_key(None).await?;
1498        if let Some(key) = &key {
1499            if !key.supports_inline_encryption() {
1500                let mut slice = buf.as_mut_slice();
1501                for r in ranges {
1502                    let l = r.end - r.start;
1503                    let (head, tail) = slice.split_at_mut(l as usize);
1504                    key.encrypt(
1505                        self.object_id,
1506                        0, /* TODO(https://fxbug.dev/421269588): plumb through device_offset. */
1507                        r.start,
1508                        head,
1509                    )?;
1510                    slice = tail;
1511                }
1512            }
1513        }
1514
1515        let mut range_iter = ranges.iter();
1516        // There should be at least one range if the buffer has data in it
1517        let mut target_range = range_iter.next().unwrap().clone();
1518        let mut mutations = Vec::new();
1519        let writes = FuturesUnordered::new();
1520
1521        let layer_set = tree.layer_set();
1522        let mut merger = layer_set.merger();
1523        let mut iter = merger
1524            .query(Query::FullRange(&ObjectKey::attribute(
1525                self.object_id(),
1526                attr_id,
1527                AttributeKey::Extent(Extent::search_key_from_offset(target_range.start)),
1528            )))
1529            .await?;
1530
1531        loop {
1532            match iter.get() {
1533                Some(ItemRef {
1534                    key:
1535                        ObjectKey {
1536                            object_id,
1537                            data:
1538                                ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent)),
1539                        },
1540                    value: ObjectValue::Extent(extent_value),
1541                    ..
1542                }) if *object_id == self.object_id() && *attribute_id == attr_id => {
1543                    // If this extent ends before the target range starts (not possible on the
1544                    // first loop because of the query parameters but possible on further loops),
1545                    // advance until we find a the next one we care about.
1546                    if extent.end <= target_range.start {
1547                        iter.advance().await?;
1548                        continue;
1549                    }
1550                    let (device_offset, mode) = match extent_value {
1551                        ExtentValue::None => {
1552                            return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
1553                                format!(
1554                                    "multi_overwrite failed: target_range ({}, {}) overlaps with \
1555                                deleted extent found at ({}, {})",
1556                                    target_range.start, target_range.end, extent.start, extent.end,
1557                                )
1558                            });
1559                        }
1560                        ExtentValue::Some { device_offset, mode, .. } => (device_offset, mode),
1561                    };
1562                    // The ranges passed to this function should already by allocated, so
1563                    // extent records should exist for them.
1564                    if extent.start > target_range.start {
1565                        return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
1566                            format!(
1567                                "multi_overwrite failed: target range ({}, {}) starts before first \
1568                            extent found at ({}, {})",
1569                                target_range.start, target_range.end, extent.start, extent.end,
1570                            )
1571                        });
1572                    }
1573                    let mut bitmap = match mode {
1574                        ExtentMode::Raw | ExtentMode::Cow(_) => {
1575                            return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
1576                                format!(
1577                                    "multi_overwrite failed: \
1578                            extent from ({}, {}) which overlaps target range ({}, {}) had the \
1579                            wrong extent mode",
1580                                    extent.start, extent.end, target_range.start, target_range.end,
1581                                )
1582                            });
1583                        }
1584                        ExtentMode::OverwritePartial(bitmap) => {
1585                            OverwriteBitmaps::new(bitmap.clone())
1586                        }
1587                        ExtentMode::Overwrite => OverwriteBitmaps::None,
1588                    };
1589                    loop {
1590                        let offset_within_extent = target_range.start - extent.start;
1591                        let bitmap_offset = offset_within_extent / block_size;
1592                        let write_device_offset = *device_offset + offset_within_extent;
1593                        let write_end = min(extent.end, target_range.end);
1594                        let write_len = write_end - target_range.start;
1595                        let write_device_range =
1596                            write_device_offset..write_device_offset + write_len;
1597                        let (current_buf, remaining_buf) = buf.split_at_mut(write_len as usize);
1598
1599                        bitmap.set_offset(bitmap_offset as usize);
1600                        let checksum_ranges = ChecksumRangeChunk::group_first_write_ranges(
1601                            &mut bitmap,
1602                            block_size,
1603                            write_device_range,
1604                        );
1605
1606                        let crypt_ctx = if let Some(key) = &key {
1607                            key.crypt_ctx(self.object_id, target_range.start)
1608                        } else {
1609                            None
1610                        };
1611
1612                        writes.push(async move {
1613                            let maybe_checksums = self
1614                                .write_aligned(current_buf.as_ref(), write_device_offset, crypt_ctx)
1615                                .await?;
1616                            Ok::<_, Error>(match maybe_checksums {
1617                                MaybeChecksums::None => Vec::new(),
1618                                MaybeChecksums::Fletcher(checksums) => checksum_ranges
1619                                    .into_iter()
1620                                    .map(
1621                                        |ChecksumRangeChunk {
1622                                             checksum_range,
1623                                             device_range,
1624                                             is_first_write,
1625                                         }| {
1626                                            (
1627                                                device_range,
1628                                                checksums[checksum_range].to_vec(),
1629                                                is_first_write,
1630                                            )
1631                                        },
1632                                    )
1633                                    .collect(),
1634                            })
1635                        });
1636                        buf = remaining_buf;
1637                        target_range.start += write_len;
1638                        if target_range.start == target_range.end {
1639                            match range_iter.next() {
1640                                None => break,
1641                                Some(next_range) => target_range = next_range.clone(),
1642                            }
1643                        }
1644                        if extent.end <= target_range.start {
1645                            break;
1646                        }
1647                    }
1648                    if let Some((mut bitmap, write_bitmap)) = bitmap.take_bitmaps() {
1649                        if bitmap.or(&write_bitmap) {
1650                            let mode = if bitmap.all() {
1651                                ExtentMode::Overwrite
1652                            } else {
1653                                ExtentMode::OverwritePartial(bitmap)
1654                            };
1655                            mutations.push(Mutation::merge_object(
1656                                ObjectKey::extent(self.object_id(), attr_id, extent.clone().into()),
1657                                ObjectValue::Extent(ExtentValue::new(*device_offset, mode, key_id)),
1658                            ))
1659                        }
1660                    }
1661                    if target_range.start == target_range.end {
1662                        break;
1663                    }
1664                    iter.advance().await?;
1665                }
1666                // We've either run past the end of the existing extents or something is wrong with
1667                // the tree. The main section should break if it finishes the ranges, so either
1668                // case, this is an error.
1669                _ => bail!(anyhow!(FxfsError::Internal).context(
1670                    "found a non-extent object record while there were still ranges to process"
1671                )),
1672            }
1673        }
1674
1675        let checksums = writes.try_collect::<Vec<_>>().await?;
1676        // Only store checksums in the journal if barriers are not enabled.
1677        if !store.filesystem().options().barriers_enabled {
1678            for (r, c, first_write) in checksums.into_iter().flatten() {
1679                transaction.add_checksum(r, c, first_write);
1680            }
1681        }
1682
1683        for m in mutations {
1684            transaction.add(store_id, m);
1685        }
1686
1687        Ok(())
1688    }
1689
1690    /// Writes an attribute that should not already exist and therefore does not require trimming.
1691    /// Breaks up the write into multiple transactions if `data.len()` is larger than `batch_size`.
1692    /// If writing the attribute requires multiple transactions, adds the attribute to the
1693    /// graveyard. The caller is responsible for removing the attribute from the graveyard when it
1694    /// commits the last transaction.  This always writes using a key wrapped with the volume data
1695    /// key.
1696    #[trace]
1697    pub async fn write_new_attr_in_batches<'a>(
1698        &'a self,
1699        transaction: &mut Transaction<'a>,
1700        attribute_id: AttributeId,
1701        data: &[u8],
1702        batch_size: usize,
1703    ) -> Result<(), Error> {
1704        transaction.add(
1705            self.store().store_object_id,
1706            Mutation::replace_or_insert_object(
1707                ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
1708                ObjectValue::attribute(data.len() as u64, false),
1709            ),
1710        );
1711        let chunks = data.chunks(batch_size);
1712        let num_chunks = chunks.len();
1713        if num_chunks > 1 {
1714            transaction.add(
1715                self.store().store_object_id,
1716                Mutation::replace_or_insert_object(
1717                    ObjectKey::graveyard_attribute_entry(
1718                        self.store().graveyard_directory_object_id(),
1719                        self.object_id(),
1720                        attribute_id,
1721                    ),
1722                    ObjectValue::Some,
1723                ),
1724            );
1725        }
1726        let mut start_offset = 0;
1727        for (i, chunk) in chunks.enumerate() {
1728            let rounded_len = round_up(chunk.len() as u64, self.block_size()).unwrap();
1729            let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
1730            let slice = buffer.as_mut_slice();
1731            slice[..chunk.len()].copy_from_slice(chunk);
1732            slice[chunk.len()..].fill(0);
1733            self.multi_write(
1734                transaction,
1735                attribute_id,
1736                Some(VOLUME_DATA_KEY_ID),
1737                &[start_offset..start_offset + rounded_len],
1738                buffer.as_mut(),
1739            )
1740            .await?;
1741            start_offset += rounded_len;
1742            // Do not commit the last chunk.
1743            if i < num_chunks - 1 {
1744                transaction.commit_and_continue().await?;
1745            }
1746        }
1747        Ok(())
1748    }
1749
1750    /// Writes an entire attribute. Returns whether or not the attribute needs to continue being
1751    /// trimmed - if the new data is shorter than the old data, this will trim any extents beyond
1752    /// the end of the new size, but if there were too many for a single transaction, a commit
1753    /// needs to be made before trimming again, so the responsibility is left to the caller so as
1754    /// to not accidentally split the transaction when it's not in a consistent state.  This will
1755    /// write using the volume data key; the fscrypt key is not supported.
1756    pub async fn write_attr(
1757        &self,
1758        transaction: &mut Transaction<'_>,
1759        attribute_id: AttributeId,
1760        data: &[u8],
1761    ) -> Result<NeedsTrim, Error> {
1762        let rounded_len = round_up(data.len() as u64, self.block_size()).unwrap();
1763        let store = self.store();
1764        let tree = store.tree();
1765        let should_trim = if let Some(item) = tree
1766            .find(&ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute))
1767            .await?
1768        {
1769            match item.value {
1770                ObjectValue::Attribute { size: _, has_overwrite_extents: true } => {
1771                    bail!(
1772                        anyhow!(FxfsError::Inconsistent)
1773                            .context("write_attr on an attribute with overwrite extents")
1774                    )
1775                }
1776                ObjectValue::Attribute { size, .. } => (data.len() as u64) < size,
1777                _ => bail!(FxfsError::Inconsistent),
1778            }
1779        } else {
1780            false
1781        };
1782        let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
1783        let slice = buffer.as_mut_slice();
1784        slice[..data.len()].copy_from_slice(data);
1785        slice[data.len()..].fill(0);
1786        self.multi_write(
1787            transaction,
1788            attribute_id,
1789            Some(VOLUME_DATA_KEY_ID),
1790            &[0..rounded_len],
1791            buffer.as_mut(),
1792        )
1793        .await?;
1794        transaction.add(
1795            self.store().store_object_id,
1796            Mutation::replace_or_insert_object(
1797                ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
1798                ObjectValue::attribute(data.len() as u64, false),
1799            ),
1800        );
1801        if should_trim {
1802            self.shrink(transaction, attribute_id, data.len() as u64).await
1803        } else {
1804            Ok(NeedsTrim(false))
1805        }
1806    }
1807
1808    pub async fn list_extended_attributes(&self) -> Result<Vec<Vec<u8>>, Error> {
1809        let layer_set = self.store().tree().layer_set();
1810        let mut merger = layer_set.merger();
1811        // Seek to the first extended attribute key for this object.
1812        let mut iter = merger
1813            .query(Query::FullRange(&ObjectKey::extended_attribute(self.object_id(), Vec::new())))
1814            .await?;
1815        let mut out = Vec::new();
1816        while let Some(item) = iter.get() {
1817            // Skip deleted extended attributes.
1818            if item.value != &ObjectValue::None {
1819                match item.key {
1820                    ObjectKey { object_id, data: ObjectKeyData::ExtendedAttribute { name } }
1821                        if *object_id == self.object_id() =>
1822                    {
1823                        out.push(name.clone());
1824                    }
1825                    // Once we hit a record belonging to another object, or one that is not an
1826                    // extended attribute key, we have reached the end of this object's extended
1827                    // attributes. Subsequent objects' records will start with lower variants
1828                    // (e.g. ObjectKeyData::Object) which trigger this break.
1829                    _ => break,
1830                }
1831            }
1832            iter.advance().await?;
1833        }
1834        Ok(out)
1835    }
1836
1837    /// Looks up the values for the extended attribute `fio::SELINUX_CONTEXT_NAME`, returning it
1838    /// if it is found inline. If it is not inline, it will request use of the
1839    /// `get_extended_attributes` method. If the entry doesn't exist at all, returns None.
1840    pub async fn get_inline_selinux_context(&self) -> Result<Option<fio::SelinuxContext>, Error> {
1841        // This optimization is only useful as long as the attribute is smaller than inline sizes.
1842        // Avoid reading the data out of the attributes.
1843        const_assert!(fio::MAX_SELINUX_CONTEXT_ATTRIBUTE_LEN as usize <= MAX_INLINE_XATTR_SIZE);
1844        let item = match self
1845            .store()
1846            .tree()
1847            .find(&ObjectKey::extended_attribute(
1848                self.object_id(),
1849                fio::SELINUX_CONTEXT_NAME.into(),
1850            ))
1851            .await?
1852        {
1853            Some(item) => item,
1854            None => return Ok(None),
1855        };
1856        match item.value {
1857            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => {
1858                Ok(Some(fio::SelinuxContext::Data(value)))
1859            }
1860            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(_)) => {
1861                Ok(Some(fio::SelinuxContext::UseExtendedAttributes(fio::EmptyStruct {})))
1862            }
1863            _ => {
1864                bail!(
1865                    anyhow!(FxfsError::Inconsistent)
1866                        .context("get_inline_extended_attribute: Expected ExtendedAttribute value")
1867                )
1868            }
1869        }
1870    }
1871
1872    pub async fn get_extended_attribute(&self, name: Vec<u8>) -> Result<Vec<u8>, Error> {
1873        let item = self
1874            .store()
1875            .tree()
1876            .find(&ObjectKey::extended_attribute(self.object_id(), name))
1877            .await?
1878            .ok_or(FxfsError::NotFound)?;
1879        match item.value {
1880            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => Ok(value),
1881            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
1882                Ok(self.read_attr(id).await?.ok_or(FxfsError::Inconsistent)?.into_vec())
1883            }
1884            _ => {
1885                bail!(
1886                    anyhow!(FxfsError::Inconsistent)
1887                        .context("get_extended_attribute: Expected ExtendedAttribute value")
1888                )
1889            }
1890        }
1891    }
1892
1893    pub async fn set_extended_attribute(
1894        &self,
1895        name: Vec<u8>,
1896        value: Vec<u8>,
1897        mode: SetExtendedAttributeMode,
1898    ) -> Result<(), Error> {
1899        let store = self.store();
1900        // NB: We need to take this lock before we potentially look up the value to prevent racing
1901        // with another set.
1902        let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
1903        let mut transaction = store.new_transaction(keys, Options::default()).await?;
1904        self.set_extended_attribute_impl(name, value, mode, &mut transaction).await?;
1905        transaction.commit().await?;
1906        Ok(())
1907    }
1908
1909    async fn set_extended_attribute_impl(
1910        &self,
1911        name: Vec<u8>,
1912        value: Vec<u8>,
1913        mode: SetExtendedAttributeMode,
1914        transaction: &mut Transaction<'_>,
1915    ) -> Result<(), Error> {
1916        ensure!(name.len() <= MAX_XATTR_NAME_SIZE, FxfsError::TooBig);
1917        ensure!(value.len() <= MAX_XATTR_VALUE_SIZE, FxfsError::TooBig);
1918        let tree = self.store().tree();
1919        let object_key = ObjectKey::extended_attribute(self.object_id(), name);
1920
1921        let existing_attribute_id = {
1922            let (found, existing_attribute_id) = match tree.find(&object_key).await? {
1923                None => (false, None),
1924                Some(Item { value, .. }) => (
1925                    true,
1926                    match value {
1927                        ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
1928                        ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
1929                            Some(id)
1930                        }
1931                        _ => bail!(
1932                            anyhow!(FxfsError::Inconsistent)
1933                                .context("expected extended attribute value")
1934                        ),
1935                    },
1936                ),
1937            };
1938            match mode {
1939                SetExtendedAttributeMode::Create if found => {
1940                    bail!(FxfsError::AlreadyExists)
1941                }
1942                SetExtendedAttributeMode::Replace if !found => {
1943                    bail!(FxfsError::NotFound)
1944                }
1945                _ => (),
1946            }
1947            existing_attribute_id
1948        };
1949
1950        if let Some(attribute_id) = existing_attribute_id {
1951            // If we already have an attribute id allocated for this extended attribute, we always
1952            // use it, even if the value has shrunk enough to be stored inline. We don't need to
1953            // worry about trimming here for the same reason we don't need to worry about it when
1954            // we delete xattrs - they simply aren't large enough to ever need more than one
1955            // transaction.
1956            let _ = self.write_attr(transaction, attribute_id, &value).await?;
1957        } else if value.len() <= MAX_INLINE_XATTR_SIZE {
1958            transaction.add(
1959                self.store().store_object_id(),
1960                Mutation::replace_or_insert_object(
1961                    object_key,
1962                    ObjectValue::inline_extended_attribute(value),
1963                ),
1964            );
1965        } else {
1966            // If there isn't an existing attribute id and we are going to store the value in
1967            // an attribute, find the next empty attribute id in the range. We search for fxfs
1968            // attribute records specifically, instead of the extended attribute records, because
1969            // even if the extended attribute record is removed the attribute may not be fully
1970            // trimmed yet.
1971            let mut attribute_id = AttributeId::XATTR_RANGE_START;
1972            let layer_set = tree.layer_set();
1973            let mut merger = layer_set.merger();
1974            let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
1975            let mut iter = merger.query(Query::FullRange(&key)).await?;
1976            loop {
1977                match iter.get() {
1978                    // None means the key passed to seek wasn't found. That means the first
1979                    // attribute is available and we can just stop right away.
1980                    None => break,
1981                    Some(ItemRef {
1982                        key: ObjectKey { object_id, data: ObjectKeyData::Attribute(attr_id, _) },
1983                        value,
1984                        ..
1985                    }) if *object_id == self.object_id() => {
1986                        if matches!(value, ObjectValue::None) {
1987                            // This attribute was once used but is now deleted, so it's safe to use
1988                            // again.
1989                            break;
1990                        }
1991                        if attribute_id < *attr_id {
1992                            // We found a gap - use it.
1993                            break;
1994                        } else if attribute_id == *attr_id {
1995                            // This attribute id is in use, try the next one.
1996                            attribute_id = attribute_id.next();
1997                            if attribute_id == AttributeId::XATTR_RANGE_END {
1998                                bail!(FxfsError::NoSpace);
1999                            }
2000                        }
2001                        // If we don't hit either of those cases, we are still moving through the
2002                        // extent keys for the current attribute, so just keep advancing until the
2003                        // attribute id changes.
2004                    }
2005                    // As we are working our way through the iterator, if we hit anything that
2006                    // doesn't have our object id or attribute key data, we've gone past the end of
2007                    // this section and can stop.
2008                    _ => break,
2009                }
2010                iter.advance().await?;
2011            }
2012
2013            // We know this won't need trimming because it's a new attribute.
2014            let _ = self.write_attr(transaction, attribute_id, &value).await?;
2015            transaction.add(
2016                self.store().store_object_id(),
2017                Mutation::replace_or_insert_object(
2018                    object_key,
2019                    ObjectValue::extended_attribute(attribute_id),
2020                ),
2021            );
2022        }
2023
2024        Ok(())
2025    }
2026
2027    pub async fn remove_extended_attribute(&self, name: Vec<u8>) -> Result<(), Error> {
2028        let store = self.store();
2029        let tree = store.tree();
2030        let object_key = ObjectKey::extended_attribute(self.object_id(), name);
2031
2032        // NB: The API says we have to return an error if the attribute doesn't exist, so we have
2033        // to look it up first to make sure we have a record of it before we delete it. Make sure
2034        // we take a lock and make a transaction before we do so we don't race with other
2035        // operations.
2036        let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
2037        let mut transaction = store.new_transaction(keys, Options::default()).await?;
2038
2039        let attribute_to_delete =
2040            match tree.find(&object_key).await?.ok_or(FxfsError::NotFound)?.value {
2041                ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => Some(id),
2042                ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
2043                _ => {
2044                    bail!(
2045                        anyhow!(FxfsError::Inconsistent)
2046                            .context("remove_extended_attribute: Expected ExtendedAttribute value")
2047                    )
2048                }
2049            };
2050
2051        transaction.add(
2052            store.store_object_id(),
2053            Mutation::replace_or_insert_object(object_key, ObjectValue::None),
2054        );
2055
2056        // If the attribute wasn't stored inline, we need to deallocate all the extents too. This
2057        // would normally need to interact with the graveyard for correctness - if there are too
2058        // many extents to delete to fit in a single transaction then we could potentially have
2059        // consistency issues. However, the maximum size of an extended attribute is small enough
2060        // that it will never come close to that limit even in the worst case, so we just delete
2061        // everything in one shot.
2062        if let Some(attribute_id) = attribute_to_delete {
2063            let trim_result = store
2064                .trim_some(
2065                    &mut transaction,
2066                    self.object_id(),
2067                    attribute_id,
2068                    TrimMode::FromOffset(0),
2069                )
2070                .await?;
2071            // In case you didn't read the comment above - this should not be used to delete
2072            // arbitrary attributes!
2073            assert_matches!(trim_result, TrimResult::Done(_));
2074            transaction.add(
2075                store.store_object_id(),
2076                Mutation::replace_or_insert_object(
2077                    ObjectKey::attribute(self.object_id, attribute_id, AttributeKey::Attribute),
2078                    ObjectValue::None,
2079                ),
2080            );
2081        }
2082
2083        transaction.commit().await?;
2084        Ok(())
2085    }
2086
2087    /// Returns a future that will pre-fetches the keys so as to avoid paying the performance
2088    /// penalty later. Must ensure that the object is not removed before the future completes.
2089    pub fn pre_fetch_keys(&self) -> Option<impl Future<Output = ()> + use<S>> {
2090        if let Encryption::CachedKeys = self.encryption {
2091            let owner = self.owner.clone();
2092            let object_id = self.object_id;
2093            Some(async move {
2094                let store = owner.as_ref().as_ref();
2095                if let Some(crypt) = store.crypt() {
2096                    let _ = store
2097                        .key_manager
2098                        .get_keys(
2099                            object_id,
2100                            crypt.as_ref(),
2101                            &mut Some(async || store.get_keys(object_id).await),
2102                            /* permanent= */ false,
2103                            /* force= */ false,
2104                        )
2105                        .await;
2106                }
2107            })
2108        } else {
2109            None
2110        }
2111    }
2112}
2113
2114impl<S: HandleOwner> Drop for StoreObjectHandle<S> {
2115    fn drop(&mut self) {
2116        if self.is_encrypted() {
2117            let _ = self.store().key_manager.remove(self.object_id);
2118        }
2119    }
2120}
2121
/// Result of an operation that truncates an object.
///
/// Truncation may not fit in a single transaction.  When the wrapped flag is `true`, the caller
/// must finish trimming the object in subsequent transactions (by calling `ObjectStore::trim`).
#[must_use]
pub struct NeedsTrim(pub bool);
2127
2128#[cfg(test)]
2129mod tests {
2130    use super::{ChecksumRangeChunk, OverwriteBitmaps};
2131    use crate::errors::FxfsError;
2132    use crate::filesystem::{FxFilesystem, OpenFxFilesystem};
2133    use crate::object_handle::ObjectHandle;
2134    use crate::object_store::data_object_handle::WRITE_ATTR_BATCH_SIZE;
2135    use crate::object_store::transaction::{Mutation, Options, lock_keys};
2136    use crate::object_store::{
2137        AttributeId, AttributeKey, DataObjectHandle, Directory, HandleOptions, LockKey, ObjectKey,
2138        ObjectStore, ObjectValue, SetExtendedAttributeMode, StoreObjectHandle,
2139    };
2140    use bit_vec::BitVec;
2141    use fuchsia_async as fasync;
2142    use futures::join;
2143    use std::sync::Arc;
2144    use storage_device::DeviceHolder;
2145    use storage_device::fake_device::FakeDevice;
2146
2147    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2148    const TEST_OBJECT_NAME: &str = "foo";
2149
2150    fn is_error(actual: anyhow::Error, expected: FxfsError) {
2151        assert_eq!(*actual.root_cause().downcast_ref::<FxfsError>().unwrap(), expected)
2152    }
2153
2154    async fn test_filesystem() -> OpenFxFilesystem {
2155        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
2156        FxFilesystem::new_empty(device).await.expect("new_empty failed")
2157    }
2158
2159    async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
2160    {
2161        let fs = test_filesystem().await;
2162        let store = fs.root_store();
2163
2164        let mut transaction = fs
2165            .root_store()
2166            .new_transaction(
2167                lock_keys![LockKey::object(
2168                    store.store_object_id(),
2169                    store.root_directory_object_id()
2170                )],
2171                Options::default(),
2172            )
2173            .await
2174            .expect("new_transaction failed");
2175
2176        let object =
2177            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2178                .await
2179                .expect("create_object failed");
2180
2181        let root_directory =
2182            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
2183        root_directory
2184            .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
2185            .await
2186            .expect("add_child_file failed");
2187
2188        transaction.commit().await.expect("commit failed");
2189
2190        (fs, object)
2191    }
2192
2193    #[fuchsia::test(threads = 3)]
2194    async fn extended_attribute_double_remove() {
2195        // This test is intended to trip a potential race condition in remove. Removing an
2196        // attribute that doesn't exist is an error, so we need to check before we remove, but if
2197        // we aren't careful, two parallel removes might both succeed in the check and then both
2198        // remove the value.
2199        let (fs, object) = test_filesystem_and_empty_object().await;
2200        let basic = Arc::new(StoreObjectHandle::new(
2201            object.owner().clone(),
2202            object.object_id(),
2203            /* permanent_keys: */ false,
2204            HandleOptions::default(),
2205            false,
2206        ));
2207        let basic_a = basic.clone();
2208        let basic_b = basic.clone();
2209
2210        basic
2211            .set_extended_attribute(
2212                b"security.selinux".to_vec(),
2213                b"bar".to_vec(),
2214                SetExtendedAttributeMode::Set,
2215            )
2216            .await
2217            .expect("failed to set attribute");
2218
2219        // Try to remove the attribute twice at the same time. One should succeed in the race and
2220        // return Ok, and the other should fail the race and return NOT_FOUND.
2221        let a_task = fasync::Task::spawn(async move {
2222            basic_a.remove_extended_attribute(b"security.selinux".to_vec()).await
2223        });
2224        let b_task = fasync::Task::spawn(async move {
2225            basic_b.remove_extended_attribute(b"security.selinux".to_vec()).await
2226        });
2227        match join!(a_task, b_task) {
2228            (Ok(()), Ok(())) => panic!("both remove calls succeeded"),
2229            (Err(_), Err(_)) => panic!("both remove calls failed"),
2230
2231            (Ok(()), Err(e)) => is_error(e, FxfsError::NotFound),
2232            (Err(e), Ok(())) => is_error(e, FxfsError::NotFound),
2233        }
2234
2235        fs.close().await.expect("Close failed");
2236    }
2237
2238    #[fuchsia::test(threads = 3)]
2239    async fn extended_attribute_double_create() {
2240        // This test is intended to trip a potential race in set when using the create flag,
2241        // similar to above. If the create mode is set, we need to check that the attribute isn't
2242        // already created, but if two parallel creates both succeed in that check, and we aren't
2243        // careful with locking, they will both succeed and one will overwrite the other.
2244        let (fs, object) = test_filesystem_and_empty_object().await;
2245        let basic = Arc::new(StoreObjectHandle::new(
2246            object.owner().clone(),
2247            object.object_id(),
2248            /* permanent_keys: */ false,
2249            HandleOptions::default(),
2250            false,
2251        ));
2252        let basic_a = basic.clone();
2253        let basic_b = basic.clone();
2254
2255        // Try to set the attribute twice at the same time. One should succeed in the race and
2256        // return Ok, and the other should fail the race and return ALREADY_EXISTS.
2257        let a_task = fasync::Task::spawn(async move {
2258            basic_a
2259                .set_extended_attribute(
2260                    b"security.selinux".to_vec(),
2261                    b"one".to_vec(),
2262                    SetExtendedAttributeMode::Create,
2263                )
2264                .await
2265        });
2266        let b_task = fasync::Task::spawn(async move {
2267            basic_b
2268                .set_extended_attribute(
2269                    b"security.selinux".to_vec(),
2270                    b"two".to_vec(),
2271                    SetExtendedAttributeMode::Create,
2272                )
2273                .await
2274        });
2275        match join!(a_task, b_task) {
2276            (Ok(()), Ok(())) => panic!("both set calls succeeded"),
2277            (Err(_), Err(_)) => panic!("both set calls failed"),
2278
2279            (Ok(()), Err(e)) => {
2280                assert_eq!(
2281                    basic
2282                        .get_extended_attribute(b"security.selinux".to_vec())
2283                        .await
2284                        .expect("failed to get xattr"),
2285                    b"one"
2286                );
2287                is_error(e, FxfsError::AlreadyExists);
2288            }
2289            (Err(e), Ok(())) => {
2290                assert_eq!(
2291                    basic
2292                        .get_extended_attribute(b"security.selinux".to_vec())
2293                        .await
2294                        .expect("failed to get xattr"),
2295                    b"two"
2296                );
2297                is_error(e, FxfsError::AlreadyExists);
2298            }
2299        }
2300
2301        fs.close().await.expect("Close failed");
2302    }
2303
2304    struct TestAttr {
2305        name: Vec<u8>,
2306        value: Vec<u8>,
2307    }
2308
2309    impl TestAttr {
2310        fn new(name: impl AsRef<[u8]>, value: impl AsRef<[u8]>) -> Self {
2311            Self { name: name.as_ref().to_vec(), value: value.as_ref().to_vec() }
2312        }
2313        fn name(&self) -> Vec<u8> {
2314            self.name.clone()
2315        }
2316        fn value(&self) -> Vec<u8> {
2317            self.value.clone()
2318        }
2319    }
2320
2321    #[fuchsia::test]
2322    async fn extended_attributes() {
2323        let (fs, object) = test_filesystem_and_empty_object().await;
2324
2325        let test_attr = TestAttr::new(b"security.selinux", b"foo");
2326
2327        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
2328        is_error(
2329            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2330            FxfsError::NotFound,
2331        );
2332
2333        object
2334            .set_extended_attribute(
2335                test_attr.name(),
2336                test_attr.value(),
2337                SetExtendedAttributeMode::Set,
2338            )
2339            .await
2340            .unwrap();
2341        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2342        assert_eq!(
2343            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2344            test_attr.value()
2345        );
2346
2347        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2348        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
2349        is_error(
2350            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2351            FxfsError::NotFound,
2352        );
2353
2354        // Make sure we can object the same attribute being set again.
2355        object
2356            .set_extended_attribute(
2357                test_attr.name(),
2358                test_attr.value(),
2359                SetExtendedAttributeMode::Set,
2360            )
2361            .await
2362            .unwrap();
2363        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2364        assert_eq!(
2365            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2366            test_attr.value()
2367        );
2368
2369        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2370        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
2371        is_error(
2372            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2373            FxfsError::NotFound,
2374        );
2375
2376        fs.close().await.expect("close failed");
2377    }
2378
2379    #[fuchsia::test]
2380    async fn large_extended_attribute() {
2381        let (fs, object) = test_filesystem_and_empty_object().await;
2382
2383        let test_attr = TestAttr::new(b"security.selinux", vec![3u8; 300]);
2384
2385        object
2386            .set_extended_attribute(
2387                test_attr.name(),
2388                test_attr.value(),
2389                SetExtendedAttributeMode::Set,
2390            )
2391            .await
2392            .unwrap();
2393        assert_eq!(
2394            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2395            test_attr.value()
2396        );
2397
2398        // Probe the fxfs attributes to make sure it did the expected thing. This relies on inside
2399        // knowledge of how the attribute id is chosen.
2400        assert_eq!(
2401            object
2402                .read_attr(AttributeId::XATTR_RANGE_START)
2403                .await
2404                .expect("read_attr failed")
2405                .expect("read_attr returned none")
2406                .into_vec(),
2407            test_attr.value()
2408        );
2409
2410        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2411        is_error(
2412            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2413            FxfsError::NotFound,
2414        );
2415
2416        // Make sure we can object the same attribute being set again.
2417        object
2418            .set_extended_attribute(
2419                test_attr.name(),
2420                test_attr.value(),
2421                SetExtendedAttributeMode::Set,
2422            )
2423            .await
2424            .unwrap();
2425        assert_eq!(
2426            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2427            test_attr.value()
2428        );
2429        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2430        is_error(
2431            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2432            FxfsError::NotFound,
2433        );
2434
2435        fs.close().await.expect("close failed");
2436    }
2437
2438    #[fuchsia::test]
2439    async fn multiple_extended_attributes() {
2440        let (fs, object) = test_filesystem_and_empty_object().await;
2441
2442        let attrs = [
2443            TestAttr::new(b"security.selinux", b"foo"),
2444            TestAttr::new(b"large.attribute", vec![3u8; 300]),
2445            TestAttr::new(b"an.attribute", b"asdf"),
2446            TestAttr::new(b"user.big", vec![5u8; 288]),
2447            TestAttr::new(b"user.tiny", b"smol"),
2448            TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
2449            TestAttr::new(b"also big", vec![7u8; 500]),
2450            TestAttr::new(b"all.ones", vec![1u8; 11111]),
2451        ];
2452
2453        for i in 0..attrs.len() {
2454            object
2455                .set_extended_attribute(
2456                    attrs[i].name(),
2457                    attrs[i].value(),
2458                    SetExtendedAttributeMode::Set,
2459                )
2460                .await
2461                .unwrap();
2462            assert_eq!(
2463                object.get_extended_attribute(attrs[i].name()).await.unwrap(),
2464                attrs[i].value()
2465            );
2466        }
2467
2468        for i in 0..attrs.len() {
2469            // Make sure expected attributes are still available.
2470            let mut found_attrs = object.list_extended_attributes().await.unwrap();
2471            let mut expected_attrs: Vec<Vec<u8>> = attrs.iter().skip(i).map(|a| a.name()).collect();
2472            found_attrs.sort();
2473            expected_attrs.sort();
2474            assert_eq!(found_attrs, expected_attrs);
2475            for j in i..attrs.len() {
2476                assert_eq!(
2477                    object.get_extended_attribute(attrs[j].name()).await.unwrap(),
2478                    attrs[j].value()
2479                );
2480            }
2481
2482            object.remove_extended_attribute(attrs[i].name()).await.expect("failed to remove");
2483            is_error(
2484                object.get_extended_attribute(attrs[i].name()).await.unwrap_err(),
2485                FxfsError::NotFound,
2486            );
2487        }
2488
2489        fs.close().await.expect("close failed");
2490    }
2491
2492    #[fuchsia::test]
2493    async fn multiple_extended_attributes_delete() {
2494        let (fs, object) = test_filesystem_and_empty_object().await;
2495        let store = object.owner().clone();
2496
2497        let attrs = [
2498            TestAttr::new(b"security.selinux", b"foo"),
2499            TestAttr::new(b"large.attribute", vec![3u8; 300]),
2500            TestAttr::new(b"an.attribute", b"asdf"),
2501            TestAttr::new(b"user.big", vec![5u8; 288]),
2502            TestAttr::new(b"user.tiny", b"smol"),
2503            TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
2504            TestAttr::new(b"also big", vec![7u8; 500]),
2505            TestAttr::new(b"all.ones", vec![1u8; 11111]),
2506        ];
2507
2508        for i in 0..attrs.len() {
2509            object
2510                .set_extended_attribute(
2511                    attrs[i].name(),
2512                    attrs[i].value(),
2513                    SetExtendedAttributeMode::Set,
2514                )
2515                .await
2516                .unwrap();
2517            assert_eq!(
2518                object.get_extended_attribute(attrs[i].name()).await.unwrap(),
2519                attrs[i].value()
2520            );
2521        }
2522
2523        // Unlink the file
2524        let root_directory =
2525            Directory::open(object.owner(), object.store().root_directory_object_id())
2526                .await
2527                .expect("open failed");
2528        let mut transaction = fs
2529            .root_store()
2530            .new_transaction(
2531                lock_keys![
2532                    LockKey::object(store.store_object_id(), store.root_directory_object_id()),
2533                    LockKey::object(store.store_object_id(), object.object_id()),
2534                ],
2535                Options::default(),
2536            )
2537            .await
2538            .expect("new_transaction failed");
2539        crate::object_store::directory::replace_child(
2540            &mut transaction,
2541            None,
2542            (&root_directory, TEST_OBJECT_NAME),
2543        )
2544        .await
2545        .expect("replace_child failed");
2546        transaction.commit().await.unwrap();
2547        store.tombstone_object(object.object_id(), Options::default()).await.unwrap();
2548
2549        crate::fsck::fsck(fs.clone()).await.unwrap();
2550
2551        fs.close().await.expect("close failed");
2552    }
2553
2554    #[fuchsia::test]
2555    async fn extended_attribute_changing_sizes() {
2556        let (fs, object) = test_filesystem_and_empty_object().await;
2557
2558        let test_name = b"security.selinux";
2559        let test_small_attr = TestAttr::new(test_name, b"smol");
2560        let test_large_attr = TestAttr::new(test_name, vec![3u8; 300]);
2561
2562        object
2563            .set_extended_attribute(
2564                test_small_attr.name(),
2565                test_small_attr.value(),
2566                SetExtendedAttributeMode::Set,
2567            )
2568            .await
2569            .unwrap();
2570        assert_eq!(
2571            object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
2572            test_small_attr.value()
2573        );
2574
2575        // With a small attribute, we don't expect it to write to an fxfs attribute.
2576        assert!(
2577            object
2578                .read_attr(AttributeId::XATTR_RANGE_START)
2579                .await
2580                .expect("read_attr failed")
2581                .is_none()
2582        );
2583
2584        crate::fsck::fsck(fs.clone()).await.unwrap();
2585
2586        object
2587            .set_extended_attribute(
2588                test_large_attr.name(),
2589                test_large_attr.value(),
2590                SetExtendedAttributeMode::Set,
2591            )
2592            .await
2593            .unwrap();
2594        assert_eq!(
2595            object.get_extended_attribute(test_large_attr.name()).await.unwrap(),
2596            test_large_attr.value()
2597        );
2598
2599        // Once the value is above the threshold, we expect it to get upgraded to an fxfs
2600        // attribute.
2601        assert_eq!(
2602            object
2603                .read_attr(AttributeId::XATTR_RANGE_START)
2604                .await
2605                .expect("read_attr failed")
2606                .expect("read_attr returned none")
2607                .into_vec(),
2608            test_large_attr.value()
2609        );
2610
2611        crate::fsck::fsck(fs.clone()).await.unwrap();
2612
2613        object
2614            .set_extended_attribute(
2615                test_small_attr.name(),
2616                test_small_attr.value(),
2617                SetExtendedAttributeMode::Set,
2618            )
2619            .await
2620            .unwrap();
2621        assert_eq!(
2622            object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
2623            test_small_attr.value()
2624        );
2625
2626        // Even though we are back under the threshold, we still expect it to be stored in an fxfs
2627        // attribute, because we don't downgrade to inline once we've allocated one.
2628        assert_eq!(
2629            object
2630                .read_attr(AttributeId::XATTR_RANGE_START)
2631                .await
2632                .expect("read_attr failed")
2633                .expect("read_attr returned none")
2634                .into_vec(),
2635            test_small_attr.value()
2636        );
2637
2638        crate::fsck::fsck(fs.clone()).await.unwrap();
2639
2640        object.remove_extended_attribute(test_small_attr.name()).await.expect("failed to remove");
2641
2642        crate::fsck::fsck(fs.clone()).await.unwrap();
2643
2644        fs.close().await.expect("close failed");
2645    }
2646
2647    #[fuchsia::test]
2648    async fn extended_attribute_max_size() {
2649        let (fs, object) = test_filesystem_and_empty_object().await;
2650
2651        let test_attr = TestAttr::new(
2652            vec![3u8; super::MAX_XATTR_NAME_SIZE],
2653            vec![1u8; super::MAX_XATTR_VALUE_SIZE],
2654        );
2655
2656        object
2657            .set_extended_attribute(
2658                test_attr.name(),
2659                test_attr.value(),
2660                SetExtendedAttributeMode::Set,
2661            )
2662            .await
2663            .unwrap();
2664        assert_eq!(
2665            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2666            test_attr.value()
2667        );
2668        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2669        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2670
2671        fs.close().await.expect("close failed");
2672    }
2673
2674    #[fuchsia::test]
2675    async fn extended_attribute_remove_then_create() {
2676        let (fs, object) = test_filesystem_and_empty_object().await;
2677
2678        let test_attr = TestAttr::new(
2679            vec![3u8; super::MAX_XATTR_NAME_SIZE],
2680            vec![1u8; super::MAX_XATTR_VALUE_SIZE],
2681        );
2682
2683        object
2684            .set_extended_attribute(
2685                test_attr.name(),
2686                test_attr.value(),
2687                SetExtendedAttributeMode::Create,
2688            )
2689            .await
2690            .unwrap();
2691        fs.journal().force_compact().await.unwrap();
2692        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2693        object
2694            .set_extended_attribute(
2695                test_attr.name(),
2696                test_attr.value(),
2697                SetExtendedAttributeMode::Create,
2698            )
2699            .await
2700            .unwrap();
2701
2702        assert_eq!(
2703            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2704            test_attr.value()
2705        );
2706
2707        fs.close().await.expect("close failed");
2708    }
2709
2710    #[fuchsia::test]
2711    async fn large_extended_attribute_max_number() {
2712        let (fs, object) = test_filesystem_and_empty_object().await;
2713
2714        let max_xattrs = AttributeId::XATTR_RANGE_END.raw() - AttributeId::XATTR_RANGE_START.raw();
2715        for i in 0..max_xattrs {
2716            let test_attr = TestAttr::new(format!("{}", i).as_bytes(), vec![0x3; 300]);
2717            object
2718                .set_extended_attribute(
2719                    test_attr.name(),
2720                    test_attr.value(),
2721                    SetExtendedAttributeMode::Set,
2722                )
2723                .await
2724                .unwrap_or_else(|_| panic!("failed to set xattr number {}", i));
2725        }
2726
2727        // That should have taken up all the attributes we've allocated to extended attributes, so
2728        // this one should return ERR_NO_SPACE.
2729        match object
2730            .set_extended_attribute(
2731                b"one.too.many".to_vec(),
2732                vec![0x3; 300],
2733                SetExtendedAttributeMode::Set,
2734            )
2735            .await
2736        {
2737            Ok(()) => panic!("set should not succeed"),
2738            Err(e) => is_error(e, FxfsError::NoSpace),
2739        }
2740
2741        // But inline attributes don't need an attribute number, so it should work fine.
2742        object
2743            .set_extended_attribute(
2744                b"this.is.okay".to_vec(),
2745                b"small value".to_vec(),
2746                SetExtendedAttributeMode::Set,
2747            )
2748            .await
2749            .unwrap();
2750
2751        // And updating existing ones should be okay.
2752        object
2753            .set_extended_attribute(b"11".to_vec(), vec![0x4; 300], SetExtendedAttributeMode::Set)
2754            .await
2755            .unwrap();
2756        object
2757            .set_extended_attribute(
2758                b"12".to_vec(),
2759                vec![0x1; 300],
2760                SetExtendedAttributeMode::Replace,
2761            )
2762            .await
2763            .unwrap();
2764
2765        // And we should be able to remove an attribute and set another one.
2766        object.remove_extended_attribute(b"5".to_vec()).await.unwrap();
2767        object
2768            .set_extended_attribute(
2769                b"new attr".to_vec(),
2770                vec![0x3; 300],
2771                SetExtendedAttributeMode::Set,
2772            )
2773            .await
2774            .unwrap();
2775
2776        fs.close().await.expect("close failed");
2777    }
2778
2779    #[fuchsia::test]
2780    async fn write_attr_trims_beyond_new_end() {
2781        // When writing, multi_write will deallocate old extents that overlap with the new data,
2782        // but it doesn't trim anything beyond that, since it doesn't know what the total size will
2783        // be. write_attr does know, because it writes the whole attribute at once, so we need to
2784        // make sure it cleans up properly.
2785        let (fs, object) = test_filesystem_and_empty_object().await;
2786
2787        let block_size = fs.block_size();
2788        let buf_size = block_size * 2;
2789        let attribute_id = AttributeId::TEST_ID;
2790
2791        let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
2792        let mut buffer = object.allocate_buffer(buf_size as usize).await;
2793        buffer.as_mut_slice().fill(3);
2794        // Writing two separate ranges, even if they are contiguous, forces them to be separate
2795        // extent records.
2796        object
2797            .multi_write(
2798                &mut transaction,
2799                attribute_id,
2800                &[0..block_size, block_size..block_size * 2],
2801                buffer.as_mut(),
2802            )
2803            .await
2804            .unwrap();
2805        transaction.add(
2806            object.store().store_object_id,
2807            Mutation::replace_or_insert_object(
2808                ObjectKey::attribute(object.object_id(), attribute_id, AttributeKey::Attribute),
2809                ObjectValue::attribute(block_size * 2, false),
2810            ),
2811        );
2812        transaction.commit().await.unwrap();
2813
2814        crate::fsck::fsck(fs.clone()).await.unwrap();
2815
2816        let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
2817        let needs_trim = (*object)
2818            .write_attr(&mut transaction, attribute_id, &vec![3u8; block_size as usize])
2819            .await
2820            .unwrap();
2821        assert!(!needs_trim.0);
2822        transaction.commit().await.unwrap();
2823
2824        crate::fsck::fsck(fs.clone()).await.unwrap();
2825
2826        fs.close().await.expect("close failed");
2827    }
2828
2829    #[fuchsia::test]
2830    async fn write_new_attr_in_batches_multiple_txns() {
2831        let (fs, object) = test_filesystem_and_empty_object().await;
2832        let merkle_tree = vec![1; 3 * WRITE_ATTR_BATCH_SIZE];
2833        let mut transaction = (*object).new_transaction(AttributeId::TEST_ID).await.unwrap();
2834        object
2835            .write_new_attr_in_batches(
2836                &mut transaction,
2837                AttributeId::TEST_ID,
2838                &merkle_tree,
2839                WRITE_ATTR_BATCH_SIZE,
2840            )
2841            .await
2842            .expect("failed to write merkle attribute");
2843
2844        transaction.add(
2845            object.store().store_object_id,
2846            Mutation::replace_or_insert_object(
2847                ObjectKey::graveyard_attribute_entry(
2848                    object.store().graveyard_directory_object_id(),
2849                    object.object_id(),
2850                    AttributeId::TEST_ID,
2851                ),
2852                ObjectValue::None,
2853            ),
2854        );
2855        transaction.commit().await.unwrap();
2856        assert_eq!(
2857            object.read_attr(AttributeId::TEST_ID).await.expect("read_attr failed"),
2858            Some(merkle_tree.into())
2859        );
2860
2861        fs.close().await.expect("close failed");
2862    }
2863
2864    // Running on target only, to use fake time features in the executor.
2865    #[cfg(target_os = "fuchsia")]
2866    #[fuchsia::test(allow_stalls = false)]
2867    async fn test_watchdog() {
2868        use super::Watchdog;
2869        use fuchsia_async::{MonotonicDuration, MonotonicInstant, TestExecutor};
2870        use std::sync::mpsc::channel;
2871
2872        TestExecutor::advance_to(make_time(0)).await;
2873        let (sender, receiver) = channel();
2874
2875        fn make_time(time_secs: i64) -> MonotonicInstant {
2876            MonotonicInstant::from_nanos(0) + MonotonicDuration::from_seconds(time_secs)
2877        }
2878
2879        {
2880            let _watchdog = Watchdog::new(10, move |count| {
2881                sender.send(count).expect("Sending value");
2882            });
2883
2884            // Too early.
2885            TestExecutor::advance_to(make_time(5)).await;
2886            receiver.try_recv().expect_err("Should not have message");
2887
2888            // First message.
2889            TestExecutor::advance_to(make_time(10)).await;
2890            assert_eq!(1, receiver.recv().expect("Receiving"));
2891
2892            // Too early for the next.
2893            TestExecutor::advance_to(make_time(15)).await;
2894            receiver.try_recv().expect_err("Should not have message");
2895
2896            // Missed one. They'll be spooled up.
2897            TestExecutor::advance_to(make_time(30)).await;
2898            assert_eq!(2, receiver.recv().expect("Receiving"));
2899            assert_eq!(3, receiver.recv().expect("Receiving"));
2900        }
2901
2902        // Watchdog is dropped, nothing should trigger.
2903        TestExecutor::advance_to(make_time(100)).await;
2904        receiver.recv().expect_err("Watchdog should be gone");
2905    }
2906
2907    #[fuchsia::test]
2908    fn test_checksum_range_chunk() {
2909        let block_size = 4096;
2910
2911        // No bitmap means one chunk that covers the whole range
2912        assert_eq!(
2913            ChecksumRangeChunk::group_first_write_ranges(
2914                &mut OverwriteBitmaps::None,
2915                block_size,
2916                block_size * 2..block_size * 5,
2917            ),
2918            vec![ChecksumRangeChunk {
2919                checksum_range: 0..3,
2920                device_range: block_size * 2..block_size * 5,
2921                is_first_write: false,
2922            }],
2923        );
2924
2925        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2926        assert_eq!(
2927            ChecksumRangeChunk::group_first_write_ranges(
2928                &mut bitmaps,
2929                block_size,
2930                block_size * 2..block_size * 5,
2931            ),
2932            vec![ChecksumRangeChunk {
2933                checksum_range: 0..3,
2934                device_range: block_size * 2..block_size * 5,
2935                is_first_write: false,
2936            }],
2937        );
2938        assert_eq!(
2939            bitmaps.take_bitmaps(),
2940            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b11100000])))
2941        );
2942
2943        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2944        bitmaps.set_offset(2);
2945        assert_eq!(
2946            ChecksumRangeChunk::group_first_write_ranges(
2947                &mut bitmaps,
2948                block_size,
2949                block_size * 2..block_size * 5,
2950            ),
2951            vec![
2952                ChecksumRangeChunk {
2953                    checksum_range: 0..2,
2954                    device_range: block_size * 2..block_size * 4,
2955                    is_first_write: false,
2956                },
2957                ChecksumRangeChunk {
2958                    checksum_range: 2..3,
2959                    device_range: block_size * 4..block_size * 5,
2960                    is_first_write: true,
2961                },
2962            ],
2963        );
2964        assert_eq!(
2965            bitmaps.take_bitmaps(),
2966            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00111000])))
2967        );
2968
2969        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2970        bitmaps.set_offset(4);
2971        assert_eq!(
2972            ChecksumRangeChunk::group_first_write_ranges(
2973                &mut bitmaps,
2974                block_size,
2975                block_size * 2..block_size * 5,
2976            ),
2977            vec![ChecksumRangeChunk {
2978                checksum_range: 0..3,
2979                device_range: block_size * 2..block_size * 5,
2980                is_first_write: true,
2981            }],
2982        );
2983        assert_eq!(
2984            bitmaps.take_bitmaps(),
2985            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00001110])))
2986        );
2987
2988        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b01010101]));
2989        assert_eq!(
2990            ChecksumRangeChunk::group_first_write_ranges(
2991                &mut bitmaps,
2992                block_size,
2993                block_size * 2..block_size * 10,
2994            ),
2995            vec![
2996                ChecksumRangeChunk {
2997                    checksum_range: 0..1,
2998                    device_range: block_size * 2..block_size * 3,
2999                    is_first_write: true,
3000                },
3001                ChecksumRangeChunk {
3002                    checksum_range: 1..2,
3003                    device_range: block_size * 3..block_size * 4,
3004                    is_first_write: false,
3005                },
3006                ChecksumRangeChunk {
3007                    checksum_range: 2..3,
3008                    device_range: block_size * 4..block_size * 5,
3009                    is_first_write: true,
3010                },
3011                ChecksumRangeChunk {
3012                    checksum_range: 3..4,
3013                    device_range: block_size * 5..block_size * 6,
3014                    is_first_write: false,
3015                },
3016                ChecksumRangeChunk {
3017                    checksum_range: 4..5,
3018                    device_range: block_size * 6..block_size * 7,
3019                    is_first_write: true,
3020                },
3021                ChecksumRangeChunk {
3022                    checksum_range: 5..6,
3023                    device_range: block_size * 7..block_size * 8,
3024                    is_first_write: false,
3025                },
3026                ChecksumRangeChunk {
3027                    checksum_range: 6..7,
3028                    device_range: block_size * 8..block_size * 9,
3029                    is_first_write: true,
3030                },
3031                ChecksumRangeChunk {
3032                    checksum_range: 7..8,
3033                    device_range: block_size * 9..block_size * 10,
3034                    is_first_write: false,
3035                },
3036            ],
3037        );
3038        assert_eq!(
3039            bitmaps.take_bitmaps(),
3040            Some((BitVec::from_bytes(&[0b01010101]), BitVec::from_bytes(&[0b11111111])))
3041        );
3042    }
3043}