1use crate::filesystem::TxnGuard;
8use crate::log::*;
9use crate::lsm_tree::types::{ItemRef, LayerIterator};
10use crate::lsm_tree::{LSMTree, layers_from_handles};
11use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
12use crate::object_store::extent_record::ExtentValue;
13use crate::object_store::object_manager::{ObjectManager, ReservationUpdate};
14use crate::object_store::object_record::{ObjectKey, ObjectValue};
15use crate::object_store::transaction::{AssociatedObject, LockKey, Mutation, lock_keys};
16use crate::object_store::{
17 AssocObj, DirectWriter, EncryptedMutations, HandleOptions, LastObjectId, LastObjectIdInfo,
18 LockState, MAX_ENCRYPTED_MUTATIONS_SIZE, ObjectStore, Options, StoreInfo,
19 layer_size_from_encrypted_mutations_size, tree,
20};
21use crate::serialized_types::{LATEST_VERSION, Version, VersionedLatest};
22use anyhow::{Context, Error};
23use fxfs_crypto::{EncryptionKey, KeyPurpose};
24use std::sync::OnceLock;
25use std::sync::atomic::Ordering;
26
/// The reason a store flush was requested; the flush routines in this file branch on it.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Reason {
    /// Presumably a flush requested on behalf of the journal (TODO: confirm against callers).
    /// With this reason, `try_flush_with_reason` skips the flush unless the object manager
    /// reports that the store needs flushing or the tree's earliest version differs from
    /// `LATEST_VERSION`, `flush_unlocked` hands the journal's compaction yielder to
    /// `tree::flush`, and `flush_with_reason` retries after Crypt errors.
    Journal,

    /// Presumably a flush requested while unlocking the store (TODO: confirm against callers).
    /// With this reason, `try_flush_with_reason` skips the flush when the store info has no
    /// encrypted mutations object, and `flush_with_reason` returns Crypt errors immediately
    /// instead of retrying.
    Unlock,
}
35
/// Outcome of a single flush attempt that did not fail with a hard (`Err`) error.
#[derive(Debug)]
enum FlushResult<T> {
    /// The attempt completed (or was skipped); carries the attempt's result.
    Ok(T),
    /// The attempt was abandoned because a key operation failed (rolling the mutations key or
    /// creating a key for the new layer file).  `flush_with_reason` decides whether to retry.
    CryptError(Error),
}
45
46#[fxfs_trace::trace]
47impl ObjectStore {
48 #[trace("store_object_id" => self.store_object_id)]
49 pub async fn flush_with_reason(&self, reason: Reason) -> Result<Version, Error> {
50 const MAX_RETRIES: usize = 10;
58 let mut retries = 0;
59 loop {
60 match self.try_flush_with_reason(reason).await? {
61 FlushResult::Ok(version) => return Ok(version),
62 FlushResult::CryptError(error) => {
63 if reason == Reason::Unlock || retries >= MAX_RETRIES {
64 return Err(error);
67 }
68 log::warn!(
69 error:?;
70 "Flushing failed for store {}, re-locking and trying again.",
71 self.store_object_id()
72 );
73 let owner = self.lock_state.lock().owner();
74 if let Some(owner) = owner {
75 owner
76 .force_lock(&self)
77 .await
78 .context("Failed to re-lock store during flush")?;
79 }
80 }
83 }
84 retries += 1;
85 }
86 }
87
88 async fn try_flush_with_reason(&self, reason: Reason) -> Result<FlushResult<Version>, Error> {
89 if self.parent_store.is_none() {
90 return Ok(FlushResult::Ok(self.tree.get_earliest_version()));
92 }
93 let filesystem = self.filesystem();
94 let object_manager = filesystem.object_manager();
95
96 let txn_guard = filesystem.clone().txn_guard().await;
98 let keys = lock_keys![LockKey::flush(self.store_object_id())];
99 let _guard = Some(filesystem.lock_manager().write_lock(keys).await);
100
101 if matches!(*self.lock_state.lock(), LockState::Deleted) {
103 return Ok(FlushResult::Ok(LATEST_VERSION));
106 }
107
108 match reason {
109 Reason::Unlock => {
110 if self.store_info().unwrap().encrypted_mutations_object_id == INVALID_OBJECT_ID {
116 return Ok(FlushResult::Ok(self.tree.get_earliest_version()));
121 }
122 }
123 Reason::Journal => {
124 let earliest_version = self.tree.get_earliest_version();
127 if !object_manager.needs_flush(self.store_object_id)
128 && earliest_version == LATEST_VERSION
129 {
130 return Ok(FlushResult::Ok(earliest_version));
133 }
134 }
135 }
136
137 let trace = self.trace.load(Ordering::Relaxed);
138 if trace {
139 info!(store_id = self.store_object_id(); "OS: begin flush");
140 }
141
142 if matches!(&*self.lock_state.lock(), LockState::Locked) {
143 self.flush_locked(&txn_guard).await.with_context(|| {
144 format!("Failed to flush object store {}", self.store_object_id)
145 })?;
146 } else {
147 if let FlushResult::CryptError(error) = self
148 .flush_unlocked(&txn_guard, reason)
149 .await
150 .with_context(|| format!("Failed to flush object store {}", self.store_object_id))?
151 {
152 return Ok(FlushResult::CryptError(error));
153 }
154 }
155
156 if trace {
157 info!(store_id = self.store_object_id(); "OS: end flush");
158 }
159 if let Some(callback) = &*self.flush_callback.lock() {
160 callback(self);
161 }
162
163 let mut counters = self.counters.lock();
164 counters.num_flushes += 1;
165 counters.last_flush_time = Some(std::time::SystemTime::now());
166 Ok(FlushResult::Ok(self.tree.get_earliest_version()))
168 }
169
170 async fn flush_unlocked(
172 &self,
173 txn_guard: &TxnGuard<'_>,
174 reason: Reason,
175 ) -> Result<FlushResult<Vec<u64>>, Error> {
176 let roll_mutations_key = self
177 .mutations_cipher
178 .lock()
179 .as_ref()
180 .map(|cipher| {
181 cipher.offset() >= self.filesystem().options().roll_metadata_key_byte_count
182 })
183 .unwrap_or(false);
184 if roll_mutations_key {
185 if let Err(error) = self.roll_mutations_key(self.crypt().unwrap().as_ref()).await {
186 log::warn!(
187 error:?; "Failed to roll mutations key for store {}", self.store_object_id());
188 return Ok(FlushResult::CryptError(error));
189 }
190 }
191
192 struct StoreInfoSnapshot<'a> {
193 store: &'a ObjectStore,
194 store_info: OnceLock<StoreInfo>,
195 }
196 impl AssociatedObject for StoreInfoSnapshot<'_> {
197 fn will_apply_mutation(
198 &self,
199 _mutation: &Mutation,
200 _object_id: u64,
201 _manager: &ObjectManager,
202 ) {
203 let mut store_info = self.store.store_info().unwrap();
204
205 let mutations_cipher = self.store.mutations_cipher.lock();
207 if let Some(cipher) = mutations_cipher.as_ref() {
208 store_info.mutations_cipher_offset = cipher.offset();
209 }
210
211 self.store_info.set(store_info).unwrap();
212 }
213 }
214
215 let store_info_snapshot = StoreInfoSnapshot { store: self, store_info: OnceLock::new() };
216
217 let filesystem = self.filesystem();
218 let object_manager = filesystem.object_manager();
219 let reservation = object_manager.metadata_reservation();
220 let txn_options = Options {
221 skip_journal_checks: true,
222 borrow_metadata_space: true,
223 allocator_reservation: Some(reservation),
224 txn_guard: Some(txn_guard),
225 ..Default::default()
226 };
227
228 let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
231 transaction.add_with_object(
232 self.store_object_id(),
233 Mutation::BeginFlush,
234 AssocObj::Borrowed(&store_info_snapshot),
235 );
236 transaction.commit().await?;
237
238 let mut new_store_info = store_info_snapshot.store_info.into_inner().unwrap();
239
240 let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
245
246 let parent_store = self.parent_store.as_ref().unwrap();
248 let handle_options = HandleOptions { skip_journal_checks: true, ..Default::default() };
249 let new_object_tree_layer = if let Some(crypt) = self.crypt().as_deref() {
250 let object_id = parent_store.get_next_object_id(transaction.txn_guard()).await?;
251 let (key, unwrapped_key) =
252 match crypt.create_key(object_id.get(), KeyPurpose::Data).await {
253 Ok((key, unwrapped_key)) => (key, unwrapped_key),
254 Err(status) => {
255 log::warn!(
256 status:?;
257 "Failed to create keys while flushing store {}",
258 self.store_object_id(),
259 );
260 return Ok(FlushResult::CryptError(status.into()));
261 }
262 };
263 ObjectStore::create_object_with_key(
264 parent_store,
265 &mut transaction,
266 object_id,
267 handle_options,
268 EncryptionKey::Fxfs(key),
269 unwrapped_key,
270 )
271 .await?
272 } else {
273 ObjectStore::create_object(parent_store, &mut transaction, handle_options, None).await?
274 };
275 let writer = DirectWriter::new(&new_object_tree_layer, txn_options).await;
276 let new_object_tree_layer_object_id = new_object_tree_layer.object_id();
277 parent_store.add_to_graveyard(&mut transaction, new_object_tree_layer_object_id);
278
279 transaction.commit().await?;
280
281 let (layers_to_keep, old_layers) = tree::flush(
283 &self.tree,
284 writer,
285 (reason == Reason::Journal).then(|| filesystem.journal().get_compaction_yielder()),
286 )
287 .await
288 .context("Failed to flush tree")?;
289
290 let mut new_layers = layers_from_handles([new_object_tree_layer]).await?;
292 new_layers.extend(layers_to_keep.iter().map(|l| (*l).clone()));
293
294 new_store_info.layers = Vec::new();
295 for layer in &new_layers {
296 if let Some(handle) = layer.handle() {
297 new_store_info.layers.push(handle.object_id());
298 }
299 }
300
301 let reservation_update: ReservationUpdate; let mut end_transaction = filesystem
303 .clone()
304 .new_transaction(
305 lock_keys![LockKey::object(
306 self.parent_store.as_ref().unwrap().store_object_id(),
307 self.store_info_handle_object_id().unwrap(),
308 )],
309 txn_options,
310 )
311 .await?;
312
313 parent_store.remove_from_graveyard(&mut end_transaction, new_object_tree_layer_object_id);
314
315 for layer in &old_layers {
317 if let Some(handle) = layer.handle() {
318 parent_store.add_to_graveyard(&mut end_transaction, handle.object_id());
319 }
320 }
321
322 let old_encrypted_mutations_object_id =
323 std::mem::replace(&mut new_store_info.encrypted_mutations_object_id, INVALID_OBJECT_ID);
324 if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
325 parent_store.add_to_graveyard(&mut end_transaction, old_encrypted_mutations_object_id);
326 }
327
328 match &mut new_store_info.last_object_id {
336 LastObjectIdInfo::Unencrypted { id } => {
337 let LastObjectId::Unencrypted { id: in_memory_value } =
338 &*self.last_object_id.lock()
339 else {
340 unreachable!()
341 };
342 *id = *in_memory_value;
343 }
344 LastObjectIdInfo::Encrypted { id, key } => {
345 let LastObjectId::Encrypted { id: in_memory_value, .. } =
346 &*self.last_object_id.lock()
347 else {
348 unreachable!()
349 };
350 *id = *in_memory_value;
351 let guard = self.store_info.lock();
352 let current_store_info = guard.as_ref().unwrap();
353 let LastObjectIdInfo::Encrypted { key: in_memory_value, .. } =
354 ¤t_store_info.last_object_id
355 else {
356 unreachable!()
357 };
358 *key = in_memory_value.clone();
359 }
360 LastObjectIdInfo::Low32Bit => {}
361 }
362
363 self.write_store_info(&mut end_transaction, &new_store_info).await?;
364
365 let layer_file_sizes = new_layers
366 .iter()
367 .map(|l| l.handle().map(ReadObjectHandle::get_size).unwrap_or(0))
368 .collect::<Vec<u64>>();
369
370 let total_layer_size = layer_file_sizes.iter().sum();
371 reservation_update =
372 ReservationUpdate::new(tree::reservation_amount_from_layer_size(total_layer_size));
373
374 end_transaction.add_with_object(
375 self.store_object_id(),
376 Mutation::EndFlush,
377 AssocObj::Borrowed(&reservation_update),
378 );
379
380 if self.trace.load(Ordering::Relaxed) {
381 info!(
382 store_id = self.store_object_id(),
383 old_layer_count = old_layers.len(),
384 new_layer_count = new_layers.len(),
385 total_layer_size,
386 new_store_info:?;
387 "OS: compacting"
388 );
389 }
390
391 end_transaction
392 .commit_with_callback(|_| {
393 let mut store_info = self.store_info.lock();
394 let info = store_info.as_mut().unwrap();
395 info.layers = new_store_info.layers;
396 info.encrypted_mutations_object_id = new_store_info.encrypted_mutations_object_id;
397 info.mutations_cipher_offset = new_store_info.mutations_cipher_offset;
398 self.tree.set_layers(new_layers);
399 })
400 .await?;
401
402 for layer in old_layers {
404 let object_id = layer.handle().map(|h| h.object_id());
405 layer.close_layer().await;
406 if let Some(object_id) = object_id {
407 parent_store.tombstone_object(object_id, txn_options).await?;
408 }
409 }
410
411 if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
412 parent_store.tombstone_object(old_encrypted_mutations_object_id, txn_options).await?;
413 }
414
415 Ok(FlushResult::Ok(layer_file_sizes))
416 }
417
    /// Flushes the store while it is locked.
    ///
    /// No tree layers are rewritten on this path.  Instead, the store's transactions are read
    /// back from the journal, serialized as `EncryptedMutations`, and appended to the store's
    /// encrypted-mutations object in the parent store (creating that object if the store info
    /// does not reference one yet).  The store info is rewritten and `EndFlush` is committed
    /// with a reservation amount derived from the layer file sizes plus the encrypted-mutations
    /// file.
    ///
    /// # Panics
    ///
    /// Panics if the store has no parent store or no store info handle.
    async fn flush_locked(&self, txn_guard: &TxnGuard<'_>) -> Result<(), Error> {
        let filesystem = self.filesystem();
        let object_manager = filesystem.object_manager();
        let reservation = object_manager.metadata_reservation();
        let txn_options = Options {
            skip_journal_checks: true,
            borrow_metadata_space: true,
            allocator_reservation: Some(reservation),
            txn_guard: Some(txn_guard),
            ..Default::default()
        };

        // BeginFlush is committed in a transaction of its own.
        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
        transaction.add(self.store_object_id(), Mutation::BeginFlush);
        transaction.commit().await?;

        // The store info is loaded via `load_store_info` rather than taken from memory.
        let mut new_store_info = self.load_store_info().await?;

        // `transaction` covers object creation at the start; `end_transaction` (below) carries
        // the data write, the store info and EndFlush.
        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;

        // Locals are dropped in reverse declaration order, so `reservation_update` and `handle`
        // (both used by `end_transaction` below) outlive it.
        let reservation_update: ReservationUpdate;
        let handle;
        let mut end_transaction;

        let parent_store = self.parent_store.as_ref().unwrap();
        // Either create a new encrypted-mutations object or open the existing one.  In both
        // cases `end_transaction` locks that object and the store info object.
        handle = if new_store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
            let handle = ObjectStore::create_object(
                parent_store,
                &mut transaction,
                HandleOptions { skip_journal_checks: true, ..Default::default() },
                None,
            )
            .await?;
            let oid = handle.object_id();
            end_transaction = filesystem
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(parent_store.store_object_id(), oid),
                        LockKey::object(
                            parent_store.store_object_id(),
                            self.store_info_handle_object_id().unwrap(),
                        ),
                    ],
                    txn_options,
                )
                .await?;
            new_store_info.encrypted_mutations_object_id = oid;
            // The new object sits in the graveyard between the first transaction and the end
            // transaction.
            parent_store.add_to_graveyard(&mut transaction, oid);
            parent_store.remove_from_graveyard(&mut end_transaction, oid);
            handle
        } else {
            end_transaction = filesystem
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(
                            parent_store.store_object_id(),
                            new_store_info.encrypted_mutations_object_id,
                        ),
                        LockKey::object(
                            parent_store.store_object_id(),
                            self.store_info_handle_object_id().unwrap(),
                        ),
                    ],
                    txn_options,
                )
                .await?;
            ObjectStore::open_object(
                parent_store,
                new_store_info.encrypted_mutations_object_id,
                HandleOptions { skip_journal_checks: true, ..Default::default() },
                None,
            )
            .await?
        };
        transaction.commit().await?;

        // Read this store's transactions back from the journal, serialize them, and append them
        // at the current end of the encrypted-mutations file.
        let journaled = filesystem
            .journal()
            .read_transactions_for_object(self.store_object_id)
            .await
            .context("Failed to read encrypted mutations from journal")?;
        let mut buffer = handle.allocate_buffer(MAX_ENCRYPTED_MUTATIONS_SIZE).await;
        let mut cursor = std::io::Cursor::new(buffer.as_mut_slice());
        EncryptedMutations::from_replayed_mutations(self.store_object_id, journaled)
            .serialize_with_version(&mut cursor)?;
        // Only the bytes actually serialized are written.
        let len = cursor.position() as usize;
        handle.txn_write(&mut end_transaction, handle.get_size(), buffer.subslice(..len)).await?;

        self.write_store_info(&mut end_transaction, &new_store_info).await?;

        // The reservation is derived from the existing layer files plus the encrypted-mutations
        // file, counted as its current size plus the bytes being appended.
        let mut total_layer_size = 0;
        for &oid in &new_store_info.layers {
            total_layer_size += parent_store.get_file_size(oid).await?;
        }
        total_layer_size +=
            layer_size_from_encrypted_mutations_size(handle.get_size() + len as u64);

        reservation_update =
            ReservationUpdate::new(tree::reservation_amount_from_layer_size(total_layer_size));

        end_transaction.add_with_object(
            self.store_object_id(),
            Mutation::EndFlush,
            AssocObj::Borrowed(&reservation_update),
        );

        end_transaction.commit().await?;

        Ok(())
    }
539}
540
541#[cfg(test)]
542mod tests {
543 use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, JournalingObject, SyncOptions};
544 use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle};
545 use crate::object_store::directory::Directory;
546 use crate::object_store::transaction::{Options, lock_keys};
547 use crate::object_store::volume::root_volume;
548 use crate::object_store::{
549 HandleOptions, LockKey, NO_OWNER, NewChildStoreOptions, ObjectStore, StoreOptions,
550 layer_size_from_encrypted_mutations_size, tree,
551 };
552 use fxfs_insecure_crypto::new_insecure_crypt;
553 use std::sync::Arc;
554 use storage_device::DeviceHolder;
555 use storage_device::fake_device::FakeDevice;
556
    /// Shared body of the metadata key roll tests.
    ///
    /// Creates an encrypted volume, reopens the filesystem with a 512 KiB
    /// `roll_metadata_key_byte_count`, and creates files until the mutations cipher offset goes
    /// backwards.  After one more file is created, the filesystem is reopened, optionally
    /// flushed before the store is unlocked (`flush_before_unlock`), and the test checks that
    /// the cipher offset is 0 after unlocking and that the first and last files are present.
    async fn run_key_roll_test(flush_before_unlock: bool) {
        let device = DeviceHolder::new(FakeDevice::new(8192, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let store_id = {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(Arc::new(new_insecure_crypt())),
                            ..StoreOptions::default()
                        },
                        ..NewChildStoreOptions::default()
                    },
                )
                .await
                .expect("new_volume failed")
                .store_object_id()
        };

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        // Reopen with an explicit key roll threshold.
        let fs = FxFilesystemBuilder::new()
            .roll_metadata_key_byte_count(512 * 1024)
            .open(device)
            .await
            .expect("open failed");

        let (first_filename, last_filename) = {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");

            let root_dir = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");

            // Keep creating files (names are the counter left-aligned and padded to 200
            // characters) until the cipher offset decreases, which this test treats as the sign
            // that the key was rolled.
            let mut last_mutations_cipher_offset = 0;
            let mut i = 0;
            let first_filename = format!("{:<200}", i);
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(store_id, root_dir.object_id())],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_dir
                    .create_child_file(&mut transaction, &format!("{:<200}", i))
                    .await
                    .expect("create_child_file failed");
                i += 1;
                transaction.commit().await.expect("commit failed");
                let cipher_offset = store.mutations_cipher.lock().as_ref().unwrap().offset();
                if cipher_offset < last_mutations_cipher_offset {
                    break;
                }
                last_mutations_cipher_offset = cipher_offset;
            }

            fs.sync(SyncOptions::default()).await.expect("sync failed");

            // Create one more file after the sync.
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(store_id, root_dir.object_id())],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let last_filename = format!("{:<200}", i);
            root_dir
                .create_child_file(&mut transaction, &last_filename)
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            (first_filename, last_filename)
        };

        fs.close().await.expect("close failed");

        // Reopen the filesystem from the same device.
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystemBuilder::new()
            .roll_metadata_key_byte_count(512 * 1024)
            .open(device)
            .await
            .expect("open failed");

        if flush_before_unlock {
            // Flush all stores while the test store has not been unlocked yet.
            fs.object_manager().flush().await.expect("flush failed");
        }

        {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");

            // After unlocking, the cipher offset is expected to be back at zero.
            assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);

            // Both the first and the last file created must be visible.
            let root_dir = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            root_dir
                .lookup(&first_filename)
                .await
                .expect("Lookup failed")
                .expect("First created file wasn't present");
            root_dir
                .lookup(&last_filename)
                .await
                .expect("Lookup failed")
                .expect("Last created file wasn't present");
        }
    }
683
    /// Runs the key roll scenario without flushing before the store is unlocked.
    #[fuchsia::test(threads = 10)]
    async fn test_metadata_key_roll() {
        run_key_roll_test(false).await;
    }
688
    /// Runs the key roll scenario with an object manager flush before the store is unlocked.
    #[fuchsia::test(threads = 10)]
    async fn test_metadata_key_roll_with_flush_before_unlock() {
        run_key_roll_test(true).await;
    }
693
    /// Flushes an encrypted store after it has been locked, then checks that the store info
    /// references an encrypted-mutations object, that the store's reservation matches the
    /// amount computed from the layer and encrypted-mutations file sizes, and that files created
    /// before locking can be opened after unlocking.
    #[fuchsia::test]
    async fn test_flush_when_locked() {
        let device = DeviceHolder::new(FakeDevice::new(8192, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let crypt = Arc::new(new_insecure_crypt());
        let store = root_volume
            .new_volume(
                "test",
                NewChildStoreOptions {
                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
                    ..NewChildStoreOptions::default()
                },
            )
            .await
            .expect("new_volume failed");
        let root_dir =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        // "foo" is created before the first (unlocked) flush.
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let foo = root_dir
            .create_child_file(&mut transaction, "foo")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        // Flush while the store is still unlocked.
        store.flush().await.expect("flush failed");

        // "bar" is created after that flush and before the store is locked.
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let bar = root_dir
            .create_child_file(&mut transaction, "bar")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        store.lock().await.expect("lock failed");

        // Flush again, this time with the store locked.
        store.flush().await.expect("flush failed");

        // Recompute the expected reservation from the on-disk store info.
        let info = store.load_store_info().await.unwrap();
        let parent_store = store.parent_store().unwrap();
        let mut total_layer_size = 0;
        for &oid in &info.layers {
            total_layer_size +=
                parent_store.get_file_size(oid).await.expect("get_file_size failed");
        }
        // The locked flush must have left an encrypted-mutations object behind.
        assert_ne!(info.encrypted_mutations_object_id, INVALID_OBJECT_ID);
        total_layer_size += layer_size_from_encrypted_mutations_size(
            parent_store
                .get_file_size(info.encrypted_mutations_object_id)
                .await
                .expect("get_file_size failed"),
        );
        assert_eq!(
            fs.object_manager().reservation(store.store_object_id()),
            Some(tree::reservation_amount_from_layer_size(total_layer_size))
        );

        store.unlock(NO_OWNER, crypt).await.expect("unlock failed");

        // Both files must be openable once the store is unlocked again.
        ObjectStore::open_object(&store, foo.object_id(), HandleOptions::default(), None)
            .await
            .expect("open_object failed");

        ObjectStore::open_object(&store, bar.object_id(), HandleOptions::default(), None)
            .await
            .expect("open_object failed");

        fs.close().await.expect("close failed");
    }
783}
784
785impl tree::MajorCompactable<ObjectKey, ObjectValue> for LSMTree<ObjectKey, ObjectValue> {
786 async fn major_iter(
787 iter: impl LayerIterator<ObjectKey, ObjectValue>,
788 ) -> Result<impl LayerIterator<ObjectKey, ObjectValue>, Error> {
789 iter.filter(|item: ItemRef<'_, _, _>| match item {
790 ItemRef { value: ObjectValue::None, .. } => false,
792 ItemRef { value: ObjectValue::Extent(ExtentValue::None), .. } => false,
794 _ => true,
795 })
796 .await
797 }
798}