1use crate::errors::FxfsError;
8use crate::log::*;
9use crate::lsm_tree::types::{ItemRef, LayerIterator};
10use crate::lsm_tree::{LSMTree, layers_from_handles};
11use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
12use crate::object_store::extent_record::ExtentValue;
13use crate::object_store::object_manager::{ObjectManager, ReservationUpdate};
14use crate::object_store::object_record::{ObjectKey, ObjectValue};
15use crate::object_store::transaction::{AssociatedObject, LockKey, Mutation, lock_keys};
16use crate::object_store::{
17 AssocObj, DirectWriter, EncryptedMutations, HandleOptions, LastObjectId, LastObjectIdInfo,
18 LockState, MAX_ENCRYPTED_MUTATIONS_SIZE, ObjectStore, Options, ReservedId, StoreInfo,
19 layer_size_from_encrypted_mutations_size, tree,
20};
21use crate::serialized_types::{LATEST_VERSION, Version, VersionedLatest};
22use anyhow::{Context, Error, anyhow};
23use std::sync::OnceLock;
24use std::sync::atomic::Ordering;
25
/// Why a store flush was requested.
///
/// `ObjectStore::flush_with_reason` uses this to decide whether the flush can be skipped, and
/// `ObjectStore::flush_unlocked` uses it to decide whether the journal's compaction yielder is
/// handed to `tree::flush`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Reason {
    /// Flush requested on behalf of the journal.
    ///
    /// The flush is skipped when the object manager reports that the store does not need
    /// flushing and the tree's earliest version already equals `LATEST_VERSION`.  This is the
    /// only reason for which the journal's compaction yielder is passed to `tree::flush`.
    Journal,

    /// Flush requested as part of unlocking a store.
    ///
    /// The flush is skipped unless the store info records an encrypted mutations object
    /// (i.e. `encrypted_mutations_object_id != INVALID_OBJECT_ID`).
    Unlock,
}
34
35#[fxfs_trace::trace]
36impl ObjectStore {
37 #[trace("store_object_id" => self.store_object_id)]
38 pub async fn flush_with_reason(&self, reason: Reason) -> Result<Version, Error> {
39 if self.parent_store.is_none() {
40 return Ok(self.tree.get_earliest_version());
42 }
43 let filesystem = self.filesystem();
44 let object_manager = filesystem.object_manager();
45
46 let keys = lock_keys![LockKey::flush(self.store_object_id())];
47 let _guard = Some(filesystem.lock_manager().write_lock(keys).await);
48
49 if matches!(*self.lock_state.lock(), LockState::Deleted) {
51 return Ok(LATEST_VERSION);
54 }
55
56 match reason {
57 Reason::Unlock => {
58 if self.store_info().unwrap().encrypted_mutations_object_id == INVALID_OBJECT_ID {
64 return Ok(self.tree.get_earliest_version());
69 }
70 }
71 Reason::Journal => {
72 let earliest_version = self.tree.get_earliest_version();
75 if !object_manager.needs_flush(self.store_object_id)
76 && earliest_version == LATEST_VERSION
77 {
78 return Ok(earliest_version);
81 }
82 }
83 }
84
85 let trace = self.trace.load(Ordering::Relaxed);
86 if trace {
87 info!(store_id = self.store_object_id(); "OS: begin flush");
88 }
89
90 if matches!(&*self.lock_state.lock(), LockState::Locked) {
91 self.flush_locked().await.with_context(|| {
92 format!("Failed to flush object store {}", self.store_object_id)
93 })?;
94 } else {
95 self.flush_unlocked(reason).await.with_context(|| {
96 format!("Failed to flush object store {}", self.store_object_id)
97 })?;
98 }
99
100 if trace {
101 info!(store_id = self.store_object_id(); "OS: end flush");
102 }
103 if let Some(callback) = &*self.flush_callback.lock() {
104 callback(self);
105 }
106
107 let mut counters = self.counters.lock();
108 counters.num_flushes += 1;
109 counters.last_flush_time = Some(std::time::SystemTime::now());
110 Ok(self.tree.get_earliest_version())
112 }
113
114 async fn flush_unlocked(&self, reason: Reason) -> Result<Vec<u64>, Error> {
116 struct StoreInfoSnapshot<'a> {
117 store: &'a ObjectStore,
118 store_info: OnceLock<StoreInfo>,
119 }
120 impl AssociatedObject for StoreInfoSnapshot<'_> {
121 fn will_apply_mutation(
122 &self,
123 _mutation: &Mutation,
124 _object_id: u64,
125 _manager: &ObjectManager,
126 ) {
127 let mut store_info = self.store.store_info().unwrap();
128
129 let mutations_cipher = self.store.mutations_cipher.lock();
131 if let Some(cipher) = mutations_cipher.as_ref() {
132 store_info.mutations_cipher_offset = cipher.offset();
133 }
134
135 self.store_info.set(store_info).unwrap();
136 }
137 }
138
139 let store_info_snapshot = StoreInfoSnapshot { store: self, store_info: OnceLock::new() };
140
141 let filesystem = self.filesystem();
142 let object_manager = filesystem.object_manager();
143 let reservation = object_manager.metadata_reservation();
144 let txn_options = Options {
145 skip_journal_checks: true,
146 skip_key_roll: true,
147 borrow_metadata_space: true,
148 allocator_reservation: Some(reservation),
149 ..Default::default()
150 };
151
152 let mut transaction = self.new_transaction(lock_keys![], txn_options).await?;
155 transaction.add_with_object(
156 self.store_object_id(),
157 Mutation::BeginFlush,
158 AssocObj::Borrowed(&store_info_snapshot),
159 );
160 transaction.commit().await?;
161
162 let mut new_store_info = store_info_snapshot.store_info.into_inner().unwrap();
163
164 let mut transaction = self.new_transaction(lock_keys![], txn_options).await?;
169
170 let parent_store = self.parent_store.as_ref().unwrap();
172 let handle_options = HandleOptions { skip_journal_checks: true, ..Default::default() };
173 let id_and_key = {
174 let mut lock_state = self.lock_state.lock();
175 match &mut *lock_state {
176 LockState::Unlocked { cached_keys, .. } => {
177 if let Some(item) = cached_keys.pop() {
178 Ok(Some(item))
179 } else {
180 log::warn!("No cached keys for flush for store {}", self.store_object_id());
181 Err(anyhow!(FxfsError::Internal).context("No cached keys for flush"))
182 }
183 }
184 LockState::UnlockedReadOnly(..) => {
185 Err(anyhow!(FxfsError::Internal).context("Flush on read-only store"))
186 }
187 LockState::Unencrypted => Ok(None),
188 _ => Err(anyhow!(FxfsError::Internal))
189 .with_context(|| format!("Invalid lock state ({:?}) for flush", *lock_state)),
190 }
191 }?;
192
193 let new_object_tree_layer = if let Some((raw_id, key, unwrapped_key)) = id_and_key {
194 let object_id = ReservedId::new(parent_store, raw_id);
195 ObjectStore::create_object_with_key(
196 parent_store,
197 &mut transaction,
198 object_id,
199 handle_options,
200 key,
201 unwrapped_key,
202 )
203 .await?
204 } else {
205 ObjectStore::create_object(parent_store, &mut transaction, handle_options, None).await?
206 };
207 let writer = DirectWriter::new(&new_object_tree_layer, txn_options).await;
208 let new_object_tree_layer_object_id = new_object_tree_layer.object_id();
209 parent_store.add_to_graveyard(&mut transaction, new_object_tree_layer_object_id);
210
211 transaction.commit().await?;
212
213 let (layers_to_keep, old_layers) = tree::flush(
215 &self.tree,
216 writer,
217 (reason == Reason::Journal).then(|| filesystem.journal().get_compaction_yielder()),
218 )
219 .await
220 .context("Failed to flush tree")?;
221
222 let mut new_layers = layers_from_handles([new_object_tree_layer]).await?;
224 new_layers.extend(layers_to_keep.iter().map(|l| (*l).clone()));
225
226 new_store_info.layers = Vec::new();
227 for layer in &new_layers {
228 if let Some(handle) = layer.handle() {
229 new_store_info.layers.push(handle.object_id());
230 }
231 }
232
233 let reservation_update: ReservationUpdate; let mut end_transaction = parent_store
235 .new_transaction(
236 lock_keys![LockKey::object(
237 self.parent_store.as_ref().unwrap().store_object_id(),
238 self.store_info_handle_object_id().unwrap(),
239 )],
240 txn_options,
241 )
242 .await?;
243
244 parent_store.remove_from_graveyard(&mut end_transaction, new_object_tree_layer_object_id);
245
246 for layer in &old_layers {
248 if let Some(handle) = layer.handle() {
249 parent_store.add_to_graveyard(&mut end_transaction, handle.object_id());
250 }
251 }
252
253 let old_encrypted_mutations_object_id =
254 std::mem::replace(&mut new_store_info.encrypted_mutations_object_id, INVALID_OBJECT_ID);
255 if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
256 parent_store.add_to_graveyard(&mut end_transaction, old_encrypted_mutations_object_id);
257 }
258
259 match &mut new_store_info.last_object_id {
267 LastObjectIdInfo::Unencrypted { id } => {
268 let LastObjectId::Unencrypted { id: in_memory_value } =
269 &*self.last_object_id.lock()
270 else {
271 unreachable!()
272 };
273 *id = *in_memory_value;
274 }
275 LastObjectIdInfo::Encrypted { id, key } => {
276 let LastObjectId::Encrypted { id: in_memory_value, .. } =
277 &*self.last_object_id.lock()
278 else {
279 unreachable!()
280 };
281 *id = *in_memory_value;
282 let guard = self.store_info.lock();
283 let current_store_info = guard.as_ref().unwrap();
284 let LastObjectIdInfo::Encrypted { key: in_memory_value, .. } =
285 ¤t_store_info.last_object_id
286 else {
287 unreachable!()
288 };
289 *key = in_memory_value.clone();
290 }
291 LastObjectIdInfo::Low32Bit => {}
292 }
293
294 self.write_store_info(&mut end_transaction, &new_store_info).await?;
295
296 let layer_file_sizes = new_layers
297 .iter()
298 .map(|l| l.handle().map(ReadObjectHandle::get_size).unwrap_or(0))
299 .collect::<Vec<u64>>();
300
301 let total_layer_size = layer_file_sizes.iter().sum();
302 reservation_update =
303 ReservationUpdate::new(tree::reservation_amount_from_layer_size(total_layer_size));
304
305 end_transaction.add_with_object(
306 self.store_object_id(),
307 Mutation::EndFlush,
308 AssocObj::Borrowed(&reservation_update),
309 );
310
311 if self.trace.load(Ordering::Relaxed) {
312 info!(
313 store_id = self.store_object_id(),
314 old_layer_count = old_layers.len(),
315 new_layer_count = new_layers.len(),
316 total_layer_size,
317 new_store_info:?;
318 "OS: compacting"
319 );
320 }
321
322 end_transaction
323 .commit_with_callback(|_| {
324 let mut store_info = self.store_info.lock();
325 let info = store_info.as_mut().unwrap();
326 info.layers = new_store_info.layers;
327 info.encrypted_mutations_object_id = new_store_info.encrypted_mutations_object_id;
328 info.mutations_cipher_offset = new_store_info.mutations_cipher_offset;
329 self.tree.set_layers(new_layers);
330 })
331 .await?;
332
333 for layer in old_layers {
335 let object_id = layer.handle().map(|h| h.object_id());
336 layer.close_layer().await;
337 if let Some(object_id) = object_id {
338 parent_store.tombstone_object(object_id, txn_options).await?;
339 }
340 }
341
342 if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
343 parent_store.tombstone_object(old_encrypted_mutations_object_id, txn_options).await?;
344 }
345
346 Ok(layer_file_sizes)
347 }
348
349 async fn flush_locked(&self) -> Result<(), Error> {
351 let filesystem = self.filesystem();
352 let object_manager = filesystem.object_manager();
353 let reservation = object_manager.metadata_reservation();
354 let txn_options = Options {
355 skip_journal_checks: true,
356 skip_key_roll: true,
357 borrow_metadata_space: true,
358 allocator_reservation: Some(reservation),
359 ..Default::default()
360 };
361
362 let mut transaction = self.new_transaction(lock_keys![], txn_options).await?;
363 transaction.add(self.store_object_id(), Mutation::BeginFlush);
364 transaction.commit().await?;
365
366 let mut new_store_info = self.load_store_info().await?;
367
368 let mut transaction = self.new_transaction(lock_keys![], txn_options).await?;
373
374 let reservation_update: ReservationUpdate; let handle; let mut end_transaction;
377
378 let parent_store = self.parent_store.as_ref().unwrap();
381 handle = if new_store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
382 let handle = ObjectStore::create_object(
383 parent_store,
384 &mut transaction,
385 HandleOptions { skip_journal_checks: true, ..Default::default() },
386 None,
387 )
388 .await?;
389 let oid = handle.object_id();
390 end_transaction = parent_store
391 .new_transaction(
392 lock_keys![
393 LockKey::object(parent_store.store_object_id(), oid),
394 LockKey::object(
395 parent_store.store_object_id(),
396 self.store_info_handle_object_id().unwrap(),
397 ),
398 ],
399 txn_options,
400 )
401 .await?;
402 new_store_info.encrypted_mutations_object_id = oid;
403 parent_store.add_to_graveyard(&mut transaction, oid);
404 parent_store.remove_from_graveyard(&mut end_transaction, oid);
405 handle
406 } else {
407 end_transaction = parent_store
408 .new_transaction(
409 lock_keys![
410 LockKey::object(
411 parent_store.store_object_id(),
412 new_store_info.encrypted_mutations_object_id,
413 ),
414 LockKey::object(
415 parent_store.store_object_id(),
416 self.store_info_handle_object_id().unwrap(),
417 ),
418 ],
419 txn_options,
420 )
421 .await?;
422 ObjectStore::open_object(
423 parent_store,
424 new_store_info.encrypted_mutations_object_id,
425 HandleOptions { skip_journal_checks: true, ..Default::default() },
426 None,
427 )
428 .await?
429 };
430 transaction.commit().await?;
431
432 let journaled = filesystem
435 .journal()
436 .read_transactions_for_object(self.store_object_id)
437 .await
438 .context("Failed to read encrypted mutations from journal")?;
439 let mut buffer = handle.allocate_buffer(MAX_ENCRYPTED_MUTATIONS_SIZE).await;
440 let mut cursor = std::io::Cursor::new(buffer.as_mut_slice());
441 EncryptedMutations::from_replayed_mutations(self.store_object_id, journaled)
442 .serialize_with_version(&mut cursor)?;
443 let len = cursor.position() as usize;
444 handle.txn_write(&mut end_transaction, handle.get_size(), buffer.subslice(..len)).await?;
445
446 self.write_store_info(&mut end_transaction, &new_store_info).await?;
447
448 let mut total_layer_size = 0;
449 for &oid in &new_store_info.layers {
450 total_layer_size += parent_store.get_file_size(oid).await?;
451 }
452 total_layer_size +=
453 layer_size_from_encrypted_mutations_size(handle.get_size() + len as u64);
454
455 reservation_update =
456 ReservationUpdate::new(tree::reservation_amount_from_layer_size(total_layer_size));
457
458 end_transaction.add_with_object(
459 self.store_object_id(),
460 Mutation::EndFlush,
461 AssocObj::Borrowed(&reservation_update),
462 );
463
464 end_transaction.commit().await?;
465
466 Ok(())
467 }
468}
469
#[cfg(test)]
mod tests {
    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, JournalingObject, SyncOptions};
    use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle};
    use crate::object_store::directory::Directory;
    use crate::object_store::transaction::{Options, lock_keys};
    use crate::object_store::volume::root_volume;
    use crate::object_store::{
        HandleOptions, LockKey, NewChildStoreOptions, ObjectStore, StoreOptions,
        layer_size_from_encrypted_mutations_size, tree,
    };
    use fxfs_insecure_crypto::new_insecure_crypt;
    use std::sync::Arc;
    use storage_device::DeviceHolder;
    use storage_device::fake_device::FakeDevice;

    /// Shared body of the metadata key roll tests.
    ///
    /// Creates an encrypted volume, remounts with a key-roll threshold of 512 KiB, creates
    /// files until the mutations cipher offset is seen to decrease, syncs, creates one more
    /// file and remounts again.  After the final unlock the cipher offset must be zero and both
    /// the first and the last file must still be found.  When `flush_before_unlock` is set, the
    /// object manager is flushed before that final unlock.
    async fn run_key_roll_test(flush_before_unlock: bool) {
        let fs = FxFilesystem::new_empty(DeviceHolder::new(FakeDevice::new(8192, 1024)))
            .await
            .expect("new_empty failed");
        let store_id = {
            let volumes = root_volume(fs.clone()).await.expect("root_volume failed");
            let store_options = StoreOptions {
                crypt: Some(Arc::new(new_insecure_crypt())),
                ..StoreOptions::default()
            };
            let volume_options =
                NewChildStoreOptions { options: store_options, ..NewChildStoreOptions::default() };
            let volume =
                volumes.new_volume("test", volume_options).await.expect("new_volume failed");
            volume.store_object_id()
        };

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystemBuilder::new()
            .roll_metadata_key_byte_count(512 * 1024)
            .open(device)
            .await
            .expect("open failed");

        let (first_filename, last_filename) = {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(Arc::new(new_insecure_crypt())).await.expect("unlock failed");

            let root_dir = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");

            // Keep creating files until the cipher offset goes backwards, i.e. until the
            // mutations key has rolled.
            let mut files_created = 0;
            let first_filename = format!("{:<200}", files_created);
            let mut previous_offset = 0;
            let mut rolled = false;
            while !rolled {
                let name = format!("{:<200}", files_created);
                let mut transaction = store
                    .new_transaction(
                        lock_keys![LockKey::object(store_id, root_dir.object_id())],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_dir
                    .create_child_file(&mut transaction, &name)
                    .await
                    .expect("create_child_file failed");
                files_created += 1;
                transaction.commit().await.expect("commit failed");

                let current_offset = store.mutations_cipher.lock().as_ref().unwrap().offset();
                rolled = current_offset < previous_offset;
                previous_offset = current_offset;
            }

            fs.sync(SyncOptions::default()).await.expect("sync failed");

            // One more file after the sync, so there is a mutation written after it.
            let last_filename = format!("{:<200}", files_created);
            let mut transaction = store
                .new_transaction(
                    lock_keys![LockKey::object(store_id, root_dir.object_id())],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_dir
                .create_child_file(&mut transaction, &last_filename)
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");

            (first_filename, last_filename)
        };

        fs.close().await.expect("close failed");

        // Remount once more.
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystemBuilder::new()
            .roll_metadata_key_byte_count(512 * 1024)
            .open(device)
            .await
            .expect("open failed");

        if flush_before_unlock {
            fs.object_manager().flush().await.expect("flush failed");
        }

        {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(Arc::new(new_insecure_crypt())).await.expect("unlock failed");

            // After unlocking, the mutations cipher must be back at offset zero.
            let offset_after_unlock = store.mutations_cipher.lock().as_ref().unwrap().offset();
            assert_eq!(offset_after_unlock, 0);

            let root_dir = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let expectations = [
                (&first_filename, "First created file wasn't present"),
                (&last_filename, "Last created file wasn't present"),
            ];
            for (name, missing_message) in expectations {
                root_dir.lookup(name).await.expect("Lookup failed").expect(missing_message);
            }
        }
    }

    /// Key roll scenario without an object-manager flush before the final unlock.
    #[fuchsia::test(threads = 10)]
    async fn test_metadata_key_roll() {
        let flush_before_unlock = false;
        run_key_roll_test(flush_before_unlock).await;
    }

    /// Key roll scenario with an object-manager flush before the final unlock.
    #[fuchsia::test(threads = 10)]
    async fn test_metadata_key_roll_with_flush_before_unlock() {
        let flush_before_unlock = true;
        run_key_roll_test(flush_before_unlock).await;
    }

    /// Flushes an encrypted store once while unlocked and once after locking it, then checks
    /// that the store's reservation matches the size of its layer files plus its encrypted
    /// mutations file, and that files created before each flush can be opened after unlocking.
    #[fuchsia::test]
    async fn test_flush_when_locked() {
        let fs = FxFilesystem::new_empty(DeviceHolder::new(FakeDevice::new(8192, 1024)))
            .await
            .expect("new_empty failed");
        let volumes = root_volume(fs.clone()).await.expect("root_volume failed");
        let crypt = Arc::new(new_insecure_crypt());
        let volume_options = NewChildStoreOptions {
            options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
            ..NewChildStoreOptions::default()
        };
        let store = volumes.new_volume("test", volume_options).await.expect("new_volume failed");
        let root_dir =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        let root_dir_key = || LockKey::object(store.store_object_id(), root_dir.object_id());

        // "foo" is created before the first (unlocked) flush.
        let mut transaction = fs
            .root_store()
            .new_transaction(lock_keys![root_dir_key()], Options::default())
            .await
            .expect("new_transaction failed");
        let foo = root_dir
            .create_child_file(&mut transaction, "foo")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        store.flush().await.expect("flush failed");

        // "bar" is created after that flush and before the store is locked.
        let mut transaction = fs
            .root_store()
            .new_transaction(lock_keys![root_dir_key()], Options::default())
            .await
            .expect("new_transaction failed");
        let bar = root_dir
            .create_child_file(&mut transaction, "bar")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        store.lock().await.expect("lock failed");

        // Second flush, this time with the store locked.
        store.flush().await.expect("flush failed");

        // Recompute the expected reservation from the persisted store info.
        let persisted_info = store.load_store_info().await.unwrap();
        let parent_store = store.parent_store().unwrap();
        let mut expected_layer_size = 0;
        for &layer_oid in &persisted_info.layers {
            expected_layer_size +=
                parent_store.get_file_size(layer_oid).await.expect("get_file_size failed");
        }
        assert_ne!(persisted_info.encrypted_mutations_object_id, INVALID_OBJECT_ID);
        let mutations_file_size = parent_store
            .get_file_size(persisted_info.encrypted_mutations_object_id)
            .await
            .expect("get_file_size failed");
        expected_layer_size += layer_size_from_encrypted_mutations_size(mutations_file_size);
        assert_eq!(
            fs.object_manager().reservation(store.store_object_id()),
            Some(tree::reservation_amount_from_layer_size(expected_layer_size))
        );

        store.unlock(crypt).await.expect("unlock failed");

        // Both files must be openable once the store is unlocked again.
        for object_id in [foo.object_id(), bar.object_id()] {
            ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
                .await
                .expect("open_object failed");
        }

        fs.close().await.expect("close failed");
    }
}
711
712impl tree::MajorCompactable<ObjectKey, ObjectValue> for LSMTree<ObjectKey, ObjectValue> {
713 async fn major_iter(
714 iter: impl LayerIterator<ObjectKey, ObjectValue>,
715 ) -> Result<impl LayerIterator<ObjectKey, ObjectValue>, Error> {
716 iter.filter(|item: ItemRef<'_, _, _>| match item {
717 ItemRef { value: ObjectValue::None, .. } => false,
719 ItemRef { value: ObjectValue::Extent(ExtentValue::None), .. } => false,
721 _ => true,
722 })
723 .await
724 }
725}