1use crate::filesystem::TxnGuard;
8use crate::log::*;
9use crate::lsm_tree::types::{ItemRef, LayerIterator};
10use crate::lsm_tree::{LSMTree, layers_from_handles};
11use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
12use crate::object_store::extent_record::ExtentValue;
13use crate::object_store::object_manager::{ObjectManager, ReservationUpdate};
14use crate::object_store::object_record::{ObjectKey, ObjectValue};
15use crate::object_store::transaction::{AssociatedObject, LockKey, Mutation, lock_keys};
16use crate::object_store::{
17 AssocObj, DirectWriter, EncryptedMutations, HandleOptions, LastObjectId, LastObjectIdInfo,
18 LockState, MAX_ENCRYPTED_MUTATIONS_SIZE, ObjectStore, Options, StoreInfo,
19 layer_size_from_encrypted_mutations_size, tree,
20};
21use crate::serialized_types::{LATEST_VERSION, Version, VersionedLatest};
22use anyhow::{Context, Error};
23use fxfs_crypto::{EncryptionKey, KeyPurpose};
24use std::sync::OnceLock;
25use std::sync::atomic::Ordering;
26
27#[derive(Copy, Clone, Debug, PartialEq, Eq)]
28pub enum Reason {
29 Journal,
31
32 Unlock,
34}
35
36#[derive(Debug)]
41enum FlushResult<T> {
42 Ok(T),
43 CryptError(Error),
44}
45
46#[fxfs_trace::trace]
47impl ObjectStore {
48 #[trace("store_object_id" => self.store_object_id)]
49 pub async fn flush_with_reason(&self, reason: Reason) -> Result<Version, Error> {
50 const MAX_RETRIES: usize = 10;
58 let mut retries = 0;
59 loop {
60 match self.try_flush_with_reason(reason).await? {
61 FlushResult::Ok(version) => return Ok(version),
62 FlushResult::CryptError(error) => {
63 if reason == Reason::Unlock || retries >= MAX_RETRIES {
64 return Err(error);
67 }
68 log::warn!(
69 error:?;
70 "Flushing failed for store {}, re-locking and trying again.",
71 self.store_object_id()
72 );
73 let owner = self.lock_state.lock().owner();
74 if let Some(owner) = owner {
75 owner
76 .force_lock(&self)
77 .await
78 .context("Failed to re-lock store during flush")?;
79 }
80 }
83 }
84 retries += 1;
85 }
86 }
87
88 async fn try_flush_with_reason(&self, reason: Reason) -> Result<FlushResult<Version>, Error> {
89 if self.parent_store.is_none() {
90 return Ok(FlushResult::Ok(self.tree.get_earliest_version()));
92 }
93 let filesystem = self.filesystem();
94 let object_manager = filesystem.object_manager();
95
96 let txn_guard = filesystem.clone().txn_guard().await;
98 let keys = lock_keys![LockKey::flush(self.store_object_id())];
99 let _guard = Some(filesystem.lock_manager().write_lock(keys).await);
100
101 if matches!(*self.lock_state.lock(), LockState::Deleted) {
103 return Ok(FlushResult::Ok(LATEST_VERSION));
106 }
107
108 match reason {
109 Reason::Unlock => {
110 if self.store_info().unwrap().encrypted_mutations_object_id == INVALID_OBJECT_ID {
116 return Ok(FlushResult::Ok(self.tree.get_earliest_version()));
121 }
122 }
123 Reason::Journal => {
124 let earliest_version = self.tree.get_earliest_version();
127 if !object_manager.needs_flush(self.store_object_id)
128 && earliest_version == LATEST_VERSION
129 {
130 return Ok(FlushResult::Ok(earliest_version));
133 }
134 }
135 }
136
137 let trace = self.trace.load(Ordering::Relaxed);
138 if trace {
139 info!(store_id = self.store_object_id(); "OS: begin flush");
140 }
141
142 if matches!(&*self.lock_state.lock(), LockState::Locked) {
143 self.flush_locked(&txn_guard).await.with_context(|| {
144 format!("Failed to flush object store {}", self.store_object_id)
145 })?;
146 } else {
147 if let FlushResult::CryptError(error) = self
148 .flush_unlocked(&txn_guard)
149 .await
150 .with_context(|| format!("Failed to flush object store {}", self.store_object_id))?
151 {
152 return Ok(FlushResult::CryptError(error));
153 }
154 }
155
156 if trace {
157 info!(store_id = self.store_object_id(); "OS: end flush");
158 }
159 if let Some(callback) = &*self.flush_callback.lock() {
160 callback(self);
161 }
162
163 let mut counters = self.counters.lock();
164 counters.num_flushes += 1;
165 counters.last_flush_time = Some(std::time::SystemTime::now());
166 Ok(FlushResult::Ok(self.tree.get_earliest_version()))
168 }
169
170 async fn flush_unlocked(
172 &self,
173 txn_guard: &TxnGuard<'_>,
174 ) -> Result<FlushResult<Vec<u64>>, Error> {
175 let roll_mutations_key = self
176 .mutations_cipher
177 .lock()
178 .as_ref()
179 .map(|cipher| {
180 cipher.offset() >= self.filesystem().options().roll_metadata_key_byte_count
181 })
182 .unwrap_or(false);
183 if roll_mutations_key {
184 if let Err(error) = self.roll_mutations_key(self.crypt().unwrap().as_ref()).await {
185 log::warn!(
186 error:?; "Failed to roll mutations key for store {}", self.store_object_id());
187 return Ok(FlushResult::CryptError(error));
188 }
189 }
190
191 struct StoreInfoSnapshot<'a> {
192 store: &'a ObjectStore,
193 store_info: OnceLock<StoreInfo>,
194 }
195 impl AssociatedObject for StoreInfoSnapshot<'_> {
196 fn will_apply_mutation(
197 &self,
198 _mutation: &Mutation,
199 _object_id: u64,
200 _manager: &ObjectManager,
201 ) {
202 let mut store_info = self.store.store_info().unwrap();
203
204 let mutations_cipher = self.store.mutations_cipher.lock();
206 if let Some(cipher) = mutations_cipher.as_ref() {
207 store_info.mutations_cipher_offset = cipher.offset();
208 }
209
210 self.store_info.set(store_info).unwrap();
211 }
212 }
213
214 let store_info_snapshot = StoreInfoSnapshot { store: self, store_info: OnceLock::new() };
215
216 let filesystem = self.filesystem();
217 let object_manager = filesystem.object_manager();
218 let reservation = object_manager.metadata_reservation();
219 let txn_options = Options {
220 skip_journal_checks: true,
221 borrow_metadata_space: true,
222 allocator_reservation: Some(reservation),
223 txn_guard: Some(txn_guard),
224 ..Default::default()
225 };
226
227 let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
230 transaction.add_with_object(
231 self.store_object_id(),
232 Mutation::BeginFlush,
233 AssocObj::Borrowed(&store_info_snapshot),
234 );
235 transaction.commit().await?;
236
237 let mut new_store_info = store_info_snapshot.store_info.into_inner().unwrap();
238
239 let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
244
245 let parent_store = self.parent_store.as_ref().unwrap();
247 let handle_options = HandleOptions { skip_journal_checks: true, ..Default::default() };
248 let new_object_tree_layer = if let Some(crypt) = self.crypt().as_deref() {
249 let object_id = parent_store.get_next_object_id(transaction.txn_guard()).await?;
250 let (key, unwrapped_key) =
251 match crypt.create_key(object_id.get(), KeyPurpose::Data).await {
252 Ok((key, unwrapped_key)) => (key, unwrapped_key),
253 Err(status) => {
254 log::warn!(
255 status:?;
256 "Failed to create keys while flushing store {}",
257 self.store_object_id(),
258 );
259 return Ok(FlushResult::CryptError(status.into()));
260 }
261 };
262 ObjectStore::create_object_with_key(
263 parent_store,
264 &mut transaction,
265 object_id,
266 handle_options,
267 EncryptionKey::Fxfs(key),
268 unwrapped_key,
269 )
270 .await?
271 } else {
272 ObjectStore::create_object(parent_store, &mut transaction, handle_options, None).await?
273 };
274 let writer = DirectWriter::new(&new_object_tree_layer, txn_options).await;
275 let new_object_tree_layer_object_id = new_object_tree_layer.object_id();
276 parent_store.add_to_graveyard(&mut transaction, new_object_tree_layer_object_id);
277
278 transaction.commit().await?;
279 let (layers_to_keep, old_layers) =
280 tree::flush(&self.tree, writer).await.context("Failed to flush tree")?;
281
282 let mut new_layers = layers_from_handles([new_object_tree_layer]).await?;
283 new_layers.extend(layers_to_keep.iter().map(|l| (*l).clone()));
284
285 new_store_info.layers = Vec::new();
286 for layer in &new_layers {
287 if let Some(handle) = layer.handle() {
288 new_store_info.layers.push(handle.object_id());
289 }
290 }
291
292 let reservation_update: ReservationUpdate; let mut end_transaction = filesystem
294 .clone()
295 .new_transaction(
296 lock_keys![LockKey::object(
297 self.parent_store.as_ref().unwrap().store_object_id(),
298 self.store_info_handle_object_id().unwrap(),
299 )],
300 txn_options,
301 )
302 .await?;
303
304 parent_store.remove_from_graveyard(&mut end_transaction, new_object_tree_layer_object_id);
305
306 for layer in &old_layers {
308 if let Some(handle) = layer.handle() {
309 parent_store.add_to_graveyard(&mut end_transaction, handle.object_id());
310 }
311 }
312
313 let old_encrypted_mutations_object_id =
314 std::mem::replace(&mut new_store_info.encrypted_mutations_object_id, INVALID_OBJECT_ID);
315 if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
316 parent_store.add_to_graveyard(&mut end_transaction, old_encrypted_mutations_object_id);
317 }
318
319 match &mut new_store_info.last_object_id {
327 LastObjectIdInfo::Unencrypted { id } => {
328 let LastObjectId::Unencrypted { id: in_memory_value } =
329 &*self.last_object_id.lock()
330 else {
331 unreachable!()
332 };
333 *id = *in_memory_value;
334 }
335 LastObjectIdInfo::Encrypted { id, key } => {
336 let LastObjectId::Encrypted { id: in_memory_value, .. } =
337 &*self.last_object_id.lock()
338 else {
339 unreachable!()
340 };
341 *id = *in_memory_value;
342 let guard = self.store_info.lock();
343 let current_store_info = guard.as_ref().unwrap();
344 let LastObjectIdInfo::Encrypted { key: in_memory_value, .. } =
345 ¤t_store_info.last_object_id
346 else {
347 unreachable!()
348 };
349 *key = in_memory_value.clone();
350 }
351 LastObjectIdInfo::Low32Bit => {}
352 }
353
354 self.write_store_info(&mut end_transaction, &new_store_info).await?;
355
356 let layer_file_sizes = new_layers
357 .iter()
358 .map(|l| l.handle().map(ReadObjectHandle::get_size).unwrap_or(0))
359 .collect::<Vec<u64>>();
360
361 let total_layer_size = layer_file_sizes.iter().sum();
362 reservation_update =
363 ReservationUpdate::new(tree::reservation_amount_from_layer_size(total_layer_size));
364
365 end_transaction.add_with_object(
366 self.store_object_id(),
367 Mutation::EndFlush,
368 AssocObj::Borrowed(&reservation_update),
369 );
370
371 if self.trace.load(Ordering::Relaxed) {
372 info!(
373 store_id = self.store_object_id(),
374 old_layer_count = old_layers.len(),
375 new_layer_count = new_layers.len(),
376 total_layer_size,
377 new_store_info:?;
378 "OS: compacting"
379 );
380 }
381
382 end_transaction
383 .commit_with_callback(|_| {
384 let mut store_info = self.store_info.lock();
385 let info = store_info.as_mut().unwrap();
386 info.layers = new_store_info.layers;
387 info.encrypted_mutations_object_id = new_store_info.encrypted_mutations_object_id;
388 info.mutations_cipher_offset = new_store_info.mutations_cipher_offset;
389 self.tree.set_layers(new_layers);
390 })
391 .await?;
392
393 for layer in old_layers {
395 let object_id = layer.handle().map(|h| h.object_id());
396 layer.close_layer().await;
397 if let Some(object_id) = object_id {
398 parent_store.tombstone_object(object_id, txn_options).await?;
399 }
400 }
401
402 if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
403 parent_store.tombstone_object(old_encrypted_mutations_object_id, txn_options).await?;
404 }
405
406 Ok(FlushResult::Ok(layer_file_sizes))
407 }
408
409 async fn flush_locked(&self, txn_guard: &TxnGuard<'_>) -> Result<(), Error> {
411 let filesystem = self.filesystem();
412 let object_manager = filesystem.object_manager();
413 let reservation = object_manager.metadata_reservation();
414 let txn_options = Options {
415 skip_journal_checks: true,
416 borrow_metadata_space: true,
417 allocator_reservation: Some(reservation),
418 txn_guard: Some(txn_guard),
419 ..Default::default()
420 };
421
422 let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
423 transaction.add(self.store_object_id(), Mutation::BeginFlush);
424 transaction.commit().await?;
425
426 let mut new_store_info = self.load_store_info().await?;
427
428 let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
433
434 let reservation_update: ReservationUpdate; let handle; let mut end_transaction;
437
438 let parent_store = self.parent_store.as_ref().unwrap();
441 handle = if new_store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
442 let handle = ObjectStore::create_object(
443 parent_store,
444 &mut transaction,
445 HandleOptions { skip_journal_checks: true, ..Default::default() },
446 None,
447 )
448 .await?;
449 let oid = handle.object_id();
450 end_transaction = filesystem
451 .clone()
452 .new_transaction(
453 lock_keys![
454 LockKey::object(parent_store.store_object_id(), oid),
455 LockKey::object(
456 parent_store.store_object_id(),
457 self.store_info_handle_object_id().unwrap(),
458 ),
459 ],
460 txn_options,
461 )
462 .await?;
463 new_store_info.encrypted_mutations_object_id = oid;
464 parent_store.add_to_graveyard(&mut transaction, oid);
465 parent_store.remove_from_graveyard(&mut end_transaction, oid);
466 handle
467 } else {
468 end_transaction = filesystem
469 .clone()
470 .new_transaction(
471 lock_keys![
472 LockKey::object(
473 parent_store.store_object_id(),
474 new_store_info.encrypted_mutations_object_id,
475 ),
476 LockKey::object(
477 parent_store.store_object_id(),
478 self.store_info_handle_object_id().unwrap(),
479 ),
480 ],
481 txn_options,
482 )
483 .await?;
484 ObjectStore::open_object(
485 parent_store,
486 new_store_info.encrypted_mutations_object_id,
487 HandleOptions { skip_journal_checks: true, ..Default::default() },
488 None,
489 )
490 .await?
491 };
492 transaction.commit().await?;
493
494 let journaled = filesystem
497 .journal()
498 .read_transactions_for_object(self.store_object_id)
499 .await
500 .context("Failed to read encrypted mutations from journal")?;
501 let mut buffer = handle.allocate_buffer(MAX_ENCRYPTED_MUTATIONS_SIZE).await;
502 let mut cursor = std::io::Cursor::new(buffer.as_mut_slice());
503 EncryptedMutations::from_replayed_mutations(self.store_object_id, journaled)
504 .serialize_with_version(&mut cursor)?;
505 let len = cursor.position() as usize;
506 handle.txn_write(&mut end_transaction, handle.get_size(), buffer.subslice(..len)).await?;
507
508 self.write_store_info(&mut end_transaction, &new_store_info).await?;
509
510 let mut total_layer_size = 0;
511 for &oid in &new_store_info.layers {
512 total_layer_size += parent_store.get_file_size(oid).await?;
513 }
514 total_layer_size +=
515 layer_size_from_encrypted_mutations_size(handle.get_size() + len as u64);
516
517 reservation_update =
518 ReservationUpdate::new(tree::reservation_amount_from_layer_size(total_layer_size));
519
520 end_transaction.add_with_object(
521 self.store_object_id(),
522 Mutation::EndFlush,
523 AssocObj::Borrowed(&reservation_update),
524 );
525
526 end_transaction.commit().await?;
527
528 Ok(())
529 }
530}
531
532#[cfg(test)]
533mod tests {
534 use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, JournalingObject, SyncOptions};
535 use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle};
536 use crate::object_store::directory::Directory;
537 use crate::object_store::transaction::{Options, lock_keys};
538 use crate::object_store::volume::root_volume;
539 use crate::object_store::{
540 HandleOptions, LockKey, NO_OWNER, NewChildStoreOptions, ObjectStore, StoreOptions,
541 layer_size_from_encrypted_mutations_size, tree,
542 };
543 use fxfs_insecure_crypto::new_insecure_crypt;
544 use std::sync::Arc;
545 use storage_device::DeviceHolder;
546 use storage_device::fake_device::FakeDevice;
547
548 async fn run_key_roll_test(flush_before_unlock: bool) {
549 let device = DeviceHolder::new(FakeDevice::new(8192, 1024));
550 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
551 let store_id = {
552 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
553 root_volume
554 .new_volume(
555 "test",
556 NewChildStoreOptions {
557 options: StoreOptions {
558 crypt: Some(Arc::new(new_insecure_crypt())),
559 ..StoreOptions::default()
560 },
561 ..NewChildStoreOptions::default()
562 },
563 )
564 .await
565 .expect("new_volume failed")
566 .store_object_id()
567 };
568
569 fs.close().await.expect("close failed");
570 let device = fs.take_device().await;
571 device.reopen(false);
572
573 let fs = FxFilesystemBuilder::new()
574 .roll_metadata_key_byte_count(512 * 1024)
575 .open(device)
576 .await
577 .expect("open failed");
578
579 let (first_filename, last_filename) = {
580 let store = fs.object_manager().store(store_id).expect("store not found");
581 store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");
582
583 let root_dir = Directory::open(&store, store.root_directory_object_id())
585 .await
586 .expect("open failed");
587
588 let mut last_mutations_cipher_offset = 0;
589 let mut i = 0;
590 let first_filename = format!("{:<200}", i);
591 loop {
592 let mut transaction = fs
593 .clone()
594 .new_transaction(
595 lock_keys![LockKey::object(store_id, root_dir.object_id())],
596 Options::default(),
597 )
598 .await
599 .expect("new_transaction failed");
600 root_dir
601 .create_child_file(&mut transaction, &format!("{:<200}", i))
602 .await
603 .expect("create_child_file failed");
604 i += 1;
605 transaction.commit().await.expect("commit failed");
606 let cipher_offset = store.mutations_cipher.lock().as_ref().unwrap().offset();
607 if cipher_offset < last_mutations_cipher_offset {
608 break;
609 }
610 last_mutations_cipher_offset = cipher_offset;
611 }
612
613 fs.sync(SyncOptions::default()).await.expect("sync failed");
616
617 let mut transaction = fs
619 .clone()
620 .new_transaction(
621 lock_keys![LockKey::object(store_id, root_dir.object_id())],
622 Options::default(),
623 )
624 .await
625 .expect("new_transaction failed");
626 let last_filename = format!("{:<200}", i);
627 root_dir
628 .create_child_file(&mut transaction, &last_filename)
629 .await
630 .expect("create_child_file failed");
631 transaction.commit().await.expect("commit failed");
632 (first_filename, last_filename)
633 };
634
635 fs.close().await.expect("close failed");
636
637 let device = fs.take_device().await;
639 device.reopen(false);
640 let fs = FxFilesystemBuilder::new()
641 .roll_metadata_key_byte_count(512 * 1024)
642 .open(device)
643 .await
644 .expect("open failed");
645
646 if flush_before_unlock {
647 fs.object_manager().flush().await.expect("flush failed");
650 }
651
652 {
653 let store = fs.object_manager().store(store_id).expect("store not found");
654 store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");
655
656 assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);
658
659 let root_dir = Directory::open(&store, store.root_directory_object_id())
660 .await
661 .expect("open failed");
662 root_dir
663 .lookup(&first_filename)
664 .await
665 .expect("Lookup failed")
666 .expect("First created file wasn't present");
667 root_dir
668 .lookup(&last_filename)
669 .await
670 .expect("Lookup failed")
671 .expect("Last created file wasn't present");
672 }
673 }
674
675 #[fuchsia::test(threads = 10)]
676 async fn test_metadata_key_roll() {
677 run_key_roll_test(false).await;
678 }
679
680 #[fuchsia::test(threads = 10)]
681 async fn test_metadata_key_roll_with_flush_before_unlock() {
682 run_key_roll_test(true).await;
683 }
684
685 #[fuchsia::test]
686 async fn test_flush_when_locked() {
687 let device = DeviceHolder::new(FakeDevice::new(8192, 1024));
688 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
689 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
690 let crypt = Arc::new(new_insecure_crypt());
691 let store = root_volume
692 .new_volume(
693 "test",
694 NewChildStoreOptions {
695 options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
696 ..NewChildStoreOptions::default()
697 },
698 )
699 .await
700 .expect("new_volume failed");
701 let root_dir =
702 Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
703 let mut transaction = fs
704 .clone()
705 .new_transaction(
706 lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
707 Options::default(),
708 )
709 .await
710 .expect("new_transaction failed");
711 let foo = root_dir
712 .create_child_file(&mut transaction, "foo")
713 .await
714 .expect("create_child_file failed");
715 transaction.commit().await.expect("commit failed");
716
717 store.flush().await.expect("flush failed");
721
722 let mut transaction = fs
723 .clone()
724 .new_transaction(
725 lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
726 Options::default(),
727 )
728 .await
729 .expect("new_transaction failed");
730 let bar = root_dir
731 .create_child_file(&mut transaction, "bar")
732 .await
733 .expect("create_child_file failed");
734 transaction.commit().await.expect("commit failed");
735
736 store.lock().await.expect("lock failed");
737
738 store.flush().await.expect("flush failed");
740
741 let info = store.load_store_info().await.unwrap();
743 let parent_store = store.parent_store().unwrap();
744 let mut total_layer_size = 0;
745 for &oid in &info.layers {
746 total_layer_size +=
747 parent_store.get_file_size(oid).await.expect("get_file_size failed");
748 }
749 assert_ne!(info.encrypted_mutations_object_id, INVALID_OBJECT_ID);
750 total_layer_size += layer_size_from_encrypted_mutations_size(
751 parent_store
752 .get_file_size(info.encrypted_mutations_object_id)
753 .await
754 .expect("get_file_size failed"),
755 );
756 assert_eq!(
757 fs.object_manager().reservation(store.store_object_id()),
758 Some(tree::reservation_amount_from_layer_size(total_layer_size))
759 );
760
761 store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
763
764 ObjectStore::open_object(&store, foo.object_id(), HandleOptions::default(), None)
765 .await
766 .expect("open_object failed");
767
768 ObjectStore::open_object(&store, bar.object_id(), HandleOptions::default(), None)
769 .await
770 .expect("open_object failed");
771
772 fs.close().await.expect("close failed");
773 }
774}
775
776impl tree::MajorCompactable<ObjectKey, ObjectValue> for LSMTree<ObjectKey, ObjectValue> {
777 async fn major_iter(
778 iter: impl LayerIterator<ObjectKey, ObjectValue>,
779 ) -> Result<impl LayerIterator<ObjectKey, ObjectValue>, Error> {
780 iter.filter(|item: ItemRef<'_, _, _>| match item {
781 ItemRef { value: ObjectValue::None, .. } => false,
783 ItemRef { value: ObjectValue::Extent(ExtentValue::None), .. } => false,
785 _ => true,
786 })
787 .await
788 }
789}