1use crate::filesystem::TxnGuard;
8use crate::log::*;
9use crate::lsm_tree::types::{ItemRef, LayerIterator};
10use crate::lsm_tree::{layers_from_handles, LSMTree};
11use crate::object_handle::{ObjectHandle, ReadObjectHandle, INVALID_OBJECT_ID};
12use crate::object_store::extent_record::ExtentValue;
13use crate::object_store::object_manager::{ObjectManager, ReservationUpdate};
14use crate::object_store::object_record::{ObjectKey, ObjectValue};
15use crate::object_store::transaction::{lock_keys, AssociatedObject, LockKey, Mutation};
16use crate::object_store::{
17 layer_size_from_encrypted_mutations_size, tree, AssocObj, DirectWriter, EncryptedMutations,
18 HandleOptions, LockState, ObjectStore, Options, StoreInfo, Transaction,
19 MAX_ENCRYPTED_MUTATIONS_SIZE,
20};
21use crate::serialized_types::{Version, VersionedLatest, LATEST_VERSION};
22use anyhow::{bail, Context, Error};
23use fxfs_crypto::KeyPurpose;
24use once_cell::sync::OnceCell;
25use std::sync::atomic::Ordering;
26
/// Why a flush was requested. The reason selects different fast-path checks in
/// `try_flush_with_reason` and different error handling in `flush_with_reason`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Reason {
    /// A routine, journal-driven flush (e.g. compaction); skipped entirely if the object
    /// manager reports nothing to flush and the tree is already at the latest version.
    Journal,
    /// A flush performed while unlocking the store; crypt errors are not retried in this
    /// mode, and the flush is skipped if there are no encrypted mutations to apply.
    Unlock,
}
35
/// Outcome of a single flush attempt. Crypt failures are separated from other errors so
/// that `flush_with_reason` can re-lock the store and retry instead of propagating them.
#[derive(Debug)]
enum FlushResult<T> {
    /// The flush succeeded, yielding `T` (a tree version or layer-file sizes).
    Ok(T),
    /// A cryptographic operation (key creation or mutations-key roll) failed; the caller
    /// may force-lock the store and try again.
    CryptError(Error),
}
45
#[fxfs_trace::trace]
impl ObjectStore {
    /// Flushes this store, retrying (up to `MAX_RETRIES` times) after crypt errors by
    /// force-locking the store and attempting the flush again in the locked state.
    /// Returns the earliest version present in the tree after a successful flush.
    /// Crypt errors are never retried for `Reason::Unlock`, and an error is returned if
    /// no store owner is registered to perform the re-lock.
    #[trace("store_object_id" => self.store_object_id)]
    pub async fn flush_with_reason(&self, reason: Reason) -> Result<Version, Error> {
        // Whilst we're flushing, we temporarily hold a transaction guard, which will
        // prevent any interleaving lock cycles from other flush retries.
        const MAX_RETRIES: usize = 10;
        let mut retries = 0;
        loop {
            match self.try_flush_with_reason(reason).await? {
                FlushResult::Ok(version) => return Ok(version),
                FlushResult::CryptError(error) => {
                    // Unlock-driven flushes must surface crypt errors immediately; retries
                    // are also capped so a persistently failing crypt can't loop forever.
                    if reason == Reason::Unlock || retries >= MAX_RETRIES {
                        return Err(error);
                    }
                    log::warn!(
                        error:?;
                        "Flushing failed for store {}, re-locking and trying again.",
                        self.store_object_id()
                    );
                    // Re-lock via the registered owner; the next iteration will take the
                    // locked flush path, which does not need the crypt.
                    let owner = self.lock_state.lock().owner();
                    if let Some(owner) = owner {
                        owner
                            .force_lock(&self)
                            .await
                            .context("Failed to re-lock store during flush")?;
                    } else {
                        bail!("No store owner was registered!");
                    }
                }
            }
            // Only reached after a CryptError (the Ok arm returns above).
            retries += 1;
        }
    }

    /// Performs one flush attempt, dispatching to the locked or unlocked flush path based
    /// on the store's current lock state. Returns the tree's earliest version on success.
    async fn try_flush_with_reason(&self, reason: Reason) -> Result<FlushResult<Version>, Error> {
        // The root parent store has no parent store and is flushed as part of the journal,
        // so there's nothing for us to do here.
        if self.parent_store.is_none() {
            return Ok(FlushResult::Ok(self.tree.get_earliest_version()));
        }
        let filesystem = self.filesystem();
        let object_manager = filesystem.object_manager();

        let txn_guard = filesystem.clone().txn_guard().await;
        let keys = lock_keys![LockKey::flush(self.store_object_id())];
        // Hold the per-store flush write lock for the duration of this attempt.
        // NOTE(review): the `Some(...)` wrapper is unnecessary — the guard is held either
        // way until end of scope — and could be dropped for clarity.
        let _guard = Some(filesystem.lock_manager().write_lock(keys).await);

        // Fast-path checks: skip the flush entirely when there is nothing to do.
        match reason {
            Reason::Unlock => {
                // If the store is unlocked and has no persisted encrypted mutations, an
                // unlock-driven flush has nothing to fold in.
                if self.store_info().unwrap().encrypted_mutations_object_id == INVALID_OBJECT_ID {
                    return Ok(FlushResult::Ok(self.tree.get_earliest_version()));
                }
            }
            Reason::Journal => {
                // Skip when the object manager reports nothing pending AND the persisted
                // layers are already at the latest serialization version (flushing also
                // serves to upgrade old on-disk versions).
                let earliest_version = self.tree.get_earliest_version();
                if !object_manager.needs_flush(self.store_object_id)
                    && earliest_version == LATEST_VERSION
                {
                    return Ok(FlushResult::Ok(earliest_version));
                }
            }
        }

        let trace = self.trace.load(Ordering::Relaxed);
        if trace {
            info!(store_id = self.store_object_id(); "OS: begin flush");
        }

        // Locked stores can't decrypt/apply mutations, so they take a separate path that
        // persists journaled encrypted mutations instead of compacting the tree.
        let layer_file_sizes =
            if matches!(&*self.lock_state.lock(), LockState::Locked) {
                self.flush_locked(&txn_guard).await.with_context(|| {
                    format!("Failed to flush object store {}", self.store_object_id)
                })?;
                None
            } else {
                match self.flush_unlocked(&txn_guard).await.with_context(|| {
                    format!("Failed to flush object store {}", self.store_object_id)
                })? {
                    FlushResult::Ok(layer_file_sizes) => Some(layer_file_sizes),
                    // Propagate crypt errors as a value so the caller can retry re-locked.
                    FlushResult::CryptError(error) => return Ok(FlushResult::CryptError(error)),
                }
            };

        if trace {
            info!(store_id = self.store_object_id(); "OS: end flush");
        }
        // Notify any registered flush observer (used by tests/telemetry hooks).
        if let Some(callback) = &*self.flush_callback.lock() {
            callback(self);
        }

        // Update flush statistics; layer sizes are only known for the unlocked path.
        let mut counters = self.counters.lock();
        counters.num_flushes += 1;
        counters.last_flush_time = Some(std::time::SystemTime::now());
        if let Some(layer_file_sizes) = layer_file_sizes {
            counters.persistent_layer_file_sizes = layer_file_sizes;
        }
        Ok(FlushResult::Ok(self.tree.get_earliest_version()))
    }

    /// Flushes an unlocked store: compacts the in-memory LSM tree into a new persistent
    /// layer file, rewrites the store info, and tombstones the replaced layer files and
    /// any now-applied encrypted-mutations object. Returns the sizes of the resulting
    /// persistent layer files.
    async fn flush_unlocked(
        &self,
        txn_guard: &TxnGuard<'_>,
    ) -> Result<FlushResult<Vec<u64>>, Error> {
        // If the mutations cipher has encrypted enough bytes, roll the metadata key before
        // flushing so the new offset is captured in the store info we write below.
        let roll_mutations_key = self
            .mutations_cipher
            .lock()
            .as_ref()
            .map(|cipher| {
                cipher.offset() >= self.filesystem().options().roll_metadata_key_byte_count
            })
            .unwrap_or(false);
        if roll_mutations_key {
            if let Err(error) = self.roll_mutations_key(self.crypt().unwrap().as_ref()).await {
                log::warn!(
                    error:?; "Failed to roll mutations key for store {}", self.store_object_id());
                // Reported as a CryptError so the caller may re-lock and retry.
                return Ok(FlushResult::CryptError(error));
            }
        }

        // Captures a consistent snapshot of store info at the exact point the BeginFlush
        // mutation is applied, so the cipher offset / last object id recorded match the
        // flush boundary rather than some later state.
        struct StoreInfoSnapshot<'a> {
            store: &'a ObjectStore,
            store_info: OnceCell<StoreInfo>,
        }
        impl AssociatedObject for StoreInfoSnapshot<'_> {
            fn will_apply_mutation(
                &self,
                _mutation: &Mutation,
                _object_id: u64,
                _manager: &ObjectManager,
            ) {
                let mut store_info = self.store.store_info().unwrap();

                // Capture the offset in the cipher stream.
                let mutations_cipher = self.store.mutations_cipher.lock();
                if let Some(cipher) = mutations_cipher.as_ref() {
                    store_info.mutations_cipher_offset = cipher.offset();
                }

                store_info.last_object_id = self.store.last_object_id.lock().id;

                self.store_info.set(store_info).unwrap();
            }
        }

        let store_info_snapshot = StoreInfoSnapshot { store: self, store_info: OnceCell::new() };

        let filesystem = self.filesystem();
        let object_manager = filesystem.object_manager();
        let reservation = object_manager.metadata_reservation();
        // Metadata transactions borrow from the metadata reservation and skip journal
        // checks — we are the flush, so we must not wait on ourselves.
        let txn_options = Options {
            skip_journal_checks: true,
            borrow_metadata_space: true,
            allocator_reservation: Some(reservation),
            txn_guard: Some(txn_guard),
            ..Default::default()
        };

        // Commit BeginFlush with the snapshot attached; `will_apply_mutation` above runs
        // at apply time and fills in `store_info_snapshot.store_info`.
        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
        transaction.add_with_object(
            self.store_object_id(),
            Mutation::BeginFlush,
            AssocObj::Borrowed(&store_info_snapshot),
        );
        transaction.commit().await?;

        let mut new_store_info = store_info_snapshot.store_info.into_inner().unwrap();

        // `transaction` creates the new layer file; `end_transaction` commits the switch
        // to the new layers. Keeping them separate lets the tree flush happen outside
        // either transaction.
        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;

        // Declared before `end_transaction` so it is dropped after it: `end_transaction`
        // borrows `reservation_update` via AssocObj::Borrowed below.
        let reservation_update: ReservationUpdate; let mut end_transaction = filesystem
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    self.parent_store.as_ref().unwrap().store_object_id(),
                    self.store_info_handle_object_id().unwrap(),
                )],
                txn_options,
            )
            .await?;

        let parent_store = self.parent_store.as_ref().unwrap();
        let handle_options = HandleOptions { skip_journal_checks: true, ..Default::default() };
        // Encrypted stores need a freshly-created data key for the new layer file; a key
        // creation failure is surfaced as a CryptError (retryable by re-locking).
        let new_object_tree_layer = if let Some(crypt) = self.crypt().as_deref() {
            let object_id = parent_store.get_next_object_id(transaction.txn_guard()).await?;
            let (key, unwrapped_key) = match crypt.create_key(object_id, KeyPurpose::Data).await {
                Ok((key, unwrapped_key)) => (key, unwrapped_key),
                Err(status) => {
                    log::warn!(
                        status:?;
                        "Failed to create keys while flushing store {}",
                        self.store_object_id(),
                    );
                    return Ok(FlushResult::CryptError(status.into()));
                }
            };
            ObjectStore::create_object_with_key(
                parent_store,
                &mut transaction,
                object_id,
                handle_options,
                key,
                unwrapped_key,
            )
            .await?
        } else {
            ObjectStore::create_object(parent_store, &mut transaction, handle_options, None).await?
        };
        let writer = DirectWriter::new(&new_object_tree_layer, txn_options).await;
        let new_object_tree_layer_object_id = new_object_tree_layer.object_id();
        // Graveyard the new layer in the first transaction and un-graveyard it in the
        // last: if we crash mid-flush, replay cleans up the half-written layer file.
        parent_store.add_to_graveyard(&mut transaction, new_object_tree_layer_object_id);
        parent_store.remove_from_graveyard(&mut end_transaction, new_object_tree_layer_object_id);

        transaction.commit().await?;
        // Write out the compacted tree; layers that don't need rewriting are kept.
        let (layers_to_keep, old_layers) =
            tree::flush(&self.tree, writer).await.context("Failed to flush tree")?;

        let mut new_layers = layers_from_handles([new_object_tree_layer]).await?;
        new_layers.extend(layers_to_keep.iter().map(|l| (*l).clone()));

        // Record the object ids of the new persistent layer set in the store info.
        new_store_info.layers = Vec::new();
        for layer in &new_layers {
            if let Some(handle) = layer.handle() {
                new_store_info.layers.push(handle.object_id());
            }
        }

        // Move the old layer files to the graveyard; they are tombstoned after the
        // end transaction commits (below).
        for layer in &old_layers {
            if let Some(handle) = layer.handle() {
                parent_store.add_to_graveyard(&mut end_transaction, handle.object_id());
            }
        }

        // Any persisted encrypted mutations are now folded into the flushed tree, so the
        // object holding them can be discarded.
        let old_encrypted_mutations_object_id =
            std::mem::replace(&mut new_store_info.encrypted_mutations_object_id, INVALID_OBJECT_ID);
        if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
            parent_store.add_to_graveyard(&mut end_transaction, old_encrypted_mutations_object_id);
        }

        self.write_store_info(&mut end_transaction, &new_store_info).await?;

        let layer_file_sizes = new_layers
            .iter()
            .map(|l| l.handle().map(ReadObjectHandle::get_size).unwrap_or(0))
            .collect::<Vec<u64>>();

        // Re-size the metadata reservation to match the new total layer size.
        let total_layer_size = layer_file_sizes.iter().sum();
        reservation_update =
            ReservationUpdate::new(tree::reservation_amount_from_layer_size(total_layer_size));

        end_transaction.add_with_object(
            self.store_object_id(),
            Mutation::EndFlush,
            AssocObj::Borrowed(&reservation_update),
        );

        if self.trace.load(Ordering::Relaxed) {
            info!(
                store_id = self.store_object_id(),
                old_layer_count = old_layers.len(),
                new_layer_count = new_layers.len(),
                total_layer_size,
                new_store_info:?;
                "OS: compacting"
            );
        }

        // Atomically (with respect to the commit) switch the in-memory store info and the
        // tree's layer set to the new state.
        end_transaction
            .commit_with_callback(|_| {
                let mut store_info = self.store_info.lock();
                let info = store_info.as_mut().unwrap();
                info.layers = new_store_info.layers;
                info.encrypted_mutations_object_id = new_store_info.encrypted_mutations_object_id;
                info.mutations_cipher_offset = new_store_info.mutations_cipher_offset;
                self.tree.set_layers(new_layers);
            })
            .await?;

        // Now delete the old layer files; close each layer before tombstoning its object.
        for layer in old_layers {
            let object_id = layer.handle().map(|h| h.object_id());
            layer.close_layer().await;
            if let Some(object_id) = object_id {
                parent_store.tombstone_object(object_id, txn_options).await?;
            }
        }

        if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
            parent_store.tombstone_object(old_encrypted_mutations_object_id, txn_options).await?;
        }

        Ok(FlushResult::Ok(layer_file_sizes))
    }

    /// Flushes a locked store: since the mutations cannot be decrypted or applied, the
    /// encrypted mutations recorded in the journal are read back and appended to a
    /// persistent encrypted-mutations object (created on first use), to be applied when
    /// the store is eventually unlocked.
    async fn flush_locked(&self, txn_guard: &TxnGuard<'_>) -> Result<(), Error> {
        let filesystem = self.filesystem();
        let object_manager = filesystem.object_manager();
        let reservation = object_manager.metadata_reservation();
        // Same transaction options as the unlocked path: borrow metadata space and skip
        // journal checks, since this runs as part of flushing.
        let txn_options = Options {
            skip_journal_checks: true,
            borrow_metadata_space: true,
            allocator_reservation: Some(reservation),
            txn_guard: Some(txn_guard),
            ..Default::default()
        };

        // Bracket the flush with BeginFlush / EndFlush as in the unlocked path.
        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
        transaction.add(self.store_object_id(), Mutation::BeginFlush);
        transaction.commit().await?;

        // The store is locked, so read the persisted store info rather than in-memory state.
        let mut new_store_info = self.load_store_info().await?;

        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;

        // `reservation_update` must outlive `end_transaction` (borrowed below); `handle`
        // and `end_transaction` are assigned in both arms of the if/else that follows.
        let reservation_update: ReservationUpdate; let handle; let mut end_transaction;

        let parent_store = self.parent_store.as_ref().unwrap();
        handle = if new_store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
            // First locked flush: create the encrypted-mutations object, guarding it with
            // the graveyard (added in `transaction`, removed in `end_transaction`) so a
            // crash between the two commits cleans it up on replay.
            let handle = ObjectStore::create_object(
                parent_store,
                &mut transaction,
                HandleOptions { skip_journal_checks: true, ..Default::default() },
                None,
            )
            .await?;
            let oid = handle.object_id();
            end_transaction = filesystem
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(parent_store.store_object_id(), oid),
                        LockKey::object(
                            parent_store.store_object_id(),
                            self.store_info_handle_object_id().unwrap(),
                        ),
                    ],
                    txn_options,
                )
                .await?;
            new_store_info.encrypted_mutations_object_id = oid;
            parent_store.add_to_graveyard(&mut transaction, oid);
            parent_store.remove_from_graveyard(&mut end_transaction, oid);
            handle
        } else {
            // Subsequent locked flushes append to the existing encrypted-mutations object.
            end_transaction = filesystem
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(
                            parent_store.store_object_id(),
                            new_store_info.encrypted_mutations_object_id,
                        ),
                        LockKey::object(
                            parent_store.store_object_id(),
                            self.store_info_handle_object_id().unwrap(),
                        ),
                    ],
                    txn_options,
                )
                .await?;
            ObjectStore::open_object(
                parent_store,
                new_store_info.encrypted_mutations_object_id,
                HandleOptions { skip_journal_checks: true, ..Default::default() },
                None,
            )
            .await?
        };
        transaction.commit().await?;

        // Gather this store's encrypted mutations from the journal, serialize them and
        // append them to the encrypted-mutations object within `end_transaction`.
        let journaled = filesystem
            .journal()
            .read_transactions_for_object(self.store_object_id)
            .await
            .context("Failed to read encrypted mutations from journal")?;
        let mut buffer = handle.allocate_buffer(MAX_ENCRYPTED_MUTATIONS_SIZE).await;
        let mut cursor = std::io::Cursor::new(buffer.as_mut_slice());
        EncryptedMutations::from_replayed_mutations(self.store_object_id, journaled)
            .serialize_with_version(&mut cursor)?;
        let len = cursor.position() as usize;
        // Append at the current end of the object.
        handle.txn_write(&mut end_transaction, handle.get_size(), buffer.subslice(..len)).await?;

        self.write_store_info(&mut end_transaction, &new_store_info).await?;

        // Recompute the metadata reservation from all layer files plus the grown
        // encrypted-mutations object.
        let mut total_layer_size = 0;
        for &oid in &new_store_info.layers {
            total_layer_size += parent_store.get_file_size(oid).await?;
        }
        total_layer_size +=
            layer_size_from_encrypted_mutations_size(handle.get_size() + len as u64);

        reservation_update =
            ReservationUpdate::new(tree::reservation_amount_from_layer_size(total_layer_size));

        end_transaction.add_with_object(
            self.store_object_id(),
            Mutation::EndFlush,
            AssocObj::Borrowed(&reservation_update),
        );

        end_transaction.commit().await?;

        Ok(())
    }

    /// Serializes `new_store_info` and writes it to the store-info object at offset 0
    /// within `transaction`. Panics (via `unwrap`) if the store-info handle has not been
    /// initialized.
    async fn write_store_info<'a>(
        &'a self,
        transaction: &mut Transaction<'a>,
        new_store_info: &StoreInfo,
    ) -> Result<(), Error> {
        let mut serialized_info = Vec::new();
        new_store_info.serialize_with_version(&mut serialized_info)?;
        let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
        buf.as_mut_slice().copy_from_slice(&serialized_info[..]);

        self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
    }
}
508
#[cfg(test)]
mod tests {
    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, JournalingObject, SyncOptions};
    use crate::object_handle::{ObjectHandle, INVALID_OBJECT_ID};
    use crate::object_store::directory::Directory;
    use crate::object_store::transaction::{lock_keys, Options};
    use crate::object_store::volume::root_volume;
    use crate::object_store::{
        layer_size_from_encrypted_mutations_size, tree, HandleOptions, LockKey, ObjectStore,
        NO_OWNER,
    };
    use fxfs_insecure_crypto::InsecureCrypt;
    use std::sync::Arc;
    use storage_device::fake_device::FakeDevice;
    use storage_device::DeviceHolder;

    /// Exercises metadata-key rolling: writes enough mutations to force the mutations
    /// cipher past `roll_metadata_key_byte_count`, remounts, and verifies that files
    /// created before and after the roll are both readable. When `flush_before_unlock`
    /// is set, the store is flushed while still locked before being unlocked, covering
    /// the locked-flush (encrypted mutations) path as well.
    async fn run_key_roll_test(flush_before_unlock: bool) {
        let device = DeviceHolder::new(FakeDevice::new(8192, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let store_id = {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            root_volume
                .new_volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
                .await
                .expect("new_volume failed")
                .store_object_id()
        };

        // Remount with a small roll threshold so the key rolls quickly.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystemBuilder::new()
            .roll_metadata_key_byte_count(512 * 1024)
            .open(device)
            .await
            .expect("open failed");

        let (first_filename, last_filename) = {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(NO_OWNER, Arc::new(InsecureCrypt::new())).await.expect("unlock failed");

            let root_dir = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");

            // Create wide (200-char) filenames until the cipher offset goes backwards,
            // which indicates the mutations key was rolled (offset reset to zero).
            let mut last_mutations_cipher_offset = 0;
            let mut i = 0;
            let first_filename = format!("{:<200}", i);
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(store_id, root_dir.object_id())],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_dir
                    .create_child_file(&mut transaction, &format!("{:<200}", i))
                    .await
                    .expect("create_child_file failed");
                i += 1;
                transaction.commit().await.expect("commit failed");
                let cipher_offset = store.mutations_cipher.lock().as_ref().unwrap().offset();
                if cipher_offset < last_mutations_cipher_offset {
                    break;
                }
                last_mutations_cipher_offset = cipher_offset;
            }

            fs.sync(SyncOptions::default()).await.expect("sync failed");

            // One more file created after the roll, so we can verify both pre- and
            // post-roll mutations survive the remount.
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(store_id, root_dir.object_id())],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let last_filename = format!("{:<200}", i);
            root_dir
                .create_child_file(&mut transaction, &last_filename)
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            (first_filename, last_filename)
        };

        fs.close().await.expect("close failed");

        // Remount again; the store starts locked this time.
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystemBuilder::new()
            .roll_metadata_key_byte_count(512 * 1024)
            .open(device)
            .await
            .expect("open failed");

        if flush_before_unlock {
            // Flush whilst the store is locked to exercise the encrypted-mutations path.
            fs.object_manager().flush().await.expect("flush failed");
        }

        {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(NO_OWNER, Arc::new(InsecureCrypt::new())).await.expect("unlock failed");

            // After unlock, a fresh mutations cipher should start at offset zero.
            assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);

            let root_dir = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            root_dir
                .lookup(&first_filename)
                .await
                .expect("Lookup failed")
                .expect("First created file wasn't present");
            root_dir
                .lookup(&last_filename)
                .await
                .expect("Lookup failed")
                .expect("Last created file wasn't present");
        }
    }

    #[fuchsia::test(threads = 10)]
    async fn test_metadata_key_roll() {
        run_key_roll_test(false).await;
    }

    #[fuchsia::test(threads = 10)]
    async fn test_metadata_key_roll_with_flush_before_unlock() {
        run_key_roll_test(true).await;
    }

    /// Verifies that flushing a locked store persists encrypted mutations to a dedicated
    /// object, that the metadata reservation matches the resulting layer + mutations
    /// sizes, and that files created before and after the locked flush are accessible
    /// once the store is unlocked.
    #[fuchsia::test]
    async fn test_flush_when_locked() {
        let device = DeviceHolder::new(FakeDevice::new(8192, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let crypt = Arc::new(InsecureCrypt::new());
        let store = root_volume
            .new_volume("test", NO_OWNER, Some(crypt.clone()))
            .await
            .expect("new_volume failed");
        let root_dir =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let foo = root_dir
            .create_child_file(&mut transaction, "foo")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        // Flush whilst unlocked so "foo" lands in a persistent layer.
        store.flush().await.expect("flush failed");

        // "bar" is created after the flush, so it only exists as (encrypted) mutations.
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let bar = root_dir
            .create_child_file(&mut transaction, "bar")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        store.lock().await.expect("lock failed");

        // Locked flush: should write the encrypted mutations out to an object.
        store.flush().await.expect("flush failed");

        // Recompute the expected reservation from the on-disk layer files plus the
        // encrypted-mutations object and compare with the object manager's value.
        let info = store.load_store_info().await.unwrap();
        let parent_store = store.parent_store().unwrap();
        let mut total_layer_size = 0;
        for &oid in &info.layers {
            total_layer_size +=
                parent_store.get_file_size(oid).await.expect("get_file_size failed");
        }
        assert_ne!(info.encrypted_mutations_object_id, INVALID_OBJECT_ID);
        total_layer_size += layer_size_from_encrypted_mutations_size(
            parent_store
                .get_file_size(info.encrypted_mutations_object_id)
                .await
                .expect("get_file_size failed"),
        );
        assert_eq!(
            fs.object_manager().reservation(store.store_object_id()),
            Some(tree::reservation_amount_from_layer_size(total_layer_size))
        );

        store.unlock(NO_OWNER, crypt).await.expect("unlock failed");

        // Both files must be openable after unlock applies the persisted mutations.
        ObjectStore::open_object(&store, foo.object_id(), HandleOptions::default(), None)
            .await
            .expect("open_object failed");

        ObjectStore::open_object(&store, bar.object_id(), HandleOptions::default(), None)
            .await
            .expect("open_object failed");

        fs.close().await.expect("close failed");
    }
}
737
738impl tree::MajorCompactable<ObjectKey, ObjectValue> for LSMTree<ObjectKey, ObjectValue> {
739 async fn major_iter(
740 iter: impl LayerIterator<ObjectKey, ObjectValue>,
741 ) -> Result<impl LayerIterator<ObjectKey, ObjectValue>, Error> {
742 iter.filter(|item: ItemRef<'_, _, _>| match item {
743 ItemRef { value: ObjectValue::None, .. } => false,
745 ItemRef { value: ObjectValue::Extent(ExtentValue::None), .. } => false,
747 _ => true,
748 })
749 .await
750 }
751}