1use crate::filesystem::TxnGuard;
8use crate::log::*;
9use crate::lsm_tree::types::{ItemRef, LayerIterator};
10use crate::lsm_tree::{LSMTree, layers_from_handles};
11use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
12use crate::object_store::extent_record::ExtentValue;
13use crate::object_store::object_manager::{ObjectManager, ReservationUpdate};
14use crate::object_store::object_record::{ObjectKey, ObjectValue};
15use crate::object_store::transaction::{AssociatedObject, LockKey, Mutation, lock_keys};
16use crate::object_store::{
17 AssocObj, DirectWriter, EncryptedMutations, HandleOptions, LockState,
18 MAX_ENCRYPTED_MUTATIONS_SIZE, ObjectStore, Options, StoreInfo,
19 layer_size_from_encrypted_mutations_size, tree,
20};
21use crate::serialized_types::{LATEST_VERSION, Version, VersionedLatest};
22use anyhow::{Context, Error, bail};
23use fxfs_crypto::{EncryptionKey, KeyPurpose};
24use once_cell::sync::OnceCell;
25use std::sync::atomic::Ordering;
26
/// The trigger for an object-store flush.
///
/// The reason decides two things in this module:
/// * whether `ObjectStore::try_flush_with_reason` may skip the flush entirely;
/// * whether `ObjectStore::flush_with_reason` re-locks and retries after a crypt error.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Reason {
    /// Flush requested on the journal's behalf.
    ///
    /// Skipped when the object manager reports nothing to flush and the tree's earliest
    /// version is already `LATEST_VERSION`. Crypt errors cause a re-lock and retry.
    Journal,
    /// Flush requested while unlocking the store.
    ///
    /// Performed only when the store info references an encrypted-mutations object. Crypt
    /// errors are returned to the caller without retrying.
    Unlock,
}
35
36#[derive(Debug)]
41enum FlushResult<T> {
42 Ok(T),
43 CryptError(Error),
44}
45
#[fxfs_trace::trace]
impl ObjectStore {
    /// Flushes this store for `reason`, retrying after crypt errors where permitted.
    ///
    /// On success returns the earliest version used by a struct in the store's tree, or
    /// `LATEST_VERSION` if the store has been deleted.
    ///
    /// When an attempt reports a crypt error:
    /// * for `Reason::Unlock`, the error is returned immediately;
    /// * otherwise the store's registered owner is asked to force-lock the store and the flush
    ///   is retried, at most `MAX_RETRIES` times, after which the crypt error is returned.
    ///
    /// Non-crypt errors are propagated immediately. An error is also returned if no store
    /// owner is registered, or if force-locking fails.
    #[trace("store_object_id" => self.store_object_id)]
    pub async fn flush_with_reason(&self, reason: Reason) -> Result<Version, Error> {
        // Upper bound on re-lock-and-retry cycles after crypt errors.
        const MAX_RETRIES: usize = 10;
        let mut retries = 0;
        loop {
            match self.try_flush_with_reason(reason).await? {
                FlushResult::Ok(version) => return Ok(version),
                FlushResult::CryptError(error) => {
                    // Unlock-driven flushes are never retried; other reasons stop once the
                    // retry budget is exhausted.
                    if reason == Reason::Unlock || retries >= MAX_RETRIES {
                        return Err(error);
                    }
                    log::warn!(
                        error:?;
                        "Flushing failed for store {}, re-locking and trying again.",
                        self.store_object_id()
                    );
                    // Reading the owner in its own statement drops the `lock_state` guard
                    // before `force_lock` is awaited.
                    let owner = self.lock_state.lock().owner();
                    if let Some(owner) = owner {
                        // NOTE(review): presumably re-locking makes the next attempt take the
                        // `flush_locked` path, which does no key operations — confirm.
                        owner
                            .force_lock(&self)
                            .await
                            .context("Failed to re-lock store during flush")?;
                    } else {
                        bail!("No store owner was registered!");
                    }
                }
            }
            retries += 1;
        }
    }

    /// Performs a single flush attempt for `reason`.
    ///
    /// Returns `FlushResult::CryptError` when the unlocked flush path reports a crypt failure,
    /// so the caller can decide whether to retry. All other failures are returned as `Err`.
    ///
    /// The flush is skipped, returning the tree's earliest version, when:
    /// * this store has no parent store;
    /// * `reason` is `Unlock` and the store info has no encrypted-mutations object;
    /// * `reason` is `Journal`, the object manager reports no flush is needed, and the tree's
    ///   earliest version is already `LATEST_VERSION`.
    ///
    /// A deleted store is also skipped, returning `LATEST_VERSION`.
    async fn try_flush_with_reason(&self, reason: Reason) -> Result<FlushResult<Version>, Error> {
        if self.parent_store.is_none() {
            // Stores without a parent are not flushed here; just report the earliest version.
            return Ok(FlushResult::Ok(self.tree.get_earliest_version()));
        }
        let filesystem = self.filesystem();
        let object_manager = filesystem.object_manager();

        // The transaction guard is acquired before the per-store flush lock.
        // NOTE(review): presumably this ordering avoids lock-order inversion with in-flight
        // transactions — confirm. Locals drop in reverse order, so the flush lock (`_guard`)
        // is released before `txn_guard`.
        let txn_guard = filesystem.clone().txn_guard().await;
        // Serializes flushes of this store. The guard lives in `_guard` until this function
        // returns.
        let keys = lock_keys![LockKey::flush(self.store_object_id())];
        let _guard = Some(filesystem.lock_manager().write_lock(keys).await);

        if matches!(*self.lock_state.lock(), LockState::Deleted) {
            // Nothing to flush for a deleted store.
            return Ok(FlushResult::Ok(LATEST_VERSION));
        }

        match reason {
            Reason::Unlock => {
                // On unlock, only flush when encrypted mutations are stored in an object.
                // NOTE(review): `unwrap` assumes store info is available whenever an unlock
                // flush runs — confirm against callers.
                if self.store_info().unwrap().encrypted_mutations_object_id == INVALID_OBJECT_ID {
                    return Ok(FlushResult::Ok(self.tree.get_earliest_version()));
                }
            }
            Reason::Journal => {
                // Flush if the object manager says so, or if the tree still holds something
                // whose earliest version is older than `LATEST_VERSION`.
                let earliest_version = self.tree.get_earliest_version();
                if !object_manager.needs_flush(self.store_object_id)
                    && earliest_version == LATEST_VERSION
                {
                    return Ok(FlushResult::Ok(earliest_version));
                }
            }
        }

        let trace = self.trace.load(Ordering::Relaxed);
        if trace {
            info!(store_id = self.store_object_id(); "OS: begin flush");
        }

        // A locked store appends its journaled mutations to the encrypted-mutations object
        // (`flush_locked`). An unlocked store compacts its tree into a new layer
        // (`flush_unlocked`). The `lock_state` guard taken in the condition is dropped before
        // either branch awaits.
        if matches!(&*self.lock_state.lock(), LockState::Locked) {
            self.flush_locked(&txn_guard).await.with_context(|| {
                format!("Failed to flush object store {}", self.store_object_id)
            })?;
        } else {
            if let FlushResult::CryptError(error) = self
                .flush_unlocked(&txn_guard)
                .await
                .with_context(|| format!("Failed to flush object store {}", self.store_object_id))?
            {
                return Ok(FlushResult::CryptError(error));
            }
        }

        if trace {
            info!(store_id = self.store_object_id(); "OS: end flush");
        }
        // The callback runs while the `flush_callback` lock (and the flush lock) are held.
        if let Some(callback) = &*self.flush_callback.lock() {
            callback(self);
        }

        let mut counters = self.counters.lock();
        counters.num_flushes += 1;
        counters.last_flush_time = Some(std::time::SystemTime::now());
        Ok(FlushResult::Ok(self.tree.get_earliest_version()))
    }

    /// Flushes an unlocked store by compacting its in-memory tree into a new layer object in
    /// the parent store and committing the updated `StoreInfo`.
    ///
    /// Returns the sizes of the resulting layer files: the new layer first, then the layers
    /// kept from before. Layers without a handle report 0. Returns
    /// `FlushResult::CryptError` if rolling the mutations key or creating the new layer's key
    /// fails.
    ///
    /// Object lifecycle:
    /// 1. `transaction` creates the new layer object and puts it in the graveyard.
    /// 2. The tree is written to the new layer.
    /// 3. `end_transaction` removes the new layer from the graveyard, adds the replaced layers
    ///    and any old encrypted-mutations object to the graveyard, writes the new store info,
    ///    and records `EndFlush`.
    /// 4. After `end_transaction` commits, the replaced objects are tombstoned.
    ///
    /// NOTE(review): presumably the graveyard staging lets an interrupted flush be cleaned up
    /// on the next mount — confirm.
    async fn flush_unlocked(
        &self,
        txn_guard: &TxnGuard<'_>,
    ) -> Result<FlushResult<Vec<u64>>, Error> {
        // Roll the mutations key once the cipher's offset reaches the configured
        // `roll_metadata_key_byte_count`. No cipher means no roll.
        let roll_mutations_key = self
            .mutations_cipher
            .lock()
            .as_ref()
            .map(|cipher| {
                cipher.offset() >= self.filesystem().options().roll_metadata_key_byte_count
            })
            .unwrap_or(false);
        if roll_mutations_key {
            // NOTE(review): `unwrap` assumes a crypt is present whenever a mutations cipher is
            // — confirm.
            if let Err(error) = self.roll_mutations_key(self.crypt().unwrap().as_ref()).await {
                log::warn!(
                    error:?; "Failed to roll mutations key for store {}", self.store_object_id());
                return Ok(FlushResult::CryptError(error));
            }
        }

        /// Captures a copy of the store's `StoreInfo`, with the current mutations cipher offset
        /// and last object ID filled in, when the `BeginFlush` mutation it is attached to is
        /// about to be applied.
        struct StoreInfoSnapshot<'a> {
            store: &'a ObjectStore,
            store_info: OnceCell<StoreInfo>,
        }
        impl AssociatedObject for StoreInfoSnapshot<'_> {
            fn will_apply_mutation(
                &self,
                _mutation: &Mutation,
                _object_id: u64,
                _manager: &ObjectManager,
            ) {
                let mut store_info = self.store.store_info().unwrap();

                // Record the cipher offset as of this point in the mutation stream.
                // The `mutations_cipher` guard stays held while `last_object_id` is locked below.
                let mutations_cipher = self.store.mutations_cipher.lock();
                if let Some(cipher) = mutations_cipher.as_ref() {
                    store_info.mutations_cipher_offset = cipher.offset();
                }

                store_info.last_object_id = self.store.last_object_id.lock().id;

                // Panics if this snapshot is applied more than once.
                self.store_info.set(store_info).unwrap();
            }
        }

        let store_info_snapshot = StoreInfoSnapshot { store: self, store_info: OnceCell::new() };

        let filesystem = self.filesystem();
        let object_manager = filesystem.object_manager();
        let reservation = object_manager.metadata_reservation();
        let txn_options = Options {
            skip_journal_checks: true,
            borrow_metadata_space: true,
            allocator_reservation: Some(reservation),
            txn_guard: Some(txn_guard),
            ..Default::default()
        };

        // Commit `BeginFlush`. The snapshot is populated as part of applying this mutation.
        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
        transaction.add_with_object(
            self.store_object_id(),
            Mutation::BeginFlush,
            AssocObj::Borrowed(&store_info_snapshot),
        );
        transaction.commit().await?;

        // Panics if the commit above did not populate the snapshot.
        let mut new_store_info = store_info_snapshot.store_info.into_inner().unwrap();

        // `transaction` creates the new layer object. `end_transaction`, committed only after
        // the tree has been written, finalizes the switch to the new layer set.
        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;

        // Declared before `end_transaction` so it is dropped after it: `end_transaction`
        // borrows it through `AssocObj::Borrowed` below.
        let reservation_update: ReservationUpdate;
        let mut end_transaction = filesystem
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    self.parent_store.as_ref().unwrap().store_object_id(),
                    self.store_info_handle_object_id().unwrap(),
                )],
                txn_options,
            )
            .await?;

        let parent_store = self.parent_store.as_ref().unwrap();
        let handle_options = HandleOptions { skip_journal_checks: true, ..Default::default() };
        // Create the object that will hold the new layer. If the store has a crypt, it is
        // encrypted with a freshly created data key.
        let new_object_tree_layer = if let Some(crypt) = self.crypt().as_deref() {
            let object_id = parent_store.get_next_object_id(transaction.txn_guard()).await?;
            let (key, unwrapped_key) = match crypt.create_key(object_id, KeyPurpose::Data).await {
                Ok((key, unwrapped_key)) => (key, unwrapped_key),
                Err(status) => {
                    log::warn!(
                        status:?;
                        "Failed to create keys while flushing store {}",
                        self.store_object_id(),
                    );
                    // Both transactions are dropped uncommitted.
                    return Ok(FlushResult::CryptError(status.into()));
                }
            };
            ObjectStore::create_object_with_key(
                parent_store,
                &mut transaction,
                object_id,
                handle_options,
                EncryptionKey::Fxfs(key),
                unwrapped_key,
            )
            .await?
        } else {
            ObjectStore::create_object(parent_store, &mut transaction, handle_options, None).await?
        };
        let writer = DirectWriter::new(&new_object_tree_layer, txn_options).await;
        let new_object_tree_layer_object_id = new_object_tree_layer.object_id();
        // The new layer is in the graveyard from `transaction` until `end_transaction` commits.
        parent_store.add_to_graveyard(&mut transaction, new_object_tree_layer_object_id);
        parent_store.remove_from_graveyard(&mut end_transaction, new_object_tree_layer_object_id);

        transaction.commit().await?;
        // Write the tree into the new layer. `old_layers` are the layers being replaced.
        let (layers_to_keep, old_layers) =
            tree::flush(&self.tree, writer).await.context("Failed to flush tree")?;

        // New layer set: the freshly written layer first, then the layers that were kept.
        let mut new_layers = layers_from_handles([new_object_tree_layer]).await?;
        new_layers.extend(layers_to_keep.iter().map(|l| (*l).clone()));

        new_store_info.layers = Vec::new();
        for layer in &new_layers {
            if let Some(handle) = layer.handle() {
                new_store_info.layers.push(handle.object_id());
            }
        }

        for layer in &old_layers {
            if let Some(handle) = layer.handle() {
                parent_store.add_to_graveyard(&mut end_transaction, handle.object_id());
            }
        }

        // The new store info references no encrypted-mutations object. Any previous one is
        // moved to the graveyard here and tombstoned after commit.
        let old_encrypted_mutations_object_id =
            std::mem::replace(&mut new_store_info.encrypted_mutations_object_id, INVALID_OBJECT_ID);
        if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
            parent_store.add_to_graveyard(&mut end_transaction, old_encrypted_mutations_object_id);
        }

        self.write_store_info(&mut end_transaction, &new_store_info).await?;

        let layer_file_sizes = new_layers
            .iter()
            .map(|l| l.handle().map(ReadObjectHandle::get_size).unwrap_or(0))
            .collect::<Vec<u64>>();

        // The store's reservation is derived from the total size of the new layer set.
        let total_layer_size = layer_file_sizes.iter().sum();
        reservation_update =
            ReservationUpdate::new(tree::reservation_amount_from_layer_size(total_layer_size));

        end_transaction.add_with_object(
            self.store_object_id(),
            Mutation::EndFlush,
            AssocObj::Borrowed(&reservation_update),
        );

        if self.trace.load(Ordering::Relaxed) {
            info!(
                store_id = self.store_object_id(),
                old_layer_count = old_layers.len(),
                new_layer_count = new_layers.len(),
                total_layer_size,
                new_store_info:?;
                "OS: compacting"
            );
        }

        // Publish the new layers and store-info fields in memory as part of the commit.
        // Only these three store-info fields are copied into the in-memory info here.
        end_transaction
            .commit_with_callback(|_| {
                let mut store_info = self.store_info.lock();
                let info = store_info.as_mut().unwrap();
                info.layers = new_store_info.layers;
                info.encrypted_mutations_object_id = new_store_info.encrypted_mutations_object_id;
                info.mutations_cipher_offset = new_store_info.mutations_cipher_offset;
                self.tree.set_layers(new_layers);
            })
            .await?;

        // The replaced objects are tombstoned only after `end_transaction` has committed.
        // Each layer's object ID is captured before the layer is closed.
        for layer in old_layers {
            let object_id = layer.handle().map(|h| h.object_id());
            layer.close_layer().await;
            if let Some(object_id) = object_id {
                parent_store.tombstone_object(object_id, txn_options).await?;
            }
        }

        if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
            parent_store.tombstone_object(old_encrypted_mutations_object_id, txn_options).await?;
        }

        Ok(FlushResult::Ok(layer_file_sizes))
    }

    /// Flushes a locked store by appending the store's journaled mutations, serialized as
    /// `EncryptedMutations`, to the parent-store object named by
    /// `encrypted_mutations_object_id`.
    ///
    /// The object is created if the store info does not reference one yet. The updated
    /// `StoreInfo` and a new reservation (existing layers plus the encrypted-mutations object)
    /// are committed together with the appended data.
    async fn flush_locked(&self, txn_guard: &TxnGuard<'_>) -> Result<(), Error> {
        let filesystem = self.filesystem();
        let object_manager = filesystem.object_manager();
        let reservation = object_manager.metadata_reservation();
        let txn_options = Options {
            skip_journal_checks: true,
            borrow_metadata_space: true,
            allocator_reservation: Some(reservation),
            txn_guard: Some(txn_guard),
            ..Default::default()
        };

        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
        transaction.add(self.store_object_id(), Mutation::BeginFlush);
        transaction.commit().await?;

        // Unlike `flush_unlocked`, store info is loaded via `load_store_info` rather than
        // snapshotted from memory.
        let mut new_store_info = self.load_store_info().await?;

        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;

        // `reservation_update` is declared first so it is dropped after `end_transaction`,
        // which borrows it. `handle` and `end_transaction` are assigned in the branches below.
        let reservation_update: ReservationUpdate;
        let handle;
        let mut end_transaction;

        let parent_store = self.parent_store.as_ref().unwrap();
        // Create the encrypted-mutations object on first use, otherwise open the existing one.
        // Either way, `end_transaction` locks that object and the store-info object.
        handle = if new_store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
            let handle = ObjectStore::create_object(
                parent_store,
                &mut transaction,
                HandleOptions { skip_journal_checks: true, ..Default::default() },
                None,
            )
            .await?;
            let oid = handle.object_id();
            end_transaction = filesystem
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(parent_store.store_object_id(), oid),
                        LockKey::object(
                            parent_store.store_object_id(),
                            self.store_info_handle_object_id().unwrap(),
                        ),
                    ],
                    txn_options,
                )
                .await?;
            new_store_info.encrypted_mutations_object_id = oid;
            // The new object is in the graveyard until `end_transaction` commits.
            parent_store.add_to_graveyard(&mut transaction, oid);
            parent_store.remove_from_graveyard(&mut end_transaction, oid);
            handle
        } else {
            end_transaction = filesystem
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(
                            parent_store.store_object_id(),
                            new_store_info.encrypted_mutations_object_id,
                        ),
                        LockKey::object(
                            parent_store.store_object_id(),
                            self.store_info_handle_object_id().unwrap(),
                        ),
                    ],
                    txn_options,
                )
                .await?;
            ObjectStore::open_object(
                parent_store,
                new_store_info.encrypted_mutations_object_id,
                HandleOptions { skip_journal_checks: true, ..Default::default() },
                None,
            )
            .await?
        };
        transaction.commit().await?;

        let journaled = filesystem
            .journal()
            .read_transactions_for_object(self.store_object_id)
            .await
            .context("Failed to read encrypted mutations from journal")?;
        // Serialize into a fixed-size buffer of MAX_ENCRYPTED_MUTATIONS_SIZE bytes.
        // NOTE(review): presumably serialization errors if the mutations do not fit — confirm.
        let mut buffer = handle.allocate_buffer(MAX_ENCRYPTED_MUTATIONS_SIZE).await;
        let mut cursor = std::io::Cursor::new(buffer.as_mut_slice());
        EncryptedMutations::from_replayed_mutations(self.store_object_id, journaled)
            .serialize_with_version(&mut cursor)?;
        let len = cursor.position() as usize;
        // Append the serialized bytes at the object's current end.
        handle.txn_write(&mut end_transaction, handle.get_size(), buffer.subslice(..len)).await?;

        self.write_store_info(&mut end_transaction, &new_store_info).await?;

        // The reservation covers the existing layer files plus the encrypted-mutations object
        // sized as `handle.get_size() + len`. NOTE(review): presumably `get_size()` does not yet
        // include the uncommitted write above, hence the explicit `+ len` — confirm.
        let mut total_layer_size = 0;
        for &oid in &new_store_info.layers {
            total_layer_size += parent_store.get_file_size(oid).await?;
        }
        total_layer_size +=
            layer_size_from_encrypted_mutations_size(handle.get_size() + len as u64);

        reservation_update =
            ReservationUpdate::new(tree::reservation_amount_from_layer_size(total_layer_size));

        end_transaction.add_with_object(
            self.store_object_id(),
            Mutation::EndFlush,
            AssocObj::Borrowed(&reservation_update),
        );

        end_transaction.commit().await?;

        Ok(())
    }
}
498
#[cfg(test)]
mod tests {
    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, JournalingObject, SyncOptions};
    use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle};
    use crate::object_store::directory::Directory;
    use crate::object_store::transaction::{Options, lock_keys};
    use crate::object_store::volume::root_volume;
    use crate::object_store::{
        HandleOptions, LockKey, NO_OWNER, NewChildStoreOptions, ObjectStore, StoreOptions,
        layer_size_from_encrypted_mutations_size, tree,
    };
    use fxfs_insecure_crypto::InsecureCrypt;
    use std::sync::Arc;
    use storage_device::DeviceHolder;
    use storage_device::fake_device::FakeDevice;

    /// Checks mutations-key rolling end to end.
    ///
    /// Steps:
    /// 1. Create an encrypted volume.
    /// 2. Remount with a 512 KiB `roll_metadata_key_byte_count`.
    /// 3. Create files until the mutations cipher offset goes backwards, which is taken as the
    ///    sign that the key rolled, then write one more file.
    /// 4. Remount and verify, after unlocking with a fresh `InsecureCrypt`, that the cipher
    ///    offset is 0 and that the first and last files can be looked up.
    ///
    /// When `flush_before_unlock` is true, the object manager is flushed after the final
    /// remount but before the store is unlocked.
    async fn run_key_roll_test(flush_before_unlock: bool) {
        let device = DeviceHolder::new(FakeDevice::new(8192, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        // Create an encrypted volume and remember its store ID.
        let store_id = {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(Arc::new(InsecureCrypt::new())),
                            ..StoreOptions::default()
                        },
                        ..NewChildStoreOptions::default()
                    },
                )
                .await
                .expect("new_volume failed")
                .store_object_id()
        };

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        // This threshold is what `flush_unlocked` compares the cipher offset against when
        // deciding to roll the mutations key.
        let fs = FxFilesystemBuilder::new()
            .roll_metadata_key_byte_count(512 * 1024)
            .open(device)
            .await
            .expect("open failed");

        let (first_filename, last_filename) = {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(NO_OWNER, Arc::new(InsecureCrypt::new())).await.expect("unlock failed");

            let root_dir = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");

            // Create files with 200-character (left-aligned, space-padded) names until the
            // mutations cipher offset decreases, i.e. until the key has rolled.
            let mut last_mutations_cipher_offset = 0;
            let mut i = 0;
            let first_filename = format!("{:<200}", i);
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(store_id, root_dir.object_id())],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_dir
                    .create_child_file(&mut transaction, &format!("{:<200}", i))
                    .await
                    .expect("create_child_file failed");
                i += 1;
                transaction.commit().await.expect("commit failed");
                let cipher_offset = store.mutations_cipher.lock().as_ref().unwrap().offset();
                if cipher_offset < last_mutations_cipher_offset {
                    break;
                }
                last_mutations_cipher_offset = cipher_offset;
            }

            fs.sync(SyncOptions::default()).await.expect("sync failed");

            // Write one more file after the roll so mutations exist under the new key.
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(store_id, root_dir.object_id())],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let last_filename = format!("{:<200}", i);
            root_dir
                .create_child_file(&mut transaction, &last_filename)
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            (first_filename, last_filename)
        };

        fs.close().await.expect("close failed");

        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystemBuilder::new()
            .roll_metadata_key_byte_count(512 * 1024)
            .open(device)
            .await
            .expect("open failed");

        if flush_before_unlock {
            // Flush before the store has been unlocked on this mount.
            fs.object_manager().flush().await.expect("flush failed");
        }

        {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(NO_OWNER, Arc::new(InsecureCrypt::new())).await.expect("unlock failed");

            // After unlock, the mutations cipher is expected to start at offset 0.
            assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);

            let root_dir = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            root_dir
                .lookup(&first_filename)
                .await
                .expect("Lookup failed")
                .expect("First created file wasn't present");
            root_dir
                .lookup(&last_filename)
                .await
                .expect("Lookup failed")
                .expect("Last created file wasn't present");
        }
    }

    /// Key roll without flushing before the final unlock.
    #[fuchsia::test(threads = 10)]
    async fn test_metadata_key_roll() {
        run_key_roll_test(false).await;
    }

    /// Key roll with an object-manager flush before the final unlock.
    #[fuchsia::test(threads = 10)]
    async fn test_metadata_key_roll_with_flush_before_unlock() {
        run_key_roll_test(true).await;
    }

    /// Checks flushing a locked store.
    ///
    /// Steps:
    /// 1. Create "foo" and flush while unlocked.
    /// 2. Create "bar", lock the store, and flush while locked.
    /// 3. Verify that the store's reservation matches the amount derived from its layer files
    ///    plus the encrypted-mutations object.
    /// 4. Unlock and check that both files can be opened.
    #[fuchsia::test]
    async fn test_flush_when_locked() {
        let device = DeviceHolder::new(FakeDevice::new(8192, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let crypt = Arc::new(InsecureCrypt::new());
        let store = root_volume
            .new_volume(
                "test",
                NewChildStoreOptions {
                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
                    ..NewChildStoreOptions::default()
                },
            )
            .await
            .expect("new_volume failed");
        let root_dir =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let foo = root_dir
            .create_child_file(&mut transaction, "foo")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        // Unlocked flush: "foo" ends up in a layer file.
        store.flush().await.expect("flush failed");

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let bar = root_dir
            .create_child_file(&mut transaction, "bar")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        store.lock().await.expect("lock failed");

        // Locked flush: journaled mutations (including "bar") go to the encrypted-mutations
        // object.
        store.flush().await.expect("flush failed");

        // Recompute the expected reservation the same way `flush_locked` does.
        let info = store.load_store_info().await.unwrap();
        let parent_store = store.parent_store().unwrap();
        let mut total_layer_size = 0;
        for &oid in &info.layers {
            total_layer_size +=
                parent_store.get_file_size(oid).await.expect("get_file_size failed");
        }
        assert_ne!(info.encrypted_mutations_object_id, INVALID_OBJECT_ID);
        total_layer_size += layer_size_from_encrypted_mutations_size(
            parent_store
                .get_file_size(info.encrypted_mutations_object_id)
                .await
                .expect("get_file_size failed"),
        );
        assert_eq!(
            fs.object_manager().reservation(store.store_object_id()),
            Some(tree::reservation_amount_from_layer_size(total_layer_size))
        );

        store.unlock(NO_OWNER, crypt).await.expect("unlock failed");

        ObjectStore::open_object(&store, foo.object_id(), HandleOptions::default(), None)
            .await
            .expect("open_object failed");

        ObjectStore::open_object(&store, bar.object_id(), HandleOptions::default(), None)
            .await
            .expect("open_object failed");

        fs.close().await.expect("close failed");
    }
}
742
743impl tree::MajorCompactable<ObjectKey, ObjectValue> for LSMTree<ObjectKey, ObjectValue> {
744 async fn major_iter(
745 iter: impl LayerIterator<ObjectKey, ObjectValue>,
746 ) -> Result<impl LayerIterator<ObjectKey, ObjectValue>, Error> {
747 iter.filter(|item: ItemRef<'_, _, _>| match item {
748 ItemRef { value: ObjectValue::None, .. } => false,
750 ItemRef { value: ObjectValue::Extent(ExtentValue::None), .. } => false,
752 _ => true,
753 })
754 .await
755 }
756}