1use crate::errors::FxfsError;
6use crate::filesystem::{ApplyContext, ApplyMode, JournalingObject};
7use crate::log::*;
8use crate::metrics;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::allocator::{Allocator, Reservation};
11use crate::object_store::directory::Directory;
12use crate::object_store::journal::{self, JournalCheckpoint};
13use crate::object_store::transaction::{
14 AssocObj, AssociatedObject, MetadataReservation, Mutation, Transaction, TxnMutation,
15};
16use crate::object_store::tree_cache::TreeCache;
17use crate::object_store::volume::{VOLUMES_DIRECTORY, list_volumes};
18use crate::object_store::{ObjectDescriptor, ObjectStore};
19use crate::round::round_div;
20use crate::serialized_types::{LATEST_VERSION, Version};
21use anyhow::{Context, Error, anyhow, bail, ensure};
22use fuchsia_inspect::{Property as _, UintProperty};
23use fuchsia_sync::RwLock;
24use futures::FutureExt as _;
25use rustc_hash::FxHashMap as HashMap;
26use std::collections::hash_map::Entry;
27use std::num::Saturating;
28use std::sync::{Arc, OnceLock};
29
/// Converts a number of journal bytes consumed into the amount of space that must be held in
/// reserve for metadata.  The 4x multiplier provides headroom so that the mutations recorded in
/// that journal range can be flushed/compacted — the precise derivation of the factor is not
/// visible here; TODO(review): confirm against the journal design docs.
pub const fn reserved_space_from_journal_usage(journal_usage: u64) -> u64 {
    4 * journal_usage
}
42
/// Manages the set of object stores and the allocator, tracks per-object journal checkpoints and
/// reservations, and accounts for metadata space borrowed against the journal.
pub struct ObjectManager {
    // All mutable state lives behind a single lock; see `Inner` for the individual fields.
    inner: RwLock<Inner>,
    // Space reserved for flushing metadata; initialised once via `init_metadata_reservation`
    // (left unset on read-only filesystems).
    metadata_reservation: OnceLock<Reservation>,
    // The volumes directory in the root store; set once during `on_replay_complete` or via
    // `set_volume_directory`.
    volume_directory: OnceLock<Directory<ObjectStore>>,
    // Optional callback invoked for every store that gets registered with this manager.
    on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
}
50
/// Tracks which journal checkpoint(s) cover an object's unflushed mutations.  The transitions are
/// driven by `ObjectManager::apply_mutation`: `BeginFlush` demotes `Current`/`Both` to `Old`,
/// `EndFlush` removes `Old` or promotes `Both` to `Current`, and any other mutation inserts
/// `Current` or upgrades `Old` to `Both`.
#[derive(Debug)]
enum Checkpoints {
    /// Checkpoint for mutations applied since the last completed flush (no flush in progress).
    Current(JournalCheckpoint),
    /// A flush is in progress; this checkpoint covers mutations made before the flush began.
    Old(JournalCheckpoint),
    /// A flush is in progress and further mutations have arrived since it began:
    /// (pre-flush checkpoint, post-flush checkpoint).
    Both(JournalCheckpoint, JournalCheckpoint),
}
59
60impl Checkpoints {
61 fn earliest(&self) -> &JournalCheckpoint {
63 match self {
64 Checkpoints::Old(x) | Checkpoints::Both(x, _) | Checkpoints::Current(x) => x,
65 }
66 }
67}
68
/// Mutable state for `ObjectManager`, guarded by a single `RwLock`.
struct Inner {
    /// All registered object stores, keyed by store object id.
    stores: HashMap<u64, Arc<ObjectStore>>,
    /// Object id of the root parent store; `INVALID_OBJECT_ID` until set.
    root_parent_store_object_id: u64,
    /// Object id of the root store; `INVALID_OBJECT_ID` until set.
    root_store_object_id: u64,
    /// Object id of the allocator; `INVALID_OBJECT_ID` until `set_allocator` is called.
    allocator_object_id: u64,
    /// The allocator, once registered via `set_allocator`.
    allocator: Option<Arc<Allocator>>,

    /// Journal checkpoints for every object that currently has unflushed mutations, keyed by
    /// object id.  Entries are maintained by `ObjectManager::apply_mutation`.
    journal_checkpoints: HashMap<u64, Checkpoints>,

    /// Per-object reservation amounts (updated via `ReservationUpdate`).  Only the maximum entry
    /// contributes to `required_reservation`.
    reservations: HashMap<u64, u64>,

    /// The journal file offset at the end of the last committed (or replayed) transaction.
    last_end_offset: u64,

    /// Metadata space that transactions using `MetadataReservation::Borrowed` have borrowed
    /// against the metadata reservation.
    borrowed_metadata_space: u64,

    /// Largest journal footprint of any single transaction seen so far, mirrored into the
    /// inspect property in the second tuple element.
    max_transaction_size: (u64, UintProperty),

    /// Additional space held in reserve (initially `journal::RESERVED_SPACE`); adjustable via
    /// `set_reserved_space`.
    reserved_space: u64,
}
103
104impl Inner {
105 fn earliest_journal_offset(&self) -> Option<u64> {
106 self.journal_checkpoints.values().map(|c| c.earliest().file_offset).min()
107 }
108
109 fn required_reservation(&self) -> u64 {
112 self.reservations.values().max().unwrap_or(&0)
114
115 + self.earliest_journal_offset()
118 .map(|min| reserved_space_from_journal_usage(self.last_end_offset - min))
119 .unwrap_or(0)
120
121 + self.reserved_space
123 }
124
125 fn object(&self, object_id: u64) -> Option<Arc<dyn JournalingObject>> {
126 if object_id == self.allocator_object_id {
127 Some(self.allocator.clone().unwrap() as Arc<dyn JournalingObject>)
128 } else {
129 self.stores.get(&object_id).map(|x| x.clone() as Arc<dyn JournalingObject>)
130 }
131 }
132}
133
impl ObjectManager {
    /// Creates an empty `ObjectManager`.  If `on_new_store` is provided, it is invoked for every
    /// store subsequently registered with the manager (root parent store, root store, added or
    /// opened stores).
    pub fn new(on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>) -> ObjectManager {
        ObjectManager {
            inner: RwLock::new(Inner {
                stores: HashMap::default(),
                root_parent_store_object_id: INVALID_OBJECT_ID,
                root_store_object_id: INVALID_OBJECT_ID,
                allocator_object_id: INVALID_OBJECT_ID,
                allocator: None,
                journal_checkpoints: HashMap::default(),
                reservations: HashMap::default(),
                last_end_offset: 0,
                borrowed_metadata_space: 0,
                max_transaction_size: (0, metrics::detail().create_uint("max_transaction_size", 0)),
                reserved_space: journal::RESERVED_SPACE,
            }),
            metadata_reservation: OnceLock::new(),
            volume_directory: OnceLock::new(),
            on_new_store,
        }
    }

    /// Returns the total metadata reservation currently required; see
    /// `Inner::required_reservation` for the components.
    pub fn required_reservation(&self) -> u64 {
        self.inner.read().required_reservation()
    }

    /// Returns the object id of the root parent store (`INVALID_OBJECT_ID` if not yet set).
    pub fn root_parent_store_object_id(&self) -> u64 {
        self.inner.read().root_parent_store_object_id
    }

    /// Returns the root parent store.  Panics if `set_root_parent_store` has not been called.
    pub fn root_parent_store(&self) -> Arc<ObjectStore> {
        let inner = self.inner.read();
        inner.stores.get(&inner.root_parent_store_object_id).unwrap().clone()
    }

    /// Registers `store` as the root parent store (and in the store map).
    pub fn set_root_parent_store(&self, store: Arc<ObjectStore>) {
        if let Some(on_new_store) = &self.on_new_store {
            on_new_store(&store);
        }
        let mut inner = self.inner.write();
        let store_id = store.store_object_id();
        inner.stores.insert(store_id, store);
        inner.root_parent_store_object_id = store_id;
    }

    /// Returns the object id of the root store (`INVALID_OBJECT_ID` if not yet set).
    pub fn root_store_object_id(&self) -> u64 {
        self.inner.read().root_store_object_id
    }

    /// Returns the root store.  Panics if `set_root_store` has not been called.
    pub fn root_store(&self) -> Arc<ObjectStore> {
        let inner = self.inner.read();
        inner.stores.get(&inner.root_store_object_id).unwrap().clone()
    }

    /// Registers `store` as the root store (and in the store map).
    pub fn set_root_store(&self, store: Arc<ObjectStore>) {
        if let Some(on_new_store) = &self.on_new_store {
            on_new_store(&store);
        }
        let mut inner = self.inner.write();
        let store_id = store.store_object_id();
        inner.stores.insert(store_id, store);
        inner.root_store_object_id = store_id;
    }

    /// Returns true if `store_id` identifies one of the system stores (root store or root parent
    /// store).
    pub fn is_system_store(&self, store_id: u64) -> bool {
        let inner = self.inner.read();
        store_id == inner.root_store_object_id || store_id == inner.root_parent_store_object_id
    }

    /// Returns the store with id `store_object_id`, if it is currently registered.
    pub fn store(&self, store_object_id: u64) -> Option<Arc<ObjectStore>> {
        self.inner.read().stores.get(&store_object_id).cloned()
    }

    /// Returns the total compaction bytes written across the allocator's tree and every
    /// registered store's tree.
    pub fn compaction_bytes_written(&self) -> u64 {
        let inner = self.inner.read();
        let mut total = 0;
        if let Some(allocator) = &inner.allocator {
            total += allocator.tree().compaction_bytes_written();
        }
        for store in inner.stores.values() {
            total += store.tree().compaction_bytes_written();
        }
        total
    }

    /// Called once journal replay has finished: locates the volumes directory in the root store,
    /// opens every listed volume's store, and initialises the metadata reservation.
    ///
    /// # Errors
    ///
    /// Fails if the root/volumes directory is missing or of the wrong type, a volume store fails
    /// to open, or there is insufficient free space for the metadata reservation.
    pub async fn on_replay_complete(&self) -> Result<(), Error> {
        let root_store = self.root_store();

        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .context("Unable to open root volume directory")?;

        match root_directory.lookup(VOLUMES_DIRECTORY).await? {
            None => bail!("Root directory not found"),
            Some((object_id, ObjectDescriptor::Directory, _)) => {
                let volume_directory = Directory::open(&root_store, object_id)
                    .await
                    .context("Unable to open volumes directory")?;
                self.volume_directory.set(volume_directory).unwrap();
            }
            _ => {
                // Anything other than a directory at VOLUMES_DIRECTORY means the filesystem
                // image is corrupt.
                bail!(
                    anyhow!(FxfsError::Inconsistent)
                        .context("Unexpected type for volumes directory")
                )
            }
        }

        let object_ids = list_volumes(self.volume_directory.get().unwrap())
            .await
            .context("Failed to list volumes")?;

        for store_id in object_ids {
            self.open_store(&root_store, store_id).await?;
        }

        self.init_metadata_reservation()
            .context("Insufficient free space for metadata reservation.")?;

        Ok(())
    }

    /// Returns the volumes directory.  Panics if it has not been set yet (via replay or
    /// `set_volume_directory`).
    pub fn volume_directory(&self) -> &Directory<ObjectStore> {
        self.volume_directory.get().unwrap()
    }

    /// Sets the volumes directory.  Panics if it was already set.
    pub fn set_volume_directory(&self, volume_directory: Directory<ObjectStore>) {
        self.volume_directory.set(volume_directory).unwrap();
    }

    /// Registers a (non-system) store.  Panics if its id collides with the root stores or the
    /// allocator.
    pub fn add_store(&self, store: Arc<ObjectStore>) {
        if let Some(on_new_store) = &self.on_new_store {
            on_new_store(&store);
        }
        let mut inner = self.inner.write();
        let store_object_id = store.store_object_id();
        assert_ne!(store_object_id, inner.root_parent_store_object_id);
        assert_ne!(store_object_id, inner.root_store_object_id);
        assert_ne!(store_object_id, inner.allocator_object_id);
        inner.stores.insert(store_object_id, store);
    }

    /// Unregisters a store and drops any reservation recorded for it.  Panics if asked to forget
    /// the allocator's id.
    pub fn forget_store(&self, store_object_id: u64) {
        let mut inner = self.inner.write();
        assert_ne!(store_object_id, inner.allocator_object_id);
        inner.stores.remove(&store_object_id);
        inner.reservations.remove(&store_object_id);
    }

    /// Registers the allocator.  Panics if a store is already registered under the allocator's
    /// object id.
    pub fn set_allocator(&self, allocator: Arc<Allocator>) {
        let mut inner = self.inner.write();
        assert!(!inner.stores.contains_key(&allocator.object_id()));
        inner.allocator_object_id = allocator.object_id();
        inner.allocator = Some(allocator);
    }

    /// Returns the allocator.  Panics if `set_allocator` has not been called.
    pub fn allocator(&self) -> Arc<Allocator> {
        self.inner.read().allocator.clone().unwrap()
    }

    /// Applies `mutation` to the object identified by `object_id`, maintaining the per-object
    /// journal-checkpoint bookkeeping as a side effect, then forwards the mutation to the target
    /// object.  `associated_object`, if any, is notified just before the mutation is applied.
    ///
    /// # Panics
    ///
    /// Panics if `object_id` refers to neither the allocator nor a registered store (except for
    /// `DeleteVolume`, which tolerates an absent store and returns early).
    pub fn apply_mutation(
        &self,
        object_id: u64,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        associated_object: AssocObj<'_>,
    ) -> Result<(), Error> {
        debug!(oid = object_id, mutation:?; "applying mutation");
        let object = {
            let mut inner = self.inner.write();
            match mutation {
                Mutation::BeginFlush => {
                    // A flush is starting: demote the checkpoint to `Old` so mutations applied
                    // during the flush are tracked separately (as the `Both` second slot).
                    if let Some(entry) = inner.journal_checkpoints.get_mut(&object_id) {
                        match entry {
                            Checkpoints::Current(x) | Checkpoints::Both(x, _) => {
                                *entry = Checkpoints::Old(x.clone());
                            }
                            _ => {}
                        }
                    }
                }
                Mutation::EndFlush => {
                    // The flush finished: the pre-flush checkpoint is no longer needed.  If
                    // mutations arrived mid-flush (`Both`), keep only the post-flush checkpoint.
                    if let Entry::Occupied(mut o) = inner.journal_checkpoints.entry(object_id) {
                        let entry = o.get_mut();
                        match entry {
                            Checkpoints::Old(_) => {
                                o.remove();
                            }
                            Checkpoints::Both(_, x) => {
                                *entry = Checkpoints::Current(x.clone());
                            }
                            _ => {}
                        }
                    }
                }
                Mutation::DeleteVolume => {
                    // Deleting a volume drops all bookkeeping for it; there is no target object
                    // left to forward the mutation to, so return early.
                    if let Some(store) = inner.stores.remove(&object_id) {
                        store.mark_deleted();
                    }
                    inner.reservations.remove(&object_id);
                    inner.journal_checkpoints.remove(&object_id);
                    return Ok(());
                }
                _ => {
                    // Any other mutation: record its checkpoint so we know how far back the
                    // journal must be retained for this object.  The root parent store is
                    // excluded — presumably it is not flushed like the others; TODO(review):
                    // confirm.
                    if object_id != inner.root_parent_store_object_id {
                        inner
                            .journal_checkpoints
                            .entry(object_id)
                            .and_modify(|entry| {
                                if let Checkpoints::Old(x) = entry {
                                    *entry =
                                        Checkpoints::Both(x.clone(), context.checkpoint.clone());
                                }
                            })
                            .or_insert_with(|| Checkpoints::Current(context.checkpoint.clone()));
                    }
                }
            }
            // Resolve the target object while still holding the lock; the actual apply happens
            // after the lock is released.
            if object_id == inner.allocator_object_id {
                inner.allocator.clone().unwrap() as Arc<dyn JournalingObject>
            } else {
                inner.stores.get(&object_id).unwrap().clone() as Arc<dyn JournalingObject>
            }
        };
        associated_object.map(|o| o.will_apply_mutation(&mutation, object_id, self));
        object.apply_mutation(mutation, context, associated_object)
    }

    /// Replays a journaled transaction's `mutations` at `context.checkpoint`, during mount.
    /// `journal_offsets` gives, per object, the offset its persisted state already covers;
    /// mutations older than that are skipped.  `end_offset` is the journal offset at the end of
    /// this transaction and is used to advance `last_end_offset` and to account borrowed
    /// metadata space.
    pub async fn replay_mutations(
        &self,
        mutations: Vec<(u64, Mutation)>,
        journal_offsets: &HashMap<u64, u64>,
        context: &ApplyContext<'_, '_>,
        end_offset: u64,
    ) -> Result<(), Error> {
        debug!(checkpoint = context.checkpoint.file_offset; "REPLAY");
        // Journal bytes this transaction consumed (None if the offset did not advance).
        let txn_size = {
            let mut inner = self.inner.write();
            if end_offset > inner.last_end_offset {
                Some(end_offset - std::mem::replace(&mut inner.last_end_offset, end_offset))
            } else {
                None
            }
        };

        let allocator_object_id = self.inner.read().allocator_object_id;

        for (object_id, mutation) in mutations {
            if let Mutation::UpdateBorrowed(borrowed) = mutation {
                // Restore the borrowed-space counter: the journaled value plus the reservation
                // implied by this transaction's own journal usage.
                if let Some(txn_size) = txn_size {
                    self.inner.write().borrowed_metadata_space = borrowed
                        .checked_add(reserved_space_from_journal_usage(txn_size))
                        .ok_or(FxfsError::Inconsistent)?;
                }
                continue;
            }

            // Skip mutations that the object's persisted state already covers.
            if let Some(&offset) = journal_offsets.get(&object_id) {
                if context.checkpoint.file_offset < offset {
                    continue;
                }
            }

            // Stores are opened lazily as their first mutation is encountered.
            if object_id != allocator_object_id {
                self.open_store(&self.root_store(), object_id).await?;
            }

            self.apply_mutation(object_id, mutation, context, AssocObj::None)?;
        }
        Ok(())
    }

    /// Opens the store `object_id` from `parent` and registers it, if it isn't registered
    /// already.  NOTE(review): the contains-check and the insert take the lock separately; the
    /// assert would fire if two callers raced to open the same store — presumably callers
    /// serialize this (replay); confirm.
    async fn open_store(&self, parent: &Arc<ObjectStore>, object_id: u64) -> Result<(), Error> {
        if self.inner.read().stores.contains_key(&object_id) {
            return Ok(());
        }
        let store = ObjectStore::open(parent, object_id, Box::new(TreeCache::new()))
            .await
            .with_context(|| format!("Failed to open store {object_id}"))?;
        if let Some(on_new_store) = &self.on_new_store {
            on_new_store(&store);
        }
        assert!(self.inner.write().stores.insert(object_id, store).is_none());
        Ok(())
    }

    /// Applies all of `transaction`'s mutations at `checkpoint`.  If the transaction borrows
    /// from the metadata reservation, the borrowed-space counter is updated and an
    /// `UpdateBorrowed` mutation carrying the new total is returned for the caller to journal;
    /// otherwise returns `None`.
    pub fn apply_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        checkpoint: &JournalCheckpoint,
    ) -> Result<Option<Mutation>, Error> {
        // Snapshot reservation state before applying so the delta can be computed afterwards.
        let old_amount = self.metadata_reservation().amount();
        let old_required = self.inner.read().required_reservation();

        debug!(checkpoint = checkpoint.file_offset; "BEGIN TXN");
        let mutations = transaction.take_mutations();
        let context =
            ApplyContext { mode: ApplyMode::Live(transaction), checkpoint: checkpoint.clone() };
        for TxnMutation { object_id, mutation, associated_object, .. } in mutations {
            self.apply_mutation(object_id, mutation, &context, associated_object)?;
        }
        debug!("END TXN");

        Ok(if let MetadataReservation::Borrowed = transaction.metadata_reservation {
            let new_amount = self.metadata_reservation().amount();
            let mut inner = self.inner.write();
            let new_required = inner.required_reservation();
            // The change in borrowed space is (new_required - old_required) +
            // (old_amount - new_amount); it's computed as `add - sub` to avoid intermediate
            // underflow of unsigned arithmetic, saturating at zero when negative.
            let add = old_amount + new_required;
            let sub = new_amount + old_required;
            if add >= sub {
                inner.borrowed_metadata_space += add - sub;
            } else {
                inner.borrowed_metadata_space =
                    inner.borrowed_metadata_space.saturating_sub(sub - add);
            }
            Some(Mutation::UpdateBorrowed(inner.borrowed_metadata_space))
        } else {
            // Non-borrowing transactions must leave the reservation accounting untouched.
            debug_assert_eq!(self.metadata_reservation().amount(), old_amount);
            debug_assert_eq!(self.inner.read().required_reservation(), old_required);
            None
        })
    }

    /// Called after a transaction has been committed to the journal ending at `end_offset`.
    /// Settles the metadata-space accounting for the journal bytes consumed, according to the
    /// transaction's `MetadataReservation` flavour, and updates the max-transaction-size metric.
    pub fn did_commit_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        _checkpoint: &JournalCheckpoint,
        end_offset: u64,
    ) {
        let reservation = self.metadata_reservation();
        let mut inner = self.inner.write();
        let journal_usage = end_offset - std::mem::replace(&mut inner.last_end_offset, end_offset);

        if journal_usage > inner.max_transaction_size.0 {
            inner.max_transaction_size.0 = journal_usage;
            inner.max_transaction_size.1.set(journal_usage);
        }

        let txn_space = reserved_space_from_journal_usage(journal_usage);
        match &mut transaction.metadata_reservation {
            MetadataReservation::None => unreachable!(),
            MetadataReservation::Borrowed => {
                // The transaction borrowed against the metadata reservation; record the borrow
                // and return any surplus above what is now required.
                inner.borrowed_metadata_space += txn_space;

                let to_give_back = (reservation.amount() + inner.borrowed_metadata_space)
                    .saturating_sub(inner.required_reservation());
                if to_give_back > 0 {
                    reservation.give_back(to_give_back);
                }
            }
            MetadataReservation::Hold(hold_amount) => {
                // The transaction held space in its own allocator reservation; move `txn_space`
                // of it into the metadata reservation.
                let txn_reservation = transaction.allocator_reservation.unwrap();
                assert_ne!(
                    txn_reservation as *const _, reservation as *const _,
                    "MetadataReservation::Borrowed should be used."
                );
                txn_reservation.commit(txn_space);
                if txn_reservation.owner_object_id() != reservation.owner_object_id() {
                    // The space changes owner as it moves into the metadata reservation.
                    assert_eq!(
                        reservation.owner_object_id(),
                        None,
                        "Should not be mixing attributed owners."
                    );
                    inner
                        .allocator
                        .as_ref()
                        .unwrap()
                        .disown_reservation(txn_reservation.owner_object_id(), txn_space);
                }
                if let Some(amount) = hold_amount.checked_sub(txn_space) {
                    *hold_amount = amount;
                } else {
                    panic!("Transaction was larger than metadata reservation");
                }
                reservation.add(txn_space);
            }
            MetadataReservation::Reservation(txn_reservation) => {
                // Transfer the consumed space from the transaction's reservation directly.
                txn_reservation.move_to(reservation, txn_space);
            }
        }
        // Invariant: reserved + borrowed always covers exactly what is required.
        debug_assert_eq!(
            reservation.amount() + inner.borrowed_metadata_space,
            inner.required_reservation(),
            "txn_space: {}, reservation_amount: {}, borrowed: {}, required: {}",
            txn_space,
            reservation.amount(),
            inner.borrowed_metadata_space,
            inner.required_reservation(),
        );
    }

    /// Drops a transaction without applying it, giving every mutated object a chance to undo any
    /// preparatory work via `drop_mutation`.
    pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
        for TxnMutation { object_id, mutation, .. } in transaction.take_mutations() {
            self.object(object_id).map(|o| o.drop_mutation(mutation, transaction));
        }
    }

    /// Returns, for every object with unflushed mutations, the earliest journal offset it still
    /// depends on, together with the overall minimum checkpoint (None if nothing is pending).
    pub fn journal_file_offsets(&self) -> (HashMap<u64, u64>, Option<JournalCheckpoint>) {
        let inner = self.inner.read();
        let mut min_checkpoint = None;
        let mut offsets = HashMap::default();
        for (&object_id, checkpoint) in &inner.journal_checkpoints {
            let checkpoint = checkpoint.earliest();
            match &mut min_checkpoint {
                None => min_checkpoint = Some(checkpoint),
                Some(min_checkpoint) => {
                    if checkpoint.file_offset < min_checkpoint.file_offset {
                        *min_checkpoint = checkpoint;
                    }
                }
            }
            offsets.insert(object_id, checkpoint.file_offset);
        }
        (offsets, min_checkpoint.cloned())
    }

    /// Returns the earliest journal checkpoint recorded for `object_id`, if it has unflushed
    /// mutations.
    pub fn journal_checkpoint(&self, object_id: u64) -> Option<JournalCheckpoint> {
        self.inner
            .read()
            .journal_checkpoints
            .get(&object_id)
            .map(|checkpoints| checkpoints.earliest().clone())
    }

    /// Returns true if `object_id` has journaled mutations that have not yet been flushed.
    pub fn needs_flush(&self, object_id: u64) -> bool {
        self.inner.read().journal_checkpoints.contains_key(&object_id)
    }

    /// Flushes every object that has unflushed mutations, in descending object-id order
    /// (presumably so child stores flush before the system objects — TODO confirm), and returns
    /// the minimum (earliest) `Version` reported by the flushed objects, or `LATEST_VERSION` if
    /// nothing needed flushing.
    pub async fn flush(&self) -> Result<Version, Error> {
        let objects = {
            let inner = self.inner.read();
            let mut object_ids = inner.journal_checkpoints.keys().cloned().collect::<Vec<_>>();
            object_ids.sort_unstable();
            object_ids
                .iter()
                .rev()
                .map(|oid| (*oid, inner.object(*oid).unwrap()))
                .collect::<Vec<_>>()
        };

        let mut earliest_version: Version = LATEST_VERSION;
        for (object_id, object) in objects {
            let object_earliest_version =
                object.flush().await.with_context(|| format!("Failed to flush oid {object_id}"))?;
            if object_earliest_version < earliest_version {
                earliest_version = object_earliest_version;
            }
        }

        Ok(earliest_version)
    }

    /// Looks up `object_id` as a journaling object (allocator or store).
    fn object(&self, object_id: u64) -> Option<Arc<dyn JournalingObject>> {
        self.inner.read().object(object_id)
    }

    /// Initialises the metadata reservation to `required - borrowed` bytes.  A no-op on
    /// read-only filesystems.  Panics if the reservation was already set.
    ///
    /// # Errors
    ///
    /// Fails with `Inconsistent` if the borrowed space exceeds what is required, or if the
    /// allocator cannot reserve the needed space.
    pub fn init_metadata_reservation(&self) -> Result<(), Error> {
        if self.root_parent_store().filesystem().options().read_only {
            return Ok(());
        }
        let inner = self.inner.read();
        let required = inner.required_reservation();
        ensure!(required >= inner.borrowed_metadata_space, FxfsError::Inconsistent);
        let allocator = inner.allocator.as_ref().cloned().unwrap();
        self.metadata_reservation
            .set(
                allocator
                    .clone()
                    .reserve(None, inner.required_reservation() - inner.borrowed_metadata_space)
                    .with_context(|| {
                        format!(
                            "Failed to reserve {} - {} = {} bytes, free={}, \
                             owner_bytes={}",
                            inner.required_reservation(),
                            inner.borrowed_metadata_space,
                            inner.required_reservation() - inner.borrowed_metadata_space,
                            Saturating(allocator.get_disk_bytes()) - allocator.get_used_bytes(),
                            allocator.owner_bytes_debug(),
                        )
                    })?,
            )
            .unwrap();
        Ok(())
    }

    /// Returns the metadata reservation.  Panics if `init_metadata_reservation` has not run.
    pub fn metadata_reservation(&self) -> &Reservation {
        self.metadata_reservation.get().unwrap()
    }

    /// Records (or replaces) the reservation amount attributed to `object_id`.
    pub fn update_reservation(&self, object_id: u64, amount: u64) {
        self.inner.write().reservations.insert(object_id, amount);
    }

    /// Returns the reservation amount recorded for `object_id`, if any.
    pub fn reservation(&self, object_id: u64) -> Option<u64> {
        self.inner.read().reservations.get(&object_id).cloned()
    }

    /// Sets the fixed extra reserved space component of `required_reservation`.
    pub fn set_reserved_space(&self, amount: u64) {
        self.inner.write().reserved_space = amount;
    }

    /// Returns the journal offset at the end of the last committed/replayed transaction.
    pub fn last_end_offset(&self) -> u64 {
        self.inner.read().last_end_offset
    }

    /// Forces the last-end-offset (used by journal management code).
    pub fn set_last_end_offset(&self, v: u64) {
        self.inner.write().last_end_offset = v;
    }

    /// Returns the amount of metadata space currently borrowed from the metadata reservation.
    pub fn borrowed_metadata_space(&self) -> u64 {
        self.inner.read().borrowed_metadata_space
    }

    /// Forces the borrowed-metadata-space counter (used by journal management code).
    pub fn set_borrowed_metadata_space(&self, v: u64) {
        self.inner.write().borrowed_metadata_space = v;
    }

    /// Serialises `mutation` via the owning object's `write_mutation`.  Panics if `object_id` is
    /// unknown.
    pub fn write_mutation(&self, object_id: u64, mutation: &Mutation, writer: journal::Writer<'_>) {
        self.object(object_id).unwrap().write_mutation(mutation, writer);
    }

    /// Returns all registered stores that are not locked.
    pub fn unlocked_stores(&self) -> Vec<Arc<ObjectStore>> {
        let inner = self.inner.read();
        let mut stores = Vec::new();
        for store in inner.stores.values() {
            if !store.is_locked() {
                stores.push(store.clone());
            }
        }
        stores
    }

    /// Registers a lazy inspect child under `parent` that reports reservation statistics on
    /// demand.  Only a weak reference is captured, so this does not keep the manager alive.
    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
        let this = Arc::downgrade(self);
        parent.record_lazy_child(name, move || {
            let this_clone = this.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                if let Some(this) = this_clone.upgrade() {
                    // Read everything that needs the lock in one go.
                    let (required, borrowed, earliest_checkpoint) = {
                        let inner = this.inner.read();
                        (
                            inner.required_reservation(),
                            inner.borrowed_metadata_space,
                            inner.earliest_journal_offset(),
                        )
                    };
                    let root = inspector.root();
                    if let Some(reservation) = this.metadata_reservation.get() {
                        root.record_uint("metadata_reservation", reservation.amount());
                    }
                    root.record_uint("required_reservation", required);
                    root.record_uint("borrowed_reservation", borrowed);
                    if let Some(earliest_checkpoint) = earliest_checkpoint {
                        root.record_uint("earliest_checkpoint", earliest_checkpoint);
                    }

                    // round_div returns None when `required` is zero.
                    if let Some(x) = round_div(100 * borrowed, required) {
                        root.record_uint("borrowed_to_required_reservation_percent", x);
                    }
                }
                Ok(inspector)
            }
            .boxed()
        });
    }

    /// Returns true if the journal has advanced more than 256 bytes beyond the end of the last
    /// committed transaction.  Panics if `checkpoint` is behind `last_end_offset`.
    pub fn needs_borrow_for_journal(&self, checkpoint: u64) -> bool {
        checkpoint.checked_sub(self.inner.read().last_end_offset).unwrap() > 256
    }
}
765
766pub struct ReservationUpdate(u64);
771
772impl ReservationUpdate {
773 pub fn new(amount: u64) -> Self {
774 Self(amount)
775 }
776}
777
impl AssociatedObject for ReservationUpdate {
    // Record the wrapped amount as the object's reservation just before the mutation is applied.
    fn will_apply_mutation(&self, _mutation: &Mutation, object_id: u64, manager: &ObjectManager) {
        manager.update_reservation(object_id, self.0);
    }
}