1use crate::checksum::{Checksum, Checksums, fletcher64};
6use crate::errors::FxfsError;
7use crate::log::*;
8use crate::lsm_tree::Query;
9use crate::lsm_tree::merge::MergerIterator;
10use crate::lsm_tree::types::{Item, ItemRef, LayerIterator};
11use crate::object_handle::ObjectHandle;
12use crate::object_store::extent_record::{ExtentMode, ExtentValue};
13use crate::object_store::object_manager::ObjectManager;
14use crate::object_store::object_record::{
15 AttributeKey, ExtendedAttributeValue, ObjectAttributes, ObjectItem, ObjectKey, ObjectKeyData,
16 ObjectValue, Timestamp,
17};
18use crate::object_store::transaction::{
19 AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Options, ReadGuard,
20 Transaction, lock_keys,
21};
22use crate::object_store::{
23 AttributeId, Extent, HandleOptions, HandleOwner, ObjectStore, TrimMode, TrimResult,
24 VOLUME_DATA_KEY_ID,
25};
26use crate::range::RangeExt;
27use crate::round::{round_down, round_up};
28use anyhow::{Context, Error, anyhow, bail, ensure};
29use assert_matches::assert_matches;
30use bit_vec::BitVec;
31use futures::stream::{FuturesOrdered, FuturesUnordered};
32use futures::{TryStreamExt, try_join};
33use fxfs_crypto::{
34 Cipher, CipherHolder, CipherSet, EncryptionKey, FindKeyResult, FxfsCipher, KeyPurpose,
35};
36use fxfs_trace::{TraceFutureExt, trace, trace_future_args};
37use static_assertions::const_assert;
38use std::cmp::min;
39use std::future::Future;
40use std::ops::Range;
41use std::sync::Arc;
42use std::sync::atomic::{self, AtomicBool, Ordering};
43use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
44use storage_device::{InlineCryptoOptions, ReadOptions, WriteOptions};
45
46use fidl_fuchsia_io as fio;
47use fuchsia_async as fasync;
48
/// Upper bound on the length of an extended attribute name.
/// NOTE(review): presumably measured in bytes — the code enforcing it is not in this part of
/// the file; confirm.
pub const MAX_XATTR_NAME_SIZE: usize = 255;
/// NOTE(review): presumably the largest extended attribute value that is stored inline rather
/// than in a separate attribute — confirm against the extended attribute write path.
pub const MAX_INLINE_XATTR_SIZE: usize = 256;
/// Upper bound on the size of an extended attribute value (presumably in bytes — TODO confirm).
pub const MAX_XATTR_VALUE_SIZE: usize = 64000;
58
59fn apply_bitmap_zeroing(
61 block_size: usize,
62 bitmap: &bit_vec::BitVec,
63 mut buffer: MutableBufferRef<'_>,
64) {
65 let buf = buffer.as_mut_slice();
66 debug_assert_eq!(bitmap.len() * block_size, buf.len());
67 for (i, block) in bitmap.iter().enumerate() {
68 if !block {
69 let start = i * block_size;
70 buf[start..start + block_size].fill(0);
71 }
72 }
73}
74
75#[derive(Debug, Clone, PartialEq)]
79pub enum MaybeChecksums {
80 None,
81 Fletcher(Vec<Checksum>),
82}
83
84impl MaybeChecksums {
85 pub fn maybe_as_ref(&self) -> Option<&[Checksum]> {
86 match self {
87 Self::None => None,
88 Self::Fletcher(sums) => Some(&sums),
89 }
90 }
91
92 pub fn split_off(&mut self, at: usize) -> Self {
93 match self {
94 Self::None => Self::None,
95 Self::Fletcher(sums) => Self::Fletcher(sums.split_off(at)),
96 }
97 }
98
99 pub fn to_mode(self) -> ExtentMode {
100 match self {
101 Self::None => ExtentMode::Raw,
102 Self::Fletcher(sums) => ExtentMode::Cow(Checksums::fletcher(sums)),
103 }
104 }
105
106 pub fn into_option(self) -> Option<Vec<Checksum>> {
107 match self {
108 Self::None => None,
109 Self::Fletcher(sums) => Some(sums),
110 }
111 }
112}
113
114#[derive(Debug, Clone, Copy, PartialEq, Eq)]
118pub enum SetExtendedAttributeMode {
119 Set,
121 Create,
123 Replace,
125}
126
127impl From<fio::SetExtendedAttributeMode> for SetExtendedAttributeMode {
128 fn from(other: fio::SetExtendedAttributeMode) -> SetExtendedAttributeMode {
129 match other {
130 fio::SetExtendedAttributeMode::Set => SetExtendedAttributeMode::Set,
131 fio::SetExtendedAttributeMode::Create => SetExtendedAttributeMode::Create,
132 fio::SetExtendedAttributeMode::Replace => SetExtendedAttributeMode::Replace,
133 }
134 }
135}
136
/// How a `StoreObjectHandle` obtains the keys for its object's data.
enum Encryption {
    /// The owning store is not encrypted; `get_key` returns no cipher.
    None,

    /// The owning store is encrypted and `permanent_keys` was not requested. `get_key` looks
    /// keys up through the store's key manager, handing it the store's crypt service and a
    /// callback that loads the object's keys.
    CachedKeys,

    /// The handle was created with `permanent_keys == true`. `get_key` takes the key straight
    /// from the key manager and panics if it is absent.
    PermanentKeys,
}
148
149#[derive(PartialEq, Debug)]
150enum OverwriteBitmaps {
151 None,
152 Some {
153 extent_bitmap: BitVec,
155 write_bitmap: BitVec,
157 bitmap_offset: usize,
160 },
161}
162
163impl OverwriteBitmaps {
164 fn new(extent_bitmap: BitVec) -> Self {
165 OverwriteBitmaps::Some {
166 write_bitmap: BitVec::from_elem(extent_bitmap.len(), false),
167 extent_bitmap,
168 bitmap_offset: 0,
169 }
170 }
171
172 fn is_none(&self) -> bool {
173 *self == OverwriteBitmaps::None
174 }
175
176 fn set_offset(&mut self, new_offset: usize) {
177 match self {
178 OverwriteBitmaps::None => (),
179 OverwriteBitmaps::Some { bitmap_offset, .. } => *bitmap_offset = new_offset,
180 }
181 }
182
183 fn get_from_extent_bitmap(&self, i: usize) -> Option<bool> {
184 match self {
185 OverwriteBitmaps::None => None,
186 OverwriteBitmaps::Some { extent_bitmap, bitmap_offset, .. } => {
187 extent_bitmap.get(*bitmap_offset + i)
188 }
189 }
190 }
191
192 fn set_in_write_bitmap(&mut self, i: usize, x: bool) {
193 match self {
194 OverwriteBitmaps::None => (),
195 OverwriteBitmaps::Some { write_bitmap, bitmap_offset, .. } => {
196 write_bitmap.set(*bitmap_offset + i, x)
197 }
198 }
199 }
200
201 fn take_bitmaps(self) -> Option<(BitVec, BitVec)> {
202 match self {
203 OverwriteBitmaps::None => None,
204 OverwriteBitmaps::Some { extent_bitmap, write_bitmap, .. } => {
205 Some((extent_bitmap, write_bitmap))
206 }
207 }
208 }
209}
210
211#[derive(PartialEq, Debug)]
215struct ChecksumRangeChunk {
216 checksum_range: Range<usize>,
217 device_range: Range<u64>,
218 is_first_write: bool,
219}
220
221impl ChecksumRangeChunk {
222 fn group_first_write_ranges(
223 bitmaps: &mut OverwriteBitmaps,
224 block_size: u64,
225 write_device_range: Range<u64>,
226 ) -> Vec<ChecksumRangeChunk> {
227 let write_block_len = (write_device_range.length().unwrap() / block_size) as usize;
228 if bitmaps.is_none() {
229 vec![ChecksumRangeChunk {
234 checksum_range: 0..write_block_len,
235 device_range: write_device_range,
236 is_first_write: false,
237 }]
238 } else {
239 let mut checksum_ranges = vec![ChecksumRangeChunk {
240 checksum_range: 0..0,
241 device_range: write_device_range.start..write_device_range.start,
242 is_first_write: !bitmaps.get_from_extent_bitmap(0).unwrap(),
243 }];
244 let mut working_range = checksum_ranges.last_mut().unwrap();
245 for i in 0..write_block_len {
246 bitmaps.set_in_write_bitmap(i, true);
247
248 if working_range.is_first_write != bitmaps.get_from_extent_bitmap(i).unwrap() {
251 working_range.checksum_range.end += 1;
254 working_range.device_range.end += block_size;
255 } else {
256 let new_chunk = ChecksumRangeChunk {
258 checksum_range: working_range.checksum_range.end
259 ..working_range.checksum_range.end + 1,
260 device_range: working_range.device_range.end
261 ..working_range.device_range.end + block_size,
262 is_first_write: !working_range.is_first_write,
263 };
264 checksum_ranges.push(new_chunk);
265 working_range = checksum_ranges.last_mut().unwrap();
266 }
267 }
268 checksum_ranges
269 }
270 }
271}
272
/// A handle to an object in an object store, providing the low-level read, write and key
/// management operations implemented below.
pub struct StoreObjectHandle<S: HandleOwner> {
    /// The owner of this handle; the `ObjectStore` is reached through it (see `store()`).
    owner: Arc<S>,
    /// Id of the object this handle refers to.
    object_id: u64,
    /// Options supplied at construction (e.g. `skip_journal_checks`, `skip_checksums`).
    options: HandleOptions,
    /// When set, device reads, writes and deallocations made through this handle are logged.
    trace: AtomicBool,
    /// How keys for this object are obtained; decided once in `new`.
    encryption: Encryption,
}
292
293impl<S: HandleOwner> ObjectHandle for StoreObjectHandle<S> {
294 fn set_trace(&self, v: bool) {
295 info!(store_id = self.store().store_object_id, oid = self.object_id(), trace = v; "trace");
296 self.trace.store(v, atomic::Ordering::Relaxed);
297 }
298
299 fn object_id(&self) -> u64 {
300 return self.object_id;
301 }
302
303 fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
304 self.store().device.allocate_buffer(size)
305 }
306
307 fn block_size(&self) -> u64 {
308 self.store().block_size()
309 }
310}
311
312struct Watchdog {
313 _task: fasync::Task<()>,
314}
315
316impl Watchdog {
317 fn new(increment_seconds: u64, cb: impl Fn(u64) + Send + 'static) -> Self {
318 Self {
319 _task: fasync::Task::spawn(
320 async move {
321 let increment = increment_seconds.try_into().unwrap();
322 let mut fired_counter = 0;
323 let mut next_wake = fasync::MonotonicInstant::now();
324 loop {
325 next_wake += std::time::Duration::from_secs(increment).into();
326 if fasync::MonotonicInstant::now() < next_wake {
330 fasync::Timer::new(next_wake).await;
331 }
332 fired_counter += 1;
333 cb(fired_counter);
334 }
335 }
336 .trace(trace_future_args!("StoreObjectHandle::Watchdog")),
337 ),
338 }
339 }
340}
341
342impl<S: HandleOwner> StoreObjectHandle<S> {
343 pub fn new(
345 owner: Arc<S>,
346 object_id: u64,
347 permanent_keys: bool,
348 options: HandleOptions,
349 trace: bool,
350 ) -> Self {
351 let encryption = if permanent_keys {
352 Encryption::PermanentKeys
353 } else if owner.as_ref().as_ref().is_encrypted() {
354 Encryption::CachedKeys
355 } else {
356 Encryption::None
357 };
358 Self { owner, object_id, encryption, options, trace: AtomicBool::new(trace) }
359 }
360
361 pub fn owner(&self) -> &Arc<S> {
362 &self.owner
363 }
364
365 pub fn store(&self) -> &ObjectStore {
366 self.owner.as_ref().as_ref()
367 }
368
369 pub fn trace(&self) -> bool {
370 self.trace.load(atomic::Ordering::Relaxed)
371 }
372
373 pub fn is_encrypted(&self) -> bool {
374 !matches!(self.encryption, Encryption::None)
375 }
376
377 pub fn default_transaction_options<'b>(&self) -> Options<'b> {
380 Options { skip_journal_checks: self.options.skip_journal_checks, ..Default::default() }
381 }
382
383 pub async fn new_transaction_with_options<'b>(
384 &self,
385 attribute_id: AttributeId,
386 options: Options<'b>,
387 ) -> Result<Transaction<'b>, Error> {
388 Ok(self
389 .store()
390 .filesystem()
391 .new_transaction(
392 lock_keys![
393 LockKey::object_attribute(
394 self.store().store_object_id(),
395 self.object_id(),
396 attribute_id,
397 ),
398 LockKey::object(self.store().store_object_id(), self.object_id()),
399 ],
400 options,
401 )
402 .await?)
403 }
404
405 pub async fn new_transaction<'b>(
406 &self,
407 attribute_id: AttributeId,
408 ) -> Result<Transaction<'b>, Error> {
409 self.new_transaction_with_options(attribute_id, self.default_transaction_options()).await
410 }
411
412 async fn txn_get_object_mutation(
415 &self,
416 transaction: &Transaction<'_>,
417 ) -> Result<ObjectStoreMutation, Error> {
418 self.store().txn_get_object_mutation(transaction, self.object_id()).await
419 }
420
421 async fn deallocate_old_extents(
423 &self,
424 transaction: &mut Transaction<'_>,
425 attribute_id: AttributeId,
426 range: Range<u64>,
427 ) -> Result<u64, Error> {
428 let block_size = self.block_size();
429 assert_eq!(range.start % block_size, 0);
430 assert_eq!(range.end % block_size, 0);
431 if range.start == range.end {
432 return Ok(0);
433 }
434 let tree = &self.store().tree;
435 let layer_set = tree.layer_set();
436 let key = Extent(range);
437 let lower_bound = ObjectKey::attribute(
438 self.object_id(),
439 attribute_id,
440 AttributeKey::Extent(key.search_key()),
441 );
442 let mut merger = layer_set.merger();
443 let mut iter = merger.query(Query::FullRange(&lower_bound)).await?;
444 let allocator = self.store().allocator();
445 let mut deallocated = 0;
446 let trace = self.trace();
447 while let Some(ItemRef {
448 key:
449 ObjectKey {
450 object_id,
451 data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
452 },
453 value: ObjectValue::Extent(value),
454 ..
455 }) = iter.get()
456 {
457 if *object_id != self.object_id() || *attr_id != attribute_id {
458 break;
459 }
460 if let ExtentValue::Some { device_offset, .. } = value {
461 if let Some(overlap) = key.overlap(extent_key) {
462 let range = device_offset + overlap.start - extent_key.start
463 ..device_offset + overlap.end - extent_key.start;
464 ensure!(range.is_aligned(block_size), FxfsError::Inconsistent);
465 if trace {
466 info!(
467 store_id = self.store().store_object_id(),
468 oid = self.object_id(),
469 device_range:? = range,
470 len = range.end - range.start,
471 extent_key:?;
472 "D",
473 );
474 }
475 allocator
476 .deallocate(transaction, self.store().store_object_id(), range)
477 .await?;
478 deallocated += overlap.end - overlap.start;
479 } else {
480 break;
481 }
482 }
483 iter.advance().await?;
484 }
485 Ok(deallocated)
486 }
487
488 async fn write_aligned(
491 &self,
492 buf: BufferRef<'_>,
493 device_offset: u64,
494 crypt_ctx: Option<(u32, u8)>,
495 ) -> Result<MaybeChecksums, Error> {
496 if self.trace() {
497 info!(
498 store_id = self.store().store_object_id(),
499 oid = self.object_id(),
500 device_range:? = (device_offset..device_offset + buf.len() as u64),
501 len = buf.len();
502 "W",
503 );
504 }
505 let store = self.store();
506 store.device_write_ops.fetch_add(1, Ordering::Relaxed);
507 let mut checksums = Vec::new();
508 let _watchdog = Watchdog::new(10, |count| {
509 warn!("Write has been stalled for {} seconds", count * 10);
510 });
511
512 match crypt_ctx {
513 Some((dun, slot)) => {
514 if !store.filesystem().options().barriers_enabled {
515 return Err(anyhow!(FxfsError::InvalidArgs)
516 .context("Barriers must be enabled for inline encrypted writes."));
517 }
518 store
519 .device
520 .write_with_opts(
521 device_offset as u64,
522 buf,
523 WriteOptions {
524 inline_crypto: InlineCryptoOptions::enabled(slot, dun),
525 ..Default::default()
526 },
527 )
528 .await?;
529 Ok(MaybeChecksums::None)
530 }
531 None => {
532 if self.options.skip_checksums {
533 store
534 .device
535 .write_with_opts(device_offset as u64, buf, WriteOptions::default())
536 .await?;
537 Ok(MaybeChecksums::None)
538 } else {
539 try_join!(store.device.write(device_offset, buf), async {
540 let block_size = self.block_size();
541 for chunk in buf.as_slice().chunks_exact(block_size as usize) {
542 checksums.push(fletcher64(chunk, 0));
543 }
544 Ok(())
545 })?;
546 Ok(MaybeChecksums::Fletcher(checksums))
547 }
548 }
549 }
550 }
551
552 pub async fn flush_device(&self) -> Result<(), Error> {
554 self.store().device().flush().await
555 }
556
557 pub async fn update_allocated_size(
558 &self,
559 transaction: &mut Transaction<'_>,
560 allocated: u64,
561 deallocated: u64,
562 ) -> Result<(), Error> {
563 if allocated == deallocated {
564 return Ok(());
565 }
566 let mut mutation = self.txn_get_object_mutation(transaction).await?;
567 if let ObjectValue::Object {
568 attributes: ObjectAttributes { project_id, allocated_size, .. },
569 ..
570 } = &mut mutation.item.value
571 {
572 *allocated_size = allocated_size
574 .checked_add(allocated)
575 .ok_or_else(|| anyhow!(FxfsError::Inconsistent).context("Allocated size overflow"))?
576 .checked_sub(deallocated)
577 .ok_or_else(|| {
578 anyhow!(FxfsError::Inconsistent).context("Allocated size underflow")
579 })?;
580
581 if let Some(project_id) = project_id {
582 let diff = i64::try_from(allocated).unwrap() - i64::try_from(deallocated).unwrap();
585 transaction.add(
586 self.store().store_object_id(),
587 Mutation::merge_object(
588 ObjectKey::project_usage(
589 self.store().root_directory_object_id(),
590 *project_id,
591 ),
592 ObjectValue::BytesAndNodes { bytes: diff, nodes: 0 },
593 ),
594 );
595 }
596 } else {
597 bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
600 }
601 transaction.add(self.store().store_object_id, Mutation::ObjectStore(mutation));
602 Ok(())
603 }
604
605 pub async fn update_attributes<'a>(
606 &self,
607 transaction: &mut Transaction<'a>,
608 node_attributes: Option<&fio::MutableNodeAttributes>,
609 change_time: Option<Timestamp>,
610 ) -> Result<(), Error> {
611 if let Some(&fio::MutableNodeAttributes { selinux_context: Some(ref context), .. }) =
612 node_attributes
613 {
614 if let fio::SelinuxContext::Data(context) = context {
615 self.set_extended_attribute_impl(
616 "security.selinux".into(),
617 context.clone(),
618 SetExtendedAttributeMode::Set,
619 transaction,
620 )
621 .await?;
622 } else {
623 return Err(anyhow!(FxfsError::InvalidArgs)
624 .context("Only set SELinux context with `data` member."));
625 }
626 }
627 self.store()
628 .update_attributes(transaction, self.object_id, node_attributes, change_time)
629 .await
630 }
631
632 pub async fn zero(
634 &self,
635 transaction: &mut Transaction<'_>,
636 attribute_id: AttributeId,
637 range: Range<u64>,
638 ) -> Result<(), Error> {
639 let deallocated =
640 self.deallocate_old_extents(transaction, attribute_id, range.clone()).await?;
641 if deallocated > 0 {
642 self.update_allocated_size(transaction, 0, deallocated).await?;
643 transaction.add(
644 self.store().store_object_id,
645 Mutation::merge_object(
646 ObjectKey::extent(self.object_id(), attribute_id, range),
647 ObjectValue::Extent(ExtentValue::deleted_extent()),
648 ),
649 );
650 }
651 Ok(())
652 }
653
654 pub async fn align_buffer(
657 &self,
658 attribute_id: AttributeId,
659 offset: u64,
660 buf: BufferRef<'_>,
661 ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
662 let block_size = self.block_size();
663 let end = offset + buf.len() as u64;
664 let aligned =
665 round_down(offset, block_size)..round_up(end, block_size).ok_or(FxfsError::TooBig)?;
666
667 let mut aligned_buf =
668 self.store().device.allocate_buffer((aligned.end - aligned.start) as usize).await;
669
670 if aligned.start < offset {
672 let mut head_block = aligned_buf.subslice_mut(..block_size as usize);
673 let read = self.read(attribute_id, aligned.start, head_block.reborrow()).await?;
674 head_block.as_mut_slice()[read..].fill(0);
675 }
676
677 if aligned.end > end {
679 let end_block_offset = aligned.end - block_size;
680 if offset <= end_block_offset {
682 let mut tail_block =
683 aligned_buf.subslice_mut(aligned_buf.len() - block_size as usize..);
684 let read = self.read(attribute_id, end_block_offset, tail_block.reborrow()).await?;
685 tail_block.as_mut_slice()[read..].fill(0);
686 }
687 }
688
689 aligned_buf.as_mut_slice()
690 [(offset - aligned.start) as usize..(end - aligned.start) as usize]
691 .copy_from_slice(buf.as_slice());
692
693 Ok((aligned, aligned_buf))
694 }
695
696 pub async fn shrink(
702 &self,
703 transaction: &mut Transaction<'_>,
704 attribute_id: AttributeId,
705 size: u64,
706 ) -> Result<NeedsTrim, Error> {
707 let store = self.store();
708 let needs_trim = matches!(
709 store
710 .trim_some(transaction, self.object_id(), attribute_id, TrimMode::FromOffset(size))
711 .await?,
712 TrimResult::Incomplete
713 );
714 if needs_trim {
715 let graveyard_id = store.graveyard_directory_object_id();
718 match store
719 .tree
720 .find(&ObjectKey::graveyard_entry(graveyard_id, self.object_id()))
721 .await?
722 {
723 Some(ObjectItem { value: ObjectValue::Some, .. })
724 | Some(ObjectItem { value: ObjectValue::Trim, .. }) => {
725 }
727 _ => {
728 transaction.add(
729 store.store_object_id,
730 Mutation::replace_or_insert_object(
731 ObjectKey::graveyard_entry(graveyard_id, self.object_id()),
732 ObjectValue::Trim,
733 ),
734 );
735 }
736 }
737 }
738 Ok(NeedsTrim(needs_trim))
739 }
740
741 pub async fn read_and_decrypt(
743 &self,
744 device_offset: u64,
745 file_offset: u64,
746 mut buffer: MutableBufferRef<'_>,
747 key_id: u64,
748 ) -> Result<(), Error> {
749 let store = self.store();
750 store.device_read_ops.fetch_add(1, Ordering::Relaxed);
751
752 let _watchdog = Watchdog::new(10, |count| {
753 warn!("Read has been stalled for {} seconds", count * 10);
754 });
755
756 let (_key_id, key) = self.get_key(Some(key_id)).await?;
757 if let Some(key) = key {
758 if let Some((dun, slot)) = key.crypt_ctx(self.object_id, file_offset) {
759 store
760 .device
761 .read_with_opts(
762 device_offset as u64,
763 buffer.reborrow(),
764 ReadOptions { inline_crypto: InlineCryptoOptions::enabled(slot, dun) },
765 )
766 .await?;
767 } else {
768 store.device.read(device_offset, buffer.reborrow()).await?;
769 key.decrypt(self.object_id, device_offset, file_offset, buffer.as_mut_slice())?;
770 }
771 } else {
772 store.device.read(device_offset, buffer.reborrow()).await?;
773 }
774
775 Ok(())
776 }
777
778 pub async fn get_key(
783 &self,
784 key_id: Option<u64>,
785 ) -> Result<(u64, Option<Arc<dyn Cipher>>), Error> {
786 let store = self.store();
787 let result = match self.encryption {
788 Encryption::None => (VOLUME_DATA_KEY_ID, None),
789 Encryption::CachedKeys => {
790 if let Some(key_id) = key_id {
791 (
792 key_id,
793 Some(
794 store
795 .key_manager
796 .get_key(
797 self.object_id,
798 store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
799 async || store.get_keys(self.object_id).await,
800 key_id,
801 )
802 .await?,
803 ),
804 )
805 } else {
806 let (key_id, key) = store
807 .key_manager
808 .get_fscrypt_key_if_present(
809 self.object_id,
810 store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
811 async || store.get_keys(self.object_id).await,
812 )
813 .await?;
814 (key_id, Some(key))
815 }
816 }
817 Encryption::PermanentKeys => {
818 (VOLUME_DATA_KEY_ID, Some(store.key_manager.get(self.object_id).await?.unwrap()))
819 }
820 };
821
822 if let Some(ref key) = result.1 {
824 if key.crypt_ctx(self.object_id, 0).is_some() {
825 if !store.filesystem().options().barriers_enabled {
826 return Err(anyhow!(FxfsError::InvalidArgs)
827 .context("Barriers must be enabled for inline encrypted writes."));
828 }
829 }
830 }
831
832 Ok(result)
833 }
834
835 async fn get_or_create_key(
839 &self,
840 transaction: &mut Transaction<'_>,
841 ) -> Result<Arc<dyn Cipher>, Error> {
842 let store = self.store();
843
844 if let Some(key) = store.key_manager.get(self.object_id).await.context("get failed")? {
846 return Ok(key);
847 }
848
849 let crypt = store.crypt().ok_or_else(|| anyhow!("No crypt!"))?;
850
851 let (mut encryption_keys, mut cipher_set) = if let Some(item) =
853 store.tree.find(&ObjectKey::keys(self.object_id)).await.context("find failed")?
854 {
855 if let ObjectValue::Keys(encryption_keys) = item.value {
856 let cipher_set = store
857 .key_manager
858 .get_keys(
859 self.object_id,
860 crypt.as_ref(),
861 &mut Some(async || Ok(encryption_keys.clone())),
862 false,
863 false,
864 )
865 .await
866 .context("get_keys failed")?;
867 match cipher_set.find_key(VOLUME_DATA_KEY_ID) {
868 FindKeyResult::NotFound => {}
869 FindKeyResult::Unavailable => return Err(FxfsError::NoKey.into()),
870 FindKeyResult::Key(key) => return Ok(key),
871 }
872 (encryption_keys, (*cipher_set).clone())
873 } else {
874 return Err(anyhow!(FxfsError::Inconsistent));
875 }
876 } else {
877 Default::default()
878 };
879
880 let (key, unwrapped_key) = crypt.create_key(self.object_id, KeyPurpose::Data).await?;
882 let cipher: Arc<dyn Cipher> = Arc::new(FxfsCipher::new(&unwrapped_key));
883
884 cipher_set.add_key(VOLUME_DATA_KEY_ID, CipherHolder::Cipher(cipher.clone()));
887 let cipher_set = Arc::new(cipher_set);
888
889 struct UnwrappedKeys {
892 object_id: u64,
893 new_keys: Arc<CipherSet>,
894 }
895
896 impl AssociatedObject for UnwrappedKeys {
897 fn will_apply_mutation(
898 &self,
899 _mutation: &Mutation,
900 object_id: u64,
901 manager: &ObjectManager,
902 ) {
903 manager.store(object_id).unwrap().key_manager.insert(
904 self.object_id,
905 self.new_keys.clone(),
906 false,
907 );
908 }
909 }
910
911 encryption_keys.insert(VOLUME_DATA_KEY_ID, EncryptionKey::Fxfs(key).into());
912
913 transaction.add_with_object(
914 store.store_object_id(),
915 Mutation::replace_or_insert_object(
916 ObjectKey::keys(self.object_id),
917 ObjectValue::keys(encryption_keys),
918 ),
919 AssocObj::Owned(Box::new(UnwrappedKeys {
920 object_id: self.object_id,
921 new_keys: cipher_set,
922 })),
923 );
924
925 Ok(cipher)
926 }
927
928 pub async fn read(
929 &self,
930 attribute_id: AttributeId,
931 offset: u64,
932 mut buf: MutableBufferRef<'_>,
933 ) -> Result<usize, Error> {
934 let fs = self.store().filesystem();
935 let guard = fs
936 .lock_manager()
937 .read_lock(lock_keys![LockKey::object_attribute(
938 self.store().store_object_id(),
939 self.object_id(),
940 attribute_id,
941 )])
942 .await;
943
944 let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
945 let item = self.store().tree().find(&key).await?;
946 let size = match item {
947 Some(item) if item.key == key => match item.value {
948 ObjectValue::Attribute { size, .. } => size,
949 _ => bail!(FxfsError::Inconsistent),
950 },
951 _ => return Ok(0),
952 };
953 if offset >= size {
954 return Ok(0);
955 }
956 let length = min(buf.len() as u64, size - offset) as usize;
957 buf = buf.subslice_mut(0..length);
958 self.read_unchecked(attribute_id, offset, buf, &guard).await?;
959 Ok(length)
960 }
961
    /// Reads `buf.len()` bytes of `attribute_id` starting at `offset`, without consulting the
    /// attribute's size (`read` does that clamping before calling this).
    ///
    /// Ranges not backed by an allocated extent are filled with zeroes, as are blocks whose
    /// bit is unset in an `OverwritePartial` extent's bitmap.
    ///
    /// `_guard` is not used by the body.
    /// NOTE(review): presumably it exists so callers must hold the attribute read lock —
    /// confirm.
    ///
    /// # Panics
    ///
    /// Panics if `offset` is not a multiple of the store block size, or if the start of `buf`
    /// (as reported by `buf.range()`) is not aligned to the device block size.
    ///
    /// # Errors
    ///
    /// Returns `FxfsError::Inconsistent` for invalid or unaligned extent keys, or when an
    /// overwrite bitmap is too short; device and key errors are propagated.
    pub async fn read_unchecked(
        &self,
        attribute_id: AttributeId,
        mut offset: u64,
        mut buf: MutableBufferRef<'_>,
        _guard: &ReadGuard<'_>,
    ) -> Result<(), Error> {
        if buf.len() == 0 {
            return Ok(());
        }
        let end_offset = offset + buf.len() as u64;

        self.store().logical_read_ops.fetch_add(1, Ordering::Relaxed);

        let block_size = self.block_size() as u64;
        let device_block_size = self.store().device.block_size() as u64;
        assert_eq!(offset % block_size, 0);
        assert_eq!(buf.range().start as u64 % device_block_size, 0);
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger
            .query(Query::LimitedRange(&ObjectKey::extent(
                self.object_id(),
                attribute_id,
                offset..end_offset,
            )))
            .await?;
        // Number of bytes of the read that spill into a final, partially requested block
        // (zero when the read ends on a block boundary).
        let end_align = ((offset + buf.len() as u64) % block_size) as usize;
        let trace = self.trace();
        // Whole-block reads are queued here and all driven together by `try_collect` below.
        let reads = FuturesUnordered::new();
        // Throughout the loop, `buf` is the part of the caller's buffer still to be filled and
        // `offset` is the logical offset corresponding to its start.
        while let Some(ItemRef {
            key:
                ObjectKey {
                    object_id,
                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                },
            value: ObjectValue::Extent(extent_value),
            ..
        }) = iter.get()
        {
            if *object_id != self.object_id() || *attr_id != attribute_id {
                break;
            }
            ensure!(
                extent_key.is_valid() && extent_key.is_aligned(block_size),
                FxfsError::Inconsistent
            );
            if extent_key.start > offset {
                // There is a gap before this extent; it reads back as zeroes.
                let to_zero = min(extent_key.start - offset, buf.len() as u64) as usize;
                buf.as_mut_slice()[..to_zero].fill(0);
                buf = buf.subslice_mut(to_zero..);
                if buf.is_empty() {
                    break;
                }
                offset += to_zero as u64;
            }

            if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
                // Device offset corresponding to the current logical `offset`.
                let mut device_offset = device_offset + (offset - extent_key.start);
                let key_id = *key_id;

                // Whole blocks that can be read straight into the caller's buffer; the
                // trailing partial block (`end_align` bytes) is handled separately below.
                let to_copy = min(buf.len() - end_align, (extent_key.end - offset) as usize);
                if to_copy > 0 {
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = (device_offset..device_offset + to_copy as u64),
                            offset,
                            range:? = **extent_key,
                            block_size;
                            "R",
                        );
                    }
                    let (mut head, tail) = buf.split_at_mut(to_copy);
                    // For partially overwritten extents, take the slice of the bitmap that
                    // covers exactly the blocks being read.
                    let maybe_bitmap = match mode {
                        ExtentMode::OverwritePartial(bitmap) => {
                            let mut read_bitmap = bitmap
                                .clone()
                                .split_off(((offset - extent_key.start) / block_size) as usize);
                            read_bitmap.truncate(to_copy / block_size as usize);
                            Some(read_bitmap)
                        }
                        _ => None,
                    };
                    reads.push(async move {
                        self.read_and_decrypt(device_offset, offset, head.reborrow(), key_id)
                            .await?;
                        // Blocks whose bitmap bit is unset are zeroed after the read.
                        if let Some(bitmap) = maybe_bitmap {
                            apply_bitmap_zeroing(self.block_size() as usize, &bitmap, head);
                        }
                        Ok::<(), Error>(())
                    });
                    buf = tail;
                    if buf.is_empty() {
                        break;
                    }
                    offset += to_copy as u64;
                    device_offset += to_copy as u64;
                }

                // Deal with the trailing partial block: it cannot be read directly into `buf`,
                // so a whole block is read into a temporary buffer and the prefix is copied.
                if offset < extent_key.end && end_align > 0 {
                    if let ExtentMode::OverwritePartial(bitmap) = mode {
                        let bitmap_offset = (offset - extent_key.start) / block_size;
                        if !bitmap.get(bitmap_offset as usize).ok_or(FxfsError::Inconsistent)? {
                            // The bit for this block is unset, so skip the read; the final
                            // `fill(0)` below zeroes what remains of `buf`.
                            break;
                        }
                    }
                    let mut align_buf =
                        self.store().device.allocate_buffer(block_size as usize).await;
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = (device_offset..device_offset + align_buf.len() as u64);
                            "RT",
                        );
                    }
                    self.read_and_decrypt(device_offset, offset, align_buf.as_mut(), key_id)
                        .await?;
                    buf.as_mut_slice().copy_from_slice(&align_buf.as_slice()[..end_align]);
                    // Nothing is left to fill, so the final `fill(0)` becomes a no-op.
                    buf = buf.subslice_mut(0..0);
                    break;
                }
            } else if extent_key.end >= offset + buf.len() as u64 {
                // An extent without device storage covers the rest of the read; the remainder
                // is zeroed below.
                break;
            }

            iter.advance().await?;
        }
        reads.try_collect::<()>().await?;
        // Whatever is left of the buffer was not covered by any read and comes back as zeroes.
        buf.as_mut_slice().fill(0);
        Ok(())
    }
1112
1113 pub async fn read_attr(&self, attribute_id: AttributeId) -> Result<Option<Box<[u8]>>, Error> {
1115 let store = self.store();
1116 let tree = &store.tree;
1117 let layer_set = tree.layer_set();
1118 let mut merger = layer_set.merger();
1119 let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
1120 let iter = merger.query(Query::FullRange(&key)).await?;
1121 match iter.get() {
1122 Some(item) if item.key == &key => match item.value {
1123 ObjectValue::Attribute { .. } => Ok(Some(self.read_attr_from_iter(iter).await?)),
1124 ObjectValue::None => Ok(None),
1126 _ => Err(FxfsError::Inconsistent.into()),
1127 },
1128 _ => Ok(None),
1129 }
1130 }
1131
1132 pub async fn read_attr_from_iter(
1135 &self,
1136 mut iter: MergerIterator<'_, '_, ObjectKey, ObjectValue>,
1137 ) -> Result<Box<[u8]>, Error> {
1138 let (mut buffer, size, attribute_id) = match iter.get() {
1139 Some(ItemRef {
1140 key:
1141 ObjectKey {
1142 object_id,
1143 data: ObjectKeyData::Attribute(attribute_id, AttributeKey::Attribute),
1144 },
1145 value: ObjectValue::Attribute { size, .. },
1146 ..
1147 }) if *object_id == self.object_id => {
1148 (
1150 self.store()
1151 .device
1152 .allocate_buffer(round_up(*size, self.block_size()).unwrap() as usize)
1153 .await,
1154 *size as usize,
1155 *attribute_id,
1156 )
1157 }
1158 _ => bail!(FxfsError::InvalidArgs),
1159 };
1160
1161 self.store().logical_read_ops.fetch_add(1, Ordering::Relaxed);
1162 let mut last_offset = 0;
1163 loop {
1164 iter.advance().await?;
1165 match iter.get() {
1166 Some(ItemRef {
1167 key:
1168 ObjectKey {
1169 object_id,
1170 data:
1171 ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
1172 },
1173 value: ObjectValue::Extent(extent_value),
1174 ..
1175 }) if *object_id == self.object_id() && *attr_id == attribute_id => {
1176 if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
1177 let offset = extent_key.start as usize;
1178 buffer.as_mut_slice()[last_offset..offset].fill(0);
1179 let end = std::cmp::min(extent_key.end as usize, buffer.len());
1180 let maybe_bitmap = match mode {
1181 ExtentMode::OverwritePartial(bitmap) => {
1182 let mut read_bitmap = bitmap.clone();
1185 read_bitmap.truncate(
1186 (end - extent_key.start as usize) / self.block_size() as usize,
1187 );
1188 Some(read_bitmap)
1189 }
1190 _ => None,
1191 };
1192 self.read_and_decrypt(
1193 *device_offset,
1194 extent_key.start,
1195 buffer.subslice_mut(offset..end as usize),
1196 *key_id,
1197 )
1198 .await?;
1199 if let Some(bitmap) = maybe_bitmap {
1200 apply_bitmap_zeroing(
1201 self.block_size() as usize,
1202 &bitmap,
1203 buffer.subslice_mut(offset..end as usize),
1204 );
1205 }
1206 last_offset = end;
1207 if last_offset >= size {
1208 break;
1209 }
1210 }
1211 }
1212 _ => break,
1213 }
1214 }
1215 buffer.as_mut_slice()[std::cmp::min(last_offset, size)..].fill(0);
1216 Ok(buffer.as_slice()[..size].into())
1217 }
1218
1219 pub async fn write_at(
1227 &self,
1228 attribute_id: AttributeId,
1229 offset: u64,
1230 buf: MutableBufferRef<'_>,
1231 key_id: Option<u64>,
1232 mut device_offset: u64,
1233 ) -> Result<MaybeChecksums, Error> {
1234 let mut transfer_buf;
1235 let block_size = self.block_size();
1236 let (range, mut transfer_buf_ref) =
1237 if offset % block_size == 0 && buf.len() as u64 % block_size == 0 {
1238 (offset..offset + buf.len() as u64, buf)
1239 } else {
1240 let (range, buf) = self.align_buffer(attribute_id, offset, buf.as_ref()).await?;
1241 transfer_buf = buf;
1242 device_offset -= offset - range.start;
1243 (range, transfer_buf.as_mut())
1244 };
1245
1246 let mut crypt_ctx = None;
1247 if let (_, Some(key)) = self.get_key(key_id).await? {
1248 if let Some(ctx) = key.crypt_ctx(self.object_id, range.start) {
1249 crypt_ctx = Some(ctx);
1250 } else {
1251 key.encrypt(
1252 self.object_id,
1253 device_offset,
1254 range.start,
1255 transfer_buf_ref.as_mut_slice(),
1256 )?;
1257 }
1258 }
1259 self.write_aligned(transfer_buf_ref.as_ref(), device_offset, crypt_ctx).await
1260 }
1261
1262 #[cfg(feature = "migration")]
1267 pub async fn raw_multi_write(
1268 &self,
1269 transaction: &mut Transaction<'_>,
1270 attribute_id: AttributeId,
1271 key_id: Option<u64>,
1272 ranges: &[Range<u64>],
1273 buf: MutableBufferRef<'_>,
1274 ) -> Result<(), Error> {
1275 self.multi_write_internal(transaction, attribute_id, key_id, ranges, buf).await?;
1276 Ok(())
1277 }
1278
    /// Writes `buf` to freshly allocated device space and records the resulting extents for the
    /// logical `ranges` of `attribute_id` in `transaction`.
    ///
    /// `buf` is consumed front to back while `ranges` is walked in order, so `buf` is expected to
    /// hold the data for `ranges` back to back.  The range iterators are `unwrap`ped, so `ranges`
    /// must cover at least `buf.len()` bytes or this panics.
    ///
    /// Returns `(allocated, deallocated)`: the total length of the device ranges obtained from
    /// the allocator and the total reported by `deallocate_old_extents` for `ranges`.  Returns
    /// `(0, 0)` without touching the transaction when `buf` is empty.
    async fn multi_write_internal(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: AttributeId,
        key_id: Option<u64>,
        ranges: &[Range<u64>],
        mut buf: MutableBufferRef<'_>,
    ) -> Result<(u64, u64), Error> {
        if buf.is_empty() {
            return Ok((0, 0));
        }
        let block_size = self.block_size();
        let store = self.store();
        let store_id = store.store_object_id();

        // For the volume data key on a handle using cached keys, the key is obtained with
        // `get_or_create_key` (which takes the transaction); every other case goes through
        // `get_key`.
        let (key_id, key) = if key_id == Some(VOLUME_DATA_KEY_ID)
            && matches!(self.encryption, Encryption::CachedKeys)
        {
            (
                VOLUME_DATA_KEY_ID,
                Some(
                    self.get_or_create_key(transaction)
                        .await
                        .context("get_or_create_key failed")?,
                ),
            )
        } else {
            self.get_key(key_id).await?
        };
        if let Some(key) = &key {
            // Without inline encryption the data is encrypted here, in place, one logical range
            // at a time, before any device space is allocated.
            if !key.supports_inline_encryption() {
                let mut slice = buf.as_mut_slice();
                for r in ranges {
                    let l = r.end - r.start;
                    let (head, tail) = slice.split_at_mut(l as usize);
                    key.encrypt(
                        self.object_id,
                        // NOTE(review): `write_at` passes the device offset in this position; here
                        // it is a literal 0 because no device range has been allocated yet.
                        0,
                        r.start,
                        head,
                    )?;
                    slice = tail;
                }
            }
        }

        let mut allocated = 0;
        let allocator = store.allocator();
        let trace = self.trace();
        // Ordered, so that results are consumed below in the same order the writes were queued.
        let mut writes = FuturesOrdered::new();

        let mut logical_ranges = ranges.iter();
        let mut current_range = logical_ranges.next().unwrap().clone();

        // Keep allocating until every byte of `buf` has been assigned device space; the loop
        // implies a single allocation may return less than was asked for.
        while !buf.is_empty() {
            let mut device_range = allocator
                .allocate(transaction, store_id, buf.len() as u64)
                .await
                .context("allocation failed")?;
            if trace {
                info!(
                    store_id,
                    oid = self.object_id(),
                    device_range:?,
                    len = device_range.end - device_range.start;
                    "A",
                );
            }
            let mut device_range_len = device_range.end - device_range.start;
            allocated += device_range_len;
            // With inline encryption each write is cut at a logical range boundary so that it can
            // carry the context for its own logical start offset.  Otherwise `split` equals the
            // whole device range and this loop body runs once per allocation.
            while device_range_len > 0 {
                if current_range.end <= current_range.start {
                    current_range = logical_ranges.next().unwrap().clone();
                }
                let (crypt_ctx, split) = if let Some(key) = &key {
                    if key.supports_inline_encryption() {
                        let split = std::cmp::min(
                            current_range.end - current_range.start,
                            device_range_len,
                        );
                        let crypt_ctx = key.crypt_ctx(self.object_id, current_range.start);
                        current_range.start += split;
                        (crypt_ctx, split)
                    } else {
                        (None, device_range_len)
                    }
                } else {
                    (None, device_range_len)
                };

                let (head, tail) = buf.split_at_mut(split as usize);
                buf = tail;

                // Each queued write resolves to (device offset, length, checksums).
                writes.push_back(async move {
                    let len = head.len() as u64;
                    Result::<_, Error>::Ok((
                        device_range.start,
                        len,
                        self.write_aligned(head.as_ref(), device_range.start, crypt_ctx).await?,
                    ))
                });
                device_range.start += split;
                device_range_len = device_range.end - device_range.start;
            }
        }

        self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
        // Drive the queued writes and the deallocation of the extents being replaced together.
        let ((mutations, checksums), deallocated) = try_join!(
            async {
                // Walk `ranges` again from the start, pairing each completed write with the
                // logical bytes it covers.  A write that straddles a logical range boundary is
                // cut into one extent record per logical range.
                let mut current_range = 0..0;
                let mut mutations = Vec::new();
                let mut out_checksums = Vec::new();
                let mut ranges = ranges.iter();
                while let Some((mut device_offset, mut len, mut checksums)) =
                    writes.try_next().await?
                {
                    while len > 0 {
                        if current_range.end <= current_range.start {
                            current_range = ranges.next().unwrap().clone();
                        }
                        let chunk_len = std::cmp::min(len, current_range.end - current_range.start);
                        // Split the checksums at `chunk_len / block_size` entries: the front part
                        // belongs to this chunk, `tail` to the rest of the write.
                        let tail = checksums.split_off((chunk_len / block_size) as usize);
                        if let Some(checksums) = checksums.maybe_as_ref() {
                            out_checksums.push((
                                device_offset..device_offset + chunk_len,
                                checksums.to_owned(),
                            ));
                        }
                        mutations.push(Mutation::merge_object(
                            ObjectKey::extent(
                                self.object_id(),
                                attribute_id,
                                current_range.start..current_range.start + chunk_len,
                            ),
                            ObjectValue::Extent(ExtentValue::new(
                                device_offset,
                                checksums.to_mode(),
                                key_id,
                            )),
                        ));
                        checksums = tail;
                        device_offset += chunk_len;
                        len -= chunk_len;
                        current_range.start += chunk_len;
                    }
                }
                Result::<_, Error>::Ok((mutations, out_checksums))
            },
            async {
                let mut deallocated = 0;
                for r in ranges {
                    deallocated +=
                        self.deallocate_old_extents(transaction, attribute_id, r.clone()).await?;
                }
                Result::<_, Error>::Ok(deallocated)
            }
        )?;

        for m in mutations {
            transaction.add(store_id, m);
        }

        // Checksums are attached to the transaction only when barriers are disabled.
        if !store.filesystem().options().barriers_enabled {
            for (r, c) in checksums {
                transaction.add_checksum(r, c, true);
            }
        }
        Ok((allocated, deallocated))
    }
1457
1458 pub async fn multi_write(
1464 &self,
1465 transaction: &mut Transaction<'_>,
1466 attribute_id: AttributeId,
1467 key_id: Option<u64>,
1468 ranges: &[Range<u64>],
1469 buf: MutableBufferRef<'_>,
1470 ) -> Result<(), Error> {
1471 let (allocated, deallocated) =
1472 self.multi_write_internal(transaction, attribute_id, key_id, ranges, buf).await?;
1473 if allocated == 0 && deallocated == 0 {
1474 return Ok(());
1475 }
1476 self.update_allocated_size(transaction, allocated, deallocated).await
1477 }
1478
    /// Writes `buf` in place over extents that already exist for `ranges` of `attr_id`.
    ///
    /// `buf` is consumed front to back as `ranges` is walked in order.  No device space is
    /// allocated: every byte is written at the device offset recorded in the extent that covers
    /// it.  Extents in `OverwritePartial` mode get an updated extent record when the write sets
    /// bitmap bits that were previously clear.
    ///
    /// Errors:
    ///  * `FxfsError::Inconsistent` if a target range overlaps a deleted extent, starts before
    ///    the extent that was found, or overlaps an extent in `Raw` or `Cow` mode.
    ///  * `FxfsError::Internal` if the iterator yields anything other than an extent record of
    ///    this attribute while ranges remain.
    ///
    /// Panics if `ranges` is empty while `buf` is not (`range_iter.next().unwrap()`).
    pub async fn multi_overwrite<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        attr_id: AttributeId,
        ranges: &[Range<u64>],
        mut buf: MutableBufferRef<'_>,
    ) -> Result<(), Error> {
        if buf.is_empty() {
            return Ok(());
        }
        let block_size = self.block_size();
        let store = self.store();
        let tree = store.tree();
        let store_id = store.store_object_id();

        let (key_id, key) = self.get_key(None).await?;
        if let Some(key) = &key {
            // Without inline encryption the data is encrypted here, in place, one logical range
            // at a time.
            if !key.supports_inline_encryption() {
                let mut slice = buf.as_mut_slice();
                for r in ranges {
                    let l = r.end - r.start;
                    let (head, tail) = slice.split_at_mut(l as usize);
                    key.encrypt(
                        self.object_id,
                        // NOTE(review): `write_at` passes the device offset in this position; a
                        // literal 0 is passed here.
                        0,
                        r.start,
                        head,
                    )?;
                    slice = tail;
                }
            }
        }

        let mut range_iter = ranges.iter();
        // The logical range currently being written; its start advances as data is consumed.
        let mut target_range = range_iter.next().unwrap().clone();
        let mut mutations = Vec::new();
        let writes = FuturesUnordered::new();

        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        // Start the scan at the search key derived from the start of the first target range.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                attr_id,
                AttributeKey::Extent(Extent::search_key_from_offset(target_range.start)),
            )))
            .await?;

        loop {
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent)),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) if *object_id == self.object_id() && *attribute_id == attr_id => {
                    // This extent ends at or before the current target range: skip ahead.
                    if extent.end <= target_range.start {
                        iter.advance().await?;
                        continue;
                    }
                    let (device_offset, mode) = match extent_value {
                        ExtentValue::None => {
                            return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                                format!(
                                    "multi_overwrite failed: target_range ({}, {}) overlaps with \
                                deleted extent found at ({}, {})",
                                    target_range.start, target_range.end, extent.start, extent.end,
                                )
                            });
                        }
                        ExtentValue::Some { device_offset, mode, .. } => (device_offset, mode),
                    };
                    // The extent begins after the target range does, so there is no extent
                    // record covering the start of the range.
                    if extent.start > target_range.start {
                        return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                            format!(
                                "multi_overwrite failed: target range ({}, {}) starts before first \
                            extent found at ({}, {})",
                                target_range.start, target_range.end, extent.start, extent.end,
                            )
                        });
                    }
                    // Only the two overwrite modes are accepted.  `OverwritePartial` carries a
                    // bitmap that is tracked while writing into this extent.
                    let mut bitmap = match mode {
                        ExtentMode::Raw | ExtentMode::Cow(_) => {
                            return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                                format!(
                                    "multi_overwrite failed: \
                            extent from ({}, {}) which overlaps target range ({}, {}) had the \
                            wrong extent mode",
                                    extent.start, extent.end, target_range.start, target_range.end,
                                )
                            });
                        }
                        ExtentMode::OverwritePartial(bitmap) => {
                            OverwriteBitmaps::new(bitmap.clone())
                        }
                        ExtentMode::Overwrite => OverwriteBitmaps::None,
                    };
                    // Queue one write per piece of a target range that falls inside this extent.
                    // Several target ranges may land in the same extent.
                    loop {
                        let offset_within_extent = target_range.start - extent.start;
                        let bitmap_offset = offset_within_extent / block_size;
                        let write_device_offset = *device_offset + offset_within_extent;
                        let write_end = min(extent.end, target_range.end);
                        let write_len = write_end - target_range.start;
                        let write_device_range =
                            write_device_offset..write_device_offset + write_len;
                        let (current_buf, remaining_buf) = buf.split_at_mut(write_len as usize);

                        bitmap.set_offset(bitmap_offset as usize);
                        let checksum_ranges = ChecksumRangeChunk::group_first_write_ranges(
                            &mut bitmap,
                            block_size,
                            write_device_range,
                        );

                        let crypt_ctx = if let Some(key) = &key {
                            key.crypt_ctx(self.object_id, target_range.start)
                        } else {
                            None
                        };

                        // Each write resolves to a list of (device range, checksums,
                        // is_first_write), which is empty when the write produced no checksums.
                        writes.push(async move {
                            let maybe_checksums = self
                                .write_aligned(current_buf.as_ref(), write_device_offset, crypt_ctx)
                                .await?;
                            Ok::<_, Error>(match maybe_checksums {
                                MaybeChecksums::None => Vec::new(),
                                MaybeChecksums::Fletcher(checksums) => checksum_ranges
                                    .into_iter()
                                    .map(
                                        |ChecksumRangeChunk {
                                             checksum_range,
                                             device_range,
                                             is_first_write,
                                         }| {
                                            (
                                                device_range,
                                                checksums[checksum_range].to_vec(),
                                                is_first_write,
                                            )
                                        },
                                    )
                                    .collect(),
                            })
                        });
                        buf = remaining_buf;
                        target_range.start += write_len;
                        // Finished this target range: move to the next one, or stop if there are
                        // no more.
                        if target_range.start == target_range.end {
                            match range_iter.next() {
                                None => break,
                                Some(next_range) => target_range = next_range.clone(),
                            }
                        }
                        // The (possibly new) target range starts beyond this extent.
                        if extent.end <= target_range.start {
                            break;
                        }
                    }
                    // For a partial-overwrite extent, fold the blocks written above into its
                    // bitmap.  A new extent record is queued only when `or` reports a change, and
                    // the mode becomes `Overwrite` once every bit is set.
                    if let Some((mut bitmap, write_bitmap)) = bitmap.take_bitmaps() {
                        if bitmap.or(&write_bitmap) {
                            let mode = if bitmap.all() {
                                ExtentMode::Overwrite
                            } else {
                                ExtentMode::OverwritePartial(bitmap)
                            };
                            mutations.push(Mutation::merge_object(
                                ObjectKey::extent(self.object_id(), attr_id, extent.clone().into()),
                                ObjectValue::Extent(ExtentValue::new(*device_offset, mode, key_id)),
                            ))
                        }
                    }
                    // An empty target range here means the inner loop ran out of ranges.
                    if target_range.start == target_range.end {
                        break;
                    }
                    iter.advance().await?;
                }
                // Reaching this arm means ranges remain but the next record is not an extent of
                // this attribute (or there are no more records).
                _ => bail!(anyhow!(FxfsError::Internal).context(
                    "found a non-extent object record while there were still ranges to process"
                )),
            }
        }

        let checksums = writes.try_collect::<Vec<_>>().await?;
        // Checksums are attached to the transaction only when barriers are disabled.
        if !store.filesystem().options().barriers_enabled {
            for (r, c, first_write) in checksums.into_iter().flatten() {
                transaction.add_checksum(r, c, first_write);
            }
        }

        for m in mutations {
            transaction.add(store_id, m);
        }

        Ok(())
    }
1690
1691 #[trace]
1698 pub async fn write_new_attr_in_batches<'a>(
1699 &'a self,
1700 transaction: &mut Transaction<'a>,
1701 attribute_id: AttributeId,
1702 data: &[u8],
1703 batch_size: usize,
1704 ) -> Result<(), Error> {
1705 transaction.add(
1706 self.store().store_object_id,
1707 Mutation::replace_or_insert_object(
1708 ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
1709 ObjectValue::attribute(data.len() as u64, false),
1710 ),
1711 );
1712 let chunks = data.chunks(batch_size);
1713 let num_chunks = chunks.len();
1714 if num_chunks > 1 {
1715 transaction.add(
1716 self.store().store_object_id,
1717 Mutation::replace_or_insert_object(
1718 ObjectKey::graveyard_attribute_entry(
1719 self.store().graveyard_directory_object_id(),
1720 self.object_id(),
1721 attribute_id,
1722 ),
1723 ObjectValue::Some,
1724 ),
1725 );
1726 }
1727 let mut start_offset = 0;
1728 for (i, chunk) in chunks.enumerate() {
1729 let rounded_len = round_up(chunk.len() as u64, self.block_size()).unwrap();
1730 let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
1731 let slice = buffer.as_mut_slice();
1732 slice[..chunk.len()].copy_from_slice(chunk);
1733 slice[chunk.len()..].fill(0);
1734 self.multi_write(
1735 transaction,
1736 attribute_id,
1737 Some(VOLUME_DATA_KEY_ID),
1738 &[start_offset..start_offset + rounded_len],
1739 buffer.as_mut(),
1740 )
1741 .await?;
1742 start_offset += rounded_len;
1743 if i < num_chunks - 1 {
1745 transaction.commit_and_continue().await?;
1746 }
1747 }
1748 Ok(())
1749 }
1750
1751 pub async fn write_attr(
1758 &self,
1759 transaction: &mut Transaction<'_>,
1760 attribute_id: AttributeId,
1761 data: &[u8],
1762 ) -> Result<NeedsTrim, Error> {
1763 let rounded_len = round_up(data.len() as u64, self.block_size()).unwrap();
1764 let store = self.store();
1765 let tree = store.tree();
1766 let should_trim = if let Some(item) = tree
1767 .find(&ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute))
1768 .await?
1769 {
1770 match item.value {
1771 ObjectValue::Attribute { size: _, has_overwrite_extents: true } => {
1772 bail!(
1773 anyhow!(FxfsError::Inconsistent)
1774 .context("write_attr on an attribute with overwrite extents")
1775 )
1776 }
1777 ObjectValue::Attribute { size, .. } => (data.len() as u64) < size,
1778 _ => bail!(FxfsError::Inconsistent),
1779 }
1780 } else {
1781 false
1782 };
1783 let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
1784 let slice = buffer.as_mut_slice();
1785 slice[..data.len()].copy_from_slice(data);
1786 slice[data.len()..].fill(0);
1787 self.multi_write(
1788 transaction,
1789 attribute_id,
1790 Some(VOLUME_DATA_KEY_ID),
1791 &[0..rounded_len],
1792 buffer.as_mut(),
1793 )
1794 .await?;
1795 transaction.add(
1796 self.store().store_object_id,
1797 Mutation::replace_or_insert_object(
1798 ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
1799 ObjectValue::attribute(data.len() as u64, false),
1800 ),
1801 );
1802 if should_trim {
1803 self.shrink(transaction, attribute_id, data.len() as u64).await
1804 } else {
1805 Ok(NeedsTrim(false))
1806 }
1807 }
1808
1809 pub async fn list_extended_attributes(&self) -> Result<Vec<Vec<u8>>, Error> {
1810 let layer_set = self.store().tree().layer_set();
1811 let mut merger = layer_set.merger();
1812 let mut iter = merger
1814 .query(Query::FullRange(&ObjectKey::extended_attribute(self.object_id(), Vec::new())))
1815 .await?;
1816 let mut out = Vec::new();
1817 while let Some(item) = iter.get() {
1818 if item.value != &ObjectValue::None {
1820 match item.key {
1821 ObjectKey { object_id, data: ObjectKeyData::ExtendedAttribute { name } }
1822 if *object_id == self.object_id() =>
1823 {
1824 out.push(name.clone());
1825 }
1826 _ => break,
1831 }
1832 }
1833 iter.advance().await?;
1834 }
1835 Ok(out)
1836 }
1837
1838 pub async fn get_inline_selinux_context(&self) -> Result<Option<fio::SelinuxContext>, Error> {
1842 const_assert!(fio::MAX_SELINUX_CONTEXT_ATTRIBUTE_LEN as usize <= MAX_INLINE_XATTR_SIZE);
1845 let item = match self
1846 .store()
1847 .tree()
1848 .find(&ObjectKey::extended_attribute(
1849 self.object_id(),
1850 fio::SELINUX_CONTEXT_NAME.into(),
1851 ))
1852 .await?
1853 {
1854 Some(item) => item,
1855 None => return Ok(None),
1856 };
1857 match item.value {
1858 ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => {
1859 Ok(Some(fio::SelinuxContext::Data(value)))
1860 }
1861 ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(_)) => {
1862 Ok(Some(fio::SelinuxContext::UseExtendedAttributes(fio::EmptyStruct {})))
1863 }
1864 _ => {
1865 bail!(
1866 anyhow!(FxfsError::Inconsistent)
1867 .context("get_inline_extended_attribute: Expected ExtendedAttribute value")
1868 )
1869 }
1870 }
1871 }
1872
1873 pub async fn get_extended_attribute(&self, name: Vec<u8>) -> Result<Vec<u8>, Error> {
1874 let item = self
1875 .store()
1876 .tree()
1877 .find(&ObjectKey::extended_attribute(self.object_id(), name))
1878 .await?
1879 .ok_or(FxfsError::NotFound)?;
1880 match item.value {
1881 ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => Ok(value),
1882 ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
1883 Ok(self.read_attr(id).await?.ok_or(FxfsError::Inconsistent)?.into_vec())
1884 }
1885 _ => {
1886 bail!(
1887 anyhow!(FxfsError::Inconsistent)
1888 .context("get_extended_attribute: Expected ExtendedAttribute value")
1889 )
1890 }
1891 }
1892 }
1893
1894 pub async fn set_extended_attribute(
1895 &self,
1896 name: Vec<u8>,
1897 value: Vec<u8>,
1898 mode: SetExtendedAttributeMode,
1899 ) -> Result<(), Error> {
1900 let store = self.store();
1901 let fs = store.filesystem();
1902 let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
1905 let mut transaction = fs.new_transaction(keys, Options::default()).await?;
1906 self.set_extended_attribute_impl(name, value, mode, &mut transaction).await?;
1907 transaction.commit().await?;
1908 Ok(())
1909 }
1910
    /// Adds the mutations that set extended attribute `name` to `value` to `transaction`.
    ///
    /// Storage is chosen as follows:
    ///  * If the existing record already refers to an attribute id, the value is written to that
    ///    attribute, whatever its size.
    ///  * Otherwise, values of at most `MAX_INLINE_XATTR_SIZE` bytes are stored inline in the
    ///    extended attribute record.
    ///  * Otherwise, an unused attribute id is searched for starting at
    ///    `AttributeId::XATTR_RANGE_START`, the value is written to it, and the extended
    ///    attribute record is pointed at it.
    ///
    /// Errors:
    ///  * `FxfsError::TooBig` if `name` exceeds `MAX_XATTR_NAME_SIZE` or `value` exceeds
    ///    `MAX_XATTR_VALUE_SIZE`.
    ///  * `FxfsError::AlreadyExists` for `Create` when a record exists; `FxfsError::NotFound` for
    ///    `Replace` when none does.
    ///  * `FxfsError::Inconsistent` if the existing record is not an extended attribute.
    ///  * `FxfsError::NoSpace` if the search reaches `AttributeId::XATTR_RANGE_END`.
    async fn set_extended_attribute_impl(
        &self,
        name: Vec<u8>,
        value: Vec<u8>,
        mode: SetExtendedAttributeMode,
        transaction: &mut Transaction<'_>,
    ) -> Result<(), Error> {
        ensure!(name.len() <= MAX_XATTR_NAME_SIZE, FxfsError::TooBig);
        ensure!(value.len() <= MAX_XATTR_VALUE_SIZE, FxfsError::TooBig);
        let tree = self.store().tree();
        let object_key = ObjectKey::extended_attribute(self.object_id(), name);

        // Look up the current record, enforce `mode`, and keep only the attribute id (if any)
        // that the current record points at.
        let existing_attribute_id = {
            let (found, existing_attribute_id) = match tree.find(&object_key).await? {
                None => (false, None),
                Some(Item { value, .. }) => (
                    true,
                    match value {
                        ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
                        ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
                            Some(id)
                        }
                        _ => bail!(
                            anyhow!(FxfsError::Inconsistent)
                                .context("expected extended attribute value")
                        ),
                    },
                ),
            };
            match mode {
                SetExtendedAttributeMode::Create if found => {
                    bail!(FxfsError::AlreadyExists)
                }
                SetExtendedAttributeMode::Replace if !found => {
                    bail!(FxfsError::NotFound)
                }
                _ => (),
            }
            existing_attribute_id
        };

        if let Some(attribute_id) = existing_attribute_id {
            // Reuse the attribute the record already points at; the extended attribute record
            // itself is left as it is.  The `NeedsTrim` result is deliberately discarded.
            let _ = self.write_attr(transaction, attribute_id, &value).await?;
        } else if value.len() <= MAX_INLINE_XATTR_SIZE {
            transaction.add(
                self.store().store_object_id(),
                Mutation::replace_or_insert_object(
                    object_key,
                    ObjectValue::inline_extended_attribute(value),
                ),
            );
        } else {
            // Find an unused attribute id by walking this object's attribute records, starting
            // from the first id of the xattr range.
            let mut attribute_id = AttributeId::XATTR_RANGE_START;
            let layer_set = tree.layer_set();
            let mut merger = layer_set.merger();
            let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
            let mut iter = merger.query(Query::FullRange(&key)).await?;
            loop {
                match iter.get() {
                    // Nothing at or after the candidate key: the candidate id is used as is.
                    None => break,
                    Some(ItemRef {
                        key: ObjectKey { object_id, data: ObjectKeyData::Attribute(attr_id, _) },
                        value,
                        ..
                    }) if *object_id == self.object_id() => {
                        // A record with no value ends the search with the current candidate.
                        if matches!(value, ObjectValue::None) {
                            break;
                        }
                        if attribute_id < *attr_id {
                            // The next record belongs to a higher id, so the candidate is unused.
                            break;
                        } else if attribute_id == *attr_id {
                            // The candidate is taken; try the next id, giving up at the end of
                            // the range.
                            attribute_id = attribute_id.next();
                            if attribute_id == AttributeId::XATTR_RANGE_END {
                                bail!(FxfsError::NoSpace);
                            }
                        }
                        // Otherwise the record belongs to an id below the candidate (one that
                        // was already passed); keep advancing.
                    }
                    // A record for another object, or one that is not an attribute record.
                    _ => break,
                }
                iter.advance().await?;
            }

            // The `NeedsTrim` result is deliberately discarded.
            let _ = self.write_attr(transaction, attribute_id, &value).await?;
            transaction.add(
                self.store().store_object_id(),
                Mutation::replace_or_insert_object(
                    object_key,
                    ObjectValue::extended_attribute(attribute_id),
                ),
            );
        }

        Ok(())
    }
2028
2029 pub async fn remove_extended_attribute(&self, name: Vec<u8>) -> Result<(), Error> {
2030 let store = self.store();
2031 let tree = store.tree();
2032 let object_key = ObjectKey::extended_attribute(self.object_id(), name);
2033
2034 let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
2039 let mut transaction = store.filesystem().new_transaction(keys, Options::default()).await?;
2040
2041 let attribute_to_delete =
2042 match tree.find(&object_key).await?.ok_or(FxfsError::NotFound)?.value {
2043 ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => Some(id),
2044 ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
2045 _ => {
2046 bail!(
2047 anyhow!(FxfsError::Inconsistent)
2048 .context("remove_extended_attribute: Expected ExtendedAttribute value")
2049 )
2050 }
2051 };
2052
2053 transaction.add(
2054 store.store_object_id(),
2055 Mutation::replace_or_insert_object(object_key, ObjectValue::None),
2056 );
2057
2058 if let Some(attribute_id) = attribute_to_delete {
2065 let trim_result = store
2066 .trim_some(
2067 &mut transaction,
2068 self.object_id(),
2069 attribute_id,
2070 TrimMode::FromOffset(0),
2071 )
2072 .await?;
2073 assert_matches!(trim_result, TrimResult::Done(_));
2076 transaction.add(
2077 store.store_object_id(),
2078 Mutation::replace_or_insert_object(
2079 ObjectKey::attribute(self.object_id, attribute_id, AttributeKey::Attribute),
2080 ObjectValue::None,
2081 ),
2082 );
2083 }
2084
2085 transaction.commit().await?;
2086 Ok(())
2087 }
2088
2089 pub fn pre_fetch_keys(&self) -> Option<impl Future<Output = ()> + use<S>> {
2092 if let Encryption::CachedKeys = self.encryption {
2093 let owner = self.owner.clone();
2094 let object_id = self.object_id;
2095 Some(async move {
2096 let store = owner.as_ref().as_ref();
2097 if let Some(crypt) = store.crypt() {
2098 let _ = store
2099 .key_manager
2100 .get_keys(
2101 object_id,
2102 crypt.as_ref(),
2103 &mut Some(async || store.get_keys(object_id).await),
2104 false,
2105 false,
2106 )
2107 .await;
2108 }
2109 })
2110 } else {
2111 None
2112 }
2113 }
2114}
2115
2116impl<S: HandleOwner> Drop for StoreObjectHandle<S> {
2117 fn drop(&mut self) {
2118 if self.is_encrypted() {
2119 let _ = self.store().key_manager.remove(self.object_id);
2120 }
2121 }
2122}
2123
/// Result flag returned by `write_attr`: it is `NeedsTrim(false)` when the attribute did not
/// shrink, and otherwise whatever `shrink` returned.  Marked `#[must_use]` so that callers have
/// to acknowledge it explicitly.
// NOTE(review): presumably `true` means further trimming is still required after the
// transaction commits — confirm against `shrink`.
#[must_use]
pub struct NeedsTrim(pub bool);
2129
2130#[cfg(test)]
2131mod tests {
2132 use super::{ChecksumRangeChunk, OverwriteBitmaps};
2133 use crate::errors::FxfsError;
2134 use crate::filesystem::{FxFilesystem, OpenFxFilesystem};
2135 use crate::object_handle::ObjectHandle;
2136 use crate::object_store::data_object_handle::WRITE_ATTR_BATCH_SIZE;
2137 use crate::object_store::transaction::{Mutation, Options, lock_keys};
2138 use crate::object_store::{
2139 AttributeId, AttributeKey, DataObjectHandle, Directory, HandleOptions, LockKey, ObjectKey,
2140 ObjectStore, ObjectValue, SetExtendedAttributeMode, StoreObjectHandle,
2141 };
2142 use bit_vec::BitVec;
2143 use fuchsia_async as fasync;
2144 use futures::join;
2145 use std::sync::Arc;
2146 use storage_device::DeviceHolder;
2147 use storage_device::fake_device::FakeDevice;
2148
    /// Block size passed to `FakeDevice::new` for the device backing the test filesystem.
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
    /// Name under which the test object is added to the root directory.
    const TEST_OBJECT_NAME: &str = "foo";
2151
2152 fn is_error(actual: anyhow::Error, expected: FxfsError) {
2153 assert_eq!(*actual.root_cause().downcast_ref::<FxfsError>().unwrap(), expected)
2154 }
2155
2156 async fn test_filesystem() -> OpenFxFilesystem {
2157 let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
2158 FxFilesystem::new_empty(device).await.expect("new_empty failed")
2159 }
2160
2161 async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
2162 {
2163 let fs = test_filesystem().await;
2164 let store = fs.root_store();
2165
2166 let mut transaction = fs
2167 .clone()
2168 .new_transaction(
2169 lock_keys![LockKey::object(
2170 store.store_object_id(),
2171 store.root_directory_object_id()
2172 )],
2173 Options::default(),
2174 )
2175 .await
2176 .expect("new_transaction failed");
2177
2178 let object =
2179 ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2180 .await
2181 .expect("create_object failed");
2182
2183 let root_directory =
2184 Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
2185 root_directory
2186 .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
2187 .await
2188 .expect("add_child_file failed");
2189
2190 transaction.commit().await.expect("commit failed");
2191
2192 (fs, object)
2193 }
2194
2195 #[fuchsia::test(threads = 3)]
2196 async fn extended_attribute_double_remove() {
2197 let (fs, object) = test_filesystem_and_empty_object().await;
2202 let basic = Arc::new(StoreObjectHandle::new(
2203 object.owner().clone(),
2204 object.object_id(),
2205 false,
2206 HandleOptions::default(),
2207 false,
2208 ));
2209 let basic_a = basic.clone();
2210 let basic_b = basic.clone();
2211
2212 basic
2213 .set_extended_attribute(
2214 b"security.selinux".to_vec(),
2215 b"bar".to_vec(),
2216 SetExtendedAttributeMode::Set,
2217 )
2218 .await
2219 .expect("failed to set attribute");
2220
2221 let a_task = fasync::Task::spawn(async move {
2224 basic_a.remove_extended_attribute(b"security.selinux".to_vec()).await
2225 });
2226 let b_task = fasync::Task::spawn(async move {
2227 basic_b.remove_extended_attribute(b"security.selinux".to_vec()).await
2228 });
2229 match join!(a_task, b_task) {
2230 (Ok(()), Ok(())) => panic!("both remove calls succeeded"),
2231 (Err(_), Err(_)) => panic!("both remove calls failed"),
2232
2233 (Ok(()), Err(e)) => is_error(e, FxfsError::NotFound),
2234 (Err(e), Ok(())) => is_error(e, FxfsError::NotFound),
2235 }
2236
2237 fs.close().await.expect("Close failed");
2238 }
2239
2240 #[fuchsia::test(threads = 3)]
2241 async fn extended_attribute_double_create() {
2242 let (fs, object) = test_filesystem_and_empty_object().await;
2247 let basic = Arc::new(StoreObjectHandle::new(
2248 object.owner().clone(),
2249 object.object_id(),
2250 false,
2251 HandleOptions::default(),
2252 false,
2253 ));
2254 let basic_a = basic.clone();
2255 let basic_b = basic.clone();
2256
2257 let a_task = fasync::Task::spawn(async move {
2260 basic_a
2261 .set_extended_attribute(
2262 b"security.selinux".to_vec(),
2263 b"one".to_vec(),
2264 SetExtendedAttributeMode::Create,
2265 )
2266 .await
2267 });
2268 let b_task = fasync::Task::spawn(async move {
2269 basic_b
2270 .set_extended_attribute(
2271 b"security.selinux".to_vec(),
2272 b"two".to_vec(),
2273 SetExtendedAttributeMode::Create,
2274 )
2275 .await
2276 });
2277 match join!(a_task, b_task) {
2278 (Ok(()), Ok(())) => panic!("both set calls succeeded"),
2279 (Err(_), Err(_)) => panic!("both set calls failed"),
2280
2281 (Ok(()), Err(e)) => {
2282 assert_eq!(
2283 basic
2284 .get_extended_attribute(b"security.selinux".to_vec())
2285 .await
2286 .expect("failed to get xattr"),
2287 b"one"
2288 );
2289 is_error(e, FxfsError::AlreadyExists);
2290 }
2291 (Err(e), Ok(())) => {
2292 assert_eq!(
2293 basic
2294 .get_extended_attribute(b"security.selinux".to_vec())
2295 .await
2296 .expect("failed to get xattr"),
2297 b"two"
2298 );
2299 is_error(e, FxfsError::AlreadyExists);
2300 }
2301 }
2302
2303 fs.close().await.expect("Close failed");
2304 }
2305
2306 struct TestAttr {
2307 name: Vec<u8>,
2308 value: Vec<u8>,
2309 }
2310
2311 impl TestAttr {
2312 fn new(name: impl AsRef<[u8]>, value: impl AsRef<[u8]>) -> Self {
2313 Self { name: name.as_ref().to_vec(), value: value.as_ref().to_vec() }
2314 }
2315 fn name(&self) -> Vec<u8> {
2316 self.name.clone()
2317 }
2318 fn value(&self) -> Vec<u8> {
2319 self.value.clone()
2320 }
2321 }
2322
2323 #[fuchsia::test]
2324 async fn extended_attributes() {
2325 let (fs, object) = test_filesystem_and_empty_object().await;
2326
2327 let test_attr = TestAttr::new(b"security.selinux", b"foo");
2328
2329 assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
2330 is_error(
2331 object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2332 FxfsError::NotFound,
2333 );
2334
2335 object
2336 .set_extended_attribute(
2337 test_attr.name(),
2338 test_attr.value(),
2339 SetExtendedAttributeMode::Set,
2340 )
2341 .await
2342 .unwrap();
2343 assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2344 assert_eq!(
2345 object.get_extended_attribute(test_attr.name()).await.unwrap(),
2346 test_attr.value()
2347 );
2348
2349 object.remove_extended_attribute(test_attr.name()).await.unwrap();
2350 assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
2351 is_error(
2352 object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2353 FxfsError::NotFound,
2354 );
2355
2356 object
2358 .set_extended_attribute(
2359 test_attr.name(),
2360 test_attr.value(),
2361 SetExtendedAttributeMode::Set,
2362 )
2363 .await
2364 .unwrap();
2365 assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2366 assert_eq!(
2367 object.get_extended_attribute(test_attr.name()).await.unwrap(),
2368 test_attr.value()
2369 );
2370
2371 object.remove_extended_attribute(test_attr.name()).await.unwrap();
2372 assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
2373 is_error(
2374 object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2375 FxfsError::NotFound,
2376 );
2377
2378 fs.close().await.expect("close failed");
2379 }
2380
2381 #[fuchsia::test]
2382 async fn large_extended_attribute() {
2383 let (fs, object) = test_filesystem_and_empty_object().await;
2384
2385 let test_attr = TestAttr::new(b"security.selinux", vec![3u8; 300]);
2386
2387 object
2388 .set_extended_attribute(
2389 test_attr.name(),
2390 test_attr.value(),
2391 SetExtendedAttributeMode::Set,
2392 )
2393 .await
2394 .unwrap();
2395 assert_eq!(
2396 object.get_extended_attribute(test_attr.name()).await.unwrap(),
2397 test_attr.value()
2398 );
2399
2400 assert_eq!(
2403 object
2404 .read_attr(AttributeId::XATTR_RANGE_START)
2405 .await
2406 .expect("read_attr failed")
2407 .expect("read_attr returned none")
2408 .into_vec(),
2409 test_attr.value()
2410 );
2411
2412 object.remove_extended_attribute(test_attr.name()).await.unwrap();
2413 is_error(
2414 object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2415 FxfsError::NotFound,
2416 );
2417
2418 object
2420 .set_extended_attribute(
2421 test_attr.name(),
2422 test_attr.value(),
2423 SetExtendedAttributeMode::Set,
2424 )
2425 .await
2426 .unwrap();
2427 assert_eq!(
2428 object.get_extended_attribute(test_attr.name()).await.unwrap(),
2429 test_attr.value()
2430 );
2431 object.remove_extended_attribute(test_attr.name()).await.unwrap();
2432 is_error(
2433 object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2434 FxfsError::NotFound,
2435 );
2436
2437 fs.close().await.expect("close failed");
2438 }
2439
2440 #[fuchsia::test]
2441 async fn multiple_extended_attributes() {
2442 let (fs, object) = test_filesystem_and_empty_object().await;
2443
2444 let attrs = [
2445 TestAttr::new(b"security.selinux", b"foo"),
2446 TestAttr::new(b"large.attribute", vec![3u8; 300]),
2447 TestAttr::new(b"an.attribute", b"asdf"),
2448 TestAttr::new(b"user.big", vec![5u8; 288]),
2449 TestAttr::new(b"user.tiny", b"smol"),
2450 TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
2451 TestAttr::new(b"also big", vec![7u8; 500]),
2452 TestAttr::new(b"all.ones", vec![1u8; 11111]),
2453 ];
2454
2455 for i in 0..attrs.len() {
2456 object
2457 .set_extended_attribute(
2458 attrs[i].name(),
2459 attrs[i].value(),
2460 SetExtendedAttributeMode::Set,
2461 )
2462 .await
2463 .unwrap();
2464 assert_eq!(
2465 object.get_extended_attribute(attrs[i].name()).await.unwrap(),
2466 attrs[i].value()
2467 );
2468 }
2469
2470 for i in 0..attrs.len() {
2471 let mut found_attrs = object.list_extended_attributes().await.unwrap();
2473 let mut expected_attrs: Vec<Vec<u8>> = attrs.iter().skip(i).map(|a| a.name()).collect();
2474 found_attrs.sort();
2475 expected_attrs.sort();
2476 assert_eq!(found_attrs, expected_attrs);
2477 for j in i..attrs.len() {
2478 assert_eq!(
2479 object.get_extended_attribute(attrs[j].name()).await.unwrap(),
2480 attrs[j].value()
2481 );
2482 }
2483
2484 object.remove_extended_attribute(attrs[i].name()).await.expect("failed to remove");
2485 is_error(
2486 object.get_extended_attribute(attrs[i].name()).await.unwrap_err(),
2487 FxfsError::NotFound,
2488 );
2489 }
2490
2491 fs.close().await.expect("close failed");
2492 }
2493
2494 #[fuchsia::test]
2495 async fn multiple_extended_attributes_delete() {
2496 let (fs, object) = test_filesystem_and_empty_object().await;
2497 let store = object.owner().clone();
2498
2499 let attrs = [
2500 TestAttr::new(b"security.selinux", b"foo"),
2501 TestAttr::new(b"large.attribute", vec![3u8; 300]),
2502 TestAttr::new(b"an.attribute", b"asdf"),
2503 TestAttr::new(b"user.big", vec![5u8; 288]),
2504 TestAttr::new(b"user.tiny", b"smol"),
2505 TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
2506 TestAttr::new(b"also big", vec![7u8; 500]),
2507 TestAttr::new(b"all.ones", vec![1u8; 11111]),
2508 ];
2509
2510 for i in 0..attrs.len() {
2511 object
2512 .set_extended_attribute(
2513 attrs[i].name(),
2514 attrs[i].value(),
2515 SetExtendedAttributeMode::Set,
2516 )
2517 .await
2518 .unwrap();
2519 assert_eq!(
2520 object.get_extended_attribute(attrs[i].name()).await.unwrap(),
2521 attrs[i].value()
2522 );
2523 }
2524
2525 let root_directory =
2527 Directory::open(object.owner(), object.store().root_directory_object_id())
2528 .await
2529 .expect("open failed");
2530 let mut transaction = fs
2531 .clone()
2532 .new_transaction(
2533 lock_keys![
2534 LockKey::object(store.store_object_id(), store.root_directory_object_id()),
2535 LockKey::object(store.store_object_id(), object.object_id()),
2536 ],
2537 Options::default(),
2538 )
2539 .await
2540 .expect("new_transaction failed");
2541 crate::object_store::directory::replace_child(
2542 &mut transaction,
2543 None,
2544 (&root_directory, TEST_OBJECT_NAME),
2545 )
2546 .await
2547 .expect("replace_child failed");
2548 transaction.commit().await.unwrap();
2549 store.tombstone_object(object.object_id(), Options::default()).await.unwrap();
2550
2551 crate::fsck::fsck(fs.clone()).await.unwrap();
2552
2553 fs.close().await.expect("close failed");
2554 }
2555
2556 #[fuchsia::test]
2557 async fn extended_attribute_changing_sizes() {
2558 let (fs, object) = test_filesystem_and_empty_object().await;
2559
2560 let test_name = b"security.selinux";
2561 let test_small_attr = TestAttr::new(test_name, b"smol");
2562 let test_large_attr = TestAttr::new(test_name, vec![3u8; 300]);
2563
2564 object
2565 .set_extended_attribute(
2566 test_small_attr.name(),
2567 test_small_attr.value(),
2568 SetExtendedAttributeMode::Set,
2569 )
2570 .await
2571 .unwrap();
2572 assert_eq!(
2573 object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
2574 test_small_attr.value()
2575 );
2576
2577 assert!(
2579 object
2580 .read_attr(AttributeId::XATTR_RANGE_START)
2581 .await
2582 .expect("read_attr failed")
2583 .is_none()
2584 );
2585
2586 crate::fsck::fsck(fs.clone()).await.unwrap();
2587
2588 object
2589 .set_extended_attribute(
2590 test_large_attr.name(),
2591 test_large_attr.value(),
2592 SetExtendedAttributeMode::Set,
2593 )
2594 .await
2595 .unwrap();
2596 assert_eq!(
2597 object.get_extended_attribute(test_large_attr.name()).await.unwrap(),
2598 test_large_attr.value()
2599 );
2600
2601 assert_eq!(
2604 object
2605 .read_attr(AttributeId::XATTR_RANGE_START)
2606 .await
2607 .expect("read_attr failed")
2608 .expect("read_attr returned none")
2609 .into_vec(),
2610 test_large_attr.value()
2611 );
2612
2613 crate::fsck::fsck(fs.clone()).await.unwrap();
2614
2615 object
2616 .set_extended_attribute(
2617 test_small_attr.name(),
2618 test_small_attr.value(),
2619 SetExtendedAttributeMode::Set,
2620 )
2621 .await
2622 .unwrap();
2623 assert_eq!(
2624 object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
2625 test_small_attr.value()
2626 );
2627
2628 assert_eq!(
2631 object
2632 .read_attr(AttributeId::XATTR_RANGE_START)
2633 .await
2634 .expect("read_attr failed")
2635 .expect("read_attr returned none")
2636 .into_vec(),
2637 test_small_attr.value()
2638 );
2639
2640 crate::fsck::fsck(fs.clone()).await.unwrap();
2641
2642 object.remove_extended_attribute(test_small_attr.name()).await.expect("failed to remove");
2643
2644 crate::fsck::fsck(fs.clone()).await.unwrap();
2645
2646 fs.close().await.expect("close failed");
2647 }
2648
2649 #[fuchsia::test]
2650 async fn extended_attribute_max_size() {
2651 let (fs, object) = test_filesystem_and_empty_object().await;
2652
2653 let test_attr = TestAttr::new(
2654 vec![3u8; super::MAX_XATTR_NAME_SIZE],
2655 vec![1u8; super::MAX_XATTR_VALUE_SIZE],
2656 );
2657
2658 object
2659 .set_extended_attribute(
2660 test_attr.name(),
2661 test_attr.value(),
2662 SetExtendedAttributeMode::Set,
2663 )
2664 .await
2665 .unwrap();
2666 assert_eq!(
2667 object.get_extended_attribute(test_attr.name()).await.unwrap(),
2668 test_attr.value()
2669 );
2670 assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2671 object.remove_extended_attribute(test_attr.name()).await.unwrap();
2672
2673 fs.close().await.expect("close failed");
2674 }
2675
2676 #[fuchsia::test]
2677 async fn extended_attribute_remove_then_create() {
2678 let (fs, object) = test_filesystem_and_empty_object().await;
2679
2680 let test_attr = TestAttr::new(
2681 vec![3u8; super::MAX_XATTR_NAME_SIZE],
2682 vec![1u8; super::MAX_XATTR_VALUE_SIZE],
2683 );
2684
2685 object
2686 .set_extended_attribute(
2687 test_attr.name(),
2688 test_attr.value(),
2689 SetExtendedAttributeMode::Create,
2690 )
2691 .await
2692 .unwrap();
2693 fs.journal().force_compact().await.unwrap();
2694 object.remove_extended_attribute(test_attr.name()).await.unwrap();
2695 object
2696 .set_extended_attribute(
2697 test_attr.name(),
2698 test_attr.value(),
2699 SetExtendedAttributeMode::Create,
2700 )
2701 .await
2702 .unwrap();
2703
2704 assert_eq!(
2705 object.get_extended_attribute(test_attr.name()).await.unwrap(),
2706 test_attr.value()
2707 );
2708
2709 fs.close().await.expect("close failed");
2710 }
2711
2712 #[fuchsia::test]
2713 async fn large_extended_attribute_max_number() {
2714 let (fs, object) = test_filesystem_and_empty_object().await;
2715
2716 let max_xattrs = AttributeId::XATTR_RANGE_END.raw() - AttributeId::XATTR_RANGE_START.raw();
2717 for i in 0..max_xattrs {
2718 let test_attr = TestAttr::new(format!("{}", i).as_bytes(), vec![0x3; 300]);
2719 object
2720 .set_extended_attribute(
2721 test_attr.name(),
2722 test_attr.value(),
2723 SetExtendedAttributeMode::Set,
2724 )
2725 .await
2726 .unwrap_or_else(|_| panic!("failed to set xattr number {}", i));
2727 }
2728
2729 match object
2732 .set_extended_attribute(
2733 b"one.too.many".to_vec(),
2734 vec![0x3; 300],
2735 SetExtendedAttributeMode::Set,
2736 )
2737 .await
2738 {
2739 Ok(()) => panic!("set should not succeed"),
2740 Err(e) => is_error(e, FxfsError::NoSpace),
2741 }
2742
2743 object
2745 .set_extended_attribute(
2746 b"this.is.okay".to_vec(),
2747 b"small value".to_vec(),
2748 SetExtendedAttributeMode::Set,
2749 )
2750 .await
2751 .unwrap();
2752
2753 object
2755 .set_extended_attribute(b"11".to_vec(), vec![0x4; 300], SetExtendedAttributeMode::Set)
2756 .await
2757 .unwrap();
2758 object
2759 .set_extended_attribute(
2760 b"12".to_vec(),
2761 vec![0x1; 300],
2762 SetExtendedAttributeMode::Replace,
2763 )
2764 .await
2765 .unwrap();
2766
2767 object.remove_extended_attribute(b"5".to_vec()).await.unwrap();
2769 object
2770 .set_extended_attribute(
2771 b"new attr".to_vec(),
2772 vec![0x3; 300],
2773 SetExtendedAttributeMode::Set,
2774 )
2775 .await
2776 .unwrap();
2777
2778 fs.close().await.expect("close failed");
2779 }
2780
2781 #[fuchsia::test]
2782 async fn write_attr_trims_beyond_new_end() {
2783 let (fs, object) = test_filesystem_and_empty_object().await;
2788
2789 let block_size = fs.block_size();
2790 let buf_size = block_size * 2;
2791 let attribute_id = AttributeId::TEST_ID;
2792
2793 let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
2794 let mut buffer = object.allocate_buffer(buf_size as usize).await;
2795 buffer.as_mut_slice().fill(3);
2796 object
2799 .multi_write(
2800 &mut transaction,
2801 attribute_id,
2802 &[0..block_size, block_size..block_size * 2],
2803 buffer.as_mut(),
2804 )
2805 .await
2806 .unwrap();
2807 transaction.add(
2808 object.store().store_object_id,
2809 Mutation::replace_or_insert_object(
2810 ObjectKey::attribute(object.object_id(), attribute_id, AttributeKey::Attribute),
2811 ObjectValue::attribute(block_size * 2, false),
2812 ),
2813 );
2814 transaction.commit().await.unwrap();
2815
2816 crate::fsck::fsck(fs.clone()).await.unwrap();
2817
2818 let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
2819 let needs_trim = (*object)
2820 .write_attr(&mut transaction, attribute_id, &vec![3u8; block_size as usize])
2821 .await
2822 .unwrap();
2823 assert!(!needs_trim.0);
2824 transaction.commit().await.unwrap();
2825
2826 crate::fsck::fsck(fs.clone()).await.unwrap();
2827
2828 fs.close().await.expect("close failed");
2829 }
2830
2831 #[fuchsia::test]
2832 async fn write_new_attr_in_batches_multiple_txns() {
2833 let (fs, object) = test_filesystem_and_empty_object().await;
2834 let merkle_tree = vec![1; 3 * WRITE_ATTR_BATCH_SIZE];
2835 let mut transaction = (*object).new_transaction(AttributeId::TEST_ID).await.unwrap();
2836 object
2837 .write_new_attr_in_batches(
2838 &mut transaction,
2839 AttributeId::TEST_ID,
2840 &merkle_tree,
2841 WRITE_ATTR_BATCH_SIZE,
2842 )
2843 .await
2844 .expect("failed to write merkle attribute");
2845
2846 transaction.add(
2847 object.store().store_object_id,
2848 Mutation::replace_or_insert_object(
2849 ObjectKey::graveyard_attribute_entry(
2850 object.store().graveyard_directory_object_id(),
2851 object.object_id(),
2852 AttributeId::TEST_ID,
2853 ),
2854 ObjectValue::None,
2855 ),
2856 );
2857 transaction.commit().await.unwrap();
2858 assert_eq!(
2859 object.read_attr(AttributeId::TEST_ID).await.expect("read_attr failed"),
2860 Some(merkle_tree.into())
2861 );
2862
2863 fs.close().await.expect("close failed");
2864 }
2865
// Drives a `Watchdog` built with an argument of 10 by stepping the test executor's clock. The
// callback forwards its count over a channel; the timeline below expects one notification per
// ten seconds of elapsed time, and none once the watchdog has been dropped.
#[cfg(target_os = "fuchsia")]
#[fuchsia::test(allow_stalls = false)]
async fn test_watchdog() {
    use super::Watchdog;
    use fuchsia_async::{MonotonicDuration, MonotonicInstant, TestExecutor};
    use std::sync::mpsc::channel;

    // Builds the instant that lies `time_secs` seconds after the zero instant.
    fn make_time(time_secs: i64) -> MonotonicInstant {
        MonotonicInstant::from_nanos(0) + MonotonicDuration::from_seconds(time_secs)
    }

    TestExecutor::advance_to(make_time(0)).await;
    let (tx, rx) = channel();

    {
        let _watchdog = Watchdog::new(10, move |count| {
            tx.send(count).expect("Sending value");
        });

        // t = 5: nothing has been reported yet.
        TestExecutor::advance_to(make_time(5)).await;
        rx.try_recv().expect_err("Should not have message");

        // t = 10: first notification, carrying count 1.
        TestExecutor::advance_to(make_time(10)).await;
        assert_eq!(1, rx.recv().expect("Receiving"));

        // t = 15: still only the one notification.
        TestExecutor::advance_to(make_time(15)).await;
        rx.try_recv().expect_err("Should not have message");

        // t = 30: two more notifications arrive, with counts 2 and 3 in that order.
        TestExecutor::advance_to(make_time(30)).await;
        assert_eq!(2, rx.recv().expect("Receiving"));
        assert_eq!(3, rx.recv().expect("Receiving"));
    }

    // The watchdog (and with it the sending half of the channel) has been dropped, so receiving
    // must now fail rather than yield another count.
    TestExecutor::advance_to(make_time(100)).await;
    rx.recv().expect_err("Watchdog should be gone");
}
2908
2909 #[fuchsia::test]
2910 fn test_checksum_range_chunk() {
2911 let block_size = 4096;
2912
2913 assert_eq!(
2915 ChecksumRangeChunk::group_first_write_ranges(
2916 &mut OverwriteBitmaps::None,
2917 block_size,
2918 block_size * 2..block_size * 5,
2919 ),
2920 vec![ChecksumRangeChunk {
2921 checksum_range: 0..3,
2922 device_range: block_size * 2..block_size * 5,
2923 is_first_write: false,
2924 }],
2925 );
2926
2927 let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2928 assert_eq!(
2929 ChecksumRangeChunk::group_first_write_ranges(
2930 &mut bitmaps,
2931 block_size,
2932 block_size * 2..block_size * 5,
2933 ),
2934 vec![ChecksumRangeChunk {
2935 checksum_range: 0..3,
2936 device_range: block_size * 2..block_size * 5,
2937 is_first_write: false,
2938 }],
2939 );
2940 assert_eq!(
2941 bitmaps.take_bitmaps(),
2942 Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b11100000])))
2943 );
2944
2945 let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2946 bitmaps.set_offset(2);
2947 assert_eq!(
2948 ChecksumRangeChunk::group_first_write_ranges(
2949 &mut bitmaps,
2950 block_size,
2951 block_size * 2..block_size * 5,
2952 ),
2953 vec![
2954 ChecksumRangeChunk {
2955 checksum_range: 0..2,
2956 device_range: block_size * 2..block_size * 4,
2957 is_first_write: false,
2958 },
2959 ChecksumRangeChunk {
2960 checksum_range: 2..3,
2961 device_range: block_size * 4..block_size * 5,
2962 is_first_write: true,
2963 },
2964 ],
2965 );
2966 assert_eq!(
2967 bitmaps.take_bitmaps(),
2968 Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00111000])))
2969 );
2970
2971 let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
2972 bitmaps.set_offset(4);
2973 assert_eq!(
2974 ChecksumRangeChunk::group_first_write_ranges(
2975 &mut bitmaps,
2976 block_size,
2977 block_size * 2..block_size * 5,
2978 ),
2979 vec![ChecksumRangeChunk {
2980 checksum_range: 0..3,
2981 device_range: block_size * 2..block_size * 5,
2982 is_first_write: true,
2983 }],
2984 );
2985 assert_eq!(
2986 bitmaps.take_bitmaps(),
2987 Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00001110])))
2988 );
2989
2990 let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b01010101]));
2991 assert_eq!(
2992 ChecksumRangeChunk::group_first_write_ranges(
2993 &mut bitmaps,
2994 block_size,
2995 block_size * 2..block_size * 10,
2996 ),
2997 vec![
2998 ChecksumRangeChunk {
2999 checksum_range: 0..1,
3000 device_range: block_size * 2..block_size * 3,
3001 is_first_write: true,
3002 },
3003 ChecksumRangeChunk {
3004 checksum_range: 1..2,
3005 device_range: block_size * 3..block_size * 4,
3006 is_first_write: false,
3007 },
3008 ChecksumRangeChunk {
3009 checksum_range: 2..3,
3010 device_range: block_size * 4..block_size * 5,
3011 is_first_write: true,
3012 },
3013 ChecksumRangeChunk {
3014 checksum_range: 3..4,
3015 device_range: block_size * 5..block_size * 6,
3016 is_first_write: false,
3017 },
3018 ChecksumRangeChunk {
3019 checksum_range: 4..5,
3020 device_range: block_size * 6..block_size * 7,
3021 is_first_write: true,
3022 },
3023 ChecksumRangeChunk {
3024 checksum_range: 5..6,
3025 device_range: block_size * 7..block_size * 8,
3026 is_first_write: false,
3027 },
3028 ChecksumRangeChunk {
3029 checksum_range: 6..7,
3030 device_range: block_size * 8..block_size * 9,
3031 is_first_write: true,
3032 },
3033 ChecksumRangeChunk {
3034 checksum_range: 7..8,
3035 device_range: block_size * 9..block_size * 10,
3036 is_first_write: false,
3037 },
3038 ],
3039 );
3040 assert_eq!(
3041 bitmaps.take_bitmaps(),
3042 Some((BitVec::from_bytes(&[0b01010101]), BitVec::from_bytes(&[0b11111111])))
3043 );
3044 }
3045}