1use crate::errors::FxfsError;
6use crate::log::*;
7use crate::lsm_tree::merge::{Merger, MergerIterator};
8use crate::lsm_tree::types::{ItemRef, LayerIterator};
9use crate::lsm_tree::Query;
10use crate::object_handle::INVALID_OBJECT_ID;
11use crate::object_store::object_manager::ObjectManager;
12use crate::object_store::object_record::{
13 ObjectAttributes, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue, Timestamp,
14};
15use crate::object_store::transaction::{Mutation, Options, Transaction};
16use crate::object_store::ObjectStore;
17use anyhow::{anyhow, bail, Context, Error};
18use fuchsia_async::{self as fasync};
19use fuchsia_sync::Mutex;
20use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
21use futures::channel::oneshot;
22use futures::StreamExt;
23use std::sync::Arc;
24
/// Lifecycle state of the background task that drains the graveyard's message channel.
enum ReaperTask {
    /// Neither a receiver nor a task is held.  This is the state left behind by
    /// `Graveyard::wait_for_reap`, and it is also used transiently by `Graveyard::reap_async`
    /// while the receiver is moved into the newly spawned task.
    None,
    /// The task has not been started; holds the receiving half of the channel until
    /// `Graveyard::reap_async` hands it over to the task.
    Pending(UnboundedReceiver<Message>),
    /// The task has been spawned and owns the receiving half of the channel.
    Running(fasync::Task<()>),
}
30
/// Queues and performs deferred tombstone and trim operations on objects, identified by
/// (store ID, object ID) pairs, using a single background task fed through an unbounded channel.
pub struct Graveyard {
    /// Used to look up stores by ID and to obtain the metadata reservation when tombstoning.
    object_manager: Arc<ObjectManager>,
    /// State of the background task that processes queued messages.
    reaper_task: Mutex<ReaperTask>,
    /// Sending half of the channel on which tombstone, trim and flush messages are queued.
    channel: UnboundedSender<Message>,
}
41
/// Work items sent to the background reaper task.
enum Message {
    /// Tombstone the object given by (store ID, object ID).  If the third field is `Some`, only
    /// that attribute of the object is tombstoned rather than the whole object.
    Tombstone(u64, u64, Option<u64>),

    /// Trim the object given by (store ID, object ID).
    Trim(u64, u64),

    /// The reaper task replies on the enclosed sender when it reaches this message, i.e. once all
    /// messages queued ahead of it have been handled.
    Flush(oneshot::Sender<()>),
}
54
55#[fxfs_trace::trace]
56impl Graveyard {
57 pub fn new(object_manager: Arc<ObjectManager>) -> Arc<Self> {
59 let (sender, receiver) = unbounded();
60 Arc::new(Graveyard {
61 object_manager,
62 reaper_task: Mutex::new(ReaperTask::Pending(receiver)),
63 channel: sender,
64 })
65 }
66
67 pub fn create(transaction: &mut Transaction<'_>, store: &ObjectStore) -> u64 {
69 let object_id = store.maybe_get_next_object_id();
70 assert_ne!(object_id, INVALID_OBJECT_ID);
73 let now = Timestamp::now();
74 transaction.add(
75 store.store_object_id,
76 Mutation::insert_object(
77 ObjectKey::object(object_id),
78 ObjectValue::Object {
79 kind: ObjectKind::Graveyard,
80 attributes: ObjectAttributes {
81 creation_time: now.clone(),
82 modification_time: now,
83 project_id: 0,
84 ..Default::default()
85 },
86 },
87 ),
88 );
89 object_id
90 }
91
92 pub fn reap_async(self: Arc<Self>) {
97 let mut reaper_task = self.reaper_task.lock();
98 if let ReaperTask::Pending(_) = &*reaper_task {
99 if let ReaperTask::Pending(receiver) =
100 std::mem::replace(&mut *reaper_task, ReaperTask::None)
101 {
102 *reaper_task =
103 ReaperTask::Running(fasync::Task::spawn(self.clone().reap_task(receiver)));
104 } else {
105 unreachable!();
106 }
107 }
108 }
109
110 pub async fn wait_for_reap(&self) {
112 self.channel.close_channel();
113 let task = std::mem::replace(&mut *self.reaper_task.lock(), ReaperTask::None);
114 if let ReaperTask::Running(task) = task {
115 task.await;
116 }
117 }
118
119 async fn reap_task(self: Arc<Self>, mut receiver: UnboundedReceiver<Message>) {
120 while let Some(message) = receiver.next().await {
122 match message {
123 Message::Tombstone(store_id, object_id, attribute_id) => {
124 let res = if let Some(attribute_id) = attribute_id {
125 self.tombstone_attribute(store_id, object_id, attribute_id).await
126 } else {
127 self.tombstone_object(store_id, object_id).await
128 };
129 if let Err(e) = res {
130 error!(error:? = e, store_id, oid = object_id, attribute_id; "Tombstone error");
131 }
132 }
133 Message::Trim(store_id, object_id) => {
134 if let Err(e) = self.trim(store_id, object_id).await {
135 error!(error:? = e, store_id, oid = object_id; "Tombstone error");
136 }
137 }
138 Message::Flush(sender) => {
139 let _ = sender.send(());
140 }
141 }
142 }
143 }
144
145 #[trace]
151 pub async fn initial_reap(self: &Arc<Self>, store: &ObjectStore) -> Result<usize, Error> {
152 if store.filesystem().options().skip_initial_reap {
153 return Ok(0);
154 }
155 let mut count = 0;
156 let layer_set = store.tree().layer_set();
157 let mut merger = layer_set.merger();
158 let graveyard_object_id = store.graveyard_directory_object_id();
159 let mut iter = Self::iter(graveyard_object_id, &mut merger).await?;
160 let store_id = store.store_object_id();
161 while let Some(GraveyardEntryInfo { object_id, attribute_id, sequence: _, value }) =
164 iter.get()
165 {
166 match value {
167 ObjectValue::Some => {
168 if let Some(attribute_id) = attribute_id {
169 self.queue_tombstone_attribute(store_id, object_id, attribute_id)
170 } else {
171 self.queue_tombstone_object(store_id, object_id)
172 }
173 }
174 ObjectValue::Trim => {
175 if attribute_id.is_some() {
176 return Err(anyhow!(
177 "Trim is not currently supported for a single attribute"
178 ));
179 }
180 self.queue_trim(store_id, object_id)
181 }
182 _ => bail!(anyhow!(FxfsError::Inconsistent).context("Bad graveyard value")),
183 }
184 count += 1;
185 iter.advance().await?;
186 }
187 Ok(count)
188 }
189
190 pub fn queue_tombstone_object(&self, store_id: u64, object_id: u64) {
192 let _ = self.channel.unbounded_send(Message::Tombstone(store_id, object_id, None));
193 }
194
195 pub fn queue_tombstone_attribute(&self, store_id: u64, object_id: u64, attribute_id: u64) {
197 let _ = self.channel.unbounded_send(Message::Tombstone(
198 store_id,
199 object_id,
200 Some(attribute_id),
201 ));
202 }
203
204 fn queue_trim(&self, store_id: u64, object_id: u64) {
205 let _ = self.channel.unbounded_send(Message::Trim(store_id, object_id));
206 }
207
208 pub async fn flush(&self) {
210 let (sender, receiver) = oneshot::channel::<()>();
211 self.channel.unbounded_send(Message::Flush(sender)).unwrap();
212 receiver.await.unwrap();
213 }
214
215 pub async fn tombstone_object(&self, store_id: u64, object_id: u64) -> Result<(), Error> {
218 let store = self
219 .object_manager
220 .store(store_id)
221 .with_context(|| format!("Failed to get store {}", store_id))?;
222 let options = if store_id == self.object_manager.root_parent_store_object_id()
226 || store_id == self.object_manager.root_store_object_id()
227 {
228 Options {
229 skip_journal_checks: true,
230 borrow_metadata_space: true,
231 allocator_reservation: Some(self.object_manager.metadata_reservation()),
232 ..Default::default()
233 }
234 } else {
235 Options { skip_journal_checks: true, borrow_metadata_space: true, ..Default::default() }
236 };
237 store.tombstone_object(object_id, options).await
238 }
239
240 pub async fn tombstone_attribute(
243 &self,
244 store_id: u64,
245 object_id: u64,
246 attribute_id: u64,
247 ) -> Result<(), Error> {
248 let store = self
249 .object_manager
250 .store(store_id)
251 .with_context(|| format!("Failed to get store {}", store_id))?;
252 let options = if store_id == self.object_manager.root_parent_store_object_id()
256 || store_id == self.object_manager.root_store_object_id()
257 {
258 Options {
259 skip_journal_checks: true,
260 borrow_metadata_space: true,
261 allocator_reservation: Some(self.object_manager.metadata_reservation()),
262 ..Default::default()
263 }
264 } else {
265 Options { skip_journal_checks: true, borrow_metadata_space: true, ..Default::default() }
266 };
267 store.tombstone_attribute(object_id, attribute_id, options).await
268 }
269
270 async fn trim(&self, store_id: u64, object_id: u64) -> Result<(), Error> {
271 let store = self
272 .object_manager
273 .store(store_id)
274 .with_context(|| format!("Failed to get store {}", store_id))?;
275 store.trim(object_id).await.context("Failed to trim object")
276 }
277
278 pub async fn iter<'a, 'b>(
286 graveyard_object_id: u64,
287 merger: &'a mut Merger<'b, ObjectKey, ObjectValue>,
288 ) -> Result<GraveyardIterator<'a, 'b>, Error> {
289 Self::iter_from(merger, graveyard_object_id, 0).await
290 }
291
292 async fn iter_from<'a, 'b>(
299 merger: &'a mut Merger<'b, ObjectKey, ObjectValue>,
300 graveyard_object_id: u64,
301 from: u64,
302 ) -> Result<GraveyardIterator<'a, 'b>, Error> {
303 GraveyardIterator::new(
304 graveyard_object_id,
305 merger
306 .query(Query::FullRange(&ObjectKey::graveyard_entry(graveyard_object_id, from)))
307 .await?,
308 )
309 .await
310 }
311}
312
/// Iterator over the live entries of a single graveyard, layered on top of a merger iterator.
pub struct GraveyardIterator<'a, 'b> {
    /// Object ID of the graveyard being iterated; items keyed under any other object ID end the
    /// iteration.
    object_id: u64,
    /// The underlying merger iterator over object keys and values.
    iter: MergerIterator<'a, 'b, ObjectKey, ObjectValue>,
}
317
/// An owned snapshot of one graveyard entry, as produced by `GraveyardIterator::get`.
#[derive(Debug, PartialEq)]
pub struct GraveyardEntryInfo {
    /// ID of the object the entry refers to (not the ID of the graveyard itself).
    object_id: u64,
    /// `Some` when the entry refers to a single attribute of the object, `None` when it refers to
    /// the whole object.
    attribute_id: Option<u64>,
    /// Sequence number carried by the underlying item.
    sequence: u64,
    /// Value stored for the entry (cloned from the underlying item).
    value: ObjectValue,
}
327
328impl GraveyardEntryInfo {
329 pub fn object_id(&self) -> u64 {
330 self.object_id
331 }
332
333 pub fn attribute_id(&self) -> Option<u64> {
334 self.attribute_id
335 }
336
337 pub fn value(&self) -> &ObjectValue {
338 &self.value
339 }
340}
341
342impl<'a, 'b> GraveyardIterator<'a, 'b> {
343 async fn new(
344 object_id: u64,
345 iter: MergerIterator<'a, 'b, ObjectKey, ObjectValue>,
346 ) -> Result<GraveyardIterator<'a, 'b>, Error> {
347 let mut iter = GraveyardIterator { object_id, iter };
348 iter.skip_deleted_entries().await?;
349 Ok(iter)
350 }
351
352 async fn skip_deleted_entries(&mut self) -> Result<(), Error> {
353 loop {
354 match self.iter.get() {
355 Some(ItemRef {
356 key: ObjectKey { object_id, .. },
357 value: ObjectValue::None,
358 ..
359 }) if *object_id == self.object_id => {}
360 _ => return Ok(()),
361 }
362 self.iter.advance().await?;
363 }
364 }
365
366 pub fn get(&self) -> Option<GraveyardEntryInfo> {
367 match self.iter.get() {
368 Some(ItemRef {
369 key: ObjectKey { object_id: oid, data: ObjectKeyData::GraveyardEntry { object_id } },
370 value,
371 sequence,
372 ..
373 }) if *oid == self.object_id => Some(GraveyardEntryInfo {
374 object_id: *object_id,
375 attribute_id: None,
376 sequence,
377 value: value.clone(),
378 }),
379 Some(ItemRef {
380 key:
381 ObjectKey {
382 object_id: oid,
383 data: ObjectKeyData::GraveyardAttributeEntry { object_id, attribute_id },
384 },
385 value,
386 sequence,
387 }) if *oid == self.object_id => Some(GraveyardEntryInfo {
388 object_id: *object_id,
389 attribute_id: Some(*attribute_id),
390 sequence,
391 value: value.clone(),
392 }),
393 _ => None,
394 }
395 }
396
397 pub async fn advance(&mut self) -> Result<(), Error> {
398 self.iter.advance().await?;
399 self.skip_deleted_entries().await
400 }
401}
402
#[cfg(test)]
mod tests {
    use super::{Graveyard, GraveyardEntryInfo, ObjectStore};
    use crate::errors::FxfsError;
    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder};
    use crate::fsck::fsck;
    use crate::object_handle::ObjectHandle;
    use crate::object_store::data_object_handle::WRITE_ATTR_BATCH_SIZE;
    use crate::object_store::object_record::ObjectValue;
    use crate::object_store::transaction::{lock_keys, Options};
    use crate::object_store::{HandleOptions, Mutation, ObjectKey, FSVERITY_MERKLE_ATTRIBUTE_ID};
    use assert_matches::assert_matches;
    use storage_device::fake_device::FakeDevice;
    use storage_device::DeviceHolder;

    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    /// Adds two entries to the root store's graveyard, removes one, and checks after each step
    /// that the graveyard iterator yields exactly the expected entries.
    #[fuchsia::test]
    async fn test_graveyard() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();

        // Add two object IDs to the graveyard.
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        root_store.add_to_graveyard(&mut transaction, 3);
        root_store.add_to_graveyard(&mut transaction, 4);
        transaction.commit().await.expect("commit failed");

        // Check that both entries are visible, in order.  Scoped so that the layer set, merger and
        // iterator are dropped before the next transaction.
        {
            let layer_set = root_store.tree().layer_set();
            let mut merger = layer_set.merger();
            let mut iter = Graveyard::iter(root_store.graveyard_directory_object_id(), &mut merger)
                .await
                .expect("iter failed");
            assert_matches!(
                iter.get().expect("missing entry"),
                GraveyardEntryInfo {
                    object_id: 3,
                    attribute_id: None,
                    value: ObjectValue::Some,
                    ..
                }
            );
            iter.advance().await.expect("advance failed");
            assert_matches!(
                iter.get().expect("missing entry"),
                GraveyardEntryInfo {
                    object_id: 4,
                    attribute_id: None,
                    value: ObjectValue::Some,
                    ..
                }
            );
            iter.advance().await.expect("advance failed");
            assert_eq!(iter.get(), None);
        }

        // Remove one of the two entries.
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        root_store.remove_from_graveyard(&mut transaction, 4);
        transaction.commit().await.expect("commit failed");

        // Only the remaining entry should now be returned by the iterator.
        let layer_set = root_store.tree().layer_set();
        let mut merger = layer_set.merger();
        let mut iter = Graveyard::iter(root_store.graveyard_directory_object_id(), &mut merger)
            .await
            .expect("iter failed");
        assert_matches!(
            iter.get().expect("missing entry"),
            GraveyardEntryInfo { object_id: 3, attribute_id: None, value: ObjectValue::Some, .. }
        );
        iter.advance().await.expect("advance failed");
        assert_eq!(iter.get(), None);
    }

    /// Writes an attribute, records a graveyard attribute entry for it, and checks that after the
    /// filesystem is reopened the attribute is gone while the object itself can still be opened.
    #[fuchsia::test]
    async fn test_tombstone_attribute() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        handle
            .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID, &[0; 8192])
            .await
            .expect("failed to write merkle attribute");
        let object_id = handle.object_id();
        // Record a graveyard entry for just the attribute written above.
        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
        transaction.add(
            root_store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::graveyard_attribute_entry(
                    root_store.graveyard_directory_object_id(),
                    object_id,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                ),
                ObjectValue::Some,
            ),
        );

        transaction.commit().await.expect("commit failed");

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // Reopen read-only and run fsck while the graveyard entry is still present.
        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystem::open(device).await.expect("open failed");
        // Wait for the reaper task to finish processing whatever was queued.
        // NOTE(review): this relies on opening the filesystem read-write queueing the graveyard
        // entries -- confirm against `FxFilesystem::open`.
        fs.graveyard().wait_for_reap().await;
        let root_store = fs.root_store();

        let handle =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("failed to open object");

        assert_eq!(
            handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
            None
        );
        fsck(fs.clone()).await.expect("fsck failed");
    }

    /// Records graveyard entries for both an attribute of an object and the object itself, and
    /// checks that after the filesystem is reopened the object can no longer be found.
    #[fuchsia::test]
    async fn test_tombstone_attribute_and_object() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        handle
            .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID, &[0; 8192])
            .await
            .expect("failed to write merkle attribute");
        let object_id = handle.object_id();
        // First add a graveyard entry for the attribute...
        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
        transaction.add(
            root_store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::graveyard_attribute_entry(
                    root_store.graveyard_directory_object_id(),
                    object_id,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                ),
                ObjectValue::Some,
            ),
        );
        transaction.commit().await.expect("commit failed");
        // ...then, in a separate transaction, one for the whole object.
        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
        transaction.add(
            root_store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::graveyard_entry(root_store.graveyard_directory_object_id(), object_id),
                ObjectValue::Some,
            ),
        );
        transaction.commit().await.expect("commit failed");

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // Reopen read-only and run fsck while both graveyard entries are still present.
        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystem::open(device).await.expect("open failed");
        // Wait for the reaper task to finish processing whatever was queued.
        fs.graveyard().wait_for_reap().await;

        // The object should have been tombstoned, so opening it must fail with NotFound.
        let root_store = fs.root_store();
        if let Err(e) =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None).await
        {
            assert!(FxfsError::NotFound.matches(&e));
        } else {
            panic!("open_object succeeded");
        };
        fsck(fs.clone()).await.expect("fsck failed");
    }

    /// Records a graveyard attribute entry and starts a batched write of a large attribute in the
    /// same, never-committed transaction; checks that after the filesystem is reopened the
    /// attribute is absent and fsck passes.
    #[fuchsia::test]
    async fn test_tombstone_large_attribute() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        let object_id = {
            let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
            transaction.add(
                root_store.store_object_id(),
                Mutation::replace_or_insert_object(
                    ObjectKey::graveyard_attribute_entry(
                        root_store.graveyard_directory_object_id(),
                        handle.object_id(),
                        FSVERITY_MERKLE_ATTRIBUTE_ID,
                    ),
                    ObjectValue::Some,
                ),
            );

            // The data is three batches long.
            // NOTE(review): presumably the earlier batches are committed by
            // `write_new_attr_in_batches` itself, leaving only the last one in `transaction` --
            // confirm against that function.
            handle
                .write_new_attr_in_batches(
                    &mut transaction,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    &vec![0; 3 * WRITE_ATTR_BATCH_SIZE],
                    WRITE_ATTR_BATCH_SIZE,
                )
                .await
                .expect("failed to write merkle attribute");

            handle.object_id()
            // `transaction` is dropped here without ever being committed.
        };

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // Reopen read-only and run fsck before any reaping can happen.
        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystem::open(device).await.expect("open failed");
        // Wait for the reaper task to finish processing whatever was queued.
        fs.graveyard().wait_for_reap().await;

        let root_store = fs.root_store();

        let handle =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("failed to open object");

        assert_eq!(
            handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
            None
        );
        fsck(fs.clone()).await.expect("fsck failed");
    }
}