1use crate::errors::FxfsError;
6use crate::log::*;
7use crate::lsm_tree::types::{ItemRef, LayerIterator};
8use crate::lsm_tree::Query;
9use crate::object_handle::{
10 ObjectHandle, ObjectProperties, ReadObjectHandle, WriteBytes, WriteObjectHandle,
11};
12use crate::object_store::extent_record::{ExtentKey, ExtentMode, ExtentValue};
13use crate::object_store::object_manager::ObjectManager;
14use crate::object_store::object_record::{
15 AttributeKey, FsverityMetadata, ObjectAttributes, ObjectItem, ObjectKey, ObjectKeyData,
16 ObjectKind, ObjectValue, Timestamp,
17};
18use crate::object_store::store_object_handle::{MaybeChecksums, NeedsTrim};
19use crate::object_store::transaction::{
20 self, lock_keys, AssocObj, AssociatedObject, LockKey, Mutation, ObjectStoreMutation, Operation,
21 Options, Transaction,
22};
23use crate::object_store::{
24 HandleOptions, HandleOwner, RootDigest, StoreObjectHandle, TrimMode, TrimResult,
25 DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID, TRANSACTION_MUTATION_THRESHOLD,
26};
27use crate::range::RangeExt;
28use crate::round::{round_down, round_up};
29use anyhow::{anyhow, bail, ensure, Context, Error};
30use async_trait::async_trait;
31use fidl_fuchsia_io as fio;
32use fsverity_merkle::{FsVerityHasher, FsVerityHasherOptions, MerkleTreeBuilder};
33use fuchsia_sync::Mutex;
34use futures::stream::FuturesUnordered;
35use futures::TryStreamExt;
36use fxfs_trace::trace;
37use mundane::hash::{Digest, Hasher, Sha256, Sha512};
38use std::cmp::min;
39use std::ops::{Deref, DerefMut, Range};
40use std::sync::atomic::{self, AtomicU64, Ordering};
41use std::sync::Arc;
42use storage_device::buffer::{Buffer, BufferFuture, BufferRef, MutableBufferRef};
43
44mod allocated_ranges;
45pub use allocated_ranges::{AllocatedRanges, RangeType};
46
/// Batch size (512 KiB) passed to `write_new_attr_in_batches` when `enable_verity` writes the
/// merkle attribute; merkle trees longer than this also get their graveyard entry updated.
pub const WRITE_ATTR_BATCH_SIZE: usize = 512 * 1024;
50
/// A handle to one data attribute of an object.
///
/// Wraps a [`StoreObjectHandle`] (reachable through `Deref`) and additionally tracks the
/// attribute's content size, its fsverity state and the ranges tracked for overwrite mode.
pub struct DataObjectHandle<S: HandleOwner> {
    /// The underlying store handle that most operations here delegate to.
    handle: StoreObjectHandle<S>,
    /// The attribute this handle reads and writes.
    attribute_id: u64,
    /// Size of the attribute's content in bytes; initialized from `size` in `new`.
    content_size: AtomicU64,
    /// Where this file is in the fsverity lifecycle; see [`FsverityState`].
    fsverity_state: Mutex<FsverityState>,
    /// Ranges tracked for overwrite mode (see `AllocatedRanges`); cleared by
    /// `finalize_fsverity_state` and truncated by `truncate_overwrite_ranges`.
    overwrite_ranges: AllocatedRanges,
}
65
/// The fsverity lifecycle of a `DataObjectHandle`.
///
/// Transitions performed by the handle: `None -> Started` (`set_fsverity_state_started`),
/// `Started -> Pending` (`set_fsverity_state_pending`), `Pending -> Some`
/// (`finalize_fsverity_state`) and `None -> Some` (`set_fsverity_state_some`).
#[derive(Debug)]
pub enum FsverityState {
    /// fsverity is not enabled on this file.
    None,
    /// `enable_verity` has begun but no merkle tree has been recorded yet.
    Started,
    /// A merkle tree and descriptor exist but have not been finalized yet.
    Pending(FsverityStateInner),
    /// fsverity is enabled; reads can be checked against the merkle tree.
    Some(FsverityStateInner),
}
73
/// The in-memory fsverity data for a file that has a merkle tree.
#[derive(Debug)]
pub struct FsverityStateInner {
    /// Root digest and salt describing the tree.
    descriptor: FsverityMetadata,
    /// The leaf level of the merkle tree: one digest per data block, concatenated.
    merkle_tree: Box<[u8]>,
}
81
82impl FsverityStateInner {
83 pub fn new(descriptor: FsverityMetadata, merkle_tree: Box<[u8]>) -> Self {
84 FsverityStateInner { descriptor, merkle_tree }
85 }
86}
87
88impl<S: HandleOwner> Deref for DataObjectHandle<S> {
89 type Target = StoreObjectHandle<S>;
90 fn deref(&self) -> &Self::Target {
91 &self.handle
92 }
93}
94
95impl<S: HandleOwner> DataObjectHandle<S> {
96 pub fn new(
97 owner: Arc<S>,
98 object_id: u64,
99 permanent_keys: bool,
100 attribute_id: u64,
101 size: u64,
102 fsverity_state: FsverityState,
103 options: HandleOptions,
104 trace: bool,
105 overwrite_ranges: Vec<Range<u64>>,
106 ) -> Self {
107 Self {
108 handle: StoreObjectHandle::new(owner, object_id, permanent_keys, options, trace),
109 attribute_id,
110 content_size: AtomicU64::new(size),
111 fsverity_state: Mutex::new(fsverity_state),
112 overwrite_ranges: AllocatedRanges::new(overwrite_ranges),
113 }
114 }
115
116 pub fn attribute_id(&self) -> u64 {
117 self.attribute_id
118 }
119
120 pub fn overwrite_ranges(&self) -> &AllocatedRanges {
121 &self.overwrite_ranges
122 }
123
124 pub fn is_verified_file(&self) -> bool {
125 matches!(*self.fsverity_state.lock(), FsverityState::Some(_))
126 }
127
128 pub fn set_fsverity_state_started(&self) -> Result<(), Error> {
133 let mut fsverity_guard = self.fsverity_state.lock();
134 match *fsverity_guard {
135 FsverityState::None => {
136 *fsverity_guard = FsverityState::Started;
137 Ok(())
138 }
139 FsverityState::Started | FsverityState::Pending(_) => {
140 Err(anyhow!(FxfsError::Unavailable))
141 }
142 FsverityState::Some(_) => Err(anyhow!(FxfsError::AlreadyExists)),
143 }
144 }
145
146 pub fn set_fsverity_state_pending(&self, descriptor: FsverityMetadata, merkle_tree: Box<[u8]>) {
149 let mut fsverity_guard = self.fsverity_state.lock();
150 assert!(matches!(*fsverity_guard, FsverityState::Started));
151 *fsverity_guard = FsverityState::Pending(FsverityStateInner { descriptor, merkle_tree });
152 }
153
154 pub fn finalize_fsverity_state(&self) {
157 let mut fsverity_state_guard = self.fsverity_state.lock();
158 let mut_fsverity_state = fsverity_state_guard.deref_mut();
159 let fsverity_state = std::mem::replace(mut_fsverity_state, FsverityState::None);
160 match fsverity_state {
161 FsverityState::None => panic!("Cannot go from FsverityState::None to Some"),
162 FsverityState::Started => panic!("Cannot go from FsverityState::Started to Some"),
163 FsverityState::Pending(inner) => *mut_fsverity_state = FsverityState::Some(inner),
164 FsverityState::Some(_) => panic!("Fsverity state was already set to Some"),
165 }
166 self.overwrite_ranges.clear();
171 }
172
173 pub fn set_fsverity_state_some(&self, descriptor: FsverityMetadata, merkle_tree: Box<[u8]>) {
176 let mut fsverity_guard = self.fsverity_state.lock();
177 assert!(matches!(*fsverity_guard, FsverityState::None));
178 *fsverity_guard = FsverityState::Some(FsverityStateInner { descriptor, merkle_tree });
179 }
180
181 async fn verify_data(&self, mut offset: usize, buffer: &[u8]) -> Result<(), Error> {
185 let block_size = self.block_size() as usize;
186 assert!(offset % block_size == 0);
187 let fsverity_state = self.fsverity_state.lock();
188 match &*fsverity_state {
189 FsverityState::None => {
190 Err(anyhow!("Tried to verify read on a non verity-enabled file"))
191 }
192 FsverityState::Started | FsverityState::Pending(_) => Err(anyhow!(
193 "Enable verity has not yet completed, fsverity state: {:?}",
194 &*fsverity_state
195 )),
196 FsverityState::Some(metadata) => {
197 let (hasher, digest_size) = match metadata.descriptor.root_digest {
198 RootDigest::Sha256(_) => {
199 let hasher = FsVerityHasher::Sha256(FsVerityHasherOptions::new(
200 metadata.descriptor.salt.clone(),
201 block_size,
202 ));
203 (hasher, <Sha256 as Hasher>::Digest::DIGEST_LEN)
204 }
205 RootDigest::Sha512(_) => {
206 let hasher = FsVerityHasher::Sha512(FsVerityHasherOptions::new(
207 metadata.descriptor.salt.clone(),
208 block_size,
209 ));
210 (hasher, <Sha512 as Hasher>::Digest::DIGEST_LEN)
211 }
212 };
213 let leaf_nodes: Vec<&[u8]> = metadata.merkle_tree.chunks(digest_size).collect();
214 fxfs_trace::duration!(c"fsverity-verify", "len" => buffer.len());
215 for b in buffer.chunks(block_size) {
217 ensure!(
218 hasher.hash_block(b) == leaf_nodes[offset / block_size],
219 anyhow!(FxfsError::Inconsistent).context("Hash mismatch")
220 );
221 offset += block_size;
222 }
223 Ok(())
224 }
225 }
226 }
227
228 pub async fn extend<'a>(
231 &'a self,
232 transaction: &mut Transaction<'a>,
233 device_range: Range<u64>,
234 ) -> Result<(), Error> {
235 let old_end =
236 round_up(self.txn_get_size(transaction), self.block_size()).ok_or(FxfsError::TooBig)?;
237 let new_size = old_end + device_range.end - device_range.start;
238 self.store()
239 .allocator()
240 .mark_allocated(transaction, self.store().store_object_id(), device_range.clone())
241 .await?;
242 self.txn_update_size(transaction, new_size, None).await?;
243 let key_id = self.get_key(None).await?.map_or(0, |k| k.key_id());
244 transaction.add(
245 self.store().store_object_id,
246 Mutation::merge_object(
247 ObjectKey::extent(self.object_id(), self.attribute_id(), old_end..new_size),
248 ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
249 ),
250 );
251 self.update_allocated_size(transaction, device_range.end - device_range.start, 0).await
252 }
253
254 async fn align_buffer(
257 &self,
258 offset: u64,
259 buf: BufferRef<'_>,
260 ) -> Result<(std::ops::Range<u64>, Buffer<'_>), Error> {
261 self.handle.align_buffer(self.attribute_id(), offset, buf).await
262 }
263
264 async fn write_at(
269 &self,
270 offset: u64,
271 buf: MutableBufferRef<'_>,
272 device_offset: u64,
273 ) -> Result<MaybeChecksums, Error> {
274 self.handle.write_at(self.attribute_id(), offset, buf, None, device_offset).await
275 }
276
277 pub async fn zero(
279 &self,
280 transaction: &mut Transaction<'_>,
281 range: Range<u64>,
282 ) -> Result<(), Error> {
283 self.handle.zero(transaction, self.attribute_id(), range).await
284 }
285
286 pub async fn get_descriptor(
290 &self,
291 ) -> Result<Option<(fio::VerificationOptions, Vec<u8>)>, Error> {
292 let fsverity_state = self.fsverity_state.lock();
293 match &*fsverity_state {
294 FsverityState::None => Ok(None),
295 FsverityState::Started | FsverityState::Pending(_) => Err(anyhow!(
296 "Enable verity has not yet completed, fsverity state: {:?}",
297 &*fsverity_state
298 )),
299 FsverityState::Some(metadata) => {
300 let (options, root_hash) = match &metadata.descriptor.root_digest {
301 RootDigest::Sha256(root_hash) => {
302 let mut root_vec = root_hash.to_vec();
303 root_vec.extend_from_slice(&[0; 32]);
305 (
306 fio::VerificationOptions {
307 hash_algorithm: Some(fio::HashAlgorithm::Sha256),
308 salt: Some(metadata.descriptor.salt.clone()),
309 ..Default::default()
310 },
311 root_vec,
312 )
313 }
314 RootDigest::Sha512(root_hash) => (
315 fio::VerificationOptions {
316 hash_algorithm: Some(fio::HashAlgorithm::Sha512),
317 salt: Some(metadata.descriptor.salt.clone()),
318 ..Default::default()
319 },
320 root_hash.clone(),
321 ),
322 };
323 Ok(Some((options, root_hash)))
324 }
325 }
326 }
327
328 #[trace]
335 pub async fn enable_verity(&self, options: fio::VerificationOptions) -> Result<(), Error> {
336 self.set_fsverity_state_started()?;
337 if let Some(_) = self
340 .store()
341 .tree()
342 .find(&ObjectKey::graveyard_attribute_entry(
343 self.store().graveyard_directory_object_id(),
344 self.object_id(),
345 FSVERITY_MERKLE_ATTRIBUTE_ID,
346 ))
347 .await?
348 {
349 self.store().filesystem().graveyard().flush().await;
350 }
351 let mut transaction = self.new_transaction().await?;
352 let hash_alg =
353 options.hash_algorithm.ok_or_else(|| anyhow!("No hash algorithm provided"))?;
354 let salt = options.salt.ok_or_else(|| anyhow!("No salt provided"))?;
355 let (root_digest, merkle_tree) = match hash_alg {
356 fio::HashAlgorithm::Sha256 => {
357 let hasher = FsVerityHasher::Sha256(FsVerityHasherOptions::new(
358 salt.clone(),
359 self.block_size() as usize,
360 ));
361 let mut builder = MerkleTreeBuilder::new(hasher);
362 let mut offset = 0;
363 let size = self.get_size();
364 let mut buf = self.allocate_buffer(64 * self.block_size() as usize).await;
367 while offset < size {
368 let read = self.read(offset, buf.as_mut()).await? as u64;
370 assert!(offset + read <= size);
371 builder.write(&buf.as_slice()[0..read as usize]);
372 offset += read;
373 }
374 let tree = builder.finish();
375 let merkle_leaf_nodes: Vec<u8> =
376 tree.as_ref()[0].iter().flat_map(|x| x.clone()).collect();
377 self.handle
381 .write_new_attr_in_batches(
382 &mut transaction,
383 FSVERITY_MERKLE_ATTRIBUTE_ID,
384 &merkle_leaf_nodes,
385 WRITE_ATTR_BATCH_SIZE,
386 )
387 .await?;
388 let root: [u8; 32] = tree.root().try_into().unwrap();
389 (RootDigest::Sha256(root), merkle_leaf_nodes)
390 }
391 fio::HashAlgorithm::Sha512 => {
392 let hasher = FsVerityHasher::Sha512(FsVerityHasherOptions::new(
393 salt.clone(),
394 self.block_size() as usize,
395 ));
396 let mut builder = MerkleTreeBuilder::new(hasher);
397 let mut offset = 0;
398 let size = self.get_size();
399 let mut buf = self.allocate_buffer(64 * self.block_size() as usize).await;
402 while offset < size {
403 let read = self.read(offset, buf.as_mut()).await? as u64;
405 assert!(offset + read <= size);
406 builder.write(&buf.as_slice()[0..read as usize]);
407 offset += read;
408 }
409 let tree = builder.finish();
410 let merkle_leaf_nodes: Vec<u8> =
411 tree.as_ref()[0].iter().flat_map(|x| x.clone()).collect();
412 self.handle
416 .write_new_attr_in_batches(
417 &mut transaction,
418 FSVERITY_MERKLE_ATTRIBUTE_ID,
419 &merkle_leaf_nodes,
420 WRITE_ATTR_BATCH_SIZE,
421 )
422 .await?;
423 (RootDigest::Sha512(tree.root().to_vec()), merkle_leaf_nodes)
424 }
425 _ => {
426 bail!(anyhow!(FxfsError::NotSupported)
427 .context(format!("hash algorithm not supported")));
428 }
429 };
430 if merkle_tree.len() > WRITE_ATTR_BATCH_SIZE {
431 transaction.add(
432 self.store().store_object_id,
433 Mutation::replace_or_insert_object(
434 ObjectKey::graveyard_attribute_entry(
435 self.store().graveyard_directory_object_id(),
436 self.object_id(),
437 FSVERITY_MERKLE_ATTRIBUTE_ID,
438 ),
439 ObjectValue::None,
440 ),
441 );
442 };
443 let descriptor = FsverityMetadata { root_digest, salt };
444 self.set_fsverity_state_pending(descriptor.clone(), merkle_tree.into());
445 transaction.add_with_object(
446 self.store().store_object_id(),
447 Mutation::replace_or_insert_object(
448 ObjectKey::attribute(
449 self.object_id(),
450 DEFAULT_DATA_ATTRIBUTE_ID,
451 AttributeKey::Attribute,
452 ),
453 ObjectValue::verified_attribute(self.get_size(), descriptor),
454 ),
455 AssocObj::Borrowed(self),
456 );
457 transaction.commit().await?;
458 Ok(())
459 }
460
    /// Prepares `range` of this attribute for overwrite mode.
    ///
    /// `range` is widened to block boundaries. Within it:
    /// - existing extents in other modes are switched to initialized overwrite extents at their
    ///   current device location;
    /// - pieces that are already `Overwrite`/`OverwritePartial` extents are left alone;
    /// - holes and deleted extents get freshly allocated blank overwrite extents.
    ///
    /// The attribute size becomes `max(range.end, current size)`, using the requested (unrounded)
    /// end, and the attribute is marked as having overwrite extents. The work is committed in
    /// chunks via `commit_and_continue` whenever a transaction reaches `MAX_TRANSACTION_SIZE`
    /// mutations.
    ///
    /// # Errors
    /// - `FxfsError::TooBig` if rounding `range.end` up overflows.
    /// - Allocation failures are returned with the context "allocation failed".
    /// - Tree or transaction errors are also returned.
    pub async fn allocate(&self, range: Range<u64>) -> Result<(), Error> {
        debug_assert!(range.start < range.end);

        let mut new_range = range.clone();
        // Widen the requested range to whole blocks.
        new_range.start = round_down(new_range.start, self.block_size());
        new_range.end = round_up(new_range.end, self.block_size()).ok_or(FxfsError::TooBig)?;

        let mut transaction = self.new_transaction().await?;
        // File ranges with no live extent; these need new device space.
        let mut to_allocate = Vec::new();
        // (file range, device offset) of existing extents to switch to overwrite mode.
        let mut to_switch = Vec::new();
        let key_id = self.get_key(None).await?.map_or(0, |k| k.key_id());

        {
            let tree = &self.store().tree;
            let layer_set = tree.layer_set();
            let offset_key = ObjectKey::attribute(
                self.object_id(),
                self.attribute_id(),
                AttributeKey::Extent(ExtentKey::search_key_from_offset(new_range.start)),
            );
            let mut merger = layer_set.merger();
            let mut iter = merger.query(Query::FullRange(&offset_key)).await?;

            // Walk this attribute's extents, moving `new_range.start` forward past every piece
            // that has been classified into `to_allocate` or `to_switch`.
            loop {
                match iter.get() {
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(extent_key),
                                    ),
                            },
                        value: ObjectValue::Extent(extent_value),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id() =>
                    {
                        if new_range.end <= extent_key.range.start {
                            // This extent starts after the requested range; stop walking.
                            break;
                        }
                        if new_range.start < extent_key.range.start {
                            // A gap before this extent needs new space.
                            to_allocate.push(new_range.start..extent_key.range.start);
                            new_range.start = extent_key.range.start;
                        }
                        let device_offset = match extent_value {
                            ExtentValue::None => {
                                // Deleted extent: `new_range.start` is left where it is, so the
                                // gap check on the next extent (or the push after the loop)
                                // covers this range for allocation.
                                iter.advance().await?;
                                continue;
                            }
                            ExtentValue::Some { mode: ExtentMode::OverwritePartial(_), .. }
                            | ExtentValue::Some { mode: ExtentMode::Overwrite, .. } => {
                                // Already an overwrite extent; skip past it.
                                if extent_key.range.end < new_range.end {
                                    new_range.start = extent_key.range.end;
                                    iter.advance().await?;
                                    continue;
                                } else {
                                    new_range.start = new_range.end;
                                    break;
                                }
                            }
                            ExtentValue::Some { device_offset, .. } => *device_offset,
                        };

                        // Device offset corresponding to `new_range.start` within this extent.
                        let device_offset =
                            device_offset + (new_range.start - extent_key.range.start);
                        if extent_key.range.end < new_range.end {
                            to_switch.push((new_range.start..extent_key.range.end, device_offset));
                            new_range.start = extent_key.range.end;
                        } else {
                            to_switch.push((new_range.start..new_range.end, device_offset));
                            new_range.start = new_range.end;
                            break;
                        }
                    }
                    // Anything else (another object/attribute, or the end of the tree) ends the
                    // walk.
                    _ => break,
                }
                iter.advance().await?;
            }
        }

        // Whatever remains past the last extent also needs new space.
        if new_range.start < new_range.end {
            to_allocate.push(new_range.clone());
        }

        // Grow (never shrink) the size and flag the attribute as having overwrite extents.
        let new_size = std::cmp::max(range.end, self.get_size());
        transaction.add_with_object(
            self.store().store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    self.object_id(),
                    self.attribute_id(),
                    AttributeKey::Attribute,
                ),
                ObjectValue::Attribute { size: new_size, has_overwrite_extents: true },
            ),
            AssocObj::Borrowed(self),
        );

        // Mutation count at which the current transaction is committed and a new one started.
        const MAX_TRANSACTION_SIZE: usize = 256;
        for (switch_range, device_offset) in to_switch {
            transaction.add_with_object(
                self.store().store_object_id(),
                Mutation::merge_object(
                    ObjectKey::extent(self.object_id(), self.attribute_id(), switch_range),
                    ObjectValue::Extent(ExtentValue::initialized_overwrite_extent(
                        device_offset,
                        key_id,
                    )),
                ),
                AssocObj::Borrowed(self),
            );
            if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
                transaction.commit_and_continue().await?;
            }
        }

        // Bytes allocated since the last allocated-size update.
        let mut allocated = 0;
        let allocator = self.store().allocator();
        for mut allocate_range in to_allocate {
            // The allocator may hand back less than requested, hence the inner loop.
            while allocate_range.start < allocate_range.end {
                let device_range = allocator
                    .allocate(
                        &mut transaction,
                        self.store().store_object_id(),
                        allocate_range.end - allocate_range.start,
                    )
                    .await
                    .context("allocation failed")?;
                let device_range_len = device_range.end - device_range.start;

                transaction.add_with_object(
                    self.store().store_object_id(),
                    Mutation::merge_object(
                        ObjectKey::extent(
                            self.object_id(),
                            self.attribute_id(),
                            allocate_range.start..allocate_range.start + device_range_len,
                        ),
                        ObjectValue::Extent(ExtentValue::blank_overwrite_extent(
                            device_range.start,
                            (device_range_len / self.block_size()) as usize,
                            key_id,
                        )),
                    ),
                    AssocObj::Borrowed(self),
                );

                allocate_range.start += device_range_len;
                allocated += device_range_len;

                if transaction.mutations().len() >= MAX_TRANSACTION_SIZE {
                    // Account for the space allocated so far before committing this chunk.
                    self.update_allocated_size(&mut transaction, allocated, 0).await?;
                    transaction.commit_and_continue().await?;
                    allocated = 0;
                }
            }
        }

        self.update_allocated_size(&mut transaction, allocated, 0).await?;
        transaction.commit().await?;

        Ok(())
    }
662
    /// Reports whether the block at `start_offset` is backed by a live extent, and the length of
    /// the run starting there that shares the same state.
    ///
    /// Returns `(allocated, length)`. If `start_offset` equals the file size, returns
    /// `(false, 0)`. An unallocated run that reaches the end of this attribute's extents is
    /// extended to the file size.
    ///
    /// # Errors
    /// - `FxfsError::OutOfRange` if `start_offset` is past the file size.
    /// - `FxfsError::Inconsistent` if an extent is not block aligned.
    ///
    /// # Panics
    /// Panics if `start_offset` is not block aligned.
    pub async fn is_allocated(&self, start_offset: u64) -> Result<(bool, u64), Error> {
        let block_size = self.block_size();
        assert_eq!(start_offset % block_size, 0);

        if start_offset > self.get_size() {
            bail!(FxfsError::OutOfRange)
        }

        if start_offset == self.get_size() {
            return Ok((false, 0));
        }

        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let offset_key = ObjectKey::attribute(
            self.object_id(),
            self.attribute_id(),
            AttributeKey::Extent(ExtentKey::search_key_from_offset(start_offset)),
        );
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullRange(&offset_key)).await?;

        // `allocated` is the state of the run seen so far (None until the first piece);
        // `end` is where that run currently ends.
        let mut allocated = None;
        let mut end = start_offset;

        loop {
            match iter.get() {
                // An extent record.
                Some(ItemRef {
                    key:
                        ObjectKey {
                            object_id,
                            data:
                                ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
                        },
                    value: ObjectValue::Extent(extent_value),
                    ..
                }) => {
                    if *object_id != self.object_id() || *attribute_id != self.attribute_id() {
                        // Past this attribute's extents: an unallocated run continues to EOF.
                        if allocated == Some(false) || allocated.is_none() {
                            end = self.get_size();
                            allocated = Some(false);
                        }
                        break;
                    }
                    ensure!(extent_key.range.is_aligned(block_size), FxfsError::Inconsistent);
                    if extent_key.range.start > end {
                        // There is a hole between `end` and this extent.
                        if allocated == Some(true) {
                            // The hole ends the allocated run.
                            break;
                        } else {
                            // The hole starts or extends an unallocated run.
                            end = extent_key.range.start;
                            allocated = Some(false);
                        }
                    }

                    match extent_value {
                        // Live extent: allocated.
                        ExtentValue::Some { .. } => {
                            if allocated == Some(false) {
                                // The unallocated run ends where this extent starts.
                                break;
                            }
                            allocated = Some(true);
                        }
                        // Deleted extent: unallocated.
                        ExtentValue::None => {
                            if allocated == Some(true) {
                                // The allocated run ends where this extent starts.
                                break;
                            }
                            allocated = Some(false);
                        }
                    }
                    end = extent_key.range.end;
                }
                // End of the tree: an unallocated run continues to EOF.
                None => {
                    if allocated == Some(false) || allocated.is_none() {
                        end = self.get_size();
                        allocated = Some(false);
                    }
                    break;
                }
                // Some other kind of record; skip it.
                Some(_) => {}
            }
            iter.advance().await?;
        }

        // Every `break` above happens with `allocated` already set, so this cannot panic.
        Ok((allocated.unwrap(), end - start_offset))
    }
771
772 pub async fn txn_write<'a>(
773 &'a self,
774 transaction: &mut Transaction<'a>,
775 offset: u64,
776 buf: BufferRef<'_>,
777 ) -> Result<(), Error> {
778 if buf.is_empty() {
779 return Ok(());
780 }
781 let (aligned, mut transfer_buf) = self.align_buffer(offset, buf).await?;
782 self.multi_write(
783 transaction,
784 self.attribute_id(),
785 &[aligned.clone()],
786 transfer_buf.as_mut(),
787 )
788 .await?;
789 if offset + buf.len() as u64 > self.txn_get_size(transaction) {
790 self.txn_update_size(transaction, offset + buf.len() as u64, None).await?;
791 }
792 Ok(())
793 }
794
795 pub async fn multi_write<'a>(
799 &'a self,
800 transaction: &mut Transaction<'a>,
801 attribute_id: u64,
802 ranges: &[Range<u64>],
803 buf: MutableBufferRef<'_>,
804 ) -> Result<(), Error> {
805 self.handle.multi_write(transaction, attribute_id, None, ranges, buf).await
806 }
807
    /// Writes `buf` at `offset` directly into the device blocks of the attribute's existing `Raw`
    /// extents.
    ///
    /// `buf.len()` must be a multiple of the device block size (asserted). For any part of the
    /// target range not covered by an extent:
    /// - if `allow_allocations` is true, new raw extents are allocated in a transaction; that
    ///   transaction also grows the file if the write ends past its size;
    /// - otherwise the call fails.
    ///
    /// Extents in any mode other than `Raw` cause an error. The device writes are issued
    /// concurrently and all awaited before the transaction (if any) is committed.
    pub async fn overwrite(
        &self,
        mut offset: u64,
        mut buf: MutableBufferRef<'_>,
        allow_allocations: bool,
    ) -> Result<(), Error> {
        assert_eq!((buf.len() as u32) % self.store().device.block_size(), 0);
        let end = offset + buf.len() as u64;

        let key_id = self.get_key(None).await?.map_or(0, |k| k.key_id());

        // A transaction is only needed (and only created) when allocations are allowed.
        let mut transaction =
            if allow_allocations { Some(self.new_transaction().await?) } else { None };

        // Device writes, collected here and awaited together after the extent walk.
        let writes = FuturesUnordered::new();

        {
            // Scope for the tree iteration; the iterator and layer set are dropped before the
            // writes are awaited below.
            let store = self.store();
            let store_object_id = store.store_object_id;
            let allocator = store.allocator();
            let tree = &store.tree;
            let layer_set = tree.layer_set();
            let mut merger = layer_set.merger();
            let mut iter = merger
                .query(Query::FullRange(&ObjectKey::attribute(
                    self.object_id(),
                    self.attribute_id(),
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(offset)),
                )))
                .await?;
            let block_size = self.block_size();

            // Each iteration writes the next piece of `buf` (at `offset`) into one extent.
            loop {
                let (device_offset, bytes_to_write, should_advance) = match iter.get() {
                    // An extent that ends exactly at `offset` doesn't overlap the write.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { .. }),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.end == offset =>
                    {
                        iter.advance().await?;
                        continue;
                    }
                    // An extent of this attribute that covers `offset`.
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value,
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.start <= offset =>
                    {
                        match value {
                            ObjectValue::Extent(ExtentValue::Some {
                                device_offset,
                                mode: ExtentMode::Raw,
                                ..
                            }) => {
                                ensure!(
                                    range.is_aligned(block_size) && device_offset % block_size == 0,
                                    FxfsError::Inconsistent
                                );
                                let offset_within_extent = offset - range.start;
                                let remaining_length_of_extent = (range
                                    .end
                                    .checked_sub(offset)
                                    .ok_or(FxfsError::Inconsistent)?)
                                    as usize;
                                // Write as much of `buf` as fits in the rest of this extent.
                                (
                                    device_offset + offset_within_extent,
                                    min(buf.len(), remaining_length_of_extent),
                                    true,
                                )
                            }
                            ObjectValue::Extent(ExtentValue::Some { .. }) => {
                                // Only `Raw` extents can be written in place here.
                                bail!(
                                    "extent from ({},{}) which overlaps offset \
                                        {} has the wrong extent mode",
                                    range.start,
                                    range.end,
                                    offset
                                )
                            }
                            _ => {
                                bail!(
                                    "overwrite failed: extent overlapping offset {} has \
                                    unexpected ObjectValue",
                                    offset
                                )
                            }
                        }
                    }
                    // Nothing covers `offset`: it is in a hole or past the last extent.
                    maybe_item_ref => {
                        if let Some(transaction) = transaction.as_mut() {
                            assert_eq!(allow_allocations, true);
                            assert_eq!(offset % self.block_size(), 0);

                            // Allocate enough for the rest of `buf`, but stop at the next
                            // extent of this attribute if there is one.
                            let mut bytes_to_allocate =
                                round_up(buf.len() as u64, self.block_size())
                                    .ok_or(FxfsError::TooBig)?;
                            if let Some(ItemRef {
                                key:
                                    ObjectKey {
                                        object_id,
                                        data:
                                            ObjectKeyData::Attribute(
                                                attribute_id,
                                                AttributeKey::Extent(ExtentKey { range }),
                                            ),
                                    },
                                ..
                            }) = maybe_item_ref
                            {
                                if *object_id == self.object_id()
                                    && *attribute_id == self.attribute_id()
                                    && offset < range.start
                                {
                                    let bytes_until_next_extent = range.start - offset;
                                    bytes_to_allocate =
                                        min(bytes_to_allocate, bytes_until_next_extent);
                                }
                            }

                            let device_range = allocator
                                .allocate(transaction, store_object_id, bytes_to_allocate)
                                .await?;
                            let device_range_len = device_range.end - device_range.start;
                            transaction.add(
                                store_object_id,
                                Mutation::insert_object(
                                    ObjectKey::extent(
                                        self.object_id(),
                                        self.attribute_id(),
                                        offset..offset + device_range_len,
                                    ),
                                    ObjectValue::Extent(ExtentValue::new_raw(
                                        device_range.start,
                                        key_id,
                                    )),
                                ),
                            );

                            self.update_allocated_size(transaction, device_range_len, 0).await?;

                            // Don't advance: the current item has not been consumed and is
                            // examined again on the next iteration.
                            (device_range.start, min(buf.len(), device_range_len as usize), false)
                        } else {
                            bail!(
                                "no extent overlapping offset {}, \
                                and new allocations are not allowed",
                                offset
                            )
                        }
                    }
                };
                let (current_buf, remaining_buf) = buf.split_at_mut(bytes_to_write);
                writes.push(self.write_at(offset, current_buf, device_offset));
                if remaining_buf.len() == 0 {
                    break;
                } else {
                    buf = remaining_buf;
                    offset += bytes_to_write as u64;
                    if should_advance {
                        iter.advance().await?;
                    }
                }
            }
        }

        self.store().logical_write_ops.fetch_add(1, Ordering::Relaxed);
        // Wait for every device write; the first failure is returned.
        writes.try_collect::<Vec<MaybeChecksums>>().await?;

        if let Some(mut transaction) = transaction {
            assert_eq!(allow_allocations, true);
            // Skip committing if nothing was added to the transaction above. Otherwise grow the
            // file first if the write ended past its current size.
            if !transaction.is_empty() {
                if end > self.get_size() {
                    self.grow(&mut transaction, self.get_size(), end).await?;
                }
                transaction.commit().await?;
            }
        }

        Ok(())
    }
1030
1031 fn txn_get_size(&self, transaction: &Transaction<'_>) -> u64 {
1034 transaction
1035 .get_object_mutation(
1036 self.store().store_object_id,
1037 ObjectKey::attribute(
1038 self.object_id(),
1039 self.attribute_id(),
1040 AttributeKey::Attribute,
1041 ),
1042 )
1043 .and_then(|m| {
1044 if let ObjectItem { value: ObjectValue::Attribute { size, .. }, .. } = m.item {
1045 Some(size)
1046 } else {
1047 None
1048 }
1049 })
1050 .unwrap_or_else(|| self.get_size())
1051 }
1052
1053 pub async fn txn_update_size<'a>(
1054 &'a self,
1055 transaction: &mut Transaction<'a>,
1056 new_size: u64,
1057 update_has_overwrite_extents: Option<bool>,
1060 ) -> Result<(), Error> {
1061 let key =
1062 ObjectKey::attribute(self.object_id(), self.attribute_id(), AttributeKey::Attribute);
1063 let mut mutation = if let Some(mutation) =
1064 transaction.get_object_mutation(self.store().store_object_id(), key.clone())
1065 {
1066 mutation.clone()
1067 } else {
1068 ObjectStoreMutation {
1069 item: self.store().tree().find(&key).await?.ok_or(FxfsError::NotFound)?,
1070 op: Operation::ReplaceOrInsert,
1071 }
1072 };
1073 if let ObjectValue::Attribute { size, has_overwrite_extents } = &mut mutation.item.value {
1074 *size = new_size;
1075 if let Some(update_has_overwrite_extents) = update_has_overwrite_extents {
1076 *has_overwrite_extents = update_has_overwrite_extents;
1077 }
1078 } else {
1079 bail!(anyhow!(FxfsError::Inconsistent).context("Unexpected object value"));
1080 }
1081 transaction.add_with_object(
1082 self.store().store_object_id(),
1083 Mutation::ObjectStore(mutation),
1084 AssocObj::Borrowed(self),
1085 );
1086 Ok(())
1087 }
1088
1089 async fn update_allocated_size(
1090 &self,
1091 transaction: &mut Transaction<'_>,
1092 allocated: u64,
1093 deallocated: u64,
1094 ) -> Result<(), Error> {
1095 self.handle.update_allocated_size(transaction, allocated, deallocated).await
1096 }
1097
1098 pub fn truncate_overwrite_ranges(&self, size: u64) -> Result<Option<bool>, Error> {
1099 if self
1100 .overwrite_ranges
1101 .truncate(round_up(size, self.block_size()).ok_or(FxfsError::TooBig)?)
1102 {
1103 Ok(Some(false))
1106 } else {
1107 Ok(None)
1108 }
1109 }
1110
1111 pub async fn shrink<'a>(
1112 &'a self,
1113 transaction: &mut Transaction<'a>,
1114 size: u64,
1115 update_has_overwrite_extents: Option<bool>,
1116 ) -> Result<NeedsTrim, Error> {
1117 let needs_trim = self.handle.shrink(transaction, self.attribute_id(), size).await?;
1118 self.txn_update_size(transaction, size, update_has_overwrite_extents).await?;
1119 Ok(needs_trim)
1120 }
1121
1122 pub async fn grow<'a>(
1123 &'a self,
1124 transaction: &mut Transaction<'a>,
1125 old_size: u64,
1126 size: u64,
1127 ) -> Result<(), Error> {
1128 let store = self.store();
1130 while matches!(
1131 store
1132 .trim_some(
1133 transaction,
1134 self.object_id(),
1135 self.attribute_id(),
1136 TrimMode::FromOffset(old_size)
1137 )
1138 .await?,
1139 TrimResult::Incomplete
1140 ) {
1141 transaction.commit_and_continue().await?;
1142 }
1143 let block_size = self.block_size();
1145 if old_size % block_size != 0 {
1146 let layer_set = store.tree.layer_set();
1147 let mut merger = layer_set.merger();
1148 let aligned_old_size = round_down(old_size, block_size);
1149 let iter = merger
1150 .query(Query::FullRange(&ObjectKey::extent(
1151 self.object_id(),
1152 self.attribute_id(),
1153 aligned_old_size..aligned_old_size + 1,
1154 )))
1155 .await?;
1156 if let Some(ItemRef {
1157 key:
1158 ObjectKey {
1159 object_id,
1160 data:
1161 ObjectKeyData::Attribute(attribute_id, AttributeKey::Extent(extent_key)),
1162 },
1163 value: ObjectValue::Extent(ExtentValue::Some { device_offset, key_id, .. }),
1164 ..
1165 }) = iter.get()
1166 {
1167 if *object_id == self.object_id() && *attribute_id == self.attribute_id() {
1168 let device_offset = device_offset
1169 .checked_add(aligned_old_size - extent_key.range.start)
1170 .ok_or(FxfsError::Inconsistent)?;
1171 ensure!(device_offset % block_size == 0, FxfsError::Inconsistent);
1172 let mut buf = self.allocate_buffer(block_size as usize).await;
1173 self.read_and_decrypt(
1179 device_offset,
1180 aligned_old_size,
1181 buf.as_mut(),
1182 *key_id,
1183 None,
1184 )
1185 .await?;
1186 buf.as_mut_slice()[(old_size % block_size) as usize..].fill(0);
1187 self.multi_write(
1188 transaction,
1189 *attribute_id,
1190 &[aligned_old_size..aligned_old_size + block_size],
1191 buf.as_mut(),
1192 )
1193 .await?;
1194 }
1195 }
1196 }
1197 self.txn_update_size(transaction, size, None).await?;
1198 Ok(())
1199 }
1200
    /// Allocates (without writing) device blocks for the block-aligned `file_range`.
    ///
    /// Parts of `file_range` that are already backed by an extent (`ExtentValue::Some`) are not
    /// reallocated; the device ranges of those existing extents are returned instead. Gaps are
    /// filled with newly allocated blocks, recorded in `transaction` as raw extents. The object's
    /// allocated size is increased by the number of newly allocated bytes. If the handled range
    /// now ends past the block-rounded size, the size is set to the end of the handled range.
    ///
    /// `file_range.start` is advanced in place as the range is processed. Processing stops early
    /// once `transaction` holds more than `TRANSACTION_MUTATION_THRESHOLD` mutations. In that
    /// case `file_range` still describes the unhandled remainder, so the caller can commit and
    /// call again.
    ///
    /// Returns the device ranges backing the handled part of `file_range`, in the order the file
    /// range was walked (ascending file offset).
    ///
    /// # Panics
    ///
    /// Panics if `file_range` is not aligned to the block size or if the handle is encrypted.
    pub async fn preallocate_range<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        file_range: &mut Range<u64>,
    ) -> Result<Vec<Range<u64>>, Error> {
        let block_size = self.block_size();
        assert!(file_range.is_aligned(block_size));
        assert!(!self.handle.is_encrypted());
        let mut ranges = Vec::new();
        let tree = &self.store().tree;
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::attribute(
                self.object_id(),
                self.attribute_id(),
                AttributeKey::Extent(ExtentKey::search_key_from_offset(file_range.start)),
            )))
            .await?;
        // Total number of bytes newly allocated by this call.
        let mut allocated = 0;
        let key_id = self.get_key(None).await?.map_or(0, |k| k.key_id());
        'outer: while file_range.start < file_range.end {
            // Work out where the next allocation must stop: at the start of the next existing
            // extent, or at the end of the requested range.
            let allocate_end = loop {
                match iter.get() {
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.start < file_range.end =>
                    {
                        // Reject malformed extent records before doing arithmetic on them.
                        ensure!(
                            range.is_valid()
                                && range.is_aligned(block_size)
                                && device_offset % block_size == 0,
                            FxfsError::Inconsistent
                        );
                        if range.start <= file_range.start {
                            // This extent already backs `file_range.start`: report the device
                            // range of the overlapping part instead of allocating it.
                            let device_range = device_offset
                                .checked_add(file_range.start - range.start)
                                .ok_or(FxfsError::Inconsistent)?
                                ..device_offset
                                    .checked_add(min(range.end, file_range.end) - range.start)
                                    .ok_or(FxfsError::Inconsistent)?;
                            file_range.start += device_range.end - device_range.start;
                            ranges.push(device_range);
                            if file_range.start >= file_range.end {
                                break 'outer;
                            }
                            iter.advance().await?;
                            continue;
                        } else {
                            // There is a gap before this extent; allocate up to its start.
                            break range.start;
                        }
                    }
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::None),
                        ..
                    }) if *object_id == self.object_id()
                        && *attribute_id == self.attribute_id()
                        && range.end < file_range.end =>
                    {
                        // A deleted extent that ends before the requested range does: step over it.
                        iter.advance().await?;
                    }
                    _ => {
                        // No further extent affects the requested range.
                        break file_range.end;
                    }
                }
            };
            // The allocator may hand back less than was asked for; the loop continues from
            // wherever the returned range ends.
            let device_range = self
                .store()
                .allocator()
                .allocate(
                    transaction,
                    self.store().store_object_id(),
                    allocate_end - file_range.start,
                )
                .await
                .context("Allocation failed")?;
            allocated += device_range.end - device_range.start;
            let this_file_range =
                file_range.start..file_range.start + device_range.end - device_range.start;
            file_range.start = this_file_range.end;
            transaction.add(
                self.store().store_object_id,
                Mutation::merge_object(
                    ObjectKey::extent(self.object_id(), self.attribute_id(), this_file_range),
                    ObjectValue::Extent(ExtentValue::new_raw(device_range.start, key_id)),
                ),
            );
            ranges.push(device_range);
            // Keep the transaction bounded; the caller sees the remainder in `file_range`.
            if transaction.mutations().len() > TRANSACTION_MUTATION_THRESHOLD {
                break;
            }
        }
        if file_range.start > round_up(self.txn_get_size(transaction), block_size).unwrap() {
            self.txn_update_size(transaction, file_range.start, None).await?;
        }
        self.update_allocated_size(transaction, allocated, 0).await?;
        Ok(ranges)
    }
1342
1343 pub async fn update_attributes<'a>(
1344 &self,
1345 transaction: &mut Transaction<'a>,
1346 node_attributes: Option<&fio::MutableNodeAttributes>,
1347 change_time: Option<Timestamp>,
1348 ) -> Result<(), Error> {
1349 ensure!(
1352 !matches!(
1353 node_attributes,
1354 Some(fio::MutableNodeAttributes { wrapping_key_id: Some(_), .. })
1355 ),
1356 FxfsError::BadPath
1357 );
1358 self.handle.update_attributes(transaction, node_attributes, change_time).await
1359 }
1360
    /// Returns the options `new_transaction` uses, as supplied by the underlying store handle.
    pub fn default_transaction_options<'b>(&self) -> Options<'b> {
        self.handle.default_transaction_options()
    }
1366
1367 pub async fn new_transaction<'b>(&self) -> Result<Transaction<'b>, Error> {
1368 self.new_transaction_with_options(self.default_transaction_options()).await
1369 }
1370
    /// Starts a transaction for this handle's attribute with the given `options`.
    ///
    /// The attribute id is passed to the underlying handle, presumably so the transaction locks
    /// this attribute — confirm against `StoreObjectHandle::new_transaction_with_options`.
    pub async fn new_transaction_with_options<'b>(
        &self,
        options: Options<'b>,
    ) -> Result<Transaction<'b>, Error> {
        self.handle.new_transaction_with_options(self.attribute_id(), options).await
    }
1377
    /// Delegates to the underlying store handle's `flush_device`.
    pub async fn flush_device(&self) -> Result<(), Error> {
        self.handle.flush_device().await
    }
1382
    /// Reads the contents of attribute `attribute_id` via the underlying store handle.
    ///
    /// `None` presumably means the attribute does not exist — TODO confirm.
    pub async fn read_attr(&self, attribute_id: u64) -> Result<Option<Box<[u8]>>, Error> {
        self.handle.read_attr(attribute_id).await
    }
1387
1388 pub async fn write_attr(&self, attribute_id: u64, data: &[u8]) -> Result<(), Error> {
1390 assert_ne!(attribute_id, self.attribute_id());
1392 let store = self.store();
1393 let mut transaction = self.new_transaction().await?;
1394 if self.handle.write_attr(&mut transaction, attribute_id, data).await?.0 {
1395 transaction.commit_and_continue().await?;
1396 while matches!(
1397 store
1398 .trim_some(
1399 &mut transaction,
1400 self.object_id(),
1401 attribute_id,
1402 TrimMode::FromOffset(data.len() as u64),
1403 )
1404 .await?,
1405 TrimResult::Incomplete
1406 ) {
1407 transaction.commit_and_continue().await?;
1408 }
1409 }
1410 transaction.commit().await?;
1411 Ok(())
1412 }
1413
    /// Reads from `device_offset` into `buffer`, decrypting with key `key_id`, by delegating to
    /// the underlying store handle.
    ///
    /// NOTE(review): `file_offset` is presumably the file position used for decryption, and
    /// `block_bitmap` presumably selects which blocks to decrypt — confirm against
    /// `StoreObjectHandle::read_and_decrypt`.
    async fn read_and_decrypt(
        &self,
        device_offset: u64,
        file_offset: u64,
        buffer: MutableBufferRef<'_>,
        key_id: u64,
        block_bitmap: Option<bit_vec::BitVec>,
    ) -> Result<(), Error> {
        self.handle.read_and_decrypt(device_offset, file_offset, buffer, key_id, block_bitmap).await
    }
1424
    /// Shrinks or grows this handle's attribute to `size`, using transactions created with
    /// `options`. Does nothing if the size is unchanged.
    ///
    /// When shrinking requires trimming, the size change is committed first and the trim then
    /// proceeds in bounded steps. Commit failures during the trim phase are only logged, because
    /// the truncation itself is already committed. Errors returned by `trim_some` still
    /// propagate to the caller.
    pub async fn truncate_with_options(
        &self,
        options: Options<'_>,
        size: u64,
    ) -> Result<(), Error> {
        // The size is read only after the transaction exists, presumably so that it is read
        // under the transaction's locks.
        let mut transaction = self.new_transaction_with_options(options).await?;
        let old_size = self.get_size();
        if size == old_size {
            return Ok(());
        }
        if size < old_size {
            let update_has_overwrite_ranges = self.truncate_overwrite_ranges(size)?;
            if self.shrink(&mut transaction, size, update_has_overwrite_ranges).await?.0 {
                // Commit the new size, then trim extents past it step by step.
                transaction.commit_and_continue().await?;
                let store = self.store();
                while matches!(
                    store
                        .trim_some(
                            &mut transaction,
                            self.object_id(),
                            self.attribute_id(),
                            TrimMode::FromOffset(size)
                        )
                        .await?,
                    TrimResult::Incomplete
                ) {
                    if let Err(error) = transaction.commit_and_continue().await {
                        warn!(error:?; "Failed to trim after truncate");
                        return Ok(());
                    }
                }
                if let Err(error) = transaction.commit().await {
                    warn!(error:?; "Failed to trim after truncate");
                }
                return Ok(());
            }
        } else {
            self.grow(&mut transaction, old_size, size).await?;
        }
        transaction.commit().await?;
        Ok(())
    }
1472
1473 pub async fn get_properties(&self) -> Result<ObjectProperties, Error> {
1474 let item = self
1478 .store()
1479 .tree
1480 .find(&ObjectKey::object(self.object_id()))
1481 .await?
1482 .expect("Unable to find object record");
1483 match item.value {
1484 ObjectValue::Object {
1485 kind: ObjectKind::File { refs, .. },
1486 attributes:
1487 ObjectAttributes {
1488 creation_time,
1489 modification_time,
1490 posix_attributes,
1491 allocated_size,
1492 access_time,
1493 change_time,
1494 ..
1495 },
1496 } => Ok(ObjectProperties {
1497 refs,
1498 allocated_size,
1499 data_attribute_size: self.get_size(),
1500 creation_time,
1501 modification_time,
1502 access_time,
1503 change_time,
1504 sub_dirs: 0,
1505 posix_attributes,
1506 casefold: false,
1507 wrapping_key_id: None,
1508 }),
1509 _ => bail!(FxfsError::NotFile),
1510 }
1511 }
1512
1513 pub async fn contents(&self, limit: usize) -> Result<Box<[u8]>, Error> {
1515 let size = self.get_size();
1516 if size > limit as u64 {
1517 bail!("Object too big ({} > {})", size, limit);
1518 }
1519 let mut buf = self.allocate_buffer(size as usize).await;
1520 self.read(0u64, buf.as_mut()).await?;
1521 Ok(buf.as_slice().into())
1522 }
1523
1524 pub async fn device_extents(&self) -> Result<Vec<(u64, Range<u64>)>, Error> {
1527 let mut extents = Vec::new();
1528 let tree = &self.store().tree;
1529 let layer_set = tree.layer_set();
1530 let mut merger = layer_set.merger();
1531 let mut iter = merger
1532 .query(Query::FullRange(&ObjectKey::attribute(
1533 self.object_id(),
1534 self.attribute_id(),
1535 AttributeKey::Extent(ExtentKey::search_key_from_offset(0)),
1536 )))
1537 .await?;
1538 loop {
1539 match iter.get() {
1540 Some(ItemRef {
1541 key:
1542 ObjectKey {
1543 object_id,
1544 data:
1545 ObjectKeyData::Attribute(
1546 attribute_id,
1547 AttributeKey::Extent(ExtentKey { range }),
1548 ),
1549 },
1550 value: ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
1551 ..
1552 }) if *object_id == self.object_id() && *attribute_id == self.attribute_id() => {
1553 extents.push((
1554 range.start,
1555 *device_offset..*device_offset + range.length().unwrap(),
1556 ))
1557 }
1558 _ => break,
1559 }
1560 iter.advance().await?;
1561 }
1562 Ok(extents)
1563 }
1564}
1565
impl<S: HandleOwner> AssociatedObject for DataObjectHandle<S> {
    /// Updates this handle's cached state just before `mutation` is applied.
    ///
    /// * An `Attribute` value stores its size into the cached `content_size`.
    /// * A `VerifiedAttribute` value calls `finalize_fsverity_state`. Its size is expected to
    ///   equal the current size; this is checked in debug builds only.
    /// * An `Overwrite` or `OverwritePartial` extent for this object and attribute is recorded
    ///   in `overwrite_ranges`. `Raw` and `Cow` extents are ignored.
    ///
    /// NOTE(review): the first two arms do not inspect the mutation's key. They presumably rely
    /// on only this attribute's records being associated with this handle — confirm with the
    /// callers that use `add_with_object`.
    fn will_apply_mutation(&self, mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
        match mutation {
            Mutation::ObjectStore(ObjectStoreMutation {
                item: ObjectItem { value: ObjectValue::Attribute { size, .. }, .. },
                ..
            }) => self.content_size.store(*size, atomic::Ordering::Relaxed),
            Mutation::ObjectStore(ObjectStoreMutation {
                item: ObjectItem { value: ObjectValue::VerifiedAttribute { size, .. }, .. },
                ..
            }) => {
                debug_assert_eq!(self.get_size(), *size, "VerifiedAttribute size should be set when verity is enabled and should not change");
                self.finalize_fsverity_state()
            }
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    ObjectItem {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attr_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
                        ..
                    },
                ..
            }) if self.object_id() == *object_id && self.attribute_id() == *attr_id => match mode {
                ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => {
                    self.overwrite_ranges.apply_range(range.clone())
                }
                ExtentMode::Raw | ExtentMode::Cow(_) => (),
            },
            _ => {}
        }
    }
}
1606
/// Every method delegates directly to the wrapped store handle.
impl<S: HandleOwner> ObjectHandle for DataObjectHandle<S> {
    fn set_trace(&self, v: bool) {
        self.handle.set_trace(v)
    }

    fn object_id(&self) -> u64 {
        self.handle.object_id()
    }

    fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        self.handle.allocate_buffer(size)
    }

    fn block_size(&self) -> u64 {
        self.handle.block_size()
    }
}
1624
1625#[async_trait]
1626impl<S: HandleOwner> ReadObjectHandle for DataObjectHandle<S> {
1627 async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
1628 let fs = self.store().filesystem();
1629 let guard = fs
1630 .lock_manager()
1631 .read_lock(lock_keys![LockKey::object_attribute(
1632 self.store().store_object_id,
1633 self.object_id(),
1634 self.attribute_id(),
1635 )])
1636 .await;
1637
1638 let size = self.get_size();
1639 if offset >= size {
1640 return Ok(0);
1641 }
1642 let length = min(buf.len() as u64, size - offset) as usize;
1643 buf = buf.subslice_mut(0..length);
1644 self.handle.read_unchecked(self.attribute_id(), offset, buf.reborrow(), &guard).await?;
1645 if self.is_verified_file() {
1646 self.verify_data(offset as usize, buf.as_slice()).await?;
1647 }
1648 Ok(length)
1649 }
1650
1651 fn get_size(&self) -> u64 {
1652 self.content_size.load(atomic::Ordering::Relaxed)
1653 }
1654}
1655
1656impl<S: HandleOwner> WriteObjectHandle for DataObjectHandle<S> {
1657 async fn write_or_append(&self, offset: Option<u64>, buf: BufferRef<'_>) -> Result<u64, Error> {
1658 let offset = offset.unwrap_or_else(|| self.get_size());
1659 let mut transaction = self.new_transaction().await?;
1660 self.txn_write(&mut transaction, offset, buf).await?;
1661 let new_size = self.txn_get_size(&transaction);
1662 transaction.commit().await?;
1663 Ok(new_size)
1664 }
1665
1666 async fn truncate(&self, size: u64) -> Result<(), Error> {
1667 self.truncate_with_options(self.default_transaction_options(), size).await
1668 }
1669
1670 async fn flush(&self) -> Result<(), Error> {
1671 Ok(())
1672 }
1673}
1674
/// Stages sequential writes to a `DataObjectHandle` in a `BUFFER_SIZE` buffer and writes each
/// full buffer out in its own transaction.
///
/// `complete` must be called to write out the final, partially filled buffer. Dropping a writer
/// that still holds staged bytes only logs a warning.
pub struct DirectWriter<'a, S: HandleOwner> {
    /// The object being written.
    handle: &'a DataObjectHandle<S>,
    /// Options for every transaction created when flushing.
    options: transaction::Options<'a>,
    /// Staging buffer, allocated with `BUFFER_SIZE` bytes.
    buffer: Buffer<'a>,
    /// Object offset at which the start of `buffer` will be written.
    offset: u64,
    /// Number of bytes currently staged in `buffer`.
    buf_offset: usize,
}
1684
/// Size in bytes (1 MiB) of `DirectWriter`'s staging buffer, i.e. the most data written per
/// flush transaction.
const BUFFER_SIZE: usize = 1 << 20;
1686
1687impl<S: HandleOwner> Drop for DirectWriter<'_, S> {
1688 fn drop(&mut self) {
1689 if self.buf_offset != 0 {
1690 warn!("DirectWriter: dropping data, did you forget to call complete?");
1691 }
1692 }
1693}
1694
1695impl<'a, S: HandleOwner> DirectWriter<'a, S> {
1696 pub async fn new(
1697 handle: &'a DataObjectHandle<S>,
1698 options: transaction::Options<'a>,
1699 ) -> DirectWriter<'a, S> {
1700 Self {
1701 handle,
1702 options,
1703 buffer: handle.allocate_buffer(BUFFER_SIZE).await,
1704 offset: 0,
1705 buf_offset: 0,
1706 }
1707 }
1708
1709 async fn flush(&mut self) -> Result<(), Error> {
1710 let mut transaction = self.handle.new_transaction_with_options(self.options).await?;
1711 self.handle
1712 .txn_write(&mut transaction, self.offset, self.buffer.subslice(..self.buf_offset))
1713 .await?;
1714 transaction.commit().await?;
1715 self.offset += self.buf_offset as u64;
1716 self.buf_offset = 0;
1717 Ok(())
1718 }
1719}
1720
1721impl<'a, S: HandleOwner> WriteBytes for DirectWriter<'a, S> {
1722 fn block_size(&self) -> u64 {
1723 self.handle.block_size()
1724 }
1725
1726 async fn write_bytes(&mut self, mut buf: &[u8]) -> Result<(), Error> {
1727 while buf.len() > 0 {
1728 let to_do = std::cmp::min(buf.len(), BUFFER_SIZE - self.buf_offset);
1729 self.buffer
1730 .subslice_mut(self.buf_offset..self.buf_offset + to_do)
1731 .as_mut_slice()
1732 .copy_from_slice(&buf[..to_do]);
1733 self.buf_offset += to_do;
1734 if self.buf_offset == BUFFER_SIZE {
1735 self.flush().await?;
1736 }
1737 buf = &buf[to_do..];
1738 }
1739 Ok(())
1740 }
1741
1742 async fn complete(&mut self) -> Result<(), Error> {
1743 self.flush().await?;
1744 Ok(())
1745 }
1746
1747 async fn skip(&mut self, amount: u64) -> Result<(), Error> {
1748 if (BUFFER_SIZE - self.buf_offset) as u64 > amount {
1749 self.buffer
1750 .subslice_mut(self.buf_offset..self.buf_offset + amount as usize)
1751 .as_mut_slice()
1752 .fill(0);
1753 self.buf_offset += amount as usize;
1754 } else {
1755 self.flush().await?;
1756 self.offset += amount;
1757 }
1758 Ok(())
1759 }
1760}
1761
1762#[cfg(test)]
1763mod tests {
1764 use crate::errors::FxfsError;
1765 use crate::filesystem::{
1766 FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem, SyncOptions,
1767 };
1768 use crate::fsck::{
1769 fsck, fsck_volume, fsck_volume_with_options, fsck_with_options, FsckOptions,
1770 };
1771 use crate::lsm_tree::types::{ItemRef, LayerIterator};
1772 use crate::lsm_tree::Query;
1773 use crate::object_handle::{
1774 ObjectHandle, ObjectProperties, ReadObjectHandle, WriteObjectHandle,
1775 };
1776 use crate::object_store::data_object_handle::WRITE_ATTR_BATCH_SIZE;
1777 use crate::object_store::directory::replace_child;
1778 use crate::object_store::object_record::{ObjectKey, ObjectValue, Timestamp};
1779 use crate::object_store::transaction::{lock_keys, Mutation, Options};
1780 use crate::object_store::volume::root_volume;
1781 use crate::object_store::{
1782 AttributeKey, DataObjectHandle, Directory, ExtentKey, ExtentMode, ExtentValue,
1783 HandleOptions, LockKey, ObjectKeyData, ObjectStore, PosixAttributes,
1784 FSVERITY_MERKLE_ATTRIBUTE_ID, NO_OWNER, TRANSACTION_MUTATION_THRESHOLD,
1785 };
1786 use crate::range::RangeExt;
1787 use crate::round::{round_down, round_up};
1788 use assert_matches::assert_matches;
1789 use bit_vec::BitVec;
1790 use fuchsia_sync::Mutex;
1791 use futures::channel::oneshot::channel;
1792 use futures::stream::{FuturesUnordered, StreamExt};
1793 use futures::FutureExt;
1794 use fxfs_crypto::{Crypt, KeyPurpose};
1795 use fxfs_insecure_crypto::InsecureCrypt;
1796 use mundane::hash::{Digest, Hasher, Sha256};
1797 use rand::Rng;
1798 use std::ops::Range;
1799 use std::sync::Arc;
1800 use std::time::Duration;
1801 use storage_device::fake_device::FakeDevice;
1802 use storage_device::DeviceHolder;
1803 use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
1804
    /// Block size passed to `FakeDevice::new` by `test_filesystem`.
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    /// Offset at which the fixture writes `TEST_DATA`. Not a multiple of
    /// `TEST_DEVICE_BLOCK_SIZE` (5000 % 512 != 0).
    const TEST_DATA_OFFSET: u64 = 5000;
    const TEST_DATA: &[u8] = b"hello";
    /// Size the fixture object is truncated to after setup.
    const TEST_OBJECT_SIZE: u64 = 5678;
    /// Presumably the allocated size expected for the fixture object (one 4096-byte block
    /// holding `TEST_DATA`) — used by tests outside this view; TODO confirm.
    const TEST_OBJECT_ALLOCATED_SIZE: u64 = 4096;
    /// Name of the fixture file in the root directory.
    const TEST_OBJECT_NAME: &str = "foo";
1814
1815 async fn test_filesystem() -> OpenFxFilesystem {
1816 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1817 FxFilesystem::new_empty(device).await.expect("new_empty failed")
1818 }
1819
    /// Creates a filesystem plus a file named `TEST_OBJECT_NAME` in the root directory.
    ///
    /// With `crypt`, the object is created with a freshly created data key; otherwise it is
    /// created without one. With `write_object_test_data`, `TEST_DATA` is written at
    /// `TEST_DATA_OFFSET` in the same transaction. In every case the object is then truncated
    /// (here, extended) to `TEST_OBJECT_SIZE`.
    async fn test_filesystem_and_object_with_key(
        crypt: Option<&dyn Crypt>,
        write_object_test_data: bool,
    ) -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        let fs = test_filesystem().await;
        let store = fs.root_store();
        let object;

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        object = if let Some(crypt) = crypt {
            let object_id = store.get_next_object_id(transaction.txn_guard()).await.unwrap();
            let (key, unwrapped_key) = crypt.create_key(object_id, KeyPurpose::Data).await.unwrap();
            ObjectStore::create_object_with_key(
                &store,
                &mut transaction,
                object_id,
                HandleOptions::default(),
                key,
                unwrapped_key,
            )
            .await
            .expect("create_object failed")
        } else {
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed")
        };

        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        root_directory
            .add_child_file(&mut transaction, TEST_OBJECT_NAME, &object)
            .await
            .expect("add_child_file failed");

        if write_object_test_data {
            // Allocate `align` leading bytes so the slice handed to txn_write starts at the same
            // position within a device block as TEST_DATA_OFFSET. Presumably txn_write expects
            // buffer and offset alignment to agree — TODO confirm.
            let align = TEST_DATA_OFFSET as usize % TEST_DEVICE_BLOCK_SIZE as usize;
            let mut buf = object.allocate_buffer(align + TEST_DATA.len()).await;
            buf.as_mut_slice()[align..].copy_from_slice(TEST_DATA);
            object
                .txn_write(&mut transaction, TEST_DATA_OFFSET, buf.subslice(align..))
                .await
                .expect("write failed");
        }
        transaction.commit().await.expect("commit failed");
        object.truncate(TEST_OBJECT_SIZE).await.expect("truncate failed");
        (fs, object)
    }
1879
    /// Fixture: an encrypted (`InsecureCrypt`) object holding `TEST_DATA` at `TEST_DATA_OFFSET`.
    async fn test_filesystem_and_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>) {
        test_filesystem_and_object_with_key(Some(&InsecureCrypt::new()), true).await
    }
1883
    /// Fixture: an encrypted (`InsecureCrypt`) object with no data written, truncated to
    /// `TEST_OBJECT_SIZE`.
    async fn test_filesystem_and_empty_object() -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>)
    {
        test_filesystem_and_object_with_key(Some(&InsecureCrypt::new()), false).await
    }
1888
    /// A read into an empty buffer succeeds and reports 0 bytes.
    #[fuchsia::test]
    async fn test_zero_buf_len_read() {
        let (fs, object) = test_filesystem_and_object().await;
        let mut buf = object.allocate_buffer(0).await;
        assert_eq!(object.read(0u64, buf.as_mut()).await.expect("read failed"), 0);
        fs.close().await.expect("Close failed");
    }
1896
    /// A read that runs past end-of-file is clamped to the file size. The last two bytes before
    /// EOF read as zero, and the buffer byte past EOF keeps its fill value.
    #[fuchsia::test]
    async fn test_beyond_eof_read() {
        let (fs, object) = test_filesystem_and_object().await;
        let offset = TEST_OBJECT_SIZE as usize - 2;
        // Start the read at the block boundary at or below `offset`.
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        let mut buf = object.allocate_buffer(align + len + 1).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(
            object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
            align + len
        );
        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
        fs.close().await.expect("Close failed");
    }
1913
    /// Same as `test_beyond_eof_read`, but through the dereferenced store handle's
    /// `read(attribute_id, offset, buf)`, using attribute 0 (presumably the default data
    /// attribute).
    #[fuchsia::test]
    async fn test_beyond_eof_read_from() {
        let (fs, object) = test_filesystem_and_object().await;
        let handle = &*object;
        let offset = TEST_OBJECT_SIZE as usize - 2;
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        let mut buf = object.allocate_buffer(align + len + 1).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(
            handle.read(0, (offset - align) as u64, buf.as_mut()).await.expect("read failed"),
            align + len
        );
        assert_eq!(&buf.as_slice()[align..align + len], &vec![0u8; len]);
        assert_eq!(&buf.as_slice()[align + len..], &vec![123u8; buf.len() - align - len]);
        fs.close().await.expect("Close failed");
    }
1931
    /// `read_unchecked` does not clamp to the file size: the byte past EOF is also returned, as
    /// zero, instead of keeping the buffer's fill value.
    #[fuchsia::test]
    async fn test_beyond_eof_read_unchecked() {
        let (fs, object) = test_filesystem_and_object().await;
        let offset = TEST_OBJECT_SIZE as usize - 2;
        let align = offset % fs.block_size() as usize;
        let len: usize = 2;
        let mut buf = object.allocate_buffer(align + len + 1).await;
        buf.as_mut_slice().fill(123u8);
        // read_unchecked takes the caller's lock guard, so take the attribute read lock here.
        let guard = fs
            .lock_manager()
            .read_lock(lock_keys![LockKey::object_attribute(
                object.store().store_object_id,
                object.object_id(),
                0,
            )])
            .await;
        object
            .read_unchecked(0, (offset - align) as u64, buf.as_mut(), &guard)
            .await
            .expect("read failed");
        assert_eq!(&buf.as_slice()[align..], &vec![0u8; len + 1]);
        fs.close().await.expect("Close failed");
    }
1955
    /// Reading almost the whole fixture returns zeros everywhere except `TEST_DATA` at
    /// `TEST_DATA_OFFSET`.
    #[fuchsia::test]
    async fn test_read_sparse() {
        let (fs, object) = test_filesystem_and_object().await;
        let len = TEST_OBJECT_SIZE as usize - 1;
        let mut buf = object.allocate_buffer(len).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);
        let mut expected = vec![0; len];
        let offset = TEST_DATA_OFFSET as usize;
        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
        assert_eq!(buf.as_slice()[..len], expected[..]);
        fs.close().await.expect("Close failed");
    }
1970
    /// Data written before and after a store flush are both visible in a subsequent read.
    #[fuchsia::test]
    async fn test_read_after_writes_interspersed_with_flush() {
        let (fs, object) = test_filesystem_and_object().await;

        object.owner().flush().await.expect("flush failed");

        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
        buf.as_mut_slice().copy_from_slice(TEST_DATA);
        object.write_or_append(Some(0u64), buf.as_ref()).await.expect("write failed");

        let len = TEST_OBJECT_SIZE as usize - 1;
        let mut buf = object.allocate_buffer(len).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), len);

        // Expect TEST_DATA both at TEST_DATA_OFFSET (pre-flush) and at 0 (post-flush).
        let mut expected = vec![0u8; len];
        let offset = TEST_DATA_OFFSET as usize;
        expected[offset..offset + TEST_DATA.len()].copy_from_slice(TEST_DATA);
        expected[..TEST_DATA.len()].copy_from_slice(TEST_DATA);
        assert_eq!(buf.as_slice(), &expected);
        fs.close().await.expect("Close failed");
    }
1994
    /// After truncating to 3 bytes and writing past the end again, the gap reads as zeros and
    /// the truncated-away bytes do not reappear.
    #[fuchsia::test]
    async fn test_read_after_truncate_and_extend() {
        let (fs, object) = test_filesystem_and_object().await;

        let mut buf = object.allocate_buffer(TEST_DATA.len()).await;
        buf.as_mut_slice().copy_from_slice(TEST_DATA);
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
        object.truncate(3).await.expect("truncate failed");
        let data = b"foo";
        let offset = 1500u64;
        let align = (offset % fs.block_size() as u64) as usize;
        let mut buf = object.allocate_buffer(align + data.len()).await;
        buf.as_mut_slice()[align..].copy_from_slice(data);
        object.write_or_append(Some(1500), buf.subslice(align..)).await.expect("write failed");

        // Full read: the new size is 1500 + 3.
        const LEN1: usize = 1503;
        let mut buf = object.allocate_buffer(LEN1).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN1);
        let mut expected = [0; LEN1];
        expected[..3].copy_from_slice(&TEST_DATA[..3]);
        expected[1500..].copy_from_slice(b"foo");
        assert_eq!(buf.as_slice(), &expected);

        // A shorter read that stops part-way through the file.
        const LEN2: usize = 601;
        let mut buf = object.allocate_buffer(LEN2).await;
        buf.as_mut_slice().fill(123u8);
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), LEN2);
        assert_eq!(buf.as_slice(), &expected[..LEN2]);
        fs.close().await.expect("Close failed");
    }
2031
    /// Interleaves whole-block writes to two objects and checks that each reads back only its
    /// own data. The region exposed by extending `object` reads as zeros.
    #[fuchsia::test]
    async fn test_read_whole_blocks_with_multiple_objects() {
        let (fs, object) = test_filesystem_and_object().await;
        let block_size = object.block_size() as usize;
        let mut buffer = object.allocate_buffer(block_size).await;
        buffer.as_mut_slice().fill(0xaf);
        object.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");

        let store = object.owner();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let object2 =
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed");
        transaction.commit().await.expect("commit failed");
        let mut ef_buffer = object.allocate_buffer(block_size).await;
        ef_buffer.as_mut_slice().fill(0xef);
        object2.write_or_append(Some(0), ef_buffer.as_ref()).await.expect("write failed");

        let mut buffer = object.allocate_buffer(block_size).await;
        buffer.as_mut_slice().fill(0xaf);
        object
            .write_or_append(Some(block_size as u64), buffer.as_ref())
            .await
            .expect("write failed");
        object.truncate(3 * block_size as u64).await.expect("truncate failed");
        object2
            .write_or_append(Some(block_size as u64), ef_buffer.as_ref())
            .await
            .expect("write failed");

        // Read with a buffer larger than either object to exercise clamping as well.
        let mut buffer = object.allocate_buffer(4 * block_size).await;
        buffer.as_mut_slice().fill(123);
        assert_eq!(object.read(0, buffer.as_mut()).await.expect("read failed"), 3 * block_size);
        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xaf; 2 * block_size]);
        assert_eq!(&buffer.as_slice()[2 * block_size..3 * block_size], &vec![0; block_size]);
        assert_eq!(object2.read(0, buffer.as_mut()).await.expect("read failed"), 2 * block_size);
        assert_eq!(&buffer.as_slice()[..2 * block_size], &vec![0xef; 2 * block_size]);
        fs.close().await.expect("Close failed");
    }
2076
    /// Writes ranges with every combination of aligned and unaligned start and end, checking
    /// after each write that a full read matches an in-memory mirror of the expected contents.
    #[fuchsia::test]
    async fn test_alignment() {
        let (fs, object) = test_filesystem_and_object().await;

        /// Tracks the expected file contents alongside the object under test.
        struct AlignTest {
            fill: u8,
            object: DataObjectHandle<ObjectStore>,
            mirror: Vec<u8>,
        }

        impl AlignTest {
            /// Seeds the mirror with the object's current contents.
            async fn new(object: DataObjectHandle<ObjectStore>) -> Self {
                let mirror = {
                    let mut buf = object.allocate_buffer(object.get_size() as usize).await;
                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
                    buf.as_slice().to_vec()
                };
                Self { fill: 0, object, mirror }
            }

            /// Fills `range` with a new byte value in both object and mirror, then compares a
            /// full read of the object against the mirror.
            async fn test(&mut self, range: Range<u64>) {
                let mut buf = self.object.allocate_buffer((range.end - range.start) as usize).await;
                self.fill += 1;
                buf.as_mut_slice().fill(self.fill);
                self.object
                    .write_or_append(Some(range.start), buf.as_ref())
                    .await
                    .expect("write_or_append failed");
                if range.end > self.mirror.len() as u64 {
                    self.mirror.resize(range.end as usize, 0);
                }
                self.mirror[range.start as usize..range.end as usize].fill(self.fill);
                let mut buf = self.object.allocate_buffer(self.mirror.len() + 1).await;
                assert_eq!(
                    self.object.read(0, buf.as_mut()).await.expect("read failed"),
                    self.mirror.len()
                );
                assert_eq!(&buf.as_slice()[..self.mirror.len()], self.mirror.as_slice());
            }
        }

        let block_size = object.block_size() as u64;
        let mut align = AlignTest::new(object).await;

        // Aligned start, unaligned end spanning multiple blocks.
        align.test(0..2 * block_size + 1).await;

        // Unaligned start, aligned end.
        align.test(1..block_size).await;
        align.test(1..2 * block_size).await;

        // Aligned start, unaligned end.
        align.test(0..block_size - 1).await;
        align.test(0..2 * block_size - 1).await;

        // Unaligned start and end.
        align.test(1..block_size - 1).await;
        align.test(1..2 * block_size - 1).await;

        fs.close().await.expect("Close failed");
    }
2142
    /// Shared body of the preallocation tests.
    ///
    /// Preallocates one block and then 1 MiB, checks the resulting size and allocation count,
    /// then checks that writes into the preallocated region allocate nothing further and read
    /// back correctly.
    async fn test_preallocate_common(fs: &FxFilesystem, object: DataObjectHandle<ObjectStore>) {
        let allocator = fs.allocator();
        let allocated_before = allocator.get_allocated_bytes();
        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        object
            .preallocate_range(&mut transaction, &mut (0..fs.block_size() as u64))
            .await
            .expect("preallocate_range failed");
        transaction.commit().await.expect("commit failed");
        assert!(object.get_size() < 1048576);
        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        object
            .preallocate_range(&mut transaction, &mut (0..1048576))
            .await
            .expect("preallocate_range failed");
        transaction.commit().await.expect("commit failed");
        assert_eq!(object.get_size(), 1048576);
        let allocated_after = allocator.get_allocated_bytes();
        // The block holding TEST_DATA was allocated during setup, before `allocated_before` was
        // sampled. preallocate_range reuses it, so one block less than 1 MiB is newly allocated.
        assert_eq!(allocated_after - allocated_before, 1048576 - fs.block_size() as u64);

        let mut buf = object
            .allocate_buffer(round_up(TEST_DATA_OFFSET, fs.block_size()).unwrap() as usize)
            .await;
        buf.as_mut_slice().fill(47);
        object
            .write_or_append(Some(0), buf.subslice(..TEST_DATA_OFFSET as usize))
            .await
            .expect("write failed");
        buf.as_mut_slice().fill(95);
        let offset = round_up(TEST_OBJECT_SIZE, fs.block_size()).unwrap();
        object.overwrite(offset, buf.as_mut(), false).await.expect("write failed");

        // Writing into preallocated space must not allocate anything more.
        assert_eq!(allocator.get_allocated_bytes(), allocated_after);

        // NOTE(review): 104876 looks like a typo for 1048576. It still covers every range
        // checked below.
        let mut buf = object.allocate_buffer(104876).await;
        assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), buf.len());
        assert_eq!(&buf.as_slice()[..TEST_DATA_OFFSET as usize], &[47; TEST_DATA_OFFSET as usize]);
        assert_eq!(
            &buf.as_slice()[TEST_DATA_OFFSET as usize..TEST_DATA_OFFSET as usize + TEST_DATA.len()],
            TEST_DATA
        );
        assert_eq!(&buf.as_slice()[offset as usize..offset as usize + 2048], &[95; 2048]);
    }
2189
    /// Runs `test_preallocate_common` on an unencrypted fixture object.
    #[fuchsia::test]
    async fn test_preallocate_range() {
        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
        test_preallocate_common(&fs, object).await;
        fs.close().await.expect("Close failed");
    }
2196
    /// Same as `test_preallocate_range`, but flushes the store first. The fixture's extents then
    /// presumably sit in a different LSM layer from the new ones.
    #[fuchsia::test]
    async fn test_preallocate_succeeds_when_extents_are_in_different_layers() {
        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
        object.owner().flush().await.expect("flush failed");
        test_preallocate_common(&fs, object).await;
        fs.close().await.expect("Close failed");
    }
2206
    /// Preallocating the block that already holds TEST_DATA allocates nothing new.
    #[fuchsia::test]
    async fn test_already_preallocated() {
        let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
        let allocator = fs.allocator();
        let allocated_before = allocator.get_allocated_bytes();
        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        // Round TEST_DATA_OFFSET down to the start of its block.
        let offset = TEST_DATA_OFFSET - TEST_DATA_OFFSET % fs.block_size() as u64;
        object
            .preallocate_range(&mut transaction, &mut (offset..offset + fs.block_size() as u64))
            .await
            .expect("preallocate_range failed");
        transaction.commit().await.expect("commit failed");
        assert_eq!(allocator.get_allocated_bytes(), allocated_before);
        fs.close().await.expect("Close failed");
    }
2223
    /// `overwrite` with `false` fails on an unallocated range and succeeds once the range is
    /// preallocated. With `true` it succeeds on an unallocated range (presumably the flag permits
    /// allocation — confirm against `overwrite`). fsck passes afterwards.
    #[fuchsia::test]
    async fn test_overwrite_when_preallocated_at_start_of_file() {
        let (fs, object) = test_filesystem_and_empty_object().await;

        let object = ObjectStore::open_object(
            object.owner(),
            object.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .expect("open_object failed");

        assert_eq!(fs.block_size(), 4096);

        let mut write_buf = object.allocate_buffer(4096).await;
        write_buf.as_mut_slice().fill(95);

        // Nothing is allocated at offset 0 yet.
        object.overwrite(0, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");

        let mut transaction = object.new_transaction().await.expect("new_transaction failed");
        object
            .preallocate_range(&mut transaction, &mut (0..4096 as u64))
            .await
            .expect("preallocate_range failed");
        transaction.commit().await.expect("commit failed");

        // Preallocated but unwritten space reads as zeros.
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[0; 4096]);
        }
        object.overwrite(0, write_buf.as_mut(), false).await.expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(0, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        // The second block was never preallocated.
        object.overwrite(4096, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");

        object.overwrite(4096, write_buf.as_mut(), true).await.expect("overwrite failed");
        {
            let mut read_buf = object.allocate_buffer(4096).await;
            object.read(4096, read_buf.as_mut()).await.expect("read failed");
            assert_eq!(&read_buf.as_slice(), &[95; 4096]);
        }

        let fsck_options = FsckOptions {
            fail_on_warning: true,
            no_lock: true,
            on_error: Box::new(|err| println!("fsck error: {:?}", err)),
            ..Default::default()
        };
        fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");

        fs.close().await.expect("Close failed");
    }
2294
2295 #[fuchsia::test]
2296 async fn test_overwrite_large_buffer_and_file_with_many_holes() {
2297 let (fs, object) = test_filesystem_and_empty_object().await;
2300
2301 let object = ObjectStore::open_object(
2302 object.owner(),
2303 object.object_id(),
2304 HandleOptions::default(),
2305 None,
2306 )
2307 .await
2308 .expect("open_object failed");
2309
2310 assert_eq!(fs.block_size(), 4096);
2311 assert_eq!(object.get_size(), TEST_OBJECT_SIZE);
2312
2313 let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2315 object
2316 .preallocate_range(&mut transaction, &mut (4096..8192 as u64))
2317 .await
2318 .expect("preallocate_range failed");
2319 object
2320 .preallocate_range(&mut transaction, &mut (16384..32768 as u64))
2321 .await
2322 .expect("preallocate_range failed");
2323 object
2324 .preallocate_range(&mut transaction, &mut (65536..131072 as u64))
2325 .await
2326 .expect("preallocate_range failed");
2327 object
2328 .preallocate_range(&mut transaction, &mut (262144..524288 as u64))
2329 .await
2330 .expect("preallocate_range failed");
2331 transaction.commit().await.expect("commit failed");
2332
2333 assert_eq!(object.get_size(), 524288);
2334
2335 let mut write_buf = object.allocate_buffer(4096).await;
2336 write_buf.as_mut_slice().fill(95);
2337
2338 object.overwrite(0, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
2340 object.overwrite(8192, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
2341 object.overwrite(32768, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
2342 object.overwrite(131072, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
2343
2344 {
2346 let mut read_buf = object.allocate_buffer(4096).await;
2347 object.read(4096, read_buf.as_mut()).await.expect("read failed");
2348 assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2349 }
2350 object.overwrite(4096, write_buf.as_mut(), false).await.expect("overwrite failed");
2351 {
2352 let mut read_buf = object.allocate_buffer(4096).await;
2353 object.read(4096, read_buf.as_mut()).await.expect("read failed");
2354 assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2355 }
2356 {
2357 let mut read_buf = object.allocate_buffer(4096).await;
2358 object.read(16384, read_buf.as_mut()).await.expect("read failed");
2359 assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2360 }
2361 object.overwrite(16384, write_buf.as_mut(), false).await.expect("overwrite failed");
2362 {
2363 let mut read_buf = object.allocate_buffer(4096).await;
2364 object.read(16384, read_buf.as_mut()).await.expect("read failed");
2365 assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2366 }
2367 {
2368 let mut read_buf = object.allocate_buffer(4096).await;
2369 object.read(65536, read_buf.as_mut()).await.expect("read failed");
2370 assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2371 }
2372 object.overwrite(65536, write_buf.as_mut(), false).await.expect("overwrite failed");
2373 {
2374 let mut read_buf = object.allocate_buffer(4096).await;
2375 object.read(65536, read_buf.as_mut()).await.expect("read failed");
2376 assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2377 }
2378 {
2379 let mut read_buf = object.allocate_buffer(4096).await;
2380 object.read(262144, read_buf.as_mut()).await.expect("read failed");
2381 assert_eq!(&read_buf.as_slice(), &[0; 4096]);
2382 }
2383 object.overwrite(262144, write_buf.as_mut(), false).await.expect("overwrite failed");
2384 {
2385 let mut read_buf = object.allocate_buffer(4096).await;
2386 object.read(262144, read_buf.as_mut()).await.expect("read failed");
2387 assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2388 }
2389
2390 let mut huge_write_buf = object.allocate_buffer(524288).await;
2392 huge_write_buf.as_mut_slice().fill(96);
2393
2394 object.overwrite(0, huge_write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
2396 object.overwrite(0, huge_write_buf.as_mut(), true).await.expect("overwrite failed");
2398 {
2399 let mut read_buf = object.allocate_buffer(524288).await;
2400 object.read(0, read_buf.as_mut()).await.expect("read failed");
2401 assert_eq!(&read_buf.as_slice(), &[96; 524288]);
2402 }
2403
2404 let fsck_options = FsckOptions {
2406 fail_on_warning: true,
2407 no_lock: true,
2408 on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2409 ..Default::default()
2410 };
2411 fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2412
2413 fs.close().await.expect("Close failed");
2414 }
2415
2416 #[fuchsia::test]
2417 async fn test_overwrite_when_unallocated_at_start_of_file() {
2418 let (fs, object) = test_filesystem_and_empty_object().await;
2421
2422 let object = ObjectStore::open_object(
2423 object.owner(),
2424 object.object_id(),
2425 HandleOptions::default(),
2426 None,
2427 )
2428 .await
2429 .expect("open_object failed");
2430
2431 assert_eq!(fs.block_size(), 4096);
2432
2433 let mut write_buf = object.allocate_buffer(4096).await;
2434 write_buf.as_mut_slice().fill(95);
2435
2436 object.overwrite(0, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
2439
2440 object.overwrite(0, write_buf.as_mut(), true).await.expect("overwrite failed");
2442 {
2443 let mut read_buf = object.allocate_buffer(4096).await;
2444 object.read(0, read_buf.as_mut()).await.expect("read failed");
2445 assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2446 }
2447
2448 object.overwrite(4096, write_buf.as_mut(), false).await.expect_err("overwrite succeeded");
2450
2451 object.overwrite(4096, write_buf.as_mut(), true).await.expect("overwrite failed");
2453 {
2454 let mut read_buf = object.allocate_buffer(4096).await;
2455 object.read(4096, read_buf.as_mut()).await.expect("read failed");
2456 assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2457 }
2458
2459 let fsck_options = FsckOptions {
2461 fail_on_warning: true,
2462 no_lock: true,
2463 on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2464 ..Default::default()
2465 };
2466 fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2467
2468 fs.close().await.expect("Close failed");
2469 }
2470
2471 #[fuchsia::test]
2472 async fn test_overwrite_can_extend_a_file() {
2473 let (fs, object) = test_filesystem_and_empty_object().await;
2476
2477 let object = ObjectStore::open_object(
2478 object.owner(),
2479 object.object_id(),
2480 HandleOptions::default(),
2481 None,
2482 )
2483 .await
2484 .expect("open_object failed");
2485
2486 assert_eq!(fs.block_size(), 4096);
2487 assert_eq!(object.get_size(), TEST_OBJECT_SIZE);
2488
2489 let mut write_buf = object.allocate_buffer(4096).await;
2490 write_buf.as_mut_slice().fill(95);
2491
2492 let last_block_offset = round_down(TEST_OBJECT_SIZE, 4096 as u32);
2494
2495 object
2497 .overwrite(last_block_offset, write_buf.as_mut(), false)
2498 .await
2499 .expect_err("overwrite succeeded");
2500 object
2502 .overwrite(last_block_offset, write_buf.as_mut(), true)
2503 .await
2504 .expect("overwrite failed");
2505 {
2506 let mut read_buf = object.allocate_buffer(4096).await;
2507 object.read(last_block_offset, read_buf.as_mut()).await.expect("read failed");
2508 assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2509 }
2510
2511 assert_eq!(object.get_size(), 8192);
2512
2513 let next_block_offset = round_up(TEST_OBJECT_SIZE, 4096 as u32).unwrap();
2515
2516 object
2518 .overwrite(next_block_offset, write_buf.as_mut(), false)
2519 .await
2520 .expect_err("overwrite succeeded");
2521 object
2523 .overwrite(next_block_offset, write_buf.as_mut(), true)
2524 .await
2525 .expect("overwrite failed");
2526 {
2527 let mut read_buf = object.allocate_buffer(4096).await;
2528 object.read(next_block_offset, read_buf.as_mut()).await.expect("read failed");
2529 assert_eq!(&read_buf.as_slice(), &[95; 4096]);
2530 }
2531
2532 assert_eq!(object.get_size(), 12288);
2533
2534 let fsck_options = FsckOptions {
2536 fail_on_warning: true,
2537 no_lock: true,
2538 on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2539 ..Default::default()
2540 };
2541 fsck_with_options(fs.clone(), &fsck_options).await.expect("fsck failed");
2542
2543 fs.close().await.expect("Close failed");
2544 }
2545
2546 #[fuchsia::test]
2547 async fn test_enable_verity() {
2548 let fs: OpenFxFilesystem = test_filesystem().await;
2549 let mut transaction = fs
2550 .clone()
2551 .new_transaction(lock_keys![], Options::default())
2552 .await
2553 .expect("new_transaction failed");
2554 let store = fs.root_store();
2555 let object = Arc::new(
2556 ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2557 .await
2558 .expect("create_object failed"),
2559 );
2560
2561 transaction.commit().await.unwrap();
2562
2563 object
2564 .enable_verity(fio::VerificationOptions {
2565 hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2566 salt: Some(vec![]),
2567 ..Default::default()
2568 })
2569 .await
2570 .expect("set verified file metadata failed");
2571
2572 let handle =
2573 ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
2574 .await
2575 .expect("open_object failed");
2576
2577 assert!(handle.is_verified_file());
2578
2579 fs.close().await.expect("Close failed");
2580 }
2581
2582 #[fuchsia::test]
2583 async fn test_enable_verity_large_file() {
2584 let device = DeviceHolder::new(FakeDevice::new(262144, TEST_DEVICE_BLOCK_SIZE));
2586 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2587 let root_store = fs.root_store();
2588 let mut transaction = fs
2589 .clone()
2590 .new_transaction(lock_keys![], Options::default())
2591 .await
2592 .expect("new_transaction failed");
2593
2594 let handle = ObjectStore::create_object(
2595 &root_store,
2596 &mut transaction,
2597 HandleOptions::default(),
2598 None,
2599 )
2600 .await
2601 .expect("failed to create object");
2602 transaction.commit().await.expect("commit failed");
2603 let mut offset = 0;
2604
2605 let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2607 buf.as_mut_slice().fill(1);
2608 for _ in 0..130 {
2609 handle.write_or_append(Some(offset), buf.as_ref()).await.expect("write failed");
2610 offset += WRITE_ATTR_BATCH_SIZE as u64;
2611 }
2612
2613 handle
2614 .enable_verity(fio::VerificationOptions {
2615 hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2616 salt: Some(vec![]),
2617 ..Default::default()
2618 })
2619 .await
2620 .expect("set verified file metadata failed");
2621
2622 let mut buf = handle.allocate_buffer(WRITE_ATTR_BATCH_SIZE).await;
2623 offset = 0;
2624 for _ in 0..130 {
2625 handle.read(offset, buf.as_mut()).await.expect("verification during read should fail");
2626 assert_eq!(buf.as_slice(), &[1; WRITE_ATTR_BATCH_SIZE]);
2627 offset += WRITE_ATTR_BATCH_SIZE as u64;
2628 }
2629
2630 fsck(fs.clone()).await.expect("fsck failed");
2631 fs.close().await.expect("Close failed");
2632 }
2633
    /// Simulates an `enable_verity` that is interrupted while writing the merkle attribute.
    /// The test then checks that the filesystem is consistent after remounting (read-only fsck)
    /// and that `enable_verity` can be retried successfully on the reopened object.
    #[fuchsia::test]
    async fn test_retry_enable_verity_on_reboot() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        let object_id = {
            let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
            // Record a graveyard entry for the merkle attribute, as an in-progress
            // `enable_verity` would (presumably so a partially written attribute can be cleaned
            // up later — confirm against `enable_verity`).
            transaction.add(
                root_store.store_object_id(),
                Mutation::replace_or_insert_object(
                    ObjectKey::graveyard_attribute_entry(
                        root_store.graveyard_directory_object_id(),
                        handle.object_id(),
                        FSVERITY_MERKLE_ATTRIBUTE_ID,
                    ),
                    ObjectValue::Some,
                ),
            );

            // Write 2 * WRITE_ATTR_BATCH_SIZE bytes in WRITE_ATTR_BATCH_SIZE batches.
            // NOTE(review): presumably `write_new_attr_in_batches` commits the earlier batches
            // itself and leaves the last one pending in `transaction` — confirm.
            handle
                .write_new_attr_in_batches(
                    &mut transaction,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    &vec![0; 2 * WRITE_ATTR_BATCH_SIZE],
                    WRITE_ATTR_BATCH_SIZE,
                )
                .await
                .expect("failed to write merkle attribute");

            handle.object_id()
            // `transaction` is dropped here without being committed. This mimics the merkle
            // write being interrupted, and it also releases the transaction's locks before
            // the filesystem is closed.
        };

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // Check the interrupted state for consistency without letting a writable mount change it.
        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // Remount read-write and retry enabling verity on the same object.
        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_store = fs.root_store();
        let handle =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("open_object failed");
        handle
            .enable_verity(fio::VerificationOptions {
                hash_algorithm: Some(fio::HashAlgorithm::Sha256),
                salt: Some(vec![]),
                ..Default::default()
            })
            .await
            .expect("set verified file metadata failed");

        fs.graveyard().flush().await;
        // The file has no data. After the retry, the merkle attribute must be exactly one
        // all-zero Sha256-sized digest, not the 2 * WRITE_ATTR_BATCH_SIZE bytes written above.
        assert_eq!(
            handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
            Some(vec![0; <Sha256 as Hasher>::Digest::DIGEST_LEN].into())
        );
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("Close failed");
    }
2725
2726 #[fuchsia::test]
2727 async fn test_verify_data_corrupt_file() {
2728 let fs: OpenFxFilesystem = test_filesystem().await;
2729 let mut transaction = fs
2730 .clone()
2731 .new_transaction(lock_keys![], Options::default())
2732 .await
2733 .expect("new_transaction failed");
2734 let store = fs.root_store();
2735 let object = Arc::new(
2736 ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2737 .await
2738 .expect("create_object failed"),
2739 );
2740
2741 transaction.commit().await.unwrap();
2742
2743 let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
2744 buf.as_mut_slice().fill(123);
2745 object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2746
2747 object
2748 .enable_verity(fio::VerificationOptions {
2749 hash_algorithm: Some(fio::HashAlgorithm::Sha256),
2750 salt: Some(vec![]),
2751 ..Default::default()
2752 })
2753 .await
2754 .expect("set verified file metadata failed");
2755
2756 buf.as_mut_slice().fill(234);
2758 object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2759 object.read(0, buf.as_mut()).await.expect_err("verification during read should fail");
2760
2761 fs.close().await.expect("Close failed");
2762 }
2763
2764 #[fuchsia::test]
2765 async fn test_extend() {
2766 let fs = test_filesystem().await;
2767 let handle;
2768 let mut transaction = fs
2769 .clone()
2770 .new_transaction(lock_keys![], Options::default())
2771 .await
2772 .expect("new_transaction failed");
2773 let store = fs.root_store();
2774 handle =
2775 ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2776 .await
2777 .expect("create_object failed");
2778
2779 const START_OFFSET: u64 = 2048 * 1024;
2783 handle
2784 .extend(&mut transaction, START_OFFSET..START_OFFSET + 5 * fs.block_size() as u64)
2785 .await
2786 .expect("extend failed");
2787 transaction.commit().await.expect("commit failed");
2788 let mut buf = handle.allocate_buffer(5 * fs.block_size() as usize).await;
2789 buf.as_mut_slice().fill(123);
2790 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2791 buf.as_mut_slice().fill(67);
2792 handle.read(0, buf.as_mut()).await.expect("read failed");
2793 assert_eq!(buf.as_slice(), &vec![123; 5 * fs.block_size() as usize]);
2794 fs.close().await.expect("Close failed");
2795 }
2796
2797 #[fuchsia::test]
2798 async fn test_truncate_deallocates_old_extents() {
2799 let (fs, object) = test_filesystem_and_object().await;
2800 let mut buf = object.allocate_buffer(5 * fs.block_size() as usize).await;
2801 buf.as_mut_slice().fill(0xaa);
2802 object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2803
2804 let allocator = fs.allocator();
2805 let allocated_before = allocator.get_allocated_bytes();
2806 object.truncate(fs.block_size() as u64).await.expect("truncate failed");
2807 let allocated_after = allocator.get_allocated_bytes();
2808 assert!(
2809 allocated_after < allocated_before,
2810 "before = {} after = {}",
2811 allocated_before,
2812 allocated_after
2813 );
2814 fs.close().await.expect("Close failed");
2815 }
2816
2817 #[fuchsia::test]
2818 async fn test_truncate_zeroes_tail_block() {
2819 let (fs, object) = test_filesystem_and_object().await;
2820
2821 WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + 3).await.expect("truncate failed");
2822 WriteObjectHandle::truncate(&object, TEST_DATA_OFFSET + TEST_DATA.len() as u64)
2823 .await
2824 .expect("truncate failed");
2825
2826 let mut buf = object.allocate_buffer(fs.block_size() as usize).await;
2827 let offset = (TEST_DATA_OFFSET % fs.block_size()) as usize;
2828 object.read(TEST_DATA_OFFSET - offset as u64, buf.as_mut()).await.expect("read failed");
2829
2830 let mut expected = TEST_DATA.to_vec();
2831 expected[3..].fill(0);
2832 assert_eq!(&buf.as_slice()[offset..offset + expected.len()], &expected);
2833 }
2834
2835 #[fuchsia::test]
2836 async fn test_trim() {
2837 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2839 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2840 let block_size = fs.block_size();
2841 root_volume(fs.clone())
2842 .await
2843 .expect("root_volume failed")
2844 .new_volume("test", NO_OWNER, None)
2845 .await
2846 .expect("volume failed");
2847 fs.close().await.expect("close failed");
2848 let device = fs.take_device().await;
2849 device.reopen(false);
2850
2851 #[derive(Default)]
2856 struct Context {
2857 store: Option<Arc<ObjectStore>>,
2858 object_id: Option<u64>,
2859 }
2860 let shared_context = Arc::new(Mutex::new(Context::default()));
2861
2862 let object_size = (TRANSACTION_MUTATION_THRESHOLD as u64 + 10) * 2 * block_size;
2863
2864 async fn expect_tombstoned(store: &Arc<ObjectStore>, object_id: u64) {
2866 loop {
2867 if let Err(e) =
2868 ObjectStore::open_object(store, object_id, HandleOptions::default(), None).await
2869 {
2870 assert!(
2871 FxfsError::NotFound.matches(&e),
2872 "open_object didn't fail with NotFound: {:?}",
2873 e
2874 );
2875 break;
2876 }
2877 fasync::Timer::new(std::time::Duration::from_millis(100)).await;
2879 }
2880 }
2881
2882 async fn needs_trim(store: &Arc<ObjectStore>) -> Option<DataObjectHandle<ObjectStore>> {
2884 let root_directory = Directory::open(store, store.root_directory_object_id())
2885 .await
2886 .expect("open failed");
2887 let oid = root_directory.lookup("foo").await.expect("lookup failed");
2888 if let Some((oid, _)) = oid {
2889 let object = ObjectStore::open_object(store, oid, HandleOptions::default(), None)
2890 .await
2891 .expect("open_object failed");
2892 let props = object.get_properties().await.expect("get_properties failed");
2893 if props.allocated_size > 0 && props.data_attribute_size == 0 {
2894 Some(object)
2895 } else {
2896 None
2897 }
2898 } else {
2899 None
2900 }
2901 }
2902
2903 let shared_context_clone = shared_context.clone();
2904 let post_commit = move || {
2905 let store = shared_context_clone.lock().store.as_ref().cloned().unwrap();
2906 let shared_context = shared_context_clone.clone();
2907 async move {
2908 let options = FsckOptions {
2910 fail_on_warning: true,
2911 no_lock: true,
2912 on_error: Box::new(|err| println!("fsck error: {:?}", err)),
2913 ..Default::default()
2914 };
2915 let fs = store.filesystem();
2916
2917 fsck_with_options(fs.clone(), &options).await.expect("fsck_with_options failed");
2918 fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
2919 .await
2920 .expect("fsck_volume_with_options failed");
2921
2922 fs.sync(SyncOptions { flush_device: true, ..Default::default() })
2924 .await
2925 .expect("sync failed");
2926 let device = fs.device().snapshot().expect("snapshot failed");
2927
2928 let object_id = shared_context.lock().object_id.clone();
2929
2930 let fs2 = FxFilesystemBuilder::new()
2931 .skip_initial_reap(object_id.is_none())
2932 .open(device)
2933 .await
2934 .expect("open failed");
2935
2936 let root_vol = root_volume(fs2.clone()).await.expect("root_volume failed");
2938 let store = root_vol.volume("test", NO_OWNER, None).await.expect("volume failed");
2939
2940 if let Some(oid) = object_id {
2941 expect_tombstoned(&store, oid).await;
2943 } else if let Some(object) = needs_trim(&store).await {
2944 object.truncate(object_size).await.expect("truncate failed");
2946 let mut buf = object.allocate_buffer(block_size as usize).await;
2947 object
2948 .read(object_size - block_size * 2, buf.as_mut())
2949 .await
2950 .expect("read failed");
2951 assert_eq!(buf.as_slice(), &vec![0; block_size as usize]);
2952
2953 let fs = FxFilesystem::open(fs.device().snapshot().expect("snapshot failed"))
2956 .await
2957 .expect("open failed");
2958 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2959 let store =
2960 root_vol.volume("test", NO_OWNER, None).await.expect("volume failed");
2961 while needs_trim(&store).await.is_some() {
2962 fasync::Timer::new(std::time::Duration::from_millis(100)).await;
2965 }
2966
2967 fsck_with_options(fs.clone(), &options)
2969 .await
2970 .expect("fsck_with_options failed");
2971 fsck_volume_with_options(fs.as_ref(), &options, store.store_object_id(), None)
2972 .await
2973 .expect("fsck_volume_with_options failed");
2974 fs.close().await.expect("close failed");
2975 }
2976
2977 fsck_with_options(fs2.clone(), &options).await.expect("fsck_with_options failed");
2979 fsck_volume_with_options(fs2.as_ref(), &options, store.store_object_id(), None)
2980 .await
2981 .expect("fsck_volume_with_options failed");
2982 fs2.close().await.expect("close failed");
2983 }
2984 .boxed()
2985 };
2986
2987 let fs = FxFilesystemBuilder::new()
2988 .post_commit_hook(post_commit)
2989 .open(device)
2990 .await
2991 .expect("open failed");
2992
2993 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2994 let store = root_vol.volume("test", NO_OWNER, None).await.expect("volume failed");
2995
2996 shared_context.lock().store = Some(store.clone());
2997
2998 let root_directory =
2999 Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3000
3001 let object;
3002 let mut transaction = fs
3003 .clone()
3004 .new_transaction(
3005 lock_keys![LockKey::object(
3006 store.store_object_id(),
3007 store.root_directory_object_id()
3008 )],
3009 Options::default(),
3010 )
3011 .await
3012 .expect("new_transaction failed");
3013 object = root_directory
3014 .create_child_file(&mut transaction, "foo")
3015 .await
3016 .expect("create_object failed");
3017 transaction.commit().await.expect("commit failed");
3018
3019 let mut transaction = fs
3020 .clone()
3021 .new_transaction(
3022 lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
3023 Options::default(),
3024 )
3025 .await
3026 .expect("new_transaction failed");
3027
3028 let mut pass = 0;
3031 loop {
3032 let mut buf = object.allocate_buffer(5).await;
3035 buf.as_mut_slice().fill(1);
3036 for offset in (0..object_size).into_iter().step_by(2 * block_size as usize) {
3038 object
3039 .txn_write(&mut transaction, offset, buf.as_ref())
3040 .await
3041 .expect("write failed");
3042 }
3043 transaction.commit().await.expect("commit failed");
3044 WriteObjectHandle::truncate(&object, 0).await.expect("truncate failed");
3046
3047 if pass == 1 {
3048 break;
3049 }
3050
3051 shared_context.lock().object_id = Some(object.object_id());
3054
3055 transaction = fs
3056 .clone()
3057 .new_transaction(
3058 lock_keys![
3059 LockKey::object(store.store_object_id(), store.root_directory_object_id()),
3060 LockKey::object(store.store_object_id(), object.object_id()),
3061 ],
3062 Options::default(),
3063 )
3064 .await
3065 .expect("new_transaction failed");
3066
3067 replace_child(&mut transaction, None, (&root_directory, "foo"))
3069 .await
3070 .expect("replace_child failed");
3071 store.add_to_graveyard(&mut transaction, object.object_id());
3072
3073 pass += 1;
3074 }
3075
3076 fs.close().await.expect("Close failed");
3077 }
3078
3079 #[fuchsia::test]
3080 async fn test_adjust_refs() {
3081 let (fs, object) = test_filesystem_and_object().await;
3082 let store = object.owner();
3083 let mut transaction = fs
3084 .clone()
3085 .new_transaction(
3086 lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
3087 Options::default(),
3088 )
3089 .await
3090 .expect("new_transaction failed");
3091 assert_eq!(
3092 store
3093 .adjust_refs(&mut transaction, object.object_id(), 1)
3094 .await
3095 .expect("adjust_refs failed"),
3096 false
3097 );
3098 transaction.commit().await.expect("commit failed");
3099
3100 let allocator = fs.allocator();
3101 let allocated_before = allocator.get_allocated_bytes();
3102 let mut transaction = fs
3103 .clone()
3104 .new_transaction(
3105 lock_keys![LockKey::object(store.store_object_id(), object.object_id())],
3106 Options::default(),
3107 )
3108 .await
3109 .expect("new_transaction failed");
3110 assert_eq!(
3111 store
3112 .adjust_refs(&mut transaction, object.object_id(), -2)
3113 .await
3114 .expect("adjust_refs failed"),
3115 true
3116 );
3117 transaction.commit().await.expect("commit failed");
3118
3119 assert_eq!(allocator.get_allocated_bytes(), allocated_before);
3120
3121 store
3122 .tombstone_object(
3123 object.object_id(),
3124 Options { borrow_metadata_space: true, ..Default::default() },
3125 )
3126 .await
3127 .expect("purge failed");
3128
3129 assert_eq!(allocated_before - allocator.get_allocated_bytes(), fs.block_size() as u64);
3130
3131 {
3133 let mut transaction = fs
3134 .clone()
3135 .new_transaction(
3136 lock_keys![LockKey::object(
3137 store.store_object_id(),
3138 store.root_directory_object_id()
3139 )],
3140 Options::default(),
3141 )
3142 .await
3143 .expect("new_transaction failed");
3144 let root_directory = Directory::open(&store, store.root_directory_object_id())
3145 .await
3146 .expect("open failed");
3147 transaction.add(
3148 store.store_object_id(),
3149 Mutation::replace_or_insert_object(
3150 ObjectKey::child(root_directory.object_id(), TEST_OBJECT_NAME, false),
3151 ObjectValue::None,
3152 ),
3153 );
3154 transaction.commit().await.expect("commit failed");
3155 }
3156
3157 fsck_with_options(
3158 fs.clone(),
3159 &FsckOptions {
3160 fail_on_warning: true,
3161 on_error: Box::new(|err| println!("fsck error: {:?}", err)),
3162 ..Default::default()
3163 },
3164 )
3165 .await
3166 .expect("fsck_with_options failed");
3167
3168 fs.close().await.expect("Close failed");
3169 }
3170
3171 #[fuchsia::test]
3172 async fn test_locks() {
3173 let (fs, object) = test_filesystem_and_object().await;
3174 let (send1, recv1) = channel();
3175 let (send2, recv2) = channel();
3176 let (send3, recv3) = channel();
3177 let done = Mutex::new(false);
3178 let mut futures = FuturesUnordered::new();
3179 futures.push(
3180 async {
3181 let mut t = object.new_transaction().await.expect("new_transaction failed");
3182 send1.send(()).unwrap(); send3.send(()).unwrap(); recv2.await.unwrap();
3185 let mut buf = object.allocate_buffer(5).await;
3186 buf.as_mut_slice().copy_from_slice(b"hello");
3187 object.txn_write(&mut t, 0, buf.as_ref()).await.expect("write failed");
3188 fasync::Timer::new(Duration::from_millis(100)).await;
3190 assert!(!*done.lock());
3191 t.commit().await.expect("commit failed");
3192 }
3193 .boxed(),
3194 );
3195 futures.push(
3196 async {
3197 recv1.await.unwrap();
3198 let offset = TEST_DATA_OFFSET as usize;
3200 let align = offset % fs.block_size() as usize;
3201 let len = TEST_DATA.len();
3202 let mut buf = object.allocate_buffer(align + len).await;
3203 assert_eq!(
3204 object.read((offset - align) as u64, buf.as_mut()).await.expect("read failed"),
3205 align + TEST_DATA.len()
3206 );
3207 assert_eq!(&buf.as_slice()[align..], TEST_DATA);
3208 send2.send(()).unwrap();
3210 }
3211 .boxed(),
3212 );
3213 futures.push(
3214 async {
3215 recv3.await.unwrap();
3217 let _t = object.new_transaction().await.expect("new_transaction failed");
3218 let mut buf = object.allocate_buffer(5).await;
3219 assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 5);
3220 assert_eq!(buf.as_slice(), b"hello");
3221 }
3222 .boxed(),
3223 );
3224 while let Some(()) = futures.next().await {}
3225 fs.close().await.expect("Close failed");
3226 }
3227
3228 #[fuchsia::test(threads = 10)]
3229 async fn test_racy_reads() {
3230 let fs = test_filesystem().await;
3231 let object;
3232 let mut transaction = fs
3233 .clone()
3234 .new_transaction(lock_keys![], Options::default())
3235 .await
3236 .expect("new_transaction failed");
3237 let store = fs.root_store();
3238 object = Arc::new(
3239 ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3240 .await
3241 .expect("create_object failed"),
3242 );
3243 transaction.commit().await.expect("commit failed");
3244 for _ in 0..100 {
3245 let cloned_object = object.clone();
3246 let writer = fasync::Task::spawn(async move {
3247 let mut buf = cloned_object.allocate_buffer(10).await;
3248 buf.as_mut_slice().fill(123);
3249 cloned_object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3250 });
3251 let cloned_object = object.clone();
3252 let reader = fasync::Task::spawn(async move {
3253 let wait_time = rand::thread_rng().gen_range(0..5);
3254 fasync::Timer::new(Duration::from_millis(wait_time)).await;
3255 let mut buf = cloned_object.allocate_buffer(10).await;
3256 buf.as_mut_slice().fill(23);
3257 let amount = cloned_object.read(0, buf.as_mut()).await.expect("write failed");
3258 if amount != 0 {
3263 assert_eq!(amount, 10);
3264 assert_eq!(buf.as_slice(), &[123; 10]);
3265 }
3266 });
3267 writer.await;
3268 reader.await;
3269 object.truncate(0).await.expect("truncate failed");
3270 }
3271 fs.close().await.expect("Close failed");
3272 }
3273
3274 #[fuchsia::test]
3275 async fn test_allocated_size() {
3276 let (fs, object) = test_filesystem_and_object_with_key(None, true).await;
3277
3278 let before = object.get_properties().await.expect("get_properties failed").allocated_size;
3279 let mut buf = object.allocate_buffer(5).await;
3280 buf.as_mut_slice().copy_from_slice(b"hello");
3281 object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3282 let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3283 assert_eq!(after, before + fs.block_size() as u64);
3284
3285 object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3287 assert_eq!(
3288 object.get_properties().await.expect("get_properties failed").allocated_size,
3289 after
3290 );
3291
3292 let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3294 let offset = 1000 * fs.block_size() as u64;
3295 let before = after;
3296 object
3297 .extend(&mut transaction, offset..offset + fs.block_size() as u64)
3298 .await
3299 .expect("extend failed");
3300 transaction.commit().await.expect("commit failed");
3301 let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3302 assert_eq!(after, before + fs.block_size() as u64);
3303
3304 let before = after;
3306 let size = object.get_size();
3307 object.truncate(size - fs.block_size() as u64).await.expect("extend failed");
3308 let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3309 assert_eq!(after, before - fs.block_size() as u64);
3310
3311 let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3313 let before = after;
3314 let mut file_range = offset..offset + fs.block_size() as u64;
3315 object.preallocate_range(&mut transaction, &mut file_range).await.expect("extend failed");
3316 transaction.commit().await.expect("commit failed");
3317 let after = object.get_properties().await.expect("get_properties failed").allocated_size;
3318 assert_eq!(after, before + fs.block_size() as u64);
3319 fs.close().await.expect("Close failed");
3320 }
3321
3322 #[fuchsia::test(threads = 10)]
3323 async fn test_zero() {
3324 let (fs, object) = test_filesystem_and_object().await;
3325 let expected_size = object.get_size();
3326 let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3327 object.zero(&mut transaction, 0..fs.block_size() as u64 * 10).await.expect("zero failed");
3328 transaction.commit().await.expect("commit failed");
3329 assert_eq!(object.get_size(), expected_size);
3330 let mut buf = object.allocate_buffer(fs.block_size() as usize * 10).await;
3331 assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed") as u64, expected_size);
3332 assert_eq!(
3333 &buf.as_slice()[0..expected_size as usize],
3334 vec![0u8; expected_size as usize].as_slice()
3335 );
3336 fs.close().await.expect("Close failed");
3337 }
3338
3339 #[fuchsia::test]
3340 async fn test_properties() {
3341 let (fs, object) = test_filesystem_and_object().await;
3342 const CRTIME: Timestamp = Timestamp::from_nanos(1234);
3343 const MTIME: Timestamp = Timestamp::from_nanos(5678);
3344 const CTIME: Timestamp = Timestamp::from_nanos(8765);
3345
3346 let mut transaction = object.new_transaction().await.expect("new_transaction failed");
3349 object
3350 .update_attributes(
3351 &mut transaction,
3352 Some(&fio::MutableNodeAttributes {
3353 creation_time: Some(CRTIME.as_nanos()),
3354 modification_time: Some(MTIME.as_nanos()),
3355 mode: Some(111),
3356 gid: Some(222),
3357 ..Default::default()
3358 }),
3359 None,
3360 )
3361 .await
3362 .expect("update_attributes failed");
3363 const MTIME_NEW: Timestamp = Timestamp::from_nanos(12345678);
3364 object
3365 .update_attributes(
3366 &mut transaction,
3367 Some(&fio::MutableNodeAttributes {
3368 modification_time: Some(MTIME_NEW.as_nanos()),
3369 gid: Some(333),
3370 rdev: Some(444),
3371 ..Default::default()
3372 }),
3373 Some(CTIME),
3374 )
3375 .await
3376 .expect("update_timestamps failed");
3377 transaction.commit().await.expect("commit failed");
3378
3379 let properties = object.get_properties().await.expect("get_properties failed");
3380 assert_matches!(
3381 properties,
3382 ObjectProperties {
3383 refs: 1u64,
3384 allocated_size: TEST_OBJECT_ALLOCATED_SIZE,
3385 data_attribute_size: TEST_OBJECT_SIZE,
3386 creation_time: CRTIME,
3387 modification_time: MTIME_NEW,
3388 posix_attributes: Some(PosixAttributes { mode: 111, gid: 333, rdev: 444, .. }),
3389 change_time: CTIME,
3390 ..
3391 }
3392 );
3393 fs.close().await.expect("Close failed");
3394 }
3395
/// Exercises `is_allocated` across holes, written data, zeroed ranges and object growth. It
/// also checks that extents belonging to a different object are not reported.
#[fuchsia::test]
async fn test_is_allocated() {
    let (fs, object) = test_filesystem_and_object().await;

    // Block-aligned start and length of the fixture's data. This assumes the fixture object
    // holds TEST_DATA at TEST_DATA_OFFSET — TODO confirm against `test_filesystem_and_object`.
    let aligned_offset = round_down(TEST_DATA_OFFSET, fs.block_size());
    let aligned_length = round_up(TEST_DATA.len() as u64, fs.block_size()).unwrap();

    // The region before the data is reported as one unallocated run.
    let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
    assert_eq!(count, aligned_offset);
    assert_eq!(allocated, false);

    // The data itself is reported as one allocated run.
    let (allocated, count) =
        object.is_allocated(aligned_offset).await.expect("is_allocated failed");
    assert_eq!(count, aligned_length);
    assert_eq!(allocated, true);

    // Querying at the end of the data is expected to fail. Presumably `end` is at or beyond
    // the object's current size — verify against `is_allocated`.
    let end = aligned_offset + aligned_length;
    object
        .is_allocated(end)
        .await
        .expect_err("is_allocated should have returned ERR_OUT_OF_RANGE");

    // Grow the object to 50 blocks; the newly added tail is reported as unallocated.
    let size = 50 * fs.block_size() as u64;
    object.truncate(size).await.expect("extend failed");

    let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
    assert_eq!(count, size - end);
    assert_eq!(allocated, false);

    // Write two adjacent 5-block buffers starting 20 blocks past `end`.
    let buf_length = 5 * fs.block_size();
    let mut buf = object.allocate_buffer(buf_length as usize).await;
    buf.as_mut_slice().fill(123);
    let new_offset = end + 20 * fs.block_size() as u64;
    object.write_or_append(Some(new_offset), buf.as_ref()).await.expect("write failed");
    object
        .write_or_append(Some(new_offset + buf_length), buf.as_ref())
        .await
        .expect("write failed");

    // The gap between `end` and the new writes is still unallocated.
    let (allocated, count) = object.is_allocated(end).await.expect("is_allocated failed");
    assert_eq!(count, new_offset - end);
    assert_eq!(allocated, false);

    // The two adjacent writes are reported as a single allocated run.
    let (allocated, count) =
        object.is_allocated(new_offset).await.expect("is_allocated failed");
    assert_eq!(count, 2 * buf_length);
    assert_eq!(allocated, true);

    // Querying from inside that run reports the remainder of the run.
    let (allocated, count) = object
        .is_allocated(new_offset + 4 * fs.block_size())
        .await
        .expect("is_allocated failed");
    assert_eq!(count, 2 * buf_length - 4 * fs.block_size());
    assert_eq!(allocated, true);

    // Overwrite the first 3 blocks of the run; the reported run length is unchanged.
    let other_buf_length = 3 * fs.block_size();
    let mut other_buf = object.allocate_buffer(other_buf_length as usize).await;
    other_buf.as_mut_slice().fill(231);
    object.write_or_append(Some(new_offset), other_buf.as_ref()).await.expect("write failed");

    let (allocated, count) =
        object.is_allocated(new_offset).await.expect("is_allocated failed");
    assert_eq!(count, 2 * buf_length);
    assert_eq!(allocated, true);

    // In one transaction, zero the fixture data and the first of the two written buffers.
    let mut transaction = object.new_transaction().await.expect("new_transaction failed");
    object
        .zero(&mut transaction, aligned_offset..aligned_offset + aligned_length)
        .await
        .expect("zero failed");
    object
        .zero(&mut transaction, new_offset..new_offset + buf_length)
        .await
        .expect("zero failed");
    transaction.commit().await.expect("commit transaction failed");

    // Everything before the second buffer is now reported as one unallocated run.
    let (allocated, count) = object.is_allocated(0).await.expect("is_allocated failed");
    assert_eq!(count, new_offset + buf_length);
    assert_eq!(allocated, false);

    // Only the second buffer is still allocated.
    let (allocated, count) =
        object.is_allocated(new_offset + buf_length).await.expect("is_allocated failed");
    assert_eq!(count, buf_length);
    assert_eq!(allocated, true);

    // First offset after the remaining allocated run.
    let new_end = new_offset + buf_length + count;

    // Write into a second object in the same store, just past `new_end`. Its extents must not
    // show up in `object`'s `is_allocated` results.
    let store = object.owner();
    let mut transaction = fs
        .clone()
        .new_transaction(lock_keys![], Options::default())
        .await
        .expect("new_transaction failed");
    let object2 =
        ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
            .await
            .expect("create_object failed");
    transaction.commit().await.expect("commit failed");

    object2
        .write_or_append(Some(new_end + fs.block_size()), buf.as_ref())
        .await
        .expect("write failed");

    // From `new_end` up to the object's size is still one unallocated run.
    let (allocated, count) = object.is_allocated(new_end).await.expect("is_allocated failed");
    assert_eq!(count, size - new_end);
    assert_eq!(allocated, false);

    fs.close().await.expect("close failed");
}
3530
3531 #[fuchsia::test(threads = 10)]
3532 async fn test_read_write_attr() {
3533 let (_fs, object) = test_filesystem_and_object().await;
3534 let data = [0xffu8; 16_384];
3535 object.write_attr(20, &data).await.expect("write_attr failed");
3536 let rdata =
3537 object.read_attr(20).await.expect("read_attr failed").expect("no attribute data found");
3538 assert_eq!(&data[..], &rdata[..]);
3539
3540 assert_eq!(object.read_attr(21).await.expect("read_attr failed"), None);
3541 }
3542
3543 #[fuchsia::test(threads = 10)]
3544 async fn test_allocate_basic() {
3545 let (fs, object) = test_filesystem_and_empty_object().await;
3546 let block_size = fs.block_size();
3547 let file_size = block_size * 10;
3548 object.truncate(file_size).await.unwrap();
3549
3550 let small_buf_size = 1024;
3551 let large_buf_aligned_size = block_size as usize * 2;
3552 let large_buf_size = block_size as usize * 2 + 1024;
3553
3554 let mut small_buf = object.allocate_buffer(small_buf_size).await;
3555 let mut large_buf_aligned = object.allocate_buffer(large_buf_aligned_size).await;
3556 let mut large_buf = object.allocate_buffer(large_buf_size).await;
3557
3558 assert_eq!(object.read(0, small_buf.as_mut()).await.unwrap(), small_buf_size);
3559 assert_eq!(small_buf.as_slice(), &vec![0; small_buf_size]);
3560 assert_eq!(object.read(0, large_buf.as_mut()).await.unwrap(), large_buf_size);
3561 assert_eq!(large_buf.as_slice(), &vec![0; large_buf_size]);
3562 assert_eq!(
3563 object.read(0, large_buf_aligned.as_mut()).await.unwrap(),
3564 large_buf_aligned_size
3565 );
3566 assert_eq!(large_buf_aligned.as_slice(), &vec![0; large_buf_aligned_size]);
3567
3568 object.allocate(block_size..block_size * 3).await.unwrap();
3570
3571 for (buf_index, buf) in [small_buf, large_buf, large_buf_aligned].iter_mut().enumerate() {
3573 for offset in 0..4 {
3574 assert_eq!(
3575 object.read(block_size * offset, buf.as_mut()).await.unwrap(),
3576 buf.len(),
3577 "buf_index: {}, read offset: {}",
3578 buf_index,
3579 offset,
3580 );
3581 assert_eq!(
3582 buf.as_slice(),
3583 &vec![0; buf.len()],
3584 "buf_index: {}, read offset: {}",
3585 buf_index,
3586 offset,
3587 );
3588 }
3589 }
3590
3591 fs.close().await.expect("close failed");
3592 }
3593
3594 #[fuchsia::test(threads = 10)]
3595 async fn test_allocate_extends_file() {
3596 const BUF_SIZE: usize = 1024;
3597 let (fs, object) = test_filesystem_and_empty_object().await;
3598 let mut buf = object.allocate_buffer(BUF_SIZE).await;
3599 let block_size = fs.block_size();
3600
3601 assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
3602 assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3603
3604 assert!(TEST_OBJECT_SIZE < block_size * 4);
3605 object.allocate(0..block_size * 4).await.unwrap();
3607 assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
3608 assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3609 assert_eq!(object.read(block_size, buf.as_mut()).await.unwrap(), buf.len());
3610 assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3611 assert_eq!(object.read(block_size * 3, buf.as_mut()).await.unwrap(), buf.len());
3612 assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3613
3614 fs.close().await.expect("close failed");
3615 }
3616
3617 #[fuchsia::test(threads = 10)]
3618 async fn test_allocate_past_end() {
3619 const BUF_SIZE: usize = 1024;
3620 let (fs, object) = test_filesystem_and_empty_object().await;
3621 let mut buf = object.allocate_buffer(BUF_SIZE).await;
3622 let block_size = fs.block_size();
3623
3624 assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
3625 assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3626
3627 assert!(TEST_OBJECT_SIZE < block_size * 4);
3628 object.allocate(block_size * 4..block_size * 6).await.unwrap();
3630 assert_eq!(object.read(0, buf.as_mut()).await.unwrap(), buf.len());
3631 assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3632 assert_eq!(object.read(block_size * 4, buf.as_mut()).await.unwrap(), buf.len());
3633 assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3634 assert_eq!(object.read(block_size * 5, buf.as_mut()).await.unwrap(), buf.len());
3635 assert_eq!(buf.as_slice(), &[0; BUF_SIZE]);
3636
3637 fs.close().await.expect("close failed");
3638 }
3639
3640 #[fuchsia::test(threads = 10)]
3641 async fn test_allocate_read_attr() {
3642 let (fs, object) = test_filesystem_and_empty_object().await;
3643 let block_size = fs.block_size();
3644 let file_size = block_size * 4;
3645 object.truncate(file_size).await.unwrap();
3646
3647 let content = object
3648 .read_attr(object.attribute_id())
3649 .await
3650 .expect("failed to read attr")
3651 .expect("attr returned none");
3652 assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
3653
3654 object.allocate(block_size..block_size * 3).await.unwrap();
3655
3656 let content = object
3657 .read_attr(object.attribute_id())
3658 .await
3659 .expect("failed to read attr")
3660 .expect("attr returned none");
3661 assert_eq!(content.as_ref(), &vec![0; file_size as usize]);
3662
3663 fs.close().await.expect("close failed");
3664 }
3665
3666 #[fuchsia::test(threads = 10)]
3667 async fn test_allocate_existing_data() {
3668 struct Case {
3669 written_ranges: Vec<Range<usize>>,
3670 allocate_range: Range<u64>,
3671 }
3672 let cases = [
3673 Case { written_ranges: vec![4..7], allocate_range: 4..7 },
3674 Case { written_ranges: vec![4..7], allocate_range: 3..8 },
3675 Case { written_ranges: vec![4..7], allocate_range: 5..6 },
3676 Case { written_ranges: vec![4..7], allocate_range: 5..8 },
3677 Case { written_ranges: vec![4..7], allocate_range: 3..5 },
3678 Case { written_ranges: vec![0..1, 2..3, 4..5, 6..7, 8..9], allocate_range: 0..10 },
3679 Case { written_ranges: vec![0..2, 4..6, 7..10], allocate_range: 1..8 },
3680 ];
3681
3682 for case in cases {
3683 let (fs, object) = test_filesystem_and_empty_object().await;
3684 let block_size = fs.block_size();
3685 let file_size = block_size * 10;
3686 object.truncate(file_size).await.unwrap();
3687
3688 for write in &case.written_ranges {
3689 let write_len = (write.end - write.start) * block_size as usize;
3690 let mut write_buf = object.allocate_buffer(write_len).await;
3691 write_buf.as_mut_slice().fill(0xff);
3692 assert_eq!(
3693 object
3694 .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
3695 .await
3696 .unwrap(),
3697 file_size
3698 );
3699 }
3700
3701 let mut expected_buf = object.allocate_buffer(file_size as usize).await;
3702 assert_eq!(object.read(0, expected_buf.as_mut()).await.unwrap(), expected_buf.len());
3703
3704 object
3705 .allocate(
3706 case.allocate_range.start * block_size..case.allocate_range.end * block_size,
3707 )
3708 .await
3709 .unwrap();
3710
3711 let mut read_buf = object.allocate_buffer(file_size as usize).await;
3712 assert_eq!(object.read(0, read_buf.as_mut()).await.unwrap(), read_buf.len());
3713 assert_eq!(read_buf.as_slice(), expected_buf.as_slice());
3714
3715 fs.close().await.expect("close failed");
3716 }
3717 }
3718
3719 async fn get_modes(
3720 obj: &DataObjectHandle<ObjectStore>,
3721 mut search_range: Range<u64>,
3722 ) -> Vec<(Range<u64>, ExtentMode)> {
3723 let mut modes = Vec::new();
3724 let store = obj.store();
3725 let tree = store.tree();
3726 let layer_set = tree.layer_set();
3727 let mut merger = layer_set.merger();
3728 let mut iter = merger
3729 .query(Query::FullRange(&ObjectKey::attribute(
3730 obj.object_id(),
3731 0,
3732 AttributeKey::Extent(ExtentKey::search_key_from_offset(search_range.start)),
3733 )))
3734 .await
3735 .unwrap();
3736 loop {
3737 match iter.get() {
3738 Some(ItemRef {
3739 key:
3740 ObjectKey {
3741 object_id,
3742 data:
3743 ObjectKeyData::Attribute(
3744 attribute_id,
3745 AttributeKey::Extent(ExtentKey { range }),
3746 ),
3747 },
3748 value: ObjectValue::Extent(ExtentValue::Some { mode, .. }),
3749 ..
3750 }) if *object_id == obj.object_id() && *attribute_id == 0 => {
3751 if search_range.end <= range.start {
3752 break;
3753 }
3754 let found_range = std::cmp::max(search_range.start, range.start)
3755 ..std::cmp::min(search_range.end, range.end);
3756 search_range.start = found_range.end;
3757 modes.push((found_range, mode.clone()));
3758 if search_range.start == search_range.end {
3759 break;
3760 }
3761 iter.advance().await.unwrap();
3762 }
3763 x => panic!("looking for extent record, found this {:?}", x),
3764 }
3765 }
3766 modes
3767 }
3768
3769 async fn assert_all_overwrite(
3770 obj: &DataObjectHandle<ObjectStore>,
3771 mut search_range: Range<u64>,
3772 ) {
3773 let modes = get_modes(obj, search_range.clone()).await;
3774 for mode in modes {
3775 assert_eq!(
3776 mode.0.start, search_range.start,
3777 "missing mode in range {}..{}",
3778 search_range.start, mode.0.start
3779 );
3780 match mode.1 {
3781 ExtentMode::Overwrite | ExtentMode::OverwritePartial(_) => (),
3782 m => panic!("mode at range {:?} was not overwrite, instead found {:?}", mode.0, m),
3783 }
3784 assert!(
3785 mode.0.end <= search_range.end,
3786 "mode ends beyond search range (bug in test) - search_range: {:?}, mode: {:?}",
3787 search_range,
3788 mode,
3789 );
3790 search_range.start = mode.0.end;
3791 }
3792 assert_eq!(
3793 search_range.start, search_range.end,
3794 "missing mode in range {:?}",
3795 search_range
3796 );
3797 }
3798
3799 #[fuchsia::test(threads = 10)]
3800 async fn test_multi_overwrite() {
3801 #[derive(Debug)]
3802 struct Case {
3803 pre_writes: Vec<Range<usize>>,
3804 allocate_ranges: Vec<Range<u64>>,
3805 overwrites: Vec<Vec<Range<u64>>>,
3806 }
3807 let cases = [
3808 Case {
3809 pre_writes: Vec::new(),
3810 allocate_ranges: vec![1..3],
3811 overwrites: vec![vec![1..3]],
3812 },
3813 Case {
3814 pre_writes: Vec::new(),
3815 allocate_ranges: vec![0..1, 1..2, 2..3, 3..4],
3816 overwrites: vec![vec![0..4]],
3817 },
3818 Case {
3819 pre_writes: Vec::new(),
3820 allocate_ranges: vec![0..4],
3821 overwrites: vec![vec![0..1], vec![1..2], vec![3..4]],
3822 },
3823 Case {
3824 pre_writes: Vec::new(),
3825 allocate_ranges: vec![0..4],
3826 overwrites: vec![vec![3..4]],
3827 },
3828 Case {
3829 pre_writes: Vec::new(),
3830 allocate_ranges: vec![0..4],
3831 overwrites: vec![vec![3..4], vec![2..3], vec![1..2]],
3832 },
3833 Case {
3834 pre_writes: Vec::new(),
3835 allocate_ranges: vec![1..2, 5..6, 7..8],
3836 overwrites: vec![vec![5..6]],
3837 },
3838 Case {
3839 pre_writes: Vec::new(),
3840 allocate_ranges: vec![1..3],
3841 overwrites: vec![
3842 vec![1..3],
3843 vec![1..3],
3844 vec![1..3],
3845 vec![1..3],
3846 vec![1..3],
3847 vec![1..3],
3848 vec![1..3],
3849 vec![1..3],
3850 ],
3851 },
3852 Case {
3853 pre_writes: Vec::new(),
3854 allocate_ranges: vec![0..5],
3855 overwrites: vec![
3856 vec![1..3],
3857 vec![1..3],
3858 vec![1..3],
3859 vec![1..3],
3860 vec![1..3],
3861 vec![1..3],
3862 vec![1..3],
3863 vec![1..3],
3864 ],
3865 },
3866 Case {
3867 pre_writes: Vec::new(),
3868 allocate_ranges: vec![0..5],
3869 overwrites: vec![vec![0..2, 2..4, 4..5]],
3870 },
3871 Case {
3872 pre_writes: Vec::new(),
3873 allocate_ranges: vec![0..5, 5..10],
3874 overwrites: vec![vec![1..2, 2..3, 4..7, 7..8]],
3875 },
3876 Case {
3877 pre_writes: Vec::new(),
3878 allocate_ranges: vec![0..4, 6..10],
3879 overwrites: vec![vec![2..3, 7..9]],
3880 },
3881 Case {
3882 pre_writes: Vec::new(),
3883 allocate_ranges: vec![0..10],
3884 overwrites: vec![vec![1..2, 5..10], vec![0..1, 5..10], vec![0..5, 5..10]],
3885 },
3886 Case {
3887 pre_writes: Vec::new(),
3888 allocate_ranges: vec![0..10],
3889 overwrites: vec![vec![0..2, 2..4, 4..6, 6..8, 8..10], vec![0..5, 5..10]],
3890 },
3891 Case {
3892 pre_writes: vec![1..3],
3893 allocate_ranges: vec![1..3],
3894 overwrites: vec![vec![1..3]],
3895 },
3896 Case {
3897 pre_writes: vec![1..3],
3898 allocate_ranges: vec![4..6],
3899 overwrites: vec![vec![5..6]],
3900 },
3901 Case {
3902 pre_writes: vec![1..3],
3903 allocate_ranges: vec![0..4],
3904 overwrites: vec![vec![0..4]],
3905 },
3906 Case {
3907 pre_writes: vec![1..3],
3908 allocate_ranges: vec![2..4],
3909 overwrites: vec![vec![2..4]],
3910 },
3911 Case {
3912 pre_writes: vec![3..5],
3913 allocate_ranges: vec![1..3, 6..7],
3914 overwrites: vec![vec![1..3, 6..7]],
3915 },
3916 Case {
3917 pre_writes: vec![1..3, 5..7, 8..9],
3918 allocate_ranges: vec![0..5],
3919 overwrites: vec![vec![0..2, 2..5], vec![0..5]],
3920 },
3921 Case {
3922 pre_writes: Vec::new(),
3923 allocate_ranges: vec![0..10, 4..6],
3924 overwrites: Vec::new(),
3925 },
3926 Case {
3927 pre_writes: Vec::new(),
3928 allocate_ranges: vec![3..8, 5..10],
3929 overwrites: Vec::new(),
3930 },
3931 Case {
3932 pre_writes: Vec::new(),
3933 allocate_ranges: vec![5..10, 3..8],
3934 overwrites: Vec::new(),
3935 },
3936 ];
3937
3938 for (i, case) in cases.into_iter().enumerate() {
3939 log::info!("running case {} - {:?}", i, case);
3940 let (fs, object) = test_filesystem_and_empty_object().await;
3941 let block_size = fs.block_size();
3942 let file_size = block_size * 10;
3943 object.truncate(file_size).await.unwrap();
3944
3945 for write in case.pre_writes {
3946 let write_len = (write.end - write.start) * block_size as usize;
3947 let mut write_buf = object.allocate_buffer(write_len).await;
3948 write_buf.as_mut_slice().fill(0xff);
3949 assert_eq!(
3950 object
3951 .write_or_append(Some(block_size * write.start as u64), write_buf.as_ref())
3952 .await
3953 .unwrap(),
3954 file_size
3955 );
3956 }
3957
3958 for allocate_range in &case.allocate_ranges {
3959 object
3960 .allocate(allocate_range.start * block_size..allocate_range.end * block_size)
3961 .await
3962 .unwrap();
3963 }
3964
3965 for allocate_range in case.allocate_ranges {
3966 assert_all_overwrite(
3967 &object,
3968 allocate_range.start * block_size..allocate_range.end * block_size,
3969 )
3970 .await;
3971 }
3972
3973 for overwrite in case.overwrites {
3974 let mut write_len = 0;
3975 let overwrite = overwrite
3976 .into_iter()
3977 .map(|r| {
3978 write_len += (r.end - r.start) * block_size;
3979 r.start * block_size..r.end * block_size
3980 })
3981 .collect::<Vec<_>>();
3982 let mut write_buf = object.allocate_buffer(write_len as usize).await;
3983 let data = (0..20).cycle().take(write_len as usize).collect::<Vec<_>>();
3984 write_buf.as_mut_slice().copy_from_slice(&data);
3985
3986 let mut expected_buf = object.allocate_buffer(file_size as usize).await;
3987 assert_eq!(
3988 object.read(0, expected_buf.as_mut()).await.unwrap(),
3989 expected_buf.len()
3990 );
3991 let expected_buf_slice = expected_buf.as_mut_slice();
3992 let mut data_slice = data.as_slice();
3993 for r in &overwrite {
3994 let len = r.length().unwrap() as usize;
3995 let (copy_from, rest) = data_slice.split_at(len);
3996 expected_buf_slice[r.start as usize..r.end as usize]
3997 .copy_from_slice(©_from);
3998 data_slice = rest;
3999 }
4000
4001 let mut transaction = object.new_transaction().await.unwrap();
4002 object
4003 .multi_overwrite(&mut transaction, 0, &overwrite, write_buf.as_mut())
4004 .await
4005 .unwrap_or_else(|_| panic!("multi_overwrite error on case {}", i));
4006 let mut checksummed_range_length = 0;
4009 let mut num_checksums = 0;
4010 for (device_range, checksums, _) in transaction.checksums() {
4011 let range_len = device_range.end - device_range.start;
4012 let checksums_len = checksums.len() as u64;
4013 assert_eq!(range_len / checksums_len, block_size);
4014 checksummed_range_length += range_len;
4015 num_checksums += checksums_len;
4016 }
4017 assert_eq!(checksummed_range_length, write_len);
4018 assert_eq!(num_checksums, write_len / block_size);
4019 transaction.commit().await.unwrap();
4020
4021 let mut buf = object.allocate_buffer(file_size as usize).await;
4022 assert_eq!(
4023 object.read(0, buf.as_mut()).await.unwrap(),
4024 buf.len(),
4025 "failed length check on case {}",
4026 i,
4027 );
4028 assert_eq!(buf.as_slice(), expected_buf.as_slice(), "failed on case {}", i);
4029 }
4030
4031 fsck_volume(&fs, object.store().store_object_id(), None).await.expect("fsck failed");
4032 fs.close().await.expect("close failed");
4033 }
4034 }
4035
4036 #[fuchsia::test(threads = 10)]
4037 async fn test_multi_overwrite_mode_updates() {
4038 let (fs, object) = test_filesystem_and_empty_object().await;
4039 let block_size = fs.block_size();
4040 let file_size = block_size * 10;
4041 object.truncate(file_size).await.unwrap();
4042
4043 let mut expected_bitmap = BitVec::from_elem(10, false);
4044
4045 object.allocate(0..10 * block_size).await.unwrap();
4046 assert_eq!(
4047 get_modes(&object, 0..10 * block_size).await,
4048 vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4049 );
4050
4051 let mut write_buf = object.allocate_buffer(2 * block_size as usize).await;
4052 let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4053 write_buf.as_mut_slice().copy_from_slice(&data);
4054 let mut transaction = object.new_transaction().await.unwrap();
4055 object
4056 .multi_overwrite(
4057 &mut transaction,
4058 0,
4059 &[2 * block_size..4 * block_size],
4060 write_buf.as_mut(),
4061 )
4062 .await
4063 .unwrap();
4064 transaction.commit().await.unwrap();
4065
4066 expected_bitmap.set(2, true);
4067 expected_bitmap.set(3, true);
4068 assert_eq!(
4069 get_modes(&object, 0..10 * block_size).await,
4070 vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4071 );
4072
4073 let mut write_buf = object.allocate_buffer(3 * block_size as usize).await;
4074 let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4075 write_buf.as_mut_slice().copy_from_slice(&data);
4076 let mut transaction = object.new_transaction().await.unwrap();
4077 object
4078 .multi_overwrite(
4079 &mut transaction,
4080 0,
4081 &[3 * block_size..5 * block_size, 6 * block_size..7 * block_size],
4082 write_buf.as_mut(),
4083 )
4084 .await
4085 .unwrap();
4086 transaction.commit().await.unwrap();
4087
4088 expected_bitmap.set(4, true);
4089 expected_bitmap.set(6, true);
4090 assert_eq!(
4091 get_modes(&object, 0..10 * block_size).await,
4092 vec![(0..10 * block_size, ExtentMode::OverwritePartial(expected_bitmap.clone()))]
4093 );
4094
4095 let mut write_buf = object.allocate_buffer(6 * block_size as usize).await;
4096 let data = (0..20).cycle().take(write_buf.len()).collect::<Vec<_>>();
4097 write_buf.as_mut_slice().copy_from_slice(&data);
4098 let mut transaction = object.new_transaction().await.unwrap();
4099 object
4100 .multi_overwrite(
4101 &mut transaction,
4102 0,
4103 &[
4104 0..2 * block_size,
4105 5 * block_size..6 * block_size,
4106 7 * block_size..10 * block_size,
4107 ],
4108 write_buf.as_mut(),
4109 )
4110 .await
4111 .unwrap();
4112 transaction.commit().await.unwrap();
4113
4114 assert_eq!(
4115 get_modes(&object, 0..10 * block_size).await,
4116 vec![(0..10 * block_size, ExtentMode::Overwrite)]
4117 );
4118
4119 fs.close().await.expect("close failed");
4120 }
4121}