1use crate::errors::FxfsError;
6use crate::filesystem::{ApplyContext, ApplyMode, JournalingObject};
7use crate::log::*;
8use crate::metrics;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::allocator::{Allocator, Reservation};
11use crate::object_store::directory::Directory;
12use crate::object_store::journal::{self, JournalCheckpoint};
13use crate::object_store::transaction::{
14 AssocObj, AssociatedObject, MetadataReservation, Mutation, Transaction, TxnMutation,
15};
16use crate::object_store::tree_cache::TreeCache;
17use crate::object_store::volume::{list_volumes, VOLUMES_DIRECTORY};
18use crate::object_store::{ObjectDescriptor, ObjectStore};
19use crate::round::round_div;
20use crate::serialized_types::{Version, LATEST_VERSION};
21use anyhow::{anyhow, bail, ensure, Context, Error};
22use fuchsia_inspect::{Property as _, UintProperty};
23use fuchsia_sync::RwLock;
24use futures::FutureExt as _;
25use once_cell::sync::OnceCell;
26use rustc_hash::FxHashMap as HashMap;
27use std::collections::hash_map::Entry;
28use std::num::Saturating;
29use std::sync::Arc;
30
/// Converts an amount of journal space consumed into the amount of metadata space that must be
/// kept reserved so the corresponding objects can later be flushed.
/// NOTE(review): the derivation of the 4x factor isn't visible in this file — confirm against
/// the journal's flush/compaction accounting.
pub const fn reserved_space_from_journal_usage(journal_usage: u64) -> u64 {
    const RESERVATION_MULTIPLIER: u64 = 4;
    journal_usage * RESERVATION_MULTIPLIER
}
43
/// ObjectManager tracks the set of open object stores and the allocator, routes journal
/// mutations to them, and accounts for the metadata/journal reservation space.
pub struct ObjectManager {
    // Mutable state, guarded by a reader/writer lock.
    inner: RwLock<Inner>,
    // Space reserved for flushing metadata; set exactly once via `init_metadata_reservation`.
    metadata_reservation: OnceCell<Reservation>,
    // The root volumes directory; set once during replay (`on_replay_complete`) or explicitly
    // via `set_volume_directory`.
    volume_directory: OnceCell<Directory<ObjectStore>>,
    // Optional callback invoked whenever a store is first registered with this manager.
    on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
}
51
/// Journal checkpoint state for an object, used to know from which offset the journal must be
/// replayed for it.  `Current` holds the checkpoint of the first unflushed mutation.  When a
/// flush begins the entry is demoted to `Old`; if more mutations arrive while the flush is in
/// progress the entry becomes `Both(old, current)` (see `apply_mutation`).
#[derive(Debug)]
enum Checkpoints {
    Current(JournalCheckpoint),
    Old(JournalCheckpoint),
    // Invariant (from the transitions in `apply_mutation`): first element is the pre-flush
    // ("old") checkpoint, second is the in-flush ("current") one.
    Both(JournalCheckpoint, JournalCheckpoint),
}
60
61impl Checkpoints {
62 fn earliest(&self) -> &JournalCheckpoint {
64 match self {
65 Checkpoints::Old(x) | Checkpoints::Both(x, _) | Checkpoints::Current(x) => x,
66 }
67 }
68}
69
/// State protected by `ObjectManager::inner`'s lock.
struct Inner {
    // All open stores (including the root parent and root stores), keyed by store object id.
    stores: HashMap<u64, Arc<ObjectStore>>,
    root_parent_store_object_id: u64,
    root_store_object_id: u64,
    allocator_object_id: u64,
    allocator: Option<Arc<Allocator>>,

    // Per-object journal checkpoints of the first unflushed mutation; an entry here means the
    // object has state that would need replaying after a crash.
    journal_checkpoints: HashMap<u64, Checkpoints>,

    // Per-object space that must stay reserved so the object can be flushed; maintained via
    // `update_reservation` (driven by `ReservationUpdate`).
    reservations: HashMap<u64, u64>,

    // Journal offset at the end of the last committed (or replayed) transaction.
    last_end_offset: u64,

    // Metadata space borrowed from the metadata reservation by transactions that used
    // `MetadataReservation::Borrowed`; settled in `did_commit_transaction`.
    borrowed_metadata_space: u64,

    // Largest single-transaction journal usage seen so far, mirrored into an inspect metric.
    max_transaction_size: (u64, UintProperty),

    // Fixed reservation component for the journal itself (journal::RESERVED_SPACE initially;
    // adjustable via `set_reserved_space`).
    reserved_space: u64,
}
104
105impl Inner {
106 fn earliest_journal_offset(&self) -> Option<u64> {
107 self.journal_checkpoints.values().map(|c| c.earliest().file_offset).min()
108 }
109
110 fn required_reservation(&self) -> u64 {
113 self.reservations.values().max().unwrap_or(&0)
115
116 + self.earliest_journal_offset()
119 .map(|min| reserved_space_from_journal_usage(self.last_end_offset - min))
120 .unwrap_or(0)
121
122 + self.reserved_space
124 }
125
126 fn object(&self, object_id: u64) -> Option<Arc<dyn JournalingObject>> {
127 if object_id == self.allocator_object_id {
128 Some(self.allocator.clone().unwrap() as Arc<dyn JournalingObject>)
129 } else {
130 self.stores.get(&object_id).map(|x| x.clone() as Arc<dyn JournalingObject>)
131 }
132 }
133}
134
135impl ObjectManager {
136 pub fn new(on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>) -> ObjectManager {
137 ObjectManager {
138 inner: RwLock::new(Inner {
139 stores: HashMap::default(),
140 root_parent_store_object_id: INVALID_OBJECT_ID,
141 root_store_object_id: INVALID_OBJECT_ID,
142 allocator_object_id: INVALID_OBJECT_ID,
143 allocator: None,
144 journal_checkpoints: HashMap::default(),
145 reservations: HashMap::default(),
146 last_end_offset: 0,
147 borrowed_metadata_space: 0,
148 max_transaction_size: (0, metrics::detail().create_uint("max_transaction_size", 0)),
149 reserved_space: journal::RESERVED_SPACE,
150 }),
151 metadata_reservation: OnceCell::new(),
152 volume_directory: OnceCell::new(),
153 on_new_store,
154 }
155 }
156
157 pub fn required_reservation(&self) -> u64 {
158 self.inner.read().required_reservation()
159 }
160
161 pub fn root_parent_store_object_id(&self) -> u64 {
162 self.inner.read().root_parent_store_object_id
163 }
164
165 pub fn root_parent_store(&self) -> Arc<ObjectStore> {
166 let inner = self.inner.read();
167 inner.stores.get(&inner.root_parent_store_object_id).unwrap().clone()
168 }
169
170 pub fn set_root_parent_store(&self, store: Arc<ObjectStore>) {
171 if let Some(on_new_store) = &self.on_new_store {
172 on_new_store(&store);
173 }
174 let mut inner = self.inner.write();
175 let store_id = store.store_object_id();
176 inner.stores.insert(store_id, store);
177 inner.root_parent_store_object_id = store_id;
178 }
179
180 pub fn root_store_object_id(&self) -> u64 {
181 self.inner.read().root_store_object_id
182 }
183
184 pub fn root_store(&self) -> Arc<ObjectStore> {
185 let inner = self.inner.read();
186 inner.stores.get(&inner.root_store_object_id).unwrap().clone()
187 }
188
189 pub fn set_root_store(&self, store: Arc<ObjectStore>) {
190 if let Some(on_new_store) = &self.on_new_store {
191 on_new_store(&store);
192 }
193 let mut inner = self.inner.write();
194 let store_id = store.store_object_id();
195 inner.stores.insert(store_id, store);
196 inner.root_store_object_id = store_id;
197 }
198
199 pub fn is_system_store(&self, store_id: u64) -> bool {
200 let inner = self.inner.read();
201 store_id == inner.root_store_object_id || store_id == inner.root_parent_store_object_id
202 }
203
204 pub fn store(&self, store_object_id: u64) -> Option<Arc<ObjectStore>> {
206 self.inner.read().stores.get(&store_object_id).cloned()
207 }
208
    /// Called once journal replay has finished: locates and opens the volumes directory,
    /// opens every volume store listed in it, and establishes the metadata reservation.
    ///
    /// # Errors
    ///
    /// Fails if the root/volumes directories are missing or of the wrong type, if a volume
    /// store cannot be opened, or if there is insufficient free space for the metadata
    /// reservation.
    pub async fn on_replay_complete(&self) -> Result<(), Error> {
        let root_store = self.root_store();

        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .context("Unable to open root volume directory")?;

        match root_directory.lookup(VOLUMES_DIRECTORY).await? {
            None => bail!("Root directory not found"),
            Some((object_id, ObjectDescriptor::Directory)) => {
                let volume_directory = Directory::open(&root_store, object_id)
                    .await
                    .context("Unable to open volumes directory")?;
                self.volume_directory.set(volume_directory).unwrap();
            }
            _ => {
                // Anything other than a directory at VOLUMES_DIRECTORY indicates corruption.
                bail!(anyhow!(FxfsError::Inconsistent)
                    .context("Unexpected type for volumes directory"))
            }
        }

        let object_ids = list_volumes(self.volume_directory.get().unwrap())
            .await
            .context("Failed to list volumes")?;

        // Open every volume store up front so that mutations can be routed to them.
        for store_id in object_ids {
            self.open_store(&root_store, store_id).await?;
        }

        self.init_metadata_reservation()
            .context("Insufficient free space for metadata reservation.")?;

        Ok(())
    }
247
    /// Returns the volumes directory; panics if it has not been set yet.
    pub fn volume_directory(&self) -> &Directory<ObjectStore> {
        self.volume_directory.get().unwrap()
    }

    /// Sets the volumes directory; panics if it has already been set.
    pub fn set_volume_directory(&self, volume_directory: Directory<ObjectStore>) {
        self.volume_directory.set(volume_directory).unwrap();
    }

    /// Registers a (non-system) store.  Panics if the id collides with the root parent
    /// store, the root store, or the allocator.
    pub fn add_store(&self, store: Arc<ObjectStore>) {
        if let Some(on_new_store) = &self.on_new_store {
            on_new_store(&store);
        }
        let mut inner = self.inner.write();
        let store_object_id = store.store_object_id();
        assert_ne!(store_object_id, inner.root_parent_store_object_id);
        assert_ne!(store_object_id, inner.root_store_object_id);
        assert_ne!(store_object_id, inner.allocator_object_id);
        inner.stores.insert(store_object_id, store);
    }

    /// Drops a store (and its flush reservation) from the manager.  Panics if asked to
    /// forget the allocator.
    pub fn forget_store(&self, store_object_id: u64) {
        let mut inner = self.inner.write();
        assert_ne!(store_object_id, inner.allocator_object_id);
        inner.stores.remove(&store_object_id);
        inner.reservations.remove(&store_object_id);
    }

    /// Registers the allocator.  Panics if its object id collides with a registered store.
    pub fn set_allocator(&self, allocator: Arc<Allocator>) {
        let mut inner = self.inner.write();
        assert!(!inner.stores.contains_key(&allocator.object_id()));
        inner.allocator_object_id = allocator.object_id();
        inner.allocator = Some(allocator);
    }

    /// Returns the allocator; panics if `set_allocator` has not been called.
    pub fn allocator(&self) -> Arc<Allocator> {
        self.inner.read().allocator.clone().unwrap()
    }
285
    /// Applies `mutation` to the object identified by `object_id`, maintaining the
    /// journal-checkpoint bookkeeping that records what must be replayed after a crash.
    ///
    /// BeginFlush/EndFlush move the object's `Checkpoints` entry through the
    /// Current -> Old -> removed lifecycle; DeleteVolume drops all state for the object; any
    /// other mutation records `context.checkpoint` as the earliest unflushed checkpoint if
    /// none is recorded yet.
    ///
    /// # Panics
    ///
    /// Panics if `object_id` names neither the allocator nor a registered store (except for
    /// DeleteVolume, which returns early).
    pub fn apply_mutation(
        &self,
        object_id: u64,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        associated_object: AssocObj<'_>,
    ) -> Result<(), Error> {
        debug!(oid = object_id, mutation:?; "applying mutation");
        let object = {
            let mut inner = self.inner.write();
            match mutation {
                Mutation::BeginFlush => {
                    // Flush starting: demote the current checkpoint to Old so mutations that
                    // arrive during the flush are tracked separately.
                    if let Some(entry) = inner.journal_checkpoints.get_mut(&object_id) {
                        match entry {
                            Checkpoints::Current(x) | Checkpoints::Both(x, _) => {
                                *entry = Checkpoints::Old(x.clone());
                            }
                            _ => {}
                        }
                    }
                }
                Mutation::EndFlush => {
                    // Flush finished: the pre-flush state is on disk, so drop the Old
                    // checkpoint, keeping only checkpoints for mutations made mid-flush.
                    if let Entry::Occupied(mut o) = inner.journal_checkpoints.entry(object_id) {
                        let entry = o.get_mut();
                        match entry {
                            Checkpoints::Old(_) => {
                                o.remove();
                            }
                            Checkpoints::Both(_, x) => {
                                *entry = Checkpoints::Current(x.clone());
                            }
                            _ => {}
                        }
                    }
                }
                Mutation::DeleteVolume => {
                    // The volume is gone; forget everything tracked for it and skip applying.
                    inner.stores.remove(&object_id);
                    inner.reservations.remove(&object_id);
                    inner.journal_checkpoints.remove(&object_id);
                    return Ok(());
                }
                _ => {
                    // Record the first unflushed checkpoint.  The root parent store is
                    // excluded — NOTE(review): presumably because it never needs journal
                    // replay; confirm against its definition.
                    if object_id != inner.root_parent_store_object_id {
                        inner
                            .journal_checkpoints
                            .entry(object_id)
                            .and_modify(|entry| {
                                if let Checkpoints::Old(x) = entry {
                                    *entry =
                                        Checkpoints::Both(x.clone(), context.checkpoint.clone());
                                }
                            })
                            .or_insert_with(|| Checkpoints::Current(context.checkpoint.clone()));
                    }
                }
            }
            if object_id == inner.allocator_object_id {
                inner.allocator.clone().unwrap() as Arc<dyn JournalingObject>
            } else {
                inner.stores.get(&object_id).unwrap().clone() as Arc<dyn JournalingObject>
            }
        };
        // Let the associated object (e.g. ReservationUpdate) react before the mutation lands.
        associated_object.map(|o| o.will_apply_mutation(&mutation, object_id, self));
        object.apply_mutation(mutation, context, associated_object)
    }
352
    /// Replays one journal transaction's `mutations` during mount.
    ///
    /// `journal_offsets` gives, per object, the journal offset its persistent layers already
    /// cover; mutations with checkpoints before that offset are already on disk and are
    /// skipped.  `end_offset` is the journal offset at the end of this transaction and is
    /// used to account borrowed metadata space when an UpdateBorrowed mutation is replayed.
    pub async fn replay_mutations(
        &self,
        mutations: Vec<(u64, Mutation)>,
        journal_offsets: &HashMap<u64, u64>,
        context: &ApplyContext<'_, '_>,
        end_offset: u64,
    ) -> Result<(), Error> {
        debug!(checkpoint = context.checkpoint.file_offset; "REPLAY");
        // Journal bytes this transaction advances past last_end_offset; None if it doesn't
        // advance the journal at all.
        let txn_size = {
            let mut inner = self.inner.write();
            if end_offset > inner.last_end_offset {
                Some(end_offset - std::mem::replace(&mut inner.last_end_offset, end_offset))
            } else {
                None
            }
        };

        let allocator_object_id = self.inner.read().allocator_object_id;

        for (object_id, mutation) in mutations {
            if let Mutation::UpdateBorrowed(borrowed) = mutation {
                // Restore the borrowed-space counter, including this transaction's own
                // journal cost; overflow indicates a corrupt journal.
                if let Some(txn_size) = txn_size {
                    self.inner.write().borrowed_metadata_space = borrowed
                        .checked_add(reserved_space_from_journal_usage(txn_size))
                        .ok_or(FxfsError::Inconsistent)?;
                }
                continue;
            }

            // Skip mutations already covered by the object's persistent layers.
            if let Some(&offset) = journal_offsets.get(&object_id) {
                if context.checkpoint.file_offset < offset {
                    continue;
                }
            }

            // Make sure the target store is open before applying (the allocator always is).
            if object_id != allocator_object_id {
                self.open_store(&self.root_store(), object_id).await?;
            }

            self.apply_mutation(object_id, mutation, context, AssocObj::None)?;
        }
        Ok(())
    }
402
    /// Opens store `object_id` (a child of `parent`) and registers it, unless it is already
    /// open.
    ///
    /// NOTE(review): the existence check and the insert happen under separate lock
    /// acquisitions, so two concurrent callers opening the same store would trip the final
    /// assert.  Presumably callers are serialised (replay/mount) — confirm.
    async fn open_store(&self, parent: &Arc<ObjectStore>, object_id: u64) -> Result<(), Error> {
        if self.inner.read().stores.contains_key(&object_id) {
            return Ok(());
        }
        let store = ObjectStore::open(parent, object_id, Box::new(TreeCache::new()))
            .await
            .with_context(|| format!("Failed to open store {object_id}"))?;
        if let Some(on_new_store) = &self.on_new_store {
            on_new_store(&store);
        }
        assert!(self.inner.write().stores.insert(object_id, store).is_none());
        Ok(())
    }
417
    /// Applies all of `transaction`'s mutations at `checkpoint`.
    ///
    /// If the transaction borrows metadata space, the net change implied by the before/after
    /// reservation amount and requirement is folded into `borrowed_metadata_space`, and an
    /// UpdateBorrowed mutation carrying the new value is returned so it can be journalled;
    /// otherwise returns None.
    pub fn apply_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        checkpoint: &JournalCheckpoint,
    ) -> Result<Option<Mutation>, Error> {
        // Snapshot the ledger so the delta can be computed after the mutations are applied.
        let old_amount = self.metadata_reservation().amount();
        let old_required = self.inner.read().required_reservation();

        debug!(checkpoint = checkpoint.file_offset; "BEGIN TXN");
        let mutations = transaction.take_mutations();
        let context =
            ApplyContext { mode: ApplyMode::Live(transaction), checkpoint: checkpoint.clone() };
        for TxnMutation { object_id, mutation, associated_object, .. } in mutations {
            self.apply_mutation(object_id, mutation, &context, associated_object)?;
        }
        debug!("END TXN");

        Ok(if let MetadataReservation::Borrowed = transaction.metadata_reservation {
            // borrowed += (old_amount - new_amount) + (new_required - old_required), computed
            // via `add`/`sub` so all intermediate values stay non-negative in u64 arithmetic.
            let new_amount = self.metadata_reservation().amount();
            let mut inner = self.inner.write();
            let new_required = inner.required_reservation();
            let add = old_amount + new_required;
            let sub = new_amount + old_required;
            if add >= sub {
                inner.borrowed_metadata_space += add - sub;
            } else {
                inner.borrowed_metadata_space =
                    inner.borrowed_metadata_space.saturating_sub(sub - add);
            }
            Some(Mutation::UpdateBorrowed(inner.borrowed_metadata_space))
        } else {
            // Transactions that carry their own reservation should leave both sides of the
            // ledger untouched.
            debug_assert_eq!(self.metadata_reservation().amount(), old_amount);
            debug_assert_eq!(self.inner.read().required_reservation(), old_required);
            None
        })
    }
464
    /// Called after `transaction` has been written to the journal, ending at `end_offset`;
    /// settles the metadata-space accounting for the journal bytes the transaction consumed.
    pub fn did_commit_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        _checkpoint: &JournalCheckpoint,
        end_offset: u64,
    ) {
        let reservation = self.metadata_reservation();
        let mut inner = self.inner.write();
        // Journal bytes consumed by this transaction; also advances last_end_offset.
        let journal_usage = end_offset - std::mem::replace(&mut inner.last_end_offset, end_offset);

        // Keep the max-transaction-size inspect metric up to date.
        if journal_usage > inner.max_transaction_size.0 {
            inner.max_transaction_size.0 = journal_usage;
            inner.max_transaction_size.1.set(journal_usage);
        }

        let txn_space = reserved_space_from_journal_usage(journal_usage);
        match &mut transaction.metadata_reservation {
            MetadataReservation::None => unreachable!(),
            MetadataReservation::Borrowed => {
                // The journal cost was borrowed from the metadata reservation...
                inner.borrowed_metadata_space += txn_space;

                // ...so return any excess above what is actually required.
                let to_give_back = (reservation.amount() + inner.borrowed_metadata_space)
                    .saturating_sub(inner.required_reservation());
                if to_give_back > 0 {
                    reservation.give_back(to_give_back);
                }
            }
            MetadataReservation::Hold(hold_amount) => {
                // The transaction brought its own reservation; move the journal cost from it
                // into the metadata reservation.
                let txn_reservation = transaction.allocator_reservation.unwrap();
                assert_ne!(
                    txn_reservation as *const _, reservation as *const _,
                    "MetadataReservation::Borrowed should be used."
                );
                txn_reservation.commit(txn_space);
                if txn_reservation.owner_object_id() != reservation.owner_object_id() {
                    assert_eq!(
                        reservation.owner_object_id(),
                        None,
                        "Should not be mixing attributed owners."
                    );
                    // The space changes owner as it moves into the metadata reservation.
                    inner
                        .allocator
                        .as_ref()
                        .unwrap()
                        .disown_reservation(txn_reservation.owner_object_id(), txn_space);
                }
                if let Some(amount) = hold_amount.checked_sub(txn_space) {
                    *hold_amount = amount;
                } else {
                    panic!("Transaction was larger than metadata reservation");
                }
                reservation.add(txn_space);
            }
            MetadataReservation::Reservation(txn_reservation) => {
                txn_reservation.move_to(reservation, txn_space);
            }
        }
        // Invariant: reservation + borrowed always equals the required amount.
        debug_assert_eq!(
            reservation.amount() + inner.borrowed_metadata_space,
            inner.required_reservation(),
            "txn_space: {}, reservation_amount: {}, borrowed: {}, required: {}",
            txn_space,
            reservation.amount(),
            inner.borrowed_metadata_space,
            inner.required_reservation(),
        );
    }
541
542 pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
547 for TxnMutation { object_id, mutation, .. } in transaction.take_mutations() {
548 self.object(object_id).map(|o| o.drop_mutation(mutation, transaction));
549 }
550 }
551
552 pub fn journal_file_offsets(&self) -> (HashMap<u64, u64>, Option<JournalCheckpoint>) {
555 let inner = self.inner.read();
556 let mut min_checkpoint = None;
557 let mut offsets = HashMap::default();
558 for (&object_id, checkpoint) in &inner.journal_checkpoints {
559 let checkpoint = checkpoint.earliest();
560 match &mut min_checkpoint {
561 None => min_checkpoint = Some(checkpoint),
562 Some(ref mut min_checkpoint) => {
563 if checkpoint.file_offset < min_checkpoint.file_offset {
564 *min_checkpoint = checkpoint;
565 }
566 }
567 }
568 offsets.insert(object_id, checkpoint.file_offset);
569 }
570 (offsets, min_checkpoint.cloned())
571 }
572
573 pub fn journal_checkpoint(&self, object_id: u64) -> Option<JournalCheckpoint> {
576 self.inner
577 .read()
578 .journal_checkpoints
579 .get(&object_id)
580 .map(|checkpoints| checkpoints.earliest().clone())
581 }
582
583 pub fn needs_flush(&self, object_id: u64) -> bool {
586 self.inner.read().journal_checkpoints.contains_key(&object_id)
587 }
588
    /// Flushes every object that currently has a journal checkpoint (i.e. unflushed
    /// mutations) and returns the earliest on-disk `Version` across everything flushed.
    pub async fn flush(&self) -> Result<Version, Error> {
        let objects = {
            let inner = self.inner.read();
            let mut object_ids = inner.journal_checkpoints.keys().cloned().collect::<Vec<_>>();
            // Flush in descending object-id order.  NOTE(review): presumably so child stores
            // flush before the system stores they depend on — confirm.
            object_ids.sort_unstable();
            object_ids
                .iter()
                .rev()
                .map(|oid| (*oid, inner.object(*oid).unwrap()))
                .collect::<Vec<_>>()
        };

        // Flush sequentially, tracking the minimum version reported by the objects.
        let mut earliest_version: Version = LATEST_VERSION;
        for (object_id, object) in objects {
            let object_earliest_version =
                object.flush().await.with_context(|| format!("Failed to flush oid {object_id}"))?;
            if object_earliest_version < earliest_version {
                earliest_version = object_earliest_version;
            }
        }

        Ok(earliest_version)
    }
619
    /// Looks up the journaling object (a store or the allocator) for `object_id`.
    fn object(&self, object_id: u64) -> Option<Arc<dyn JournalingObject>> {
        self.inner.read().object(object_id)
    }
623
    /// Creates the metadata reservation, sized at the required amount (see
    /// `Inner::required_reservation`) minus what has already been borrowed.  Must be called
    /// exactly once — the OnceCell `set(...).unwrap()` enforces this.
    ///
    /// # Errors
    ///
    /// Returns `FxfsError::Inconsistent` if borrowed space exceeds the requirement, or the
    /// allocator's error (with a diagnostic context) if the space cannot be reserved.
    pub fn init_metadata_reservation(&self) -> Result<(), Error> {
        let inner = self.inner.read();
        let required = inner.required_reservation();
        ensure!(required >= inner.borrowed_metadata_space, FxfsError::Inconsistent);
        let allocator = inner.allocator.as_ref().cloned().unwrap();
        self.metadata_reservation
            .set(
                allocator
                    .clone()
                    .reserve(None, inner.required_reservation() - inner.borrowed_metadata_space)
                    .with_context(|| {
                        format!(
                            "Failed to reserve {} - {} = {} bytes, free={}, \
                            owner_bytes={}",
                            inner.required_reservation(),
                            inner.borrowed_metadata_space,
                            inner.required_reservation() - inner.borrowed_metadata_space,
                            // Saturating guards against used momentarily exceeding disk size.
                            Saturating(allocator.get_disk_bytes()) - allocator.get_used_bytes(),
                            allocator.owner_bytes_debug(),
                        )
                    })?,
            )
            .unwrap();
        Ok(())
    }
649
650 pub fn metadata_reservation(&self) -> &Reservation {
651 self.metadata_reservation.get().unwrap()
652 }
653
654 pub fn update_reservation(&self, object_id: u64, amount: u64) {
655 self.inner.write().reservations.insert(object_id, amount);
656 }
657
658 pub fn reservation(&self, object_id: u64) -> Option<u64> {
659 self.inner.read().reservations.get(&object_id).cloned()
660 }
661
662 pub fn set_reserved_space(&self, amount: u64) {
663 self.inner.write().reserved_space = amount;
664 }
665
666 pub fn last_end_offset(&self) -> u64 {
667 self.inner.read().last_end_offset
668 }
669
670 pub fn set_last_end_offset(&self, v: u64) {
671 self.inner.write().last_end_offset = v;
672 }
673
674 pub fn borrowed_metadata_space(&self) -> u64 {
675 self.inner.read().borrowed_metadata_space
676 }
677
678 pub fn set_borrowed_metadata_space(&self, v: u64) {
679 self.inner.write().borrowed_metadata_space = v;
680 }
681
682 pub fn write_mutation(&self, object_id: u64, mutation: &Mutation, writer: journal::Writer<'_>) {
683 self.object(object_id).unwrap().write_mutation(mutation, writer);
684 }
685
686 pub fn unlocked_stores(&self) -> Vec<Arc<ObjectStore>> {
687 let inner = self.inner.read();
688 let mut stores = Vec::new();
689 for store in inner.stores.values() {
690 if !store.is_locked() {
691 stores.push(store.clone());
692 }
693 }
694 stores
695 }
696
    /// Registers a lazy inspect node named `name` under `parent` that reports reservation
    /// statistics on demand.  Only a weak reference is captured, so the node does not keep
    /// the ObjectManager alive.
    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
        let this = Arc::downgrade(self);
        parent.record_lazy_child(name, move || {
            let this_clone = this.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                if let Some(this) = this_clone.upgrade() {
                    // Gather everything under a single read lock, then release it before
                    // touching the inspector.
                    let (required, borrowed, earliest_checkpoint) = {
                        let inner = this.inner.read();
                        (
                            inner.required_reservation(),
                            inner.borrowed_metadata_space,
                            inner.earliest_journal_offset(),
                        )
                    };
                    let root = inspector.root();
                    root.record_uint("metadata_reservation", this.metadata_reservation().amount());
                    root.record_uint("required_reservation", required);
                    root.record_uint("borrowed_reservation", borrowed);
                    if let Some(earliest_checkpoint) = earliest_checkpoint {
                        root.record_uint("earliest_checkpoint", earliest_checkpoint);
                    }

                    // Rounded percentage; skipped when `required` is zero.
                    if let Some(x) = round_div(100 * borrowed, required) {
                        root.record_uint("borrowed_to_required_reservation_percent", x);
                    }
                }
                Ok(inspector)
            }
            .boxed()
        });
    }
733
    /// Returns true if `checkpoint` is more than 256 bytes past the end of the last committed
    /// transaction, i.e. enough journal has accumulated that metadata space should be
    /// borrowed for it.  NOTE(review): the 256-byte threshold's rationale isn't visible here.
    ///
    /// # Panics
    ///
    /// Panics if `checkpoint` precedes `last_end_offset` (the `checked_sub(...).unwrap()`);
    /// callers are expected to pass a monotonically increasing journal offset.
    pub fn needs_borrow_for_journal(&self, checkpoint: u64) -> bool {
        checkpoint.checked_sub(self.inner.read().last_end_offset).unwrap() > 256
    }
741}
742
/// Associated object that, when the mutation it accompanies is applied, records a new flush
/// reservation amount for the target object via `ObjectManager::update_reservation`.
pub struct ReservationUpdate(u64);

impl ReservationUpdate {
    /// Wraps the reservation `amount` (in bytes) to associate with a mutation.
    pub fn new(amount: u64) -> Self {
        ReservationUpdate(amount)
    }
}
754
impl AssociatedObject for ReservationUpdate {
    // Record the new reservation amount for the object just before its mutation is applied,
    // so `Inner::required_reservation` reflects the updated value.
    fn will_apply_mutation(&self, _mutation: &Mutation, object_id: u64, manager: &ObjectManager) {
        manager.update_reservation(object_id, self.0);
    }
}