1use crate::checksum::Checksum;
6use crate::debug_assert_not_too_long;
7use crate::filesystem::{FxFilesystem, TxnGuard};
8use crate::log::*;
9use crate::lsm_tree::types::Item;
10use crate::object_handle::INVALID_OBJECT_ID;
11use crate::object_store::allocator::{AllocatorItem, Reservation};
12use crate::object_store::object_manager::{reserved_space_from_journal_usage, ObjectManager};
13use crate::object_store::object_record::{
14 ObjectItem, ObjectItemV40, ObjectItemV41, ObjectItemV43, ObjectKey, ObjectKeyData, ObjectValue,
15 ProjectProperty,
16};
17use crate::serialized_types::{migrate_nodefault, migrate_to_version, Migrate, Versioned};
18use anyhow::Error;
19use either::{Either, Left, Right};
20use fprint::TypeFingerprint;
21use fuchsia_sync::Mutex;
22use futures::future::poll_fn;
23use futures::pin_mut;
24use fxfs_crypto::{WrappedKey, WrappedKeyV32, WrappedKeyV40};
25use rustc_hash::FxHashMap as HashMap;
26use scopeguard::ScopeGuard;
27use serde::{Deserialize, Serialize};
28use std::cell::UnsafeCell;
29use std::cmp::Ordering;
30use std::collections::hash_map::Entry;
31use std::collections::BTreeSet;
32use std::marker::PhantomPinned;
33use std::ops::{Deref, DerefMut, Range};
34use std::sync::Arc;
35use std::task::{Poll, Waker};
36use std::{fmt, mem};
37
38#[derive(Clone, Copy, Default)]
42pub struct Options<'a> {
43 pub skip_journal_checks: bool,
46
47 pub borrow_metadata_space: bool,
52
53 pub allocator_reservation: Option<&'a Reservation>,
58
59 pub txn_guard: Option<&'a TxnGuard<'a>>,
61}
62
63pub const TRANSACTION_MAX_JOURNAL_USAGE: u64 = 24_576;
70pub const TRANSACTION_METADATA_MAX_AMOUNT: u64 =
71 reserved_space_from_journal_usage(TRANSACTION_MAX_JOURNAL_USAGE);
72
73#[must_use]
74pub struct TransactionLocks<'a>(pub WriteGuard<'a>);
75
76pub type Mutation = MutationV43;
81
82#[derive(
83 Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint, Versioned,
84)]
85#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
86pub enum MutationV43 {
87 ObjectStore(ObjectStoreMutationV43),
88 EncryptedObjectStore(Box<[u8]>),
89 Allocator(AllocatorMutationV32),
90 BeginFlush,
92 EndFlush,
95 DeleteVolume,
97 UpdateBorrowed(u64),
98 UpdateMutationsKey(UpdateMutationsKeyV40),
99 CreateInternalDir(u64),
100}
101
102#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
103#[migrate_to_version(MutationV43)]
104pub enum MutationV41 {
105 ObjectStore(ObjectStoreMutationV41),
106 EncryptedObjectStore(Box<[u8]>),
107 Allocator(AllocatorMutationV32),
108 BeginFlush,
109 EndFlush,
110 DeleteVolume,
111 UpdateBorrowed(u64),
112 UpdateMutationsKey(UpdateMutationsKeyV40),
113 CreateInternalDir(u64),
114}
115
116#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
117#[migrate_to_version(MutationV41)]
118pub enum MutationV40 {
119 ObjectStore(ObjectStoreMutationV40),
120 EncryptedObjectStore(Box<[u8]>),
121 Allocator(AllocatorMutationV32),
122 BeginFlush,
124 EndFlush,
127 DeleteVolume,
129 UpdateBorrowed(u64),
130 UpdateMutationsKey(UpdateMutationsKeyV40),
131 CreateInternalDir(u64),
132}
133
134impl Mutation {
135 pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self {
136 Mutation::ObjectStore(ObjectStoreMutation {
137 item: Item::new(key, value),
138 op: Operation::Insert,
139 })
140 }
141
142 pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self {
143 Mutation::ObjectStore(ObjectStoreMutation {
144 item: Item::new(key, value),
145 op: Operation::ReplaceOrInsert,
146 })
147 }
148
149 pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self {
150 Mutation::ObjectStore(ObjectStoreMutation {
151 item: Item::new(key, value),
152 op: Operation::Merge,
153 })
154 }
155
156 pub fn update_mutations_key(key: WrappedKey) -> Self {
157 Mutation::UpdateMutationsKey(key.into())
158 }
159}
160
161pub type ObjectStoreMutation = ObjectStoreMutationV43;
165
166#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
167#[migrate_nodefault]
168#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
169pub struct ObjectStoreMutationV43 {
170 pub item: ObjectItemV43,
171 pub op: OperationV32,
172}
173
174#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
175#[migrate_to_version(ObjectStoreMutationV43)]
176#[migrate_nodefault]
177pub struct ObjectStoreMutationV41 {
178 pub item: ObjectItemV41,
179 pub op: OperationV32,
180}
181
182#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
183#[migrate_nodefault]
184#[migrate_to_version(ObjectStoreMutationV41)]
185pub struct ObjectStoreMutationV40 {
186 pub item: ObjectItemV40,
187 pub op: OperationV32,
188}
189
190pub type Operation = OperationV32;
192
193#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
194#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
195pub enum OperationV32 {
196 Insert,
197 ReplaceOrInsert,
198 Merge,
199}
200
201impl Ord for ObjectStoreMutation {
202 fn cmp(&self, other: &Self) -> Ordering {
203 self.item.key.cmp(&other.item.key)
204 }
205}
206
207impl PartialOrd for ObjectStoreMutation {
208 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
209 Some(self.cmp(other))
210 }
211}
212
213impl PartialEq for ObjectStoreMutation {
214 fn eq(&self, other: &Self) -> bool {
215 self.item.key.eq(&other.item.key)
216 }
217}
218
219impl Eq for ObjectStoreMutation {}
220
221impl Ord for AllocatorItem {
222 fn cmp(&self, other: &Self) -> Ordering {
223 self.key.cmp(&other.key)
224 }
225}
226
227impl PartialOrd for AllocatorItem {
228 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
229 Some(self.cmp(other))
230 }
231}
232
233pub type DeviceRange = DeviceRangeV32;
236
237#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TypeFingerprint)]
238#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
239pub struct DeviceRangeV32(pub Range<u64>);
240
241impl Deref for DeviceRange {
242 type Target = Range<u64>;
243
244 fn deref(&self) -> &Self::Target {
245 &self.0
246 }
247}
248
249impl DerefMut for DeviceRange {
250 fn deref_mut(&mut self) -> &mut Self::Target {
251 &mut self.0
252 }
253}
254
255impl From<Range<u64>> for DeviceRange {
256 fn from(range: Range<u64>) -> Self {
257 Self(range)
258 }
259}
260
261impl Into<Range<u64>> for DeviceRange {
262 fn into(self) -> Range<u64> {
263 self.0
264 }
265}
266
267impl Ord for DeviceRange {
268 fn cmp(&self, other: &Self) -> Ordering {
269 self.start.cmp(&other.start).then(self.end.cmp(&other.end))
270 }
271}
272
273impl PartialOrd for DeviceRange {
274 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
275 Some(self.cmp(other))
276 }
277}
278
279pub type AllocatorMutation = AllocatorMutationV32;
280
281#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint)]
282#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
283pub enum AllocatorMutationV32 {
284 Allocate {
285 device_range: DeviceRangeV32,
286 owner_object_id: u64,
287 },
288 Deallocate {
289 device_range: DeviceRangeV32,
290 owner_object_id: u64,
291 },
292 SetLimit {
293 owner_object_id: u64,
294 bytes: u64,
295 },
296 MarkForDeletion(u64),
302}
303
304pub type UpdateMutationsKey = UpdateMutationsKeyV40;
305
306#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
307pub struct UpdateMutationsKeyV40(pub WrappedKeyV40);
308
309#[derive(Serialize, Deserialize, TypeFingerprint)]
310pub struct UpdateMutationsKeyV32(pub WrappedKeyV32);
311
312impl From<UpdateMutationsKeyV32> for UpdateMutationsKeyV40 {
313 fn from(value: UpdateMutationsKeyV32) -> Self {
314 Self(value.0.into())
315 }
316}
317
318impl From<UpdateMutationsKey> for WrappedKey {
319 fn from(outer: UpdateMutationsKey) -> Self {
320 outer.0
321 }
322}
323
324impl From<WrappedKey> for UpdateMutationsKey {
325 fn from(inner: WrappedKey) -> Self {
326 Self(inner)
327 }
328}
329
330#[cfg(fuzz)]
331impl<'a> arbitrary::Arbitrary<'a> for UpdateMutationsKey {
332 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
333 <u128>::arbitrary(u).map(|wrapping_key_id| {
334 UpdateMutationsKey::from(WrappedKey {
335 wrapping_key_id,
336 key: fxfs_crypto::WrappedKeyBytes::default(),
338 })
339 })
340 }
341}
342
343impl Ord for UpdateMutationsKey {
344 fn cmp(&self, other: &Self) -> Ordering {
345 (self as *const UpdateMutationsKey).cmp(&(other as *const _))
346 }
347}
348
349impl PartialOrd for UpdateMutationsKey {
350 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
351 Some(self.cmp(other))
352 }
353}
354
355impl Eq for UpdateMutationsKey {}
356
357impl PartialEq for UpdateMutationsKey {
358 fn eq(&self, other: &Self) -> bool {
359 std::ptr::eq(self, other)
360 }
361}
362
363#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Copy)]
373pub enum LockKey {
374 Filesystem,
376
377 Flush {
379 object_id: u64,
380 },
381
382 ObjectAttribute {
384 store_object_id: u64,
385 object_id: u64,
386 attribute_id: u64,
387 },
388
389 Object {
391 store_object_id: u64,
392 object_id: u64,
393 },
394
395 ProjectId {
396 store_object_id: u64,
397 project_id: u64,
398 },
399
400 Truncate {
402 store_object_id: u64,
403 object_id: u64,
404 },
405}
406
407impl LockKey {
408 pub const fn object_attribute(store_object_id: u64, object_id: u64, attribute_id: u64) -> Self {
409 LockKey::ObjectAttribute { store_object_id, object_id, attribute_id }
410 }
411
412 pub const fn object(store_object_id: u64, object_id: u64) -> Self {
413 LockKey::Object { store_object_id, object_id }
414 }
415
416 pub const fn flush(object_id: u64) -> Self {
417 LockKey::Flush { object_id }
418 }
419
420 pub const fn truncate(store_object_id: u64, object_id: u64) -> Self {
421 LockKey::Truncate { store_object_id, object_id }
422 }
423}
424
425#[derive(Clone, Debug)]
427pub enum LockKeys {
428 None,
429 Inline(LockKey),
430 Vec(Vec<LockKey>),
431}
432
433impl LockKeys {
434 pub fn with_capacity(capacity: usize) -> Self {
435 if capacity > 1 {
436 LockKeys::Vec(Vec::with_capacity(capacity))
437 } else {
438 LockKeys::None
439 }
440 }
441
442 pub fn push(&mut self, key: LockKey) {
443 match self {
444 Self::None => *self = LockKeys::Inline(key),
445 Self::Inline(inline) => {
446 *self = LockKeys::Vec(vec![*inline, key]);
447 }
448 Self::Vec(vec) => vec.push(key),
449 }
450 }
451
452 pub fn truncate(&mut self, len: usize) {
453 match self {
454 Self::None => {}
455 Self::Inline(_) => {
456 if len == 0 {
457 *self = Self::None;
458 }
459 }
460 Self::Vec(vec) => vec.truncate(len),
461 }
462 }
463
464 fn len(&self) -> usize {
465 match self {
466 Self::None => 0,
467 Self::Inline(_) => 1,
468 Self::Vec(vec) => vec.len(),
469 }
470 }
471
472 fn contains(&self, key: &LockKey) -> bool {
473 match self {
474 Self::None => false,
475 Self::Inline(single) => single == key,
476 Self::Vec(vec) => vec.contains(key),
477 }
478 }
479
480 fn sort_unstable(&mut self) {
481 match self {
482 Self::Vec(vec) => vec.sort_unstable(),
483 _ => {}
484 }
485 }
486
487 fn dedup(&mut self) {
488 match self {
489 Self::Vec(vec) => vec.dedup(),
490 _ => {}
491 }
492 }
493
494 fn iter(&self) -> LockKeysIter<'_> {
495 match self {
496 LockKeys::None => LockKeysIter::None,
497 LockKeys::Inline(key) => LockKeysIter::Inline(key),
498 LockKeys::Vec(keys) => LockKeysIter::Vec(keys.into_iter()),
499 }
500 }
501}
502
503enum LockKeysIter<'a> {
504 None,
505 Inline(&'a LockKey),
506 Vec(<&'a Vec<LockKey> as IntoIterator>::IntoIter),
507}
508
509impl<'a> Iterator for LockKeysIter<'a> {
510 type Item = &'a LockKey;
511 fn next(&mut self) -> Option<Self::Item> {
512 match self {
513 Self::None => None,
514 Self::Inline(inline) => {
515 let next = *inline;
516 *self = Self::None;
517 Some(next)
518 }
519 Self::Vec(vec) => vec.next(),
520 }
521 }
522}
523
524impl Default for LockKeys {
525 fn default() -> Self {
526 LockKeys::None
527 }
528}
529
530#[macro_export]
531macro_rules! lock_keys {
532 () => {
533 $crate::object_store::transaction::LockKeys::None
534 };
535 ($lock_key:expr $(,)?) => {
536 $crate::object_store::transaction::LockKeys::Inline($lock_key)
537 };
538 ($($lock_keys:expr),+ $(,)?) => {
539 $crate::object_store::transaction::LockKeys::Vec(vec![$($lock_keys),+])
540 };
541}
542pub use lock_keys;
543
544pub trait AssociatedObject: Send + Sync {
548 fn will_apply_mutation(&self, _mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
549 }
550}
551
552pub enum AssocObj<'a> {
553 None,
554 Borrowed(&'a (dyn AssociatedObject)),
555 Owned(Box<dyn AssociatedObject>),
556}
557
558impl AssocObj<'_> {
559 pub fn map<R, F: FnOnce(&dyn AssociatedObject) -> R>(&self, f: F) -> Option<R> {
560 match self {
561 AssocObj::None => None,
562 AssocObj::Borrowed(ref b) => Some(f(*b)),
563 AssocObj::Owned(ref o) => Some(f(o.as_ref())),
564 }
565 }
566}
567
568pub struct TxnMutation<'a> {
569 pub object_id: u64,
573
574 pub mutation: Mutation,
576
577 pub associated_object: AssocObj<'a>,
580}
581
582impl Ord for TxnMutation<'_> {
585 fn cmp(&self, other: &Self) -> Ordering {
586 self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation))
587 }
588}
589
590impl PartialOrd for TxnMutation<'_> {
591 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
592 Some(self.cmp(other))
593 }
594}
595
596impl PartialEq for TxnMutation<'_> {
597 fn eq(&self, other: &Self) -> bool {
598 self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation)
599 }
600}
601
602impl Eq for TxnMutation<'_> {}
603
604impl std::fmt::Debug for TxnMutation<'_> {
605 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
606 f.debug_struct("TxnMutation")
607 .field("object_id", &self.object_id)
608 .field("mutation", &self.mutation)
609 .finish()
610 }
611}
612
613pub enum MetadataReservation {
614 None,
616
617 Borrowed,
620
621 Reservation(Reservation),
623
624 Hold(u64),
626}
627
628pub struct Transaction<'a> {
630 txn_guard: TxnGuard<'a>,
631
632 mutations: BTreeSet<TxnMutation<'a>>,
634
635 txn_locks: LockKeys,
637
638 pub allocator_reservation: Option<&'a Reservation>,
640
641 pub metadata_reservation: MetadataReservation,
643
644 new_objects: BTreeSet<(u64, u64)>,
647
648 checksums: Vec<(Range<u64>, Vec<Checksum>, bool)>,
650}
651
652impl<'a> Transaction<'a> {
653 pub async fn new(
656 txn_guard: TxnGuard<'a>,
657 options: Options<'a>,
658 txn_locks: LockKeys,
659 ) -> Result<Transaction<'a>, Error> {
660 txn_guard.fs().add_transaction(options.skip_journal_checks).await;
661 let fs = txn_guard.fs().clone();
662 let guard = scopeguard::guard((), |_| fs.sub_transaction());
663 let (metadata_reservation, allocator_reservation, hold) =
664 txn_guard.fs().reservation_for_transaction(options).await?;
665
666 let txn_locks = {
667 let lock_manager = txn_guard.fs().lock_manager();
668 let mut write_guard = lock_manager.txn_lock(txn_locks).await;
669 std::mem::take(&mut write_guard.0.lock_keys)
670 };
671 let mut transaction = Transaction {
672 txn_guard,
673 mutations: BTreeSet::new(),
674 txn_locks,
675 allocator_reservation: None,
676 metadata_reservation,
677 new_objects: BTreeSet::new(),
678 checksums: Vec::new(),
679 };
680
681 ScopeGuard::into_inner(guard);
682 hold.map(|h| h.forget()); transaction.allocator_reservation = allocator_reservation;
684 Ok(transaction)
685 }
686
687 pub fn txn_guard(&self) -> &TxnGuard<'_> {
688 &self.txn_guard
689 }
690
691 pub fn mutations(&self) -> &BTreeSet<TxnMutation<'a>> {
692 &self.mutations
693 }
694
695 pub fn take_mutations(&mut self) -> BTreeSet<TxnMutation<'a>> {
696 self.new_objects.clear();
697 mem::take(&mut self.mutations)
698 }
699
700 pub fn add(&mut self, object_id: u64, mutation: Mutation) -> Option<Mutation> {
703 self.add_with_object(object_id, mutation, AssocObj::None)
704 }
705
706 pub fn remove(&mut self, object_id: u64, mutation: Mutation) {
708 let txn_mutation = TxnMutation { object_id, mutation, associated_object: AssocObj::None };
709 if self.mutations.remove(&txn_mutation) {
710 if let Mutation::ObjectStore(ObjectStoreMutation {
711 item:
712 ObjectItem {
713 key: ObjectKey { object_id: new_object_id, data: ObjectKeyData::Object },
714 ..
715 },
716 op: Operation::Insert,
717 }) = txn_mutation.mutation
718 {
719 self.new_objects.remove(&(object_id, new_object_id));
720 }
721 }
722 }
723
724 pub fn add_with_object(
727 &mut self,
728 object_id: u64,
729 mutation: Mutation,
730 associated_object: AssocObj<'a>,
731 ) -> Option<Mutation> {
732 assert!(object_id != INVALID_OBJECT_ID);
733 let txn_mutation = TxnMutation { object_id, mutation, associated_object };
734 self.verify_locks(&txn_mutation);
735 self.mutations.replace(txn_mutation).map(|m| m.mutation)
736 }
737
738 pub fn add_checksum(&mut self, range: Range<u64>, checksums: Vec<Checksum>, first_write: bool) {
739 self.checksums.push((range, checksums, first_write));
740 }
741
742 pub fn checksums(&self) -> &[(Range<u64>, Vec<Checksum>, bool)] {
743 &self.checksums
744 }
745
746 pub fn take_checksums(&mut self) -> Vec<(Range<u64>, Vec<Checksum>, bool)> {
747 std::mem::replace(&mut self.checksums, Vec::new())
748 }
749
750 fn verify_locks(&mut self, mutation: &TxnMutation<'_>) {
751 if let TxnMutation {
755 mutation:
756 Mutation::ObjectStore { 0: ObjectStoreMutation { item: ObjectItem { key, .. }, op } },
757 object_id: store_object_id,
758 ..
759 } = mutation
760 {
761 match &key.data {
762 ObjectKeyData::Attribute(..) => {
763 }
765 ObjectKeyData::Child { .. }
766 | ObjectKeyData::EncryptedChild { .. }
767 | ObjectKeyData::CasefoldChild { .. } => {
768 let id = key.object_id;
769 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
770 && !self.new_objects.contains(&(*store_object_id, id))
771 {
772 debug_assert!(
773 false,
774 "Not holding required lock for object {id} \
775 in store {store_object_id}"
776 );
777 error!(
778 "Not holding required lock for object {id} in store \
779 {store_object_id}"
780 )
781 }
782 }
783 ObjectKeyData::GraveyardEntry { .. } => {
784 }
786 ObjectKeyData::GraveyardAttributeEntry { .. } => {
787 }
789 ObjectKeyData::Keys => {
790 let id = key.object_id;
791 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
792 && !self.new_objects.contains(&(*store_object_id, id))
793 {
794 debug_assert!(
795 false,
796 "Not holding required lock for object {id} \
797 in store {store_object_id}"
798 );
799 error!(
800 "Not holding required lock for object {id} in store \
801 {store_object_id}"
802 )
803 }
804 }
805 ObjectKeyData::Object => match op {
806 Operation::Insert => {
808 self.new_objects.insert((*store_object_id, key.object_id));
809 }
810 Operation::Merge | Operation::ReplaceOrInsert => {
811 let id = key.object_id;
812 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
813 && !self.new_objects.contains(&(*store_object_id, id))
814 {
815 debug_assert!(
816 false,
817 "Not holding required lock for object {id} \
818 in store {store_object_id}"
819 );
820 error!(
821 "Not holding required lock for object {id} in store \
822 {store_object_id}"
823 )
824 }
825 }
826 },
827 ObjectKeyData::Project { project_id, property: ProjectProperty::Limit } => {
828 if !self.txn_locks.contains(&LockKey::ProjectId {
829 store_object_id: *store_object_id,
830 project_id: *project_id,
831 }) {
832 debug_assert!(
833 false,
834 "Not holding required lock for project limit id {project_id} \
835 in store {store_object_id}"
836 );
837 error!(
838 "Not holding required lock for project limit id {project_id} in \
839 store {store_object_id}"
840 )
841 }
842 }
843 ObjectKeyData::Project { property: ProjectProperty::Usage, .. } => match op {
844 Operation::Insert | Operation::ReplaceOrInsert => {
845 panic!(
846 "Project usage is all handled by merging deltas, no inserts or \
847 replacements should be used"
848 );
849 }
850 Operation::Merge => {}
852 },
853 ObjectKeyData::ExtendedAttribute { .. } => {
854 let id = key.object_id;
855 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
856 && !self.new_objects.contains(&(*store_object_id, id))
857 {
858 debug_assert!(
859 false,
860 "Not holding required lock for object {id} \
861 in store {store_object_id} while mutating extended attribute"
862 );
863 error!(
864 "Not holding required lock for object {id} in store \
865 {store_object_id} while mutating extended attribute"
866 )
867 }
868 }
869 }
870 }
871 }
872
873 pub fn is_empty(&self) -> bool {
875 self.mutations.is_empty()
876 }
877
878 pub fn get_object_mutation(
881 &self,
882 store_object_id: u64,
883 key: ObjectKey,
884 ) -> Option<&ObjectStoreMutation> {
885 if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) =
886 self.mutations.get(&TxnMutation {
887 object_id: store_object_id,
888 mutation: Mutation::insert_object(key, ObjectValue::None),
889 associated_object: AssocObj::None,
890 })
891 {
892 Some(mutation)
893 } else {
894 None
895 }
896 }
897
898 pub async fn commit(mut self) -> Result<u64, Error> {
900 debug!(txn:? = &self; "Commit");
901 self.txn_guard.fs().clone().commit_transaction(&mut self, &mut |_| {}).await
902 }
903
904 pub async fn commit_with_callback<R: Send>(
907 mut self,
908 f: impl FnOnce(u64) -> R + Send,
909 ) -> Result<R, Error> {
910 debug!(txn:? = &self; "Commit");
911 let mut f = Some(f);
914 let mut result = None;
915 self.txn_guard
916 .fs()
917 .clone()
918 .commit_transaction(&mut self, &mut |offset| {
919 result = Some(f.take().unwrap()(offset));
920 })
921 .await?;
922 Ok(result.unwrap())
923 }
924
925 pub async fn commit_and_continue(&mut self) -> Result<(), Error> {
928 debug!(txn:? = self; "Commit");
929 self.txn_guard.fs().clone().commit_transaction(self, &mut |_| {}).await?;
930 assert!(self.mutations.is_empty());
931 self.txn_guard.fs().lock_manager().downgrade_locks(&self.txn_locks);
932 Ok(())
933 }
934}
935
936impl Drop for Transaction<'_> {
937 fn drop(&mut self) {
938 debug!(txn:? = &self; "Drop");
941 self.txn_guard.fs().clone().drop_transaction(self);
942 }
943}
944
945impl std::fmt::Debug for Transaction<'_> {
946 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
947 f.debug_struct("Transaction")
948 .field("mutations", &self.mutations)
949 .field("txn_locks", &self.txn_locks)
950 .field("reservation", &self.allocator_reservation)
951 .finish()
952 }
953}
954
955pub enum BorrowedOrOwned<'a, T> {
956 Borrowed(&'a T),
957 Owned(T),
958}
959
960impl<T> Deref for BorrowedOrOwned<'_, T> {
961 type Target = T;
962
963 fn deref(&self) -> &Self::Target {
964 match self {
965 BorrowedOrOwned::Borrowed(b) => b,
966 BorrowedOrOwned::Owned(o) => &o,
967 }
968 }
969}
970
971impl<'a, T> From<&'a T> for BorrowedOrOwned<'a, T> {
972 fn from(value: &'a T) -> Self {
973 BorrowedOrOwned::Borrowed(value)
974 }
975}
976
977impl<T> From<T> for BorrowedOrOwned<'_, T> {
978 fn from(value: T) -> Self {
979 BorrowedOrOwned::Owned(value)
980 }
981}
982
983pub struct LockManager {
1012 locks: Mutex<Locks>,
1013}
1014
1015struct Locks {
1016 keys: HashMap<LockKey, LockEntry>,
1017}
1018
1019impl Locks {
1020 fn drop_lock(&mut self, key: LockKey, state: LockState) {
1021 if let Entry::Occupied(mut occupied) = self.keys.entry(key) {
1022 let entry = occupied.get_mut();
1023 let wake = match state {
1024 LockState::ReadLock => {
1025 entry.read_count -= 1;
1026 entry.read_count == 0
1027 }
1028 LockState::Locked | LockState::WriteLock => {
1030 entry.state = LockState::ReadLock;
1031 true
1032 }
1033 };
1034 if wake {
1035 unsafe {
1037 entry.wake();
1038 }
1039 if entry.can_remove() {
1040 occupied.remove_entry();
1041 }
1042 }
1043 } else {
1044 unreachable!();
1045 }
1046 }
1047
1048 fn drop_read_locks(&mut self, lock_keys: LockKeys) {
1049 for lock in lock_keys.iter() {
1050 self.drop_lock(*lock, LockState::ReadLock);
1051 }
1052 }
1053
1054 fn drop_write_locks(&mut self, lock_keys: LockKeys) {
1055 for lock in lock_keys.iter() {
1056 self.drop_lock(*lock, LockState::WriteLock);
1059 }
1060 }
1061
1062 fn downgrade_locks(&mut self, lock_keys: &LockKeys) {
1064 for lock in lock_keys.iter() {
1065 unsafe {
1067 self.keys.get_mut(lock).unwrap().downgrade_lock();
1068 }
1069 }
1070 }
1071}
1072
1073#[derive(Debug)]
1074struct LockEntry {
1075 read_count: u64,
1078
1079 state: LockState,
1081
1082 head: *const LockWaker,
1087 tail: *const LockWaker,
1088}
1089
1090unsafe impl Send for LockEntry {}
1091
1092struct LockWaker {
1095 next: UnsafeCell<*const LockWaker>,
1097 prev: UnsafeCell<*const LockWaker>,
1098
1099 key: LockKey,
1102
1103 waker: UnsafeCell<WakerState>,
1105
1106 target_state: LockState,
1108
1109 is_upgrade: bool,
1111
1112 _pin: PhantomPinned,
1114}
1115
1116enum WakerState {
1117 Pending,
1119
1120 Registered(Waker),
1122
1123 Woken,
1125}
1126
1127impl WakerState {
1128 fn is_woken(&self) -> bool {
1129 matches!(self, WakerState::Woken)
1130 }
1131}
1132
1133unsafe impl Send for LockWaker {}
1134unsafe impl Sync for LockWaker {}
1135
1136impl LockWaker {
1137 async fn wait(&self, manager: &LockManager) {
1139 let waker_guard = scopeguard::guard((), |_| {
1141 let mut locks = manager.locks.lock();
1142 unsafe {
1144 if (*self.waker.get()).is_woken() {
1145 if self.is_upgrade {
1147 locks.keys.get_mut(&self.key).unwrap().downgrade_lock();
1148 } else {
1149 locks.drop_lock(self.key, self.target_state);
1150 }
1151 } else {
1152 locks.keys.get_mut(&self.key).unwrap().remove_waker(self);
1155 }
1156 }
1157 });
1158
1159 poll_fn(|cx| {
1160 let _locks = manager.locks.lock();
1161 unsafe {
1163 if (*self.waker.get()).is_woken() {
1164 Poll::Ready(())
1165 } else {
1166 *self.waker.get() = WakerState::Registered(cx.waker().clone());
1167 Poll::Pending
1168 }
1169 }
1170 })
1171 .await;
1172
1173 ScopeGuard::into_inner(waker_guard);
1174 }
1175}
1176
1177#[derive(Copy, Clone, Debug, PartialEq)]
1178enum LockState {
1179 ReadLock,
1181
1182 Locked,
1185
1186 WriteLock,
1188}
1189
1190impl LockManager {
1191 pub fn new() -> Self {
1192 LockManager { locks: Mutex::new(Locks { keys: HashMap::default() }) }
1193 }
1194
1195 pub async fn txn_lock<'a>(&'a self, lock_keys: LockKeys) -> TransactionLocks<'a> {
1199 TransactionLocks(
1200 debug_assert_not_too_long!(self.lock(lock_keys, LockState::Locked)).right().unwrap(),
1201 )
1202 }
1203
1204 async fn lock<'a>(
1207 &'a self,
1208 mut lock_keys: LockKeys,
1209 target_state: LockState,
1210 ) -> Either<ReadGuard<'a>, WriteGuard<'a>> {
1211 let mut guard = match &target_state {
1212 LockState::ReadLock => Left(ReadGuard {
1213 manager: self.into(),
1214 lock_keys: LockKeys::with_capacity(lock_keys.len()),
1215 }),
1216 LockState::Locked | LockState::WriteLock => Right(WriteGuard {
1217 manager: self.into(),
1218 lock_keys: LockKeys::with_capacity(lock_keys.len()),
1219 }),
1220 };
1221 let guard_keys = match &mut guard {
1222 Left(g) => &mut g.lock_keys,
1223 Right(g) => &mut g.lock_keys,
1224 };
1225 lock_keys.sort_unstable();
1226 lock_keys.dedup();
1227 for lock in lock_keys.iter() {
1228 let lock_waker = None;
1229 pin_mut!(lock_waker);
1230 {
1231 let mut locks = self.locks.lock();
1232 match locks.keys.entry(*lock) {
1233 Entry::Vacant(vacant) => {
1234 vacant.insert(LockEntry {
1235 read_count: if let LockState::ReadLock = target_state {
1236 guard_keys.push(*lock);
1237 1
1238 } else {
1239 guard_keys.push(*lock);
1240 0
1241 },
1242 state: target_state,
1243 head: std::ptr::null(),
1244 tail: std::ptr::null(),
1245 });
1246 }
1247 Entry::Occupied(mut occupied) => {
1248 let entry = occupied.get_mut();
1249 if unsafe { entry.is_allowed(target_state, entry.head.is_null()) } {
1251 if let LockState::ReadLock = target_state {
1252 entry.read_count += 1;
1253 guard_keys.push(*lock);
1254 } else {
1255 entry.state = target_state;
1256 guard_keys.push(*lock);
1257 }
1258 } else {
1259 unsafe {
1262 *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1263 next: UnsafeCell::new(std::ptr::null()),
1264 prev: UnsafeCell::new(entry.tail),
1265 key: *lock,
1266 waker: UnsafeCell::new(WakerState::Pending),
1267 target_state: target_state,
1268 is_upgrade: false,
1269 _pin: PhantomPinned,
1270 });
1271 }
1272 let waker = (*lock_waker).as_ref().unwrap();
1273 if entry.tail.is_null() {
1274 entry.head = waker;
1275 } else {
1276 unsafe {
1278 *(*entry.tail).next.get() = waker;
1279 }
1280 }
1281 entry.tail = waker;
1282 }
1283 }
1284 }
1285 }
1286 if let Some(waker) = &*lock_waker {
1287 waker.wait(self).await;
1288 guard_keys.push(*lock);
1289 }
1290 }
1291 guard
1292 }
1293
1294 pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
1296 let mut locks = self.locks.lock();
1297 locks.drop_write_locks(std::mem::take(&mut transaction.txn_locks));
1298 }
1299
1300 pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
1302 self.commit_prepare_keys(&transaction.txn_locks).await;
1303 }
1304
1305 async fn commit_prepare_keys(&self, lock_keys: &LockKeys) {
1306 for lock in lock_keys.iter() {
1307 let lock_waker = None;
1308 pin_mut!(lock_waker);
1309 {
1310 let mut locks = self.locks.lock();
1311 let entry = locks.keys.get_mut(lock).unwrap();
1312 assert_eq!(entry.state, LockState::Locked);
1313
1314 if entry.read_count == 0 {
1315 entry.state = LockState::WriteLock;
1316 } else {
1317 unsafe {
1320 *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1321 next: UnsafeCell::new(entry.head),
1322 prev: UnsafeCell::new(std::ptr::null()),
1323 key: *lock,
1324 waker: UnsafeCell::new(WakerState::Pending),
1325 target_state: LockState::WriteLock,
1326 is_upgrade: true,
1327 _pin: PhantomPinned,
1328 });
1329 }
1330 let waker = (*lock_waker).as_ref().unwrap();
1331 if entry.head.is_null() {
1332 entry.tail = (*lock_waker).as_ref().unwrap();
1333 } else {
1334 unsafe {
1336 *(*entry.head).prev.get() = waker;
1337 }
1338 }
1339 entry.head = waker;
1340 }
1341 }
1342
1343 if let Some(waker) = &*lock_waker {
1344 waker.wait(self).await;
1345 }
1346 }
1347 }
1348
1349 pub async fn read_lock<'a>(&'a self, lock_keys: LockKeys) -> ReadGuard<'a> {
1355 debug_assert_not_too_long!(self.lock(lock_keys, LockState::ReadLock)).left().unwrap()
1356 }
1357
1358 pub async fn write_lock<'a>(&'a self, lock_keys: LockKeys) -> WriteGuard<'a> {
1361 debug_assert_not_too_long!(self.lock(lock_keys, LockState::WriteLock)).right().unwrap()
1362 }
1363
1364 pub fn downgrade_locks(&self, lock_keys: &LockKeys) {
1367 self.locks.lock().downgrade_locks(lock_keys);
1368 }
1369}
1370
1371impl LockEntry {
1373 unsafe fn wake(&mut self) {
1374 if self.head.is_null() || self.state == LockState::WriteLock {
1376 return;
1377 }
1378
1379 let waker = &*self.head;
1380
1381 if waker.is_upgrade {
1382 if self.read_count > 0 {
1383 return;
1384 }
1385 } else if !self.is_allowed(waker.target_state, true) {
1386 return;
1387 }
1388
1389 self.pop_and_wake();
1390
1391 if waker.target_state == LockState::WriteLock {
1394 return;
1395 }
1396
1397 while !self.head.is_null() && (*self.head).target_state == LockState::ReadLock {
1398 self.pop_and_wake();
1399 }
1400 }
1401
1402 unsafe fn pop_and_wake(&mut self) {
1403 let waker = &*self.head;
1404
1405 self.head = *waker.next.get();
1407 if self.head.is_null() {
1408 self.tail = std::ptr::null()
1409 } else {
1410 *(*self.head).prev.get() = std::ptr::null();
1411 }
1412
1413 if waker.target_state == LockState::ReadLock {
1415 self.read_count += 1;
1416 } else {
1417 self.state = waker.target_state;
1418 }
1419
1420 if let WakerState::Registered(waker) =
1422 std::mem::replace(&mut *waker.waker.get(), WakerState::Woken)
1423 {
1424 waker.wake();
1425 }
1426 }
1427
1428 fn can_remove(&self) -> bool {
1429 self.state == LockState::ReadLock && self.read_count == 0
1430 }
1431
1432 unsafe fn remove_waker(&mut self, waker: &LockWaker) {
1433 let is_first = (*waker.prev.get()).is_null();
1434 if is_first {
1435 self.head = *waker.next.get();
1436 } else {
1437 *(**waker.prev.get()).next.get() = *waker.next.get();
1438 }
1439 if (*waker.next.get()).is_null() {
1440 self.tail = *waker.prev.get();
1441 } else {
1442 *(**waker.next.get()).prev.get() = *waker.prev.get();
1443 }
1444 if is_first {
1445 self.wake();
1447 }
1448 }
1449
1450 unsafe fn is_allowed(&self, target_state: LockState, is_head: bool) -> bool {
1454 match self.state {
1455 LockState::ReadLock => {
1456 (self.read_count == 0
1458 || target_state == LockState::Locked
1459 || target_state == LockState::ReadLock)
1460 && is_head
1461 }
1462 LockState::Locked => {
1463 target_state == LockState::ReadLock && (is_head || !(*self.head).is_upgrade)
1467 }
1468 LockState::WriteLock => false,
1469 }
1470 }
1471
1472 unsafe fn downgrade_lock(&mut self) {
1473 assert_eq!(std::mem::replace(&mut self.state, LockState::Locked), LockState::WriteLock);
1474 self.wake();
1475 }
1476}
1477
1478#[must_use]
1479pub struct ReadGuard<'a> {
1480 manager: LockManagerRef<'a>,
1481 lock_keys: LockKeys,
1482}
1483
1484impl ReadGuard<'_> {
1485 pub fn fs(&self) -> Option<&Arc<FxFilesystem>> {
1486 if let LockManagerRef::Owned(fs) = &self.manager {
1487 Some(fs)
1488 } else {
1489 None
1490 }
1491 }
1492
1493 pub fn into_owned(mut self, fs: Arc<FxFilesystem>) -> ReadGuard<'static> {
1494 ReadGuard {
1495 manager: LockManagerRef::Owned(fs),
1496 lock_keys: std::mem::replace(&mut self.lock_keys, LockKeys::None),
1497 }
1498 }
1499}
1500
1501impl Drop for ReadGuard<'_> {
1502 fn drop(&mut self) {
1503 let mut locks = self.manager.locks.lock();
1504 locks.drop_read_locks(std::mem::take(&mut self.lock_keys));
1505 }
1506}
1507
1508impl fmt::Debug for ReadGuard<'_> {
1509 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1510 f.debug_struct("ReadGuard")
1511 .field("manager", &(&self.manager as *const _))
1512 .field("lock_keys", &self.lock_keys)
1513 .finish()
1514 }
1515}
1516
1517#[must_use]
1518pub struct WriteGuard<'a> {
1519 manager: LockManagerRef<'a>,
1520 lock_keys: LockKeys,
1521}
1522
1523impl Drop for WriteGuard<'_> {
1524 fn drop(&mut self) {
1525 let mut locks = self.manager.locks.lock();
1526 locks.drop_write_locks(std::mem::take(&mut self.lock_keys));
1527 }
1528}
1529
1530impl fmt::Debug for WriteGuard<'_> {
1531 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1532 f.debug_struct("WriteGuard")
1533 .field("manager", &(&self.manager as *const _))
1534 .field("lock_keys", &self.lock_keys)
1535 .finish()
1536 }
1537}
1538
1539enum LockManagerRef<'a> {
1540 Borrowed(&'a LockManager),
1541 Owned(Arc<FxFilesystem>),
1542}
1543
1544impl Deref for LockManagerRef<'_> {
1545 type Target = LockManager;
1546
1547 fn deref(&self) -> &Self::Target {
1548 match self {
1549 LockManagerRef::Borrowed(m) => m,
1550 LockManagerRef::Owned(f) => f.lock_manager(),
1551 }
1552 }
1553}
1554
1555impl<'a> From<&'a LockManager> for LockManagerRef<'a> {
1556 fn from(value: &'a LockManager) -> Self {
1557 LockManagerRef::Borrowed(value)
1558 }
1559}
1560
1561#[cfg(test)]
1562mod tests {
1563 use super::{LockKey, LockKeys, LockManager, LockState, Mutation, Options};
1564 use crate::filesystem::FxFilesystem;
1565 use fuchsia_async as fasync;
1566 use fuchsia_sync::Mutex;
1567 use futures::channel::oneshot::channel;
1568 use futures::future::FutureExt;
1569 use futures::stream::FuturesUnordered;
1570 use futures::{join, pin_mut, StreamExt};
1571 use std::task::Poll;
1572 use std::time::Duration;
1573 use storage_device::fake_device::FakeDevice;
1574 use storage_device::DeviceHolder;
1575
1576 #[fuchsia::test]
1577 async fn test_simple() {
1578 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1579 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1580 let mut t = fs
1581 .clone()
1582 .new_transaction(lock_keys![], Options::default())
1583 .await
1584 .expect("new_transaction failed");
1585 t.add(1, Mutation::BeginFlush);
1586 assert!(!t.is_empty());
1587 }
1588
1589 #[fuchsia::test]
1590 async fn test_locks() {
1591 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1592 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1593 let (send1, recv1) = channel();
1594 let (send2, recv2) = channel();
1595 let (send3, recv3) = channel();
1596 let done = Mutex::new(false);
1597 let mut futures = FuturesUnordered::new();
1598 futures.push(
1599 async {
1600 let _t = fs
1601 .clone()
1602 .new_transaction(
1603 lock_keys![LockKey::object_attribute(1, 2, 3)],
1604 Options::default(),
1605 )
1606 .await
1607 .expect("new_transaction failed");
1608 send1.send(()).unwrap(); send3.send(()).unwrap(); recv2.await.unwrap();
1611 fasync::Timer::new(Duration::from_millis(100)).await;
1613 assert!(!*done.lock());
1614 }
1615 .boxed(),
1616 );
1617 futures.push(
1618 async {
1619 recv1.await.unwrap();
1620 let _t = fs
1622 .clone()
1623 .new_transaction(
1624 lock_keys![LockKey::object_attribute(2, 2, 3)],
1625 Options::default(),
1626 )
1627 .await
1628 .expect("new_transaction failed");
1629 send2.send(()).unwrap();
1631 }
1632 .boxed(),
1633 );
1634 futures.push(
1635 async {
1636 recv3.await.unwrap();
1638 let _t = fs
1639 .clone()
1640 .new_transaction(
1641 lock_keys![LockKey::object_attribute(1, 2, 3)],
1642 Options::default(),
1643 )
1644 .await;
1645 *done.lock() = true;
1646 }
1647 .boxed(),
1648 );
1649 while let Some(()) = futures.next().await {}
1650 }
1651
1652 #[fuchsia::test]
1653 async fn test_read_lock_after_write_lock() {
1654 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1655 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1656 let (send1, recv1) = channel();
1657 let (send2, recv2) = channel();
1658 let done = Mutex::new(false);
1659 join!(
1660 async {
1661 let t = fs
1662 .clone()
1663 .new_transaction(
1664 lock_keys![LockKey::object_attribute(1, 2, 3)],
1665 Options::default(),
1666 )
1667 .await
1668 .expect("new_transaction failed");
1669 send1.send(()).unwrap(); recv2.await.unwrap();
1671 t.commit().await.expect("commit failed");
1672 *done.lock() = true;
1673 },
1674 async {
1675 recv1.await.unwrap();
1676 let _guard = fs
1678 .lock_manager()
1679 .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
1680 .await;
1681 send2.send(()).unwrap();
1683 fasync::Timer::new(Duration::from_millis(100)).await;
1686 assert!(!*done.lock());
1687 },
1688 );
1689 }
1690
1691 #[fuchsia::test]
1692 async fn test_write_lock_after_read_lock() {
1693 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1694 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1695 let (send1, recv1) = channel();
1696 let (send2, recv2) = channel();
1697 let done = Mutex::new(false);
1698 join!(
1699 async {
1700 let _guard = fs
1702 .lock_manager()
1703 .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
1704 .await;
1705 send1.send(()).unwrap();
1707 recv2.await.unwrap();
1708 fasync::Timer::new(Duration::from_millis(100)).await;
1711 assert!(!*done.lock());
1712 },
1713 async {
1714 recv1.await.unwrap();
1715 let t = fs
1716 .clone()
1717 .new_transaction(
1718 lock_keys![LockKey::object_attribute(1, 2, 3)],
1719 Options::default(),
1720 )
1721 .await
1722 .expect("new_transaction failed");
1723 send2.send(()).unwrap(); t.commit().await.expect("commit failed");
1725 *done.lock() = true;
1726 },
1727 );
1728 }
1729
1730 #[fuchsia::test]
1731 async fn test_drop_uncommitted_transaction() {
1732 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1733 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1734 let key = lock_keys![LockKey::object(1, 1)];
1735
1736 {
1738 let _write_lock = fs
1739 .clone()
1740 .new_transaction(key.clone(), Options::default())
1741 .await
1742 .expect("new_transaction failed");
1743 let _read_lock = fs.lock_manager().read_lock(key.clone()).await;
1744 }
1745 {
1747 let _write_lock = fs
1748 .clone()
1749 .new_transaction(key.clone(), Options::default())
1750 .await
1751 .expect("new_transaction failed");
1752 }
1753 fs.clone()
1755 .new_transaction(key.clone(), Options::default())
1756 .await
1757 .expect("new_transaction failed");
1758 }
1759
1760 #[fuchsia::test]
1761 async fn test_drop_waiting_write_lock() {
1762 let manager = LockManager::new();
1763 let keys = lock_keys![LockKey::object(1, 1)];
1764 {
1765 let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1766 if let Poll::Ready(_) =
1767 futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1768 {
1769 assert!(false);
1770 }
1771 }
1772 let _ = manager.lock(keys, LockState::WriteLock).await;
1773 }
1774
1775 #[fuchsia::test]
1776 async fn test_write_lock_blocks_everything() {
1777 let manager = LockManager::new();
1778 let keys = lock_keys![LockKey::object(1, 1)];
1779 {
1780 let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1781 if let Poll::Ready(_) =
1782 futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1783 {
1784 assert!(false);
1785 }
1786 if let Poll::Ready(_) =
1787 futures::poll!(manager.lock(keys.clone(), LockState::ReadLock).boxed())
1788 {
1789 assert!(false);
1790 }
1791 }
1792 {
1793 let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1794 }
1795 {
1796 let _guard = manager.lock(keys, LockState::ReadLock).await;
1797 }
1798 }
1799
1800 #[fuchsia::test]
1801 async fn test_downgrade_locks() {
1802 let manager = LockManager::new();
1803 let keys = lock_keys![LockKey::object(1, 1)];
1804 let _guard = manager.txn_lock(keys.clone()).await;
1805 manager.commit_prepare_keys(&keys).await;
1806
1807 let mut read_lock: FuturesUnordered<_> =
1809 std::iter::once(manager.read_lock(keys.clone())).collect();
1810
1811 assert!(futures::poll!(read_lock.next()).is_pending());
1813
1814 manager.downgrade_locks(&keys);
1815
1816 assert!(futures::poll!(read_lock.next()).is_ready());
1818 }
1819
1820 #[fuchsia::test]
1821 async fn test_dropped_write_lock_wakes() {
1822 let manager = LockManager::new();
1823 let keys = lock_keys![LockKey::object(1, 1)];
1824 let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1825 let mut read_lock = FuturesUnordered::new();
1826 read_lock.push(manager.lock(keys.clone(), LockState::ReadLock));
1827
1828 {
1829 let write_lock = manager.lock(keys, LockState::WriteLock);
1830 pin_mut!(write_lock);
1831
1832 assert!(futures::poll!(write_lock).is_pending());
1834
1835 assert!(futures::poll!(read_lock.next()).is_pending());
1837 }
1838
1839 assert!(futures::poll!(read_lock.next()).is_ready());
1841 }
1842
1843 #[fuchsia::test]
1844 async fn test_drop_upgrade() {
1845 let manager = LockManager::new();
1846 let keys = lock_keys![LockKey::object(1, 1)];
1847 let _guard = manager.lock(keys.clone(), LockState::Locked).await;
1848
1849 {
1850 let commit_prepare = manager.commit_prepare_keys(&keys);
1851 pin_mut!(commit_prepare);
1852 let _read_guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1853 assert!(futures::poll!(commit_prepare).is_pending());
1854
1855 }
1858
1859 manager.commit_prepare_keys(&keys).await;
1861 }
1862
1863 #[fasync::run_singlethreaded(test)]
1864 async fn test_woken_upgrade_blocks_reads() {
1865 let manager = LockManager::new();
1866 let keys = lock_keys![LockKey::object(1, 1)];
1867 let guard = manager.lock(keys.clone(), LockState::Locked).await;
1869
1870 let read1 = manager.lock(keys.clone(), LockState::ReadLock).await;
1872
1873 let commit_prepare = manager.commit_prepare_keys(&keys);
1875 pin_mut!(commit_prepare);
1876 assert!(futures::poll!(commit_prepare.as_mut()).is_pending());
1877
1878 let read2 = manager.lock(keys.clone(), LockState::ReadLock);
1880 pin_mut!(read2);
1881 assert!(futures::poll!(read2.as_mut()).is_pending());
1882
1883 std::mem::drop(read1);
1885 assert!(futures::poll!(commit_prepare).is_ready());
1886
1887 assert!(futures::poll!(read2.as_mut()).is_pending());
1889
1890 std::mem::drop(guard);
1892 assert!(futures::poll!(read2).is_ready());
1893 }
1894
1895 static LOCK_KEY_1: LockKey = LockKey::flush(1);
1896 static LOCK_KEY_2: LockKey = LockKey::flush(2);
1897 static LOCK_KEY_3: LockKey = LockKey::flush(3);
1898
1899 fn assert_lock_keys_equal(value: &LockKeys, expected: &LockKeys) {
1901 match (value, expected) {
1902 (LockKeys::None, LockKeys::None) => {}
1903 (LockKeys::Inline(key1), LockKeys::Inline(key2)) => {
1904 if key1 != key2 {
1905 panic!("{key1:?} != {key2:?}");
1906 }
1907 }
1908 (LockKeys::Vec(vec1), LockKeys::Vec(vec2)) => {
1909 if vec1 != vec2 {
1910 panic!("{vec1:?} != {vec2:?}");
1911 }
1912 if vec1.capacity() != vec2.capacity() {
1913 panic!(
1914 "LockKeys have different capacity: {} != {}",
1915 vec1.capacity(),
1916 vec2.capacity()
1917 );
1918 }
1919 }
1920 (_, _) => panic!("{value:?} != {expected:?}"),
1921 }
1922 }
1923
1924 fn assert_lock_keys_equivalent(value: &LockKeys, expected: &LockKeys) {
1926 let value: Vec<_> = value.iter().collect();
1927 let expected: Vec<_> = expected.iter().collect();
1928 assert_eq!(value, expected);
1929 }
1930
1931 #[test]
1932 fn test_lock_keys_macro() {
1933 assert_lock_keys_equal(&lock_keys![], &LockKeys::None);
1934 assert_lock_keys_equal(&lock_keys![LOCK_KEY_1], &LockKeys::Inline(LOCK_KEY_1));
1935 assert_lock_keys_equal(
1936 &lock_keys![LOCK_KEY_1, LOCK_KEY_2],
1937 &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]),
1938 );
1939 }
1940
1941 #[test]
1942 fn test_lock_keys_with_capacity() {
1943 assert_lock_keys_equal(&LockKeys::with_capacity(0), &LockKeys::None);
1944 assert_lock_keys_equal(&LockKeys::with_capacity(1), &LockKeys::None);
1945 assert_lock_keys_equal(&LockKeys::with_capacity(2), &LockKeys::Vec(Vec::with_capacity(2)));
1946 }
1947
1948 #[test]
1949 fn test_lock_keys_len() {
1950 assert_eq!(lock_keys![].len(), 0);
1951 assert_eq!(lock_keys![LOCK_KEY_1].len(), 1);
1952 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].len(), 2);
1953 }
1954
1955 #[test]
1956 fn test_lock_keys_contains() {
1957 assert_eq!(lock_keys![].contains(&LOCK_KEY_1), false);
1958 assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_1), true);
1959 assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_2), false);
1960 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_1), true);
1961 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_2), true);
1962 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_3), false);
1963 }
1964
1965 #[test]
1966 fn test_lock_keys_push() {
1967 let mut keys = lock_keys![];
1968 keys.push(LOCK_KEY_1);
1969 assert_lock_keys_equal(&keys, &LockKeys::Inline(LOCK_KEY_1));
1970 keys.push(LOCK_KEY_2);
1971 assert_lock_keys_equal(&keys, &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]));
1972 keys.push(LOCK_KEY_3);
1973 assert_lock_keys_equivalent(
1974 &keys,
1975 &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2, LOCK_KEY_3]),
1976 );
1977 }
1978
1979 #[test]
1980 fn test_lock_keys_sort_unstable() {
1981 let mut keys = lock_keys![];
1982 keys.sort_unstable();
1983 assert_lock_keys_equal(&keys, &lock_keys![]);
1984
1985 let mut keys = lock_keys![LOCK_KEY_1];
1986 keys.sort_unstable();
1987 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
1988
1989 let mut keys = lock_keys![LOCK_KEY_2, LOCK_KEY_1];
1990 keys.sort_unstable();
1991 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
1992 }
1993
1994 #[test]
1995 fn test_lock_keys_dedup() {
1996 let mut keys = lock_keys![];
1997 keys.dedup();
1998 assert_lock_keys_equal(&keys, &lock_keys![]);
1999
2000 let mut keys = lock_keys![LOCK_KEY_1];
2001 keys.dedup();
2002 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2003
2004 let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_1];
2005 keys.dedup();
2006 assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2007 }
2008
2009 #[test]
2010 fn test_lock_keys_truncate() {
2011 let mut keys = lock_keys![];
2012 keys.truncate(5);
2013 assert_lock_keys_equal(&keys, &lock_keys![]);
2014 keys.truncate(0);
2015 assert_lock_keys_equal(&keys, &lock_keys![]);
2016
2017 let mut keys = lock_keys![LOCK_KEY_1];
2018 keys.truncate(5);
2019 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2020 keys.truncate(0);
2021 assert_lock_keys_equal(&keys, &lock_keys![]);
2022
2023 let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_2];
2024 keys.truncate(5);
2025 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2026 keys.truncate(1);
2027 assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2029 }
2030
2031 #[test]
2032 fn test_lock_keys_iter() {
2033 assert_eq!(lock_keys![].iter().collect::<Vec<_>>(), Vec::<&LockKey>::new());
2034
2035 assert_eq!(lock_keys![LOCK_KEY_1].iter().collect::<Vec<_>>(), vec![&LOCK_KEY_1]);
2036
2037 assert_eq!(
2038 lock_keys![LOCK_KEY_1, LOCK_KEY_2].iter().collect::<Vec<_>>(),
2039 vec![&LOCK_KEY_1, &LOCK_KEY_2]
2040 );
2041 }
2042}