Skip to main content

fxfs/object_store/
store_object_handle.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::checksum::{Checksum, Checksums, fletcher64};
6use crate::errors::FxfsError;
7use crate::log::*;
8use crate::lsm_tree::Query;
9use crate::lsm_tree::merge::MergerIterator;
10use crate::lsm_tree::types::{Item, ItemRef, LayerIterator};
11use crate::object_handle::ObjectHandle;
12use crate::object_store::extent_record::{ExtentMode, ExtentValue};
13use crate::object_store::object_manager::ObjectManager;
14use crate::object_store::object_record::{
15    AttributeKey, ExtendedAttributeValue, ObjectAttributes, ObjectItem, ObjectKey, ObjectKeyData,
16    ObjectValue, Timestamp,
17};
18use crate::object_store::transaction::{
19    AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Options, ReadGuard,
20    Transaction, lock_keys,
21};
22use crate::object_store::{
23    AttributeId, Extent, HandleOptions, HandleOwner, ObjectStore, TrimMode, TrimResult,
24    VOLUME_DATA_KEY_ID,
25};
26use crate::range::RangeExt;
27use crate::round::{round_down, round_up};
28use anyhow::{Context, Error, anyhow, bail, ensure};
29use assert_matches::assert_matches;
30use bit_vec::BitVec;
31use futures::stream::{FuturesOrdered, FuturesUnordered};
32use futures::{TryStreamExt, try_join};
33use fxfs_crypto::{
34    Cipher, CipherHolder, CipherSet, EncryptionKey, FindKeyResult, FxfsCipher, KeyPurpose,
35};
36use fxfs_trace::{TraceFutureExt, trace, trace_future_args};
37use static_assertions::const_assert;
38use std::cmp::min;
39use std::future::Future;
40use std::ops::Range;
41use std::sync::Arc;
42use std::sync::atomic::{self, AtomicBool, Ordering};
43use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
44use storage_device::{InlineCryptoOptions, ReadOptions, WriteOptions};
45
46use fidl_fuchsia_io as fio;
47use fuchsia_async as fasync;
48
/// Upper bound on the length of an extended attribute's name.
pub const MAX_XATTR_NAME_SIZE: usize = 255;
/// Extended attribute values up to this size are kept inline in the record itself; larger values
/// are moved out into a separate object attribute.
pub const MAX_INLINE_XATTR_SIZE: usize = 256;
/// Upper bound on the size of an extended attribute value (64kB, i.e. 64,000). Deleting
/// attributes relies on this limit for correctness, so it must always be enforced.
pub const MAX_XATTR_VALUE_SIZE: usize = 64_000;
58
59/// Zeroes blocks in 'buffer' based on `bitmap`, one bit per block from start of buffer.
60fn apply_bitmap_zeroing(
61    block_size: usize,
62    bitmap: &bit_vec::BitVec,
63    mut buffer: MutableBufferRef<'_>,
64) {
65    let buf = buffer.as_mut_slice();
66    debug_assert_eq!(bitmap.len() * block_size, buf.len());
67    for (i, block) in bitmap.iter().enumerate() {
68        if !block {
69            let start = i * block_size;
70            buf[start..start + block_size].fill(0);
71        }
72    }
73}
74
75/// When writing, often the logic should be generic over whether or not checksums are generated.
76/// This provides that and a handy way to convert to the more general ExtentMode that eventually
77/// stores it on disk.
78#[derive(Debug, Clone, PartialEq)]
79pub enum MaybeChecksums {
80    None,
81    Fletcher(Vec<Checksum>),
82}
83
84impl MaybeChecksums {
85    pub fn maybe_as_ref(&self) -> Option<&[Checksum]> {
86        match self {
87            Self::None => None,
88            Self::Fletcher(sums) => Some(&sums),
89        }
90    }
91
92    pub fn split_off(&mut self, at: usize) -> Self {
93        match self {
94            Self::None => Self::None,
95            Self::Fletcher(sums) => Self::Fletcher(sums.split_off(at)),
96        }
97    }
98
99    pub fn to_mode(self) -> ExtentMode {
100        match self {
101            Self::None => ExtentMode::Raw,
102            Self::Fletcher(sums) => ExtentMode::Cow(Checksums::fletcher(sums)),
103        }
104    }
105
106    pub fn into_option(self) -> Option<Vec<Checksum>> {
107        match self {
108            Self::None => None,
109            Self::Fletcher(sums) => Some(sums),
110        }
111    }
112}
113
114/// The mode of operation when setting extended attributes. This is the same as the fidl definition
115/// but is replicated here so we don't have fuchsia.io structures in the api, so this can be used
116/// on host.
117#[derive(Debug, Clone, Copy, PartialEq, Eq)]
118pub enum SetExtendedAttributeMode {
119    /// Create the extended attribute if it doesn't exist, replace the value if it does.
120    Set,
121    /// Create the extended attribute if it doesn't exist, fail if it does.
122    Create,
123    /// Replace the extended attribute value if it exists, fail if it doesn't.
124    Replace,
125}
126
127impl From<fio::SetExtendedAttributeMode> for SetExtendedAttributeMode {
128    fn from(other: fio::SetExtendedAttributeMode) -> SetExtendedAttributeMode {
129        match other {
130            fio::SetExtendedAttributeMode::Set => SetExtendedAttributeMode::Set,
131            fio::SetExtendedAttributeMode::Create => SetExtendedAttributeMode::Create,
132            fio::SetExtendedAttributeMode::Replace => SetExtendedAttributeMode::Replace,
133        }
134    }
135}
136
/// How a handle obtains the keys for its object. It is chosen once when the handle is created.
///
/// The common traits are derived so the value can be logged (`Debug`), compared, and copied out
/// of the handle cheaply. Every variant is a unit variant, so `Copy` is free.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Encryption {
    /// The object doesn't use encryption.
    None,

    /// The object has keys that are cached (which means unwrapping occurs on-demand) with
    /// KeyManager.
    CachedKeys,

    /// The object has permanent keys registered with KeyManager.
    PermanentKeys,
}
148
149#[derive(PartialEq, Debug)]
150enum OverwriteBitmaps {
151    None,
152    Some {
153        /// The block bitmap of a partial overwrite extent in the tree.
154        extent_bitmap: BitVec,
155        /// A bitmap of the blocks written to by the current overwrite.
156        write_bitmap: BitVec,
157        /// BitVec doesn't have a slice equivalent, so for a particular section of the write we
158        /// keep track of an offset in the bitmaps to operate on.
159        bitmap_offset: usize,
160    },
161}
162
163impl OverwriteBitmaps {
164    fn new(extent_bitmap: BitVec) -> Self {
165        OverwriteBitmaps::Some {
166            write_bitmap: BitVec::from_elem(extent_bitmap.len(), false),
167            extent_bitmap,
168            bitmap_offset: 0,
169        }
170    }
171
172    fn is_none(&self) -> bool {
173        *self == OverwriteBitmaps::None
174    }
175
176    fn set_offset(&mut self, new_offset: usize) {
177        match self {
178            OverwriteBitmaps::None => (),
179            OverwriteBitmaps::Some { bitmap_offset, .. } => *bitmap_offset = new_offset,
180        }
181    }
182
183    fn get_from_extent_bitmap(&self, i: usize) -> Option<bool> {
184        match self {
185            OverwriteBitmaps::None => None,
186            OverwriteBitmaps::Some { extent_bitmap, bitmap_offset, .. } => {
187                extent_bitmap.get(*bitmap_offset + i)
188            }
189        }
190    }
191
192    fn set_in_write_bitmap(&mut self, i: usize, x: bool) {
193        match self {
194            OverwriteBitmaps::None => (),
195            OverwriteBitmaps::Some { write_bitmap, bitmap_offset, .. } => {
196                write_bitmap.set(*bitmap_offset + i, x)
197            }
198        }
199    }
200
201    fn take_bitmaps(self) -> Option<(BitVec, BitVec)> {
202        match self {
203            OverwriteBitmaps::None => None,
204            OverwriteBitmaps::Some { extent_bitmap, write_bitmap, .. } => {
205                Some((extent_bitmap, write_bitmap))
206            }
207        }
208    }
209}
210
211/// When writing to Overwrite ranges, we need to emit whether a set of checksums for a device range
212/// is the first write to that region or not. This tracks one such range so we can use it after the
213/// write to break up the returned checksum list.
214#[derive(PartialEq, Debug)]
215struct ChecksumRangeChunk {
216    checksum_range: Range<usize>,
217    device_range: Range<u64>,
218    is_first_write: bool,
219}
220
221impl ChecksumRangeChunk {
222    fn group_first_write_ranges(
223        bitmaps: &mut OverwriteBitmaps,
224        block_size: u64,
225        write_device_range: Range<u64>,
226    ) -> Vec<ChecksumRangeChunk> {
227        let write_block_len = (write_device_range.length().unwrap() / block_size) as usize;
228        if bitmaps.is_none() {
229            // If there is no bitmap, then the overwrite range is fully written to. However, we
230            // could still be within the journal flush window where one of the blocks was written
231            // to for the first time to put it in this state, so we still need to emit the
232            // checksums in case replay needs them.
233            vec![ChecksumRangeChunk {
234                checksum_range: 0..write_block_len,
235                device_range: write_device_range,
236                is_first_write: false,
237            }]
238        } else {
239            let mut checksum_ranges = vec![ChecksumRangeChunk {
240                checksum_range: 0..0,
241                device_range: write_device_range.start..write_device_range.start,
242                is_first_write: !bitmaps.get_from_extent_bitmap(0).unwrap(),
243            }];
244            let mut working_range = checksum_ranges.last_mut().unwrap();
245            for i in 0..write_block_len {
246                bitmaps.set_in_write_bitmap(i, true);
247
248                // bitmap.get returning true means the block is initialized and therefore has been
249                // written to before.
250                if working_range.is_first_write != bitmaps.get_from_extent_bitmap(i).unwrap() {
251                    // is_first_write is tracking opposite of what comes back from the bitmap, so
252                    // if the are still opposites we continue our current range.
253                    working_range.checksum_range.end += 1;
254                    working_range.device_range.end += block_size;
255                } else {
256                    // If they are the same, then we need to make a new chunk.
257                    let new_chunk = ChecksumRangeChunk {
258                        checksum_range: working_range.checksum_range.end
259                            ..working_range.checksum_range.end + 1,
260                        device_range: working_range.device_range.end
261                            ..working_range.device_range.end + block_size,
262                        is_first_write: !working_range.is_first_write,
263                    };
264                    checksum_ranges.push(new_chunk);
265                    working_range = checksum_ranges.last_mut().unwrap();
266                }
267            }
268            checksum_ranges
269        }
270    }
271}
272
/// StoreObjectHandle is the lowest-level, untyped handle to an object with the id [`object_id`] in
/// a particular store, [`owner`]. It provides functionality shared across all objects, such as
/// reading and writing attributes and managing encryption keys.
///
/// Since it's untyped, it doesn't do any object kind validation, and is generally meant to
/// implement higher-level typed handles.
///
/// For file-like objects with a data attribute, DataObjectHandle implements traits and helpers for
/// doing more complex extent management and caches the content size.
///
/// For directory-like objects, Directory knows how to add and remove child objects and enumerate
/// its children.
pub struct StoreObjectHandle<S: HandleOwner> {
    /// The owner, through which the `ObjectStore` holding this object is reached (see `store()`).
    owner: Arc<S>,
    /// Id of the object within the store.
    object_id: u64,
    /// Per-handle options. Within this file, `skip_journal_checks` feeds the default transaction
    /// options and `skip_checksums` controls whether writes compute checksums.
    options: HandleOptions,
    /// Toggled by `set_trace`. When set, writes (`write_aligned`) and deallocations
    /// (`deallocate_old_extents`) are logged at info level.
    trace: AtomicBool,
    /// How keys for this object are obtained. It is fixed when the handle is constructed.
    encryption: Encryption,
}
292
293impl<S: HandleOwner> ObjectHandle for StoreObjectHandle<S> {
294    fn set_trace(&self, v: bool) {
295        info!(store_id = self.store().store_object_id, oid = self.object_id(), trace = v; "trace");
296        self.trace.store(v, atomic::Ordering::Relaxed);
297    }
298
299    fn object_id(&self) -> u64 {
300        return self.object_id;
301    }
302
303    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
304        self.store().device.allocate_buffer(size)
305    }
306
307    fn block_size(&self) -> u64 {
308        self.store().block_size()
309    }
310}
311
312struct Watchdog {
313    _task: fasync::Task<()>,
314}
315
316impl Watchdog {
317    fn new(increment_seconds: u64, cb: impl Fn(u64) + Send + 'static) -> Self {
318        Self {
319            _task: fasync::Task::spawn(
320                async move {
321                    let increment = increment_seconds.try_into().unwrap();
322                    let mut fired_counter = 0;
323                    let mut next_wake = fasync::MonotonicInstant::now();
324                    loop {
325                        next_wake += std::time::Duration::from_secs(increment).into();
326                        // If this isn't being scheduled this will purposely result in fast looping
327                        // when it does. This will be insightful about the state of the thread and
328                        // task scheduling.
329                        if fasync::MonotonicInstant::now() < next_wake {
330                            fasync::Timer::new(next_wake).await;
331                        }
332                        fired_counter += 1;
333                        cb(fired_counter);
334                    }
335                }
336                .trace(trace_future_args!("StoreObjectHandle::Watchdog")),
337            ),
338        }
339    }
340}
341
342impl<S: HandleOwner> StoreObjectHandle<S> {
343    /// Make a new StoreObjectHandle for the object with id [`object_id`] in store [`owner`].
344    pub fn new(
345        owner: Arc<S>,
346        object_id: u64,
347        permanent_keys: bool,
348        options: HandleOptions,
349        trace: bool,
350    ) -> Self {
351        let encryption = if permanent_keys {
352            Encryption::PermanentKeys
353        } else if owner.as_ref().as_ref().is_encrypted() {
354            Encryption::CachedKeys
355        } else {
356            Encryption::None
357        };
358        Self { owner, object_id, encryption, options, trace: AtomicBool::new(trace) }
359    }
360
361    pub fn owner(&self) -> &Arc<S> {
362        &self.owner
363    }
364
365    pub fn store(&self) -> &ObjectStore {
366        self.owner.as_ref().as_ref()
367    }
368
369    pub fn trace(&self) -> bool {
370        self.trace.load(atomic::Ordering::Relaxed)
371    }
372
373    pub fn is_encrypted(&self) -> bool {
374        !matches!(self.encryption, Encryption::None)
375    }
376
377    /// Get the default set of transaction options for this object. This is mostly the overall
378    /// default, modified by any [`HandleOptions`] held by this handle.
379    pub fn default_transaction_options<'b>(&self) -> Options<'b> {
380        Options { skip_journal_checks: self.options.skip_journal_checks, ..Default::default() }
381    }
382
383    pub async fn new_transaction_with_options<'b>(
384        &self,
385        attribute_id: AttributeId,
386        options: Options<'b>,
387    ) -> Result<Transaction<'b>, Error> {
388        Ok(self
389            .store()
390            .filesystem()
391            .new_transaction(
392                lock_keys![
393                    LockKey::object_attribute(
394                        self.store().store_object_id(),
395                        self.object_id(),
396                        attribute_id,
397                    ),
398                    LockKey::object(self.store().store_object_id(), self.object_id()),
399                ],
400                options,
401            )
402            .await?)
403    }
404
405    pub async fn new_transaction<'b>(
406        &self,
407        attribute_id: AttributeId,
408    ) -> Result<Transaction<'b>, Error> {
409        self.new_transaction_with_options(attribute_id, self.default_transaction_options()).await
410    }
411
412    // If |transaction| has an impending mutation for the underlying object, returns that.
413    // Otherwise, looks up the object from the tree.
414    async fn txn_get_object_mutation(
415        &self,
416        transaction: &Transaction<'_>,
417    ) -> Result<ObjectStoreMutation, Error> {
418        self.store().txn_get_object_mutation(transaction, self.object_id()).await
419    }
420
421    // Returns the amount deallocated.
422    async fn deallocate_old_extents(
423        &self,
424        transaction: &mut Transaction<'_>,
425        attribute_id: AttributeId,
426        range: Range<u64>,
427    ) -> Result<u64, Error> {
428        let block_size = self.block_size();
429        assert_eq!(range.start % block_size, 0);
430        assert_eq!(range.end % block_size, 0);
431        if range.start == range.end {
432            return Ok(0);
433        }
434        let tree = &self.store().tree;
435        let layer_set = tree.layer_set();
436        let key = Extent(range);
437        let lower_bound = ObjectKey::attribute(
438            self.object_id(),
439            attribute_id,
440            AttributeKey::Extent(key.search_key()),
441        );
442        let mut merger = layer_set.merger();
443        let mut iter = merger.query(Query::FullRange(&lower_bound)).await?;
444        let allocator = self.store().allocator();
445        let mut deallocated = 0;
446        let trace = self.trace();
447        while let Some(ItemRef {
448            key:
449                ObjectKey {
450                    object_id,
451                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
452                },
453            value: ObjectValue::Extent(value),
454            ..
455        }) = iter.get()
456        {
457            if *object_id != self.object_id() || *attr_id != attribute_id {
458                break;
459            }
460            if let ExtentValue::Some { device_offset, .. } = value {
461                if let Some(overlap) = key.overlap(extent_key) {
462                    let range = device_offset + overlap.start - extent_key.start
463                        ..device_offset + overlap.end - extent_key.start;
464                    ensure!(range.is_aligned(block_size), FxfsError::Inconsistent);
465                    if trace {
466                        info!(
467                            store_id = self.store().store_object_id(),
468                            oid = self.object_id(),
469                            device_range:? = range,
470                            len = range.end - range.start,
471                            extent_key:?;
472                            "D",
473                        );
474                    }
475                    allocator
476                        .deallocate(transaction, self.store().store_object_id(), range)
477                        .await?;
478                    deallocated += overlap.end - overlap.start;
479                } else {
480                    break;
481                }
482            }
483            iter.advance().await?;
484        }
485        Ok(deallocated)
486    }
487
488    // Writes aligned data (that should already be encrypted) to the given offset and computes
489    // checksums if requested. The aligned data must be from a single logical file range.
490    async fn write_aligned(
491        &self,
492        buf: BufferRef<'_>,
493        device_offset: u64,
494        crypt_ctx: Option<(u32, u8)>,
495    ) -> Result<MaybeChecksums, Error> {
496        if self.trace() {
497            info!(
498                store_id = self.store().store_object_id(),
499                oid = self.object_id(),
500                device_range:? = (device_offset..device_offset + buf.len() as u64),
501                len = buf.len();
502                "W",
503            );
504        }
505        let store = self.store();
506        store.device_write_ops.fetch_add(1, Ordering::Relaxed);
507        let mut checksums = Vec::new();
508        let _watchdog = Watchdog::new(10, |count| {
509            warn!("Write has been stalled for {} seconds", count * 10);
510        });
511
512        match crypt_ctx {
513            Some((dun, slot)) => {
514                if !store.filesystem().options().barriers_enabled {
515                    return Err(anyhow!(FxfsError::InvalidArgs)
516                        .context("Barriers must be enabled for inline encrypted writes."));
517                }
518                store
519                    .device
520                    .write_with_opts(
521                        device_offset as u64,
522                        buf,
523                        WriteOptions {
524                            inline_crypto: InlineCryptoOptions::enabled(slot, dun),
525                            ..Default::default()
526                        },
527                    )
528                    .await?;
529                Ok(MaybeChecksums::None)
530            }
531            None => {
532                if self.options.skip_checksums {
533                    store
534                        .device
535                        .write_with_opts(device_offset as u64, buf, WriteOptions::default())
536                        .await?;
537                    Ok(MaybeChecksums::None)
538                } else {
539                    try_join!(store.device.write(device_offset, buf), async {
540                        let block_size = self.block_size();
541                        for chunk in buf.as_slice().chunks_exact(block_size as usize) {
542                            checksums.push(fletcher64(chunk, 0));
543                        }
544                        Ok(())
545                    })?;
546                    Ok(MaybeChecksums::Fletcher(checksums))
547                }
548            }
549        }
550    }
551
552    /// Flushes the underlying device.  This is expensive and should be used sparingly.
553    pub async fn flush_device(&self) -> Result<(), Error> {
554        self.store().device().flush().await
555    }
556
557    pub async fn update_allocated_size(
558        &self,
559        transaction: &mut Transaction<'_>,
560        allocated: u64,
561        deallocated: u64,
562    ) -> Result<(), Error> {
563        if allocated == deallocated {
564            return Ok(());
565        }
566        let mut mutation = self.txn_get_object_mutation(transaction).await?;
567        if let ObjectValue::Object {
568            attributes: ObjectAttributes { project_id, allocated_size, .. },
569            ..
570        } = &mut mutation.item.value
571        {
572            // The only way for these to fail are if the volume is inconsistent.
573            *allocated_size = allocated_size
574                .checked_add(allocated)
575                .ok_or_else(|| anyhow!(FxfsError::Inconsistent).context("Allocated size overflow"))?
576                .checked_sub(deallocated)
577                .ok_or_else(|| {
578                    anyhow!(FxfsError::Inconsistent).context("Allocated size underflow")
579                })?;
580
581            if let Some(project_id) = project_id {
582                // The allocated and deallocated shouldn't exceed the max size of the file which is
583                // bound within i64.
584                let diff = i64::try_from(allocated).unwrap() - i64::try_from(deallocated).unwrap();
585                transaction.add(
586                    self.store().store_object_id(),
587                    Mutation::merge_object(
588                        ObjectKey::project_usage(
589                            self.store().root_directory_object_id(),
590                            *project_id,
591                        ),
592                        ObjectValue::BytesAndNodes { bytes: diff, nodes: 0 },
593                    ),
594                );
595            }
596        } else {
597            // This can occur when the object mutation is created from an object in the tree which
598            // was corrupt.
599            bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
600        }
601        transaction.add(self.store().store_object_id, Mutation::ObjectStore(mutation));
602        Ok(())
603    }
604
605    pub async fn update_attributes<'a>(
606        &self,
607        transaction: &mut Transaction<'a>,
608        node_attributes: Option<&fio::MutableNodeAttributes>,
609        change_time: Option<Timestamp>,
610    ) -> Result<(), Error> {
611        if let Some(&fio::MutableNodeAttributes { selinux_context: Some(ref context), .. }) =
612            node_attributes
613        {
614            if let fio::SelinuxContext::Data(context) = context {
615                self.set_extended_attribute_impl(
616                    "security.selinux".into(),
617                    context.clone(),
618                    SetExtendedAttributeMode::Set,
619                    transaction,
620                )
621                .await?;
622            } else {
623                return Err(anyhow!(FxfsError::InvalidArgs)
624                    .context("Only set SELinux context with `data` member."));
625            }
626        }
627        self.store()
628            .update_attributes(transaction, self.object_id, node_attributes, change_time)
629            .await
630    }
631
632    /// Zeroes the given range.  The range must be aligned.  Returns the amount of data deallocated.
633    pub async fn zero(
634        &self,
635        transaction: &mut Transaction<'_>,
636        attribute_id: AttributeId,
637        range: Range<u64>,
638    ) -> Result<(), Error> {
639        let deallocated =
640            self.deallocate_old_extents(transaction, attribute_id, range.clone()).await?;
641        if deallocated > 0 {
642            self.update_allocated_size(transaction, 0, deallocated).await?;
643            transaction.add(
644                self.store().store_object_id,
645                Mutation::merge_object(
646                    ObjectKey::extent(self.object_id(), attribute_id, range),
647                    ObjectValue::Extent(ExtentValue::deleted_extent()),
648                ),
649            );
650        }
651        Ok(())
652    }
653
654    // Returns a new aligned buffer (reading the head and tail blocks if necessary) with a copy of
655    // the data from `buf`.
656    pub async fn align_buffer(
657        &self,
658        attribute_id: AttributeId,
659        offset: u64,
660        buf: BufferRef<'_>,
661    ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
662        let block_size = self.block_size();
663        let end = offset + buf.len() as u64;
664        let aligned =
665            round_down(offset, block_size)..round_up(end, block_size).ok_or(FxfsError::TooBig)?;
666
667        let mut aligned_buf =
668            self.store().device.allocate_buffer((aligned.end - aligned.start) as usize).await;
669
670        // Deal with head alignment.
671        if aligned.start < offset {
672            let mut head_block = aligned_buf.subslice_mut(..block_size as usize);
673            let read = self.read(attribute_id, aligned.start, head_block.reborrow()).await?;
674            head_block.as_mut_slice()[read..].fill(0);
675        }
676
677        // Deal with tail alignment.
678        if aligned.end > end {
679            let end_block_offset = aligned.end - block_size;
680            // There's no need to read the tail block if we read it as part of the head block.
681            if offset <= end_block_offset {
682                let mut tail_block =
683                    aligned_buf.subslice_mut(aligned_buf.len() - block_size as usize..);
684                let read = self.read(attribute_id, end_block_offset, tail_block.reborrow()).await?;
685                tail_block.as_mut_slice()[read..].fill(0);
686            }
687        }
688
689        aligned_buf.as_mut_slice()
690            [(offset - aligned.start) as usize..(end - aligned.start) as usize]
691            .copy_from_slice(buf.as_slice());
692
693        Ok((aligned, aligned_buf))
694    }
695
696    /// Trim an attribute's extents, potentially adding a graveyard trim entry if more trimming is
697    /// needed, so the transaction can be committed without worrying about leaking data.
698    ///
699    /// This doesn't update the size stored in the attribute value - the caller is responsible for
700    /// doing that to keep the size up to date.
701    pub async fn shrink(
702        &self,
703        transaction: &mut Transaction<'_>,
704        attribute_id: AttributeId,
705        size: u64,
706    ) -> Result<NeedsTrim, Error> {
707        let store = self.store();
708        let needs_trim = matches!(
709            store
710                .trim_some(transaction, self.object_id(), attribute_id, TrimMode::FromOffset(size))
711                .await?,
712            TrimResult::Incomplete
713        );
714        if needs_trim {
715            // Add the object to the graveyard in case the following transactions don't get
716            // replayed.
717            let graveyard_id = store.graveyard_directory_object_id();
718            match store
719                .tree
720                .find(&ObjectKey::graveyard_entry(graveyard_id, self.object_id()))
721                .await?
722            {
723                Some(ObjectItem { value: ObjectValue::Some, .. })
724                | Some(ObjectItem { value: ObjectValue::Trim, .. }) => {
725                    // This object is already in the graveyard so we don't need to do anything.
726                }
727                _ => {
728                    transaction.add(
729                        store.store_object_id,
730                        Mutation::replace_or_insert_object(
731                            ObjectKey::graveyard_entry(graveyard_id, self.object_id()),
732                            ObjectValue::Trim,
733                        ),
734                    );
735                }
736            }
737        }
738        Ok(NeedsTrim(needs_trim))
739    }
740
741    /// Reads and decrypts a singular logical range.
742    pub async fn read_and_decrypt(
743        &self,
744        device_offset: u64,
745        file_offset: u64,
746        mut buffer: MutableBufferRef<'_>,
747        key_id: u64,
748    ) -> Result<(), Error> {
749        let store = self.store();
750        store.device_read_ops.fetch_add(1, Ordering::Relaxed);
751
752        let _watchdog = Watchdog::new(10, |count| {
753            warn!("Read has been stalled for {} seconds", count * 10);
754        });
755
756        let (_key_id, key) = self.get_key(Some(key_id)).await?;
757        if let Some(key) = key {
758            if let Some((dun, slot)) = key.crypt_ctx(self.object_id, file_offset) {
759                store
760                    .device
761                    .read_with_opts(
762                        device_offset as u64,
763                        buffer.reborrow(),
764                        ReadOptions { inline_crypto: InlineCryptoOptions::enabled(slot, dun) },
765                    )
766                    .await?;
767            } else {
768                store.device.read(device_offset, buffer.reborrow()).await?;
769                key.decrypt(self.object_id, device_offset, file_offset, buffer.as_mut_slice())?;
770            }
771        } else {
772            store.device.read(device_offset, buffer.reborrow()).await?;
773        }
774
775        Ok(())
776    }
777
778    /// Returns the specified key. If `key_id` is None, it will try and return the fscrypt key if
779    /// it is present, or the volume key if it isn't. If the fscrypt key is present, but the key
780    /// cannot be unwrapped, then this will return `FxfsError::NoKey`. If the volume is not
781    /// encrypted, this returns None.
782    pub async fn get_key(
783        &self,
784        key_id: Option<u64>,
785    ) -> Result<(u64, Option<Arc<dyn Cipher>>), Error> {
786        let store = self.store();
787        let result = match self.encryption {
788            Encryption::None => (VOLUME_DATA_KEY_ID, None),
789            Encryption::CachedKeys => {
790                if let Some(key_id) = key_id {
791                    (
792                        key_id,
793                        Some(
794                            store
795                                .key_manager
796                                .get_key(
797                                    self.object_id,
798                                    store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
799                                    async || store.get_keys(self.object_id).await,
800                                    key_id,
801                                )
802                                .await?,
803                        ),
804                    )
805                } else {
806                    let (key_id, key) = store
807                        .key_manager
808                        .get_fscrypt_key_if_present(
809                            self.object_id,
810                            store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
811                            async || store.get_keys(self.object_id).await,
812                        )
813                        .await?;
814                    (key_id, Some(key))
815                }
816            }
817            Encryption::PermanentKeys => {
818                (VOLUME_DATA_KEY_ID, Some(store.key_manager.get(self.object_id).await?.unwrap()))
819            }
820        };
821
822        // Ensure that if the key we receive uses inline encryption, barriers should be enabled.
823        if let Some(ref key) = result.1 {
824            if key.crypt_ctx(self.object_id, 0).is_some() {
825                if !store.filesystem().options().barriers_enabled {
826                    return Err(anyhow!(FxfsError::InvalidArgs)
827                        .context("Barriers must be enabled for inline encrypted writes."));
828                }
829            }
830        }
831
832        Ok(result)
833    }
834
835    /// This will only work for a non-permanent volume data key. This is designed to be used with
836    /// extended attributes where we'll only create the key on demand for directories and encrypted
837    /// files.
838    async fn get_or_create_key(
839        &self,
840        transaction: &mut Transaction<'_>,
841    ) -> Result<Arc<dyn Cipher>, Error> {
842        let store = self.store();
843
844        // Fast path: try and get keys from the cache.
845        if let Some(key) = store.key_manager.get(self.object_id).await.context("get failed")? {
846            return Ok(key);
847        }
848
849        let crypt = store.crypt().ok_or_else(|| anyhow!("No crypt!"))?;
850
851        // Next, see if the keys are already created.
852        let (mut encryption_keys, mut cipher_set) = if let Some(item) =
853            store.tree.find(&ObjectKey::keys(self.object_id)).await.context("find failed")?
854        {
855            if let ObjectValue::Keys(encryption_keys) = item.value {
856                let cipher_set = store
857                    .key_manager
858                    .get_keys(
859                        self.object_id,
860                        crypt.as_ref(),
861                        &mut Some(async || Ok(encryption_keys.clone())),
862                        /* permanent= */ false,
863                        /* force= */ false,
864                    )
865                    .await
866                    .context("get_keys failed")?;
867                match cipher_set.find_key(VOLUME_DATA_KEY_ID) {
868                    FindKeyResult::NotFound => {}
869                    FindKeyResult::Unavailable => return Err(FxfsError::NoKey.into()),
870                    FindKeyResult::Key(key) => return Ok(key),
871                }
872                (encryption_keys, (*cipher_set).clone())
873            } else {
874                return Err(anyhow!(FxfsError::Inconsistent));
875            }
876        } else {
877            Default::default()
878        };
879
880        // Proceed to create the key.  The transaction holds the required locks.
881        let (key, unwrapped_key) = crypt.create_key(self.object_id, KeyPurpose::Data).await?;
882        let cipher: Arc<dyn Cipher> = Arc::new(FxfsCipher::new(&unwrapped_key));
883
884        // Add new cipher to cloned cipher set. This will replace existing one
885        // if transaction is successful.
886        cipher_set.add_key(VOLUME_DATA_KEY_ID, CipherHolder::Cipher(cipher.clone()));
887        let cipher_set = Arc::new(cipher_set);
888
889        // Arrange for the CipherSet to be added to the cache when (and if) the transaction
890        // commits.
891        struct UnwrappedKeys {
892            object_id: u64,
893            new_keys: Arc<CipherSet>,
894        }
895
896        impl AssociatedObject for UnwrappedKeys {
897            fn will_apply_mutation(
898                &self,
899                _mutation: &Mutation,
900                object_id: u64,
901                manager: &ObjectManager,
902            ) {
903                manager.store(object_id).unwrap().key_manager.insert(
904                    self.object_id,
905                    self.new_keys.clone(),
906                    /* permanent= */ false,
907                );
908            }
909        }
910
911        encryption_keys.insert(VOLUME_DATA_KEY_ID, EncryptionKey::Fxfs(key).into());
912
913        transaction.add_with_object(
914            store.store_object_id(),
915            Mutation::replace_or_insert_object(
916                ObjectKey::keys(self.object_id),
917                ObjectValue::keys(encryption_keys),
918            ),
919            AssocObj::Owned(Box::new(UnwrappedKeys {
920                object_id: self.object_id,
921                new_keys: cipher_set,
922            })),
923        );
924
925        Ok(cipher)
926    }
927
928    pub async fn read(
929        &self,
930        attribute_id: AttributeId,
931        offset: u64,
932        mut buf: MutableBufferRef<'_>,
933    ) -> Result<usize, Error> {
934        let fs = self.store().filesystem();
935        let guard = fs
936            .lock_manager()
937            .read_lock(lock_keys![LockKey::object_attribute(
938                self.store().store_object_id(),
939                self.object_id(),
940                attribute_id,
941            )])
942            .await;
943
944        let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
945        let item = self.store().tree().find(&key).await?;
946        let size = match item {
947            Some(item) if item.key == key => match item.value {
948                ObjectValue::Attribute { size, .. } => size,
949                _ => bail!(FxfsError::Inconsistent),
950            },
951            _ => return Ok(0),
952        };
953        if offset >= size {
954            return Ok(0);
955        }
956        let length = min(buf.len() as u64, size - offset) as usize;
957        buf = buf.subslice_mut(0..length);
958        self.read_unchecked(attribute_id, offset, buf, &guard).await?;
959        Ok(length)
960    }
961
    /// Read `buf.len()` bytes from the attribute `attribute_id`, starting at `offset`, into `buf`.
    /// It's required that a read lock on this attribute id is taken before this is called.
    ///
    /// This function doesn't do any size checking - any portion of `buf` past the end of the file
    /// will be filled with zeros. The caller is responsible for enforcing the file size on reads.
    /// This is because, just looking at the extents, we can't tell the difference between the file
    /// actually ending and there just being a section at the end with no data (since attributes
    /// are sparse).
    ///
    /// Panics (via `assert_eq!`) unless two conditions hold: `offset` is a multiple of the
    /// filesystem block size, and `buf` begins at a multiple of the device block size within its
    /// underlying buffer. The length of `buf` need not be block aligned. A trailing partial block
    /// is read into a separate block-sized buffer, and only the required prefix is copied into
    /// `buf`.
    ///
    /// Returns `FxfsError::Inconsistent` if an extent record is invalid or not block aligned. It
    /// also returns that error if an overwrite bitmap does not cover the trailing partial block.
    /// Errors from `read_and_decrypt` are propagated.
    pub async fn read_unchecked(
        &self,
        attribute_id: AttributeId,
        mut offset: u64,
        mut buf: MutableBufferRef<'_>,
        _guard: &ReadGuard<'_>,
    ) -> Result<(), Error> {
        if buf.len() == 0 {
            return Ok(());
        }
        let end_offset = offset + buf.len() as u64;

        self.store().logical_read_ops.fetch_add(1, Ordering::Relaxed);

        // Whilst the read offset must be aligned to the filesystem block size, the buffer need only
        // be aligned to the device's block size.
        let block_size = self.block_size() as u64;
        let device_block_size = self.store().device.block_size() as u64;
        assert_eq!(offset % block_size, 0);
        assert_eq!(buf.range().start as u64 % device_block_size, 0);
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger
            .query(Query::LimitedRange(&ObjectKey::extent(
                self.object_id(),
                attribute_id,
                offset..end_offset,
            )))
            .await?;
        // Length of the trailing partial block of the read (0 if the read ends block aligned).
        let end_align = ((offset + buf.len() as u64) % block_size) as usize;
        let trace = self.trace();
        // Whole-block reads are queued here and awaited together after the extent walk.
        let reads = FuturesUnordered::new();
        // Walk the extents of this attribute. `offset` and `buf` advance together, so `buf`
        // always holds the part of the read that has not been handled yet, starting at `offset`.
        while let Some(ItemRef {
            key:
                ObjectKey {
                    object_id,
                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                },
            value: ObjectValue::Extent(extent_value),
            ..
        }) = iter.get()
        {
            if *object_id != self.object_id() || *attr_id != attribute_id {
                break;
            }
            ensure!(
                extent_key.is_valid() && extent_key.is_aligned(block_size),
                FxfsError::Inconsistent
            );
            if extent_key.start > offset {
                // Zero everything up to the start of the extent.
                let to_zero = min(extent_key.start - offset, buf.len() as u64) as usize;
                buf.as_mut_slice()[..to_zero].fill(0);
                buf = buf.subslice_mut(to_zero..);
                if buf.is_empty() {
                    break;
                }
                offset += to_zero as u64;
            }

            if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
                // Device location corresponding to `offset` within this extent.
                let mut device_offset = device_offset + (offset - extent_key.start);
                let key_id = *key_id;

                // Whole blocks this extent can supply. The trailing partial block, if any, is
                // excluded here and handled by the end-alignment path below.
                let to_copy = min(buf.len() - end_align, (extent_key.end - offset) as usize);
                if to_copy > 0 {
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = (device_offset..device_offset + to_copy as u64),
                            offset,
                            range:? = **extent_key,
                            block_size;
                            "R",
                        );
                    }
                    let (mut head, tail) = buf.split_at_mut(to_copy);
                    // For partially-written overwrite extents, narrow the extent's bitmap to the
                    // blocks being read; `apply_bitmap_zeroing` presumably zeroes the blocks whose
                    // bit is clear once the read finishes.
                    let maybe_bitmap = match mode {
                        ExtentMode::OverwritePartial(bitmap) => {
                            let mut read_bitmap = bitmap
                                .clone()
                                .split_off(((offset - extent_key.start) / block_size) as usize);
                            read_bitmap.truncate(to_copy / block_size as usize);
                            Some(read_bitmap)
                        }
                        _ => None,
                    };
                    reads.push(async move {
                        self.read_and_decrypt(device_offset, offset, head.reborrow(), key_id)
                            .await?;
                        if let Some(bitmap) = maybe_bitmap {
                            apply_bitmap_zeroing(self.block_size() as usize, &bitmap, head);
                        }
                        Ok::<(), Error>(())
                    });
                    buf = tail;
                    if buf.is_empty() {
                        break;
                    }
                    offset += to_copy as u64;
                    device_offset += to_copy as u64;
                }

                // Deal with end alignment by reading the existing contents into an alignment
                // buffer.
                if offset < extent_key.end && end_align > 0 {
                    if let ExtentMode::OverwritePartial(bitmap) = mode {
                        let bitmap_offset = (offset - extent_key.start) / block_size;
                        if !bitmap.get(bitmap_offset as usize).ok_or(FxfsError::Inconsistent)? {
                            // If this block isn't actually initialized, skip it.
                            break;
                        }
                    }
                    let mut align_buf =
                        self.store().device.allocate_buffer(block_size as usize).await;
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = (device_offset..device_offset + align_buf.len() as u64);
                            "RT",
                        );
                    }
                    self.read_and_decrypt(device_offset, offset, align_buf.as_mut(), key_id)
                        .await?;
                    // Only the first `end_align` bytes of the block belong to the request.
                    buf.as_mut_slice().copy_from_slice(&align_buf.as_slice()[..end_align]);
                    buf = buf.subslice_mut(0..0);
                    break;
                }
            } else if extent_key.end >= offset + buf.len() as u64 {
                // Deleted extent covers remainder, so we're done.
                break;
            }

            iter.advance().await?;
        }
        // Wait for the queued reads, then zero whatever part of `buf` no extent supplied.
        reads.try_collect::<()>().await?;
        buf.as_mut_slice().fill(0);
        Ok(())
    }
1112
1113    /// Reads an entire attribute.
1114    pub async fn read_attr(&self, attribute_id: AttributeId) -> Result<Option<Box<[u8]>>, Error> {
1115        let store = self.store();
1116        let tree = &store.tree;
1117        let layer_set = tree.layer_set();
1118        let mut merger = layer_set.merger();
1119        let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
1120        let iter = merger.query(Query::FullRange(&key)).await?;
1121        match iter.get() {
1122            Some(item) if item.key == &key => match item.value {
1123                ObjectValue::Attribute { .. } => Ok(Some(self.read_attr_from_iter(iter).await?)),
1124                // Attribute was deleted.
1125                ObjectValue::None => Ok(None),
1126                _ => Err(FxfsError::Inconsistent.into()),
1127            },
1128            _ => Ok(None),
1129        }
1130    }
1131
1132    /// Reads an entire attribute pointed to by `iter`. `iter` must be pointing to the
1133    /// `AttributeKey::Attribute` of the attribute.
1134    pub async fn read_attr_from_iter(
1135        &self,
1136        mut iter: MergerIterator<'_, '_, ObjectKey, ObjectValue>,
1137    ) -> Result<Box<[u8]>, Error> {
1138        let (mut buffer, size, attribute_id) = match iter.get() {
1139            Some(ItemRef {
1140                key:
1141                    ObjectKey {
1142                        object_id,
1143                        data: ObjectKeyData::Attribute(attribute_id, AttributeKey::Attribute),
1144                    },
1145                value: ObjectValue::Attribute { size, .. },
1146                ..
1147            }) if *object_id == self.object_id => {
1148                // TODO(https://fxbug.dev/42073113): size > max buffer size
1149                (
1150                    self.store()
1151                        .device
1152                        .allocate_buffer(round_up(*size, self.block_size()).unwrap() as usize)
1153                        .await,
1154                    *size as usize,
1155                    *attribute_id,
1156                )
1157            }
1158            _ => bail!(FxfsError::InvalidArgs),
1159        };
1160
1161        self.store().logical_read_ops.fetch_add(1, Ordering::Relaxed);
1162        let mut last_offset = 0;
1163        loop {
1164            iter.advance().await?;
1165            match iter.get() {
1166                Some(ItemRef {
1167                    key:
1168                        ObjectKey {
1169                            object_id,
1170                            data:
1171                                ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
1172                        },
1173                    value: ObjectValue::Extent(extent_value),
1174                    ..
1175                }) if *object_id == self.object_id() && *attr_id == attribute_id => {
1176                    if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
1177                        let offset = extent_key.start as usize;
1178                        buffer.as_mut_slice()[last_offset..offset].fill(0);
1179                        let end = std::cmp::min(extent_key.end as usize, buffer.len());
1180                        let maybe_bitmap = match mode {
1181                            ExtentMode::OverwritePartial(bitmap) => {
1182                                // The caller has to adjust the bitmap if necessary, but we always
1183                                // start from the beginning of any extent, so we only truncate.
1184                                let mut read_bitmap = bitmap.clone();
1185                                read_bitmap.truncate(
1186                                    (end - extent_key.start as usize) / self.block_size() as usize,
1187                                );
1188                                Some(read_bitmap)
1189                            }
1190                            _ => None,
1191                        };
1192                        self.read_and_decrypt(
1193                            *device_offset,
1194                            extent_key.start,
1195                            buffer.subslice_mut(offset..end as usize),
1196                            *key_id,
1197                        )
1198                        .await?;
1199                        if let Some(bitmap) = maybe_bitmap {
1200                            apply_bitmap_zeroing(
1201                                self.block_size() as usize,
1202                                &bitmap,
1203                                buffer.subslice_mut(offset..end as usize),
1204                            );
1205                        }
1206                        last_offset = end;
1207                        if last_offset >= size {
1208                            break;
1209                        }
1210                    }
1211                }
1212                _ => break,
1213            }
1214        }
1215        buffer.as_mut_slice()[std::cmp::min(last_offset, size)..].fill(0);
1216        Ok(buffer.as_slice()[..size].into())
1217    }
1218
1219    /// Writes potentially unaligned data at `device_offset` and returns checksums if requested.
1220    /// The data will be encrypted if necessary.  `buf` is mutable as an optimization, since the
1221    /// write may require encryption, we can encrypt the buffer in-place rather than copying to
1222    /// another buffer if the write is already aligned.
1223    ///
1224    /// NOTE: This will not create keys if they are missing (it will fail with an error if that
1225    /// happens to be the case).
1226    pub async fn write_at(
1227        &self,
1228        attribute_id: AttributeId,
1229        offset: u64,
1230        buf: MutableBufferRef<'_>,
1231        key_id: Option<u64>,
1232        mut device_offset: u64,
1233    ) -> Result<MaybeChecksums, Error> {
1234        let mut transfer_buf;
1235        let block_size = self.block_size();
1236        let (range, mut transfer_buf_ref) =
1237            if offset % block_size == 0 && buf.len() as u64 % block_size == 0 {
1238                (offset..offset + buf.len() as u64, buf)
1239            } else {
1240                let (range, buf) = self.align_buffer(attribute_id, offset, buf.as_ref()).await?;
1241                transfer_buf = buf;
1242                device_offset -= offset - range.start;
1243                (range, transfer_buf.as_mut())
1244            };
1245
1246        let mut crypt_ctx = None;
1247        if let (_, Some(key)) = self.get_key(key_id).await? {
1248            if let Some(ctx) = key.crypt_ctx(self.object_id, range.start) {
1249                crypt_ctx = Some(ctx);
1250            } else {
1251                key.encrypt(
1252                    self.object_id,
1253                    device_offset,
1254                    range.start,
1255                    transfer_buf_ref.as_mut_slice(),
1256                )?;
1257            }
1258        }
1259        self.write_aligned(transfer_buf_ref.as_ref(), device_offset, crypt_ctx).await
1260    }
1261
1262    /// Writes to multiple ranges with data provided in `buf`. This function is specifically
1263    /// designed for migration purposes, allowing raw writes to the device without updating
1264    /// object metadata like allocated size or mtime. It's essential for scenarios where
1265    /// data needs to be transferred directly without triggering standard filesystem operations.
1266    #[cfg(feature = "migration")]
1267    pub async fn raw_multi_write(
1268        &self,
1269        transaction: &mut Transaction<'_>,
1270        attribute_id: AttributeId,
1271        key_id: Option<u64>,
1272        ranges: &[Range<u64>],
1273        buf: MutableBufferRef<'_>,
1274    ) -> Result<(), Error> {
1275        self.multi_write_internal(transaction, attribute_id, key_id, ranges, buf).await?;
1276        Ok(())
1277    }
1278
    /// This is a low-level write function that writes to multiple ranges. Users should generally
    /// use `multi_write` instead of this function as this does not update the object's allocated
    /// size, mtime, atime, etc.
    ///
    /// `buf` holds the data for `ranges` back to back. Its length is presumably the sum of the
    /// range lengths (TODO(review): confirm with callers); the code unwraps `ranges` and panics if
    /// the ranges run out before `buf` does.
    ///
    /// Key selection works as follows. If `key_id` is `Some(VOLUME_DATA_KEY_ID)` and the object
    /// uses cached keys, the key is fetched or created within `transaction` via
    /// `get_or_create_key`. Otherwise `get_key` selects it. Keys without inline encryption
    /// support are used to encrypt `buf` in place before it is written.
    ///
    /// Device space is allocated in `transaction`, possibly in several pieces. With an inline
    /// encryption key, writes are additionally split at logical range boundaries so that each
    /// write gets its own crypto context. The device writes run concurrently with deallocation of
    /// the extents previously backing `ranges`. The resulting extent mutations are then added to
    /// `transaction`, along with the write checksums when barriers are disabled.
    ///
    /// Returns (allocated, deallocated) bytes on success.
    async fn multi_write_internal(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: AttributeId,
        key_id: Option<u64>,
        ranges: &[Range<u64>],
        mut buf: MutableBufferRef<'_>,
    ) -> Result<(u64, u64), Error> {
        if buf.is_empty() {
            return Ok((0, 0));
        }
        let block_size = self.block_size();
        let store = self.store();
        let store_id = store.store_object_id();

        // The only key we allow to be created on-the-fly is a non permanent key wrapped with the
        // volume data key.
        let (key_id, key) = if key_id == Some(VOLUME_DATA_KEY_ID)
            && matches!(self.encryption, Encryption::CachedKeys)
        {
            (
                VOLUME_DATA_KEY_ID,
                Some(
                    self.get_or_create_key(transaction)
                        .await
                        .context("get_or_create_key failed")?,
                ),
            )
        } else {
            self.get_key(key_id).await?
        };
        // Without inline encryption, encrypt each range's slice of `buf` in place up front.
        if let Some(key) = &key {
            if !key.supports_inline_encryption() {
                let mut slice = buf.as_mut_slice();
                for r in ranges {
                    let l = r.end - r.start;
                    let (head, tail) = slice.split_at_mut(l as usize);
                    key.encrypt(
                        self.object_id,
                        0, /* TODO(https://fxbug.dev/421269588): plumb through device_offset. */
                        r.start,
                        head,
                    )?;
                    slice = tail;
                }
            }
        }

        let mut allocated = 0;
        let allocator = store.allocator();
        let trace = self.trace();
        let mut writes = FuturesOrdered::new();

        // `current_range` is the not-yet-written remainder of the logical range being filled.
        // It is only consumed when writes are split for inline encryption.
        let mut logical_ranges = ranges.iter();
        let mut current_range = logical_ranges.next().unwrap().clone();

        while !buf.is_empty() {
            let mut device_range = allocator
                .allocate(transaction, store_id, buf.len() as u64)
                .await
                .context("allocation failed")?;
            if trace {
                info!(
                    store_id,
                    oid = self.object_id(),
                    device_range:?,
                    len = device_range.end - device_range.start;
                    "A",
                );
            }
            let mut device_range_len = device_range.end - device_range.start;
            allocated += device_range_len;
            // If inline encryption is NOT supported, this loop should only happen once.
            while device_range_len > 0 {
                if current_range.end <= current_range.start {
                    current_range = logical_ranges.next().unwrap().clone();
                }
                let (crypt_ctx, split) = if let Some(key) = &key {
                    if key.supports_inline_encryption() {
                        // The crypto context depends on the logical offset, so don't let a single
                        // write span two logical ranges.
                        let split = std::cmp::min(
                            current_range.end - current_range.start,
                            device_range_len,
                        );
                        let crypt_ctx = key.crypt_ctx(self.object_id, current_range.start);
                        current_range.start += split;
                        (crypt_ctx, split)
                    } else {
                        (None, device_range_len)
                    }
                } else {
                    (None, device_range_len)
                };

                let (head, tail) = buf.split_at_mut(split as usize);
                buf = tail;

                // Each queued write yields (device offset, length, checksums).
                writes.push_back(async move {
                    let len = head.len() as u64;
                    Result::<_, Error>::Ok((
                        device_range.start,
                        len,
                        self.write_aligned(head.as_ref(), device_range.start, crypt_ctx).await?,
                    ))
                });
                device_range.start += split;
                device_range_len = device_range.end - device_range.start;
            }
        }

        self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
        let ((mutations, checksums), deallocated) = try_join!(
            // Drive the writes (in order) and map each completed device write back onto the
            // logical ranges: one extent mutation per overlap, each carrying its checksums.
            async {
                let mut current_range = 0..0;
                let mut mutations = Vec::new();
                let mut out_checksums = Vec::new();
                let mut ranges = ranges.iter();
                while let Some((mut device_offset, mut len, mut checksums)) =
                    writes.try_next().await?
                {
                    while len > 0 {
                        if current_range.end <= current_range.start {
                            current_range = ranges.next().unwrap().clone();
                        }
                        let chunk_len = std::cmp::min(len, current_range.end - current_range.start);
                        // Split off the checksums for the blocks beyond this chunk.
                        let tail = checksums.split_off((chunk_len / block_size) as usize);
                        if let Some(checksums) = checksums.maybe_as_ref() {
                            out_checksums.push((
                                device_offset..device_offset + chunk_len,
                                checksums.to_owned(),
                            ));
                        }
                        mutations.push(Mutation::merge_object(
                            ObjectKey::extent(
                                self.object_id(),
                                attribute_id,
                                current_range.start..current_range.start + chunk_len,
                            ),
                            ObjectValue::Extent(ExtentValue::new(
                                device_offset,
                                checksums.to_mode(),
                                key_id,
                            )),
                        ));
                        checksums = tail;
                        device_offset += chunk_len;
                        len -= chunk_len;
                        current_range.start += chunk_len;
                    }
                }
                Result::<_, Error>::Ok((mutations, out_checksums))
            },
            // Meanwhile, release the extents that currently back `ranges`.
            async {
                let mut deallocated = 0;
                for r in ranges {
                    deallocated +=
                        self.deallocate_old_extents(transaction, attribute_id, r.clone()).await?;
                }
                Result::<_, Error>::Ok(deallocated)
            }
        )?;

        for m in mutations {
            transaction.add(store_id, m);
        }

        // Only store checksums in the journal if barriers are not enabled.
        if !store.filesystem().options().barriers_enabled {
            for (r, c) in checksums {
                transaction.add_checksum(r, c, true);
            }
        }
        Ok((allocated, deallocated))
    }
1457
1458    /// Writes to multiple ranges with data provided in `buf`.  The buffer can be modified in place
1459    /// if encryption takes place.  The ranges must all be aligned and no change to content size is
1460    /// applied; the caller is responsible for updating size if required.  If `key_id` is None, it
1461    /// means pick the default key for the object which is the fscrypt key if present, or the volume
1462    /// data key, or no key if it's an unencrypted file.
1463    pub async fn multi_write(
1464        &self,
1465        transaction: &mut Transaction<'_>,
1466        attribute_id: AttributeId,
1467        key_id: Option<u64>,
1468        ranges: &[Range<u64>],
1469        buf: MutableBufferRef<'_>,
1470    ) -> Result<(), Error> {
1471        let (allocated, deallocated) =
1472            self.multi_write_internal(transaction, attribute_id, key_id, ranges, buf).await?;
1473        if allocated == 0 && deallocated == 0 {
1474            return Ok(());
1475        }
1476        self.update_allocated_size(transaction, allocated, deallocated).await
1477    }
1478
    /// Writes `buf` in place into already-allocated overwrite extents of attribute `attr_id`
    /// covering `ranges`.
    ///
    /// This makes a strong assumption that the ranges are actually going to be already allocated
    /// overwrite extents and will error out or do something wrong if they aren't. It also assumes
    /// the ranges passed to it are sorted.  `buf` is consumed front to back, one range after
    /// another, so it must hold at least the sum of the range lengths (NOTE(review): this is not
    /// validated here).  An empty `buf` is a no-op.
    ///
    /// If the object's key does not support inline encryption, `buf` is encrypted in place before
    /// any write is queued; otherwise a per-write crypt context from the key is passed along.
    /// For `OverwritePartial` extents, the blocks written are OR-ed into the extent's bitmap and
    /// the extent record is rewritten when that changes it, becoming `ExtentMode::Overwrite` once
    /// every bit is set.  Checksums are only added to the transaction when barriers are disabled.
    ///
    /// Returns an `FxfsError::Inconsistent` error if a range overlaps a deleted extent, starts
    /// before the extent found for it, or overlaps an extent that is `Raw`/`Cow` rather than an
    /// overwrite extent.  Returns an `FxfsError::Internal` error if a non-extent record (or the
    /// end of the tree) is reached while ranges remain.
    pub async fn multi_overwrite<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        attr_id: AttributeId,
        ranges: &[Range<u64>],
        mut buf: MutableBufferRef<'_>,
    ) -> Result<(), Error> {
        if buf.is_empty() {
            return Ok(());
        }
        let block_size = self.block_size();
        let store = self.store();
        let tree = store.tree();
        let store_id = store.store_object_id();

        // Passing `None` presumably selects the object's default key (cf. `multi_write`).
        let (key_id, key) = self.get_key(None).await?;
        if let Some(key) = &key {
            // Without inline encryption support, encrypt the data here, in place, one range at a
            // time; otherwise a crypt context is attached to each write further down.
            if !key.supports_inline_encryption() {
                let mut slice = buf.as_mut_slice();
                for r in ranges {
                    let l = r.end - r.start;
                    let (head, tail) = slice.split_at_mut(l as usize);
                    key.encrypt(
                        self.object_id,
                        0, /* TODO(https://fxbug.dev/421269588): plumb through device_offset. */
                        r.start,
                        head,
                    )?;
                    slice = tail;
                }
            }
        }

        let mut range_iter = ranges.iter();
        // There should be at least one range if the buffer has data in it
        let mut target_range = range_iter.next().unwrap().clone();
        let mut mutations = Vec::new();
        // Write futures are only collected here; they are driven (concurrently) by the
        // `try_collect` after the extent walk below.
        let writes = FuturesUnordered::new();

        // Walk this attribute's extent records, starting at the extent covering the first range.
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                attr_id,
                AttributeKey::Extent(Extent::search_key_from_offset(target_range.start)),
            )))
            .await?;

        loop {
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent)),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) if *object_id == self.object_id() && *attribute_id == attr_id => {
                    // If this extent ends before the target range starts (not possible on the
                    // first loop because of the query parameters but possible on further loops),
                    // advance until we find the next one we care about.
                    if extent.end <= target_range.start {
                        iter.advance().await?;
                        continue;
                    }
                    let (device_offset, mode) = match extent_value {
                        ExtentValue::None => {
                            return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                                format!(
                                    "multi_overwrite failed: target_range ({}, {}) overlaps with \
                                deleted extent found at ({}, {})",
                                    target_range.start, target_range.end, extent.start, extent.end,
                                )
                            });
                        }
                        ExtentValue::Some { device_offset, mode, .. } => (device_offset, mode),
                    };
                    // The ranges passed to this function should already be allocated, so
                    // extent records should exist for them.
                    if extent.start > target_range.start {
                        return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                            format!(
                                "multi_overwrite failed: target range ({}, {}) starts before first \
                            extent found at ({}, {})",
                                target_range.start, target_range.end, extent.start, extent.end,
                            )
                        });
                    }
                    // Only overwrite extents may be written in place.  For partially written
                    // ones, keep a copy of the bitmap so the blocks written below can be tracked.
                    let mut bitmap = match mode {
                        ExtentMode::Raw | ExtentMode::Cow(_) => {
                            return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                                format!(
                                    "multi_overwrite failed: \
                            extent from ({}, {}) which overlaps target range ({}, {}) had the \
                            wrong extent mode",
                                    extent.start, extent.end, target_range.start, target_range.end,
                                )
                            });
                        }
                        ExtentMode::OverwritePartial(bitmap) => {
                            OverwriteBitmaps::new(bitmap.clone())
                        }
                        ExtentMode::Overwrite => OverwriteBitmaps::None,
                    };
                    // Queue one write for each piece of a target range that falls inside this
                    // extent, moving on to the next range whenever the current one is used up.
                    loop {
                        let offset_within_extent = target_range.start - extent.start;
                        let bitmap_offset = offset_within_extent / block_size;
                        let write_device_offset = *device_offset + offset_within_extent;
                        // Stop at whichever ends first: the extent or the target range.
                        let write_end = min(extent.end, target_range.end);
                        let write_len = write_end - target_range.start;
                        let write_device_range =
                            write_device_offset..write_device_offset + write_len;
                        let (current_buf, remaining_buf) = buf.split_at_mut(write_len as usize);

                        bitmap.set_offset(bitmap_offset as usize);
                        // NOTE(review): judging by the name and the `is_first_write` field used
                        // below, this splits the device range into chunks tagged by whether each
                        // is a first write according to `bitmap`.
                        let checksum_ranges = ChecksumRangeChunk::group_first_write_ranges(
                            &mut bitmap,
                            block_size,
                            write_device_range,
                        );

                        let crypt_ctx = if let Some(key) = &key {
                            key.crypt_ctx(self.object_id, target_range.start)
                        } else {
                            None
                        };

                        writes.push(async move {
                            let maybe_checksums = self
                                .write_aligned(current_buf.as_ref(), write_device_offset, crypt_ctx)
                                .await?;
                            Ok::<_, Error>(match maybe_checksums {
                                MaybeChecksums::None => Vec::new(),
                                MaybeChecksums::Fletcher(checksums) => checksum_ranges
                                    .into_iter()
                                    .map(
                                        |ChecksumRangeChunk {
                                             checksum_range,
                                             device_range,
                                             is_first_write,
                                         }| {
                                            (
                                                device_range,
                                                checksums[checksum_range].to_vec(),
                                                is_first_write,
                                            )
                                        },
                                    )
                                    .collect(),
                            })
                        });
                        buf = remaining_buf;
                        target_range.start += write_len;
                        if target_range.start == target_range.end {
                            // With no ranges left, `target_range` stays empty, which also ends
                            // the outer loop below.
                            match range_iter.next() {
                                None => break,
                                Some(next_range) => target_range = next_range.clone(),
                            }
                        }
                        if extent.end <= target_range.start {
                            break;
                        }
                    }
                    // Fold the blocks just written into the extent's bitmap; `or` reports whether
                    // anything changed, and only then is the extent record rewritten.
                    if let Some((mut bitmap, write_bitmap)) = bitmap.take_bitmaps() {
                        if bitmap.or(&write_bitmap) {
                            let mode = if bitmap.all() {
                                ExtentMode::Overwrite
                            } else {
                                ExtentMode::OverwritePartial(bitmap)
                            };
                            mutations.push(Mutation::merge_object(
                                ObjectKey::extent(self.object_id(), attr_id, extent.clone().into()),
                                ObjectValue::Extent(ExtentValue::new(*device_offset, mode, key_id)),
                            ))
                        }
                    }
                    if target_range.start == target_range.end {
                        break;
                    }
                    iter.advance().await?;
                }
                // We've either run past the end of the existing extents or something is wrong with
                // the tree. The main section should break if it finishes the ranges, so either
                // case, this is an error.
                _ => bail!(anyhow!(FxfsError::Internal).context(
                    "found a non-extent object record while there were still ranges to process"
                )),
            }
        }

        let checksums = writes.try_collect::<Vec<_>>().await?;
        // Only store checksums in the journal if barriers are not enabled.
        if !store.filesystem().options().barriers_enabled {
            for (r, c, first_write) in checksums.into_iter().flatten() {
                transaction.add_checksum(r, c, first_write);
            }
        }

        for m in mutations {
            transaction.add(store_id, m);
        }

        Ok(())
    }
1690
1691    /// Writes an attribute that should not already exist and therefore does not require trimming.
1692    /// Breaks up the write into multiple transactions if `data.len()` is larger than `batch_size`.
1693    /// If writing the attribute requires multiple transactions, adds the attribute to the
1694    /// graveyard. The caller is responsible for removing the attribute from the graveyard when it
1695    /// commits the last transaction.  This always writes using a key wrapped with the volume data
1696    /// key.
1697    #[trace]
1698    pub async fn write_new_attr_in_batches<'a>(
1699        &'a self,
1700        transaction: &mut Transaction<'a>,
1701        attribute_id: AttributeId,
1702        data: &[u8],
1703        batch_size: usize,
1704    ) -> Result<(), Error> {
1705        transaction.add(
1706            self.store().store_object_id,
1707            Mutation::replace_or_insert_object(
1708                ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
1709                ObjectValue::attribute(data.len() as u64, false),
1710            ),
1711        );
1712        let chunks = data.chunks(batch_size);
1713        let num_chunks = chunks.len();
1714        if num_chunks > 1 {
1715            transaction.add(
1716                self.store().store_object_id,
1717                Mutation::replace_or_insert_object(
1718                    ObjectKey::graveyard_attribute_entry(
1719                        self.store().graveyard_directory_object_id(),
1720                        self.object_id(),
1721                        attribute_id,
1722                    ),
1723                    ObjectValue::Some,
1724                ),
1725            );
1726        }
1727        let mut start_offset = 0;
1728        for (i, chunk) in chunks.enumerate() {
1729            let rounded_len = round_up(chunk.len() as u64, self.block_size()).unwrap();
1730            let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
1731            let slice = buffer.as_mut_slice();
1732            slice[..chunk.len()].copy_from_slice(chunk);
1733            slice[chunk.len()..].fill(0);
1734            self.multi_write(
1735                transaction,
1736                attribute_id,
1737                Some(VOLUME_DATA_KEY_ID),
1738                &[start_offset..start_offset + rounded_len],
1739                buffer.as_mut(),
1740            )
1741            .await?;
1742            start_offset += rounded_len;
1743            // Do not commit the last chunk.
1744            if i < num_chunks - 1 {
1745                transaction.commit_and_continue().await?;
1746            }
1747        }
1748        Ok(())
1749    }
1750
1751    /// Writes an entire attribute. Returns whether or not the attribute needs to continue being
1752    /// trimmed - if the new data is shorter than the old data, this will trim any extents beyond
1753    /// the end of the new size, but if there were too many for a single transaction, a commit
1754    /// needs to be made before trimming again, so the responsibility is left to the caller so as
1755    /// to not accidentally split the transaction when it's not in a consistent state.  This will
1756    /// write using the volume data key; the fscrypt key is not supported.
1757    pub async fn write_attr(
1758        &self,
1759        transaction: &mut Transaction<'_>,
1760        attribute_id: AttributeId,
1761        data: &[u8],
1762    ) -> Result<NeedsTrim, Error> {
1763        let rounded_len = round_up(data.len() as u64, self.block_size()).unwrap();
1764        let store = self.store();
1765        let tree = store.tree();
1766        let should_trim = if let Some(item) = tree
1767            .find(&ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute))
1768            .await?
1769        {
1770            match item.value {
1771                ObjectValue::Attribute { size: _, has_overwrite_extents: true } => {
1772                    bail!(
1773                        anyhow!(FxfsError::Inconsistent)
1774                            .context("write_attr on an attribute with overwrite extents")
1775                    )
1776                }
1777                ObjectValue::Attribute { size, .. } => (data.len() as u64) < size,
1778                _ => bail!(FxfsError::Inconsistent),
1779            }
1780        } else {
1781            false
1782        };
1783        let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
1784        let slice = buffer.as_mut_slice();
1785        slice[..data.len()].copy_from_slice(data);
1786        slice[data.len()..].fill(0);
1787        self.multi_write(
1788            transaction,
1789            attribute_id,
1790            Some(VOLUME_DATA_KEY_ID),
1791            &[0..rounded_len],
1792            buffer.as_mut(),
1793        )
1794        .await?;
1795        transaction.add(
1796            self.store().store_object_id,
1797            Mutation::replace_or_insert_object(
1798                ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
1799                ObjectValue::attribute(data.len() as u64, false),
1800            ),
1801        );
1802        if should_trim {
1803            self.shrink(transaction, attribute_id, data.len() as u64).await
1804        } else {
1805            Ok(NeedsTrim(false))
1806        }
1807    }
1808
1809    pub async fn list_extended_attributes(&self) -> Result<Vec<Vec<u8>>, Error> {
1810        let layer_set = self.store().tree().layer_set();
1811        let mut merger = layer_set.merger();
1812        // Seek to the first extended attribute key for this object.
1813        let mut iter = merger
1814            .query(Query::FullRange(&ObjectKey::extended_attribute(self.object_id(), Vec::new())))
1815            .await?;
1816        let mut out = Vec::new();
1817        while let Some(item) = iter.get() {
1818            // Skip deleted extended attributes.
1819            if item.value != &ObjectValue::None {
1820                match item.key {
1821                    ObjectKey { object_id, data: ObjectKeyData::ExtendedAttribute { name } }
1822                        if *object_id == self.object_id() =>
1823                    {
1824                        out.push(name.clone());
1825                    }
1826                    // Once we hit a record belonging to another object, or one that is not an
1827                    // extended attribute key, we have reached the end of this object's extended
1828                    // attributes. Subsequent objects' records will start with lower variants
1829                    // (e.g. ObjectKeyData::Object) which trigger this break.
1830                    _ => break,
1831                }
1832            }
1833            iter.advance().await?;
1834        }
1835        Ok(out)
1836    }
1837
1838    /// Looks up the values for the extended attribute `fio::SELINUX_CONTEXT_NAME`, returning it
1839    /// if it is found inline. If it is not inline, it will request use of the
1840    /// `get_extended_attributes` method. If the entry doesn't exist at all, returns None.
1841    pub async fn get_inline_selinux_context(&self) -> Result<Option<fio::SelinuxContext>, Error> {
1842        // This optimization is only useful as long as the attribute is smaller than inline sizes.
1843        // Avoid reading the data out of the attributes.
1844        const_assert!(fio::MAX_SELINUX_CONTEXT_ATTRIBUTE_LEN as usize <= MAX_INLINE_XATTR_SIZE);
1845        let item = match self
1846            .store()
1847            .tree()
1848            .find(&ObjectKey::extended_attribute(
1849                self.object_id(),
1850                fio::SELINUX_CONTEXT_NAME.into(),
1851            ))
1852            .await?
1853        {
1854            Some(item) => item,
1855            None => return Ok(None),
1856        };
1857        match item.value {
1858            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => {
1859                Ok(Some(fio::SelinuxContext::Data(value)))
1860            }
1861            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(_)) => {
1862                Ok(Some(fio::SelinuxContext::UseExtendedAttributes(fio::EmptyStruct {})))
1863            }
1864            _ => {
1865                bail!(
1866                    anyhow!(FxfsError::Inconsistent)
1867                        .context("get_inline_extended_attribute: Expected ExtendedAttribute value")
1868                )
1869            }
1870        }
1871    }
1872
1873    pub async fn get_extended_attribute(&self, name: Vec<u8>) -> Result<Vec<u8>, Error> {
1874        let item = self
1875            .store()
1876            .tree()
1877            .find(&ObjectKey::extended_attribute(self.object_id(), name))
1878            .await?
1879            .ok_or(FxfsError::NotFound)?;
1880        match item.value {
1881            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => Ok(value),
1882            ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
1883                Ok(self.read_attr(id).await?.ok_or(FxfsError::Inconsistent)?.into_vec())
1884            }
1885            _ => {
1886                bail!(
1887                    anyhow!(FxfsError::Inconsistent)
1888                        .context("get_extended_attribute: Expected ExtendedAttribute value")
1889                )
1890            }
1891        }
1892    }
1893
1894    pub async fn set_extended_attribute(
1895        &self,
1896        name: Vec<u8>,
1897        value: Vec<u8>,
1898        mode: SetExtendedAttributeMode,
1899    ) -> Result<(), Error> {
1900        let store = self.store();
1901        let fs = store.filesystem();
1902        // NB: We need to take this lock before we potentially look up the value to prevent racing
1903        // with another set.
1904        let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
1905        let mut transaction = fs.new_transaction(keys, Options::default()).await?;
1906        self.set_extended_attribute_impl(name, value, mode, &mut transaction).await?;
1907        transaction.commit().await?;
1908        Ok(())
1909    }
1910
    /// Adds to `transaction` the mutations that set extended attribute `name` to `value`.
    ///
    /// `mode` is checked against whether a record for `name` already exists:
    /// `SetExtendedAttributeMode::Create` fails with `FxfsError::AlreadyExists` if it does, and
    /// `SetExtendedAttributeMode::Replace` fails with `FxfsError::NotFound` if it does not.  Any
    /// other combination proceeds.
    ///
    /// Where the value is stored:
    /// - if the extended attribute already has a dedicated attribute id, the value is always
    ///   rewritten into that attribute, even if it would now fit inline;
    /// - otherwise, a value of at most `MAX_INLINE_XATTR_SIZE` bytes is stored inline in the
    ///   extended attribute record;
    /// - otherwise, a free attribute id in `XATTR_RANGE_START..XATTR_RANGE_END` is chosen, the
    ///   value is written there, and the extended attribute record points at it.
    ///
    /// Fails with `FxfsError::TooBig` if `name` or `value` exceed `MAX_XATTR_NAME_SIZE` /
    /// `MAX_XATTR_VALUE_SIZE`, with `FxfsError::NoSpace` if no attribute id in the range is free,
    /// and with `FxfsError::Inconsistent` if the existing record is not an extended attribute.
    ///
    /// NOTE: `set_extended_attribute` takes the object lock before calling this so that the
    /// lookup below cannot race with another set; other callers presumably must do the same.
    async fn set_extended_attribute_impl(
        &self,
        name: Vec<u8>,
        value: Vec<u8>,
        mode: SetExtendedAttributeMode,
        transaction: &mut Transaction<'_>,
    ) -> Result<(), Error> {
        ensure!(name.len() <= MAX_XATTR_NAME_SIZE, FxfsError::TooBig);
        ensure!(value.len() <= MAX_XATTR_VALUE_SIZE, FxfsError::TooBig);
        let tree = self.store().tree();
        let object_key = ObjectKey::extended_attribute(self.object_id(), name);

        // Look up any existing record, enforce `mode`, and remember the backing attribute id if
        // the current value is not stored inline.
        let existing_attribute_id = {
            let (found, existing_attribute_id) = match tree.find(&object_key).await? {
                None => (false, None),
                Some(Item { value, .. }) => (
                    true,
                    match value {
                        ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
                        ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
                            Some(id)
                        }
                        _ => bail!(
                            anyhow!(FxfsError::Inconsistent)
                                .context("expected extended attribute value")
                        ),
                    },
                ),
            };
            match mode {
                SetExtendedAttributeMode::Create if found => {
                    bail!(FxfsError::AlreadyExists)
                }
                SetExtendedAttributeMode::Replace if !found => {
                    bail!(FxfsError::NotFound)
                }
                _ => (),
            }
            existing_attribute_id
        };

        if let Some(attribute_id) = existing_attribute_id {
            // If we already have an attribute id allocated for this extended attribute, we always
            // use it, even if the value has shrunk enough to be stored inline. We don't need to
            // worry about trimming here for the same reason we don't need to worry about it when
            // we delete xattrs - they simply aren't large enough to ever need more than one
            // transaction.
            let _ = self.write_attr(transaction, attribute_id, &value).await?;
        } else if value.len() <= MAX_INLINE_XATTR_SIZE {
            transaction.add(
                self.store().store_object_id(),
                Mutation::replace_or_insert_object(
                    object_key,
                    ObjectValue::inline_extended_attribute(value),
                ),
            );
        } else {
            // If there isn't an existing attribute id and we are going to store the value in
            // an attribute, find the next empty attribute id in the range. We search for fxfs
            // attribute records specifically, instead of the extended attribute records, because
            // even if the extended attribute record is removed the attribute may not be fully
            // trimmed yet.
            let mut attribute_id = AttributeId::XATTR_RANGE_START;
            let layer_set = tree.layer_set();
            let mut merger = layer_set.merger();
            let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
            let mut iter = merger.query(Query::FullRange(&key)).await?;
            loop {
                match iter.get() {
                    // None means there are no more records at or after the current position, so
                    // the current candidate `attribute_id` is free and we can stop right away.
                    None => break,
                    Some(ItemRef {
                        key: ObjectKey { object_id, data: ObjectKeyData::Attribute(attr_id, _) },
                        value,
                        ..
                    }) if *object_id == self.object_id() => {
                        if matches!(value, ObjectValue::None) {
                            // This attribute was once used but is now deleted, so it's safe to use
                            // again.
                            break;
                        }
                        if attribute_id < *attr_id {
                            // We found a gap - use it.
                            break;
                        } else if attribute_id == *attr_id {
                            // This attribute id is in use, try the next one.
                            attribute_id = attribute_id.next();
                            if attribute_id == AttributeId::XATTR_RANGE_END {
                                bail!(FxfsError::NoSpace);
                            }
                        }
                        // If we don't hit either of those cases, we are still moving through the
                        // extent keys for the current attribute, so just keep advancing until the
                        // attribute id changes.
                    }
                    // As we are working our way through the iterator, if we hit anything that
                    // doesn't have our object id or attribute key data, we've gone past the end of
                    // this section and can stop.
                    _ => break,
                }
                iter.advance().await?;
            }

            // We know this won't need trimming because it's a new attribute.
            let _ = self.write_attr(transaction, attribute_id, &value).await?;
            transaction.add(
                self.store().store_object_id(),
                Mutation::replace_or_insert_object(
                    object_key,
                    ObjectValue::extended_attribute(attribute_id),
                ),
            );
        }

        Ok(())
    }
2028
2029    pub async fn remove_extended_attribute(&self, name: Vec<u8>) -> Result<(), Error> {
2030        let store = self.store();
2031        let tree = store.tree();
2032        let object_key = ObjectKey::extended_attribute(self.object_id(), name);
2033
2034        // NB: The API says we have to return an error if the attribute doesn't exist, so we have
2035        // to look it up first to make sure we have a record of it before we delete it. Make sure
2036        // we take a lock and make a transaction before we do so we don't race with other
2037        // operations.
2038        let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
2039        let mut transaction = store.filesystem().new_transaction(keys, Options::default()).await?;
2040
2041        let attribute_to_delete =
2042            match tree.find(&object_key).await?.ok_or(FxfsError::NotFound)?.value {
2043                ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => Some(id),
2044                ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
2045                _ => {
2046                    bail!(
2047                        anyhow!(FxfsError::Inconsistent)
2048                            .context("remove_extended_attribute: Expected ExtendedAttribute value")
2049                    )
2050                }
2051            };
2052
2053        transaction.add(
2054            store.store_object_id(),
2055            Mutation::replace_or_insert_object(object_key, ObjectValue::None),
2056        );
2057
2058        // If the attribute wasn't stored inline, we need to deallocate all the extents too. This
2059        // would normally need to interact with the graveyard for correctness - if there are too
2060        // many extents to delete to fit in a single transaction then we could potentially have
2061        // consistency issues. However, the maximum size of an extended attribute is small enough
2062        // that it will never come close to that limit even in the worst case, so we just delete
2063        // everything in one shot.
2064        if let Some(attribute_id) = attribute_to_delete {
2065            let trim_result = store
2066                .trim_some(
2067                    &mut transaction,
2068                    self.object_id(),
2069                    attribute_id,
2070                    TrimMode::FromOffset(0),
2071                )
2072                .await?;
2073            // In case you didn't read the comment above - this should not be used to delete
2074            // arbitrary attributes!
2075            assert_matches!(trim_result, TrimResult::Done(_));
2076            transaction.add(
2077                store.store_object_id(),
2078                Mutation::replace_or_insert_object(
2079                    ObjectKey::attribute(self.object_id, attribute_id, AttributeKey::Attribute),
2080                    ObjectValue::None,
2081                ),
2082            );
2083        }
2084
2085        transaction.commit().await?;
2086        Ok(())
2087    }
2088
2089    /// Returns a future that will pre-fetches the keys so as to avoid paying the performance
2090    /// penalty later. Must ensure that the object is not removed before the future completes.
2091    pub fn pre_fetch_keys(&self) -> Option<impl Future<Output = ()> + use<S>> {
2092        if let Encryption::CachedKeys = self.encryption {
2093            let owner = self.owner.clone();
2094            let object_id = self.object_id;
2095            Some(async move {
2096                let store = owner.as_ref().as_ref();
2097                if let Some(crypt) = store.crypt() {
2098                    let _ = store
2099                        .key_manager
2100                        .get_keys(
2101                            object_id,
2102                            crypt.as_ref(),
2103                            &mut Some(async || store.get_keys(object_id).await),
2104                            /* permanent= */ false,
2105                            /* force= */ false,
2106                        )
2107                        .await;
2108                }
2109            })
2110        } else {
2111            None
2112        }
2113    }
2114}
2115
2116impl<S: HandleOwner> Drop for StoreObjectHandle<S> {
2117    fn drop(&mut self) {
2118        if self.is_encrypted() {
2119            let _ = self.store().key_manager.remove(self.object_id);
2120        }
2121    }
2122}
2123
/// Reports whether an operation that shrinks an attribute (e.g. `write_attr`) left trimming
/// unfinished.
///
/// When truncating an object, it might not be possible to trim everything within a single
/// transaction.  `NeedsTrim(true)` means the caller needs to finish trimming the object in
/// subsequent transactions (by calling `ObjectStore::trim`).  `NeedsTrim(false)` means no
/// further trimming is required.  The type is `#[must_use]` so this signal cannot be silently
/// dropped.
#[must_use]
pub struct NeedsTrim(pub bool);
2129
2130#[cfg(test)]
2131mod tests {
2132    use super::{ChecksumRangeChunk, OverwriteBitmaps};
2133    use crate::errors::FxfsError;
2134    use crate::filesystem::{FxFilesystem, OpenFxFilesystem};
2135    use crate::object_handle::ObjectHandle;
2136    use crate::object_store::data_object_handle::WRITE_ATTR_BATCH_SIZE;
2137    use crate::object_store::transaction::{Mutation, Options, lock_keys};
2138    use crate::object_store::{
2139        AttributeId, AttributeKey, DataObjectHandle, Directory, HandleOptions, LockKey, ObjectKey,
2140        ObjectStore, ObjectValue, SetExtendedAttributeMode, StoreObjectHandle,
2141    };
2142    use bit_vec::BitVec;
2143    use fuchsia_async as fasync;
2144    use futures::join;
2145    use std::sync::Arc;
2146    use storage_device::DeviceHolder;
2147    use storage_device::fake_device::FakeDevice;
2148
    /// Block size passed to the `FakeDevice` that backs every test filesystem in this module.
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
    /// Name under which `test_filesystem_and_empty_object` links the test file into the root
    /// directory.
    const TEST_OBJECT_NAME: &str = "foo";
2151
2152    fn is_error(actual: anyhow::Error, expected: FxfsError) {
2153        assert_eq!(*actual.root_cause().downcast_ref::<FxfsError>().unwrap(), expected)
2154    }
2155
2156    async fn test_filesystem() -> OpenFxFilesystem {
2157        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
2158        FxFilesystem::new_empty(device).await.expect("new_empty failed")
2159    }
2160
2161    async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
2162    {
2163        let fs = test_filesystem().await;
2164        let store = fs.root_store();
2165
2166        let mut transaction = fs
2167            .clone()
2168            .new_transaction(
2169                lock_keys![LockKey::object(
2170                    store.store_object_id(),
2171                    store.root_directory_object_id()
2172                )],
2173                Options::default(),
2174            )
2175            .await
2176            .expect("new_transaction failed");
2177
2178        let object =
2179            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2180                .await
2181                .expect("create_object failed");
2182
2183        let root_directory =
2184            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
2185        root_directory
2186            .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
2187            .await
2188            .expect("add_child_file failed");
2189
2190        transaction.commit().await.expect("commit failed");
2191
2192        (fs, object)
2193    }
2194
2195    #[fuchsia::test(threads = 3)]
2196    async fn extended_attribute_double_remove() {
2197        // This test is intended to trip a potential race condition in remove. Removing an
2198        // attribute that doesn't exist is an error, so we need to check before we remove, but if
2199        // we aren't careful, two parallel removes might both succeed in the check and then both
2200        // remove the value.
2201        let (fs, object) = test_filesystem_and_empty_object().await;
2202        let basic = Arc::new(StoreObjectHandle::new(
2203            object.owner().clone(),
2204            object.object_id(),
2205            /* permanent_keys: */ false,
2206            HandleOptions::default(),
2207            false,
2208        ));
2209        let basic_a = basic.clone();
2210        let basic_b = basic.clone();
2211
2212        basic
2213            .set_extended_attribute(
2214                b"security.selinux".to_vec(),
2215                b"bar".to_vec(),
2216                SetExtendedAttributeMode::Set,
2217            )
2218            .await
2219            .expect("failed to set attribute");
2220
2221        // Try to remove the attribute twice at the same time. One should succeed in the race and
2222        // return Ok, and the other should fail the race and return NOT_FOUND.
2223        let a_task = fasync::Task::spawn(async move {
2224            basic_a.remove_extended_attribute(b"security.selinux".to_vec()).await
2225        });
2226        let b_task = fasync::Task::spawn(async move {
2227            basic_b.remove_extended_attribute(b"security.selinux".to_vec()).await
2228        });
2229        match join!(a_task, b_task) {
2230            (Ok(()), Ok(())) => panic!("both remove calls succeeded"),
2231            (Err(_), Err(_)) => panic!("both remove calls failed"),
2232
2233            (Ok(()), Err(e)) => is_error(e, FxfsError::NotFound),
2234            (Err(e), Ok(())) => is_error(e, FxfsError::NotFound),
2235        }
2236
2237        fs.close().await.expect("Close failed");
2238    }
2239
2240    #[fuchsia::test(threads = 3)]
2241    async fn extended_attribute_double_create() {
2242        // This test is intended to trip a potential race in set when using the create flag,
2243        // similar to above. If the create mode is set, we need to check that the attribute isn't
2244        // already created, but if two parallel creates both succeed in that check, and we aren't
2245        // careful with locking, they will both succeed and one will overwrite the other.
2246        let (fs, object) = test_filesystem_and_empty_object().await;
2247        let basic = Arc::new(StoreObjectHandle::new(
2248            object.owner().clone(),
2249            object.object_id(),
2250            /* permanent_keys: */ false,
2251            HandleOptions::default(),
2252            false,
2253        ));
2254        let basic_a = basic.clone();
2255        let basic_b = basic.clone();
2256
2257        // Try to set the attribute twice at the same time. One should succeed in the race and
2258        // return Ok, and the other should fail the race and return ALREADY_EXISTS.
2259        let a_task = fasync::Task::spawn(async move {
2260            basic_a
2261                .set_extended_attribute(
2262                    b"security.selinux".to_vec(),
2263                    b"one".to_vec(),
2264                    SetExtendedAttributeMode::Create,
2265                )
2266                .await
2267        });
2268        let b_task = fasync::Task::spawn(async move {
2269            basic_b
2270                .set_extended_attribute(
2271                    b"security.selinux".to_vec(),
2272                    b"two".to_vec(),
2273                    SetExtendedAttributeMode::Create,
2274                )
2275                .await
2276        });
2277        match join!(a_task, b_task) {
2278            (Ok(()), Ok(())) => panic!("both set calls succeeded"),
2279            (Err(_), Err(_)) => panic!("both set calls failed"),
2280
2281            (Ok(()), Err(e)) => {
2282                assert_eq!(
2283                    basic
2284                        .get_extended_attribute(b"security.selinux".to_vec())
2285                        .await
2286                        .expect("failed to get xattr"),
2287                    b"one"
2288                );
2289                is_error(e, FxfsError::AlreadyExists);
2290            }
2291            (Err(e), Ok(())) => {
2292                assert_eq!(
2293                    basic
2294                        .get_extended_attribute(b"security.selinux".to_vec())
2295                        .await
2296                        .expect("failed to get xattr"),
2297                    b"two"
2298                );
2299                is_error(e, FxfsError::AlreadyExists);
2300            }
2301        }
2302
2303        fs.close().await.expect("Close failed");
2304    }
2305
2306    struct TestAttr {
2307        name: Vec<u8>,
2308        value: Vec<u8>,
2309    }
2310
2311    impl TestAttr {
2312        fn new(name: impl AsRef<[u8]>, value: impl AsRef<[u8]>) -> Self {
2313            Self { name: name.as_ref().to_vec(), value: value.as_ref().to_vec() }
2314        }
2315        fn name(&self) -> Vec<u8> {
2316            self.name.clone()
2317        }
2318        fn value(&self) -> Vec<u8> {
2319            self.value.clone()
2320        }
2321    }
2322
2323    #[fuchsia::test]
2324    async fn extended_attributes() {
2325        let (fs, object) = test_filesystem_and_empty_object().await;
2326
2327        let test_attr = TestAttr::new(b"security.selinux", b"foo");
2328
2329        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
2330        is_error(
2331            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2332            FxfsError::NotFound,
2333        );
2334
2335        object
2336            .set_extended_attribute(
2337                test_attr.name(),
2338                test_attr.value(),
2339                SetExtendedAttributeMode::Set,
2340            )
2341            .await
2342            .unwrap();
2343        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2344        assert_eq!(
2345            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2346            test_attr.value()
2347        );
2348
2349        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2350        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
2351        is_error(
2352            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2353            FxfsError::NotFound,
2354        );
2355
2356        // Make sure we can object the same attribute being set again.
2357        object
2358            .set_extended_attribute(
2359                test_attr.name(),
2360                test_attr.value(),
2361                SetExtendedAttributeMode::Set,
2362            )
2363            .await
2364            .unwrap();
2365        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2366        assert_eq!(
2367            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2368            test_attr.value()
2369        );
2370
2371        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2372        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
2373        is_error(
2374            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2375            FxfsError::NotFound,
2376        );
2377
2378        fs.close().await.expect("close failed");
2379    }
2380
2381    #[fuchsia::test]
2382    async fn large_extended_attribute() {
2383        let (fs, object) = test_filesystem_and_empty_object().await;
2384
2385        let test_attr = TestAttr::new(b"security.selinux", vec![3u8; 300]);
2386
2387        object
2388            .set_extended_attribute(
2389                test_attr.name(),
2390                test_attr.value(),
2391                SetExtendedAttributeMode::Set,
2392            )
2393            .await
2394            .unwrap();
2395        assert_eq!(
2396            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2397            test_attr.value()
2398        );
2399
2400        // Probe the fxfs attributes to make sure it did the expected thing. This relies on inside
2401        // knowledge of how the attribute id is chosen.
2402        assert_eq!(
2403            object
2404                .read_attr(AttributeId::XATTR_RANGE_START)
2405                .await
2406                .expect("read_attr failed")
2407                .expect("read_attr returned none")
2408                .into_vec(),
2409            test_attr.value()
2410        );
2411
2412        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2413        is_error(
2414            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2415            FxfsError::NotFound,
2416        );
2417
2418        // Make sure we can object the same attribute being set again.
2419        object
2420            .set_extended_attribute(
2421                test_attr.name(),
2422                test_attr.value(),
2423                SetExtendedAttributeMode::Set,
2424            )
2425            .await
2426            .unwrap();
2427        assert_eq!(
2428            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2429            test_attr.value()
2430        );
2431        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2432        is_error(
2433            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2434            FxfsError::NotFound,
2435        );
2436
2437        fs.close().await.expect("close failed");
2438    }
2439
2440    #[fuchsia::test]
2441    async fn multiple_extended_attributes() {
2442        let (fs, object) = test_filesystem_and_empty_object().await;
2443
2444        let attrs = [
2445            TestAttr::new(b"security.selinux", b"foo"),
2446            TestAttr::new(b"large.attribute", vec![3u8; 300]),
2447            TestAttr::new(b"an.attribute", b"asdf"),
2448            TestAttr::new(b"user.big", vec![5u8; 288]),
2449            TestAttr::new(b"user.tiny", b"smol"),
2450            TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
2451            TestAttr::new(b"also big", vec![7u8; 500]),
2452            TestAttr::new(b"all.ones", vec![1u8; 11111]),
2453        ];
2454
2455        for i in 0..attrs.len() {
2456            object
2457                .set_extended_attribute(
2458                    attrs[i].name(),
2459                    attrs[i].value(),
2460                    SetExtendedAttributeMode::Set,
2461                )
2462                .await
2463                .unwrap();
2464            assert_eq!(
2465                object.get_extended_attribute(attrs[i].name()).await.unwrap(),
2466                attrs[i].value()
2467            );
2468        }
2469
2470        for i in 0..attrs.len() {
2471            // Make sure expected attributes are still available.
2472            let mut found_attrs = object.list_extended_attributes().await.unwrap();
2473            let mut expected_attrs: Vec<Vec<u8>> = attrs.iter().skip(i).map(|a| a.name()).collect();
2474            found_attrs.sort();
2475            expected_attrs.sort();
2476            assert_eq!(found_attrs, expected_attrs);
2477            for j in i..attrs.len() {
2478                assert_eq!(
2479                    object.get_extended_attribute(attrs[j].name()).await.unwrap(),
2480                    attrs[j].value()
2481                );
2482            }
2483
2484            object.remove_extended_attribute(attrs[i].name()).await.expect("failed to remove");
2485            is_error(
2486                object.get_extended_attribute(attrs[i].name()).await.unwrap_err(),
2487                FxfsError::NotFound,
2488            );
2489        }
2490
2491        fs.close().await.expect("close failed");
2492    }
2493
2494    #[fuchsia::test]
2495    async fn multiple_extended_attributes_delete() {
2496        let (fs, object) = test_filesystem_and_empty_object().await;
2497        let store = object.owner().clone();
2498
2499        let attrs = [
2500            TestAttr::new(b"security.selinux", b"foo"),
2501            TestAttr::new(b"large.attribute", vec![3u8; 300]),
2502            TestAttr::new(b"an.attribute", b"asdf"),
2503            TestAttr::new(b"user.big", vec![5u8; 288]),
2504            TestAttr::new(b"user.tiny", b"smol"),
2505            TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
2506            TestAttr::new(b"also big", vec![7u8; 500]),
2507            TestAttr::new(b"all.ones", vec![1u8; 11111]),
2508        ];
2509
2510        for i in 0..attrs.len() {
2511            object
2512                .set_extended_attribute(
2513                    attrs[i].name(),
2514                    attrs[i].value(),
2515                    SetExtendedAttributeMode::Set,
2516                )
2517                .await
2518                .unwrap();
2519            assert_eq!(
2520                object.get_extended_attribute(attrs[i].name()).await.unwrap(),
2521                attrs[i].value()
2522            );
2523        }
2524
2525        // Unlink the file
2526        let root_directory =
2527            Directory::open(object.owner(), object.store().root_directory_object_id())
2528                .await
2529                .expect("open failed");
2530        let mut transaction = fs
2531            .clone()
2532            .new_transaction(
2533                lock_keys![
2534                    LockKey::object(store.store_object_id(), store.root_directory_object_id()),
2535                    LockKey::object(store.store_object_id(), object.object_id()),
2536                ],
2537                Options::default(),
2538            )
2539            .await
2540            .expect("new_transaction failed");
2541        crate::object_store::directory::replace_child(
2542            &mut transaction,
2543            None,
2544            (&root_directory, TEST_OBJECT_NAME),
2545        )
2546        .await
2547        .expect("replace_child failed");
2548        transaction.commit().await.unwrap();
2549        store.tombstone_object(object.object_id(), Options::default()).await.unwrap();
2550
2551        crate::fsck::fsck(fs.clone()).await.unwrap();
2552
2553        fs.close().await.expect("close failed");
2554    }
2555
2556    #[fuchsia::test]
2557    async fn extended_attribute_changing_sizes() {
2558        let (fs, object) = test_filesystem_and_empty_object().await;
2559
2560        let test_name = b"security.selinux";
2561        let test_small_attr = TestAttr::new(test_name, b"smol");
2562        let test_large_attr = TestAttr::new(test_name, vec![3u8; 300]);
2563
2564        object
2565            .set_extended_attribute(
2566                test_small_attr.name(),
2567                test_small_attr.value(),
2568                SetExtendedAttributeMode::Set,
2569            )
2570            .await
2571            .unwrap();
2572        assert_eq!(
2573            object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
2574            test_small_attr.value()
2575        );
2576
2577        // With a small attribute, we don't expect it to write to an fxfs attribute.
2578        assert!(
2579            object
2580                .read_attr(AttributeId::XATTR_RANGE_START)
2581                .await
2582                .expect("read_attr failed")
2583                .is_none()
2584        );
2585
2586        crate::fsck::fsck(fs.clone()).await.unwrap();
2587
2588        object
2589            .set_extended_attribute(
2590                test_large_attr.name(),
2591                test_large_attr.value(),
2592                SetExtendedAttributeMode::Set,
2593            )
2594            .await
2595            .unwrap();
2596        assert_eq!(
2597            object.get_extended_attribute(test_large_attr.name()).await.unwrap(),
2598            test_large_attr.value()
2599        );
2600
2601        // Once the value is above the threshold, we expect it to get upgraded to an fxfs
2602        // attribute.
2603        assert_eq!(
2604            object
2605                .read_attr(AttributeId::XATTR_RANGE_START)
2606                .await
2607                .expect("read_attr failed")
2608                .expect("read_attr returned none")
2609                .into_vec(),
2610            test_large_attr.value()
2611        );
2612
2613        crate::fsck::fsck(fs.clone()).await.unwrap();
2614
2615        object
2616            .set_extended_attribute(
2617                test_small_attr.name(),
2618                test_small_attr.value(),
2619                SetExtendedAttributeMode::Set,
2620            )
2621            .await
2622            .unwrap();
2623        assert_eq!(
2624            object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
2625            test_small_attr.value()
2626        );
2627
2628        // Even though we are back under the threshold, we still expect it to be stored in an fxfs
2629        // attribute, because we don't downgrade to inline once we've allocated one.
2630        assert_eq!(
2631            object
2632                .read_attr(AttributeId::XATTR_RANGE_START)
2633                .await
2634                .expect("read_attr failed")
2635                .expect("read_attr returned none")
2636                .into_vec(),
2637            test_small_attr.value()
2638        );
2639
2640        crate::fsck::fsck(fs.clone()).await.unwrap();
2641
2642        object.remove_extended_attribute(test_small_attr.name()).await.expect("failed to remove");
2643
2644        crate::fsck::fsck(fs.clone()).await.unwrap();
2645
2646        fs.close().await.expect("close failed");
2647    }
2648
2649    #[fuchsia::test]
2650    async fn extended_attribute_max_size() {
2651        let (fs, object) = test_filesystem_and_empty_object().await;
2652
2653        let test_attr = TestAttr::new(
2654            vec![3u8; super::MAX_XATTR_NAME_SIZE],
2655            vec![1u8; super::MAX_XATTR_VALUE_SIZE],
2656        );
2657
2658        object
2659            .set_extended_attribute(
2660                test_attr.name(),
2661                test_attr.value(),
2662                SetExtendedAttributeMode::Set,
2663            )
2664            .await
2665            .unwrap();
2666        assert_eq!(
2667            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2668            test_attr.value()
2669        );
2670        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2671        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2672
2673        fs.close().await.expect("close failed");
2674    }
2675
2676    #[fuchsia::test]
2677    async fn extended_attribute_remove_then_create() {
2678        let (fs, object) = test_filesystem_and_empty_object().await;
2679
2680        let test_attr = TestAttr::new(
2681            vec![3u8; super::MAX_XATTR_NAME_SIZE],
2682            vec![1u8; super::MAX_XATTR_VALUE_SIZE],
2683        );
2684
2685        object
2686            .set_extended_attribute(
2687                test_attr.name(),
2688                test_attr.value(),
2689                SetExtendedAttributeMode::Create,
2690            )
2691            .await
2692            .unwrap();
2693        fs.journal().force_compact().await.unwrap();
2694        object.remove_extended_attribute(test_attr.name()).await.unwrap();
2695        object
2696            .set_extended_attribute(
2697                test_attr.name(),
2698                test_attr.value(),
2699                SetExtendedAttributeMode::Create,
2700            )
2701            .await
2702            .unwrap();
2703
2704        assert_eq!(
2705            object.get_extended_attribute(test_attr.name()).await.unwrap(),
2706            test_attr.value()
2707        );
2708
2709        fs.close().await.expect("close failed");
2710    }
2711
2712    #[fuchsia::test]
2713    async fn large_extended_attribute_max_number() {
2714        let (fs, object) = test_filesystem_and_empty_object().await;
2715
2716        let max_xattrs = AttributeId::XATTR_RANGE_END.raw() - AttributeId::XATTR_RANGE_START.raw();
2717        for i in 0..max_xattrs {
2718            let test_attr = TestAttr::new(format!("{}", i).as_bytes(), vec![0x3; 300]);
2719            object
2720                .set_extended_attribute(
2721                    test_attr.name(),
2722                    test_attr.value(),
2723                    SetExtendedAttributeMode::Set,
2724                )
2725                .await
2726                .unwrap_or_else(|_| panic!("failed to set xattr number {}", i));
2727        }
2728
2729        // That should have taken up all the attributes we've allocated to extended attributes, so
2730        // this one should return ERR_NO_SPACE.
2731        match object
2732            .set_extended_attribute(
2733                b"one.too.many".to_vec(),
2734                vec![0x3; 300],
2735                SetExtendedAttributeMode::Set,
2736            )
2737            .await
2738        {
2739            Ok(()) => panic!("set should not succeed"),
2740            Err(e) => is_error(e, FxfsError::NoSpace),
2741        }
2742
2743        // But inline attributes don't need an attribute number, so it should work fine.
2744        object
2745            .set_extended_attribute(
2746                b"this.is.okay".to_vec(),
2747                b"small value".to_vec(),
2748                SetExtendedAttributeMode::Set,
2749            )
2750            .await
2751            .unwrap();
2752
2753        // And updating existing ones should be okay.
2754        object
2755            .set_extended_attribute(b"11".to_vec(), vec![0x4; 300], SetExtendedAttributeMode::Set)
2756            .await
2757            .unwrap();
2758        object
2759            .set_extended_attribute(
2760                b"12".to_vec(),
2761                vec![0x1; 300],
2762                SetExtendedAttributeMode::Replace,
2763            )
2764            .await
2765            .unwrap();
2766
2767        // And we should be able to remove an attribute and set another one.
2768        object.remove_extended_attribute(b"5".to_vec()).await.unwrap();
2769        object
2770            .set_extended_attribute(
2771                b"new attr".to_vec(),
2772                vec![0x3; 300],
2773                SetExtendedAttributeMode::Set,
2774            )
2775            .await
2776            .unwrap();
2777
2778        fs.close().await.expect("close failed");
2779    }
2780
    #[fuchsia::test]
    async fn write_attr_trims_beyond_new_end() {
        // When writing, multi_write will deallocate old extents that overlap with the new data,
        // but it doesn't trim anything beyond that, since it doesn't know what the total size will
        // be. write_attr does know, because it writes the whole attribute at once, so we need to
        // make sure it cleans up properly.
        let (fs, object) = test_filesystem_and_empty_object().await;

        let block_size = fs.block_size();
        let buf_size = block_size * 2;
        let attribute_id = AttributeId::TEST_ID;

        // Setup: build a two-block attribute made of two separate extent records.
        // NOTE(review): `(*object)` dereferences the `DataObjectHandle` to reach the underlying
        // handle's methods (presumably `StoreObjectHandle`). Confirm.
        let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
        let mut buffer = object.allocate_buffer(buf_size as usize).await;
        buffer.as_mut_slice().fill(3);
        // Writing two separate ranges, even if they are contiguous, forces them to be separate
        // extent records.
        object
            .multi_write(
                &mut transaction,
                attribute_id,
                &[0..block_size, block_size..block_size * 2],
                buffer.as_mut(),
            )
            .await
            .unwrap();
        // Record the attribute's size as two blocks.
        // NOTE(review): the meaning of the `false` argument is not visible here. Confirm.
        transaction.add(
            object.store().store_object_id,
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(object.object_id(), attribute_id, AttributeKey::Attribute),
                ObjectValue::attribute(block_size * 2, false),
            ),
        );
        transaction.commit().await.unwrap();

        crate::fsck::fsck(fs.clone()).await.unwrap();

        // Rewrite the attribute with only one block of data. write_attr must trim the old second
        // block itself: it reports that no follow-up trim is needed, and fsck must still pass.
        let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
        let needs_trim = (*object)
            .write_attr(&mut transaction, attribute_id, &vec![3u8; block_size as usize])
            .await
            .unwrap();
        assert!(!needs_trim.0);
        transaction.commit().await.unwrap();

        crate::fsck::fsck(fs.clone()).await.unwrap();

        fs.close().await.expect("close failed");
    }
2830
    /// Writes an attribute three batches long via `write_new_attr_in_batches` and checks that the
    /// whole attribute reads back intact after the commit.
    #[fuchsia::test]
    async fn write_new_attr_in_batches_multiple_txns() {
        let (fs, object) = test_filesystem_and_empty_object().await;
        // Exactly three full batches of data.
        // NOTE(review): the test name suggests the batches span several transactions, presumably
        // committed inside `write_new_attr_in_batches`. Confirm.
        let merkle_tree = vec![1; 3 * WRITE_ATTR_BATCH_SIZE];
        let mut transaction = (*object).new_transaction(AttributeId::TEST_ID).await.unwrap();
        object
            .write_new_attr_in_batches(
                &mut transaction,
                AttributeId::TEST_ID,
                &merkle_tree,
                WRITE_ATTR_BATCH_SIZE,
            )
            .await
            .expect("failed to write merkle attribute");

        // Replace the graveyard entry for this attribute with `ObjectValue::None`.
        // NOTE(review): presumably this clears an entry that `write_new_attr_in_batches` leaves
        // behind to guard a partially-written attribute. Confirm.
        transaction.add(
            object.store().store_object_id,
            Mutation::replace_or_insert_object(
                ObjectKey::graveyard_attribute_entry(
                    object.store().graveyard_directory_object_id(),
                    object.object_id(),
                    AttributeId::TEST_ID,
                ),
                ObjectValue::None,
            ),
        );
        transaction.commit().await.unwrap();
        assert_eq!(
            object.read_attr(AttributeId::TEST_ID).await.expect("read_attr failed"),
            Some(merkle_tree.into())
        );

        fs.close().await.expect("close failed");
    }
2865
    // Runs on target only, because it relies on the executor's fake-time features.
    #[cfg(target_os = "fuchsia")]
    #[fuchsia::test(allow_stalls = false)]
    async fn test_watchdog() {
        use super::Watchdog;
        use fuchsia_async::{MonotonicDuration, MonotonicInstant, TestExecutor};
        use std::sync::mpsc::channel;

        // Start the fake clock at t = 0. `make_time` is an item, so it can be used before its
        // declaration below.
        TestExecutor::advance_to(make_time(0)).await;
        let (sender, receiver) = channel();

        // Converts whole seconds since t = 0 into a `MonotonicInstant`.
        fn make_time(time_secs: i64) -> MonotonicInstant {
            MonotonicInstant::from_nanos(0) + MonotonicDuration::from_seconds(time_secs)
        }

        {
            // Per the assertions below, the callback fires every 10 seconds with an incrementing
            // count starting at 1. The sender is moved into the callback.
            let _watchdog = Watchdog::new(10, move |count| {
                sender.send(count).expect("Sending value");
            });

            // Too early.
            TestExecutor::advance_to(make_time(5)).await;
            receiver.try_recv().expect_err("Should not have message");

            // First message.
            TestExecutor::advance_to(make_time(10)).await;
            assert_eq!(1, receiver.recv().expect("Receiving"));

            // Too early for the next.
            TestExecutor::advance_to(make_time(15)).await;
            receiver.try_recv().expect_err("Should not have message");

            // Missed one. They'll be spooled up: jumping past both t = 20 and t = 30 delivers
            // counts 2 and 3.
            TestExecutor::advance_to(make_time(30)).await;
            assert_eq!(2, receiver.recv().expect("Receiving"));
            assert_eq!(3, receiver.recv().expect("Receiving"));
        }

        // Watchdog is dropped, nothing should trigger. `recv` is expected to return an error
        // rather than block, on the assumption that dropping the watchdog also drops the callback
        // and with it the only sender.
        TestExecutor::advance_to(make_time(100)).await;
        receiver.recv().expect_err("Watchdog should be gone");
    }
2908
2909    #[fuchsia::test]
2910    fn test_checksum_range_chunk() {
2911        let block_size = 4096;
2912
2913        // No bitmap means one chunk that covers the whole range
2914        assert_eq!(
2915            ChecksumRangeChunk::group_first_write_ranges(
2916                &mut OverwriteBitmaps::None,
2917                block_size,
2918                block_size * 2..block_size * 5,
2919            ),
2920            vec![ChecksumRangeChunk {
2921                checksum_range: 0..3,
2922                device_range: block_size * 2..block_size * 5,
2923                is_first_write: false,
2924            }],
2925        );
2926
2927        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2928        assert_eq!(
2929            ChecksumRangeChunk::group_first_write_ranges(
2930                &mut bitmaps,
2931                block_size,
2932                block_size * 2..block_size * 5,
2933            ),
2934            vec![ChecksumRangeChunk {
2935                checksum_range: 0..3,
2936                device_range: block_size * 2..block_size * 5,
2937                is_first_write: false,
2938            }],
2939        );
2940        assert_eq!(
2941            bitmaps.take_bitmaps(),
2942            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b11100000])))
2943        );
2944
2945        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2946        bitmaps.set_offset(2);
2947        assert_eq!(
2948            ChecksumRangeChunk::group_first_write_ranges(
2949                &mut bitmaps,
2950                block_size,
2951                block_size * 2..block_size * 5,
2952            ),
2953            vec![
2954                ChecksumRangeChunk {
2955                    checksum_range: 0..2,
2956                    device_range: block_size * 2..block_size * 4,
2957                    is_first_write: false,
2958                },
2959                ChecksumRangeChunk {
2960                    checksum_range: 2..3,
2961                    device_range: block_size * 4..block_size * 5,
2962                    is_first_write: true,
2963                },
2964            ],
2965        );
2966        assert_eq!(
2967            bitmaps.take_bitmaps(),
2968            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00111000])))
2969        );
2970
2971        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2972        bitmaps.set_offset(4);
2973        assert_eq!(
2974            ChecksumRangeChunk::group_first_write_ranges(
2975                &mut bitmaps,
2976                block_size,
2977                block_size * 2..block_size * 5,
2978            ),
2979            vec![ChecksumRangeChunk {
2980                checksum_range: 0..3,
2981                device_range: block_size * 2..block_size * 5,
2982                is_first_write: true,
2983            }],
2984        );
2985        assert_eq!(
2986            bitmaps.take_bitmaps(),
2987            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00001110])))
2988        );
2989
2990        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b01010101]));
2991        assert_eq!(
2992            ChecksumRangeChunk::group_first_write_ranges(
2993                &mut bitmaps,
2994                block_size,
2995                block_size * 2..block_size * 10,
2996            ),
2997            vec![
2998                ChecksumRangeChunk {
2999                    checksum_range: 0..1,
3000                    device_range: block_size * 2..block_size * 3,
3001                    is_first_write: true,
3002                },
3003                ChecksumRangeChunk {
3004                    checksum_range: 1..2,
3005                    device_range: block_size * 3..block_size * 4,
3006                    is_first_write: false,
3007                },
3008                ChecksumRangeChunk {
3009                    checksum_range: 2..3,
3010                    device_range: block_size * 4..block_size * 5,
3011                    is_first_write: true,
3012                },
3013                ChecksumRangeChunk {
3014                    checksum_range: 3..4,
3015                    device_range: block_size * 5..block_size * 6,
3016                    is_first_write: false,
3017                },
3018                ChecksumRangeChunk {
3019                    checksum_range: 4..5,
3020                    device_range: block_size * 6..block_size * 7,
3021                    is_first_write: true,
3022                },
3023                ChecksumRangeChunk {
3024                    checksum_range: 5..6,
3025                    device_range: block_size * 7..block_size * 8,
3026                    is_first_write: false,
3027                },
3028                ChecksumRangeChunk {
3029                    checksum_range: 6..7,
3030                    device_range: block_size * 8..block_size * 9,
3031                    is_first_write: true,
3032                },
3033                ChecksumRangeChunk {
3034                    checksum_range: 7..8,
3035                    device_range: block_size * 9..block_size * 10,
3036                    is_first_write: false,
3037                },
3038            ],
3039        );
3040        assert_eq!(
3041            bitmaps.take_bitmaps(),
3042            Some((BitVec::from_bytes(&[0b01010101]), BitVec::from_bytes(&[0b11111111])))
3043        );
3044    }
3045}