1use crate::checksum::{Checksum, Checksums, fletcher64};
6use crate::errors::FxfsError;
7use crate::log::*;
8use crate::lsm_tree::Query;
9use crate::lsm_tree::merge::MergerIterator;
10use crate::lsm_tree::types::{Item, ItemRef, LayerIterator};
11use crate::object_handle::ObjectHandle;
12use crate::object_store::extent_record::{ExtentKey, ExtentMode, ExtentValue};
13use crate::object_store::object_manager::ObjectManager;
14use crate::object_store::object_record::{
15 AttributeKey, ExtendedAttributeValue, ObjectAttributes, ObjectItem, ObjectKey, ObjectKeyData,
16 ObjectValue, Timestamp,
17};
18use crate::object_store::transaction::{
19 AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Options, ReadGuard,
20 Transaction, lock_keys,
21};
22use crate::object_store::{
23 HandleOptions, HandleOwner, ObjectStore, TrimMode, TrimResult, VOLUME_DATA_KEY_ID,
24};
25use crate::range::RangeExt;
26use crate::round::{round_down, round_up};
27use anyhow::{Context, Error, anyhow, bail, ensure};
28use assert_matches::assert_matches;
29use bit_vec::BitVec;
30use futures::stream::{FuturesOrdered, FuturesUnordered};
31use futures::{TryStreamExt, try_join};
32use fxfs_crypto::{
33 Cipher, CipherHolder, CipherSet, EncryptionKey, FindKeyResult, FxfsCipher, KeyPurpose,
34};
35use fxfs_trace::{TraceFutureExt, trace, trace_future_args};
36use static_assertions::const_assert;
37use std::cmp::min;
38use std::future::Future;
39use std::ops::Range;
40use std::sync::Arc;
41use std::sync::atomic::{self, AtomicBool, Ordering};
42use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
43use storage_device::{InlineCryptoOptions, ReadOptions, WriteOptions};
44
45use fidl_fuchsia_io as fio;
46use fuchsia_async as fasync;
47
/// Maximum length, in bytes, of an extended attribute name.
pub const MAX_XATTR_NAME_SIZE: usize = 255;
/// Largest extended attribute value stored inline in the attribute record
/// (presumably larger values spill to a separate attribute — confirm at call sites).
pub const MAX_INLINE_XATTR_SIZE: usize = 256;
/// Maximum size, in bytes, of an extended attribute value.
pub const MAX_XATTR_VALUE_SIZE: usize = 64000;
/// First attribute id reserved for storing extended attribute values.
pub const EXTENDED_ATTRIBUTE_RANGE_START: u64 = 64;
/// End of the attribute-id range reserved for extended attributes
/// (inclusive/exclusive not determinable from this file — confirm at call sites).
pub const EXTENDED_ATTRIBUTE_RANGE_END: u64 = 512;
65
66fn apply_bitmap_zeroing(
68 block_size: usize,
69 bitmap: &bit_vec::BitVec,
70 mut buffer: MutableBufferRef<'_>,
71) {
72 let buf = buffer.as_mut_slice();
73 debug_assert_eq!(bitmap.len() * block_size, buf.len());
74 for (i, block) in bitmap.iter().enumerate() {
75 if !block {
76 let start = i * block_size;
77 buf[start..start + block_size].fill(0);
78 }
79 }
80}
81
/// Per-block checksums for a write, if checksumming was performed.
#[derive(Debug, Clone, PartialEq)]
pub enum MaybeChecksums {
    /// No checksums were computed (e.g. checksums skipped or inline crypto used).
    None,
    /// One fletcher64 checksum per block written.
    Fletcher(Vec<Checksum>),
}
90
91impl MaybeChecksums {
92 pub fn maybe_as_ref(&self) -> Option<&[Checksum]> {
93 match self {
94 Self::None => None,
95 Self::Fletcher(sums) => Some(&sums),
96 }
97 }
98
99 pub fn split_off(&mut self, at: usize) -> Self {
100 match self {
101 Self::None => Self::None,
102 Self::Fletcher(sums) => Self::Fletcher(sums.split_off(at)),
103 }
104 }
105
106 pub fn to_mode(self) -> ExtentMode {
107 match self {
108 Self::None => ExtentMode::Raw,
109 Self::Fletcher(sums) => ExtentMode::Cow(Checksums::fletcher(sums)),
110 }
111 }
112
113 pub fn into_option(self) -> Option<Vec<Checksum>> {
114 match self {
115 Self::None => None,
116 Self::Fletcher(sums) => Some(sums),
117 }
118 }
119}
120
/// How setting an extended attribute should treat an existing value; mirrors
/// `fio::SetExtendedAttributeMode` (see the `From` impl below).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SetExtendedAttributeMode {
    /// Set the value unconditionally.
    Set,
    /// Create the attribute (presumably fails if it already exists — see usage).
    Create,
    /// Replace an existing attribute (presumably fails if absent — see usage).
    Replace,
}
133
134impl From<fio::SetExtendedAttributeMode> for SetExtendedAttributeMode {
135 fn from(other: fio::SetExtendedAttributeMode) -> SetExtendedAttributeMode {
136 match other {
137 fio::SetExtendedAttributeMode::Set => SetExtendedAttributeMode::Set,
138 fio::SetExtendedAttributeMode::Create => SetExtendedAttributeMode::Create,
139 fio::SetExtendedAttributeMode::Replace => SetExtendedAttributeMode::Replace,
140 }
141 }
142}
143
/// How (and whether) the object's data is encrypted; chosen in
/// `StoreObjectHandle::new` and consulted by `get_key`.
enum Encryption {
    /// The object is not encrypted.
    None,
    /// Keys are fetched on demand through the store's key-manager cache.
    CachedKeys,
    /// Keys are expected to always be present in the key manager
    /// (`get_key` unwraps the lookup).
    PermanentKeys,
}
155
/// Bitmaps consulted/updated when writing over an extent that tracks
/// partially-written blocks (`ExtentMode::OverwritePartial`).
#[derive(PartialEq, Debug)]
enum OverwriteBitmaps {
    /// No bitmap tracking for this write.
    None,
    Some {
        /// The extent's bitmap: which blocks have already been written.
        extent_bitmap: BitVec,
        /// Which blocks this write touches; built up via `set_in_write_bitmap`.
        write_bitmap: BitVec,
        /// Bias added to indices passed to the accessors, so callers can index
        /// relative to the current sub-range of the extent.
        bitmap_offset: usize,
    },
}
169
170impl OverwriteBitmaps {
171 fn new(extent_bitmap: BitVec) -> Self {
172 OverwriteBitmaps::Some {
173 write_bitmap: BitVec::from_elem(extent_bitmap.len(), false),
174 extent_bitmap,
175 bitmap_offset: 0,
176 }
177 }
178
179 fn is_none(&self) -> bool {
180 *self == OverwriteBitmaps::None
181 }
182
183 fn set_offset(&mut self, new_offset: usize) {
184 match self {
185 OverwriteBitmaps::None => (),
186 OverwriteBitmaps::Some { bitmap_offset, .. } => *bitmap_offset = new_offset,
187 }
188 }
189
190 fn get_from_extent_bitmap(&self, i: usize) -> Option<bool> {
191 match self {
192 OverwriteBitmaps::None => None,
193 OverwriteBitmaps::Some { extent_bitmap, bitmap_offset, .. } => {
194 extent_bitmap.get(*bitmap_offset + i)
195 }
196 }
197 }
198
199 fn set_in_write_bitmap(&mut self, i: usize, x: bool) {
200 match self {
201 OverwriteBitmaps::None => (),
202 OverwriteBitmaps::Some { write_bitmap, bitmap_offset, .. } => {
203 write_bitmap.set(*bitmap_offset + i, x)
204 }
205 }
206 }
207
208 fn take_bitmaps(self) -> Option<(BitVec, BitVec)> {
209 match self {
210 OverwriteBitmaps::None => None,
211 OverwriteBitmaps::Some { extent_bitmap, write_bitmap, .. } => {
212 Some((extent_bitmap, write_bitmap))
213 }
214 }
215 }
216}
217
/// A chunk of a write in which every block shares the same "first write" status
/// (see `group_first_write_ranges`).
#[derive(PartialEq, Debug)]
struct ChecksumRangeChunk {
    /// Indices into the write's per-block checksum list covered by this chunk.
    checksum_range: Range<usize>,
    /// Device byte range covered by this chunk.
    device_range: Range<u64>,
    /// True when these blocks' extent-bitmap bits were unset (not written before).
    is_first_write: bool,
}
227
impl ChecksumRangeChunk {
    /// Partitions `write_device_range` into chunks whose blocks all share the same
    /// first-write status according to `bitmaps`' extent bitmap, marking every
    /// block of the write in the write bitmap along the way.
    ///
    /// With no bitmaps, the entire range is returned as one non-first-write chunk.
    fn group_first_write_ranges(
        bitmaps: &mut OverwriteBitmaps,
        block_size: u64,
        write_device_range: Range<u64>,
    ) -> Vec<ChecksumRangeChunk> {
        let write_block_len = (write_device_range.length().unwrap() / block_size) as usize;
        if bitmaps.is_none() {
            vec![ChecksumRangeChunk {
                checksum_range: 0..write_block_len,
                device_range: write_device_range,
                is_first_write: false,
            }]
        } else {
            // Seed with an empty chunk whose status matches block 0; the loop
            // extends it (or opens a new chunk) one block at a time.
            let mut checksum_ranges = vec![ChecksumRangeChunk {
                checksum_range: 0..0,
                device_range: write_device_range.start..write_device_range.start,
                is_first_write: !bitmaps.get_from_extent_bitmap(0).unwrap(),
            }];
            let mut working_range = checksum_ranges.last_mut().unwrap();
            for i in 0..write_block_len {
                bitmaps.set_in_write_bitmap(i, true);

                // A block is a first write when its extent-bitmap bit is unset, so
                // it belongs to the current chunk when is_first_write != bitmap bit.
                if working_range.is_first_write != bitmaps.get_from_extent_bitmap(i).unwrap() {
                    working_range.checksum_range.end += 1;
                    working_range.device_range.end += block_size;
                } else {
                    // Status flipped: start a one-block chunk of the opposite status.
                    let new_chunk = ChecksumRangeChunk {
                        checksum_range: working_range.checksum_range.end
                            ..working_range.checksum_range.end + 1,
                        device_range: working_range.device_range.end
                            ..working_range.device_range.end + block_size,
                        is_first_write: !working_range.is_first_write,
                    };
                    checksum_ranges.push(new_chunk);
                    working_range = checksum_ranges.last_mut().unwrap();
                }
            }
            checksum_ranges
        }
    }
}
279
/// A handle to an object within an `ObjectStore`, through which reads, writes
/// and other per-object operations are performed.
pub struct StoreObjectHandle<S: HandleOwner> {
    /// Keeps the owner (and through it the store) alive; the store is reached via
    /// `owner.as_ref().as_ref()`.
    owner: Arc<S>,
    /// The object id of the object this handle refers to.
    object_id: u64,
    options: HandleOptions,
    /// When true, verbose per-operation logging is emitted (see `set_trace`).
    trace: AtomicBool,
    /// Whether/how the object's data is encrypted.
    encryption: Encryption,
}
299
300impl<S: HandleOwner> ObjectHandle for StoreObjectHandle<S> {
301 fn set_trace(&self, v: bool) {
302 info!(store_id = self.store().store_object_id, oid = self.object_id(), trace = v; "trace");
303 self.trace.store(v, atomic::Ordering::Relaxed);
304 }
305
306 fn object_id(&self) -> u64 {
307 return self.object_id;
308 }
309
310 fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
311 self.store().device.allocate_buffer(size)
312 }
313
314 fn block_size(&self) -> u64 {
315 self.store().block_size()
316 }
317}
318
/// Periodically invokes a callback while alive, used to warn about stalled I/O;
/// dropping it drops the spawned task (which presumably cancels it — fasync
/// `Task` semantics).
struct Watchdog {
    _task: fasync::Task<()>,
}
322
impl Watchdog {
    /// Spawns a task that calls `cb` every `increment_seconds`, passing the number
    /// of times it has fired so far.  Deadlines are scheduled from the original
    /// start time, so if the task falls behind it fires immediately to catch up
    /// rather than sleeping.
    fn new(increment_seconds: u64, cb: impl Fn(u64) + Send + 'static) -> Self {
        Self {
            _task: fasync::Task::spawn(
                async move {
                    let increment = increment_seconds.try_into().unwrap();
                    let mut fired_counter = 0;
                    let mut next_wake = fasync::MonotonicInstant::now();
                    loop {
                        next_wake += std::time::Duration::from_secs(increment).into();
                        // Only sleep if the next deadline is still in the future;
                        // otherwise fire immediately (catch-up behavior).
                        if fasync::MonotonicInstant::now() < next_wake {
                            fasync::Timer::new(next_wake).await;
                        }
                        fired_counter += 1;
                        cb(fired_counter);
                    }
                }
                .trace(trace_future_args!("StoreObjectHandle::Watchdog")),
            ),
        }
    }
}
348
349impl<S: HandleOwner> StoreObjectHandle<S> {
350 pub fn new(
352 owner: Arc<S>,
353 object_id: u64,
354 permanent_keys: bool,
355 options: HandleOptions,
356 trace: bool,
357 ) -> Self {
358 let encryption = if permanent_keys {
359 Encryption::PermanentKeys
360 } else if owner.as_ref().as_ref().is_encrypted() {
361 Encryption::CachedKeys
362 } else {
363 Encryption::None
364 };
365 Self { owner, object_id, encryption, options, trace: AtomicBool::new(trace) }
366 }
367
    /// Returns the owner, which keeps the store alive.
    pub fn owner(&self) -> &Arc<S> {
        &self.owner
    }
371
    /// Returns the object store this handle's object lives in.
    pub fn store(&self) -> &ObjectStore {
        self.owner.as_ref().as_ref()
    }
375
    /// Returns whether verbose tracing is currently enabled for this handle.
    pub fn trace(&self) -> bool {
        self.trace.load(atomic::Ordering::Relaxed)
    }
379
    /// Returns true if the object's data is encrypted (cached or permanent keys).
    pub fn is_encrypted(&self) -> bool {
        !matches!(self.encryption, Encryption::None)
    }
383
    /// Returns transaction options carrying over this handle's journal-check
    /// setting; used as the default for transactions made on its behalf.
    pub fn default_transaction_options<'b>(&self) -> Options<'b> {
        Options { skip_journal_checks: self.options.skip_journal_checks, ..Default::default() }
    }
389
    /// Creates a new transaction with the given options, holding both the
    /// per-attribute lock for `attribute_id` and the whole-object lock.
    pub async fn new_transaction_with_options<'b>(
        &self,
        attribute_id: u64,
        options: Options<'b>,
    ) -> Result<Transaction<'b>, Error> {
        Ok(self
            .store()
            .filesystem()
            .new_transaction(
                lock_keys![
                    LockKey::object_attribute(
                        self.store().store_object_id(),
                        self.object_id(),
                        attribute_id,
                    ),
                    LockKey::object(self.store().store_object_id(), self.object_id()),
                ],
                options,
            )
            .await?)
    }
411
    /// Creates a new transaction for `attribute_id` using this handle's default
    /// transaction options.
    pub async fn new_transaction<'b>(&self, attribute_id: u64) -> Result<Transaction<'b>, Error> {
        self.new_transaction_with_options(attribute_id, self.default_transaction_options()).await
    }
415
    /// Returns this object's mutation within `transaction`; delegates to
    /// `ObjectStore::txn_get_object_mutation` for this handle's object id.
    async fn txn_get_object_mutation(
        &self,
        transaction: &Transaction<'_>,
    ) -> Result<ObjectStoreMutation, Error> {
        self.store().txn_get_object_mutation(transaction, self.object_id()).await
    }
424
    /// Deallocates the device space backing `range` (which must be block-aligned)
    /// of attribute `attribute_id`, returning the number of bytes deallocated.
    ///
    /// This only releases allocator space; it does not insert deleted-extent
    /// records (see `zero` for that).
    async fn deallocate_old_extents(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: u64,
        range: Range<u64>,
    ) -> Result<u64, Error> {
        let block_size = self.block_size();
        assert_eq!(range.start % block_size, 0);
        assert_eq!(range.end % block_size, 0);
        if range.start == range.end {
            return Ok(0);
        }
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let key = ExtentKey { range };
        let lower_bound = ObjectKey::attribute(
            self.object_id(),
            attribute_id,
            AttributeKey::Extent(key.search_key()),
        );
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullRange(&lower_bound)).await?;
        let allocator = self.store().allocator();
        let mut deallocated = 0;
        let trace = self.trace();
        // Walk extent records overlapping `range`, deallocating the overlapping
        // portion of each.
        while let Some(ItemRef {
            key:
                ObjectKey {
                    object_id,
                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                },
            value: ObjectValue::Extent(value),
            ..
        }) = iter.get()
        {
            if *object_id != self.object_id() || *attr_id != attribute_id {
                break;
            }
            if let ExtentValue::Some { device_offset, .. } = value {
                if let Some(overlap) = key.overlap(extent_key) {
                    // Translate the file-offset overlap into a device byte range.
                    let range = device_offset + overlap.start - extent_key.range.start
                        ..device_offset + overlap.end - extent_key.range.start;
                    ensure!(range.is_aligned(block_size), FxfsError::Inconsistent);
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = range,
                            len = range.end - range.start,
                            extent_key:?;
                            "D",
                        );
                    }
                    allocator
                        .deallocate(transaction, self.store().store_object_id(), range)
                        .await?;
                    deallocated += overlap.end - overlap.start;
                } else {
                    // No overlap: we've advanced past the end of the target range.
                    break;
                }
            }
            iter.advance().await?;
        }
        Ok(deallocated)
    }
491
492 async fn write_aligned(
495 &self,
496 buf: BufferRef<'_>,
497 device_offset: u64,
498 crypt_ctx: Option<(u32, u8)>,
499 ) -> Result<MaybeChecksums, Error> {
500 if self.trace() {
501 info!(
502 store_id = self.store().store_object_id(),
503 oid = self.object_id(),
504 device_range:? = (device_offset..device_offset + buf.len() as u64),
505 len = buf.len();
506 "W",
507 );
508 }
509 let store = self.store();
510 store.device_write_ops.fetch_add(1, Ordering::Relaxed);
511 let mut checksums = Vec::new();
512 let _watchdog = Watchdog::new(10, |count| {
513 warn!("Write has been stalled for {} seconds", count * 10);
514 });
515
516 match crypt_ctx {
517 Some((dun, slot)) => {
518 if !store.filesystem().options().barriers_enabled {
519 return Err(anyhow!(FxfsError::InvalidArgs)
520 .context("Barriers must be enabled for inline encrypted writes."));
521 }
522 store
523 .device
524 .write_with_opts(
525 device_offset as u64,
526 buf,
527 WriteOptions {
528 inline_crypto: InlineCryptoOptions::enabled(slot, dun),
529 ..Default::default()
530 },
531 )
532 .await?;
533 Ok(MaybeChecksums::None)
534 }
535 None => {
536 if self.options.skip_checksums {
537 store
538 .device
539 .write_with_opts(device_offset as u64, buf, WriteOptions::default())
540 .await?;
541 Ok(MaybeChecksums::None)
542 } else {
543 try_join!(store.device.write(device_offset, buf), async {
544 let block_size = self.block_size();
545 for chunk in buf.as_slice().chunks_exact(block_size as usize) {
546 checksums.push(fletcher64(chunk, 0));
547 }
548 Ok(())
549 })?;
550 Ok(MaybeChecksums::Fletcher(checksums))
551 }
552 }
553 }
554 }
555
    /// Flushes the underlying device, persisting previously-written data.
    pub async fn flush_device(&self) -> Result<(), Error> {
        self.store().device().flush().await
    }
560
    /// Adjusts the object's recorded allocated size by `allocated - deallocated`
    /// bytes, and if the object belongs to a project (non-zero project id), merges
    /// the same byte delta into the project's usage record.
    pub async fn update_allocated_size(
        &self,
        transaction: &mut Transaction<'_>,
        allocated: u64,
        deallocated: u64,
    ) -> Result<(), Error> {
        if allocated == deallocated {
            return Ok(());
        }
        let mut mutation = self.txn_get_object_mutation(transaction).await?;
        if let ObjectValue::Object {
            attributes: ObjectAttributes { project_id, allocated_size, .. },
            ..
        } = &mut mutation.item.value
        {
            // Overflow/underflow here would mean the stored size is inconsistent.
            *allocated_size = allocated_size
                .checked_add(allocated)
                .ok_or_else(|| anyhow!(FxfsError::Inconsistent).context("Allocated size overflow"))?
                .checked_sub(deallocated)
                .ok_or_else(|| {
                    anyhow!(FxfsError::Inconsistent).context("Allocated size underflow")
                })?;

            if *project_id != 0 {
                // A project id of 0 is skipped (treated as "no project").
                let diff = i64::try_from(allocated).unwrap() - i64::try_from(deallocated).unwrap();
                transaction.add(
                    self.store().store_object_id(),
                    Mutation::merge_object(
                        ObjectKey::project_usage(
                            self.store().root_directory_object_id(),
                            *project_id,
                        ),
                        ObjectValue::BytesAndNodes { bytes: diff, nodes: 0 },
                    ),
                );
            }
        } else {
            bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
        }
        transaction.add(self.store().store_object_id, Mutation::ObjectStore(mutation));
        Ok(())
    }
608
    /// Applies mutable node-attribute updates.  An SELinux context supplied inline
    /// (`fio::SelinuxContext::Data`) is written to the "security.selinux"
    /// extended attribute first; everything else is delegated to the store.
    pub async fn update_attributes<'a>(
        &self,
        transaction: &mut Transaction<'a>,
        node_attributes: Option<&fio::MutableNodeAttributes>,
        change_time: Option<Timestamp>,
    ) -> Result<(), Error> {
        if let Some(&fio::MutableNodeAttributes { selinux_context: Some(ref context), .. }) =
            node_attributes
        {
            if let fio::SelinuxContext::Data(context) = context {
                self.set_extended_attribute_impl(
                    "security.selinux".into(),
                    context.clone(),
                    SetExtendedAttributeMode::Set,
                    transaction,
                )
                .await?;
            } else {
                // Only the `data` union member is supported here.
                return Err(anyhow!(FxfsError::InvalidArgs)
                    .context("Only set SELinux context with `data` member."));
            }
        }
        self.store()
            .update_attributes(transaction, self.object_id, node_attributes, change_time)
            .await
    }
635
    /// Zeroes `range` (block-aligned) of attribute `attribute_id` by deallocating
    /// the backing extents and, if anything was deallocated, updating the
    /// allocated size and recording a deleted-extent marker over the range.
    pub async fn zero(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: u64,
        range: Range<u64>,
    ) -> Result<(), Error> {
        let deallocated =
            self.deallocate_old_extents(transaction, attribute_id, range.clone()).await?;
        if deallocated > 0 {
            self.update_allocated_size(transaction, 0, deallocated).await?;
            transaction.add(
                self.store().store_object_id,
                Mutation::merge_object(
                    ObjectKey::extent(self.object_id(), attribute_id, range),
                    ObjectValue::Extent(ExtentValue::deleted_extent()),
                ),
            );
        }
        Ok(())
    }
657
    /// Copies `buf` (which need not be aligned) into a freshly allocated
    /// block-aligned buffer, reading in any partial head/tail blocks from the
    /// attribute so existing data around the write is preserved.  Returns the
    /// aligned file range and the buffer.
    pub async fn align_buffer(
        &self,
        attribute_id: u64,
        offset: u64,
        buf: BufferRef<'_>,
    ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
        let block_size = self.block_size();
        let end = offset + buf.len() as u64;
        let aligned =
            round_down(offset, block_size)..round_up(end, block_size).ok_or(FxfsError::TooBig)?;

        let mut aligned_buf =
            self.store().device.allocate_buffer((aligned.end - aligned.start) as usize).await;

        // Head: read the first block and zero-fill anything past what was read.
        if aligned.start < offset {
            let mut head_block = aligned_buf.subslice_mut(..block_size as usize);
            let read = self.read(attribute_id, aligned.start, head_block.reborrow()).await?;
            head_block.as_mut_slice()[read..].fill(0);
        }

        // Tail: read the last block, unless the head read already covered it.
        if aligned.end > end {
            let end_block_offset = aligned.end - block_size;
            if offset <= end_block_offset {
                let mut tail_block =
                    aligned_buf.subslice_mut(aligned_buf.len() - block_size as usize..);
                let read = self.read(attribute_id, end_block_offset, tail_block.reborrow()).await?;
                tail_block.as_mut_slice()[read..].fill(0);
            }
        }

        aligned_buf.as_mut_slice()
            [(offset - aligned.start) as usize..(end - aligned.start) as usize]
            .copy_from_slice(buf.as_slice());

        Ok((aligned, aligned_buf))
    }
699
    /// Shrinks attribute `attribute_id` to `size` by trimming extents past the new
    /// end.  If the trim couldn't finish within this transaction, a graveyard
    /// "trim" entry is recorded (unless one already exists) so the remainder can
    /// be cleaned up later, and the returned `NeedsTrim` tells the caller that
    /// more trimming is required.
    pub async fn shrink(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: u64,
        size: u64,
    ) -> Result<NeedsTrim, Error> {
        let store = self.store();
        let needs_trim = matches!(
            store
                .trim_some(transaction, self.object_id(), attribute_id, TrimMode::FromOffset(size))
                .await?,
            TrimResult::Incomplete
        );
        if needs_trim {
            let graveyard_id = store.graveyard_directory_object_id();
            match store
                .tree
                .find(&ObjectKey::graveyard_entry(graveyard_id, self.object_id()))
                .await?
            {
                Some(ObjectItem { value: ObjectValue::Some, .. })
                | Some(ObjectItem { value: ObjectValue::Trim, .. }) => {
                    // A graveyard entry already exists; nothing more to record.
                }
                _ => {
                    transaction.add(
                        store.store_object_id,
                        Mutation::replace_or_insert_object(
                            ObjectKey::graveyard_entry(graveyard_id, self.object_id()),
                            ObjectValue::Trim,
                        ),
                    );
                }
            }
        }
        Ok(NeedsTrim(needs_trim))
    }
744
745 pub async fn read_and_decrypt(
747 &self,
748 device_offset: u64,
749 file_offset: u64,
750 mut buffer: MutableBufferRef<'_>,
751 key_id: u64,
752 ) -> Result<(), Error> {
753 let store = self.store();
754 store.device_read_ops.fetch_add(1, Ordering::Relaxed);
755
756 let _watchdog = Watchdog::new(10, |count| {
757 warn!("Read has been stalled for {} seconds", count * 10);
758 });
759
760 let (_key_id, key) = self.get_key(Some(key_id)).await?;
761 if let Some(key) = key {
762 if let Some((dun, slot)) = key.crypt_ctx(self.object_id, file_offset) {
763 store
764 .device
765 .read_with_opts(
766 device_offset as u64,
767 buffer.reborrow(),
768 ReadOptions { inline_crypto: InlineCryptoOptions::enabled(slot, dun) },
769 )
770 .await?;
771 } else {
772 store.device.read(device_offset, buffer.reborrow()).await?;
773 key.decrypt(self.object_id, device_offset, file_offset, buffer.as_mut_slice())?;
774 }
775 } else {
776 store.device.read(device_offset, buffer.reborrow()).await?;
777 }
778
779 Ok(())
780 }
781
    /// Resolves the encryption key for this object, returning `(key_id, cipher)`.
    ///
    /// - `Encryption::None`: returns `(VOLUME_DATA_KEY_ID, None)`.
    /// - `Encryption::CachedKeys` with `Some(key_id)`: fetches that specific key
    ///   through the key-manager cache.
    /// - `Encryption::CachedKeys` with `None`: asks the key manager for the
    ///   fscrypt key if one is present (see `get_fscrypt_key_if_present`).
    /// - `Encryption::PermanentKeys`: the key must already be present in the key
    ///   manager (the lookup is unwrapped).
    ///
    /// Errors if the resolved key uses inline crypto but the filesystem does not
    /// have barriers enabled.
    pub async fn get_key(
        &self,
        key_id: Option<u64>,
    ) -> Result<(u64, Option<Arc<dyn Cipher>>), Error> {
        let store = self.store();
        let result = match self.encryption {
            Encryption::None => (VOLUME_DATA_KEY_ID, None),
            Encryption::CachedKeys => {
                if let Some(key_id) = key_id {
                    (
                        key_id,
                        Some(
                            store
                                .key_manager
                                .get_key(
                                    self.object_id,
                                    store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
                                    async || store.get_keys(self.object_id).await,
                                    key_id,
                                )
                                .await?,
                        ),
                    )
                } else {
                    let (key_id, key) = store
                        .key_manager
                        .get_fscrypt_key_if_present(
                            self.object_id,
                            store.crypt().ok_or_else(|| anyhow!("No crypt!"))?.as_ref(),
                            async || store.get_keys(self.object_id).await,
                        )
                        .await?;
                    (key_id, Some(key))
                }
            }
            Encryption::PermanentKeys => {
                (VOLUME_DATA_KEY_ID, Some(store.key_manager.get(self.object_id).await?.unwrap()))
            }
        };

        // Inline-crypto keys require barriers; reject here (the write path checks
        // this too — see `write_aligned`).
        if let Some(ref key) = result.1 {
            if key.crypt_ctx(self.object_id, 0).is_some() {
                if !store.filesystem().options().barriers_enabled {
                    return Err(anyhow!(FxfsError::InvalidArgs)
                        .context("Barriers must be enabled for inline encrypted writes."));
                }
            }
        }

        Ok(result)
    }
838
    /// Returns the cipher for the object's `VOLUME_DATA_KEY_ID` data key, creating
    /// one (and persisting it via `transaction`) if it doesn't exist yet.
    async fn get_or_create_key(
        &self,
        transaction: &mut Transaction<'_>,
    ) -> Result<Arc<dyn Cipher>, Error> {
        let store = self.store();

        // Fast path: the cipher is already cached in the key manager.
        if let Some(key) = store.key_manager.get(self.object_id).await.context("get failed")? {
            return Ok(key);
        }

        let crypt = store.crypt().ok_or_else(|| anyhow!("No crypt!"))?;

        // Load any existing keys for the object from the tree; if the volume data
        // key is already there, return it (or fail if it's unavailable).
        let (mut encryption_keys, mut cipher_set) = if let Some(item) =
            store.tree.find(&ObjectKey::keys(self.object_id)).await.context("find failed")?
        {
            if let ObjectValue::Keys(encryption_keys) = item.value {
                let cipher_set = store
                    .key_manager
                    .get_keys(
                        self.object_id,
                        crypt.as_ref(),
                        &mut Some(async || Ok(encryption_keys.clone())),
                        false,
                        false,
                    )
                    .await
                    .context("get_keys failed")?;
                match cipher_set.find_key(VOLUME_DATA_KEY_ID) {
                    FindKeyResult::NotFound => {}
                    FindKeyResult::Unavailable => return Err(FxfsError::NoKey.into()),
                    FindKeyResult::Key(key) => return Ok(key),
                }
                (encryption_keys, (*cipher_set).clone())
            } else {
                // A keys record with a non-Keys value indicates corruption.
                return Err(anyhow!(FxfsError::Inconsistent));
            }
        } else {
            Default::default()
        };

        // Create a fresh data key, wrapped by the crypt service.
        let (key, unwrapped_key) = crypt.create_key(self.object_id, KeyPurpose::Data).await?;
        let cipher: Arc<dyn Cipher> = Arc::new(FxfsCipher::new(&unwrapped_key));

        cipher_set.add_key(VOLUME_DATA_KEY_ID, CipherHolder::Cipher(cipher.clone()));
        let cipher_set = Arc::new(cipher_set);

        // Updates the key-manager cache at the moment the mutation is applied.
        struct UnwrappedKeys {
            object_id: u64,
            new_keys: Arc<CipherSet>,
        }

        impl AssociatedObject for UnwrappedKeys {
            fn will_apply_mutation(
                &self,
                _mutation: &Mutation,
                object_id: u64,
                manager: &ObjectManager,
            ) {
                manager.store(object_id).unwrap().key_manager.insert(
                    self.object_id,
                    self.new_keys.clone(),
                    false,
                );
            }
        }

        encryption_keys.insert(VOLUME_DATA_KEY_ID, EncryptionKey::Fxfs(key).into());

        transaction.add_with_object(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::keys(self.object_id),
                ObjectValue::keys(encryption_keys),
            ),
            AssocObj::Owned(Box::new(UnwrappedKeys {
                object_id: self.object_id,
                new_keys: cipher_set,
            })),
        );

        Ok(cipher)
    }
931
    /// Reads from attribute `attribute_id` at `offset` into `buf`, returning the
    /// number of bytes read.  Takes the attribute read lock, clamps the read to
    /// the attribute's recorded size, and returns 0 if the attribute is missing
    /// or `offset` is at/past the end.
    pub async fn read(
        &self,
        attribute_id: u64,
        offset: u64,
        mut buf: MutableBufferRef<'_>,
    ) -> Result<usize, Error> {
        let fs = self.store().filesystem();
        let guard = fs
            .lock_manager()
            .read_lock(lock_keys![LockKey::object_attribute(
                self.store().store_object_id(),
                self.object_id(),
                attribute_id,
            )])
            .await;

        let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
        let item = self.store().tree().find(&key).await?;
        let size = match item {
            Some(item) if item.key == key => match item.value {
                ObjectValue::Attribute { size, .. } => size,
                // Any other value under an attribute key indicates corruption.
                _ => bail!(FxfsError::Inconsistent),
            },
            _ => return Ok(0),
        };
        if offset >= size {
            return Ok(0);
        }
        let length = min(buf.len() as u64, size - offset) as usize;
        buf = buf.subslice_mut(0..length);
        self.read_unchecked(attribute_id, offset, buf, &guard).await?;
        Ok(length)
    }
965
    /// Reads exactly `buf.len()` bytes of attribute `attribute_id` at `offset`
    /// without consulting the attribute's size — callers (e.g. `read`) must have
    /// already clamped the read, hence "unchecked".  `offset` must be
    /// block-aligned.  Sparse regions, unwritten overwrite blocks and anything
    /// past the last extent read back as zeroes.  `_guard` evidences that the
    /// attribute read lock is held for the duration.
    pub(super) async fn read_unchecked(
        &self,
        attribute_id: u64,
        mut offset: u64,
        mut buf: MutableBufferRef<'_>,
        _guard: &ReadGuard<'_>,
    ) -> Result<(), Error> {
        if buf.len() == 0 {
            return Ok(());
        }
        let end_offset = offset + buf.len() as u64;

        self.store().logical_read_ops.fetch_add(1, Ordering::Relaxed);

        // The read offset must be filesystem-block aligned, but the buffer only
        // needs to be aligned to the device's block size.
        let block_size = self.block_size() as u64;
        let device_block_size = self.store().device.block_size() as u64;
        assert_eq!(offset % block_size, 0);
        assert_eq!(buf.range().start as u64 % device_block_size, 0);
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger
            .query(Query::LimitedRange(&ObjectKey::extent(
                self.object_id(),
                attribute_id,
                offset..end_offset,
            )))
            .await?;
        // Byte count of the (possibly unaligned) final partial block of the read.
        let end_align = ((offset + buf.len() as u64) % block_size) as usize;
        let trace = self.trace();
        // Device reads are issued concurrently and awaited together at the end.
        let reads = FuturesUnordered::new();
        while let Some(ItemRef {
            key:
                ObjectKey {
                    object_id,
                    data: ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                },
            value: ObjectValue::Extent(extent_value),
            ..
        }) = iter.get()
        {
            if *object_id != self.object_id() || *attr_id != attribute_id {
                break;
            }
            ensure!(
                extent_key.range.is_valid() && extent_key.range.is_aligned(block_size),
                FxfsError::Inconsistent
            );
            // Zero-fill the sparse gap before this extent.
            if extent_key.range.start > offset {
                let to_zero = min(extent_key.range.start - offset, buf.len() as u64) as usize;
                for i in &mut buf.as_mut_slice()[..to_zero] {
                    *i = 0;
                }
                buf = buf.subslice_mut(to_zero..);
                if buf.is_empty() {
                    break;
                }
                offset += to_zero as u64;
            }

            if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
                let mut device_offset = device_offset + (offset - extent_key.range.start);
                let key_id = *key_id;

                // Whole blocks covered by this extent (excluding the unaligned tail).
                let to_copy = min(buf.len() - end_align, (extent_key.range.end - offset) as usize);
                if to_copy > 0 {
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = (device_offset..device_offset + to_copy as u64),
                            offset,
                            range:? = extent_key.range,
                            block_size;
                            "R",
                        );
                    }
                    let (mut head, tail) = buf.split_at_mut(to_copy);
                    // For partially-written overwrite extents, slice out the portion
                    // of the bitmap covering this read so unwritten blocks can be
                    // zeroed after the device read completes.
                    let maybe_bitmap = match mode {
                        ExtentMode::OverwritePartial(bitmap) => {
                            let mut read_bitmap = bitmap.clone().split_off(
                                ((offset - extent_key.range.start) / block_size) as usize,
                            );
                            read_bitmap.truncate(to_copy / block_size as usize);
                            Some(read_bitmap)
                        }
                        _ => None,
                    };
                    reads.push(async move {
                        self.read_and_decrypt(device_offset, offset, head.reborrow(), key_id)
                            .await?;
                        if let Some(bitmap) = maybe_bitmap {
                            apply_bitmap_zeroing(self.block_size() as usize, &bitmap, head);
                        }
                        Ok::<(), Error>(())
                    });
                    buf = tail;
                    if buf.is_empty() {
                        break;
                    }
                    offset += to_copy as u64;
                    device_offset += to_copy as u64;
                }

                // Unaligned tail within this extent: read the whole final block into
                // a scratch buffer and copy out just the bytes needed.
                if offset < extent_key.range.end && end_align > 0 {
                    if let ExtentMode::OverwritePartial(bitmap) = mode {
                        let bitmap_offset = (offset - extent_key.range.start) / block_size;
                        if !bitmap.get(bitmap_offset as usize).ok_or(FxfsError::Inconsistent)? {
                            // Tail block was never written; leave it for the final
                            // zero-fill below.
                            break;
                        }
                    }
                    let mut align_buf =
                        self.store().device.allocate_buffer(block_size as usize).await;
                    if trace {
                        info!(
                            store_id = self.store().store_object_id(),
                            oid = self.object_id(),
                            device_range:? = (device_offset..device_offset + align_buf.len() as u64);
                            "RT",
                        );
                    }
                    self.read_and_decrypt(device_offset, offset, align_buf.as_mut(), key_id)
                        .await?;
                    buf.as_mut_slice().copy_from_slice(&align_buf.as_slice()[..end_align]);
                    buf = buf.subslice_mut(0..0);
                    break;
                }
            } else if extent_key.range.end >= offset + buf.len() as u64 {
                // Not `ExtentValue::Some` (i.e. a deleted extent): if it covers the
                // remainder of the read we're done — the rest is zeroed below.
                break;
            }

            iter.advance().await?;
        }
        reads.try_collect::<()>().await?;
        // Anything not covered by an extent reads back as zeroes.
        buf.as_mut_slice().fill(0);
        Ok(())
    }
1118
    /// Reads the whole of attribute `attribute_id` into memory, returning `None`
    /// if the attribute doesn't exist.
    pub async fn read_attr(&self, attribute_id: u64) -> Result<Option<Box<[u8]>>, Error> {
        let store = self.store();
        let tree = &store.tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
        let iter = merger.query(Query::FullRange(&key)).await?;
        match iter.get() {
            Some(item) if item.key == &key => match item.value {
                ObjectValue::Attribute { .. } => Ok(Some(self.read_attr_from_iter(iter).await?)),
                // An `ObjectValue::None` record is treated as absent (deleted).
                ObjectValue::None => Ok(None),
                _ => Err(FxfsError::Inconsistent.into()),
            },
            _ => Ok(None),
        }
    }
1137
    /// Reads an entire attribute given an iterator already positioned at the
    /// attribute's `AttributeKey::Attribute` record (as in `read_attr`).  Returns
    /// exactly `size` bytes; gaps between extents are zero-filled.
    pub async fn read_attr_from_iter(
        &self,
        mut iter: MergerIterator<'_, '_, ObjectKey, ObjectValue>,
    ) -> Result<Box<[u8]>, Error> {
        // The current record gives the attribute's size; the buffer is rounded up
        // to a whole number of blocks.
        let (mut buffer, size, attribute_id) = match iter.get() {
            Some(ItemRef {
                key:
                    ObjectKey {
                        object_id,
                        data: ObjectKeyData::Attribute(attribute_id, AttributeKey::Attribute),
                    },
                value: ObjectValue::Attribute { size, .. },
                ..
            }) if *object_id == self.object_id => {
                (
                    self.store()
                        .device
                        .allocate_buffer(round_up(*size, self.block_size()).unwrap() as usize)
                        .await,
                    *size as usize,
                    *attribute_id,
                )
            }
            _ => bail!(FxfsError::InvalidArgs),
        };

        self.store().logical_read_ops.fetch_add(1, Ordering::Relaxed);
        let mut last_offset = 0;
        loop {
            iter.advance().await?;
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(attr_id, AttributeKey::Extent(extent_key)),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) if *object_id == self.object_id() && *attr_id == attribute_id => {
                    if let ExtentValue::Some { device_offset, key_id, mode } = extent_value {
                        let offset = extent_key.range.start as usize;
                        // Zero the gap between the previous extent and this one.
                        buffer.as_mut_slice()[last_offset..offset].fill(0);
                        let end = std::cmp::min(extent_key.range.end as usize, buffer.len());
                        // Keep the overwrite bitmap (truncated to this read) so that
                        // unwritten blocks can be zeroed after the device read.
                        let maybe_bitmap = match mode {
                            ExtentMode::OverwritePartial(bitmap) => {
                                let mut read_bitmap = bitmap.clone();
                                read_bitmap.truncate(
                                    (end - extent_key.range.start as usize)
                                        / self.block_size() as usize,
                                );
                                Some(read_bitmap)
                            }
                            _ => None,
                        };
                        self.read_and_decrypt(
                            *device_offset,
                            extent_key.range.start,
                            buffer.subslice_mut(offset..end as usize),
                            *key_id,
                        )
                        .await?;
                        if let Some(bitmap) = maybe_bitmap {
                            apply_bitmap_zeroing(
                                self.block_size() as usize,
                                &bitmap,
                                buffer.subslice_mut(offset..end as usize),
                            );
                        }
                        last_offset = end;
                        if last_offset >= size {
                            break;
                        }
                    }
                }
                _ => break,
            }
        }
        // Zero the tail past the last extent (and any block-rounding padding).
        buffer.as_mut_slice()[std::cmp::min(last_offset, size)..].fill(0);
        Ok(buffer.as_slice()[..size].into())
    }
1225
    /// Encrypts (if necessary) and writes `buf` for attribute `attribute_id` at
    /// file offset `offset`, landing at `device_offset` on the device.  Unaligned
    /// writes are first filled out to block boundaries via `align_buffer`, with
    /// `device_offset` moved back to match the aligned start.  Returns per-block
    /// checksums when checksumming applies (see `write_aligned`).
    pub async fn write_at(
        &self,
        attribute_id: u64,
        offset: u64,
        buf: MutableBufferRef<'_>,
        key_id: Option<u64>,
        mut device_offset: u64,
    ) -> Result<MaybeChecksums, Error> {
        let mut transfer_buf;
        let block_size = self.block_size();
        let (range, mut transfer_buf_ref) =
            if offset % block_size == 0 && buf.len() as u64 % block_size == 0 {
                (offset..offset + buf.len() as u64, buf)
            } else {
                // Unaligned: copy into an aligned buffer, pulling in surrounding data.
                let (range, buf) = self.align_buffer(attribute_id, offset, buf.as_ref()).await?;
                transfer_buf = buf;
                device_offset -= offset - range.start;
                (range, transfer_buf.as_mut())
            };

        let mut crypt_ctx = None;
        if let (_, Some(key)) = self.get_key(key_id).await? {
            if let Some(ctx) = key.crypt_ctx(self.object_id, range.start) {
                // Inline crypto: the device encrypts; pass the context through.
                crypt_ctx = Some(ctx);
            } else {
                // Software encryption in place before the write.
                key.encrypt(
                    self.object_id,
                    device_offset,
                    range.start,
                    transfer_buf_ref.as_mut_slice(),
                )?;
            }
        }
        self.write_aligned(transfer_buf_ref.as_ref(), device_offset, crypt_ctx).await
    }
1268
1269 #[cfg(feature = "migration")]
1274 pub async fn raw_multi_write(
1275 &self,
1276 transaction: &mut Transaction<'_>,
1277 attribute_id: u64,
1278 key_id: Option<u64>,
1279 ranges: &[Range<u64>],
1280 buf: MutableBufferRef<'_>,
1281 ) -> Result<(), Error> {
1282 self.multi_write_internal(transaction, attribute_id, key_id, ranges, buf).await?;
1283 Ok(())
1284 }
1285
    /// Allocates space and writes `buf` so that it covers the logical `ranges` of
    /// `attribute_id`, returning the number of bytes (allocated, deallocated).  `buf` must be
    /// exactly as long as the sum of the range lengths, and ranges/offsets are assumed to be
    /// block-aligned.  Extent mutations and (unless barriers are enabled) checksums are added
    /// to `transaction`; callers are responsible for allocated-size accounting.
    async fn multi_write_internal(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: u64,
        key_id: Option<u64>,
        ranges: &[Range<u64>],
        mut buf: MutableBufferRef<'_>,
    ) -> Result<(u64, u64), Error> {
        if buf.is_empty() {
            return Ok((0, 0));
        }
        let block_size = self.block_size();
        let store = self.store();
        let store_id = store.store_object_id();

        // Writes with the volume data key on a CachedKeys handle may need to lazily create
        // the key; other cases just look it up.
        let (key_id, key) = if key_id == Some(VOLUME_DATA_KEY_ID)
            && matches!(self.encryption, Encryption::CachedKeys)
        {
            (
                VOLUME_DATA_KEY_ID,
                Some(
                    self.get_or_create_key(transaction)
                        .await
                        .context("get_or_create_key failed")?,
                ),
            )
        } else {
            self.get_key(key_id).await?
        };
        if let Some(key) = &key {
            if !key.supports_inline_encryption() {
                // Software encryption: encrypt the whole buffer up front, one logical range at
                // a time (each range gets its own logical offset, `r.start`).
                let mut slice = buf.as_mut_slice();
                for r in ranges {
                    let l = r.end - r.start;
                    let (head, tail) = slice.split_at_mut(l as usize);
                    key.encrypt(
                        self.object_id,
                        0, r.start,
                        head,
                    )?;
                    slice = tail;
                }
            }
        }

        let mut allocated = 0;
        let allocator = store.allocator();
        let trace = self.trace();
        let mut writes = FuturesOrdered::new();

        let mut logical_ranges = ranges.iter();
        let mut current_range = logical_ranges.next().unwrap().clone();

        // Allocate device space and issue writes until the buffer is exhausted.  Each
        // allocation may span several logical ranges, and (with inline encryption) must be
        // split at logical-range boundaries so each chunk gets the right crypt context.
        while !buf.is_empty() {
            let mut device_range = allocator
                .allocate(transaction, store_id, buf.len() as u64)
                .await
                .context("allocation failed")?;
            if trace {
                info!(
                    store_id,
                    oid = self.object_id(),
                    device_range:?,
                    len = device_range.end - device_range.start;
                    "A",
                );
            }
            let mut device_range_len = device_range.end - device_range.start;
            allocated += device_range_len;
            while device_range_len > 0 {
                // Advance to the next logical range once the current one is consumed.
                if current_range.end <= current_range.start {
                    current_range = logical_ranges.next().unwrap().clone();
                }
                let (crypt_ctx, split) = if let Some(key) = &key {
                    if key.supports_inline_encryption() {
                        // Split at the logical-range boundary; the crypt context is keyed off
                        // the logical offset of the chunk being written.
                        let split = std::cmp::min(
                            current_range.end - current_range.start,
                            device_range_len,
                        );
                        let crypt_ctx = key.crypt_ctx(self.object_id, current_range.start);
                        current_range.start += split;
                        (crypt_ctx, split)
                    } else {
                        (None, device_range_len)
                    }
                } else {
                    (None, device_range_len)
                };

                let (head, tail) = buf.split_at_mut(split as usize);
                buf = tail;

                // The write futures are ordered (FuturesOrdered) so results below can be
                // re-associated with logical ranges in order.
                writes.push_back(async move {
                    let len = head.len() as u64;
                    Result::<_, Error>::Ok((
                        device_range.start,
                        len,
                        self.write_aligned(head.as_ref(), device_range.start, crypt_ctx).await?,
                    ))
                });
                device_range.start += split;
                device_range_len = device_range.end - device_range.start;
            }
        }

        self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
        // Concurrently: (1) drain write results, chopping each device extent at logical-range
        // boundaries into extent mutations + checksums, and (2) deallocate any old extents the
        // new ranges replace.
        let ((mutations, checksums), deallocated) = try_join!(
            async {
                let mut current_range = 0..0;
                let mut mutations = Vec::new();
                let mut out_checksums = Vec::new();
                let mut ranges = ranges.iter();
                while let Some((mut device_offset, mut len, mut checksums)) =
                    writes.try_next().await?
                {
                    while len > 0 {
                        if current_range.end <= current_range.start {
                            current_range = ranges.next().unwrap().clone();
                        }
                        let chunk_len = std::cmp::min(len, current_range.end - current_range.start);
                        // Checksums are per-block; keep the first chunk's worth and carry the
                        // rest over to the next iteration.
                        let tail = checksums.split_off((chunk_len / block_size) as usize);
                        if let Some(checksums) = checksums.maybe_as_ref() {
                            out_checksums.push((
                                device_offset..device_offset + chunk_len,
                                checksums.to_owned(),
                            ));
                        }
                        mutations.push(Mutation::merge_object(
                            ObjectKey::extent(
                                self.object_id(),
                                attribute_id,
                                current_range.start..current_range.start + chunk_len,
                            ),
                            ObjectValue::Extent(ExtentValue::new(
                                device_offset,
                                checksums.to_mode(),
                                key_id,
                            )),
                        ));
                        checksums = tail;
                        device_offset += chunk_len;
                        len -= chunk_len;
                        current_range.start += chunk_len;
                    }
                }
                Result::<_, Error>::Ok((mutations, out_checksums))
            },
            async {
                let mut deallocated = 0;
                for r in ranges {
                    deallocated +=
                        self.deallocate_old_extents(transaction, attribute_id, r.clone()).await?;
                }
                Result::<_, Error>::Ok(deallocated)
            }
        )?;

        for m in mutations {
            transaction.add(store_id, m);
        }

        // With barriers enabled, checksums are not journaled.
        if !store.filesystem().options().barriers_enabled {
            for (r, c) in checksums {
                transaction.add_checksum(r, c, true);
            }
        }
        Ok((allocated, deallocated))
    }
1464
1465 pub async fn multi_write(
1471 &self,
1472 transaction: &mut Transaction<'_>,
1473 attribute_id: u64,
1474 key_id: Option<u64>,
1475 ranges: &[Range<u64>],
1476 buf: MutableBufferRef<'_>,
1477 ) -> Result<(), Error> {
1478 let (allocated, deallocated) =
1479 self.multi_write_internal(transaction, attribute_id, key_id, ranges, buf).await?;
1480 if allocated == 0 && deallocated == 0 {
1481 return Ok(());
1482 }
1483 self.update_allocated_size(transaction, allocated, deallocated).await
1484 }
1485
    /// Overwrites the logical `ranges` of `attr_id` in place, on top of extents that must
    /// already exist in overwrite mode (`ExtentMode::Overwrite` or `OverwritePartial`).
    /// `buf` must be exactly as long as the sum of the range lengths; ranges are processed in
    /// order and must be covered by extents starting at or before each range.  Bitmap updates
    /// for partially-written overwrite extents and (unless barriers are enabled) checksums are
    /// added to `transaction`.  Returns `FxfsError::Inconsistent` if the on-disk extents do
    /// not match these expectations.
    pub async fn multi_overwrite<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        attr_id: u64,
        ranges: &[Range<u64>],
        mut buf: MutableBufferRef<'_>,
    ) -> Result<(), Error> {
        if buf.is_empty() {
            return Ok(());
        }
        let block_size = self.block_size();
        let store = self.store();
        let tree = store.tree();
        let store_id = store.store_object_id();

        let (key_id, key) = self.get_key(None).await?;
        if let Some(key) = &key {
            if !key.supports_inline_encryption() {
                // Software encryption: encrypt the whole buffer up front, one logical range at
                // a time (each range gets its own logical offset, `r.start`).
                let mut slice = buf.as_mut_slice();
                for r in ranges {
                    let l = r.end - r.start;
                    let (head, tail) = slice.split_at_mut(l as usize);
                    key.encrypt(
                        self.object_id,
                        0, r.start,
                        head,
                    )?;
                    slice = tail;
                }
            }
        }

        let mut range_iter = ranges.iter();
        let mut target_range = range_iter.next().unwrap().clone();
        let mut mutations = Vec::new();
        let writes = FuturesUnordered::new();

        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                attr_id,
                AttributeKey::Extent(ExtentKey::search_key_from_offset(target_range.start)),
            )))
            .await?;

        // Walk the existing extents, issuing in-place writes for each overlap with the target
        // ranges.  The outer loop advances through extents; the inner loop below consumes
        // target ranges that fall within the current extent.
        loop {
            match iter.get() {
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(
                                    attribute_id,
                                    AttributeKey::Extent(ExtentKey { range }),
                                ),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) if *object_id == self.object_id() && *attribute_id == attr_id => {
                    // Skip extents that end before the next range we need to write.
                    if range.end <= target_range.start {
                        iter.advance().await?;
                        continue;
                    }
                    let (device_offset, mode) = match extent_value {
                        ExtentValue::None => {
                            return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                                format!(
                                    "multi_overwrite failed: target_range ({}, {}) overlaps with \
                                    deleted extent found at ({}, {})",
                                    target_range.start, target_range.end, range.start, range.end,
                                )
                            });
                        }
                        ExtentValue::Some { device_offset, mode, .. } => (device_offset, mode),
                    };
                    // The extent must cover the start of the target range - there must be no
                    // gap before it.
                    if range.start > target_range.start {
                        return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                            format!(
                                "multi_overwrite failed: target range ({}, {}) starts before first \
                                extent found at ({}, {})",
                                target_range.start, target_range.end, range.start, range.end,
                            )
                        });
                    }
                    // Only overwrite-mode extents may be written in place.  For
                    // OverwritePartial we track which blocks get their first write so the
                    // bitmap (and checksum bookkeeping) can be updated below.
                    let mut bitmap = match mode {
                        ExtentMode::Raw | ExtentMode::Cow(_) => {
                            return Err(anyhow!(FxfsError::Inconsistent)).with_context(|| {
                                format!(
                                    "multi_overwrite failed: \
                                    extent from ({}, {}) which overlaps target range ({}, {}) had the \
                                    wrong extent mode",
                                    range.start, range.end, target_range.start, target_range.end,
                                )
                            });
                        }
                        ExtentMode::OverwritePartial(bitmap) => {
                            OverwriteBitmaps::new(bitmap.clone())
                        }
                        ExtentMode::Overwrite => OverwriteBitmaps::None,
                    };
                    // Consume target ranges covered by this extent.
                    loop {
                        let offset_within_extent = target_range.start - range.start;
                        let bitmap_offset = offset_within_extent / block_size;
                        let write_device_offset = *device_offset + offset_within_extent;
                        let write_end = min(range.end, target_range.end);
                        let write_len = write_end - target_range.start;
                        let write_device_range =
                            write_device_offset..write_device_offset + write_len;
                        let (current_buf, remaining_buf) = buf.split_at_mut(write_len as usize);

                        bitmap.set_offset(bitmap_offset as usize);
                        let checksum_ranges = ChecksumRangeChunk::group_first_write_ranges(
                            &mut bitmap,
                            block_size,
                            write_device_range,
                        );

                        let crypt_ctx = if let Some(key) = &key {
                            key.crypt_ctx(self.object_id, target_range.start)
                        } else {
                            None
                        };

                        // The writes run unordered; each returns (device_range, checksums,
                        // is_first_write) tuples for checksum bookkeeping.
                        writes.push(async move {
                            let maybe_checksums = self
                                .write_aligned(current_buf.as_ref(), write_device_offset, crypt_ctx)
                                .await?;
                            Ok::<_, Error>(match maybe_checksums {
                                MaybeChecksums::None => Vec::new(),
                                MaybeChecksums::Fletcher(checksums) => checksum_ranges
                                    .into_iter()
                                    .map(
                                        |ChecksumRangeChunk {
                                             checksum_range,
                                             device_range,
                                             is_first_write,
                                         }| {
                                            (
                                                device_range,
                                                checksums[checksum_range].to_vec(),
                                                is_first_write,
                                            )
                                        },
                                    )
                                    .collect(),
                            })
                        });
                        buf = remaining_buf;
                        target_range.start += write_len;
                        if target_range.start == target_range.end {
                            match range_iter.next() {
                                None => break,
                                Some(next_range) => target_range = next_range.clone(),
                            }
                        }
                        // The next target range starts beyond this extent; move to the next
                        // extent.
                        if range.end <= target_range.start {
                            break;
                        }
                    }
                    // If any blocks got their first write, merge that into the extent's
                    // bitmap; a fully-written bitmap upgrades the extent to plain Overwrite.
                    if let Some((mut bitmap, write_bitmap)) = bitmap.take_bitmaps() {
                        if bitmap.or(&write_bitmap) {
                            let mode = if bitmap.all() {
                                ExtentMode::Overwrite
                            } else {
                                ExtentMode::OverwritePartial(bitmap)
                            };
                            mutations.push(Mutation::merge_object(
                                ObjectKey::extent(self.object_id(), attr_id, range.clone()),
                                ObjectValue::Extent(ExtentValue::new(*device_offset, mode, key_id)),
                            ))
                        }
                    }
                    if target_range.start == target_range.end {
                        break;
                    }
                    iter.advance().await?;
                }
                _ => bail!(anyhow!(FxfsError::Internal).context(
                    "found a non-extent object record while there were still ranges to process"
                )),
            }
        }

        let checksums = writes.try_collect::<Vec<_>>().await?;
        // With barriers enabled, checksums are not journaled.
        if !store.filesystem().options().barriers_enabled {
            for (r, c, first_write) in checksums.into_iter().flatten() {
                transaction.add_checksum(r, c, first_write);
            }
        }

        for m in mutations {
            transaction.add(store_id, m);
        }

        Ok(())
    }
1700
    /// Writes a brand-new attribute in `batch_size` chunks, committing intermediate
    /// transactions between chunks so very large attributes don't overflow a single
    /// transaction.  When more than one transaction is needed, a graveyard entry for the
    /// attribute is added first — presumably so that a crash mid-way gets cleaned up; the
    /// caller is expected to remove the entry once the write fully commits (TODO confirm
    /// against callers).  The final chunk is left uncommitted in `transaction`.
    #[trace]
    pub async fn write_new_attr_in_batches<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        attribute_id: u64,
        data: &[u8],
        batch_size: usize,
    ) -> Result<(), Error> {
        // Record the attribute's final size up front (no overwrite extents).
        transaction.add(
            self.store().store_object_id,
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
                ObjectValue::attribute(data.len() as u64, false),
            ),
        );
        let chunks = data.chunks(batch_size);
        let num_chunks = chunks.len();
        if num_chunks > 1 {
            // Multi-transaction write: guard with a graveyard entry.
            transaction.add(
                self.store().store_object_id,
                Mutation::replace_or_insert_object(
                    ObjectKey::graveyard_attribute_entry(
                        self.store().graveyard_directory_object_id(),
                        self.object_id(),
                        attribute_id,
                    ),
                    ObjectValue::Some,
                ),
            );
        }
        let mut start_offset = 0;
        for (i, chunk) in chunks.enumerate() {
            // Each chunk is zero-padded up to a whole number of blocks.
            let rounded_len = round_up(chunk.len() as u64, self.block_size()).unwrap();
            let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
            let slice = buffer.as_mut_slice();
            slice[..chunk.len()].copy_from_slice(chunk);
            slice[chunk.len()..].fill(0);
            self.multi_write(
                transaction,
                attribute_id,
                Some(VOLUME_DATA_KEY_ID),
                &[start_offset..start_offset + rounded_len],
                buffer.as_mut(),
            )
            .await?;
            start_offset += rounded_len;
            // Commit between chunks; the final chunk stays in the caller's transaction.
            if i < num_chunks - 1 {
                transaction.commit_and_continue().await?;
            }
        }
        Ok(())
    }
1760
    /// Writes `data` as the entire contents of `attribute_id` (zero-padded to a whole number
    /// of blocks), replacing any previous contents.  Returns `NeedsTrim(true)` handling if the
    /// new data is shorter than the old attribute, in which case old extents past the new end
    /// are shrunk via `shrink` (whose own result is propagated).  Fails with `Inconsistent`
    /// if the attribute uses overwrite extents, which this path does not support.
    pub async fn write_attr(
        &self,
        transaction: &mut Transaction<'_>,
        attribute_id: u64,
        data: &[u8],
    ) -> Result<NeedsTrim, Error> {
        let rounded_len = round_up(data.len() as u64, self.block_size()).unwrap();
        let store = self.store();
        let tree = store.tree();
        // Trimming is only needed when shrinking an existing attribute.
        let should_trim = if let Some(item) = tree
            .find(&ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute))
            .await?
        {
            match item.value {
                ObjectValue::Attribute { size: _, has_overwrite_extents: true } => {
                    bail!(
                        anyhow!(FxfsError::Inconsistent)
                            .context("write_attr on an attribute with overwrite extents")
                    )
                }
                ObjectValue::Attribute { size, .. } => (data.len() as u64) < size,
                _ => bail!(FxfsError::Inconsistent),
            }
        } else {
            false
        };
        // Copy into a block-aligned buffer, zero-padding the tail.
        let mut buffer = self.store().device.allocate_buffer(rounded_len as usize).await;
        let slice = buffer.as_mut_slice();
        slice[..data.len()].copy_from_slice(data);
        slice[data.len()..].fill(0);
        self.multi_write(
            transaction,
            attribute_id,
            Some(VOLUME_DATA_KEY_ID),
            &[0..rounded_len],
            buffer.as_mut(),
        )
        .await?;
        // Record the new logical size (no overwrite extents).
        transaction.add(
            self.store().store_object_id,
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute),
                ObjectValue::attribute(data.len() as u64, false),
            ),
        );
        if should_trim {
            self.shrink(transaction, attribute_id, data.len() as u64).await
        } else {
            Ok(NeedsTrim(false))
        }
    }
1818
1819 pub async fn list_extended_attributes(&self) -> Result<Vec<Vec<u8>>, Error> {
1820 let layer_set = self.store().tree().layer_set();
1821 let mut merger = layer_set.merger();
1822 let mut iter = merger
1824 .query(Query::FullRange(&ObjectKey::extended_attribute(self.object_id(), Vec::new())))
1825 .await?;
1826 let mut out = Vec::new();
1827 while let Some(item) = iter.get() {
1828 if item.value != &ObjectValue::None {
1830 match item.key {
1831 ObjectKey { object_id, data: ObjectKeyData::ExtendedAttribute { name } } => {
1832 if self.object_id() != *object_id {
1833 bail!(
1834 anyhow!(FxfsError::Inconsistent)
1835 .context("list_extended_attributes: wrong object id")
1836 )
1837 }
1838 out.push(name.clone());
1839 }
1840 _ => break,
1843 }
1844 }
1845 iter.advance().await?;
1846 }
1847 Ok(out)
1848 }
1849
1850 pub async fn get_inline_selinux_context(&self) -> Result<Option<fio::SelinuxContext>, Error> {
1854 const_assert!(fio::MAX_SELINUX_CONTEXT_ATTRIBUTE_LEN as usize <= MAX_INLINE_XATTR_SIZE);
1857 let item = match self
1858 .store()
1859 .tree()
1860 .find(&ObjectKey::extended_attribute(
1861 self.object_id(),
1862 fio::SELINUX_CONTEXT_NAME.into(),
1863 ))
1864 .await?
1865 {
1866 Some(item) => item,
1867 None => return Ok(None),
1868 };
1869 match item.value {
1870 ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => {
1871 Ok(Some(fio::SelinuxContext::Data(value)))
1872 }
1873 ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(_)) => {
1874 Ok(Some(fio::SelinuxContext::UseExtendedAttributes(fio::EmptyStruct {})))
1875 }
1876 _ => {
1877 bail!(
1878 anyhow!(FxfsError::Inconsistent)
1879 .context("get_inline_extended_attribute: Expected ExtendedAttribute value")
1880 )
1881 }
1882 }
1883 }
1884
1885 pub async fn get_extended_attribute(&self, name: Vec<u8>) -> Result<Vec<u8>, Error> {
1886 let item = self
1887 .store()
1888 .tree()
1889 .find(&ObjectKey::extended_attribute(self.object_id(), name))
1890 .await?
1891 .ok_or(FxfsError::NotFound)?;
1892 match item.value {
1893 ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(value)) => Ok(value),
1894 ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
1895 Ok(self.read_attr(id).await?.ok_or(FxfsError::Inconsistent)?.into_vec())
1896 }
1897 _ => {
1898 bail!(
1899 anyhow!(FxfsError::Inconsistent)
1900 .context("get_extended_attribute: Expected ExtendedAttribute value")
1901 )
1902 }
1903 }
1904 }
1905
1906 pub async fn set_extended_attribute(
1907 &self,
1908 name: Vec<u8>,
1909 value: Vec<u8>,
1910 mode: SetExtendedAttributeMode,
1911 ) -> Result<(), Error> {
1912 let store = self.store();
1913 let fs = store.filesystem();
1914 let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
1917 let mut transaction = fs.new_transaction(keys, Options::default()).await?;
1918 self.set_extended_attribute_impl(name, value, mode, &mut transaction).await?;
1919 transaction.commit().await?;
1920 Ok(())
1921 }
1922
    /// Adds the mutations for setting extended attribute `name` to `value` to `transaction`
    /// (the caller commits).  Small values are stored inline in the xattr record; larger ones
    /// are written to a dedicated attribute in the reserved
    /// [`EXTENDED_ATTRIBUTE_RANGE_START`, `EXTENDED_ATTRIBUTE_RANGE_END`) id range.
    /// `mode` enforces create/replace semantics (`AlreadyExists` / `NotFound`).
    async fn set_extended_attribute_impl(
        &self,
        name: Vec<u8>,
        value: Vec<u8>,
        mode: SetExtendedAttributeMode,
        transaction: &mut Transaction<'_>,
    ) -> Result<(), Error> {
        ensure!(name.len() <= MAX_XATTR_NAME_SIZE, FxfsError::TooBig);
        ensure!(value.len() <= MAX_XATTR_VALUE_SIZE, FxfsError::TooBig);
        let tree = self.store().tree();
        let object_key = ObjectKey::extended_attribute(self.object_id(), name);

        // Look up any existing record: `found` drives the mode checks, and
        // `existing_attribute_id` is Some only when the current value is attribute-backed.
        let existing_attribute_id = {
            let (found, existing_attribute_id) = match tree.find(&object_key).await? {
                None => (false, None),
                Some(Item { value, .. }) => (
                    true,
                    match value {
                        ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
                        ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => {
                            Some(id)
                        }
                        _ => bail!(
                            anyhow!(FxfsError::Inconsistent)
                                .context("expected extended attribute value")
                        ),
                    },
                ),
            };
            match mode {
                SetExtendedAttributeMode::Create if found => {
                    bail!(FxfsError::AlreadyExists)
                }
                SetExtendedAttributeMode::Replace if !found => {
                    bail!(FxfsError::NotFound)
                }
                _ => (),
            }
            existing_attribute_id
        };

        if let Some(attribute_id) = existing_attribute_id {
            // Reuse the existing backing attribute even if the new value would fit inline.
            // NOTE(review): the returned `NeedsTrim` is discarded here — presumably shrinking
            // is handled elsewhere or acceptable to skip for xattrs; confirm against callers.
            let _ = self.write_attr(transaction, attribute_id, &value).await?;
        } else if value.len() <= MAX_INLINE_XATTR_SIZE {
            transaction.add(
                self.store().store_object_id(),
                Mutation::replace_or_insert_object(
                    object_key,
                    ObjectValue::inline_extended_attribute(value),
                ),
            );
        } else {
            // Find a free attribute id in the reserved xattr range by scanning this object's
            // attribute records in key order.
            let mut attribute_id = EXTENDED_ATTRIBUTE_RANGE_START;
            let layer_set = tree.layer_set();
            let mut merger = layer_set.merger();
            let key = ObjectKey::attribute(self.object_id(), attribute_id, AttributeKey::Attribute);
            let mut iter = merger.query(Query::FullRange(&key)).await?;
            loop {
                match iter.get() {
                    None => break,
                    Some(ItemRef {
                        key: ObjectKey { object_id, data: ObjectKeyData::Attribute(attr_id, _) },
                        value,
                        ..
                    }) if *object_id == self.object_id() => {
                        // A tombstoned record means its id is free to reuse.
                        if matches!(value, ObjectValue::None) {
                            break;
                        }
                        // A gap before the next occupied id means the candidate is free.
                        if attribute_id < *attr_id {
                            break;
                        } else if attribute_id == *attr_id {
                            attribute_id += 1;
                            // The reserved range is exhausted.
                            if attribute_id == EXTENDED_ATTRIBUTE_RANGE_END {
                                bail!(FxfsError::NoSpace);
                            }
                        }
                    }
                    _ => break,
                }
                iter.advance().await?;
            }

            // A new attribute has nothing to trim, so the result can be ignored.
            let _ = self.write_attr(transaction, attribute_id, &value).await?;
            transaction.add(
                self.store().store_object_id(),
                Mutation::replace_or_insert_object(
                    object_key,
                    ObjectValue::extended_attribute(attribute_id),
                ),
            );
        }

        Ok(())
    }
2040
2041 pub async fn remove_extended_attribute(&self, name: Vec<u8>) -> Result<(), Error> {
2042 let store = self.store();
2043 let tree = store.tree();
2044 let object_key = ObjectKey::extended_attribute(self.object_id(), name);
2045
2046 let keys = lock_keys![LockKey::object(store.store_object_id(), self.object_id())];
2051 let mut transaction = store.filesystem().new_transaction(keys, Options::default()).await?;
2052
2053 let attribute_to_delete =
2054 match tree.find(&object_key).await?.ok_or(FxfsError::NotFound)?.value {
2055 ObjectValue::ExtendedAttribute(ExtendedAttributeValue::AttributeId(id)) => Some(id),
2056 ObjectValue::ExtendedAttribute(ExtendedAttributeValue::Inline(..)) => None,
2057 _ => {
2058 bail!(
2059 anyhow!(FxfsError::Inconsistent)
2060 .context("remove_extended_attribute: Expected ExtendedAttribute value")
2061 )
2062 }
2063 };
2064
2065 transaction.add(
2066 store.store_object_id(),
2067 Mutation::replace_or_insert_object(object_key, ObjectValue::None),
2068 );
2069
2070 if let Some(attribute_id) = attribute_to_delete {
2077 let trim_result = store
2078 .trim_some(
2079 &mut transaction,
2080 self.object_id(),
2081 attribute_id,
2082 TrimMode::FromOffset(0),
2083 )
2084 .await?;
2085 assert_matches!(trim_result, TrimResult::Done(_));
2088 transaction.add(
2089 store.store_object_id(),
2090 Mutation::replace_or_insert_object(
2091 ObjectKey::attribute(self.object_id, attribute_id, AttributeKey::Attribute),
2092 ObjectValue::None,
2093 ),
2094 );
2095 }
2096
2097 transaction.commit().await?;
2098 Ok(())
2099 }
2100
2101 pub fn pre_fetch_keys(&self) -> Option<impl Future<Output = ()> + use<S>> {
2104 if let Encryption::CachedKeys = self.encryption {
2105 let owner = self.owner.clone();
2106 let object_id = self.object_id;
2107 Some(async move {
2108 let store = owner.as_ref().as_ref();
2109 if let Some(crypt) = store.crypt() {
2110 let _ = store
2111 .key_manager
2112 .get_keys(
2113 object_id,
2114 crypt.as_ref(),
2115 &mut Some(async || store.get_keys(object_id).await),
2116 false,
2117 false,
2118 )
2119 .await;
2120 }
2121 })
2122 } else {
2123 None
2124 }
2125 }
2126}
2127
2128impl<S: HandleOwner> Drop for StoreObjectHandle<S> {
2129 fn drop(&mut self) {
2130 if self.is_encrypted() {
2131 let _ = self.store().key_manager.remove(self.object_id);
2132 }
2133 }
2134}
2135
/// Returned by write paths such as `write_attr` to signal whether the caller must follow up
/// by trimming extents beyond the newly-written length (true when the new data is shorter
/// than what was previously stored).  Marked `#[must_use]` so the signal can't be silently
/// dropped.
#[must_use]
pub struct NeedsTrim(pub bool);
2141
2142#[cfg(test)]
2143mod tests {
2144 use super::{ChecksumRangeChunk, OverwriteBitmaps};
2145 use crate::errors::FxfsError;
2146 use crate::filesystem::{FxFilesystem, OpenFxFilesystem};
2147 use crate::object_handle::ObjectHandle;
2148 use crate::object_store::data_object_handle::WRITE_ATTR_BATCH_SIZE;
2149 use crate::object_store::transaction::{Mutation, Options, lock_keys};
2150 use crate::object_store::{
2151 AttributeKey, DataObjectHandle, Directory, FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions,
2152 LockKey, ObjectKey, ObjectStore, ObjectValue, SetExtendedAttributeMode, StoreObjectHandle,
2153 };
2154 use bit_vec::BitVec;
2155 use fuchsia_async as fasync;
2156 use futures::join;
2157 use std::sync::Arc;
2158 use storage_device::DeviceHolder;
2159 use storage_device::fake_device::FakeDevice;
2160
2161 const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2162 const TEST_OBJECT_NAME: &str = "foo";
2163
2164 fn is_error(actual: anyhow::Error, expected: FxfsError) {
2165 assert_eq!(*actual.root_cause().downcast_ref::<FxfsError>().unwrap(), expected)
2166 }
2167
2168 async fn test_filesystem() -> OpenFxFilesystem {
2169 let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
2170 FxFilesystem::new_empty(device).await.expect("new_empty failed")
2171 }
2172
2173 async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
2174 {
2175 let fs = test_filesystem().await;
2176 let store = fs.root_store();
2177
2178 let mut transaction = fs
2179 .clone()
2180 .new_transaction(
2181 lock_keys![LockKey::object(
2182 store.store_object_id(),
2183 store.root_directory_object_id()
2184 )],
2185 Options::default(),
2186 )
2187 .await
2188 .expect("new_transaction failed");
2189
2190 let object =
2191 ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2192 .await
2193 .expect("create_object failed");
2194
2195 let root_directory =
2196 Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
2197 root_directory
2198 .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
2199 .await
2200 .expect("add_child_file failed");
2201
2202 transaction.commit().await.expect("commit failed");
2203
2204 (fs, object)
2205 }
2206
    /// Races two concurrent removals of the same xattr through two clones of one handle:
    /// exactly one must win and the other must fail with `NotFound`.
    #[fuchsia::test(threads = 3)]
    async fn extended_attribute_double_remove() {
        let (fs, object) = test_filesystem_and_empty_object().await;
        // Use a bare StoreObjectHandle so both tasks go through the same xattr code path.
        let basic = Arc::new(StoreObjectHandle::new(
            object.owner().clone(),
            object.object_id(),
            false,
            HandleOptions::default(),
            false,
        ));
        let basic_a = basic.clone();
        let basic_b = basic.clone();

        basic
            .set_extended_attribute(
                b"security.selinux".to_vec(),
                b"bar".to_vec(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .expect("failed to set attribute");

        // Spawn both removals so they can genuinely race on the object lock.
        let a_task = fasync::Task::spawn(async move {
            basic_a.remove_extended_attribute(b"security.selinux".to_vec()).await
        });
        let b_task = fasync::Task::spawn(async move {
            basic_b.remove_extended_attribute(b"security.selinux".to_vec()).await
        });
        match join!(a_task, b_task) {
            (Ok(()), Ok(())) => panic!("both remove calls succeeded"),
            (Err(_), Err(_)) => panic!("both remove calls failed"),

            // Whichever ordering won, the loser must observe NotFound.
            (Ok(()), Err(e)) => is_error(e, FxfsError::NotFound),
            (Err(e), Ok(())) => is_error(e, FxfsError::NotFound),
        }

        fs.close().await.expect("Close failed");
    }
2251
    /// Races two concurrent `Create`-mode sets of the same xattr: exactly one must win, the
    /// loser must fail with `AlreadyExists`, and the stored value must be the winner's.
    #[fuchsia::test(threads = 3)]
    async fn extended_attribute_double_create() {
        let (fs, object) = test_filesystem_and_empty_object().await;
        // Use a bare StoreObjectHandle so both tasks go through the same xattr code path.
        let basic = Arc::new(StoreObjectHandle::new(
            object.owner().clone(),
            object.object_id(),
            false,
            HandleOptions::default(),
            false,
        ));
        let basic_a = basic.clone();
        let basic_b = basic.clone();

        // Spawn both creates so they can genuinely race on the object lock.
        let a_task = fasync::Task::spawn(async move {
            basic_a
                .set_extended_attribute(
                    b"security.selinux".to_vec(),
                    b"one".to_vec(),
                    SetExtendedAttributeMode::Create,
                )
                .await
        });
        let b_task = fasync::Task::spawn(async move {
            basic_b
                .set_extended_attribute(
                    b"security.selinux".to_vec(),
                    b"two".to_vec(),
                    SetExtendedAttributeMode::Create,
                )
                .await
        });
        match join!(a_task, b_task) {
            (Ok(()), Ok(())) => panic!("both set calls succeeded"),
            (Err(_), Err(_)) => panic!("both set calls failed"),

            // The surviving value must belong to whichever task won the race.
            (Ok(()), Err(e)) => {
                assert_eq!(
                    basic
                        .get_extended_attribute(b"security.selinux".to_vec())
                        .await
                        .expect("failed to get xattr"),
                    b"one"
                );
                is_error(e, FxfsError::AlreadyExists);
            }
            (Err(e), Ok(())) => {
                assert_eq!(
                    basic
                        .get_extended_attribute(b"security.selinux".to_vec())
                        .await
                        .expect("failed to get xattr"),
                    b"two"
                );
                is_error(e, FxfsError::AlreadyExists);
            }
        }

        fs.close().await.expect("Close failed");
    }
2317
2318 struct TestAttr {
2319 name: Vec<u8>,
2320 value: Vec<u8>,
2321 }
2322
2323 impl TestAttr {
2324 fn new(name: impl AsRef<[u8]>, value: impl AsRef<[u8]>) -> Self {
2325 Self { name: name.as_ref().to_vec(), value: value.as_ref().to_vec() }
2326 }
2327 fn name(&self) -> Vec<u8> {
2328 self.name.clone()
2329 }
2330 fn value(&self) -> Vec<u8> {
2331 self.value.clone()
2332 }
2333 }
2334
    /// End-to-end lifecycle of a small (inline) xattr: absent → set → listed/read → removed,
    /// run through two full cycles to check the record can be recreated after removal.
    #[fuchsia::test]
    async fn extended_attributes() {
        let (fs, object) = test_filesystem_and_empty_object().await;

        let test_attr = TestAttr::new(b"security.selinux", b"foo");

        // Initially absent: empty listing, NotFound on read.
        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
        is_error(
            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
            FxfsError::NotFound,
        );

        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
        assert_eq!(
            object.get_extended_attribute(test_attr.name()).await.unwrap(),
            test_attr.value()
        );

        object.remove_extended_attribute(test_attr.name()).await.unwrap();
        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
        is_error(
            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
            FxfsError::NotFound,
        );

        // Second cycle: setting again after removal must behave identically.
        object
            .set_extended_attribute(
                test_attr.name(),
                test_attr.value(),
                SetExtendedAttributeMode::Set,
            )
            .await
            .unwrap();
        assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
        assert_eq!(
            object.get_extended_attribute(test_attr.name()).await.unwrap(),
            test_attr.value()
        );

        object.remove_extended_attribute(test_attr.name()).await.unwrap();
        assert_eq!(object.list_extended_attributes().await.unwrap(), Vec::<Vec<u8>>::new());
        is_error(
            object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
            FxfsError::NotFound,
        );

        fs.close().await.expect("close failed");
    }
2392
2393 #[fuchsia::test]
2394 async fn large_extended_attribute() {
2395 let (fs, object) = test_filesystem_and_empty_object().await;
2396
2397 let test_attr = TestAttr::new(b"security.selinux", vec![3u8; 300]);
2398
2399 object
2400 .set_extended_attribute(
2401 test_attr.name(),
2402 test_attr.value(),
2403 SetExtendedAttributeMode::Set,
2404 )
2405 .await
2406 .unwrap();
2407 assert_eq!(
2408 object.get_extended_attribute(test_attr.name()).await.unwrap(),
2409 test_attr.value()
2410 );
2411
2412 assert_eq!(
2415 object
2416 .read_attr(64)
2417 .await
2418 .expect("read_attr failed")
2419 .expect("read_attr returned none")
2420 .into_vec(),
2421 test_attr.value()
2422 );
2423
2424 object.remove_extended_attribute(test_attr.name()).await.unwrap();
2425 is_error(
2426 object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2427 FxfsError::NotFound,
2428 );
2429
2430 object
2432 .set_extended_attribute(
2433 test_attr.name(),
2434 test_attr.value(),
2435 SetExtendedAttributeMode::Set,
2436 )
2437 .await
2438 .unwrap();
2439 assert_eq!(
2440 object.get_extended_attribute(test_attr.name()).await.unwrap(),
2441 test_attr.value()
2442 );
2443 object.remove_extended_attribute(test_attr.name()).await.unwrap();
2444 is_error(
2445 object.get_extended_attribute(test_attr.name()).await.unwrap_err(),
2446 FxfsError::NotFound,
2447 );
2448
2449 fs.close().await.expect("close failed");
2450 }
2451
2452 #[fuchsia::test]
2453 async fn multiple_extended_attributes() {
2454 let (fs, object) = test_filesystem_and_empty_object().await;
2455
2456 let attrs = [
2457 TestAttr::new(b"security.selinux", b"foo"),
2458 TestAttr::new(b"large.attribute", vec![3u8; 300]),
2459 TestAttr::new(b"an.attribute", b"asdf"),
2460 TestAttr::new(b"user.big", vec![5u8; 288]),
2461 TestAttr::new(b"user.tiny", b"smol"),
2462 TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
2463 TestAttr::new(b"also big", vec![7u8; 500]),
2464 TestAttr::new(b"all.ones", vec![1u8; 11111]),
2465 ];
2466
2467 for i in 0..attrs.len() {
2468 object
2469 .set_extended_attribute(
2470 attrs[i].name(),
2471 attrs[i].value(),
2472 SetExtendedAttributeMode::Set,
2473 )
2474 .await
2475 .unwrap();
2476 assert_eq!(
2477 object.get_extended_attribute(attrs[i].name()).await.unwrap(),
2478 attrs[i].value()
2479 );
2480 }
2481
2482 for i in 0..attrs.len() {
2483 let mut found_attrs = object.list_extended_attributes().await.unwrap();
2485 let mut expected_attrs: Vec<Vec<u8>> = attrs.iter().skip(i).map(|a| a.name()).collect();
2486 found_attrs.sort();
2487 expected_attrs.sort();
2488 assert_eq!(found_attrs, expected_attrs);
2489 for j in i..attrs.len() {
2490 assert_eq!(
2491 object.get_extended_attribute(attrs[j].name()).await.unwrap(),
2492 attrs[j].value()
2493 );
2494 }
2495
2496 object.remove_extended_attribute(attrs[i].name()).await.expect("failed to remove");
2497 is_error(
2498 object.get_extended_attribute(attrs[i].name()).await.unwrap_err(),
2499 FxfsError::NotFound,
2500 );
2501 }
2502
2503 fs.close().await.expect("close failed");
2504 }
2505
2506 #[fuchsia::test]
2507 async fn multiple_extended_attributes_delete() {
2508 let (fs, object) = test_filesystem_and_empty_object().await;
2509 let store = object.owner().clone();
2510
2511 let attrs = [
2512 TestAttr::new(b"security.selinux", b"foo"),
2513 TestAttr::new(b"large.attribute", vec![3u8; 300]),
2514 TestAttr::new(b"an.attribute", b"asdf"),
2515 TestAttr::new(b"user.big", vec![5u8; 288]),
2516 TestAttr::new(b"user.tiny", b"smol"),
2517 TestAttr::new(b"this string doesn't matter", b"the quick brown fox etc"),
2518 TestAttr::new(b"also big", vec![7u8; 500]),
2519 TestAttr::new(b"all.ones", vec![1u8; 11111]),
2520 ];
2521
2522 for i in 0..attrs.len() {
2523 object
2524 .set_extended_attribute(
2525 attrs[i].name(),
2526 attrs[i].value(),
2527 SetExtendedAttributeMode::Set,
2528 )
2529 .await
2530 .unwrap();
2531 assert_eq!(
2532 object.get_extended_attribute(attrs[i].name()).await.unwrap(),
2533 attrs[i].value()
2534 );
2535 }
2536
2537 let root_directory =
2539 Directory::open(object.owner(), object.store().root_directory_object_id())
2540 .await
2541 .expect("open failed");
2542 let mut transaction = fs
2543 .clone()
2544 .new_transaction(
2545 lock_keys![
2546 LockKey::object(store.store_object_id(), store.root_directory_object_id()),
2547 LockKey::object(store.store_object_id(), object.object_id()),
2548 ],
2549 Options::default(),
2550 )
2551 .await
2552 .expect("new_transaction failed");
2553 crate::object_store::directory::replace_child(
2554 &mut transaction,
2555 None,
2556 (&root_directory, TEST_OBJECT_NAME),
2557 )
2558 .await
2559 .expect("replace_child failed");
2560 transaction.commit().await.unwrap();
2561 store.tombstone_object(object.object_id(), Options::default()).await.unwrap();
2562
2563 crate::fsck::fsck(fs.clone()).await.unwrap();
2564
2565 fs.close().await.expect("close failed");
2566 }
2567
2568 #[fuchsia::test]
2569 async fn extended_attribute_changing_sizes() {
2570 let (fs, object) = test_filesystem_and_empty_object().await;
2571
2572 let test_name = b"security.selinux";
2573 let test_small_attr = TestAttr::new(test_name, b"smol");
2574 let test_large_attr = TestAttr::new(test_name, vec![3u8; 300]);
2575
2576 object
2577 .set_extended_attribute(
2578 test_small_attr.name(),
2579 test_small_attr.value(),
2580 SetExtendedAttributeMode::Set,
2581 )
2582 .await
2583 .unwrap();
2584 assert_eq!(
2585 object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
2586 test_small_attr.value()
2587 );
2588
2589 assert!(object.read_attr(64).await.expect("read_attr failed").is_none());
2591
2592 crate::fsck::fsck(fs.clone()).await.unwrap();
2593
2594 object
2595 .set_extended_attribute(
2596 test_large_attr.name(),
2597 test_large_attr.value(),
2598 SetExtendedAttributeMode::Set,
2599 )
2600 .await
2601 .unwrap();
2602 assert_eq!(
2603 object.get_extended_attribute(test_large_attr.name()).await.unwrap(),
2604 test_large_attr.value()
2605 );
2606
2607 assert_eq!(
2610 object
2611 .read_attr(64)
2612 .await
2613 .expect("read_attr failed")
2614 .expect("read_attr returned none")
2615 .into_vec(),
2616 test_large_attr.value()
2617 );
2618
2619 crate::fsck::fsck(fs.clone()).await.unwrap();
2620
2621 object
2622 .set_extended_attribute(
2623 test_small_attr.name(),
2624 test_small_attr.value(),
2625 SetExtendedAttributeMode::Set,
2626 )
2627 .await
2628 .unwrap();
2629 assert_eq!(
2630 object.get_extended_attribute(test_small_attr.name()).await.unwrap(),
2631 test_small_attr.value()
2632 );
2633
2634 assert_eq!(
2637 object
2638 .read_attr(64)
2639 .await
2640 .expect("read_attr failed")
2641 .expect("read_attr returned none")
2642 .into_vec(),
2643 test_small_attr.value()
2644 );
2645
2646 crate::fsck::fsck(fs.clone()).await.unwrap();
2647
2648 object.remove_extended_attribute(test_small_attr.name()).await.expect("failed to remove");
2649
2650 crate::fsck::fsck(fs.clone()).await.unwrap();
2651
2652 fs.close().await.expect("close failed");
2653 }
2654
2655 #[fuchsia::test]
2656 async fn extended_attribute_max_size() {
2657 let (fs, object) = test_filesystem_and_empty_object().await;
2658
2659 let test_attr = TestAttr::new(
2660 vec![3u8; super::MAX_XATTR_NAME_SIZE],
2661 vec![1u8; super::MAX_XATTR_VALUE_SIZE],
2662 );
2663
2664 object
2665 .set_extended_attribute(
2666 test_attr.name(),
2667 test_attr.value(),
2668 SetExtendedAttributeMode::Set,
2669 )
2670 .await
2671 .unwrap();
2672 assert_eq!(
2673 object.get_extended_attribute(test_attr.name()).await.unwrap(),
2674 test_attr.value()
2675 );
2676 assert_eq!(object.list_extended_attributes().await.unwrap(), vec![test_attr.name()]);
2677 object.remove_extended_attribute(test_attr.name()).await.unwrap();
2678
2679 fs.close().await.expect("close failed");
2680 }
2681
2682 #[fuchsia::test]
2683 async fn extended_attribute_remove_then_create() {
2684 let (fs, object) = test_filesystem_and_empty_object().await;
2685
2686 let test_attr = TestAttr::new(
2687 vec![3u8; super::MAX_XATTR_NAME_SIZE],
2688 vec![1u8; super::MAX_XATTR_VALUE_SIZE],
2689 );
2690
2691 object
2692 .set_extended_attribute(
2693 test_attr.name(),
2694 test_attr.value(),
2695 SetExtendedAttributeMode::Create,
2696 )
2697 .await
2698 .unwrap();
2699 fs.journal().force_compact().await.unwrap();
2700 object.remove_extended_attribute(test_attr.name()).await.unwrap();
2701 object
2702 .set_extended_attribute(
2703 test_attr.name(),
2704 test_attr.value(),
2705 SetExtendedAttributeMode::Create,
2706 )
2707 .await
2708 .unwrap();
2709
2710 assert_eq!(
2711 object.get_extended_attribute(test_attr.name()).await.unwrap(),
2712 test_attr.value()
2713 );
2714
2715 fs.close().await.expect("close failed");
2716 }
2717
2718 #[fuchsia::test]
2719 async fn large_extended_attribute_max_number() {
2720 let (fs, object) = test_filesystem_and_empty_object().await;
2721
2722 let max_xattrs =
2723 super::EXTENDED_ATTRIBUTE_RANGE_END - super::EXTENDED_ATTRIBUTE_RANGE_START;
2724 for i in 0..max_xattrs {
2725 let test_attr = TestAttr::new(format!("{}", i).as_bytes(), vec![0x3; 300]);
2726 object
2727 .set_extended_attribute(
2728 test_attr.name(),
2729 test_attr.value(),
2730 SetExtendedAttributeMode::Set,
2731 )
2732 .await
2733 .unwrap_or_else(|_| panic!("failed to set xattr number {}", i));
2734 }
2735
2736 match object
2739 .set_extended_attribute(
2740 b"one.too.many".to_vec(),
2741 vec![0x3; 300],
2742 SetExtendedAttributeMode::Set,
2743 )
2744 .await
2745 {
2746 Ok(()) => panic!("set should not succeed"),
2747 Err(e) => is_error(e, FxfsError::NoSpace),
2748 }
2749
2750 object
2752 .set_extended_attribute(
2753 b"this.is.okay".to_vec(),
2754 b"small value".to_vec(),
2755 SetExtendedAttributeMode::Set,
2756 )
2757 .await
2758 .unwrap();
2759
2760 object
2762 .set_extended_attribute(b"11".to_vec(), vec![0x4; 300], SetExtendedAttributeMode::Set)
2763 .await
2764 .unwrap();
2765 object
2766 .set_extended_attribute(
2767 b"12".to_vec(),
2768 vec![0x1; 300],
2769 SetExtendedAttributeMode::Replace,
2770 )
2771 .await
2772 .unwrap();
2773
2774 object.remove_extended_attribute(b"5".to_vec()).await.unwrap();
2776 object
2777 .set_extended_attribute(
2778 b"new attr".to_vec(),
2779 vec![0x3; 300],
2780 SetExtendedAttributeMode::Set,
2781 )
2782 .await
2783 .unwrap();
2784
2785 fs.close().await.expect("close failed");
2786 }
2787
2788 #[fuchsia::test]
2789 async fn write_attr_trims_beyond_new_end() {
2790 let (fs, object) = test_filesystem_and_empty_object().await;
2795
2796 let block_size = fs.block_size();
2797 let buf_size = block_size * 2;
2798 let attribute_id = 10;
2799
2800 let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
2801 let mut buffer = object.allocate_buffer(buf_size as usize).await;
2802 buffer.as_mut_slice().fill(3);
2803 object
2806 .multi_write(
2807 &mut transaction,
2808 attribute_id,
2809 &[0..block_size, block_size..block_size * 2],
2810 buffer.as_mut(),
2811 )
2812 .await
2813 .unwrap();
2814 transaction.add(
2815 object.store().store_object_id,
2816 Mutation::replace_or_insert_object(
2817 ObjectKey::attribute(object.object_id(), attribute_id, AttributeKey::Attribute),
2818 ObjectValue::attribute(block_size * 2, false),
2819 ),
2820 );
2821 transaction.commit().await.unwrap();
2822
2823 crate::fsck::fsck(fs.clone()).await.unwrap();
2824
2825 let mut transaction = (*object).new_transaction(attribute_id).await.unwrap();
2826 let needs_trim = (*object)
2827 .write_attr(&mut transaction, attribute_id, &vec![3u8; block_size as usize])
2828 .await
2829 .unwrap();
2830 assert!(!needs_trim.0);
2831 transaction.commit().await.unwrap();
2832
2833 crate::fsck::fsck(fs.clone()).await.unwrap();
2834
2835 fs.close().await.expect("close failed");
2836 }
2837
2838 #[fuchsia::test]
2839 async fn write_new_attr_in_batches_multiple_txns() {
2840 let (fs, object) = test_filesystem_and_empty_object().await;
2841 let merkle_tree = vec![1; 3 * WRITE_ATTR_BATCH_SIZE];
2842 let mut transaction =
2843 (*object).new_transaction(FSVERITY_MERKLE_ATTRIBUTE_ID).await.unwrap();
2844 object
2845 .write_new_attr_in_batches(
2846 &mut transaction,
2847 FSVERITY_MERKLE_ATTRIBUTE_ID,
2848 &merkle_tree,
2849 WRITE_ATTR_BATCH_SIZE,
2850 )
2851 .await
2852 .expect("failed to write merkle attribute");
2853
2854 transaction.add(
2855 object.store().store_object_id,
2856 Mutation::replace_or_insert_object(
2857 ObjectKey::graveyard_attribute_entry(
2858 object.store().graveyard_directory_object_id(),
2859 object.object_id(),
2860 FSVERITY_MERKLE_ATTRIBUTE_ID,
2861 ),
2862 ObjectValue::None,
2863 ),
2864 );
2865 transaction.commit().await.unwrap();
2866 assert_eq!(
2867 object.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
2868 Some(merkle_tree.into())
2869 );
2870
2871 fs.close().await.expect("close failed");
2872 }
2873
2874 #[cfg(target_os = "fuchsia")]
2876 #[fuchsia::test(allow_stalls = false)]
2877 async fn test_watchdog() {
2878 use super::Watchdog;
2879 use fuchsia_async::{MonotonicDuration, MonotonicInstant, TestExecutor};
2880 use std::sync::mpsc::channel;
2881
2882 TestExecutor::advance_to(make_time(0)).await;
2883 let (sender, receiver) = channel();
2884
2885 fn make_time(time_secs: i64) -> MonotonicInstant {
2886 MonotonicInstant::from_nanos(0) + MonotonicDuration::from_seconds(time_secs)
2887 }
2888
2889 {
2890 let _watchdog = Watchdog::new(10, move |count| {
2891 sender.send(count).expect("Sending value");
2892 });
2893
2894 TestExecutor::advance_to(make_time(5)).await;
2896 receiver.try_recv().expect_err("Should not have message");
2897
2898 TestExecutor::advance_to(make_time(10)).await;
2900 assert_eq!(1, receiver.recv().expect("Receiving"));
2901
2902 TestExecutor::advance_to(make_time(15)).await;
2904 receiver.try_recv().expect_err("Should not have message");
2905
2906 TestExecutor::advance_to(make_time(30)).await;
2908 assert_eq!(2, receiver.recv().expect("Receiving"));
2909 assert_eq!(3, receiver.recv().expect("Receiving"));
2910 }
2911
2912 TestExecutor::advance_to(make_time(100)).await;
2914 receiver.recv().expect_err("Watchdog should be gone");
2915 }
2916
    /// Exercises `ChecksumRangeChunk::group_first_write_ranges` against various overwrite
    /// bitmaps, checking both the produced chunks and the bitmaps left behind by
    /// `take_bitmaps`.
    #[fuchsia::test]
    fn test_checksum_range_chunk() {
        let block_size = 4096;

        // No bitmap at all: the whole device range becomes a single chunk that is not
        // treated as a first write.
        assert_eq!(
            ChecksumRangeChunk::group_first_write_ranges(
                &mut OverwriteBitmaps::None,
                block_size,
                block_size * 2..block_size * 5,
            ),
            vec![ChecksumRangeChunk {
                checksum_range: 0..3,
                device_range: block_size * 2..block_size * 5,
                is_first_write: false,
            }],
        );

        // Bitmap 0b11110000 at offset 0: the three blocks written map to bits 0-2, all
        // set, so nothing is a first write. take_bitmaps returns the original bitmap plus
        // a second bitmap with bits 0-2 set (the blocks touched by this write).
        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
        assert_eq!(
            ChecksumRangeChunk::group_first_write_ranges(
                &mut bitmaps,
                block_size,
                block_size * 2..block_size * 5,
            ),
            vec![ChecksumRangeChunk {
                checksum_range: 0..3,
                device_range: block_size * 2..block_size * 5,
                is_first_write: false,
            }],
        );
        assert_eq!(
            bitmaps.take_bitmaps(),
            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b11100000])))
        );

        // Same bitmap with offset 2: the write covers bits 2-4. Bits 2 and 3 are set
        // (already written) but bit 4 is clear, so the range splits into two chunks with
        // differing is_first_write. The second returned bitmap has bits 2-4 set.
        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
        bitmaps.set_offset(2);
        assert_eq!(
            ChecksumRangeChunk::group_first_write_ranges(
                &mut bitmaps,
                block_size,
                block_size * 2..block_size * 5,
            ),
            vec![
                ChecksumRangeChunk {
                    checksum_range: 0..2,
                    device_range: block_size * 2..block_size * 4,
                    is_first_write: false,
                },
                ChecksumRangeChunk {
                    checksum_range: 2..3,
                    device_range: block_size * 4..block_size * 5,
                    is_first_write: true,
                },
            ],
        );
        assert_eq!(
            bitmaps.take_bitmaps(),
            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00111000])))
        );

        // Offset 4: bits 4-6 are all clear, so the whole range is one first-write chunk,
        // and the second returned bitmap has bits 4-6 set.
        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b11110000]));
        bitmaps.set_offset(4);
        assert_eq!(
            ChecksumRangeChunk::group_first_write_ranges(
                &mut bitmaps,
                block_size,
                block_size * 2..block_size * 5,
            ),
            vec![ChecksumRangeChunk {
                checksum_range: 0..3,
                device_range: block_size * 2..block_size * 5,
                is_first_write: true,
            }],
        );
        assert_eq!(
            bitmaps.take_bitmaps(),
            Some((BitVec::from_bytes(&[0b11110000]), BitVec::from_bytes(&[0b00001110])))
        );

        // Alternating bitmap 0b01010101 over an 8-block write: every block flips between
        // first-write and overwrite, so each block becomes its own single-block chunk.
        // Afterwards every bit of the write bitmap is set.
        let mut bitmaps = OverwriteBitmaps::new(BitVec::from_bytes(&[0b01010101]));
        assert_eq!(
            ChecksumRangeChunk::group_first_write_ranges(
                &mut bitmaps,
                block_size,
                block_size * 2..block_size * 10,
            ),
            vec![
                ChecksumRangeChunk {
                    checksum_range: 0..1,
                    device_range: block_size * 2..block_size * 3,
                    is_first_write: true,
                },
                ChecksumRangeChunk {
                    checksum_range: 1..2,
                    device_range: block_size * 3..block_size * 4,
                    is_first_write: false,
                },
                ChecksumRangeChunk {
                    checksum_range: 2..3,
                    device_range: block_size * 4..block_size * 5,
                    is_first_write: true,
                },
                ChecksumRangeChunk {
                    checksum_range: 3..4,
                    device_range: block_size * 5..block_size * 6,
                    is_first_write: false,
                },
                ChecksumRangeChunk {
                    checksum_range: 4..5,
                    device_range: block_size * 6..block_size * 7,
                    is_first_write: true,
                },
                ChecksumRangeChunk {
                    checksum_range: 5..6,
                    device_range: block_size * 7..block_size * 8,
                    is_first_write: false,
                },
                ChecksumRangeChunk {
                    checksum_range: 6..7,
                    device_range: block_size * 8..block_size * 9,
                    is_first_write: true,
                },
                ChecksumRangeChunk {
                    checksum_range: 7..8,
                    device_range: block_size * 9..block_size * 10,
                    is_first_write: false,
                },
            ],
        );
        assert_eq!(
            bitmaps.take_bitmaps(),
            Some((BitVec::from_bytes(&[0b01010101]), BitVec::from_bytes(&[0b11111111])))
        );
    }
3053}