1use crate::checksum::{Checksum, Checksums, fletcher64};
6use crate::errors::FxfsError;
7use crate::log::*;
8use crate::lsm_tree::Query;
9use crate::lsm_tree::merge::MergerIterator;
10use crate::lsm_tree::types::{Item, ItemRef, LayerIterator};
11use crate::object_handle::ObjectHandle;
12use crate::object_store::extent_record::{ExtentMode, ExtentValue};
13use crate::object_store::object_manager::ObjectManager;
14use crate::object_store::object_record::{
15 AttributeKey, ExtendedAttributeValue, ObjectAttributes, ObjectItem, ObjectKey, ObjectKeyData,
16 ObjectValue, Timestamp,
17};
18use crate::object_store::transaction::{
19 AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Options, ReadGuard,
20 Transaction, lock_keys,
21};
22use crate::object_store::{
23 AttributeId, Extent, HandleOptions, HandleOwner, ObjectStore, TrimMode, TrimResult,
24 VOLUME_DATA_KEY_ID,
25};
26use crate::range::RangeExt;
27use crate::round::{round_down, round_up};
28use anyhow::{Context, Error, anyhow, bail, ensure};
29use assert_matches::assert_matches;
30use bit_vec::BitVec;
31use futures::stream::{FuturesOrdered, FuturesUnordered};
32use futures::{TryStreamExt, try_join};
33use fxfs_crypto::{
34 Cipher, CipherHolder, CipherSet, EncryptionKey, FindKeyResult, FxfsCipher, KeyPurpose,
35};
36use fxfs_trace::{TraceFutureExt, trace, trace_future_args};
37use static_assertions::const_assert;
38use std::cmp::min;
39use std::future::Future;
40use std::ops::Range;
41use std::sync::Arc;
42use std::sync::atomic::{self, AtomicBool, Ordering};
43use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
44use storage_device::{InlineCryptoOptions, ReadOptions, WriteOptions};
45
46use fidl_fuchsia_io as fio;
47use fuchsia_async as fasync;
48
49pub const MAX_XATTR_NAME_SIZE: usize = 255;
51pub const MAX_INLINE_XATTR_SIZE: usize = 256;
54pub const MAX_XATTR_VALUE_SIZE: usize = 64000;
58
59fn apply_bitmap_zeroing(
61 block_size: usize,
62 bitmap: &bit_vec::BitVec,
63 mut buffer: MutableBufferRef<'_>,
64) {
65 let buf = buffer.as_mut_slice();
66 debug_assert_eq!(bitmap.len() * block_size, buf.len());
67 for (i, block) in bitmap.iter().enumerate() {
68 if !block {
69 let start = i * block_size;
70 buf[start..start + block_size].fill(0);
71 }
72 }
73}
74
75#[derive(Debug, Clone, PartialEq)]
79pub enum MaybeChecksums {
80 None,
81 Fletcher(Vec<Checksum>),
82}
83
84impl MaybeChecksums {
85 pub fn maybe_as_ref(&self) -> Option<&[Checksum]> {
86 match self {
87 Self::None => None,
88 Self::Fletcher(sums) => Some(&sums),
89 }
90 }
91
92 pub fn split_off(&mut self, at: usize) -> Self {
93 match self {
94 Self::None => Self::None,
95 Self::Fletcher(sums) => Self::Fletcher(sums.split_off(at)),
96 }
97 }
98
99 pub fn to_mode(self) -> ExtentMode {
100 match self {
101 Self::None => ExtentMode::Raw,
102 Self::Fletcher(sums) => ExtentMode::Cow(Checksums::fletcher(sums)),
103 }
104 }
105
106 pub fn into_option(self) -> Option<Vec<Checksum>> {
107 match self {
108 Self::None => None,
109 Self::Fletcher(sums) => Some(sums),
110 }
111 }
112}
113
114#[derive(Debug, Clone, Copy, PartialEq, Eq)]
118pub enum SetExtendedAttributeMode {
119 Set,
121 Create,
123 Replace,
125}
126
127impl From<fio::SetExtendedAttributeMode> for SetExtendedAttributeMode {
128 fn from(other: fio::SetExtendedAttributeMode) -> SetExtendedAttributeMode {
129 match other {
130 fio::SetExtendedAttributeMode::Set => SetExtendedAttributeMode::Set,
131 fio::SetExtendedAttributeMode::Create => SetExtendedAttributeMode::Create,
132 fio::SetExtendedAttributeMode::Replace => SetExtendedAttributeMode::Replace,
133 }
134 }
135}
136
/// How a `StoreObjectHandle` obtains the cipher for its object's data.
///
/// The variant is chosen once in `StoreObjectHandle::new` and consulted by `get_key`.
enum Encryption {
    /// No encryption: `get_key` yields no cipher for the object.
    None,

    /// Selected when permanent keys were not requested and the owning store reports
    /// `is_encrypted()`. `get_key` resolves ciphers through the store's key manager, using the
    /// store's crypt service.
    CachedKeys,

    /// Selected when the handle is created with `permanent_keys == true`. `get_key` expects the
    /// key manager to already hold a key for the object (it unwraps the lookup result).
    PermanentKeys,
}
148
149#[derive(PartialEq, Debug)]
150enum OverwriteBitmaps {
151 None,
152 Some {
153 extent_bitmap: BitVec,
155 write_bitmap: BitVec,
157 bitmap_offset: usize,
160 },
161}
162
163impl OverwriteBitmaps {
164 fn new(extent_bitmap: BitVec) -> Self {
165 OverwriteBitmaps::Some {
166 write_bitmap: BitVec::from_elem(extent_bitmap.len(), false),
167 extent_bitmap,
168 bitmap_offset: 0,
169 }
170 }
171
172 fn is_none(&self) -> bool {
173 *self == OverwriteBitmaps::None
174 }
175
176 fn set_offset(&mut self, new_offset: usize) {
177 match self {
178 OverwriteBitmaps::None => (),
179 OverwriteBitmaps::Some { bitmap_offset, .. } => *bitmap_offset = new_offset,
180 }
181 }
182
183 fn get_from_extent_bitmap(&self, i: usize) -> Option<bool> {
184 match self {
185 OverwriteBitmaps::None => None,
186 OverwriteBitmaps::Some { extent_bitmap, bitmap_offset, .. } => {
187 extent_bitmap.get(*bitmap_offset + i)
188 }
189 }
190 }
191
192 fn set_in_write_bitmap(&mut self, i: usize, x: bool) {
193 match self {
194 OverwriteBitmaps::None => (),
195 OverwriteBitmaps::Some { write_bitmap, bitmap_offset, .. } => {
196 write_bitmap.set(*bitmap_offset + i, x)
197 }
198 }
199 }
200
201 fn take_bitmaps(self) -> Option<(BitVec, BitVec)> {
202 match self {
203 OverwriteBitmaps::None => None,
204 OverwriteBitmaps::Some { extent_bitmap, write_bitmap, .. } => {
205 Some((extent_bitmap, write_bitmap))
206 }
207 }
208 }
209}
210
211#[derive(PartialEq, Debug)]
215struct ChecksumRangeChunk {
216 checksum_range: Range<usize>,
217 device_range: Range<u64>,
218 is_first_write: bool,
219}
220
221impl ChecksumRangeChunk {
222 fn group_first_write_ranges(
223 bitmaps: &mut OverwriteBitmaps,
224 block_size: u64,
225 write_device_range: Range<u64>,
226 ) -> Vec<ChecksumRangeChunk> {
227 let write_block_len = (write_device_range.length().unwrap() / block_size) as usize;
228 if bitmaps.is_none() {
229 vec![ChecksumRangeChunk {
234 checksum_range: 0..write_block_len,
235 device_range: write_device_range,
236 is_first_write: false,
237 }]
238 } else {
239 let mut checksum_ranges = vec![ChecksumRangeChunk {
240 checksum_range: 0..0,
241 device_range: write_device_range.start..write_device_range.start,
242 is_first_write: !bitmaps.get_from_extent_bitmap(0).unwrap(),
243 }];
244 let mut working_range = checksum_ranges.last_mut().unwrap();
245 for i in 0..write_block_len {
246 bitmaps.set_in_write_bitmap(i, true);
247
248 if working_range.is_first_write != bitmaps.get_from_extent_bitmap(i).unwrap() {
251 working_range.checksum_range.end += 1;
254 working_range.device_range.end += block_size;
255 } else {
256 let new_chunk = ChecksumRangeChunk {
258 checksum_range: working_range.checksum_range.end
259 ..working_range.checksum_range.end + 1,
260 device_range: working_range.device_range.end
261 ..working_range.device_range.end + block_size,
262 is_first_write: !working_range.is_first_write,
263 };
264 checksum_ranges.push(new_chunk);
265 working_range = checksum_ranges.last_mut().unwrap();
266 }
267 }
268 checksum_ranges
269 }
270 }
271}
272
/// A handle to a single object, identified by `object_id`, in the `ObjectStore` reachable
/// through `owner`.
pub struct StoreObjectHandle<S: HandleOwner> {
    /// Owner of the handle; `store()` derives the `ObjectStore` from it.
    owner: Arc<S>,
    /// Id of the object this handle refers to.
    object_id: u64,
    /// Per-handle options; this file reads `skip_journal_checks` and `skip_checksums` from it.
    options: HandleOptions,
    /// When set, the I/O paths in this file emit `info!` records for reads, writes and
    /// deallocations.
    trace: AtomicBool,
    /// How keys are obtained for this object; see `get_key`.
    encryption: Encryption,
}
292
293impl<S: HandleOwner> ObjectHandle for StoreObjectHandle<S> {
294 fn set_trace(&self, v: bool) {
295 info!(store_id = self.store().store_object_id, oid = self.object_id(), trace = v; "trace");
296 self.trace.store(v, atomic::Ordering::Relaxed);
297 }
298
299 fn object_id(&self) -> u64 {
300 return self.object_id;
301 }
302
303 fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
304 self.store().device.allocate_buffer(size)
305 }
306
307 fn block_size(&self) -> u64 {
308 self.store().block_size()
309 }
310}
311
312struct Watchdog {
313 _task: fasync::Task<()>,
314}
315
316impl Watchdog {
317 fn new(increment_seconds: u64, cb: impl Fn(u64) + Send + 'static) -> Self {
318 Self {
319 _task: fasync::Task::spawn(
320 async move {
321 let increment = increment_seconds.try_into().unwrap();
322 let mut fired_counter = 0;
323 let mut next_wake = fasync::MonotonicInstant::now();
324 loop {
325 next_wake += std::time::Duration::from_secs(increment).into();
326 if fasync::MonotonicInstant::now() < next_wake {
330 fasync::Timer::new(next_wake).await;
331 }
332 fired_counter += 1;
333 cb(fired_counter);
334 }
335 }
336 .trace(trace_future_args!("StoreObjectHandle::Watchdog")),
337 ),
338 }
339 }
340}
341
342impl<S: HandleOwner> StoreObjectHandle<S> {
343 pub fn new(
345 owner: Arc<S>,
346 object_id: u64,
347 permanent_keys: bool,
348 options: HandleOptions,
349 trace: bool,
350 ) -> Self {
351 let encryption = if permanent_keys {
352 Encryption::PermanentKeys
353 } else if owner.as_ref().as_ref().is_encrypted() {
354 Encryption::CachedKeys
355 } else {
356 Encryption::None
357 };
358 Self { owner, object_id, encryption, options, trace: AtomicBool::new(trace) }
359 }
360
361 pub fn owner(&self) -> &Arc<S> {
362 &self.owner
363 }
364
365 pub fn store(&self) -> &ObjectStore {
366 self.owner.as_ref().as_ref()
367 }
368
369 pub fn trace(&self) -> bool {
370 self.trace.load(atomic::Ordering::Relaxed)
371 }
372
373 pub fn is_encrypted(&self) -> bool {
374 !matches!(self.encryption, Encryption::None)
375 }
376
377 pub fn default_transaction_options<'b>(&self) -> Options<'b> {
380 Options { skip_journal_checks: self.options.skip_journal_checks, ..Default::default() }
381 }
382
383 pub async fn new_transaction_with_options<'b>(
384 &self,
385 attribute_id: AttributeId,
386 options: Options<'b>,
387 ) -> Result<Transaction<'b>, Error> {
388 Ok(self
389 .store()
390 .new_transaction(
391 lock_keys![
392 LockKey::object_attribute(
393 self.store().store_object_id(),
394 self.object_id(),
395 attribute_id,
396 ),
397 LockKey::object(self.store().store_object_id(), self.object_id()),
398 ],
399 options,
400 )
401 .await?)
402 }
403
404 pub async fn new_transaction<'b>(
405 &self,
406 attribute_id: AttributeId,
407 ) -> Result<Transaction<'b>, Error> {
408 self.new_transaction_with_options(attribute_id, self.default_transaction_options()).await
409 }
410
411 async fn txn_get_object_mutation(
414 &self,
415 transaction: &Transaction<'_>,
416 ) -> Result<ObjectStoreMutation, Error> {
417 self.store().txn_get_object_mutation(transaction, self.object_id()).await
418 }
419
/// Deallocates the device ranges that back `range` of attribute `attribute_id` and returns the
/// number of logical bytes whose backing storage was released.
///
/// `range` must be aligned to the handle's block size (asserted). This function itself adds no
/// extent mutations to the transaction; it only calls the allocator.
///
/// Fails with `FxfsError::Inconsistent` if an overlapping extent maps to a device range that is
/// not block aligned.
async fn deallocate_old_extents(
    &self,
    transaction: &mut Transaction<'_>,
    attribute_id: AttributeId,
    range: Range<u64>,
) -> Result<u64, Error> {
    let block_size = self.block_size();
    assert_eq!(range.start % block_size, 0);
    assert_eq!(range.end % block_size, 0);
    // An empty range cannot overlap anything.
    if range.start == range.end {
        return Ok(0);
    }
    let tree = &self.store().tree;
    let layer_set = tree.layer_set();
    let key = Extent(range);
    // Start scanning from the search key derived from the requested extent.
    let lower_bound = ObjectKey::attribute(
        self.object_id(),
        attribute_id,
        AttributeKey::Extent(key.search_key()),
    );
    let mut merger = layer_set.merger();
    let mut iter = merger.query(Query::FullRange(&lower_bound)).await?;
    let allocator = self.store().allocator();
    let mut deallocated = 0;
    let trace = self.trace();
    // Walk extent records until the iterator yields something that is not an extent record.
    while let Some(ItemRef {
        key:
            ObjectKey {
                object_id,
                data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
            },
        value: ObjectValue::Extent(value),
        ..
    }) = iter.get()
    {
        // Stop once the scan leaves this object/attribute.
        if *object_id != self.object_id() || *attr_id != attribute_id {
            break;
        }
        // Extents without a device offset have nothing to deallocate and are skipped.
        if let ExtentValue::Some { device_offset, .. } = value {
            if let Some(overlap) = key.overlap(extent_key) {
                // Translate the overlapping logical range into the device range backing it.
                let range = device_offset + overlap.start - extent_key.start
                    ..device_offset + overlap.end - extent_key.start;
                ensure!(range.is_aligned(block_size), FxfsError::Inconsistent);
                if trace {
                    info!(
                        store_id = self.store().store_object_id(),
                        oid = self.object_id(),
                        device_range:? = range,
                        len = range.end - range.start,
                        extent_key:?;
                        "D",
                    );
                }
                allocator
                    .deallocate(transaction, self.store().store_object_id(), range)
                    .await?;
                deallocated += overlap.end - overlap.start;
            } else {
                // First allocated extent that does not overlap the request: stop scanning.
                break;
            }
        }
        iter.advance().await?;
    }
    Ok(deallocated)
}
486
487 async fn write_aligned(
490 &self,
491 buf: BufferRef<'_>,
492 device_offset: u64,
493 crypt_ctx: Option<(u32, u8)>,
494 ) -> Result<MaybeChecksums, Error> {
495 if self.trace() {
496 info!(
497 store_id = self.store().store_object_id(),
498 oid = self.object_id(),
499 device_range:? = (device_offset..device_offset + buf.len() as u64),
500 len = buf.len();
501 "W",
502 );
503 }
504 let store = self.store();
505 store.device_write_ops.fetch_add(1, Ordering::Relaxed);
506 let mut checksums = Vec::new();
507 let _watchdog = Watchdog::new(10, |count| {
508 warn!("Write has been stalled for {} seconds", count * 10);
509 });
510
511 match crypt_ctx {
512 Some((dun, slot)) => {
513 if !store.filesystem().options().barriers_enabled {
514 return Err(anyhow!(FxfsError::InvalidArgs)
515 .context("Barriers must be enabled for inline encrypted writes."));
516 }
517 store
518 .device
519 .write_with_opts(
520 device_offset as u64,
521 buf,
522 WriteOptions {
523 inline_crypto: InlineCryptoOptions::enabled(slot, dun),
524 ..Default::default()
525 },
526 )
527 .await?;
528 Ok(MaybeChecksums::None)
529 }
530 None => {
531 if self.options.skip_checksums {
532 store
533 .device
534 .write_with_opts(device_offset as u64, buf, WriteOptions::default())
535 .await?;
536 Ok(MaybeChecksums::None)
537 } else {
538 try_join!(store.device.write(device_offset, buf), async {
539 let block_size = self.block_size();
540 for chunk in buf.as_slice().chunks_exact(block_size as usize) {
541 checksums.push(fletcher64(chunk, 0));
542 }
543 Ok(())
544 })?;
545 Ok(MaybeChecksums::Fletcher(checksums))
546 }
547 }
548 }
549 }
550
551 pub async fn flush_device(&self) -> Result<(), Error> {
553 self.store().device().flush().await
554 }
555
556 pub async fn update_allocated_size(
557 &self,
558 transaction: &mut Transaction<'_>,
559 allocated: u64,
560 deallocated: u64,
561 ) -> Result<(), Error> {
562 if allocated == deallocated {
563 return Ok(());
564 }
565 let mut mutation = self.txn_get_object_mutation(transaction).await?;
566 if let ObjectValue::Object {
567 attributes: ObjectAttributes { project_id, allocated_size, .. },
568 ..
569 } = &mut mutation.item.value
570 {
571 *allocated_size = allocated_size
573 .checked_add(allocated)
574 .ok_or_else(|| anyhow!(FxfsError::Inconsistent).context("Allocated size overflow"))?
575 .checked_sub(deallocated)
576 .ok_or_else(|| {
577 anyhow!(FxfsError::Inconsistent).context("Allocated size underflow")
578 })?;
579
580 if let Some(project_id) = project_id {
581 let diff = i64::try_from(allocated).unwrap() - i64::try_from(deallocated).unwrap();
584 transaction.add(
585 self.store().store_object_id(),
586 Mutation::merge_object(
587 ObjectKey::project_usage(
588 self.store().root_directory_object_id(),
589 *project_id,
590 ),
591 ObjectValue::BytesAndNodes { bytes: diff, nodes: 0 },
592 ),
593 );
594 }
595 } else {
596 bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
599 }
600 transaction.add(self.store().store_object_id, Mutation::ObjectStore(mutation));
601 Ok(())
602 }
603
604 pub async fn update_attributes<'a>(
605 &self,
606 transaction: &mut Transaction<'a>,
607 node_attributes: Option<&fio::MutableNodeAttributes>,
608 change_time: Option<Timestamp>,
609 ) -> Result<(), Error> {
610 if let Some(&fio::MutableNodeAttributes { selinux_context: Some(ref context), .. }) =
611 node_attributes
612 {
613 if let fio::SelinuxContext::Data(context) = context {
614 self.set_extended_attribute_impl(
615 "security.selinux".into(),
616 context.clone(),
617 SetExtendedAttributeMode::Set,
618 transaction,
619 )
620 .await?;
621 } else {
622 return Err(anyhow!(FxfsError::InvalidArgs)
623 .context("Only set SELinux context with `data` member."));
624 }
625 }
626 self.store()
627 .update_attributes(transaction, self.object_id, node_attributes, change_time)
628 .await
629 }
630
631 pub async fn zero(
633 &self,
634 transaction: &mut Transaction<'_>,
635 attribute_id: AttributeId,
636 range: Range<u64>,
637 ) -> Result<(), Error> {
638 let deallocated =
639 self.deallocate_old_extents(transaction, attribute_id, range.clone()).await?;
640 if deallocated > 0 {
641 self.update_allocated_size(transaction, 0, deallocated).await?;
642 transaction.add(
643 self.store().store_object_id,
644 Mutation::merge_object(
645 ObjectKey::extent(self.object_id(), attribute_id, range),
646 ObjectValue::Extent(ExtentValue::deleted_extent()),
647 ),
648 );
649 }
650 Ok(())
651 }
652
653 pub async fn align_buffer(
656 &self,
657 attribute_id: AttributeId,
658 offset: u64,
659 buf: BufferRef<'_>,
660 ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
661 let block_size = self.block_size();
662 let end = offset + buf.len() as u64;
663 let aligned =
664 round_down(offset, block_size)..round_up(end, block_size).ok_or(FxfsError::TooBig)?;
665
666 let mut aligned_buf =
667 self.store().device.allocate_buffer((aligned.end - aligned.start) as usize).await;
668
669 if aligned.start < offset {
671 let mut head_block = aligned_buf.subslice_mut(..block_size as usize);
672 let read = self.read(attribute_id, aligned.start, head_block.reborrow()).await?;
673 head_block.as_mut_slice()[read..].fill(0);
674 }
675
676 if aligned.end > end {
678 let end_block_offset = aligned.end - block_size;
679 if offset <= end_block_offset {
681 let mut tail_block =
682 aligned_buf.subslice_mut(aligned_buf.len() - block_size as usize..);
683 let read = self.read(attribute_id, end_block_offset, tail_block.reborrow()).await?;
684 tail_block.as_mut_slice()[read..].fill(0);
685 }
686 }
687
688 aligned_buf.as_mut_slice()
689 [(offset - aligned.start) as usize..(end - aligned.start) as usize]
690 .copy_from_slice(buf.as_slice());
691
692 Ok((aligned, aligned_buf))
693 }
694
695 pub async fn shrink(
701 &self,
702 transaction: &mut Transaction<'_>,
703 attribute_id: AttributeId,
704 size: u64,
705 ) -> Result<NeedsTrim, Error> {
706 let store = self.store();
707 let needs_trim = matches!(
708 store
709 .trim_some(transaction, self.object_id(), attribute_id, TrimMode::FromOffset(size))
710 .await?,
711 TrimResult::Incomplete
712 );
713 if needs_trim {
714 let graveyard_id = store.graveyard_directory_object_id();
717 match store
718 .tree
719 .find(&ObjectKey::graveyard_entry(graveyard_id, self.object_id()))
720 .await?
721 {
722 Some(ObjectItem { value: ObjectValue::Some, .. })
723 | Some(ObjectItem { value: ObjectValue::Trim, .. }) => {
724 }
726 _ => {
727 transaction.add(
728 store.store_object_id,
729 Mutation::replace_or_insert_object(
730 ObjectKey::graveyard_entry(graveyard_id, self.object_id()),
731 ObjectValue::Trim,
732 ),
733 );
734 }
735 }
736 }
737 Ok(NeedsTrim(needs_trim))
738 }
739
740 pub async fn read_and_decrypt(
742 &self,
743 device_offset: u64,
744 file_offset: u64,
745 mut buffer: MutableBufferRef<'_>,
746 key_id: u64,
747 ) -> Result<(), Error> {
748 let store = self.store();
749 store.device_read_ops.fetch_add(1, Ordering::Relaxed);
750
751 let _watchdog = Watchdog::new(10, |count| {
752 warn!("Read has been stalled for {} seconds", count * 10);
753 });
754
755 let (_key_id, key) = self.get_key(Some(key_id)).await?;
756 if let Some(key) = key {
757 if let Some((dun, slot)) = key.crypt_ctx(self.object_id, file_offset) {
758 store
759 .device
760 .read_with_opts(
761 device_offset as u64,
762 buffer.reborrow(),
763 ReadOptions { inline_crypto: InlineCryptoOptions::enabled(slot, dun) },
764 )
765 .await?;
766 } else {
767 store.device.read(device_offset, buffer.reborrow()).await?;
768 key.decrypt(self.object_id, device_offset, file_offset, buffer.as_mut_slice())?;
769 }
770 } else {
771 store.device.read(device_offset, buffer.reborrow()).await?;
772 }
773
774 Ok(())
775 }
776
777 pub async fn get_key(
782 &self,
783 key_id: Option<u64>,
784 ) -> Result<(u64, Option<Arc<dyn Cipher>>), Error> {
785 let store = self.store();
786 let result = match self.encryption {
787 Encryption::None => (VOLUME_DATA_KEY_ID, None),
788 Encryption::CachedKeys => {
789 if let Some(key_id) = key_id {
790 (
791 key_id,
792 Some(
793 store
794 .key_manager
795 .get_key(
796 self.object_id,
797 store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
798 async || store.get_keys(self.object_id).await,
799 key_id,
800 )
801 .await?,
802 ),
803 )
804 } else {
805 let (key_id, key) = store
806 .key_manager
807 .get_fscrypt_key_if_present(
808 self.object_id,
809 store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
810 async || store.get_keys(self.object_id).await,
811 )
812 .await?;
813 (key_id, Some(key))
814 }
815 }
816 Encryption::PermanentKeys => {
817 (VOLUME_DATA_KEY_ID, Some(store.key_manager.get(self.object_id).await?.unwrap()))
818 }
819 };
820
821 if let Some(ref key) = result.1 {
823 if key.crypt_ctx(self.object_id, 0).is_some() {
824 if !store.filesystem().options().barriers_enabled {
825 return Err(anyhow!(FxfsError::InvalidArgs)
826 .context("Barriers must be enabled for inline encrypted writes."));
827 }
828 }
829 }
830
831 Ok(result)
832 }
833
/// Returns the cipher stored under `VOLUME_DATA_KEY_ID` for this object, creating a new key
/// when none exists yet.
///
/// When a key is created, a mutation replacing the object's keys record is added to
/// `transaction`, and the new cipher set is handed to the key manager from the mutation's
/// associated object.
///
/// Fails with `FxfsError::NoKey` if the key exists but is unavailable, and with
/// `FxfsError::Inconsistent` if the object's keys record holds an unexpected value.
async fn get_or_create_key(
    &self,
    transaction: &mut Transaction<'_>,
) -> Result<Arc<dyn Cipher>, Error> {
    let store = self.store();

    // First, ask the key manager; if it already has a key for this object we are done.
    if let Some(key) = store.key_manager.get(self.object_id).await.context("get failed")? {
        return Ok(key);
    }

    let crypt = store.crypt().ok_or_else(|| anyhow!("No crypt!"))?;

    // Next, look for an existing keys record in the tree.
    let (mut encryption_keys, mut cipher_set) = if let Some(item) =
        store.tree.find(&ObjectKey::keys(self.object_id)).await.context("find failed")?
    {
        if let ObjectValue::Keys(encryption_keys) = item.value {
            let cipher_set = store
                .key_manager
                .get_keys(
                    self.object_id,
                    crypt.as_ref(),
                    &mut Some(async || Ok(encryption_keys.clone())),
                    false,
                    false,
                )
                .await
                .context("get_keys failed")?;
            match cipher_set.find_key(VOLUME_DATA_KEY_ID) {
                // No volume data key yet: fall through and create one below.
                FindKeyResult::NotFound => {}
                FindKeyResult::Unavailable => return Err(FxfsError::NoKey.into()),
                FindKeyResult::Key(key) => return Ok(key),
            }
            // Keep the existing keys so the new one is added alongside them.
            (encryption_keys, (*cipher_set).clone())
        } else {
            return Err(anyhow!(FxfsError::Inconsistent));
        }
    } else {
        // No keys record at all: start from empty collections.
        Default::default()
    };

    // Create the new key and the cipher for it.
    let (key, unwrapped_key) = crypt.create_key(self.object_id, KeyPurpose::Data).await?;
    let cipher: Arc<dyn Cipher> = Arc::new(FxfsCipher::new(&unwrapped_key));

    // Add the new cipher to the (possibly cloned) cipher set.
    cipher_set.add_key(VOLUME_DATA_KEY_ID, CipherHolder::Cipher(cipher.clone()));
    let cipher_set = Arc::new(cipher_set);

    // Associated object that inserts the new cipher set into the key manager from its
    // `will_apply_mutation` hook.
    struct UnwrappedKeys {
        object_id: u64,
        new_keys: Arc<CipherSet>,
    }

    impl AssociatedObject for UnwrappedKeys {
        fn will_apply_mutation(
            &self,
            _mutation: &Mutation,
            object_id: u64,
            manager: &ObjectManager,
        ) {
            manager.store(object_id).unwrap().key_manager.insert(
                self.object_id,
                self.new_keys.clone(),
                false,
            );
        }
    }

    encryption_keys.insert(VOLUME_DATA_KEY_ID, EncryptionKey::Fxfs(key).into());

    // Replace the object's keys record and attach the associated object to that mutation.
    transaction.add_with_object(
        store.store_object_id(),
        Mutation::replace_or_insert_object(
            ObjectKey::keys(self.object_id),
            ObjectValue::keys(encryption_keys),
        ),
        AssocObj::Owned(Box::new(UnwrappedKeys {
            object_id: self.object_id,
            new_keys: cipher_set,
        })),
    );

    Ok(cipher)
}
926
927 pub async fn read(
928 &self,
929 attribute_id: AttributeId,
930 offset: u64,
931 mut buf: MutableBufferRef<'_>,
932 ) -> Result<usize, Error> {
933 let fs = self.store().filesystem();
934 let guard = fs
935 .lock_manager()
936 .read_lock(lock_keys![LockKey::object_attribute(
937 self.store().store_object_id(),
938 self.object_id(),
939 attribute_id,
940 )])
941 .await;
942
943 let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
944 let item = self.store().tree().find(&key).await?;
945 let size = match item {
946 Some(item) if item.key == key => match item.value {
947 ObjectValue::Attribute { size, .. } => size,
948 _ => bail!(FxfsError::Inconsistent),
949 },
950 _ => return Ok(0),
951 };
952 if offset >= size {
953 return Ok(0);
954 }
955 let length = min(buf.len() as u64, size - offset) as usize;
956 buf = buf.subslice_mut(0..length);
957 self.read_unchecked(attribute_id, offset, buf, &guard).await?;
958 Ok(length)
959 }
960
/// Fills all of `buf` with the attribute's contents starting at `offset`, without consulting
/// the attribute's recorded size.
///
/// Regions not covered by an allocated extent, and blocks that a partial-overwrite bitmap
/// marks as unset, are returned as zeroes. `offset` must be aligned to the filesystem block
/// size and the buffer's start to the device block size (both asserted); the end of the read
/// need not be aligned.
///
/// `_guard` is not used by the body.
// NOTE(review): `_guard` presumably exists to make callers prove they hold a read lock —
// confirm against callers.
pub async fn read_unchecked(
    &self,
    attribute_id: AttributeId,
    mut offset: u64,
    mut buf: MutableBufferRef<'_>,
    _guard: &ReadGuard<'_>,
) -> Result<(), Error> {
    if buf.len() == 0 {
        return Ok(());
    }
    let end_offset = offset + buf.len() as u64;

    self.store().logical_read_ops.fetch_add(1, Ordering::Relaxed);

    // The read offset is checked against the filesystem block size, whereas the buffer is only
    // checked against the device block size.
    let block_size = self.block_size() as u64;
    let device_block_size = self.store().device.block_size() as u64;
    assert_eq!(offset % block_size, 0);
    assert_eq!(buf.range().start as u64 % device_block_size, 0);
    let tree = &self.store().tree;
    let layer_set = tree.layer_set();
    let mut merger = layer_set.merger();
    let mut iter = merger
        .query(Query::LimitedRange(&ObjectKey::extent(
            self.object_id(),
            attribute_id,
            offset..end_offset,
        )))
        .await?;
    // Number of bytes by which the end of the read overhangs the last block boundary.
    let end_align = ((offset + buf.len() as u64) % block_size) as usize;
    let trace = self.trace();
    // Whole-block reads are queued here and driven together once the scan is finished.
    let reads = FuturesUnordered::new();
    while let Some(ItemRef {
        key:
            ObjectKey {
                object_id,
                data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
            },
        value: ObjectValue::Extent(extent_value),
        ..
    }) = iter.get()
    {
        if *object_id != self.object_id() || *attr_id != attribute_id {
            break;
        }
        ensure!(
            extent_key.is_valid() && extent_key.is_aligned(block_size),
            FxfsError::Inconsistent
        );
        if extent_key.start > offset {
            // There is a gap before this extent: zero it.
            let to_zero = min(extent_key.start - offset, buf.len() as u64) as usize;
            buf.as_mut_slice()[..to_zero].fill(0);
            buf = buf.subslice_mut(to_zero..);
            if buf.is_empty() {
                break;
            }
            offset += to_zero as u64;
        }

        if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
            let mut device_offset = device_offset + (offset - extent_key.start);
            let key_id = *key_id;

            // Whole blocks of this extent that can be read straight into the caller's buffer.
            let to_copy = min(buf.len() - end_align, (extent_key.end - offset) as usize);
            if to_copy > 0 {
                if trace {
                    info!(
                        store_id = self.store().store_object_id(),
                        oid = self.object_id(),
                        device_range:? = (device_offset..device_offset + to_copy as u64),
                        offset,
                        range:? = **extent_key,
                        block_size;
                        "R",
                    );
                }
                let (mut head, tail) = buf.split_at_mut(to_copy);
                // For partial-overwrite extents, keep just the bitmap bits for the blocks
                // being read so unset blocks can be zeroed after the read.
                let maybe_bitmap = match mode {
                    ExtentMode::OverwritePartial(bitmap) => {
                        let mut read_bitmap = bitmap
                            .clone()
                            .split_off(((offset - extent_key.start) / block_size) as usize);
                        read_bitmap.truncate(to_copy / block_size as usize);
                        Some(read_bitmap)
                    }
                    _ => None,
                };
                reads.push(async move {
                    self.read_and_decrypt(device_offset, offset, head.reborrow(), key_id)
                        .await?;
                    if let Some(bitmap) = maybe_bitmap {
                        apply_bitmap_zeroing(self.block_size() as usize, &bitmap, head);
                    }
                    Ok::<(), Error>(())
                });
                buf = tail;
                if buf.is_empty() {
                    break;
                }
                offset += to_copy as u64;
                device_offset += to_copy as u64;
            }

            // Handle the unaligned tail: read a whole block into a scratch buffer and copy out
            // the part that was asked for.
            if offset < extent_key.end && end_align > 0 {
                if let ExtentMode::OverwritePartial(bitmap) = mode {
                    let bitmap_offset = (offset - extent_key.start) / block_size;
                    if !bitmap.get(bitmap_offset as usize).ok_or(FxfsError::Inconsistent)? {
                        // The block's bit is unset; leave it to the zero fill at the end.
                        break;
                    }
                }
                let mut align_buf =
                    self.store().device.allocate_buffer(block_size as usize).await;
                if trace {
                    info!(
                        store_id = self.store().store_object_id(),
                        oid = self.object_id(),
                        device_range:? = (device_offset..device_offset + align_buf.len() as u64);
                        "RT",
                    );
                }
                self.read_and_decrypt(device_offset, offset, align_buf.as_mut(), key_id)
                    .await?;
                buf.as_mut_slice().copy_from_slice(&align_buf.as_slice()[..end_align]);
                // Everything has been filled; leave an empty remainder for the final fill.
                buf = buf.subslice_mut(0..0);
                break;
            }
        } else if extent_key.end >= offset + buf.len() as u64 {
            // An extent without a device offset covers the rest of the read: stop scanning.
            break;
        }

        iter.advance().await?;
    }
    reads.try_collect::<()>().await?;
    // Whatever was not covered by the scan reads back as zeroes.
    buf.as_mut_slice().fill(0);
    Ok(())
}
1111
1112 pub async fn read_attr(&self, attribute_id: AttributeId) -> Result<Option<Box<[u8]>>, Error> {
1114 let store = self.store();
1115 let tree = &store.tree;
1116 let layer_set = tree.layer_set();
1117 let mut merger = layer_set.merger();
1118 let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
1119 let iter = merger.query(Query::FullRange(&key)).await?;
1120 match iter.get() {
1121 Some(item) if item.key == &key => match item.value {
1122 ObjectValue::Attribute { .. } => Ok(Some(self.read_attr_from_iter(iter).await?)),
1123 ObjectValue::None => Ok(None),
1125 _ => Err(FxfsError::Inconsistent.into()),
1126 },
1127 _ => Ok(None),
1128 }
1129 }
1130
/// Reads an entire attribute, given an iterator positioned at the attribute's record.
///
/// The iterator must currently yield an `ObjectValue::Attribute` record belonging to this
/// object; otherwise `FxfsError::InvalidArgs` is returned. The extent records that follow are
/// read in order, and anything not covered by an allocated extent is returned as zeroes. The
/// result is exactly `size` bytes long, where `size` comes from the attribute record.
// NOTE(review): `round_up(..).unwrap()` panics if the recorded size cannot be rounded up to
// a block multiple, and the slice indexing below assumes extents are ordered and lie within
// the buffer — confirm these invariants are enforced elsewhere.
pub async fn read_attr_from_iter(
    &self,
    mut iter: MergerIterator<'_, '_, ObjectKey, ObjectValue>,
) -> Result<Box<[u8]>, Error> {
    let (mut buffer, size, attribute_id) = match iter.get() {
        Some(ItemRef {
            key:
                ObjectKey {
                    object_id,
                    data: ObjectKeyData::Attribute(attribute_id, AttributeKey::Attribute),
                },
            value: ObjectValue::Attribute { size, .. },
            ..
        }) if *object_id == self.object_id => {
            (
                // The buffer is sized to a whole number of blocks.
                self.store()
                    .device
                    .allocate_buffer(round_up(*size, self.block_size()).unwrap() as usize)
                    .await,
                *size as usize,
                *attribute_id,
            )
        }
        _ => bail!(FxfsError::InvalidArgs),
    };

    self.store().logical_read_ops.fetch_add(1, Ordering::Relaxed);
    // End of the last region filled so far.
    let mut last_offset = 0;
    loop {
        iter.advance().await?;
        match iter.get() {
            Some(ItemRef {
                key:
                    ObjectKey {
                        object_id,
                        data:
                            ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                    },
                value: ObjectValue::Extent(extent_value),
                ..
            }) if *object_id == self.object_id() && *attr_id == attribute_id => {
                // Extents without a device offset are skipped; their bytes are zeroed later.
                if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
                    let offset = extent_key.start as usize;
                    // Zero the gap between the previous extent and this one.
                    buffer.as_mut_slice()[last_offset..offset].fill(0);
                    let end = std::cmp::min(extent_key.end as usize, buffer.len());
                    let maybe_bitmap = match mode {
                        ExtentMode::OverwritePartial(bitmap) => {
                            let mut read_bitmap = bitmap.clone();
                            // Drop bitmap bits for blocks beyond the part being read.
                            read_bitmap.truncate(
                                (end - extent_key.start as usize) / self.block_size() as usize,
                            );
                            Some(read_bitmap)
                        }
                        _ => None,
                    };
                    self.read_and_decrypt(
                        *device_offset,
                        extent_key.start,
                        buffer.subslice_mut(offset..end as usize),
                        *key_id,
                    )
                    .await?;
                    if let Some(bitmap) = maybe_bitmap {
                        apply_bitmap_zeroing(
                            self.block_size() as usize,
                            &bitmap,
                            buffer.subslice_mut(offset..end as usize),
                        );
                    }
                    last_offset = end;
                    if last_offset >= size {
                        break;
                    }
                }
            }
            // Anything that is not an extent of this attribute ends the scan.
            _ => break,
        }
    }
    // Zero whatever lies beyond the last extent read (or beyond `size`).
    buffer.as_mut_slice()[std::cmp::min(last_offset, size)..].fill(0);
    Ok(buffer.as_slice()[..size].into())
}
1217
1218 pub async fn write_at(
1226 &self,
1227 attribute_id: AttributeId,
1228 offset: u64,
1229 buf: MutableBufferRef<'_>,
1230 key_id: Option<u64>,
1231 mut device_offset: u64,
1232 ) -> Result<MaybeChecksums, Error> {
1233 let mut transfer_buf;
1234 let block_size = self.block_size();
1235 let (range, mut transfer_buf_ref) =
1236 if offset % block_size == 0 && buf.len() as u64 % block_size == 0 {
1237 (offset..offset + buf.len() as u64, buf)
1238 } else {
1239 let (range, buf) = self.align_buffer(attribute_id, offset, buf.as_ref()).await?;
1240 transfer_buf = buf;
1241 device_offset -= offset - range.start;
1242 (range, transfer_buf.as_mut())
1243 };
1244
1245 let mut crypt_ctx = None;
1246 if let (_, Some(key)) = self.get_key(key_id).await? {
1247 if let Some(ctx) = key.crypt_ctx(self.object_id, range.start) {
1248 crypt_ctx = Some(ctx);
1249 } else {
1250 key.encrypt(
1251 self.object_id,
1252 device_offset,
1253 range.start,
1254 transfer_buf_ref.as_mut_slice(),
1255 )?;
1256 }
1257 }
1258 self.write_aligned(transfer_buf_ref.as_ref(), device_offset, crypt_ctx).await
1259 }
1260
/// Thin wrapper around `multi_write_internal` that discards the `(allocated, deallocated)`
/// totals it returns.  Unlike `multi_write`, this does not call `update_allocated_size`.
///
/// Only compiled when the `migration` feature is enabled.
#[cfg(feature = "migration")]
pub async fn raw_multi_write(
    &self,
    transaction: &mut Transaction<'_>,
    attribute_id: AttributeId,
    key_id: Option<u64>,
    ranges: &[Range<u64>],
    buf: MutableBufferRef<'_>,
) -> Result<(), Error> {
    self.multi_write_internal(transaction, attribute_id, key_id, ranges, buf)
        .await
        .map(|_allocation_totals| ())
}
1277
/// Shared implementation behind `multi_write` and `raw_multi_write`.
///
/// Writes the contents of `buf` to freshly allocated device space and adds extent mutations to
/// `transaction` that map the logical `ranges` of `attribute_id` onto that space.  `buf` is
/// consumed front to back, with the data for `ranges[0]` first, then `ranges[1]`, and so on.
///
/// Returns `(allocated, deallocated)`: the number of bytes obtained from the allocator and the
/// sum of what `deallocate_old_extents` returned for each range.  An empty `buf` returns
/// `(0, 0)` immediately without touching `transaction`.
///
/// # Panics
///
/// Panics (via `unwrap` on the range iterators) if `ranges` is exhausted while there is still
/// data that needs to be attributed to a range; in particular if `ranges` is empty and `buf`
/// is not.
async fn multi_write_internal(
    &self,
    transaction: &mut Transaction<'_>,
    attribute_id: AttributeId,
    key_id: Option<u64>,
    ranges: &[Range<u64>],
    mut buf: MutableBufferRef<'_>,
) -> Result<(u64, u64), Error> {
    if buf.is_empty() {
        return Ok((0, 0));
    }
    let block_size = self.block_size();
    let store = self.store();
    let store_id = store.store_object_id();

    // A request for the volume data key on a handle that uses cached keys goes through
    // `get_or_create_key` (presumably creating the key on first use -- confirm); every other
    // combination is a plain lookup.
    let (key_id, key) = if key_id == Some(VOLUME_DATA_KEY_ID)
        && matches!(self.encryption, Encryption::CachedKeys)
    {
        (
            VOLUME_DATA_KEY_ID,
            Some(
                self.get_or_create_key(transaction)
                    .await
                    .context("get_or_create_key failed")?,
            ),
        )
    } else {
        self.get_key(key_id).await?
    };
    if let Some(key) = &key {
        // Keys without inline-encryption support: encrypt each range's slice of the buffer now,
        // before any device space has been allocated.
        if !key.supports_inline_encryption() {
            let mut slice = buf.as_mut_slice();
            for r in ranges {
                let l = r.end - r.start;
                let (head, tail) = slice.split_at_mut(l as usize);
                key.encrypt(
                    self.object_id,
                    // NOTE(review): `write_at` passes the device offset in this position; here it
                    // is a constant 0 because nothing has been allocated yet -- confirm intended.
                    0,
                    r.start,
                    head,
                )?;
                slice = tail;
            }
        }
    }

    let mut allocated = 0;
    let allocator = store.allocator();
    let trace = self.trace();
    // Ordered so that results come back in the order the writes were queued, which is the order
    // the bookkeeping below walks `ranges`.
    let mut writes = FuturesOrdered::new();

    let mut logical_ranges = ranges.iter();
    let mut current_range = logical_ranges.next().unwrap().clone();

    // Keep allocating until every byte of `buf` has device space; each pass asks for everything
    // that is still outstanding and works with whatever range comes back.
    while !buf.is_empty() {
        let mut device_range = allocator
            .allocate(transaction, store_id, buf.len() as u64)
            .await
            .context("allocation failed")?;
        if trace {
            info!(
                store_id,
                oid = self.object_id(),
                device_range:?,
                len = device_range.end - device_range.start;
                "A",
            );
        }
        let mut device_range_len = device_range.end - device_range.start;
        allocated += device_range_len;
        while device_range_len > 0 {
            if current_range.end <= current_range.start {
                current_range = logical_ranges.next().unwrap().clone();
            }
            // With inline encryption the crypt context is derived from the logical range's start,
            // so a single write never spans more than the remainder of the current logical
            // range.  Otherwise the whole allocated device range is written in one go (and
            // `current_range` is not advanced in this loop).
            let (crypt_ctx, split) = if let Some(key) = &key {
                if key.supports_inline_encryption() {
                    let split = std::cmp::min(
                        current_range.end - current_range.start,
                        device_range_len,
                    );
                    let crypt_ctx = key.crypt_ctx(self.object_id, current_range.start);
                    current_range.start += split;
                    (crypt_ctx, split)
                } else {
                    (None, device_range_len)
                }
            } else {
                (None, device_range_len)
            };

            let (head, tail) = buf.split_at_mut(split as usize);
            buf = tail;

            // The future is only queued here; it is driven by `try_next` further down.  Each
            // result carries the device offset and length alongside the checksums so that the
            // extent records can be rebuilt from it.
            writes.push_back(async move {
                let len = head.len() as u64;
                Result::<_, Error>::Ok((
                    device_range.start,
                    len,
                    self.write_aligned(head.as_ref(), device_range.start, crypt_ctx).await?,
                ))
            });
            device_range.start += split;
            device_range_len = device_range.end - device_range.start;
        }
    }

    self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
    // Run the device writes and the deallocation of the extents being replaced together; either
    // failing fails the whole operation.
    let ((mutations, checksums), deallocated) = try_join!(
        async {
            // Walk `ranges` again, this time pairing each completed device write with the
            // logical range(s) it covers.  A write may straddle several logical ranges, so it is
            // chopped up at logical range boundaries.
            let mut current_range = 0..0;
            let mut mutations = Vec::new();
            let mut out_checksums = Vec::new();
            let mut ranges = ranges.iter();
            while let Some((mut device_offset, mut len, mut checksums)) =
                writes.try_next().await?
            {
                while len > 0 {
                    if current_range.end <= current_range.start {
                        current_range = ranges.next().unwrap().clone();
                    }
                    let chunk_len = std::cmp::min(len, current_range.end - current_range.start);
                    // Presumably one checksum per block, hence the division -- confirm against
                    // `MaybeChecksums::split_off`.
                    let tail = checksums.split_off((chunk_len / block_size) as usize);
                    if let Some(checksums) = checksums.maybe_as_ref() {
                        out_checksums.push((
                            device_offset..device_offset + chunk_len,
                            checksums.to_owned(),
                        ));
                    }
                    mutations.push(Mutation::merge_object(
                        ObjectKey::extent(
                            self.object_id(),
                            attribute_id,
                            current_range.start..current_range.start + chunk_len,
                        ),
                        ObjectValue::Extent(ExtentValue::new(
                            device_offset,
                            checksums.to_mode(),
                            key_id,
                        )),
                    ));
                    checksums = tail;
                    device_offset += chunk_len;
                    len -= chunk_len;
                    current_range.start += chunk_len;
                }
            }
            Result::<_, Error>::Ok((mutations, out_checksums))
        },
        async {
            let mut deallocated = 0;
            for r in ranges {
                deallocated +=
                    self.deallocate_old_extents(transaction, attribute_id, r.clone()).await?;
            }
            Result::<_, Error>::Ok(deallocated)
        }
    )?;

    for m in mutations {
        transaction.add(store_id, m);
    }

    // Checksums are only attached to the transaction when barriers are disabled.
    if !store.filesystem().options().barriers_enabled {
        for (r, c) in checksums {
            // The final argument sits where `multi_overwrite` passes its `is_first_write` flag.
            transaction.add_checksum(r, c, true);
        }
    }
    Ok((allocated, deallocated))
}
1456
1457 pub async fn multi_write(
1463 &self,
1464 transaction: &mut Transaction<'_>,
1465 attribute_id: AttributeId,
1466 key_id: Option<u64>,
1467 ranges: &[Range<u64>],
1468 buf: MutableBufferRef<'_>,
1469 ) -> Result<(), Error> {
1470 let (allocated, deallocated) =
1471 self.multi_write_internal(transaction, attribute_id, key_id, ranges, buf).await?;
1472 if allocated == 0 && deallocated == 0 {
1473 return Ok(());
1474 }
1475 self.update_allocated_size(transaction, allocated, deallocated).await
1476 }
1477
/// Overwrites data inside extents that already exist for `attr_id`: `buf` is written over the
/// device blocks those extents point at, and no new device space is allocated here.
///
/// `buf` holds the data for `ranges` back to back, in order.  An empty `buf` is a no-op.
///
/// # Errors
///
/// Fails with `FxfsError::Inconsistent` when a target range overlaps a deleted extent, starts
/// before the extent that was found for it, or overlaps an extent whose mode is `Raw` or `Cow`.
/// Fails with `FxfsError::Internal` when the iterator yields anything other than an extent
/// record for this object and attribute while ranges remain to be processed.
///
/// # Panics
///
/// Panics if `ranges` is empty while `buf` is not (the `unwrap` on the first range).
pub async fn multi_overwrite<'a>(
    &'a self,
    transaction: &mut Transaction<'a>,
    attr_id: AttributeId,
    ranges: &[Range<u64>],
    mut buf: MutableBufferRef<'_>,
) -> Result<(), Error> {
    if buf.is_empty() {
        return Ok(());
    }
    let block_size = self.block_size();
    let store = self.store();
    let tree = store.tree();
    let store_id = store.store_object_id();

    // NOTE(review): `None` presumably selects the handle's default key -- confirm against
    // `get_key`.
    let (key_id, key) = self.get_key(None).await?;
    if let Some(key) = &key {
        // Keys without inline-encryption support: encrypt each range's slice of the buffer up
        // front.
        if !key.supports_inline_encryption() {
            let mut slice = buf.as_mut_slice();
            for r in ranges {
                let l = r.end - r.start;
                let (head, tail) = slice.split_at_mut(l as usize);
                key.encrypt(
                    self.object_id,
                    // NOTE(review): `write_at` passes the device offset in this position; a
                    // constant 0 is passed here -- confirm intended.
                    0,
                    r.start,
                    head,
                )?;
                slice = tail;
            }
        }
    }

    let mut range_iter = ranges.iter();
    let mut target_range = range_iter.next().unwrap().clone();
    let mut mutations = Vec::new();
    // Writes are only queued while walking the extents; they are driven together by
    // `try_collect` after the walk, and may complete in any order.
    let writes = FuturesUnordered::new();

    let layer_set = tree.layer_set();
    let mut merger = layer_set.merger();
    let mut iter = merger
        .query(Query::FullRange(&ObjectKey::attribute(
            self.object_id(),
            attr_id,
            AttributeKey::Extent(Extent::search_key_from_offset(target_range.start)),
        )))
        .await?;

    loop {
        match iter.get() {
            Some(ItemRef {
                key:
                    ObjectKey {
                        object_id,
                        data:
                            ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent)),
                    },
                value: ObjectValue::Extent(extent_value),
                ..
            }) if *object_id == self.object_id() && *attribute_id == attr_id => {
                // This extent ends at or before the start of the range being processed, so it
                // has nothing to contribute; move on to the next record.
                if extent.end <= target_range.start {
                    iter.advance().await?;
                    continue;
                }
                let (device_offset, mode) = match extent_value {
                    ExtentValue::None => {
                        return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                            format!(
                                "multi_overwrite failed: target_range ({}, {}) overlaps with \
                            deleted extent found at ({}, {})",
                                target_range.start, target_range.end, extent.start, extent.end,
                            )
                        });
                    }
                    ExtentValue::Some { device_offset, mode, .. } => (device_offset, mode),
                };
                // The extent begins after the start of the target range, which means part of the
                // target range is not backed by any extent.
                if extent.start > target_range.start {
                    return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                        format!(
                            "multi_overwrite failed: target range ({}, {}) starts before first \
                        extent found at ({}, {})",
                            target_range.start, target_range.end, extent.start, extent.end,
                        )
                    });
                }
                // Only the two overwrite modes are acceptable.  `OverwriteBitmaps` is defined
                // elsewhere; presumably it tracks which blocks were already written and which
                // are written by this call -- confirm.
                let mut bitmap = match mode {
                    ExtentMode::Raw | ExtentMode::Cow(_) => {
                        return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                            format!(
                                "multi_overwrite failed: \
                            extent from ({}, {}) which overlaps target range ({}, {}) had the \
                            wrong extent mode",
                                extent.start, extent.end, target_range.start, target_range.end,
                            )
                        });
                    }
                    ExtentMode::OverwritePartial(bitmap) => {
                        OverwriteBitmaps::new(bitmap.clone())
                    }
                    ExtentMode::Overwrite => OverwriteBitmaps::None,
                };
                // A single extent can serve several target ranges; keep queueing writes until the
                // current target range starts at or beyond the end of this extent, or the ranges
                // run out.
                loop {
                    let offset_within_extent = target_range.start - extent.start;
                    let bitmap_offset = offset_within_extent / block_size;
                    let write_device_offset = *device_offset + offset_within_extent;
                    // Clamp the write to whichever ends first: the extent or the target range.
                    let write_end = min(extent.end, target_range.end);
                    let write_len = write_end - target_range.start;
                    let write_device_range =
                        write_device_offset..write_device_offset + write_len;
                    let (current_buf, remaining_buf) = buf.split_at_mut(write_len as usize);

                    bitmap.set_offset(bitmap_offset as usize);
                    let checksum_ranges = ChecksumRangeChunk::group_first_write_ranges(
                        &mut bitmap,
                        block_size,
                        write_device_range,
                    );

                    let crypt_ctx = if let Some(key) = &key {
                        key.crypt_ctx(self.object_id, target_range.start)
                    } else {
                        None
                    };

                    writes.push(async move {
                        let maybe_checksums = self
                            .write_aligned(current_buf.as_ref(), write_device_offset, crypt_ctx)
                            .await?;
                        // Pair each chunk's device range with its slice of the checksums and
                        // its first-write flag.
                        Ok::<_, Error>(match maybe_checksums {
                            MaybeChecksums::None => Vec::new(),
                            MaybeChecksums::Fletcher(checksums) => checksum_ranges
                                .into_iter()
                                .map(
                                    |ChecksumRangeChunk {
                                         checksum_range,
                                         device_range,
                                         is_first_write,
                                     }| {
                                        (
                                            device_range,
                                            checksums[checksum_range].to_vec(),
                                            is_first_write,
                                        )
                                    },
                                )
                                .collect(),
                        })
                    });
                    buf = remaining_buf;
                    target_range.start += write_len;
                    if target_range.start == target_range.end {
                        match range_iter.next() {
                            // Leaves `target_range` empty, which the outer loop uses as its
                            // signal that every range has been handled.
                            None => break,
                            Some(next_range) => target_range = next_range.clone(),
                        }
                    }
                    if extent.end <= target_range.start {
                        break;
                    }
                }
                if let Some((mut bitmap, write_bitmap)) = bitmap.take_bitmaps() {
                    // Only record a mutation when merging in the newly written blocks actually
                    // changed the bitmap (assuming these are `BitVec`s, whose `or` reports
                    // whether `self` changed).  Once every bit is set the extent is recorded as
                    // plain `Overwrite`.
                    if bitmap.or(&write_bitmap) {
                        let mode = if bitmap.all() {
                            ExtentMode::Overwrite
                        } else {
                            ExtentMode::OverwritePartial(bitmap)
                        };
                        mutations.push(Mutation::merge_object(
                            ObjectKey::extent(self.object_id(), attr_id, extent.clone().into()),
                            ObjectValue::Extent(ExtentValue::new(*device_offset, mode, key_id)),
                        ))
                    }
                }
                if target_range.start == target_range.end {
                    break;
                }
                iter.advance().await?;
            }
            // Reached for anything that is not an extent record of this object and attribute,
            // including the iterator running out of records.
            _ => bail!(anyhow!(FxfsError::Internal).context(
                "found a non-extent object record while there were still ranges to process"
            )),
        }
    }

    let checksums = writes.try_collect::<Vec<_>>().await?;
    // Checksums are only attached to the transaction when barriers are disabled.
    if !store.filesystem().options().barriers_enabled {
        for (r, c, first_write) in checksums.into_iter().flatten() {
            transaction.add_checksum(r, c, first_write);
        }
    }

    for m in mutations {
        transaction.add(store_id, m);
    }

    Ok(())
}
1689
1690 #[trace]
1697 pub async fn write_new_attr_in_batches<'a>(
1698 &'a self,
1699 transaction: &mut Transaction<'a>,
1700 attribute_id: AttributeId,
1701 data: &[u8],
1702 batch_size: usize,
1703 ) -> Result<(), Error> {
1704 transaction.add(
1705 self.store().store_object_id,
1706 Mutation::replace_or_insert_object(
1707 ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
1708 ObjectValue::attribute(data.len() as u64, false),
1709 ),
1710 );
1711 let chunks = data.chunks(batch_size);
1712 let num_chunks = chunks.len();
1713 if num_chunks > 1 {
1714 transaction.add(
1715 self.store().store_object_id,
1716 Mutation::replace_or_insert_object(
1717 ObjectKey::graveyard_attribute_entry(
1718 self.store().graveyard_directory_object_id(),
1719 self.object_id(),
1720 attribute_id,
1721 ),
1722 ObjectValue::Some,
1723 ),
1724 );
1725 }
1726 let mut start_offset = 0;
1727 for (i, chunk) in chunks.enumerate() {
1728 let rounded_len = round_up(chunk.len() as u64, self.block_size()).unwrap();
1729 let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
1730 let slice = buffer.as_mut_slice();
1731 slice[..chunk.len()].copy_from_slice(chunk);
1732 slice[chunk.len()..].fill(0);
1733 self.multi_write(
1734 transaction,
1735 attribute_id,
1736 Some(VOLUME_DATA_KEY_ID),
1737 &[start_offset..start_offset + rounded_len],
1738 buffer.as_mut(),
1739 )
1740 .await?;
1741 start_offset += rounded_len;
1742 if i < num_chunks - 1 {
1744 transaction.commit_and_continue().await?;
1745 }
1746 }
1747 Ok(())
1748 }
1749
1750 pub async fn write_attr(
1757 &self,
1758 transaction: &mut Transaction<'_>,
1759 attribute_id: AttributeId,
1760 data: &[u8],
1761 ) -> Result<NeedsTrim, Error> {
1762 let rounded_len = round_up(data.len() as u64, self.block_size()).unwrap();
1763 let store = self.store();
1764 let tree = store.tree();
1765 let should_trim = if let Some(item) = tree
1766 .find(&ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute))
1767 .await?
1768 {
1769 match item.value {
1770 ObjectValue::Attribute { size: _, has_overwrite_extents: true } => {
1771 bail!(
1772 anyhow!(FxfsError::Inconsistent)
1773 .context("write_attr on an attribute with overwrite extents")
1774 )
1775 }
1776 ObjectValue::Attribute { size, .. } => (data.len() as u64) < size,
1777 _ => bail!(FxfsError::Inconsistent),
1778 }
1779 } else {
1780 false
1781 };
1782 let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
1783 let slice = buffer.as_mut_slice();
1784 slice[..data.len()].copy_from_slice(data);
1785 slice[data.len()..].fill(0);
1786 self.multi_write(
1787 transaction,
1788 attribute_id,
1789 Some(VOLUME_DATA_KEY_ID),
1790 &[0..rounded_len],
1791 buffer.as_mut(),
1792 )
1793 .await?;
1794 transaction.add(
1795 self.store().store_object_id,
1796 Mutation::replace_or_insert_object(
1797 ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
1798 ObjectValue::attribute(data.len() as u64, false),
1799 ),
1800 );
1801 if should_trim {
1802 self.shrink(transaction, attribute_id, data.len() as u64).await
1803 } else {
1804 Ok(NeedsTrim(false))
1805 }
1806 }
1807
1808 pub async fn list_extended_attributes(&self) -> Result<Vec<Vec<u8>>, Error> {
1809 let layer_set = self.store().tree().layer_set();
1810 let mut merger = layer_set.merger();
1811 let mut iter = merger
1813 .query(Query::FullRange(&ObjectKey::extended_attribute(self.object_id(), Vec::new())))
1814 .await?;
1815 let mut out = Vec::new();
1816 while let Some(item) = iter.get() {
1817 if item.value != &ObjectValue::None {
1819 match item.key {
1820 ObjectKey { object_id, data: ObjectKeyData::ExtendedAttribute { name } }
1821 if *object_id == self.object_id() =>
1822 {
1823 out.push(name.clone());
1824 }
1825 _ => break,
1830 }
1831 }
1832 iter.advance().await?;
1833 }
1834 Ok(out)
1835 }
1836
1837 pub async fn get_inline_selinux_context(&self) -> Result<Option<fio::SelinuxContext>, Error> {
1841 const_assert!(fio::MAX_SELINUX_CONTEXT_ATTRIBUTE_LEN as usize <= MAX_INLINE_XATTR_SIZE);
1844 let item = match self
1845 .store()
1846 .tree()
1847 .find(&ObjectKey::extended_attribute(
1848 self.object_id(),
1849 fio::SELINUX_CONTEXT_NAME.into(),
1850 ))
1851 .await?
1852 {
1853 Some(item) => item,
1854 None => return Ok(None),
1855 };
1856 match item.value {
1857 ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => {
1858 Ok(Some(fio::SelinuxContext::Data(value)))
1859 }
1860 ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(_)) => {
1861 Ok(Some(fio::SelinuxContext::UseExtendedAttributes(fio::EmptyStruct {})))
1862 }
1863 _ => {
1864 bail!(
1865 anyhow!(FxfsError::Inconsistent)
1866 .context("get_inline_extended_attribute: Expected ExtendedAttribute value")
1867 )
1868 }
1869 }
1870 }
1871
1872 pub async fn get_extended_attribute(&self, name: Vec<u8>) -> Result<Vec<u8>, Error> {
1873 let item = self
1874 .store()
1875 .tree()
1876 .find(&ObjectKey::extended_attribute(self.object_id(), name))
1877 .await?
1878 .ok_or(FxfsError::NotFound)?;
1879 match item.value {
1880 ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => Ok(value),
1881 ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
1882 Ok(self.read_attr(id).await?.ok_or(FxfsError::Inconsistent)?.into_vec())
1883 }
1884 _ => {
1885 bail!(
1886 anyhow!(FxfsError::Inconsistent)
1887 .context("get_extended_attribute: Expected ExtendedAttribute value")
1888 )
1889 }
1890 }
1891 }
1892
1893 pub async fn set_extended_attribute(
1894 &self,
1895 name: Vec<u8>,
1896 value: Vec<u8>,
1897 mode: SetExtendedAttributeMode,
1898 ) -> Result<(), Error> {
1899 let store = self.store();
1900 let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
1903 let mut transaction = store.new_transaction(keys, Options::default()).await?;
1904 self.set_extended_attribute_impl(name, value, mode, &mut transaction).await?;
1905 transaction.commit().await?;
1906 Ok(())
1907 }
1908
1909 async fn set_extended_attribute_impl(
1910 &self,
1911 name: Vec<u8>,
1912 value: Vec<u8>,
1913 mode: SetExtendedAttributeMode,
1914 transaction: &mut Transaction<'_>,
1915 ) -> Result<(), Error> {
1916 ensure!(name.len() <= MAX_XATTR_NAME_SIZE, FxfsError::TooBig);
1917 ensure!(value.len() <= MAX_XATTR_VALUE_SIZE, FxfsError::TooBig);
1918 let tree = self.store().tree();
1919 let object_key = ObjectKey::extended_attribute(self.object_id(), name);
1920
1921 let existing_attribute_id = {
1922 let (found, existing_attribute_id) = match tree.find(&object_key).await? {
1923 None => (false, None),
1924 Some(Item { value, .. }) => (
1925 true,
1926 match value {
1927 ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
1928 ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
1929 Some(id)
1930 }
1931 _ => bail!(
1932 anyhow!(FxfsError::Inconsistent)
1933 .context("expected extended attribute value")
1934 ),
1935 },
1936 ),
1937 };
1938 match mode {
1939 SetExtendedAttributeMode::Create if found => {
1940 bail!(FxfsError::AlreadyExists)
1941 }
1942 SetExtendedAttributeMode::Replace if !found => {
1943 bail!(FxfsError::NotFound)
1944 }
1945 _ => (),
1946 }
1947 existing_attribute_id
1948 };
1949
1950 if let Some(attribute_id) = existing_attribute_id {
1951 let _ = self.write_attr(transaction, attribute_id, &value).await?;
1957 } else if value.len() <= MAX_INLINE_XATTR_SIZE {
1958 transaction.add(
1959 self.store().store_object_id(),
1960 Mutation::replace_or_insert_object(
1961 object_key,
1962 ObjectValue::inline_extended_attribute(value),
1963 ),
1964 );
1965 } else {
1966 let mut attribute_id = AttributeId::XATTR_RANGE_START;
1972 let layer_set = tree.layer_set();
1973 let mut merger = layer_set.merger();
1974 let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
1975 let mut iter = merger.query(Query::FullRange(&key)).await?;
1976 loop {
1977 match iter.get() {
1978 None => break,
1981 Some(ItemRef {
1982 key: ObjectKey { object_id, data: ObjectKeyData::Attribute(attr_id, _) },
1983 value,
1984 ..
1985 }) if *object_id == self.object_id() => {
1986 if matches!(value, ObjectValue::None) {
1987 break;
1990 }
1991 if attribute_id < *attr_id {
1992 break;
1994 } else if attribute_id == *attr_id {
1995 attribute_id = attribute_id.next();
1997 if attribute_id == AttributeId::XATTR_RANGE_END {
1998 bail!(FxfsError::NoSpace);
1999 }
2000 }
2001 }
2005 _ => break,
2009 }
2010 iter.advance().await?;
2011 }
2012
2013 let _ = self.write_attr(transaction, attribute_id, &value).await?;
2015 transaction.add(
2016 self.store().store_object_id(),
2017 Mutation::replace_or_insert_object(
2018 object_key,
2019 ObjectValue::extended_attribute(attribute_id),
2020 ),
2021 );
2022 }
2023
2024 Ok(())
2025 }
2026
2027 pub async fn remove_extended_attribute(&self, name: Vec<u8>) -> Result<(), Error> {
2028 let store = self.store();
2029 let tree = store.tree();
2030 let object_key = ObjectKey::extended_attribute(self.object_id(), name);
2031
2032 let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
2037 let mut transaction = store.new_transaction(keys, Options::default()).await?;
2038
2039 let attribute_to_delete =
2040 match tree.find(&object_key).await?.ok_or(FxfsError::NotFound)?.value {
2041 ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => Some(id),
2042 ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
2043 _ => {
2044 bail!(
2045 anyhow!(FxfsError::Inconsistent)
2046 .context("remove_extended_attribute: Expected ExtendedAttribute value")
2047 )
2048 }
2049 };
2050
2051 transaction.add(
2052 store.store_object_id(),
2053 Mutation::replace_or_insert_object(object_key, ObjectValue::None),
2054 );
2055
2056 if let Some(attribute_id) = attribute_to_delete {
2063 let trim_result = store
2064 .trim_some(
2065 &mut transaction,
2066 self.object_id(),
2067 attribute_id,
2068 TrimMode::FromOffset(0),
2069 )
2070 .await?;
2071 assert_matches!(trim_result, TrimResult::Done(_));
2074 transaction.add(
2075 store.store_object_id(),
2076 Mutation::replace_or_insert_object(
2077 ObjectKey::attribute(self.object_id, attribute_id, AttributeKey::Attribute),
2078 ObjectValue::None,
2079 ),
2080 );
2081 }
2082
2083 transaction.commit().await?;
2084 Ok(())
2085 }
2086
2087 pub fn pre_fetch_keys(&self) -> Option<impl Future<Output = ()> + use<S>> {
2090 if let Encryption::CachedKeys = self.encryption {
2091 let owner = self.owner.clone();
2092 let object_id = self.object_id;
2093 Some(async move {
2094 let store = owner.as_ref().as_ref();
2095 if let Some(crypt) = store.crypt() {
2096 let _ = store
2097 .key_manager
2098 .get_keys(
2099 object_id,
2100 crypt.as_ref(),
2101 &mut Some(async || store.get_keys(object_id).await),
2102 false,
2103 false,
2104 )
2105 .await;
2106 }
2107 })
2108 } else {
2109 None
2110 }
2111 }
2112}
2113
2114impl<S: HandleOwner> Drop for StoreObjectHandle<S> {
2115 fn drop(&mut self) {
2116 if self.is_encrypted() {
2117 let _ = self.store().key_manager.remove(self.object_id);
2118 }
2119 }
2120}
2121
/// Flag returned by `write_attr`, which yields `NeedsTrim(false)` when it did not have to
/// shrink the attribute and otherwise passes on whatever `shrink` returned.
///
/// Presumably `true` means the caller still has trimming work to arrange -- confirm against
/// `shrink`.  Marked `#[must_use]` so that ignoring the flag has to be done explicitly.
#[must_use]
pub struct NeedsTrim(pub bool);
2127
2128#[cfg(test)]
2129mod tests {
2130 use super::{ChecksumRangeChunk, OverwriteBitmaps};
2131 use crate::errors::FxfsError;
2132 use crate::filesystem::{FxFilesystem, OpenFxFilesystem};
2133 use crate::object_handle::ObjectHandle;
2134 use crate::object_store::data_object_handle::WRITE_ATTR_BATCH_SIZE;
2135 use crate::object_store::transaction::{Mutation, Options, lock_keys};
2136 use crate::object_store::{
2137 AttributeId, AttributeKey, DataObjectHandle, Directory, HandleOptions, LockKey, ObjectKey,
2138 ObjectStore, ObjectValue, SetExtendedAttributeMode, StoreObjectHandle,
2139 };
2140 use bit_vec::BitVec;
2141 use fuchsia_async as fasync;
2142 use futures::join;
2143 use std::sync::Arc;
2144 use storage_device::DeviceHolder;
2145 use storage_device::fake_device::FakeDevice;
2146
// Passed as the second argument to `FakeDevice::new` in `test_filesystem`.
const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
// Name under which the test object is added to the root directory.
const TEST_OBJECT_NAME: &str = "foo";
2149
2150 fn is_error(actual: anyhow::Error, expected: FxfsError) {
2151 assert_eq!(*actual.root_cause().downcast_ref::<FxfsError>().unwrap(), expected)
2152 }
2153
2154 async fn test_filesystem() -> OpenFxFilesystem {
2155 let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
2156 FxFilesystem::new_empty(device).await.expect("new_empty failed")
2157 }
2158
2159 async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
2160 {
2161 let fs = test_filesystem().await;
2162 let store = fs.root_store();
2163
2164 let mut transaction = fs
2165 .root_store()
2166 .new_transaction(
2167 lock_keys![LockKey::object(
2168 store.store_object_id(),
2169 store.root_directory_object_id()
2170 )],
2171 Options::default(),
2172 )
2173 .await
2174 .expect("new_transaction failed");
2175
2176 let object =
2177 ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2178 .await
2179 .expect("create_object failed");
2180
2181 let root_directory =
2182 Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
2183 root_directory
2184 .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
2185 .await
2186 .expect("add_child_file failed");
2187
2188 transaction.commit().await.expect("commit failed");
2189
2190 (fs, object)
2191 }
2192
2193 #[fuchsia::test(threads = 3)]
2194 async fn extended_attribute_double_remove() {
2195 let (fs, object) = test_filesystem_and_empty_object().await;
2200 let basic = Arc::new(StoreObjectHandle::new(
2201 object.owner().clone(),
2202 object.object_id(),
2203 false,
2204 HandleOptions::default(),
2205 false,
2206 ));
2207 let basic_a = basic.clone();
2208 let basic_b = basic.clone();
2209
2210 basic
2211 .set_extended_attribute(
2212 b"security.selinux".to_vec(),
2213 b"bar".to_vec(),
2214 SetExtendedAttributeMode::Set,
2215 )
2216 .await
2217 .expect("failed to set attribute");
2218
2219 let a_task = fasync::Task::spawn(async move {
2222 basic_a.remove_extended_attribute(b"security.selinux".to_vec()).await
2223 });
2224 let b_task = fasync::Task::spawn(async move {
2225 basic_b.remove_extended_attribute(b"security.selinux".to_vec()).await
2226 });
2227 match join!(a_task, b_task) {
2228 (Ok(()), Ok(())) => panic!("both remove calls succeeded"),
2229 (Err(_), Err(_)) => panic!("both remove calls failed"),
2230
2231 (Ok(()), Err(e)) => is_error(e, FxfsError::NotFound),
2232 (Err(e), Ok(())) => is_error(e, FxfsError::NotFound),
2233 }
2234
2235 fs.close().await.expect("Close failed");
2236 }
2237
2238 #[fuchsia::test(threads = 3)]
2239 async fn extended_attribute_double_create() {
2240 let (fs, object) = test_filesystem_and_empty_object().await;
2245 let basic = Arc::new(StoreObjectHandle::new(
2246 object.owner().clone(),
2247 object.object_id(),
2248 false,
2249 HandleOptions::default(),
2250 false,
2251 ));
2252 let basic_a = basic.clone();
2253 let basic_b = basic.clone();
2254
2255 let a_task = fasync::Task::spawn(async move {
2258 basic_a
2259 .set_extended_attribute(
2260 b"security.selinux".to_vec(),
2261 b"one".to_vec(),
2262 SetExtendedAttributeMode::Create,
2263 )
2264 .await
2265 });
2266 let b_task = fasync::Task::spawn(async move {
2267 basic_b
2268 .set_extended_attribute(
2269 b"security.selinux".to_vec(),
2270 b"two".to_vec(),
2271 SetExtendedAttributeMode::Create,
2272 )
2273 .await
2274 });
2275 match join!(a_task, b_task) {
2276 (Ok(()), Ok(())) => panic!("both set calls succeeded"),
2277 (Err(_), Err(_)) => panic!("both set calls failed"),
2278
2279 (Ok(()), Err(e)) => {
2280 assert_eq!(
2281 basic
2282 .get_extended_attribute(b"security.selinux".to_vec())
2283 .await
2284 .expect("failed to get xattr"),
2285 b"one"
2286 );
2287 is_error(e, FxfsError::AlreadyExists);
2288 }
2289 (Err(e), Ok(())) => {
2290 assert_eq!(
2291 basic
2292 .get_extended_attribute(b"security.selinux".to_vec())
2293 .await
2294 .expect("failed to get xattr"),
2295 b"two"
2296 );
2297 is_error(e, FxfsError::AlreadyExists);
2298 }
2299 }
2300
2301 fs.close().await.expect("Close failed");
2302 }
2303
/// A name/value pair used to drive the extended attribute tests.
struct TestAttr {
    name: Vec<u8>,
    value: Vec<u8>,
}

impl TestAttr {
    /// Builds a `TestAttr` by copying the bytes of `name` and `value`.
    fn new(name: impl AsRef<[u8]>, value: impl AsRef<[u8]>) -> Self {
        let name = Vec::from(name.as_ref());
        let value = Vec::from(value.as_ref());
        Self { name, value }
    }

    /// Returns a fresh copy of the attribute name.
    fn name(&self) -> Vec<u8> {
        self.name.to_vec()
    }

    /// Returns a fresh copy of the attribute value.
    fn value(&self) -> Vec<u8> {
        self.value.to_vec()
    }
}
2320
2321 #[fuchsia::test]
2322 async fn extended_attributes() {
2323 let (fs, object) = test_filesystem_and_empty_object().await;
2324
2325 let test_attr = TestAttr::new(b"security.selinux", b"foo");
2326
2327 assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
2328 is_error(
2329 object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2330 FxfsError::NotFound,
2331 );
2332
2333 object
2334 .set_extended_attribute(
2335 test_attr.name(),
2336 test_attr.value(),
2337 SetExtendedAttributeMode::Set,
2338 )
2339 .await
2340 .unwrap();
2341 assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2342 assert_eq!(
2343 object.get_extended_attribute(test_attr.name()).await.unwrap(),
2344 test_attr.value()
2345 );
2346
2347 object.remove_extended_attribute(test_attr.name()).await.unwrap();
2348 assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
2349 is_error(
2350 object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2351 FxfsError::NotFound,
2352 );
2353
2354 object
2356 .set_extended_attribute(
2357 test_attr.name(),
2358 test_attr.value(),
2359 SetExtendedAttributeMode::Set,
2360 )
2361 .await
2362 .unwrap();
2363 assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2364 assert_eq!(
2365 object.get_extended_attribute(test_attr.name()).await.unwrap(),
2366 test_attr.value()
2367 );
2368
2369 object.remove_extended_attribute(test_attr.name()).await.unwrap();
2370 assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
2371 is_error(
2372 object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2373 FxfsError::NotFound,
2374 );
2375
2376 fs.close().await.expect("close failed");
2377 }
2378
2379 #[fuchsia::test]
2380 async fn large_extended_attribute() {
2381 let (fs, object) = test_filesystem_and_empty_object().await;
2382
2383 let test_attr = TestAttr::new(b"security.selinux", vec![3u8; 300]);
2384
2385 object
2386 .set_extended_attribute(
2387 test_attr.name(),
2388 test_attr.value(),
2389 SetExtendedAttributeMode::Set,
2390 )
2391 .await
2392 .unwrap();
2393 assert_eq!(
2394 object.get_extended_attribute(test_attr.name()).await.unwrap(),
2395 test_attr.value()
2396 );
2397
2398 assert_eq!(
2401 object
2402 .read_attr(AttributeId::XATTR_RANGE_START)
2403 .await
2404 .expect("read_attr failed")
2405 .expect("read_attr returned none")
2406 .into_vec(),
2407 test_attr.value()
2408 );
2409
2410 object.remove_extended_attribute(test_attr.name()).await.unwrap();
2411 is_error(
2412 object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2413 FxfsError::NotFound,
2414 );
2415
2416 object
2418 .set_extended_attribute(
2419 test_attr.name(),
2420 test_attr.value(),
2421 SetExtendedAttributeMode::Set,
2422 )
2423 .await
2424 .unwrap();
2425 assert_eq!(
2426 object.get_extended_attribute(test_attr.name()).await.unwrap(),
2427 test_attr.value()
2428 );
2429 object.remove_extended_attribute(test_attr.name()).await.unwrap();
2430 is_error(
2431 object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2432 FxfsError::NotFound,
2433 );
2434
2435 fs.close().await.expect("close failed");
2436 }
2437
2438 #[fuchsia::test]
2439 async fn multiple_extended_attributes() {
2440 let (fs, object) = test_filesystem_and_empty_object().await;
2441
2442 let attrs = [
2443 TestAttr::new(b"security.selinux", b"foo"),
2444 TestAttr::new(b"large.attribute", vec![3u8; 300]),
2445 TestAttr::new(b"an.attribute", b"asdf"),
2446 TestAttr::new(b"user.big", vec![5u8; 288]),
2447 TestAttr::new(b"user.tiny", b"smol"),
2448 TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
2449 TestAttr::new(b"also big", vec![7u8; 500]),
2450 TestAttr::new(b"all.ones", vec![1u8; 11111]),
2451 ];
2452
2453 for i in 0..attrs.len() {
2454 object
2455 .set_extended_attribute(
2456 attrs[i].name(),
2457 attrs[i].value(),
2458 SetExtendedAttributeMode::Set,
2459 )
2460 .await
2461 .unwrap();
2462 assert_eq!(
2463 object.get_extended_attribute(attrs[i].name()).await.unwrap(),
2464 attrs[i].value()
2465 );
2466 }
2467
2468 for i in 0..attrs.len() {
2469 let mut found_attrs = object.list_extended_attributes().await.unwrap();
2471 let mut expected_attrs: Vec<Vec<u8>> = attrs.iter().skip(i).map(|a| a.name()).collect();
2472 found_attrs.sort();
2473 expected_attrs.sort();
2474 assert_eq!(found_attrs, expected_attrs);
2475 for j in i..attrs.len() {
2476 assert_eq!(
2477 object.get_extended_attribute(attrs[j].name()).await.unwrap(),
2478 attrs[j].value()
2479 );
2480 }
2481
2482 object.remove_extended_attribute(attrs[i].name()).await.expect("failed to remove");
2483 is_error(
2484 object.get_extended_attribute(attrs[i].name()).await.unwrap_err(),
2485 FxfsError::NotFound,
2486 );
2487 }
2488
2489 fs.close().await.expect("close failed");
2490 }
2491
2492 #[fuchsia::test]
2493 async fn multiple_extended_attributes_delete() {
2494 let (fs, object) = test_filesystem_and_empty_object().await;
2495 let store = object.owner().clone();
2496
2497 let attrs = [
2498 TestAttr::new(b"security.selinux", b"foo"),
2499 TestAttr::new(b"large.attribute", vec![3u8; 300]),
2500 TestAttr::new(b"an.attribute", b"asdf"),
2501 TestAttr::new(b"user.big", vec![5u8; 288]),
2502 TestAttr::new(b"user.tiny", b"smol"),
2503 TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
2504 TestAttr::new(b"also big", vec![7u8; 500]),
2505 TestAttr::new(b"all.ones", vec![1u8; 11111]),
2506 ];
2507
2508 for i in 0..attrs.len() {
2509 object
2510 .set_extended_attribute(
2511 attrs[i].name(),
2512 attrs[i].value(),
2513 SetExtendedAttributeMode::Set,
2514 )
2515 .await
2516 .unwrap();
2517 assert_eq!(
2518 object.get_extended_attribute(attrs[i].name()).await.unwrap(),
2519 attrs[i].value()
2520 );
2521 }
2522
2523 let root_directory =
2525 Directory::open(object.owner(), object.store().root_directory_object_id())
2526 .await
2527 .expect("open failed");
2528 let mut transaction = fs
2529 .root_store()
2530 .new_transaction(
2531 lock_keys![
2532 LockKey::object(store.store_object_id(), store.root_directory_object_id()),
2533 LockKey::object(store.store_object_id(), object.object_id()),
2534 ],
2535 Options::default(),
2536 )
2537 .await
2538 .expect("new_transaction failed");
2539 crate::object_store::directory::replace_child(
2540 &mut transaction,
2541 None,
2542 (&root_directory, TEST_OBJECT_NAME),
2543 )
2544 .await
2545 .expect("replace_child failed");
2546 transaction.commit().await.unwrap();
2547 store.tombstone_object(object.object_id(), Options::default()).await.unwrap();
2548
2549 crate::fsck::fsck(fs.clone()).await.unwrap();
2550
2551 fs.close().await.expect("close failed");
2552 }
2553
2554 #[fuchsia::test]
2555 async fn extended_attribute_changing_sizes() {
2556 let (fs, object) = test_filesystem_and_empty_object().await;
2557
2558 let test_name = b"security.selinux";
2559 let test_small_attr = TestAttr::new(test_name, b"smol");
2560 let test_large_attr = TestAttr::new(test_name, vec![3u8; 300]);
2561
2562 object
2563 .set_extended_attribute(
2564 test_small_attr.name(),
2565 test_small_attr.value(),
2566 SetExtendedAttributeMode::Set,
2567 )
2568 .await
2569 .unwrap();
2570 assert_eq!(
2571 object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
2572 test_small_attr.value()
2573 );
2574
2575 assert!(
2577 object
2578 .read_attr(AttributeId::XATTR_RANGE_START)
2579 .await
2580 .expect("read_attr failed")
2581 .is_none()
2582 );
2583
2584 crate::fsck::fsck(fs.clone()).await.unwrap();
2585
2586 object
2587 .set_extended_attribute(
2588 test_large_attr.name(),
2589 test_large_attr.value(),
2590 SetExtendedAttributeMode::Set,
2591 )
2592 .await
2593 .unwrap();
2594 assert_eq!(
2595 object.get_extended_attribute(test_large_attr.name()).await.unwrap(),
2596 test_large_attr.value()
2597 );
2598
2599 assert_eq!(
2602 object
2603 .read_attr(AttributeId::XATTR_RANGE_START)
2604 .await
2605 .expect("read_attr failed")
2606 .expect("read_attr returned none")
2607 .into_vec(),
2608 test_large_attr.value()
2609 );
2610
2611 crate::fsck::fsck(fs.clone()).await.unwrap();
2612
2613 object
2614 .set_extended_attribute(
2615 test_small_attr.name(),
2616 test_small_attr.value(),
2617 SetExtendedAttributeMode::Set,
2618 )
2619 .await
2620 .unwrap();
2621 assert_eq!(
2622 object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
2623 test_small_attr.value()
2624 );
2625
2626 assert_eq!(
2629 object
2630 .read_attr(AttributeId::XATTR_RANGE_START)
2631 .await
2632 .expect("read_attr failed")
2633 .expect("read_attr returned none")
2634 .into_vec(),
2635 test_small_attr.value()
2636 );
2637
2638 crate::fsck::fsck(fs.clone()).await.unwrap();
2639
2640 object.remove_extended_attribute(test_small_attr.name()).await.expect("failed to remove");
2641
2642 crate::fsck::fsck(fs.clone()).await.unwrap();
2643
2644 fs.close().await.expect("close failed");
2645 }
2646
2647 #[fuchsia::test]
2648 async fn extended_attribute_max_size() {
2649 let (fs, object) = test_filesystem_and_empty_object().await;
2650
2651 let test_attr = TestAttr::new(
2652 vec![3u8; super::MAX_XATTR_NAME_SIZE],
2653 vec![1u8; super::MAX_XATTR_VALUE_SIZE],
2654 );
2655
2656 object
2657 .set_extended_attribute(
2658 test_attr.name(),
2659 test_attr.value(),
2660 SetExtendedAttributeMode::Set,
2661 )
2662 .await
2663 .unwrap();
2664 assert_eq!(
2665 object.get_extended_attribute(test_attr.name()).await.unwrap(),
2666 test_attr.value()
2667 );
2668 assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2669 object.remove_extended_attribute(test_attr.name()).await.unwrap();
2670
2671 fs.close().await.expect("close failed");
2672 }
2673
2674 #[fuchsia::test]
2675 async fn extended_attribute_remove_then_create() {
2676 let (fs, object) = test_filesystem_and_empty_object().await;
2677
2678 let test_attr = TestAttr::new(
2679 vec![3u8; super::MAX_XATTR_NAME_SIZE],
2680 vec![1u8; super::MAX_XATTR_VALUE_SIZE],
2681 );
2682
2683 object
2684 .set_extended_attribute(
2685 test_attr.name(),
2686 test_attr.value(),
2687 SetExtendedAttributeMode::Create,
2688 )
2689 .await
2690 .unwrap();
2691 fs.journal().force_compact().await.unwrap();
2692 object.remove_extended_attribute(test_attr.name()).await.unwrap();
2693 object
2694 .set_extended_attribute(
2695 test_attr.name(),
2696 test_attr.value(),
2697 SetExtendedAttributeMode::Create,
2698 )
2699 .await
2700 .unwrap();
2701
2702 assert_eq!(
2703 object.get_extended_attribute(test_attr.name()).await.unwrap(),
2704 test_attr.value()
2705 );
2706
2707 fs.close().await.expect("close failed");
2708 }
2709
2710 #[fuchsia::test]
2711 async fn large_extended_attribute_max_number() {
2712 let (fs, object) = test_filesystem_and_empty_object().await;
2713
2714 let max_xattrs = AttributeId::XATTR_RANGE_END.raw() - AttributeId::XATTR_RANGE_START.raw();
2715 for i in 0..max_xattrs {
2716 let test_attr = TestAttr::new(format!("{}", i).as_bytes(), vec![0x3; 300]);
2717 object
2718 .set_extended_attribute(
2719 test_attr.name(),
2720 test_attr.value(),
2721 SetExtendedAttributeMode::Set,
2722 )
2723 .await
2724 .unwrap_or_else(|_| panic!("failed to set xattr number {}", i));
2725 }
2726
2727 match object
2730 .set_extended_attribute(
2731 b"one.too.many".to_vec(),
2732 vec![0x3; 300],
2733 SetExtendedAttributeMode::Set,
2734 )
2735 .await
2736 {
2737 Ok(()) => panic!("set should not succeed"),
2738 Err(e) => is_error(e, FxfsError::NoSpace),
2739 }
2740
2741 object
2743 .set_extended_attribute(
2744 b"this.is.okay".to_vec(),
2745 b"small value".to_vec(),
2746 SetExtendedAttributeMode::Set,
2747 )
2748 .await
2749 .unwrap();
2750
2751 object
2753 .set_extended_attribute(b"11".to_vec(), vec![0x4; 300], SetExtendedAttributeMode::Set)
2754 .await
2755 .unwrap();
2756 object
2757 .set_extended_attribute(
2758 b"12".to_vec(),
2759 vec![0x1; 300],
2760 SetExtendedAttributeMode::Replace,
2761 )
2762 .await
2763 .unwrap();
2764
2765 object.remove_extended_attribute(b"5".to_vec()).await.unwrap();
2767 object
2768 .set_extended_attribute(
2769 b"new attr".to_vec(),
2770 vec![0x3; 300],
2771 SetExtendedAttributeMode::Set,
2772 )
2773 .await
2774 .unwrap();
2775
2776 fs.close().await.expect("close failed");
2777 }
2778
2779 #[fuchsia::test]
2780 async fn write_attr_trims_beyond_new_end() {
2781 let (fs, object) = test_filesystem_and_empty_object().await;
2786
2787 let block_size = fs.block_size();
2788 let buf_size = block_size * 2;
2789 let attribute_id = AttributeId::TEST_ID;
2790
2791 let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
2792 let mut buffer = object.allocate_buffer(buf_size as usize).await;
2793 buffer.as_mut_slice().fill(3);
2794 object
2797 .multi_write(
2798 &mut transaction,
2799 attribute_id,
2800 &[0..block_size, block_size..block_size * 2],
2801 buffer.as_mut(),
2802 )
2803 .await
2804 .unwrap();
2805 transaction.add(
2806 object.store().store_object_id,
2807 Mutation::replace_or_insert_object(
2808 ObjectKey::attribute(object.object_id(), attribute_id, AttributeKey::Attribute),
2809 ObjectValue::attribute(block_size * 2, false),
2810 ),
2811 );
2812 transaction.commit().await.unwrap();
2813
2814 crate::fsck::fsck(fs.clone()).await.unwrap();
2815
2816 let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
2817 let needs_trim = (*object)
2818 .write_attr(&mut transaction, attribute_id, &vec![3u8; block_size as usize])
2819 .await
2820 .unwrap();
2821 assert!(!needs_trim.0);
2822 transaction.commit().await.unwrap();
2823
2824 crate::fsck::fsck(fs.clone()).await.unwrap();
2825
2826 fs.close().await.expect("close failed");
2827 }
2828
2829 #[fuchsia::test]
2830 async fn write_new_attr_in_batches_multiple_txns() {
2831 let (fs, object) = test_filesystem_and_empty_object().await;
2832 let merkle_tree = vec![1; 3 * WRITE_ATTR_BATCH_SIZE];
2833 let mut transaction = (*object).new_transaction(AttributeId::TEST_ID).await.unwrap();
2834 object
2835 .write_new_attr_in_batches(
2836 &mut transaction,
2837 AttributeId::TEST_ID,
2838 &merkle_tree,
2839 WRITE_ATTR_BATCH_SIZE,
2840 )
2841 .await
2842 .expect("failed to write merkle attribute");
2843
2844 transaction.add(
2845 object.store().store_object_id,
2846 Mutation::replace_or_insert_object(
2847 ObjectKey::graveyard_attribute_entry(
2848 object.store().graveyard_directory_object_id(),
2849 object.object_id(),
2850 AttributeId::TEST_ID,
2851 ),
2852 ObjectValue::None,
2853 ),
2854 );
2855 transaction.commit().await.unwrap();
2856 assert_eq!(
2857 object.read_attr(AttributeId::TEST_ID).await.expect("read_attr failed"),
2858 Some(merkle_tree.into())
2859 );
2860
2861 fs.close().await.expect("close failed");
2862 }
2863
// Drives a `Watchdog` with fake time and checks when its callback fires and which counts it
// reports. The first argument to `Watchdog::new` (10) looks like the interval in seconds --
// confirm against `Watchdog`.
#[cfg(target_os = "fuchsia")]
#[fuchsia::test(allow_stalls = false)]
async fn test_watchdog() {
    use super::Watchdog;
    use fuchsia_async::{MonotonicDuration, MonotonicInstant, TestExecutor};
    use std::sync::mpsc::channel;

    // Converts a number of seconds since time zero into a fake monotonic instant.
    fn make_time(time_secs: i64) -> MonotonicInstant {
        MonotonicInstant::from_nanos(0) + MonotonicDuration::from_seconds(time_secs)
    }

    TestExecutor::advance_to(make_time(0)).await;
    let (count_tx, count_rx) = channel();

    {
        // The callback forwards every reported count to the channel so it can be inspected below.
        let _watchdog = Watchdog::new(10, move |count| {
            count_tx.send(count).expect("Sending value");
        });

        // t = 5s: nothing has been reported yet.
        TestExecutor::advance_to(make_time(5)).await;
        count_rx.try_recv().expect_err("Should not have message");

        // t = 10s: the first report arrives.
        TestExecutor::advance_to(make_time(10)).await;
        assert_eq!(1, count_rx.recv().expect("Receiving"));

        // t = 15s: nothing new since the first report.
        TestExecutor::advance_to(make_time(15)).await;
        count_rx.try_recv().expect_err("Should not have message");

        // t = 30s: two more reports, with counts 2 and 3, in order.
        TestExecutor::advance_to(make_time(30)).await;
        assert_eq!(2, count_rx.recv().expect("Receiving"));
        assert_eq!(3, count_rx.recv().expect("Receiving"));
    }

    // The watchdog (and with it the sending half of the channel) has been dropped, so `recv`
    // must now fail rather than deliver another count.
    TestExecutor::advance_to(make_time(100)).await;
    count_rx.recv().expect_err("Watchdog should be gone");
}
2906
2907 #[fuchsia::test]
2908 fn test_checksum_range_chunk() {
2909 let block_size = 4096;
2910
2911 assert_eq!(
2913 ChecksumRangeChunk::group_first_write_ranges(
2914 &mut OverwriteBitmaps::None,
2915 block_size,
2916 block_size * 2..block_size * 5,
2917 ),
2918 vec![ChecksumRangeChunk {
2919 checksum_range: 0..3,
2920 device_range: block_size * 2..block_size * 5,
2921 is_first_write: false,
2922 }],
2923 );
2924
2925 let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2926 assert_eq!(
2927 ChecksumRangeChunk::group_first_write_ranges(
2928 &mut bitmaps,
2929 block_size,
2930 block_size * 2..block_size * 5,
2931 ),
2932 vec![ChecksumRangeChunk {
2933 checksum_range: 0..3,
2934 device_range: block_size * 2..block_size * 5,
2935 is_first_write: false,
2936 }],
2937 );
2938 assert_eq!(
2939 bitmaps.take_bitmaps(),
2940 Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b11100000])))
2941 );
2942
2943 let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2944 bitmaps.set_offset(2);
2945 assert_eq!(
2946 ChecksumRangeChunk::group_first_write_ranges(
2947 &mut bitmaps,
2948 block_size,
2949 block_size * 2..block_size * 5,
2950 ),
2951 vec![
2952 ChecksumRangeChunk {
2953 checksum_range: 0..2,
2954 device_range: block_size * 2..block_size * 4,
2955 is_first_write: false,
2956 },
2957 ChecksumRangeChunk {
2958 checksum_range: 2..3,
2959 device_range: block_size * 4..block_size * 5,
2960 is_first_write: true,
2961 },
2962 ],
2963 );
2964 assert_eq!(
2965 bitmaps.take_bitmaps(),
2966 Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00111000])))
2967 );
2968
2969 let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2970 bitmaps.set_offset(4);
2971 assert_eq!(
2972 ChecksumRangeChunk::group_first_write_ranges(
2973 &mut bitmaps,
2974 block_size,
2975 block_size * 2..block_size * 5,
2976 ),
2977 vec![ChecksumRangeChunk {
2978 checksum_range: 0..3,
2979 device_range: block_size * 2..block_size * 5,
2980 is_first_write: true,
2981 }],
2982 );
2983 assert_eq!(
2984 bitmaps.take_bitmaps(),
2985 Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00001110])))
2986 );
2987
2988 let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b01010101]));
2989 assert_eq!(
2990 ChecksumRangeChunk::group_first_write_ranges(
2991 &mut bitmaps,
2992 block_size,
2993 block_size * 2..block_size * 10,
2994 ),
2995 vec![
2996 ChecksumRangeChunk {
2997 checksum_range: 0..1,
2998 device_range: block_size * 2..block_size * 3,
2999 is_first_write: true,
3000 },
3001 ChecksumRangeChunk {
3002 checksum_range: 1..2,
3003 device_range: block_size * 3..block_size * 4,
3004 is_first_write: false,
3005 },
3006 ChecksumRangeChunk {
3007 checksum_range: 2..3,
3008 device_range: block_size * 4..block_size * 5,
3009 is_first_write: true,
3010 },
3011 ChecksumRangeChunk {
3012 checksum_range: 3..4,
3013 device_range: block_size * 5..block_size * 6,
3014 is_first_write: false,
3015 },
3016 ChecksumRangeChunk {
3017 checksum_range: 4..5,
3018 device_range: block_size * 6..block_size * 7,
3019 is_first_write: true,
3020 },
3021 ChecksumRangeChunk {
3022 checksum_range: 5..6,
3023 device_range: block_size * 7..block_size * 8,
3024 is_first_write: false,
3025 },
3026 ChecksumRangeChunk {
3027 checksum_range: 6..7,
3028 device_range: block_size * 8..block_size * 9,
3029 is_first_write: true,
3030 },
3031 ChecksumRangeChunk {
3032 checksum_range: 7..8,
3033 device_range: block_size * 9..block_size * 10,
3034 is_first_write: false,
3035 },
3036 ],
3037 );
3038 assert_eq!(
3039 bitmaps.take_bitmaps(),
3040 Some((BitVec::from_bytes(&[0b01010101]), BitVec::from_bytes(&[0b11111111])))
3041 );
3042 }
3043}