1use crate::errors::FxfsError;
6use crate::filesystem::{ApplyContext, ApplyMode, JournalingObject};
7use crate::log::*;
8use crate::metrics;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::allocator::{Allocator, Reservation};
11use crate::object_store::directory::Directory;
12use crate::object_store::journal::{self, JournalCheckpoint};
13use crate::object_store::transaction::{
14 AssocObj, AssociatedObject, MetadataReservation, Mutation, Transaction, TxnMutation,
15};
16use crate::object_store::tree_cache::TreeCache;
17use crate::object_store::volume::{VOLUMES_DIRECTORY, list_volumes};
18use crate::object_store::{ObjectDescriptor, ObjectStore};
19use crate::round::round_div;
20use crate::serialized_types::{LATEST_VERSION, Version};
21use anyhow::{Context, Error, anyhow, bail, ensure};
22use fuchsia_inspect::{Property as _, UintProperty};
23use fuchsia_sync::RwLock;
24use futures::FutureExt as _;
25use once_cell::sync::OnceCell;
26use rustc_hash::FxHashMap as HashMap;
27use std::collections::hash_map::Entry;
28use std::num::Saturating;
29use std::sync::Arc;
30
/// Returns the amount of free disk space that must be held in reserve to guarantee that
/// metadata covering `journal_usage` bytes of journal can eventually be flushed.
///
/// The factor of 4 is a conservative multiplier; the exact amplification ratio between
/// journal bytes and flushed metadata is not derivable from this file alone (assumed to
/// cover worst-case expansion — confirm against the journal documentation).
pub const fn reserved_space_from_journal_usage(journal_usage: u64) -> u64 {
    4 * journal_usage
}
43
/// Registry of every journalled object (object stores and the allocator) together with the
/// accounting needed to guarantee that outstanding journalled metadata can always be
/// flushed.
pub struct ObjectManager {
    // All mutable state, behind a single reader/writer lock.
    inner: RwLock<Inner>,
    // Space reserved with the allocator to cover outstanding journalled metadata; set once
    // by `init_metadata_reservation`.
    metadata_reservation: OnceCell<Reservation>,
    // The directory of volumes in the root store; set once during mount/replay.
    volume_directory: OnceCell<Directory<ObjectStore>>,
    // Callback invoked for every store registered with this manager.
    on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
}
51
/// Tracks the journal checkpoint(s) from which an object's mutations must be replayed.
///
/// While a flush is in progress an object can have two relevant checkpoints: the one in
/// force before the flush began (`Old`) and the one for mutations that arrived during the
/// flush.  `Both` holds them in that order (old, current) — see `apply_mutation`'s
/// handling of `BeginFlush`/`EndFlush`.
#[derive(Debug)]
enum Checkpoints {
    Current(JournalCheckpoint),
    Old(JournalCheckpoint),
    Both(JournalCheckpoint, JournalCheckpoint),
}
60
61impl Checkpoints {
62 fn earliest(&self) -> &JournalCheckpoint {
64 match self {
65 Checkpoints::Old(x) | Checkpoints::Both(x, _) | Checkpoints::Current(x) => x,
66 }
67 }
68}
69
struct Inner {
    // All registered stores, keyed by store object id.
    stores: HashMap<u64, Arc<ObjectStore>>,
    root_parent_store_object_id: u64,
    root_store_object_id: u64,
    allocator_object_id: u64,
    allocator: Option<Arc<Allocator>>,

    // Per-object journal checkpoints: the offsets from which the journal must be replayed
    // to reconstruct each object's in-memory state.
    journal_checkpoints: HashMap<u64, Checkpoints>,

    // Per-object reservation amounts in bytes (set via `ObjectManager::update_reservation`);
    // `required_reservation` takes the maximum of these.
    reservations: HashMap<u64, u64>,

    // Journal offset just past the last committed (or replayed) transaction.
    last_end_offset: u64,

    // Metadata space "borrowed" against free space rather than covered by the reservation;
    // persisted via `Mutation::UpdateBorrowed` so it survives replay.
    borrowed_metadata_space: u64,

    // Largest single transaction observed (journal bytes), mirrored to an inspect property.
    max_transaction_size: (u64, UintProperty),

    // Fixed extra space reserved for the journal itself; starts at
    // `journal::RESERVED_SPACE` and is adjustable via `set_reserved_space`.
    reserved_space: u64,
}
104
105impl Inner {
106 fn earliest_journal_offset(&self) -> Option<u64> {
107 self.journal_checkpoints.values().map(|c| c.earliest().file_offset).min()
108 }
109
110 fn required_reservation(&self) -> u64 {
113 self.reservations.values().max().unwrap_or(&0)
115
116 + self.earliest_journal_offset()
119 .map(|min| reserved_space_from_journal_usage(self.last_end_offset - min))
120 .unwrap_or(0)
121
122 + self.reserved_space
124 }
125
126 fn object(&self, object_id: u64) -> Option<Arc<dyn JournalingObject>> {
127 if object_id == self.allocator_object_id {
128 Some(self.allocator.clone().unwrap() as Arc<dyn JournalingObject>)
129 } else {
130 self.stores.get(&object_id).map(|x| x.clone() as Arc<dyn JournalingObject>)
131 }
132 }
133}
134
135impl ObjectManager {
136 pub fn new(on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>) -> ObjectManager {
137 ObjectManager {
138 inner: RwLock::new(Inner {
139 stores: HashMap::default(),
140 root_parent_store_object_id: INVALID_OBJECT_ID,
141 root_store_object_id: INVALID_OBJECT_ID,
142 allocator_object_id: INVALID_OBJECT_ID,
143 allocator: None,
144 journal_checkpoints: HashMap::default(),
145 reservations: HashMap::default(),
146 last_end_offset: 0,
147 borrowed_metadata_space: 0,
148 max_transaction_size: (0, metrics::detail().create_uint("max_transaction_size", 0)),
149 reserved_space: journal::RESERVED_SPACE,
150 }),
151 metadata_reservation: OnceCell::new(),
152 volume_directory: OnceCell::new(),
153 on_new_store,
154 }
155 }
156
157 pub fn required_reservation(&self) -> u64 {
158 self.inner.read().required_reservation()
159 }
160
161 pub fn root_parent_store_object_id(&self) -> u64 {
162 self.inner.read().root_parent_store_object_id
163 }
164
165 pub fn root_parent_store(&self) -> Arc<ObjectStore> {
166 let inner = self.inner.read();
167 inner.stores.get(&inner.root_parent_store_object_id).unwrap().clone()
168 }
169
170 pub fn set_root_parent_store(&self, store: Arc<ObjectStore>) {
171 if let Some(on_new_store) = &self.on_new_store {
172 on_new_store(&store);
173 }
174 let mut inner = self.inner.write();
175 let store_id = store.store_object_id();
176 inner.stores.insert(store_id, store);
177 inner.root_parent_store_object_id = store_id;
178 }
179
180 pub fn root_store_object_id(&self) -> u64 {
181 self.inner.read().root_store_object_id
182 }
183
184 pub fn root_store(&self) -> Arc<ObjectStore> {
185 let inner = self.inner.read();
186 inner.stores.get(&inner.root_store_object_id).unwrap().clone()
187 }
188
189 pub fn set_root_store(&self, store: Arc<ObjectStore>) {
190 if let Some(on_new_store) = &self.on_new_store {
191 on_new_store(&store);
192 }
193 let mut inner = self.inner.write();
194 let store_id = store.store_object_id();
195 inner.stores.insert(store_id, store);
196 inner.root_store_object_id = store_id;
197 }
198
199 pub fn is_system_store(&self, store_id: u64) -> bool {
200 let inner = self.inner.read();
201 store_id == inner.root_store_object_id || store_id == inner.root_parent_store_object_id
202 }
203
204 pub fn store(&self, store_object_id: u64) -> Option<Arc<ObjectStore>> {
206 self.inner.read().stores.get(&store_object_id).cloned()
207 }
208
    /// Called once journal replay has finished: opens the volumes directory, opens every
    /// volume's store, and establishes the metadata reservation.
    pub async fn on_replay_complete(&self) -> Result<(), Error> {
        let root_store = self.root_store();

        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .context("Unable to open root volume directory")?;

        match root_directory.lookup(VOLUMES_DIRECTORY).await? {
            None => bail!("Root directory not found"),
            Some((object_id, ObjectDescriptor::Directory, _)) => {
                let volume_directory = Directory::open(&root_store, object_id)
                    .await
                    .context("Unable to open volumes directory")?;
                self.volume_directory.set(volume_directory).unwrap();
            }
            _ => {
                // Anything other than a directory here means the filesystem is corrupt.
                bail!(
                    anyhow!(FxfsError::Inconsistent)
                        .context("Unexpected type for volumes directory")
                )
            }
        }

        let object_ids = list_volumes(self.volume_directory.get().unwrap())
            .await
            .context("Failed to list volumes")?;

        for store_id in object_ids {
            self.open_store(&root_store, store_id).await?;
        }

        // Done after replay so the reservation reflects the fully-replayed state.
        self.init_metadata_reservation()
            .context("Insufficient free space for metadata reservation.")?;

        Ok(())
    }
249
250 pub fn volume_directory(&self) -> &Directory<ObjectStore> {
251 self.volume_directory.get().unwrap()
252 }
253
254 pub fn set_volume_directory(&self, volume_directory: Directory<ObjectStore>) {
255 self.volume_directory.set(volume_directory).unwrap();
256 }
257
258 pub fn add_store(&self, store: Arc<ObjectStore>) {
259 if let Some(on_new_store) = &self.on_new_store {
260 on_new_store(&store);
261 }
262 let mut inner = self.inner.write();
263 let store_object_id = store.store_object_id();
264 assert_ne!(store_object_id, inner.root_parent_store_object_id);
265 assert_ne!(store_object_id, inner.root_store_object_id);
266 assert_ne!(store_object_id, inner.allocator_object_id);
267 inner.stores.insert(store_object_id, store);
268 }
269
270 pub fn forget_store(&self, store_object_id: u64) {
271 let mut inner = self.inner.write();
272 assert_ne!(store_object_id, inner.allocator_object_id);
273 inner.stores.remove(&store_object_id);
274 inner.reservations.remove(&store_object_id);
275 }
276
277 pub fn set_allocator(&self, allocator: Arc<Allocator>) {
278 let mut inner = self.inner.write();
279 assert!(!inner.stores.contains_key(&allocator.object_id()));
280 inner.allocator_object_id = allocator.object_id();
281 inner.allocator = Some(allocator);
282 }
283
284 pub fn allocator(&self) -> Arc<Allocator> {
285 self.inner.read().allocator.clone().unwrap()
286 }
287
    /// Applies `mutation` to the object identified by `object_id`, first updating this
    /// manager's journal-checkpoint bookkeeping for that object.
    ///
    /// `BeginFlush`/`EndFlush` rotate the object's checkpoint state (see `Checkpoints`);
    /// `DeleteVolume` drops all manager state for the object; every other mutation records
    /// `context.checkpoint` as the offset from which the journal must be replayed for this
    /// object.  Mutations for the root parent store are not checkpoint-tracked
    /// (presumably because that store is reconstructed from the journal rather than
    /// flushed — confirm).
    ///
    /// Panics if the target object is neither the allocator nor a registered store.
    pub fn apply_mutation(
        &self,
        object_id: u64,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        associated_object: AssocObj<'_>,
    ) -> Result<(), Error> {
        debug!(oid = object_id, mutation:?; "applying mutation");
        let object = {
            let mut inner = self.inner.write();
            match mutation {
                Mutation::BeginFlush => {
                    // Flush starting: demote the current checkpoint to "old" so mutations
                    // arriving during the flush get a fresh checkpoint.
                    if let Some(entry) = inner.journal_checkpoints.get_mut(&object_id) {
                        match entry {
                            Checkpoints::Current(x) | Checkpoints::Both(x, _) => {
                                *entry = Checkpoints::Old(x.clone());
                            }
                            _ => {}
                        }
                    }
                }
                Mutation::EndFlush => {
                    // Flush finished: the pre-flush ("old") checkpoint is no longer
                    // needed; keep only the checkpoint for in-flush mutations, if any.
                    if let Entry::Occupied(mut o) = inner.journal_checkpoints.entry(object_id) {
                        let entry = o.get_mut();
                        match entry {
                            Checkpoints::Old(_) => {
                                o.remove();
                            }
                            Checkpoints::Both(_, x) => {
                                *entry = Checkpoints::Current(x.clone());
                            }
                            _ => {}
                        }
                    }
                }
                Mutation::DeleteVolume => {
                    // Deleting a volume removes every trace of the store from the manager.
                    if let Some(store) = inner.stores.remove(&object_id) {
                        store.mark_deleted();
                    }
                    inner.reservations.remove(&object_id);
                    inner.journal_checkpoints.remove(&object_id);
                    return Ok(());
                }
                _ => {
                    if object_id != inner.root_parent_store_object_id {
                        // Record the checkpoint of the first mutation since the last
                        // flush; if a flush is in progress, remember both checkpoints.
                        inner
                            .journal_checkpoints
                            .entry(object_id)
                            .and_modify(|entry| {
                                if let Checkpoints::Old(x) = entry {
                                    *entry =
                                        Checkpoints::Both(x.clone(), context.checkpoint.clone());
                                }
                            })
                            .or_insert_with(|| Checkpoints::Current(context.checkpoint.clone()));
                    }
                }
            }
            // Resolve the target object while still holding the lock.
            if object_id == inner.allocator_object_id {
                inner.allocator.clone().unwrap() as Arc<dyn JournalingObject>
            } else {
                inner.stores.get(&object_id).unwrap().clone() as Arc<dyn JournalingObject>
            }
        };
        // Notify the associated object before the mutation itself is applied.
        associated_object.map(|o| o.will_apply_mutation(&mutation, object_id, self));
        object.apply_mutation(mutation, context, associated_object)
    }
356
    /// Replays one journal transaction's `mutations` during mount.
    ///
    /// `journal_offsets` gives, per object, the offset below which mutations are already
    /// reflected in that object's persisted state and must be skipped.  `end_offset` is
    /// the journal offset just past this transaction; it drives the
    /// borrowed-metadata-space accounting restored via `Mutation::UpdateBorrowed`.
    pub async fn replay_mutations(
        &self,
        mutations: Vec<(u64, Mutation)>,
        journal_offsets: &HashMap<u64, u64>,
        context: &ApplyContext<'_, '_>,
        end_offset: u64,
    ) -> Result<(), Error> {
        debug!(checkpoint = context.checkpoint.file_offset; "REPLAY");
        // Size of this transaction in the journal; `None` if `last_end_offset` has
        // already advanced past `end_offset`.
        let txn_size = {
            let mut inner = self.inner.write();
            if end_offset > inner.last_end_offset {
                Some(end_offset - std::mem::replace(&mut inner.last_end_offset, end_offset))
            } else {
                None
            }
        };

        let allocator_object_id = self.inner.read().allocator_object_id;

        for (object_id, mutation) in mutations {
            // UpdateBorrowed is bookkeeping for the manager itself, not a store mutation:
            // restore the borrowed amount plus this transaction's own share.
            if let Mutation::UpdateBorrowed(borrowed) = mutation {
                if let Some(txn_size) = txn_size {
                    self.inner.write().borrowed_metadata_space = borrowed
                        .checked_add(reserved_space_from_journal_usage(txn_size))
                        .ok_or(FxfsError::Inconsistent)?;
                }
                continue;
            }

            // Skip mutations that pre-date the object's persisted state.
            if let Some(&offset) = journal_offsets.get(&object_id) {
                if context.checkpoint.file_offset < offset {
                    continue;
                }
            }

            // Ensure the target store is open before applying (the allocator is not a
            // store and is always present).
            if object_id != allocator_object_id {
                self.open_store(&self.root_store(), object_id).await?;
            }

            self.apply_mutation(object_id, mutation, context, AssocObj::None)?;
        }
        Ok(())
    }
406
    /// Opens the store identified by `object_id` (a child of `parent`) and registers it,
    /// or does nothing if it is already open.
    async fn open_store(&self, parent: &Arc<ObjectStore>, object_id: u64) -> Result<(), Error> {
        if self.inner.read().stores.contains_key(&object_id) {
            return Ok(());
        }
        let store = ObjectStore::open(parent, object_id, Box::new(TreeCache::new()))
            .await
            .with_context(|| format!("Failed to open store {object_id}"))?;
        if let Some(on_new_store) = &self.on_new_store {
            on_new_store(&store);
        }
        // NOTE(review): the existence check above and this insert are not atomic; the
        // assert relies on callers not racing to open the same store (presumably replay
        // runs single-threaded here — confirm).
        assert!(self.inner.write().stores.insert(object_id, store).is_none());
        Ok(())
    }
421
    /// Applies all mutations in `transaction` at `checkpoint`.
    ///
    /// When the transaction borrows metadata space, returns
    /// `Some(Mutation::UpdateBorrowed(..))`, which the caller should append to the journal
    /// record so the borrowed amount can be reconstructed on replay; returns `None`
    /// otherwise.
    pub fn apply_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        checkpoint: &JournalCheckpoint,
    ) -> Result<Option<Mutation>, Error> {
        // Snapshot reservation state so we can account for changes the mutations cause.
        let old_amount = self.metadata_reservation().amount();
        let old_required = self.inner.read().required_reservation();

        debug!(checkpoint = checkpoint.file_offset; "BEGIN TXN");
        let mutations = transaction.take_mutations();
        let context =
            ApplyContext { mode: ApplyMode::Live(transaction), checkpoint: checkpoint.clone() };
        for TxnMutation { object_id, mutation, associated_object, .. } in mutations {
            self.apply_mutation(object_id, mutation, &context, associated_object)?;
        }
        debug!("END TXN");

        Ok(if let MetadataReservation::Borrowed = transaction.metadata_reservation {
            // Applying may have changed both the reservation's amount and the required
            // amount; the net difference is what this transaction effectively borrowed
            // (or repaid).  Computed as sums first to avoid intermediate underflow.
            let new_amount = self.metadata_reservation().amount();
            let mut inner = self.inner.write();
            let new_required = inner.required_reservation();
            let add = old_amount + new_required;
            let sub = new_amount + old_required;
            if add >= sub {
                inner.borrowed_metadata_space += add - sub;
            } else {
                inner.borrowed_metadata_space =
                    inner.borrowed_metadata_space.saturating_sub(sub - add);
            }
            Some(Mutation::UpdateBorrowed(inner.borrowed_metadata_space))
        } else {
            // Transactions carrying their own reservation must not disturb the metadata
            // reservation or the requirement.
            debug_assert_eq!(self.metadata_reservation().amount(), old_amount);
            debug_assert_eq!(self.inner.read().required_reservation(), old_required);
            None
        })
    }
468
    /// Called after `transaction` has been written to the journal ending at `end_offset`;
    /// settles the metadata-space accounting for the journal bytes it consumed.
    pub fn did_commit_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        _checkpoint: &JournalCheckpoint,
        end_offset: u64,
    ) {
        let reservation = self.metadata_reservation();
        let mut inner = self.inner.write();
        let journal_usage = end_offset - std::mem::replace(&mut inner.last_end_offset, end_offset);

        // Track the largest transaction seen (mirrored to the inspect property).
        if journal_usage > inner.max_transaction_size.0 {
            inner.max_transaction_size.0 = journal_usage;
            inner.max_transaction_size.1.set(journal_usage);
        }

        let txn_space = reserved_space_from_journal_usage(journal_usage);
        match &mut transaction.metadata_reservation {
            MetadataReservation::None => unreachable!(),
            MetadataReservation::Borrowed => {
                // The space was borrowed against free space; if the reservation now holds
                // more than required, return the surplus to the allocator.
                inner.borrowed_metadata_space += txn_space;

                let to_give_back = (reservation.amount() + inner.borrowed_metadata_space)
                    .saturating_sub(inner.required_reservation());
                if to_give_back > 0 {
                    reservation.give_back(to_give_back);
                }
            }
            MetadataReservation::Hold(hold_amount) => {
                // The transaction carried its own reservation with a hold earmarked for
                // metadata; move the consumed space from that reservation into ours.
                let txn_reservation = transaction.allocator_reservation.unwrap();
                assert_ne!(
                    txn_reservation as *const _, reservation as *const _,
                    "MetadataReservation::Borrowed should be used."
                );
                txn_reservation.commit(txn_space);
                if txn_reservation.owner_object_id() != reservation.owner_object_id() {
                    assert_eq!(
                        reservation.owner_object_id(),
                        None,
                        "Should not be mixing attributed owners."
                    );
                    // Metadata space is unattributed: detach the owner's claim on the
                    // space being transferred.
                    inner
                        .allocator
                        .as_ref()
                        .unwrap()
                        .disown_reservation(txn_reservation.owner_object_id(), txn_space);
                }
                if let Some(amount) = hold_amount.checked_sub(txn_space) {
                    *hold_amount = amount;
                } else {
                    panic!("Transaction was larger than metadata reservation");
                }
                reservation.add(txn_space);
            }
            MetadataReservation::Reservation(txn_reservation) => {
                txn_reservation.move_to(reservation, txn_space);
            }
        }
        // Invariant: reservation + borrowed always covers exactly what is required.
        debug_assert_eq!(
            reservation.amount() + inner.borrowed_metadata_space,
            inner.required_reservation(),
            "txn_space: {}, reservation_amount: {}, borrowed: {}, required: {}",
            txn_space,
            reservation.amount(),
            inner.borrowed_metadata_space,
            inner.required_reservation(),
        );
    }
545
546 pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
551 for TxnMutation { object_id, mutation, .. } in transaction.take_mutations() {
552 self.object(object_id).map(|o| o.drop_mutation(mutation, transaction));
553 }
554 }
555
556 pub fn journal_file_offsets(&self) -> (HashMap<u64, u64>, Option<JournalCheckpoint>) {
559 let inner = self.inner.read();
560 let mut min_checkpoint = None;
561 let mut offsets = HashMap::default();
562 for (&object_id, checkpoint) in &inner.journal_checkpoints {
563 let checkpoint = checkpoint.earliest();
564 match &mut min_checkpoint {
565 None => min_checkpoint = Some(checkpoint),
566 Some(min_checkpoint) => {
567 if checkpoint.file_offset < min_checkpoint.file_offset {
568 *min_checkpoint = checkpoint;
569 }
570 }
571 }
572 offsets.insert(object_id, checkpoint.file_offset);
573 }
574 (offsets, min_checkpoint.cloned())
575 }
576
577 pub fn journal_checkpoint(&self, object_id: u64) -> Option<JournalCheckpoint> {
580 self.inner
581 .read()
582 .journal_checkpoints
583 .get(&object_id)
584 .map(|checkpoints| checkpoints.earliest().clone())
585 }
586
587 pub fn needs_flush(&self, object_id: u64) -> bool {
590 self.inner.read().journal_checkpoints.contains_key(&object_id)
591 }
592
    /// Flushes every object that has outstanding journalled mutations.
    ///
    /// Returns the earliest on-disk version produced by any flushed object.
    pub async fn flush(&self) -> Result<Version, Error> {
        let objects = {
            let inner = self.inner.read();
            let mut object_ids = inner.journal_checkpoints.keys().cloned().collect::<Vec<_>>();
            // Flush in descending object-id order — presumably so lower-id (system)
            // objects, whose flushes can be triggered by other flushes, go last; confirm.
            object_ids.sort_unstable();
            object_ids
                .iter()
                .rev()
                .map(|oid| (*oid, inner.object(*oid).unwrap()))
                .collect::<Vec<_>>()
        };

        let mut earliest_version: Version = LATEST_VERSION;
        for (object_id, object) in objects {
            let object_earliest_version =
                object.flush().await.with_context(|| format!("Failed to flush oid {object_id}"))?;
            if object_earliest_version < earliest_version {
                earliest_version = object_earliest_version;
            }
        }

        Ok(earliest_version)
    }
623
624 fn object(&self, object_id: u64) -> Option<Arc<dyn JournalingObject>> {
625 self.inner.read().object(object_id)
626 }
627
    /// Creates the global metadata reservation, sized to cover the currently-required
    /// amount minus what has already been borrowed.  Must be called exactly once.
    ///
    /// # Errors
    /// Fails with `FxfsError::Inconsistent` if the borrowed amount exceeds the requirement
    /// (corrupt accounting), or if the allocator cannot reserve the space.
    pub fn init_metadata_reservation(&self) -> Result<(), Error> {
        let inner = self.inner.read();
        let required = inner.required_reservation();
        ensure!(required >= inner.borrowed_metadata_space, FxfsError::Inconsistent);
        let allocator = inner.allocator.as_ref().cloned().unwrap();
        // Owner is `None`: metadata space is not attributed to any single store.
        self.metadata_reservation
            .set(
                allocator
                    .clone()
                    .reserve(None, inner.required_reservation() - inner.borrowed_metadata_space)
                    .with_context(|| {
                        format!(
                            "Failed to reserve {} - {} = {} bytes, free={}, \
                            owner_bytes={}",
                            inner.required_reservation(),
                            inner.borrowed_metadata_space,
                            inner.required_reservation() - inner.borrowed_metadata_space,
                            Saturating(allocator.get_disk_bytes()) - allocator.get_used_bytes(),
                            allocator.owner_bytes_debug(),
                        )
                    })?,
            )
            .unwrap();
        Ok(())
    }
653
654 pub fn metadata_reservation(&self) -> &Reservation {
655 self.metadata_reservation.get().unwrap()
656 }
657
658 pub fn update_reservation(&self, object_id: u64, amount: u64) {
659 self.inner.write().reservations.insert(object_id, amount);
660 }
661
662 pub fn reservation(&self, object_id: u64) -> Option<u64> {
663 self.inner.read().reservations.get(&object_id).cloned()
664 }
665
666 pub fn set_reserved_space(&self, amount: u64) {
667 self.inner.write().reserved_space = amount;
668 }
669
670 pub fn last_end_offset(&self) -> u64 {
671 self.inner.read().last_end_offset
672 }
673
674 pub fn set_last_end_offset(&self, v: u64) {
675 self.inner.write().last_end_offset = v;
676 }
677
678 pub fn borrowed_metadata_space(&self) -> u64 {
679 self.inner.read().borrowed_metadata_space
680 }
681
682 pub fn set_borrowed_metadata_space(&self, v: u64) {
683 self.inner.write().borrowed_metadata_space = v;
684 }
685
686 pub fn write_mutation(&self, object_id: u64, mutation: &Mutation, writer: journal::Writer<'_>) {
687 self.object(object_id).unwrap().write_mutation(mutation, writer);
688 }
689
690 pub fn unlocked_stores(&self) -> Vec<Arc<ObjectStore>> {
691 let inner = self.inner.read();
692 let mut stores = Vec::new();
693 for store in inner.stores.values() {
694 if !store.is_locked() {
695 stores.push(store.clone());
696 }
697 }
698 stores
699 }
700
    /// Registers a lazy inspect node under `parent` that reports reservation statistics
    /// on demand.  Holds only a weak reference, so it does not keep the manager alive.
    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
        let this = Arc::downgrade(self);
        parent.record_lazy_child(name, move || {
            let this_clone = this.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                if let Some(this) = this_clone.upgrade() {
                    // Gather everything that needs the lock in one scope to keep it short.
                    let (required, borrowed, earliest_checkpoint) = {
                        let inner = this.inner.read();
                        (
                            inner.required_reservation(),
                            inner.borrowed_metadata_space,
                            inner.earliest_journal_offset(),
                        )
                    };
                    let root = inspector.root();
                    root.record_uint("metadata_reservation", this.metadata_reservation().amount());
                    root.record_uint("required_reservation", required);
                    root.record_uint("borrowed_reservation", borrowed);
                    if let Some(earliest_checkpoint) = earliest_checkpoint {
                        root.record_uint("earliest_checkpoint", earliest_checkpoint);
                    }

                    // `round_div` presumably yields None when `required` is zero — confirm.
                    if let Some(x) = round_div(100 * borrowed, required) {
                        root.record_uint("borrowed_to_required_reservation_percent", x);
                    }
                }
                Ok(inspector)
            }
            .boxed()
        });
    }
737
738 pub fn needs_borrow_for_journal(&self, checkpoint: u64) -> bool {
743 checkpoint.checked_sub(self.inner.read().last_end_offset).unwrap() > 256
744 }
745}
746
/// Associated-object wrapper that, when the mutation it accompanies is applied, records a
/// new reservation amount for the target object (see the `AssociatedObject` impl below).
pub struct ReservationUpdate(u64);

impl ReservationUpdate {
    /// Creates a `ReservationUpdate` carrying the new reservation `amount` in bytes.
    pub fn new(amount: u64) -> Self {
        ReservationUpdate(amount)
    }
}
758
impl AssociatedObject for ReservationUpdate {
    // When the accompanying mutation is applied, push the carried amount into the
    // manager's per-object reservation table.
    fn will_apply_mutation(&self, _mutation: &Mutation, object_id: u64, manager: &ObjectManager) {
        manager.update_reservation(object_id, self.0);
    }
}