fxfs/object_store/graveyard.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::errors::FxfsError;
use crate::log::*;
use crate::lsm_tree::merge::{Merger, MergerIterator};
use crate::lsm_tree::types::{ItemRef, LayerIterator};
use crate::lsm_tree::Query;
use crate::object_handle::INVALID_OBJECT_ID;
use crate::object_store::object_manager::ObjectManager;
use crate::object_store::object_record::{
    ObjectAttributes, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue, Timestamp,
};
use crate::object_store::transaction::{Mutation, Options, Transaction};
use crate::object_store::ObjectStore;
use anyhow::{anyhow, bail, Context, Error};
use fuchsia_async::{self as fasync};
use fuchsia_sync::Mutex;
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot;
use futures::StreamExt;
use std::sync::Arc;

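/// Tracks the state of the background reaper task: not yet present, waiting to be started (holding
/// the receive end of the message channel), or running.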
enum ReaperTask {
    None,
    Pending(UnboundedReceiver<Message>),
    Running(fasync::Task<()>),
}

/// A graveyard exists as a place to park objects that should be deleted when they are no longer in
/// use.  How objects enter and leave the graveyard is up to the caller to decide.  The intention is
/// that at mount time, any objects in the graveyard will get removed.  Each object store has a
/// directory-like object that contains a list of the objects within that store that are part of
/// the graveyard.  A single instance of this Graveyard struct manages *all* stores.
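///
/// Illustrative lifecycle (a sketch; `object_manager`, `store_id` and `object_id` are
/// placeholders):
///
///   let graveyard = Graveyard::new(object_manager);
///   graveyard.clone().reap_async();
///   graveyard.queue_tombstone_object(store_id, object_id);
///   graveyard.wait_for_reap().await;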
pub struct Graveyard {
    object_manager: Arc<ObjectManager>,
    reaper_task: Mutex<ReaperTask>,
    channel: UnboundedSender<Message>,
}

enum Message {
    // Tombstone the object identified by <store-id>, <object-id>, Option<attribute-id>. If
    // <attribute-id> is Some, tombstone just the attribute instead of the entire object.
    Tombstone(u64, u64, Option<u64>),

    // Trims the identified object.
    Trim(u64, u64),

    // When the flush message is processed, notifies sender.  This allows the receiver to know
    // that all preceding tombstone messages have been processed.
    Flush(oneshot::Sender<()>),
}

#[fxfs_trace::trace]
impl Graveyard {
    /// Creates a new instance of the graveyard manager.
    pub fn new(object_manager: Arc<ObjectManager>) -> Arc<Self> {
        let (sender, receiver) = unbounded();
        Arc::new(Graveyard {
            object_manager,
            reaper_task: Mutex::new(ReaperTask::Pending(receiver)),
            channel: sender,
        })
    }

    /// Creates a graveyard object in `store`.  Returns the object ID for the graveyard object.
    pub fn create(transaction: &mut Transaction<'_>, store: &ObjectStore) -> u64 {
        let object_id = store.maybe_get_next_object_id();
        // This is OK because we only ever create a graveyard as we are creating a new store so
        // maybe_get_next_object_id will never fail here due to a lack of an object ID cipher.
        assert_ne!(object_id, INVALID_OBJECT_ID);
        let now = Timestamp::now();
        transaction.add(
            store.store_object_id,
            Mutation::insert_object(
                ObjectKey::object(object_id),
                ObjectValue::Object {
                    kind: ObjectKind::Graveyard,
                    attributes: ObjectAttributes {
                        creation_time: now.clone(),
                        modification_time: now,
                        project_id: 0,
                        ..Default::default()
                    },
                },
            ),
        );
        object_id
    }

    /// Starts an asynchronous task to reap the graveyard, processing entries as they are queued.
    /// If a task has already been started, this has no effect.
    pub fn reap_async(self: Arc<Self>) {
        let mut reaper_task = self.reaper_task.lock();
        if let ReaperTask::Pending(_) = &*reaper_task {
            if let ReaperTask::Pending(receiver) =
                std::mem::replace(&mut *reaper_task, ReaperTask::None)
            {
                *reaper_task =
                    ReaperTask::Running(fasync::Task::spawn(self.clone().reap_task(receiver)));
            } else {
                unreachable!();
            }
        }
    }

    /// Returns a future which completes when the ongoing reap task (if it exists) completes.
    pub async fn wait_for_reap(&self) {
        self.channel.close_channel();
        let task = std::mem::replace(&mut *self.reaper_task.lock(), ReaperTask::None);
        if let ReaperTask::Running(task) = task {
            task.await;
        }
    }

    async fn reap_task(self: Arc<Self>, mut receiver: UnboundedReceiver<Message>) {
        // Wait and process reap requests.
        while let Some(message) = receiver.next().await {
            match message {
                Message::Tombstone(store_id, object_id, attribute_id) => {
                    let res = if let Some(attribute_id) = attribute_id {
                        self.tombstone_attribute(store_id, object_id, attribute_id).await
                    } else {
                        self.tombstone_object(store_id, object_id).await
                    };
                    if let Err(e) = res {
                        error!(error:? = e, store_id, oid = object_id, attribute_id; "Tombstone error");
                    }
                }
                Message::Trim(store_id, object_id) => {
                    if let Err(e) = self.trim(store_id, object_id).await {
                        error!(error:? = e, store_id, oid = object_id; "Trim error");
                    }
                }
                Message::Flush(sender) => {
                    let _ = sender.send(());
                }
            }
        }
    }

    /// Performs the initial mount-time reap for the given store.  This will queue all items in the
    /// graveyard.  Concurrently adding more entries to the graveyard will lead to undefined
    /// behaviour: the entries might or might not be immediately tombstoned, so callers should wait
    /// for this to return before changing to a state where more entries can be added.  Once this
    /// has returned, entries will be tombstoned in the background.
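    ///
    /// Illustrative mount-time flow (a sketch; `stores` is a placeholder for however the caller
    /// enumerates its object stores):
    ///
    ///   graveyard.clone().reap_async();
    ///   for store in &stores {
    ///       graveyard.initial_reap(store).await?;
    ///   }
    ///   // Queued entries are now tombstoned in the background.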
    #[trace]
    pub async fn initial_reap(self: &Arc<Self>, store: &ObjectStore) -> Result<usize, Error> {
        if store.filesystem().options().skip_initial_reap {
            return Ok(0);
        }
        let mut count = 0;
        let layer_set = store.tree().layer_set();
        let mut merger = layer_set.merger();
        let graveyard_object_id = store.graveyard_directory_object_id();
        let mut iter = Self::iter(graveyard_object_id, &mut merger).await?;
        let store_id = store.store_object_id();
        // TODO(https://fxbug.dev/355246066): This iteration can sometimes attempt to reap
        // attributes after they've been reaped as part of the parent object.
        while let Some(GraveyardEntryInfo { object_id, attribute_id, sequence: _, value }) =
            iter.get()
        {
            match value {
                ObjectValue::Some => {
                    if let Some(attribute_id) = attribute_id {
                        self.queue_tombstone_attribute(store_id, object_id, attribute_id)
                    } else {
                        self.queue_tombstone_object(store_id, object_id)
                    }
                }
                ObjectValue::Trim => {
                    if attribute_id.is_some() {
                        return Err(anyhow!(
                            "Trim is not currently supported for a single attribute"
                        ));
                    }
                    self.queue_trim(store_id, object_id)
                }
                _ => bail!(anyhow!(FxfsError::Inconsistent).context("Bad graveyard value")),
            }
            count += 1;
            iter.advance().await?;
        }
        Ok(count)
    }

    /// Queues an object for tombstoning.
    pub fn queue_tombstone_object(&self, store_id: u64, object_id: u64) {
        let _ = self.channel.unbounded_send(Message::Tombstone(store_id, object_id, None));
    }

    /// Queues an object's attribute for tombstoning.
    pub fn queue_tombstone_attribute(&self, store_id: u64, object_id: u64, attribute_id: u64) {
        let _ = self.channel.unbounded_send(Message::Tombstone(
            store_id,
            object_id,
            Some(attribute_id),
        ));
    }

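    /// Queues an object for trimming.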
    fn queue_trim(&self, store_id: u64, object_id: u64) {
        let _ = self.channel.unbounded_send(Message::Trim(store_id, object_id));
    }

    /// Waits for all preceding queued tombstones to finish.
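    ///
    /// Illustrative usage (a sketch; `store_id` and `object_id` are placeholders):
    ///
    ///   graveyard.queue_tombstone_object(store_id, object_id);
    ///   graveyard.flush().await;  // The tombstone queued above has now been processed.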
    pub async fn flush(&self) {
        let (sender, receiver) = oneshot::channel::<()>();
        self.channel.unbounded_send(Message::Flush(sender)).unwrap();
        receiver.await.unwrap();
    }

    /// Immediately tombstones (discards) an object in the graveyard.
    /// NB: Code should generally use |queue_tombstone_object| instead.
    pub async fn tombstone_object(&self, store_id: u64, object_id: u64) -> Result<(), Error> {
        let store = self
            .object_manager
            .store(store_id)
            .with_context(|| format!("Failed to get store {}", store_id))?;
        // For now, it's safe to assume that all objects in the root parent and root store should
        // return space to the metadata reservation, but we might have to revisit that if we end up
        // with objects that are in other stores.
        let options = if store_id == self.object_manager.root_parent_store_object_id()
            || store_id == self.object_manager.root_store_object_id()
        {
            Options {
                skip_journal_checks: true,
                borrow_metadata_space: true,
                allocator_reservation: Some(self.object_manager.metadata_reservation()),
                ..Default::default()
            }
        } else {
            Options { skip_journal_checks: true, borrow_metadata_space: true, ..Default::default() }
        };
        store.tombstone_object(object_id, options).await
    }

    /// Immediately tombstones (discards) an attribute in the graveyard.
    /// NB: Code should generally use |queue_tombstone_attribute| instead.
    pub async fn tombstone_attribute(
        &self,
        store_id: u64,
        object_id: u64,
        attribute_id: u64,
    ) -> Result<(), Error> {
        let store = self
            .object_manager
            .store(store_id)
            .with_context(|| format!("Failed to get store {}", store_id))?;
        // For now, it's safe to assume that all objects in the root parent and root store should
        // return space to the metadata reservation, but we might have to revisit that if we end up
        // with objects that are in other stores.
        let options = if store_id == self.object_manager.root_parent_store_object_id()
            || store_id == self.object_manager.root_store_object_id()
        {
            Options {
                skip_journal_checks: true,
                borrow_metadata_space: true,
                allocator_reservation: Some(self.object_manager.metadata_reservation()),
                ..Default::default()
            }
        } else {
            Options { skip_journal_checks: true, borrow_metadata_space: true, ..Default::default() }
        };
        store.tombstone_attribute(object_id, attribute_id, options).await
    }

    async fn trim(&self, store_id: u64, object_id: u64) -> Result<(), Error> {
        let store = self
            .object_manager
            .store(store_id)
            .with_context(|| format!("Failed to get store {}", store_id))?;
        store.trim(object_id).await.context("Failed to trim object")
    }

    /// Returns an iterator that will return graveyard entries skipping deleted ones.  Example
    /// usage:
    ///
    ///   let layer_set = store.tree().layer_set();
    ///   let mut merger = layer_set.merger();
    ///   let mut iter =
    ///       Graveyard::iter(store.graveyard_directory_object_id(), &mut merger).await?;
    ///
    pub async fn iter<'a, 'b>(
        graveyard_object_id: u64,
        merger: &'a mut Merger<'b, ObjectKey, ObjectValue>,
    ) -> Result<GraveyardIterator<'a, 'b>, Error> {
        Self::iter_from(merger, graveyard_object_id, 0).await
    }

    /// Like "iter", but seeks from a specific object ID.  Example usage:
    ///
    ///   let layer_set = store.tree().layer_set();
    ///   let mut merger = layer_set.merger();
    ///   let mut iter = Graveyard::iter_from(&mut merger, graveyard_object_id, 3).await?;
    ///
    async fn iter_from<'a, 'b>(
        merger: &'a mut Merger<'b, ObjectKey, ObjectValue>,
        graveyard_object_id: u64,
        from: u64,
    ) -> Result<GraveyardIterator<'a, 'b>, Error> {
        GraveyardIterator::new(
            graveyard_object_id,
            merger
                .query(Query::FullRange(&ObjectKey::graveyard_entry(graveyard_object_id, from)))
                .await?,
        )
        .await
    }
}

pub struct GraveyardIterator<'a, 'b> {
    object_id: u64,
    iter: MergerIterator<'a, 'b, ObjectKey, ObjectValue>,
}

/// Contains information about a graveyard entry associated with a particular object or
/// attribute.
#[derive(Debug, PartialEq)]
pub struct GraveyardEntryInfo {
    object_id: u64,
    attribute_id: Option<u64>,
    sequence: u64,
    value: ObjectValue,
}

impl GraveyardEntryInfo {
    pub fn object_id(&self) -> u64 {
        self.object_id
    }

    pub fn attribute_id(&self) -> Option<u64> {
        self.attribute_id
    }

    pub fn value(&self) -> &ObjectValue {
        &self.value
    }
}

impl<'a, 'b> GraveyardIterator<'a, 'b> {
    async fn new(
        object_id: u64,
        iter: MergerIterator<'a, 'b, ObjectKey, ObjectValue>,
    ) -> Result<GraveyardIterator<'a, 'b>, Error> {
        let mut iter = GraveyardIterator { object_id, iter };
        iter.skip_deleted_entries().await?;
        Ok(iter)
    }

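    /// Skips over deleted entries (those whose value is `ObjectValue::None`) belonging to this
    /// graveyard, so that `get` only ever observes live entries.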
    async fn skip_deleted_entries(&mut self) -> Result<(), Error> {
        loop {
            match self.iter.get() {
                Some(ItemRef {
                    key: ObjectKey { object_id, .. },
                    value: ObjectValue::None,
                    ..
                }) if *object_id == self.object_id => {}
                _ => return Ok(()),
            }
            self.iter.advance().await?;
        }
    }

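    /// Returns information about the entry the iterator is currently positioned on, or `None` if
    /// the iterator is not positioned on an entry belonging to this graveyard.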
    pub fn get(&self) -> Option<GraveyardEntryInfo> {
        match self.iter.get() {
            Some(ItemRef {
                key: ObjectKey { object_id: oid, data: ObjectKeyData::GraveyardEntry { object_id } },
                value,
                sequence,
                ..
            }) if *oid == self.object_id => Some(GraveyardEntryInfo {
                object_id: *object_id,
                attribute_id: None,
                sequence,
                value: value.clone(),
            }),
            Some(ItemRef {
                key:
                    ObjectKey {
                        object_id: oid,
                        data: ObjectKeyData::GraveyardAttributeEntry { object_id, attribute_id },
                    },
                value,
                sequence,
            }) if *oid == self.object_id => Some(GraveyardEntryInfo {
                object_id: *object_id,
                attribute_id: Some(*attribute_id),
                sequence,
                value: value.clone(),
            }),
            _ => None,
        }
    }

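    /// Advances to the next live graveyard entry.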
    pub async fn advance(&mut self) -> Result<(), Error> {
        self.iter.advance().await?;
        self.skip_deleted_entries().await
    }
}

#[cfg(test)]
mod tests {
    use super::{Graveyard, GraveyardEntryInfo, ObjectStore};
    use crate::errors::FxfsError;
    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder};
    use crate::fsck::fsck;
    use crate::object_handle::ObjectHandle;
    use crate::object_store::data_object_handle::WRITE_ATTR_BATCH_SIZE;
    use crate::object_store::object_record::ObjectValue;
    use crate::object_store::transaction::{lock_keys, Options};
    use crate::object_store::{HandleOptions, Mutation, ObjectKey, FSVERITY_MERKLE_ATTRIBUTE_ID};
    use assert_matches::assert_matches;
    use storage_device::fake_device::FakeDevice;
    use storage_device::DeviceHolder;

    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    #[fuchsia::test]
    async fn test_graveyard() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();

        // Create and add two objects to the graveyard.
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        root_store.add_to_graveyard(&mut transaction, 3);
        root_store.add_to_graveyard(&mut transaction, 4);
        transaction.commit().await.expect("commit failed");

        // Check that we see the objects we added.
        {
            let layer_set = root_store.tree().layer_set();
            let mut merger = layer_set.merger();
            let mut iter = Graveyard::iter(root_store.graveyard_directory_object_id(), &mut merger)
                .await
                .expect("iter failed");
            assert_matches!(
                iter.get().expect("missing entry"),
                GraveyardEntryInfo {
                    object_id: 3,
                    attribute_id: None,
                    value: ObjectValue::Some,
                    ..
                }
            );
            iter.advance().await.expect("advance failed");
            assert_matches!(
                iter.get().expect("missing entry"),
                GraveyardEntryInfo {
                    object_id: 4,
                    attribute_id: None,
                    value: ObjectValue::Some,
                    ..
                }
            );
            iter.advance().await.expect("advance failed");
            assert_eq!(iter.get(), None);
        }

        // Remove one of the objects.
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        root_store.remove_from_graveyard(&mut transaction, 4);
        transaction.commit().await.expect("commit failed");

        // Check that the graveyard has been updated as expected.
        let layer_set = root_store.tree().layer_set();
        let mut merger = layer_set.merger();
        let mut iter = Graveyard::iter(root_store.graveyard_directory_object_id(), &mut merger)
            .await
            .expect("iter failed");
        assert_matches!(
            iter.get().expect("missing entry"),
            GraveyardEntryInfo { object_id: 3, attribute_id: None, value: ObjectValue::Some, .. }
        );
        iter.advance().await.expect("advance failed");
        assert_eq!(iter.get(), None);
    }

    #[fuchsia::test]
    async fn test_tombstone_attribute() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        handle
            .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID, &[0; 8192])
            .await
            .expect("failed to write merkle attribute");
        let object_id = handle.object_id();
        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
        transaction.add(
            root_store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::graveyard_attribute_entry(
                    root_store.graveyard_directory_object_id(),
                    object_id,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                ),
                ObjectValue::Some,
            ),
        );

        transaction.commit().await.expect("commit failed");

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // On open, the filesystem will call initial_reap, which will queue the attribute
        // tombstone.
        let fs = FxFilesystem::open(device).await.expect("open failed");
        // `wait_for_reap` ensures that the Message::Tombstone is actually processed.
        fs.graveyard().wait_for_reap().await;
        let root_store = fs.root_store();

        let handle =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("failed to open object");

        assert_eq!(
            handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
            None
        );
        fsck(fs.clone()).await.expect("fsck failed");
    }

    #[fuchsia::test]
    async fn test_tombstone_attribute_and_object() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        handle
            .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID, &[0; 8192])
            .await
            .expect("failed to write merkle attribute");
        let object_id = handle.object_id();
        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
        transaction.add(
            root_store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::graveyard_attribute_entry(
                    root_store.graveyard_directory_object_id(),
                    object_id,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                ),
                ObjectValue::Some,
            ),
        );
        transaction.commit().await.expect("commit failed");
        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
        transaction.add(
            root_store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::graveyard_entry(root_store.graveyard_directory_object_id(), object_id),
                ObjectValue::Some,
            ),
        );
        transaction.commit().await.expect("commit failed");

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // On open, the filesystem will call initial_reap, which will queue tombstones for both
        // the object and its attribute.
        let fs = FxFilesystem::open(device).await.expect("open failed");
        // `wait_for_reap` ensures that the two tombstone messages are processed.
        fs.graveyard().wait_for_reap().await;

        let root_store = fs.root_store();
        if let Err(e) =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None).await
        {
            assert!(FxfsError::NotFound.matches(&e));
        } else {
            panic!("open_object succeeded");
        };
        fsck(fs.clone()).await.expect("fsck failed");
    }

    #[fuchsia::test]
    async fn test_tombstone_large_attribute() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        let object_id = {
            let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
            transaction.add(
                root_store.store_object_id(),
                Mutation::replace_or_insert_object(
                    ObjectKey::graveyard_attribute_entry(
                        root_store.graveyard_directory_object_id(),
                        handle.object_id(),
                        FSVERITY_MERKLE_ATTRIBUTE_ID,
                    ),
                    ObjectValue::Some,
                ),
            );

            // This write should span three transactions. This test mimics the behavior when the
            // last transaction gets interrupted by a filesystem.close().
            handle
                .write_new_attr_in_batches(
                    &mut transaction,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    &vec![0; 3 * WRITE_ATTR_BATCH_SIZE],
                    WRITE_ATTR_BATCH_SIZE,
                )
                .await
                .expect("failed to write merkle attribute");

            handle.object_id()
            // Drop the transaction to simulate interrupting the merkle tree creation as well as to
            // release the transaction locks.
        };

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // On open, the filesystem will call initial_reap, which will queue the attribute
        // tombstone.
        let fs = FxFilesystem::open(device).await.expect("open failed");
        // `wait_for_reap` ensures that the queued tombstone is actually processed.
        fs.graveyard().wait_for_reap().await;

        let root_store = fs.root_store();

        let handle =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("failed to open object");

        assert_eq!(
            handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
            None
        );
        fsck(fs.clone()).await.expect("fsck failed");
    }
}