Skip to main content

fxfs/object_store/
graveyard.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::log::*;
7use crate::lsm_tree::Query;
8use crate::lsm_tree::merge::{Merger, MergerIterator};
9use crate::lsm_tree::types::{ItemRef, LayerIterator};
10use crate::object_store::ObjectStore;
11use crate::object_store::object_manager::ObjectManager;
12use crate::object_store::object_record::{
13    ObjectAttributes, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue, Timestamp,
14};
15use crate::object_store::transaction::{Mutation, Options, Transaction};
16use anyhow::{Context, Error, anyhow, bail};
17use fuchsia_async::{self as fasync};
18use fuchsia_sync::Mutex;
19use futures::StreamExt;
20use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender, unbounded};
21use futures::channel::oneshot;
22use fxfs_trace::{TraceFutureExt, trace_future_args};
23use std::sync::Arc;
24use std::sync::atomic::Ordering;
25
26enum ReaperTask {
27    None,
28    Pending(UnboundedReceiver<Message>),
29    Running(fasync::Task<()>),
30}
31
32/// A graveyard exists as a place to park objects that should be deleted when they are no longer in
33/// use.  How objects enter and leave the graveyard is up to the caller to decide.  The intention is
34/// that at mount time, any objects in the graveyard will get removed.  Each object store has a
35/// directory like object that contains a list of the objects within that store that are part of the
36/// graveyard.  A single instance of this Graveyard struct manages *all* stores.
37pub struct Graveyard {
38    object_manager: Arc<ObjectManager>,
39    reaper_task: Mutex<ReaperTask>,
40    channel: UnboundedSender<Message>,
41}
42
43enum Message {
44    // Tombstone the object identified by <store-id>, <object-id>, Option<attribute-id>. If
45    // <attribute-id> is Some, tombstone just the attribute instead of the entire object.
46    Tombstone(u64, u64, Option<u64>),
47
48    // Trims the identified object.
49    Trim(u64, u64),
50
51    // When the flush message is processed, notifies sender.  This allows the receiver to know
52    // that all preceding tombstone messages have been processed.
53    Flush(oneshot::Sender<()>),
54}
55
56#[fxfs_trace::trace]
57impl Graveyard {
58    /// Creates a new instance of the graveyard manager.
59    pub fn new(object_manager: Arc<ObjectManager>) -> Arc<Self> {
60        let (sender, receiver) = unbounded();
61        Arc::new(Graveyard {
62            object_manager,
63            reaper_task: Mutex::new(ReaperTask::Pending(receiver)),
64            channel: sender,
65        })
66    }
67
68    /// Creates a graveyard object in `store`.  Returns the object ID for the graveyard object.
69    pub async fn create(
70        transaction: &mut Transaction<'_>,
71        store: &ObjectStore,
72    ) -> Result<u64, Error> {
73        let reserved_object_id = store.get_next_object_id(transaction.txn_guard()).await?;
74        let object_id = reserved_object_id.get();
75        let now = Timestamp::now();
76        transaction.add(
77            store.store_object_id,
78            Mutation::insert_object(
79                ObjectKey::object(reserved_object_id.release()),
80                ObjectValue::Object {
81                    kind: ObjectKind::Graveyard,
82                    attributes: ObjectAttributes {
83                        creation_time: now.clone(),
84                        modification_time: now,
85                        project_id: 0,
86                        ..Default::default()
87                    },
88                },
89            ),
90        );
91        Ok(object_id)
92    }
93
94    /// Starts an asynchronous task to reap the graveyard for all entries older than
95    /// |journal_offset| (exclusive).
96    /// If a task is already started, this has no effect, even if that task was targeting an older
97    /// |journal_offset|.
98    pub fn reap_async(self: Arc<Self>) {
99        let mut reaper_task = self.reaper_task.lock();
100        if let ReaperTask::Pending(_) = &*reaper_task {
101            if let ReaperTask::Pending(receiver) =
102                std::mem::replace(&mut *reaper_task, ReaperTask::None)
103            {
104                *reaper_task = ReaperTask::Running(fasync::Task::spawn(
105                    self.clone()
106                        .reap_task(receiver)
107                        .trace(trace_future_args!("Graveyard::reap_task")),
108                ));
109            } else {
110                unreachable!();
111            }
112        }
113    }
114
115    /// Returns a future which completes when the ongoing reap task (if it exists) completes.
116    pub async fn wait_for_reap(&self) {
117        self.channel.close_channel();
118        let task = std::mem::replace(&mut *self.reaper_task.lock(), ReaperTask::None);
119        if let ReaperTask::Running(task) = task {
120            task.await;
121        }
122    }
123
124    async fn reap_task(self: Arc<Self>, mut receiver: UnboundedReceiver<Message>) {
125        // Wait and process reap requests.
126        while let Some(message) = receiver.next().await {
127            match message {
128                Message::Tombstone(store_id, object_id, attribute_id) => {
129                    let res = if let Some(attribute_id) = attribute_id {
130                        self.tombstone_attribute(store_id, object_id, attribute_id).await
131                    } else {
132                        self.tombstone_object(store_id, object_id).await
133                    };
134                    if let Err(e) = res {
135                        debug_assert!(
136                            false,
137                            "Tombstone error: {e:?}, store_id: {store_id}, oid: {object_id}, \
138                             attribute_id: {attribute_id:?}"
139                        );
140                        error!(
141                            error:? = e,
142                            store_id,
143                            oid = object_id,
144                            attribute_id;
145                            "Tombstone error"
146                        );
147                    }
148                }
149                Message::Trim(store_id, object_id) => {
150                    if let Err(e) = self.trim(store_id, object_id).await {
151                        debug_assert!(
152                            false,
153                            "Tombstone error: {e:?}, store_id: {store_id}, oid: {object_id}"
154                        );
155                        error!(error:? = e, store_id, oid = object_id; "Tombstone error");
156                    }
157                }
158                Message::Flush(sender) => {
159                    let _ = sender.send(());
160                }
161            }
162        }
163    }
164
165    /// Performs the initial mount-time reap for the given store.  This will queue all items in the
166    /// graveyard.  Concurrently adding more entries to the graveyard will lead to undefined
167    /// behaviour: the entries might or might not be immediately tombstoned, so callers should wait
168    /// for this to return before changing to a state where more entries can be added.  Once this
169    /// has returned, entries will be tombstoned in the background.
170    #[trace]
171    pub async fn initial_reap(self: &Arc<Self>, store: &ObjectStore) -> Result<usize, Error> {
172        if store.filesystem().options().skip_initial_reap {
173            return Ok(0);
174        }
175        let mut count = 0;
176        let layer_set = store.tree().layer_set();
177        let mut merger = layer_set.merger();
178        let graveyard_object_id = store.graveyard_directory_object_id();
179        let mut iter = Self::iter(graveyard_object_id, &mut merger).await?;
180        let store_id = store.store_object_id();
181        // TODO(https://fxbug.dev/355246066): This iteration can sometimes attempt to reap
182        // attributes after they've been reaped as part of the parent object.
183        while let Some(GraveyardEntryInfo { object_id, attribute_id, value }) = iter.get() {
184            store.graveyard_entries.fetch_add(1, Ordering::Relaxed);
185            match value {
186                ObjectValue::Some => {
187                    if let Some(attribute_id) = attribute_id {
188                        self.queue_tombstone_attribute(store_id, object_id, attribute_id)
189                    } else {
190                        self.queue_tombstone_object(store_id, object_id)
191                    }
192                }
193                ObjectValue::Trim => {
194                    if attribute_id.is_some() {
195                        return Err(anyhow!(
196                            "Trim is not currently supported for a single attribute"
197                        ));
198                    }
199                    self.queue_trim(store_id, object_id)
200                }
201                _ => bail!(anyhow!(FxfsError::Inconsistent).context("Bad graveyard value")),
202            }
203            count += 1;
204            iter.advance().await?;
205        }
206        Ok(count)
207    }
208    /// Queues an object for tombstoning.
209    pub fn queue_tombstone_object(&self, store_id: u64, object_id: u64) {
210        let _ = self.channel.unbounded_send(Message::Tombstone(store_id, object_id, None));
211    }
212
213    /// Queues an object's attribute for tombstoning.
214    pub fn queue_tombstone_attribute(&self, store_id: u64, object_id: u64, attribute_id: u64) {
215        let _ = self.channel.unbounded_send(Message::Tombstone(
216            store_id,
217            object_id,
218            Some(attribute_id),
219        ));
220    }
221
222    fn queue_trim(&self, store_id: u64, object_id: u64) {
223        let _ = self.channel.unbounded_send(Message::Trim(store_id, object_id));
224    }
225
226    /// Waits for all preceding queued tombstones to finish.
227    pub async fn flush(&self) {
228        let (sender, receiver) = oneshot::channel::<()>();
229        self.channel.unbounded_send(Message::Flush(sender)).unwrap();
230        receiver.await.unwrap();
231    }
232
233    /// Immediately tombstones (discards) an object in the graveyard.
234    /// NB: Code should generally use |queue_tombstone| instead.
235    pub async fn tombstone_object(&self, store_id: u64, object_id: u64) -> Result<(), Error> {
236        let store = self
237            .object_manager
238            .store(store_id)
239            .with_context(|| format!("Failed to get store {}", store_id))?;
240        // For now, it's safe to assume that all objects in the root parent and root store should
241        // return space to the metadata reservation, but we might have to revisit that if we end up
242        // with objects that are in other stores.
243        let options = if store_id == self.object_manager.root_parent_store_object_id()
244            || store_id == self.object_manager.root_store_object_id()
245        {
246            Options {
247                skip_journal_checks: true,
248                borrow_metadata_space: true,
249                allocator_reservation: Some(self.object_manager.metadata_reservation()),
250                ..Default::default()
251            }
252        } else {
253            Options { skip_journal_checks: true, borrow_metadata_space: true, ..Default::default() }
254        };
255        store.tombstone_object(object_id, options).await
256    }
257
258    /// Immediately tombstones (discards) and attribute in the graveyard.
259    /// NB: Code should generally use |queue_tombstone| instead.
260    pub async fn tombstone_attribute(
261        &self,
262        store_id: u64,
263        object_id: u64,
264        attribute_id: u64,
265    ) -> Result<(), Error> {
266        let store = self
267            .object_manager
268            .store(store_id)
269            .with_context(|| format!("Failed to get store {}", store_id))?;
270        // For now, it's safe to assume that all objects in the root parent and root store should
271        // return space to the metadata reservation, but we might have to revisit that if we end up
272        // with objects that are in other stores.
273        let options = if store_id == self.object_manager.root_parent_store_object_id()
274            || store_id == self.object_manager.root_store_object_id()
275        {
276            Options {
277                skip_journal_checks: true,
278                borrow_metadata_space: true,
279                allocator_reservation: Some(self.object_manager.metadata_reservation()),
280                ..Default::default()
281            }
282        } else {
283            Options { skip_journal_checks: true, borrow_metadata_space: true, ..Default::default() }
284        };
285        store.tombstone_attribute(object_id, attribute_id, options).await
286    }
287
288    async fn trim(&self, store_id: u64, object_id: u64) -> Result<(), Error> {
289        let store = self
290            .object_manager
291            .store(store_id)
292            .with_context(|| format!("Failed to get store {}", store_id))?;
293        let fs = store.filesystem();
294        let truncate_guard = fs.truncate_guard(store_id, object_id).await;
295        store.trim(object_id, &truncate_guard).await.context("Failed to trim object")
296    }
297
298    /// Returns an iterator that will return graveyard entries skipping deleted ones.  Example
299    /// usage:
300    ///
301    ///   let layer_set = graveyard.store().tree().layer_set();
302    ///   let mut merger = layer_set.merger();
303    ///   let mut iter = graveyard.iter(&mut merger).await?;
304    ///
305    pub async fn iter<'a, 'b>(
306        graveyard_object_id: u64,
307        merger: &'a mut Merger<'b, ObjectKey, ObjectValue>,
308    ) -> Result<GraveyardIterator<'a, 'b>, Error> {
309        Self::iter_from(merger, graveyard_object_id, 0).await
310    }
311
312    /// Like "iter", but seeks from a specific (store-id, object-id) tuple.  Example usage:
313    ///
314    ///   let layer_set = graveyard.store().tree().layer_set();
315    ///   let mut merger = layer_set.merger();
316    ///   let mut iter = graveyard.iter_from(&mut merger, (2, 3)).await?;
317    ///
318    async fn iter_from<'a, 'b>(
319        merger: &'a mut Merger<'b, ObjectKey, ObjectValue>,
320        graveyard_object_id: u64,
321        from: u64,
322    ) -> Result<GraveyardIterator<'a, 'b>, Error> {
323        GraveyardIterator::new(
324            graveyard_object_id,
325            merger
326                .query(Query::FullRange(&ObjectKey::graveyard_entry(graveyard_object_id, from)))
327                .await?,
328        )
329        .await
330    }
331}
332
333pub struct GraveyardIterator<'a, 'b> {
334    object_id: u64,
335    iter: MergerIterator<'a, 'b, ObjectKey, ObjectValue>,
336}
337
338/// Contains information about a graveyard entry associated with a particular object or
339/// attribute.
340#[derive(Debug, PartialEq)]
341pub struct GraveyardEntryInfo {
342    object_id: u64,
343    attribute_id: Option<u64>,
344    value: ObjectValue,
345}
346
347impl GraveyardEntryInfo {
348    pub fn object_id(&self) -> u64 {
349        self.object_id
350    }
351
352    pub fn attribute_id(&self) -> Option<u64> {
353        self.attribute_id
354    }
355
356    pub fn value(&self) -> &ObjectValue {
357        &self.value
358    }
359}
360
361impl<'a, 'b> GraveyardIterator<'a, 'b> {
362    async fn new(
363        object_id: u64,
364        iter: MergerIterator<'a, 'b, ObjectKey, ObjectValue>,
365    ) -> Result<GraveyardIterator<'a, 'b>, Error> {
366        let mut iter = GraveyardIterator { object_id, iter };
367        iter.skip_deleted_entries().await?;
368        Ok(iter)
369    }
370
371    async fn skip_deleted_entries(&mut self) -> Result<(), Error> {
372        loop {
373            match self.iter.get() {
374                Some(ItemRef {
375                    key: ObjectKey { object_id, .. },
376                    value: ObjectValue::None,
377                    ..
378                }) if *object_id == self.object_id => {}
379                _ => return Ok(()),
380            }
381            self.iter.advance().await?;
382        }
383    }
384
385    pub fn get(&self) -> Option<GraveyardEntryInfo> {
386        match self.iter.get() {
387            Some(ItemRef {
388                key: ObjectKey { object_id: oid, data: ObjectKeyData::GraveyardEntry { object_id } },
389                value,
390                ..
391            }) if *oid == self.object_id => Some(GraveyardEntryInfo {
392                object_id: *object_id,
393                attribute_id: None,
394                value: value.clone(),
395            }),
396            Some(ItemRef {
397                key:
398                    ObjectKey {
399                        object_id: oid,
400                        data: ObjectKeyData::GraveyardAttributeEntry { object_id, attribute_id },
401                    },
402                value,
403                ..
404            }) if *oid == self.object_id => Some(GraveyardEntryInfo {
405                object_id: *object_id,
406                attribute_id: Some(*attribute_id),
407                value: value.clone(),
408            }),
409            _ => None,
410        }
411    }
412
413    pub async fn advance(&mut self) -> Result<(), Error> {
414        self.iter.advance().await?;
415        self.skip_deleted_entries().await
416    }
417}
418
419#[cfg(test)]
420mod tests {
421    use super::{Graveyard, GraveyardEntryInfo, ObjectStore};
422    use crate::errors::FxfsError;
423    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder};
424    use crate::fsck::fsck;
425    use crate::object_handle::ObjectHandle;
426    use crate::object_store::data_object_handle::WRITE_ATTR_BATCH_SIZE;
427    use crate::object_store::object_record::ObjectValue;
428    use crate::object_store::transaction::{Options, lock_keys};
429    use crate::object_store::{FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions, Mutation, ObjectKey};
430    use assert_matches::assert_matches;
431    use storage_device::DeviceHolder;
432    use storage_device::fake_device::FakeDevice;
433
434    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
435
436    #[fuchsia::test]
437    async fn test_graveyard() {
438        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
439        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
440        let root_store = fs.root_store();
441
442        assert_eq!(root_store.graveyard_count(), 0);
443
444        let mut transaction = fs
445            .clone()
446            .new_transaction(lock_keys![], Options::default())
447            .await
448            .expect("new_transaction failed");
449        let handle1 = ObjectStore::create_object(
450            &root_store,
451            &mut transaction,
452            HandleOptions::default(),
453            None,
454        )
455        .await
456        .expect("create_object failed");
457        let handle2 = ObjectStore::create_object(
458            &root_store,
459            &mut transaction,
460            HandleOptions::default(),
461            None,
462        )
463        .await
464        .expect("create_object failed");
465        transaction.commit().await.expect("commit failed");
466        let id1 = handle1.object_id();
467        let id2 = handle2.object_id();
468
469        // Create and add two objects to the graveyard.
470        let mut transaction = fs
471            .clone()
472            .new_transaction(lock_keys![], Options::default())
473            .await
474            .expect("new_transaction failed");
475
476        root_store.add_to_graveyard(&mut transaction, id1);
477        root_store.add_to_graveyard(&mut transaction, id2);
478        transaction.commit().await.expect("commit failed");
479
480        assert_eq!(root_store.graveyard_count(), 2);
481
482        // Check that we see the objects we added.
483        {
484            let layer_set = root_store.tree().layer_set();
485            let mut merger = layer_set.merger();
486            let mut iter = Graveyard::iter(root_store.graveyard_directory_object_id(), &mut merger)
487                .await
488                .expect("iter failed");
489            assert_matches!(
490                iter.get().expect("missing entry"),
491                GraveyardEntryInfo { object_id, attribute_id: None, value: ObjectValue::Some }
492                if object_id == id1
493            );
494            iter.advance().await.expect("advance failed");
495            assert_matches!(
496                iter.get().expect("missing entry"),
497                GraveyardEntryInfo { object_id, attribute_id: None, value: ObjectValue::Some }
498                if object_id == id2
499            );
500            iter.advance().await.expect("advance failed");
501            assert_eq!(iter.get(), None);
502        }
503
504        // Remove one of the objects.
505        let mut transaction = fs
506            .clone()
507            .new_transaction(lock_keys![], Options::default())
508            .await
509            .expect("new_transaction failed");
510        root_store.remove_from_graveyard(&mut transaction, id2);
511        transaction.commit().await.expect("commit failed");
512
513        assert_eq!(root_store.graveyard_count(), 1);
514
515        // Check that the graveyard has been updated as expected.
516        let layer_set = root_store.tree().layer_set();
517        let mut merger = layer_set.merger();
518        let mut iter = Graveyard::iter(root_store.graveyard_directory_object_id(), &mut merger)
519            .await
520            .expect("iter failed");
521        assert_matches!(
522            iter.get().expect("missing entry"),
523            GraveyardEntryInfo { object_id, attribute_id: None, value: ObjectValue::Some }
524            if object_id == id1
525        );
526        iter.advance().await.expect("advance failed");
527        assert_eq!(iter.get(), None);
528    }
529
530    #[fuchsia::test]
531    async fn test_graveyard_count_replay() {
532        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
533        let (device, _object_ids) = {
534            let fs = FxFilesystemBuilder::new()
535                .skip_initial_reap(true)
536                .format(true)
537                .open(device)
538                .await
539                .expect("open failed");
540            let root_store = fs.root_store();
541
542            let mut object_ids = Vec::new();
543            let mut transaction = fs
544                .clone()
545                .new_transaction(lock_keys![], Options::default())
546                .await
547                .expect("new_transaction failed");
548            let handle1 = ObjectStore::create_object(
549                &root_store,
550                &mut transaction,
551                HandleOptions::default(),
552                None,
553            )
554            .await
555            .expect("create_object failed");
556            let handle2 = ObjectStore::create_object(
557                &root_store,
558                &mut transaction,
559                HandleOptions::default(),
560                None,
561            )
562            .await
563            .expect("create_object failed");
564            transaction.commit().await.expect("commit failed");
565            object_ids.push(handle1.object_id());
566            object_ids.push(handle2.object_id());
567
568            // Create and add two objects to the graveyard.
569            let mut transaction = fs
570                .clone()
571                .new_transaction(lock_keys![], Options::default())
572                .await
573                .expect("new_transaction failed");
574
575            root_store.add_to_graveyard(&mut transaction, object_ids[0]);
576            root_store.add_to_graveyard(&mut transaction, object_ids[1]);
577            transaction.commit().await.expect("commit failed");
578
579            assert_eq!(root_store.graveyard_count(), 2);
580            fs.close().await.expect("close failed");
581            (fs.take_device().await, object_ids)
582        };
583        device.reopen(false);
584        let device = {
585            let fs =
586                FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
587            let root_store = fs.root_store();
588            // Counter is 0 because initial_reap is not called for read-only mounts.
589            assert_eq!(root_store.graveyard_count(), 0);
590
591            // Now manually run it. This will count and queue (but the reaper isn't running).
592            let count =
593                fs.graveyard().initial_reap(&root_store).await.expect("initial_reap failed");
594            let actual_count = root_store.graveyard_count();
595            assert_eq!(count, 2, "initial_reap found wrong number of items (count={})", count);
596            assert_eq!(
597                actual_count, 2,
598                "graveyard_count returned {} but initial_reap found {}",
599                actual_count, count
600            );
601
602            fs.close().await.expect("close failed");
603            fs.take_device().await
604        };
605        device.reopen(false);
606        {
607            // Now test the full flow where they are automatically reaped.
608            let fs = FxFilesystem::open(device).await.expect("open failed");
609            let root_store = fs.root_store();
610
611            // They might or might not have been reaped yet.
612            // Wait for the reaper to finish.
613            fs.graveyard().wait_for_reap().await;
614
615            // Now the count MUST be 0.
616            assert_eq!(root_store.graveyard_count(), 0);
617            fs.close().await.expect("close failed");
618        }
619    }
620
621    #[fuchsia::test]
622    async fn test_tombstone_attribute() {
623        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
624        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
625        let root_store = fs.root_store();
626        let mut transaction = fs
627            .clone()
628            .new_transaction(lock_keys![], Options::default())
629            .await
630            .expect("new_transaction failed");
631
632        let handle = ObjectStore::create_object(
633            &root_store,
634            &mut transaction,
635            HandleOptions::default(),
636            None,
637        )
638        .await
639        .expect("failed to create object");
640        transaction.commit().await.expect("commit failed");
641
642        handle
643            .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID, &[0; 8192])
644            .await
645            .expect("failed to write merkle attribute");
646        let object_id = handle.object_id();
647        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
648        transaction.add(
649            root_store.store_object_id(),
650            Mutation::replace_or_insert_object(
651                ObjectKey::graveyard_attribute_entry(
652                    root_store.graveyard_directory_object_id(),
653                    object_id,
654                    FSVERITY_MERKLE_ATTRIBUTE_ID,
655                ),
656                ObjectValue::Some,
657            ),
658        );
659
660        transaction.commit().await.expect("commit failed");
661
662        fs.close().await.expect("failed to close filesystem");
663        let device = fs.take_device().await;
664        device.reopen(false);
665
666        let fs =
667            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
668        fsck(fs.clone()).await.expect("fsck failed");
669        fs.close().await.expect("failed to close filesystem");
670        let device = fs.take_device().await;
671        device.reopen(false);
672
673        // On open, the filesystem will call initial_reap which will call queue_tombstone().
674        let fs = FxFilesystem::open(device).await.expect("open failed");
675        // `wait_for_reap` ensures that the Message::Tombstone is actually processed.
676        fs.graveyard().wait_for_reap().await;
677        let root_store = fs.root_store();
678
679        let handle =
680            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
681                .await
682                .expect("failed to open object");
683
684        assert_eq!(
685            handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
686            None
687        );
688        fsck(fs.clone()).await.expect("fsck failed");
689    }
690
691    #[fuchsia::test]
692    async fn test_tombstone_attribute_and_object() {
693        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
694        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
695        let root_store = fs.root_store();
696        let mut transaction = fs
697            .clone()
698            .new_transaction(lock_keys![], Options::default())
699            .await
700            .expect("new_transaction failed");
701
702        let handle = ObjectStore::create_object(
703            &root_store,
704            &mut transaction,
705            HandleOptions::default(),
706            None,
707        )
708        .await
709        .expect("failed to create object");
710        transaction.commit().await.expect("commit failed");
711
712        handle
713            .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID, &[0; 8192])
714            .await
715            .expect("failed to write merkle attribute");
716        let object_id = handle.object_id();
717        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
718        transaction.add(
719            root_store.store_object_id(),
720            Mutation::replace_or_insert_object(
721                ObjectKey::graveyard_attribute_entry(
722                    root_store.graveyard_directory_object_id(),
723                    object_id,
724                    FSVERITY_MERKLE_ATTRIBUTE_ID,
725                ),
726                ObjectValue::Some,
727            ),
728        );
729        transaction.commit().await.expect("commit failed");
730        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
731        transaction.add(
732            root_store.store_object_id(),
733            Mutation::replace_or_insert_object(
734                ObjectKey::graveyard_entry(root_store.graveyard_directory_object_id(), object_id),
735                ObjectValue::Some,
736            ),
737        );
738        transaction.commit().await.expect("commit failed");
739
740        fs.close().await.expect("failed to close filesystem");
741        let device = fs.take_device().await;
742        device.reopen(false);
743
744        let fs =
745            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
746        fsck(fs.clone()).await.expect("fsck failed");
747        fs.close().await.expect("failed to close filesystem");
748        let device = fs.take_device().await;
749        device.reopen(false);
750
751        // On open, the filesystem will call initial_reap which will call queue_tombstone().
752        let fs = FxFilesystem::open(device).await.expect("open failed");
753        // `wait_for_reap` ensures that the two tombstone messages are processed.
754        fs.graveyard().wait_for_reap().await;
755
756        let root_store = fs.root_store();
757        if let Err(e) =
758            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None).await
759        {
760            assert!(FxfsError::NotFound.matches(&e));
761        } else {
762            panic!("open_object succeeded");
763        };
764        fsck(fs.clone()).await.expect("fsck failed");
765    }
766
767    #[fuchsia::test]
768    async fn test_tombstone_large_attribute() {
769        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
770        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
771        let root_store = fs.root_store();
772        let mut transaction = fs
773            .clone()
774            .new_transaction(lock_keys![], Options::default())
775            .await
776            .expect("new_transaction failed");
777
778        let handle = ObjectStore::create_object(
779            &root_store,
780            &mut transaction,
781            HandleOptions::default(),
782            None,
783        )
784        .await
785        .expect("failed to create object");
786        transaction.commit().await.expect("commit failed");
787
788        let object_id = {
789            let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
790            transaction.add(
791                root_store.store_object_id(),
792                Mutation::replace_or_insert_object(
793                    ObjectKey::graveyard_attribute_entry(
794                        root_store.graveyard_directory_object_id(),
795                        handle.object_id(),
796                        FSVERITY_MERKLE_ATTRIBUTE_ID,
797                    ),
798                    ObjectValue::Some,
799                ),
800            );
801
802            // This write should span three transactions. This test mimics the behavior when the
803            // last transaction gets interrupted by a filesystem.close().
804            handle
805                .write_new_attr_in_batches(
806                    &mut transaction,
807                    FSVERITY_MERKLE_ATTRIBUTE_ID,
808                    &vec![0; 3 * WRITE_ATTR_BATCH_SIZE],
809                    WRITE_ATTR_BATCH_SIZE,
810                )
811                .await
812                .expect("failed to write merkle attribute");
813
814            handle.object_id()
815            // Drop the transaction to simulate interrupting the merkle tree creation as well as to
816            // release the transaction locks.
817        };
818
819        fs.close().await.expect("failed to close filesystem");
820        let device = fs.take_device().await;
821        device.reopen(false);
822
823        let fs =
824            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
825        fsck(fs.clone()).await.expect("fsck failed");
826        fs.close().await.expect("failed to close filesystem");
827        let device = fs.take_device().await;
828        device.reopen(false);
829
830        // On open, the filesystem will call initial_reap which will call queue_tombstone().
831        let fs = FxFilesystem::open(device).await.expect("open failed");
832        // `wait_for_reap` ensures that the two tombstone messages are processed.
833        fs.graveyard().wait_for_reap().await;
834
835        let root_store = fs.root_store();
836
837        let handle =
838            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
839                .await
840                .expect("failed to open object");
841
842        assert_eq!(
843            handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
844            None
845        );
846        fsck(fs.clone()).await.expect("fsck failed");
847    }
848}