1use crate::checksum::Checksum;
6use crate::filesystem::{FxFilesystem, TxnGuard};
7use crate::log::*;
8use crate::lsm_tree::types::Item;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::allocator::{AllocatorItem, Reservation};
11use crate::object_store::object_manager::{ObjectManager, reserved_space_from_journal_usage};
12use crate::object_store::object_record::{
13 FxfsKey, FxfsKeyV40, FxfsKeyV49, ObjectItem, ObjectItemV40, ObjectItemV41, ObjectItemV43,
14 ObjectItemV46, ObjectItemV47, ObjectItemV49, ObjectItemV50, ObjectItemV55, ObjectKey,
15 ObjectKeyData, ObjectValue, ProjectProperty,
16};
17use crate::object_store::{AttributeId, AttributeKey, ProjectId};
18use crate::serialized_types::{Migrate, Versioned, migrate_nodefault, migrate_to_version};
19use anyhow::Error;
20use either::{Either, Left, Right};
21use fprint::TypeFingerprint;
22use fuchsia_sync::Mutex;
23use futures::future::poll_fn;
24use futures::pin_mut;
25use rustc_hash::FxHashMap as HashMap;
26use scopeguard::ScopeGuard;
27use serde::{Deserialize, Serialize};
28use std::cell::UnsafeCell;
29use std::cmp::Ordering;
30use std::collections::BTreeSet;
31use std::collections::hash_map::Entry;
32use std::marker::PhantomPinned;
33use std::ops::{Deref, DerefMut, Range};
34use std::sync::Arc;
35use std::task::{Poll, Waker};
36use std::{fmt, mem};
37
38#[derive(Clone, Copy, Default)]
42pub struct Options<'a> {
43 pub skip_journal_checks: bool,
46
47 pub borrow_metadata_space: bool,
52
53 pub allocator_reservation: Option<&'a Reservation>,
58
59 pub txn_guard: Option<&'a TxnGuard<'a>>,
61}
62
63pub const TRANSACTION_MAX_JOURNAL_USAGE: u64 = 24_576;
70pub const TRANSACTION_METADATA_MAX_AMOUNT: u64 =
71 reserved_space_from_journal_usage(TRANSACTION_MAX_JOURNAL_USAGE);
72
73#[must_use]
74pub struct TransactionLocks<'a>(pub WriteGuard<'a>);
75
76pub type Mutation = MutationV55;
81
82#[derive(
83 Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint, Versioned,
84)]
85#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
86pub enum MutationV55 {
87 ObjectStore(ObjectStoreMutationV55),
88 EncryptedObjectStore(#[serde(with = "crate::zerocopy_serialization")] Box<[u8]>),
89 Allocator(AllocatorMutationV32),
90 BeginFlush,
92 EndFlush,
95 DeleteVolume,
97 UpdateBorrowed(u64),
98 UpdateMutationsKey(UpdateMutationsKey),
99 CreateInternalDir(u64),
100}
101
102#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
103#[migrate_to_version(MutationV55)]
104pub enum MutationV54 {
105 ObjectStore(ObjectStoreMutationV54),
106 EncryptedObjectStore(#[serde(with = "crate::zerocopy_serialization")] Box<[u8]>),
107 Allocator(AllocatorMutationV32),
108 BeginFlush,
109 EndFlush,
110 DeleteVolume,
111 UpdateBorrowed(u64),
112 UpdateMutationsKey(UpdateMutationsKey),
113 CreateInternalDir(u64),
114}
115
116#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
117#[migrate_to_version(MutationV54)]
118pub enum MutationV50 {
119 ObjectStore(ObjectStoreMutationV50),
120 EncryptedObjectStore(#[serde(with = "crate::zerocopy_serialization")] Box<[u8]>),
121 Allocator(AllocatorMutationV32),
122 BeginFlush,
123 EndFlush,
124 DeleteVolume,
125 UpdateBorrowed(u64),
126 UpdateMutationsKey(UpdateMutationsKeyV49),
127 CreateInternalDir(u64),
128}
129
130#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
131#[migrate_to_version(MutationV50)]
132pub enum MutationV49 {
133 ObjectStore(ObjectStoreMutationV49),
134 EncryptedObjectStore(Box<[u8]>),
135 Allocator(AllocatorMutationV32),
136 BeginFlush,
137 EndFlush,
138 DeleteVolume,
139 UpdateBorrowed(u64),
140 UpdateMutationsKey(UpdateMutationsKeyV49),
141 CreateInternalDir(u64),
142}
143
144#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
145#[migrate_to_version(MutationV49)]
146pub enum MutationV47 {
147 ObjectStore(ObjectStoreMutationV47),
148 EncryptedObjectStore(Box<[u8]>),
149 Allocator(AllocatorMutationV32),
150 BeginFlush,
151 EndFlush,
152 DeleteVolume,
153 UpdateBorrowed(u64),
154 UpdateMutationsKey(UpdateMutationsKeyV40),
155 CreateInternalDir(u64),
156}
157
158#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
159#[migrate_to_version(MutationV47)]
160pub enum MutationV46 {
161 ObjectStore(ObjectStoreMutationV46),
162 EncryptedObjectStore(Box<[u8]>),
163 Allocator(AllocatorMutationV32),
164 BeginFlush,
165 EndFlush,
166 DeleteVolume,
167 UpdateBorrowed(u64),
168 UpdateMutationsKey(UpdateMutationsKeyV40),
169 CreateInternalDir(u64),
170}
171
172#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
173#[migrate_to_version(MutationV46)]
174pub enum MutationV43 {
175 ObjectStore(ObjectStoreMutationV43),
176 EncryptedObjectStore(Box<[u8]>),
177 Allocator(AllocatorMutationV32),
178 BeginFlush,
179 EndFlush,
180 DeleteVolume,
181 UpdateBorrowed(u64),
182 UpdateMutationsKey(UpdateMutationsKeyV40),
183 CreateInternalDir(u64),
184}
185
186#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
187#[migrate_to_version(MutationV43)]
188pub enum MutationV41 {
189 ObjectStore(ObjectStoreMutationV41),
190 EncryptedObjectStore(Box<[u8]>),
191 Allocator(AllocatorMutationV32),
192 BeginFlush,
193 EndFlush,
194 DeleteVolume,
195 UpdateBorrowed(u64),
196 UpdateMutationsKey(UpdateMutationsKeyV40),
197 CreateInternalDir(u64),
198}
199
200#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
201#[migrate_to_version(MutationV41)]
202pub enum MutationV40 {
203 ObjectStore(ObjectStoreMutationV40),
204 EncryptedObjectStore(Box<[u8]>),
205 Allocator(AllocatorMutationV32),
206 BeginFlush,
207 EndFlush,
208 DeleteVolume,
209 UpdateBorrowed(u64),
210 UpdateMutationsKey(UpdateMutationsKeyV40),
211 CreateInternalDir(u64),
212}
213
214impl Mutation {
215 pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self {
216 Mutation::ObjectStore(ObjectStoreMutation {
217 item: Item::new(key, value),
218 op: Operation::Insert,
219 })
220 }
221
222 pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self {
223 Mutation::ObjectStore(ObjectStoreMutation {
224 item: Item::new(key, value),
225 op: Operation::ReplaceOrInsert,
226 })
227 }
228
229 pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self {
230 Mutation::ObjectStore(ObjectStoreMutation {
231 item: Item::new(key, value),
232 op: Operation::Merge,
233 })
234 }
235
236 pub fn update_mutations_key(key: FxfsKey) -> Self {
237 Mutation::UpdateMutationsKey(key.into())
238 }
239}
240
241pub type ObjectStoreMutation = ObjectStoreMutationV55;
245
246#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
247#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
248pub struct ObjectStoreMutationV55 {
249 pub item: ObjectItemV55,
250 pub op: Operation,
251}
252
253#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
254#[migrate_to_version(ObjectStoreMutationV55)]
255#[migrate_nodefault]
256pub struct ObjectStoreMutationV54 {
257 pub item: crate::object_store::object_record::ObjectItemV54,
258 pub op: Operation,
259}
260
261#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
262#[migrate_to_version(ObjectStoreMutationV54)]
263#[migrate_nodefault]
264pub struct ObjectStoreMutationV50 {
265 pub item: ObjectItemV50,
266 pub op: OperationV32,
267}
268
269#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
270#[migrate_to_version(ObjectStoreMutationV50)]
271#[migrate_nodefault]
272pub struct ObjectStoreMutationV49 {
273 pub item: ObjectItemV49,
274 pub op: OperationV32,
275}
276
277#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
278#[migrate_to_version(ObjectStoreMutationV49)]
279#[migrate_nodefault]
280pub struct ObjectStoreMutationV47 {
281 pub item: ObjectItemV47,
282 pub op: OperationV32,
283}
284
285#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
286#[migrate_to_version(ObjectStoreMutationV47)]
287#[migrate_nodefault]
288pub struct ObjectStoreMutationV46 {
289 pub item: ObjectItemV46,
290 pub op: OperationV32,
291}
292
293#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
294#[migrate_to_version(ObjectStoreMutationV46)]
295#[migrate_nodefault]
296pub struct ObjectStoreMutationV43 {
297 pub item: ObjectItemV43,
298 pub op: OperationV32,
299}
300
301#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
302#[migrate_to_version(ObjectStoreMutationV43)]
303#[migrate_nodefault]
304pub struct ObjectStoreMutationV41 {
305 pub item: ObjectItemV41,
306 pub op: OperationV32,
307}
308
309#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
310#[migrate_nodefault]
311#[migrate_to_version(ObjectStoreMutationV41)]
312pub struct ObjectStoreMutationV40 {
313 pub item: ObjectItemV40,
314 pub op: OperationV32,
315}
316
317pub type Operation = OperationV32;
319
320#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
321#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
322pub enum OperationV32 {
323 Insert,
324 ReplaceOrInsert,
325 Merge,
326}
327
328impl Ord for ObjectStoreMutation {
329 fn cmp(&self, other: &Self) -> Ordering {
330 self.item.key.cmp(&other.item.key)
331 }
332}
333
334impl PartialOrd for ObjectStoreMutation {
335 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
336 Some(self.cmp(other))
337 }
338}
339
340impl PartialEq for ObjectStoreMutation {
341 fn eq(&self, other: &Self) -> bool {
342 self.item.key.eq(&other.item.key)
343 }
344}
345
346impl Eq for ObjectStoreMutation {}
347
348impl Ord for AllocatorItem {
349 fn cmp(&self, other: &Self) -> Ordering {
350 self.key.cmp(&other.key)
351 }
352}
353
354impl PartialOrd for AllocatorItem {
355 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
356 Some(self.cmp(other))
357 }
358}
359
360pub type DeviceRange = DeviceRangeV32;
363
364#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TypeFingerprint)]
365#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
366pub struct DeviceRangeV32(pub Range<u64>);
367
368impl Deref for DeviceRange {
369 type Target = Range<u64>;
370
371 fn deref(&self) -> &Self::Target {
372 &self.0
373 }
374}
375
376impl DerefMut for DeviceRange {
377 fn deref_mut(&mut self) -> &mut Self::Target {
378 &mut self.0
379 }
380}
381
382impl From<Range<u64>> for DeviceRange {
383 fn from(range: Range<u64>) -> Self {
384 Self(range)
385 }
386}
387
388impl Into<Range<u64>> for DeviceRange {
389 fn into(self) -> Range<u64> {
390 self.0
391 }
392}
393
394impl Ord for DeviceRange {
395 fn cmp(&self, other: &Self) -> Ordering {
396 self.start.cmp(&other.start).then(self.end.cmp(&other.end))
397 }
398}
399
400impl PartialOrd for DeviceRange {
401 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
402 Some(self.cmp(other))
403 }
404}
405
406pub type AllocatorMutation = AllocatorMutationV32;
407
408#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint)]
409#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
410pub enum AllocatorMutationV32 {
411 Allocate {
412 device_range: DeviceRangeV32,
413 owner_object_id: u64,
414 },
415 Deallocate {
416 device_range: DeviceRangeV32,
417 owner_object_id: u64,
418 },
419 SetLimit {
420 owner_object_id: u64,
421 bytes: u64,
422 },
423 MarkForDeletion(u64),
429}
430
431pub type UpdateMutationsKey = UpdateMutationsKeyV49;
432
433#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
434pub struct UpdateMutationsKeyV49(pub FxfsKeyV49);
435
436#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
437#[migrate_to_version(UpdateMutationsKeyV49)]
438pub struct UpdateMutationsKeyV40(pub FxfsKeyV40);
439
440impl From<UpdateMutationsKey> for FxfsKey {
441 fn from(outer: UpdateMutationsKey) -> Self {
442 outer.0
443 }
444}
445
446impl From<FxfsKey> for UpdateMutationsKey {
447 fn from(inner: FxfsKey) -> Self {
448 Self(inner)
449 }
450}
451
452#[cfg(fuzz)]
453impl<'a> arbitrary::Arbitrary<'a> for UpdateMutationsKey {
454 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
455 Ok(UpdateMutationsKey::from(FxfsKey::arbitrary(u).unwrap()))
456 }
457}
458
459impl Ord for UpdateMutationsKey {
460 fn cmp(&self, other: &Self) -> Ordering {
461 (self as *const UpdateMutationsKey).cmp(&(other as *const _))
462 }
463}
464
465impl PartialOrd for UpdateMutationsKey {
466 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
467 Some(self.cmp(other))
468 }
469}
470
471impl Eq for UpdateMutationsKey {}
472
473impl PartialEq for UpdateMutationsKey {
474 fn eq(&self, other: &Self) -> bool {
475 std::ptr::eq(self, other)
476 }
477}
478
479#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Copy)]
489pub enum LockKey {
490 Filesystem,
492
493 Flush {
495 object_id: u64,
496 },
497
498 ObjectAttribute {
500 store_object_id: u64,
501 object_id: u64,
502 attribute_id: AttributeId,
503 },
504
505 Object {
507 store_object_id: u64,
508 object_id: u64,
509 },
510
511 ProjectId {
512 store_object_id: u64,
513 project_id: ProjectId,
514 },
515
516 Truncate {
518 store_object_id: u64,
519 object_id: u64,
520 },
521
522 InternalDirectory {
524 store_object_id: u64,
525 },
526}
527
528impl LockKey {
529 pub const fn object_attribute(
530 store_object_id: u64,
531 object_id: u64,
532 attribute_id: AttributeId,
533 ) -> Self {
534 LockKey::ObjectAttribute { store_object_id, object_id, attribute_id }
535 }
536
537 pub const fn object(store_object_id: u64, object_id: u64) -> Self {
538 LockKey::Object { store_object_id, object_id }
539 }
540
541 pub const fn flush(object_id: u64) -> Self {
542 LockKey::Flush { object_id }
543 }
544
545 pub const fn truncate(store_object_id: u64, object_id: u64) -> Self {
546 LockKey::Truncate { store_object_id, object_id }
547 }
548}
549
550#[derive(Clone, Debug)]
552pub enum LockKeys {
553 None,
554 Inline(LockKey),
555 Vec(Vec<LockKey>),
556}
557
558impl LockKeys {
559 pub fn with_capacity(capacity: usize) -> Self {
560 if capacity > 1 { LockKeys::Vec(Vec::with_capacity(capacity)) } else { LockKeys::None }
561 }
562
563 pub fn push(&mut self, key: LockKey) {
564 match self {
565 Self::None => *self = LockKeys::Inline(key),
566 Self::Inline(inline) => {
567 *self = LockKeys::Vec(vec![*inline, key]);
568 }
569 Self::Vec(vec) => vec.push(key),
570 }
571 }
572
573 pub fn truncate(&mut self, len: usize) {
574 match self {
575 Self::None => {}
576 Self::Inline(_) => {
577 if len == 0 {
578 *self = Self::None;
579 }
580 }
581 Self::Vec(vec) => vec.truncate(len),
582 }
583 }
584
585 fn len(&self) -> usize {
586 match self {
587 Self::None => 0,
588 Self::Inline(_) => 1,
589 Self::Vec(vec) => vec.len(),
590 }
591 }
592
593 fn contains(&self, key: &LockKey) -> bool {
594 match self {
595 Self::None => false,
596 Self::Inline(single) => single == key,
597 Self::Vec(vec) => vec.contains(key),
598 }
599 }
600
601 fn sort_unstable(&mut self) {
602 match self {
603 Self::Vec(vec) => vec.sort_unstable(),
604 _ => {}
605 }
606 }
607
608 fn dedup(&mut self) {
609 match self {
610 Self::Vec(vec) => vec.dedup(),
611 _ => {}
612 }
613 }
614
615 fn iter(&self) -> LockKeysIter<'_> {
616 match self {
617 LockKeys::None => LockKeysIter::None,
618 LockKeys::Inline(key) => LockKeysIter::Inline(key),
619 LockKeys::Vec(keys) => LockKeysIter::Vec(keys.iter()),
620 }
621 }
622}
623
624enum LockKeysIter<'a> {
625 None,
626 Inline(&'a LockKey),
627 Vec(std::slice::Iter<'a, LockKey>),
628}
629
630impl<'a> Iterator for LockKeysIter<'a> {
631 type Item = &'a LockKey;
632 fn next(&mut self) -> Option<Self::Item> {
633 match self {
634 Self::None => None,
635 Self::Inline(inline) => {
636 let next = *inline;
637 *self = Self::None;
638 Some(next)
639 }
640 Self::Vec(vec) => vec.next(),
641 }
642 }
643}
644
645impl Default for LockKeys {
646 fn default() -> Self {
647 LockKeys::None
648 }
649}
650
651#[macro_export]
652macro_rules! lock_keys {
653 () => {
654 $crate::object_store::transaction::LockKeys::None
655 };
656 ($lock_key:expr $(,)?) => {
657 $crate::object_store::transaction::LockKeys::Inline($lock_key)
658 };
659 ($($lock_keys:expr),+ $(,)?) => {
660 $crate::object_store::transaction::LockKeys::Vec(vec![$($lock_keys),+])
661 };
662}
663pub use lock_keys;
664
665pub trait AssociatedObject: Send + Sync {
669 fn will_apply_mutation(&self, _mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
670 }
671}
672
673pub enum AssocObj<'a> {
674 None,
675 Borrowed(&'a dyn AssociatedObject),
676 Owned(Box<dyn AssociatedObject>),
677}
678
679impl AssocObj<'_> {
680 pub fn map<R, F: FnOnce(&dyn AssociatedObject) -> R>(&self, f: F) -> Option<R> {
681 match self {
682 AssocObj::None => None,
683 AssocObj::Borrowed(b) => Some(f(*b)),
684 AssocObj::Owned(o) => Some(f(o.as_ref())),
685 }
686 }
687}
688
689pub struct TxnMutation<'a> {
690 pub object_id: u64,
694
695 pub mutation: Mutation,
697
698 pub associated_object: AssocObj<'a>,
701}
702
703impl Ord for TxnMutation<'_> {
706 fn cmp(&self, other: &Self) -> Ordering {
707 self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation))
708 }
709}
710
711impl PartialOrd for TxnMutation<'_> {
712 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
713 Some(self.cmp(other))
714 }
715}
716
717impl PartialEq for TxnMutation<'_> {
718 fn eq(&self, other: &Self) -> bool {
719 self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation)
720 }
721}
722
723impl Eq for TxnMutation<'_> {}
724
725impl std::fmt::Debug for TxnMutation<'_> {
726 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
727 f.debug_struct("TxnMutation")
728 .field("object_id", &self.object_id)
729 .field("mutation", &self.mutation)
730 .finish()
731 }
732}
733
734pub enum MetadataReservation {
735 None,
737
738 Borrowed,
741
742 Reservation(Reservation),
744
745 Hold(u64),
747}
748
749pub struct Transaction<'a> {
751 txn_guard: TxnGuard<'a>,
752
753 mutations: BTreeSet<TxnMutation<'a>>,
755
756 txn_locks: LockKeys,
758
759 pub allocator_reservation: Option<&'a Reservation>,
761
762 pub metadata_reservation: MetadataReservation,
764
765 new_objects: BTreeSet<(u64, u64)>,
768
769 checksums: Vec<(Range<u64>, Vec<Checksum>, bool)>,
771
772 includes_write: bool,
774}
775
776impl<'a> Transaction<'a> {
777 pub async fn new(
780 txn_guard: TxnGuard<'a>,
781 options: Options<'a>,
782 txn_locks: LockKeys,
783 ) -> Result<Transaction<'a>, Error> {
784 txn_guard.fs().add_transaction(options.skip_journal_checks).await;
785 let fs = txn_guard.fs().clone();
786 let guard = scopeguard::guard((), |_| fs.sub_transaction());
787 let (metadata_reservation, allocator_reservation, hold) =
788 txn_guard.fs().reservation_for_transaction(options).await?;
789
790 let txn_locks = {
791 let lock_manager = txn_guard.fs().lock_manager();
792 let mut write_guard = lock_manager.txn_lock(txn_locks).await;
793 std::mem::take(&mut write_guard.0.lock_keys)
794 };
795 let mut transaction = Transaction {
796 txn_guard,
797 mutations: BTreeSet::new(),
798 txn_locks,
799 allocator_reservation: None,
800 metadata_reservation,
801 new_objects: BTreeSet::new(),
802 checksums: Vec::new(),
803 includes_write: false,
804 };
805
806 ScopeGuard::into_inner(guard);
807 hold.map(|h| h.forget()); transaction.allocator_reservation = allocator_reservation;
809 Ok(transaction)
810 }
811
812 pub fn txn_guard(&self) -> &TxnGuard<'_> {
813 &self.txn_guard
814 }
815
816 pub fn mutations(&self) -> &BTreeSet<TxnMutation<'a>> {
817 &self.mutations
818 }
819
820 pub fn take_mutations(&mut self) -> BTreeSet<TxnMutation<'a>> {
821 self.new_objects.clear();
822 mem::take(&mut self.mutations)
823 }
824
825 pub fn add(&mut self, object_id: u64, mutation: Mutation) -> Option<Mutation> {
828 self.add_with_object(object_id, mutation, AssocObj::None)
829 }
830
831 pub fn remove(&mut self, object_id: u64, mutation: Mutation) {
833 let txn_mutation = TxnMutation { object_id, mutation, associated_object: AssocObj::None };
834 if self.mutations.remove(&txn_mutation) {
835 if let Mutation::ObjectStore(ObjectStoreMutation {
836 item:
837 ObjectItem {
838 key: ObjectKey { object_id: new_object_id, data: ObjectKeyData::Object },
839 ..
840 },
841 op: Operation::Insert,
842 }) = txn_mutation.mutation
843 {
844 self.new_objects.remove(&(object_id, new_object_id));
845 }
846 }
847 }
848
849 pub fn add_with_object(
852 &mut self,
853 object_id: u64,
854 mutation: Mutation,
855 associated_object: AssocObj<'a>,
856 ) -> Option<Mutation> {
857 assert!(object_id != INVALID_OBJECT_ID);
858 if let Mutation::ObjectStore(ObjectStoreMutation {
859 item:
860 Item {
861 key:
862 ObjectKey { data: ObjectKeyData::Attribute(_, AttributeKey::Extent(_)), .. },
863 ..
864 },
865 ..
866 }) = &mutation
867 {
868 self.includes_write = true;
869 }
870 let txn_mutation = TxnMutation { object_id, mutation, associated_object };
871 self.verify_locks(&txn_mutation);
872 self.mutations.replace(txn_mutation).map(|m| m.mutation)
873 }
874
875 pub fn add_checksum(&mut self, range: Range<u64>, checksums: Vec<Checksum>, first_write: bool) {
876 self.checksums.push((range, checksums, first_write));
877 }
878
879 pub fn includes_write(&self) -> bool {
880 self.includes_write
881 }
882
883 pub fn checksums(&self) -> &[(Range<u64>, Vec<Checksum>, bool)] {
884 &self.checksums
885 }
886
887 pub fn take_checksums(&mut self) -> Vec<(Range<u64>, Vec<Checksum>, bool)> {
888 std::mem::replace(&mut self.checksums, Vec::new())
889 }
890
891 fn verify_locks(&mut self, mutation: &TxnMutation<'_>) {
892 match mutation {
896 TxnMutation {
897 mutation:
898 Mutation::ObjectStore {
899 0: ObjectStoreMutation { item: ObjectItem { key, .. }, op },
900 },
901 object_id: store_object_id,
902 ..
903 } => {
904 match &key.data {
905 ObjectKeyData::Attribute(..) => {
906 }
908 ObjectKeyData::Child { .. }
909 | ObjectKeyData::EncryptedChild(_)
910 | ObjectKeyData::EncryptedCasefoldChild(_)
911 | ObjectKeyData::CasefoldChild { .. }
912 | ObjectKeyData::LegacyCasefoldChild(_) => {
913 let id = key.object_id;
914 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
915 && !self.new_objects.contains(&(*store_object_id, id))
916 {
917 debug_assert!(
918 false,
919 "Not holding required lock for object {id} \
920 in store {store_object_id}"
921 );
922 error!(
923 "Not holding required lock for object {id} in store \
924 {store_object_id}"
925 )
926 }
927 }
928 ObjectKeyData::GraveyardEntry { .. } => {
929 }
931 ObjectKeyData::GraveyardAttributeEntry { .. } => {
932 }
934 ObjectKeyData::Keys => {
935 let id = key.object_id;
936 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
937 && !self.new_objects.contains(&(*store_object_id, id))
938 {
939 debug_assert!(
940 false,
941 "Not holding required lock for object {id} \
942 in store {store_object_id}"
943 );
944 error!(
945 "Not holding required lock for object {id} in store \
946 {store_object_id}"
947 )
948 }
949 }
950 ObjectKeyData::Object => match op {
951 Operation::Insert => {
953 self.new_objects.insert((*store_object_id, key.object_id));
954 }
955 Operation::Merge | Operation::ReplaceOrInsert => {
956 let id = key.object_id;
957 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
958 && !self.new_objects.contains(&(*store_object_id, id))
959 {
960 debug_assert!(
961 false,
962 "Not holding required lock for object {id} \
963 in store {store_object_id}"
964 );
965 error!(
966 "Not holding required lock for object {id} in store \
967 {store_object_id}"
968 )
969 }
970 }
971 },
972 ObjectKeyData::Project { project_id, property: ProjectProperty::Limit } => {
973 if !self.txn_locks.contains(&LockKey::ProjectId {
974 store_object_id: *store_object_id,
975 project_id: *project_id,
976 }) {
977 debug_assert!(
978 false,
979 "Not holding required lock for project limit id {project_id} \
980 in store {store_object_id}"
981 );
982 error!(
983 "Not holding required lock for project limit id {project_id} in \
984 store {store_object_id}"
985 )
986 }
987 }
988 ObjectKeyData::Project { property: ProjectProperty::Usage, .. } => match op {
989 Operation::Insert | Operation::ReplaceOrInsert => {
990 panic!(
991 "Project usage is all handled by merging deltas, no inserts or \
992 replacements should be used"
993 );
994 }
995 Operation::Merge => {}
997 },
998 ObjectKeyData::ExtendedAttribute { .. } => {
999 let id = key.object_id;
1000 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
1001 && !self.new_objects.contains(&(*store_object_id, id))
1002 {
1003 debug_assert!(
1004 false,
1005 "Not holding required lock for object {id} \
1006 in store {store_object_id} while mutating extended attribute"
1007 );
1008 error!(
1009 "Not holding required lock for object {id} in store \
1010 {store_object_id} while mutating extended attribute"
1011 )
1012 }
1013 }
1014 }
1015 }
1016 TxnMutation { mutation: Mutation::DeleteVolume, object_id, .. } => {
1017 if !self.txn_locks.contains(&LockKey::flush(*object_id)) {
1018 debug_assert!(false, "Not holding required lock for DeleteVolume");
1019 error!("Not holding required lock for DeleteVolume");
1020 }
1021 }
1022 _ => {}
1023 }
1024 }
1025
1026 pub fn is_empty(&self) -> bool {
1028 self.mutations.is_empty()
1029 }
1030
1031 pub fn get_object_mutation(
1034 &self,
1035 store_object_id: u64,
1036 key: ObjectKey,
1037 ) -> Option<&ObjectStoreMutation> {
1038 if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) =
1039 self.mutations.get(&TxnMutation {
1040 object_id: store_object_id,
1041 mutation: Mutation::insert_object(key, ObjectValue::None),
1042 associated_object: AssocObj::None,
1043 })
1044 {
1045 Some(mutation)
1046 } else {
1047 None
1048 }
1049 }
1050
1051 pub async fn commit(mut self) -> Result<u64, Error> {
1053 debug!(txn:? = &self; "Commit");
1054 self.txn_guard.fs().clone().commit_transaction(&mut self, |x| x).await
1055 }
1056
1057 pub async fn commit_with_callback<R: Send>(
1060 mut self,
1061 f: impl FnOnce(u64) -> R + Send,
1062 ) -> Result<R, Error> {
1063 debug!(txn:? = &self; "Commit");
1064 self.txn_guard.fs().clone().commit_transaction(&mut self, f).await
1065 }
1066
1067 pub async fn commit_and_continue(&mut self) -> Result<(), Error> {
1070 debug!(txn:? = self; "Commit");
1071 self.txn_guard.fs().clone().commit_transaction(self, |_| {}).await?;
1072 assert!(self.mutations.is_empty());
1073 self.txn_guard.fs().lock_manager().downgrade_locks(&self.txn_locks);
1074 Ok(())
1075 }
1076}
1077
1078impl Drop for Transaction<'_> {
1079 fn drop(&mut self) {
1080 debug!(txn:? = &self; "Drop");
1083 self.txn_guard.fs().clone().drop_transaction(self);
1084 }
1085}
1086
1087impl std::fmt::Debug for Transaction<'_> {
1088 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1089 f.debug_struct("Transaction")
1090 .field("mutations", &self.mutations)
1091 .field("txn_locks", &self.txn_locks)
1092 .field("reservation", &self.allocator_reservation)
1093 .finish()
1094 }
1095}
1096
1097pub enum BorrowedOrOwned<'a, T> {
1098 Borrowed(&'a T),
1099 Owned(T),
1100}
1101
1102impl<T> Deref for BorrowedOrOwned<'_, T> {
1103 type Target = T;
1104
1105 fn deref(&self) -> &Self::Target {
1106 match self {
1107 BorrowedOrOwned::Borrowed(b) => b,
1108 BorrowedOrOwned::Owned(o) => &o,
1109 }
1110 }
1111}
1112
1113impl<'a, T> From<&'a T> for BorrowedOrOwned<'a, T> {
1114 fn from(value: &'a T) -> Self {
1115 BorrowedOrOwned::Borrowed(value)
1116 }
1117}
1118
1119impl<T> From<T> for BorrowedOrOwned<'_, T> {
1120 fn from(value: T) -> Self {
1121 BorrowedOrOwned::Owned(value)
1122 }
1123}
1124
1125pub struct LockManager {
1154 locks: Mutex<Locks>,
1155}
1156
1157struct Locks {
1158 keys: HashMap<LockKey, LockEntry>,
1159}
1160
1161impl Locks {
1162 fn drop_lock(&mut self, key: LockKey, state: LockState) {
1163 if let Entry::Occupied(mut occupied) = self.keys.entry(key) {
1164 let entry = occupied.get_mut();
1165 let wake = match state {
1166 LockState::ReadLock => {
1167 entry.read_count -= 1;
1168 entry.read_count == 0
1169 }
1170 LockState::Locked | LockState::WriteLock => {
1172 entry.state = LockState::ReadLock;
1173 true
1174 }
1175 };
1176 if wake {
1177 unsafe {
1179 entry.wake();
1180 }
1181 if entry.can_remove() {
1182 occupied.remove_entry();
1183 }
1184 }
1185 } else {
1186 unreachable!();
1187 }
1188 }
1189
1190 fn drop_read_locks(&mut self, lock_keys: LockKeys) {
1191 for lock in lock_keys.iter() {
1192 self.drop_lock(*lock, LockState::ReadLock);
1193 }
1194 }
1195
1196 fn drop_write_locks(&mut self, lock_keys: LockKeys) {
1197 for lock in lock_keys.iter() {
1198 self.drop_lock(*lock, LockState::WriteLock);
1201 }
1202 }
1203
1204 fn downgrade_locks(&mut self, lock_keys: &LockKeys) {
1206 for lock in lock_keys.iter() {
1207 unsafe {
1209 self.keys.get_mut(lock).unwrap().downgrade_lock();
1210 }
1211 }
1212 }
1213}
1214
1215#[derive(Debug)]
1216struct LockEntry {
1217 read_count: u64,
1220
1221 state: LockState,
1223
1224 head: *const LockWaker,
1229 tail: *const LockWaker,
1230}
1231
1232unsafe impl Send for LockEntry {}
1233
1234struct LockWaker {
1237 next: UnsafeCell<*const LockWaker>,
1239 prev: UnsafeCell<*const LockWaker>,
1240
1241 key: LockKey,
1244
1245 waker: UnsafeCell<WakerState>,
1247
1248 target_state: LockState,
1250
1251 is_upgrade: bool,
1253
1254 _pin: PhantomPinned,
1256}
1257
1258enum WakerState {
1259 Pending,
1261
1262 Registered(Waker),
1264
1265 Woken,
1267}
1268
1269impl WakerState {
1270 fn is_woken(&self) -> bool {
1271 matches!(self, WakerState::Woken)
1272 }
1273}
1274
1275unsafe impl Send for LockWaker {}
1276unsafe impl Sync for LockWaker {}
1277
1278impl LockWaker {
1279 async fn wait(&self, manager: &LockManager) {
1281 let waker_guard = scopeguard::guard((), |_| {
1283 let mut locks = manager.locks.lock();
1284 unsafe {
1286 if (*self.waker.get()).is_woken() {
1287 if self.is_upgrade {
1289 locks.keys.get_mut(&self.key).unwrap().downgrade_lock();
1290 } else {
1291 locks.drop_lock(self.key, self.target_state);
1292 }
1293 } else {
1294 locks.keys.get_mut(&self.key).unwrap().remove_waker(self);
1297 }
1298 }
1299 });
1300
1301 poll_fn(|cx| {
1302 let _locks = manager.locks.lock();
1303 unsafe {
1305 if (*self.waker.get()).is_woken() {
1306 Poll::Ready(())
1307 } else {
1308 *self.waker.get() = WakerState::Registered(cx.waker().clone());
1309 Poll::Pending
1310 }
1311 }
1312 })
1313 .await;
1314
1315 ScopeGuard::into_inner(waker_guard);
1316 }
1317}
1318
1319#[derive(Copy, Clone, Debug, PartialEq)]
1320enum LockState {
1321 ReadLock,
1323
1324 Locked,
1327
1328 WriteLock,
1330}
1331
1332impl LockManager {
1333 pub fn new() -> Self {
1334 LockManager { locks: Mutex::new(Locks { keys: HashMap::default() }) }
1335 }
1336
1337 pub async fn txn_lock<'a>(&'a self, lock_keys: LockKeys) -> TransactionLocks<'a> {
1341 TransactionLocks(
1342 debug_assert_not_too_long!(self.lock(lock_keys, LockState::Locked)).right().unwrap(),
1343 )
1344 }
1345
1346 async fn lock<'a>(
1349 &'a self,
1350 mut lock_keys: LockKeys,
1351 target_state: LockState,
1352 ) -> Either<ReadGuard<'a>, WriteGuard<'a>> {
1353 let mut guard = match &target_state {
1354 LockState::ReadLock => Left(ReadGuard {
1355 manager: self.into(),
1356 lock_keys: LockKeys::with_capacity(lock_keys.len()),
1357 }),
1358 LockState::Locked | LockState::WriteLock => Right(WriteGuard {
1359 manager: self.into(),
1360 lock_keys: LockKeys::with_capacity(lock_keys.len()),
1361 }),
1362 };
1363 let guard_keys = match &mut guard {
1364 Left(g) => &mut g.lock_keys,
1365 Right(g) => &mut g.lock_keys,
1366 };
1367 lock_keys.sort_unstable();
1368 lock_keys.dedup();
1369 for lock in lock_keys.iter() {
1370 let lock_waker = None;
1371 pin_mut!(lock_waker);
1372 {
1373 let mut locks = self.locks.lock();
1374 match locks.keys.entry(*lock) {
1375 Entry::Vacant(vacant) => {
1376 vacant.insert(LockEntry {
1377 read_count: if let LockState::ReadLock = target_state {
1378 guard_keys.push(*lock);
1379 1
1380 } else {
1381 guard_keys.push(*lock);
1382 0
1383 },
1384 state: target_state,
1385 head: std::ptr::null(),
1386 tail: std::ptr::null(),
1387 });
1388 }
1389 Entry::Occupied(mut occupied) => {
1390 let entry = occupied.get_mut();
1391 if unsafe { entry.is_allowed(target_state, entry.head.is_null()) } {
1393 if let LockState::ReadLock = target_state {
1394 entry.read_count += 1;
1395 guard_keys.push(*lock);
1396 } else {
1397 entry.state = target_state;
1398 guard_keys.push(*lock);
1399 }
1400 } else {
1401 unsafe {
1404 *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1405 next: UnsafeCell::new(std::ptr::null()),
1406 prev: UnsafeCell::new(entry.tail),
1407 key: *lock,
1408 waker: UnsafeCell::new(WakerState::Pending),
1409 target_state: target_state,
1410 is_upgrade: false,
1411 _pin: PhantomPinned,
1412 });
1413 }
1414 let waker = (*lock_waker).as_ref().unwrap();
1415 if entry.tail.is_null() {
1416 entry.head = waker;
1417 } else {
1418 unsafe {
1420 *(*entry.tail).next.get() = waker;
1421 }
1422 }
1423 entry.tail = waker;
1424 }
1425 }
1426 }
1427 }
1428 if let Some(waker) = &*lock_waker {
1429 waker.wait(self).await;
1430 guard_keys.push(*lock);
1431 }
1432 }
1433 guard
1434 }
1435
1436 pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
1438 let mut locks = self.locks.lock();
1439 locks.drop_write_locks(std::mem::take(&mut transaction.txn_locks));
1440 }
1441
1442 pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
1444 self.commit_prepare_keys(&transaction.txn_locks).await;
1445 }
1446
1447 async fn commit_prepare_keys(&self, lock_keys: &LockKeys) {
1448 for lock in lock_keys.iter() {
1449 let lock_waker = None;
1450 pin_mut!(lock_waker);
1451 {
1452 let mut locks = self.locks.lock();
1453 let entry = locks.keys.get_mut(lock).unwrap();
1454 assert_eq!(entry.state, LockState::Locked);
1455
1456 if entry.read_count == 0 {
1457 entry.state = LockState::WriteLock;
1458 } else {
1459 unsafe {
1462 *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1463 next: UnsafeCell::new(entry.head),
1464 prev: UnsafeCell::new(std::ptr::null()),
1465 key: *lock,
1466 waker: UnsafeCell::new(WakerState::Pending),
1467 target_state: LockState::WriteLock,
1468 is_upgrade: true,
1469 _pin: PhantomPinned,
1470 });
1471 }
1472 let waker = (*lock_waker).as_ref().unwrap();
1473 if entry.head.is_null() {
1474 entry.tail = (*lock_waker).as_ref().unwrap();
1475 } else {
1476 unsafe {
1478 *(*entry.head).prev.get() = waker;
1479 }
1480 }
1481 entry.head = waker;
1482 }
1483 }
1484
1485 if let Some(waker) = &*lock_waker {
1486 waker.wait(self).await;
1487 }
1488 }
1489 }
1490
1491 pub async fn read_lock<'a>(&'a self, lock_keys: LockKeys) -> ReadGuard<'a> {
1497 debug_assert_not_too_long!(self.lock(lock_keys, LockState::ReadLock)).left().unwrap()
1498 }
1499
1500 pub async fn write_lock<'a>(&'a self, lock_keys: LockKeys) -> WriteGuard<'a> {
1503 debug_assert_not_too_long!(self.lock(lock_keys, LockState::WriteLock)).right().unwrap()
1504 }
1505
1506 pub fn downgrade_locks(&self, lock_keys: &LockKeys) {
1509 self.locks.lock().downgrade_locks(lock_keys);
1510 }
1511}
1512
1513impl LockEntry {
1515 unsafe fn wake(&mut self) {
1516 if self.head.is_null() || self.state == LockState::WriteLock {
1518 return;
1519 }
1520
1521 let waker = unsafe { &*self.head };
1522
1523 if waker.is_upgrade {
1524 if self.read_count > 0 {
1525 return;
1526 }
1527 } else if !unsafe { self.is_allowed(waker.target_state, true) } {
1528 return;
1529 }
1530
1531 unsafe { self.pop_and_wake() };
1532
1533 if waker.target_state == LockState::WriteLock {
1536 return;
1537 }
1538
1539 while !self.head.is_null() && unsafe { (*self.head).target_state } == LockState::ReadLock {
1540 unsafe { self.pop_and_wake() };
1541 }
1542 }
1543
1544 unsafe fn pop_and_wake(&mut self) {
1545 let waker = unsafe { &*self.head };
1546
1547 self.head = unsafe { *waker.next.get() };
1549 if self.head.is_null() {
1550 self.tail = std::ptr::null()
1551 } else {
1552 unsafe { *(*self.head).prev.get() = std::ptr::null() };
1553 }
1554
1555 if waker.target_state == LockState::ReadLock {
1557 self.read_count += 1;
1558 } else {
1559 self.state = waker.target_state;
1560 }
1561
1562 if let WakerState::Registered(waker) =
1564 std::mem::replace(unsafe { &mut *waker.waker.get() }, WakerState::Woken)
1565 {
1566 waker.wake();
1567 }
1568 }
1569
1570 fn can_remove(&self) -> bool {
1571 self.state == LockState::ReadLock && self.read_count == 0
1572 }
1573
1574 unsafe fn remove_waker(&mut self, waker: &LockWaker) {
1575 unsafe {
1576 let is_first = (*waker.prev.get()).is_null();
1577 if is_first {
1578 self.head = *waker.next.get();
1579 } else {
1580 *(**waker.prev.get()).next.get() = *waker.next.get();
1581 }
1582 if (*waker.next.get()).is_null() {
1583 self.tail = *waker.prev.get();
1584 } else {
1585 *(**waker.next.get()).prev.get() = *waker.prev.get();
1586 }
1587 if is_first {
1588 self.wake();
1591 }
1592 }
1593 }
1594
1595 unsafe fn is_allowed(&self, target_state: LockState, is_head: bool) -> bool {
1599 match self.state {
1600 LockState::ReadLock => {
1601 (self.read_count == 0
1603 || target_state == LockState::Locked
1604 || target_state == LockState::ReadLock)
1605 && is_head
1606 }
1607 LockState::Locked => {
1608 target_state == LockState::ReadLock
1612 && (is_head || unsafe { !(*self.head).is_upgrade })
1613 }
1614 LockState::WriteLock => false,
1615 }
1616 }
1617
1618 unsafe fn downgrade_lock(&mut self) {
1619 assert_eq!(std::mem::replace(&mut self.state, LockState::Locked), LockState::WriteLock);
1620 unsafe { self.wake() };
1621 }
1622}
1623
1624#[must_use]
1625pub struct ReadGuard<'a> {
1626 manager: LockManagerRef<'a>,
1627 lock_keys: LockKeys,
1628}
1629
1630impl ReadGuard<'_> {
1631 pub fn fs(&self) -> Option<&Arc<FxFilesystem>> {
1632 if let LockManagerRef::Owned(fs) = &self.manager { Some(fs) } else { None }
1633 }
1634
1635 pub fn into_owned(mut self, fs: Arc<FxFilesystem>) -> ReadGuard<'static> {
1636 ReadGuard {
1637 manager: LockManagerRef::Owned(fs),
1638 lock_keys: std::mem::replace(&mut self.lock_keys, LockKeys::None),
1639 }
1640 }
1641}
1642
1643impl Drop for ReadGuard<'_> {
1644 fn drop(&mut self) {
1645 let mut locks = self.manager.locks.lock();
1646 locks.drop_read_locks(std::mem::take(&mut self.lock_keys));
1647 }
1648}
1649
1650impl fmt::Debug for ReadGuard<'_> {
1651 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1652 f.debug_struct("ReadGuard")
1653 .field("manager", &(&self.manager as *const _))
1654 .field("lock_keys", &self.lock_keys)
1655 .finish()
1656 }
1657}
1658
1659#[must_use]
1660pub struct WriteGuard<'a> {
1661 manager: LockManagerRef<'a>,
1662 lock_keys: LockKeys,
1663}
1664
1665impl Drop for WriteGuard<'_> {
1666 fn drop(&mut self) {
1667 let mut locks = self.manager.locks.lock();
1668 locks.drop_write_locks(std::mem::take(&mut self.lock_keys));
1669 }
1670}
1671
1672impl fmt::Debug for WriteGuard<'_> {
1673 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1674 f.debug_struct("WriteGuard")
1675 .field("manager", &(&self.manager as *const _))
1676 .field("lock_keys", &self.lock_keys)
1677 .finish()
1678 }
1679}
1680
1681enum LockManagerRef<'a> {
1682 Borrowed(&'a LockManager),
1683 Owned(Arc<FxFilesystem>),
1684}
1685
1686impl Deref for LockManagerRef<'_> {
1687 type Target = LockManager;
1688
1689 fn deref(&self) -> &Self::Target {
1690 match self {
1691 LockManagerRef::Borrowed(m) => m,
1692 LockManagerRef::Owned(f) => f.lock_manager(),
1693 }
1694 }
1695}
1696
1697impl<'a> From<&'a LockManager> for LockManagerRef<'a> {
1698 fn from(value: &'a LockManager) -> Self {
1699 LockManagerRef::Borrowed(value)
1700 }
1701}
1702
1703#[cfg(test)]
1704mod tests {
1705 use super::{AttributeId, LockKey, LockKeys, LockManager, LockState, Mutation, Options};
1706 use crate::filesystem::FxFilesystem;
1707 use fuchsia_async as fasync;
1708 use fuchsia_sync::Mutex;
1709 use futures::channel::oneshot::channel;
1710 use futures::future::FutureExt;
1711 use futures::stream::FuturesUnordered;
1712 use futures::{StreamExt, join, pin_mut};
1713 use std::task::Poll;
1714 use std::time::Duration;
1715 use storage_device::DeviceHolder;
1716 use storage_device::fake_device::FakeDevice;
1717
1718 #[fuchsia::test]
1719 async fn test_simple() {
1720 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1721 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1722 let mut t = fs
1723 .clone()
1724 .new_transaction(lock_keys![], Options::default())
1725 .await
1726 .expect("new_transaction failed");
1727 t.add(1, Mutation::BeginFlush);
1728 assert!(!t.is_empty());
1729 }
1730
1731 #[fuchsia::test]
1732 async fn test_locks() {
1733 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1734 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1735 let (send1, recv1) = channel();
1736 let (send2, recv2) = channel();
1737 let (send3, recv3) = channel();
1738 let done = Mutex::new(false);
1739 let mut futures = FuturesUnordered::new();
1740 futures.push(
1741 async {
1742 let _t = fs
1743 .clone()
1744 .new_transaction(
1745 lock_keys![LockKey::object_attribute(1, 2, AttributeId::TEST_ID)],
1746 Options::default(),
1747 )
1748 .await
1749 .expect("new_transaction failed");
1750 send1.send(()).unwrap(); send3.send(()).unwrap(); recv2.await.unwrap();
1753 fasync::Timer::new(Duration::from_millis(100)).await;
1755 assert!(!*done.lock());
1756 }
1757 .boxed(),
1758 );
1759 futures.push(
1760 async {
1761 recv1.await.unwrap();
1762 let _t = fs
1764 .clone()
1765 .new_transaction(
1766 lock_keys![LockKey::object_attribute(2, 2, AttributeId::TEST_ID)],
1767 Options::default(),
1768 )
1769 .await
1770 .expect("new_transaction failed");
1771 send2.send(()).unwrap();
1773 }
1774 .boxed(),
1775 );
1776 futures.push(
1777 async {
1778 recv3.await.unwrap();
1780 let _t = fs
1781 .clone()
1782 .new_transaction(
1783 lock_keys![LockKey::object_attribute(1, 2, AttributeId::TEST_ID)],
1784 Options::default(),
1785 )
1786 .await;
1787 *done.lock() = true;
1788 }
1789 .boxed(),
1790 );
1791 while let Some(()) = futures.next().await {}
1792 }
1793
1794 #[fuchsia::test]
1795 async fn test_read_lock_after_write_lock() {
1796 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1797 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1798 let (send1, recv1) = channel();
1799 let (send2, recv2) = channel();
1800 let done = Mutex::new(false);
1801 join!(
1802 async {
1803 let t = fs
1804 .clone()
1805 .new_transaction(
1806 lock_keys![LockKey::object_attribute(1, 2, AttributeId::TEST_ID)],
1807 Options::default(),
1808 )
1809 .await
1810 .expect("new_transaction failed");
1811 send1.send(()).unwrap(); recv2.await.unwrap();
1813 t.commit().await.expect("commit failed");
1814 *done.lock() = true;
1815 },
1816 async {
1817 recv1.await.unwrap();
1818 let _guard = fs
1820 .lock_manager()
1821 .read_lock(lock_keys![LockKey::object_attribute(1, 2, AttributeId::TEST_ID)])
1822 .await;
1823 send2.send(()).unwrap();
1825 fasync::Timer::new(Duration::from_millis(100)).await;
1828 assert!(!*done.lock());
1829 },
1830 );
1831 }
1832
1833 #[fuchsia::test]
1834 async fn test_write_lock_after_read_lock() {
1835 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1836 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1837 let (send1, recv1) = channel();
1838 let (send2, recv2) = channel();
1839 let done = Mutex::new(false);
1840 join!(
1841 async {
1842 let _guard = fs
1844 .lock_manager()
1845 .read_lock(lock_keys![LockKey::object_attribute(1, 2, AttributeId::TEST_ID)])
1846 .await;
1847 send1.send(()).unwrap();
1849 recv2.await.unwrap();
1850 fasync::Timer::new(Duration::from_millis(100)).await;
1853 assert!(!*done.lock());
1854 },
1855 async {
1856 recv1.await.unwrap();
1857 let t = fs
1858 .clone()
1859 .new_transaction(
1860 lock_keys![LockKey::object_attribute(1, 2, AttributeId::TEST_ID)],
1861 Options::default(),
1862 )
1863 .await
1864 .expect("new_transaction failed");
1865 send2.send(()).unwrap(); t.commit().await.expect("commit failed");
1867 *done.lock() = true;
1868 },
1869 );
1870 }
1871
1872 #[fuchsia::test]
1873 async fn test_drop_uncommitted_transaction() {
1874 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1875 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1876 let key = lock_keys![LockKey::object(1, 1)];
1877
1878 {
1880 let _write_lock = fs
1881 .clone()
1882 .new_transaction(key.clone(), Options::default())
1883 .await
1884 .expect("new_transaction failed");
1885 let _read_lock = fs.lock_manager().read_lock(key.clone()).await;
1886 }
1887 {
1889 let _write_lock = fs
1890 .clone()
1891 .new_transaction(key.clone(), Options::default())
1892 .await
1893 .expect("new_transaction failed");
1894 }
1895 fs.clone()
1897 .new_transaction(key.clone(), Options::default())
1898 .await
1899 .expect("new_transaction failed");
1900 }
1901
1902 #[fuchsia::test]
1903 async fn test_drop_waiting_write_lock() {
1904 let manager = LockManager::new();
1905 let keys = lock_keys![LockKey::object(1, 1)];
1906 {
1907 let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1908 if let Poll::Ready(_) =
1909 futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1910 {
1911 assert!(false);
1912 }
1913 }
1914 let _ = manager.lock(keys, LockState::WriteLock).await;
1915 }
1916
1917 #[fuchsia::test]
1918 async fn test_write_lock_blocks_everything() {
1919 let manager = LockManager::new();
1920 let keys = lock_keys![LockKey::object(1, 1)];
1921 {
1922 let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1923 if let Poll::Ready(_) =
1924 futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1925 {
1926 assert!(false);
1927 }
1928 if let Poll::Ready(_) =
1929 futures::poll!(manager.lock(keys.clone(), LockState::ReadLock).boxed())
1930 {
1931 assert!(false);
1932 }
1933 }
1934 {
1935 let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1936 }
1937 {
1938 let _guard = manager.lock(keys, LockState::ReadLock).await;
1939 }
1940 }
1941
1942 #[fuchsia::test]
1943 async fn test_downgrade_locks() {
1944 let manager = LockManager::new();
1945 let keys = lock_keys![LockKey::object(1, 1)];
1946 let _guard = manager.txn_lock(keys.clone()).await;
1947 manager.commit_prepare_keys(&keys).await;
1948
1949 let mut read_lock: FuturesUnordered<_> =
1951 std::iter::once(manager.read_lock(keys.clone())).collect();
1952
1953 assert!(futures::poll!(read_lock.next()).is_pending());
1955
1956 manager.downgrade_locks(&keys);
1957
1958 assert!(futures::poll!(read_lock.next()).is_ready());
1960 }
1961
1962 #[fuchsia::test]
1963 async fn test_dropped_write_lock_wakes() {
1964 let manager = LockManager::new();
1965 let keys = lock_keys![LockKey::object(1, 1)];
1966 let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1967 let mut read_lock = FuturesUnordered::new();
1968 read_lock.push(manager.lock(keys.clone(), LockState::ReadLock));
1969
1970 {
1971 let write_lock = manager.lock(keys, LockState::WriteLock);
1972 pin_mut!(write_lock);
1973
1974 assert!(futures::poll!(write_lock).is_pending());
1976
1977 assert!(futures::poll!(read_lock.next()).is_pending());
1979 }
1980
1981 assert!(futures::poll!(read_lock.next()).is_ready());
1983 }
1984
1985 #[fuchsia::test]
1986 async fn test_drop_upgrade() {
1987 let manager = LockManager::new();
1988 let keys = lock_keys![LockKey::object(1, 1)];
1989 let _guard = manager.lock(keys.clone(), LockState::Locked).await;
1990
1991 {
1992 let commit_prepare = manager.commit_prepare_keys(&keys);
1993 pin_mut!(commit_prepare);
1994 let _read_guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1995 assert!(futures::poll!(commit_prepare).is_pending());
1996
1997 }
2000
2001 manager.commit_prepare_keys(&keys).await;
2003 }
2004
2005 #[fasync::run_singlethreaded(test)]
2006 async fn test_woken_upgrade_blocks_reads() {
2007 let manager = LockManager::new();
2008 let keys = lock_keys![LockKey::object(1, 1)];
2009 let guard = manager.lock(keys.clone(), LockState::Locked).await;
2011
2012 let read1 = manager.lock(keys.clone(), LockState::ReadLock).await;
2014
2015 let commit_prepare = manager.commit_prepare_keys(&keys);
2017 pin_mut!(commit_prepare);
2018 assert!(futures::poll!(commit_prepare.as_mut()).is_pending());
2019
2020 let read2 = manager.lock(keys.clone(), LockState::ReadLock);
2022 pin_mut!(read2);
2023 assert!(futures::poll!(read2.as_mut()).is_pending());
2024
2025 std::mem::drop(read1);
2027 assert!(futures::poll!(commit_prepare).is_ready());
2028
2029 assert!(futures::poll!(read2.as_mut()).is_pending());
2031
2032 std::mem::drop(guard);
2034 assert!(futures::poll!(read2).is_ready());
2035 }
2036
2037 static LOCK_KEY_1: LockKey = LockKey::flush(1);
2038 static LOCK_KEY_2: LockKey = LockKey::flush(2);
2039 static LOCK_KEY_3: LockKey = LockKey::flush(3);
2040
2041 fn assert_lock_keys_equal(value: &LockKeys, expected: &LockKeys) {
2043 match (value, expected) {
2044 (LockKeys::None, LockKeys::None) => {}
2045 (LockKeys::Inline(key1), LockKeys::Inline(key2)) => {
2046 if key1 != key2 {
2047 panic!("{key1:?} != {key2:?}");
2048 }
2049 }
2050 (LockKeys::Vec(vec1), LockKeys::Vec(vec2)) => {
2051 if vec1 != vec2 {
2052 panic!("{vec1:?} != {vec2:?}");
2053 }
2054 if vec1.capacity() != vec2.capacity() {
2055 panic!(
2056 "LockKeys have different capacity: {} != {}",
2057 vec1.capacity(),
2058 vec2.capacity()
2059 );
2060 }
2061 }
2062 (_, _) => panic!("{value:?} != {expected:?}"),
2063 }
2064 }
2065
2066 fn assert_lock_keys_equivalent(value: &LockKeys, expected: &LockKeys) {
2068 let value: Vec<_> = value.iter().collect();
2069 let expected: Vec<_> = expected.iter().collect();
2070 assert_eq!(value, expected);
2071 }
2072
2073 #[test]
2074 fn test_lock_keys_macro() {
2075 assert_lock_keys_equal(&lock_keys![], &LockKeys::None);
2076 assert_lock_keys_equal(&lock_keys![LOCK_KEY_1], &LockKeys::Inline(LOCK_KEY_1));
2077 assert_lock_keys_equal(
2078 &lock_keys![LOCK_KEY_1, LOCK_KEY_2],
2079 &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]),
2080 );
2081 }
2082
2083 #[test]
2084 fn test_lock_keys_with_capacity() {
2085 assert_lock_keys_equal(&LockKeys::with_capacity(0), &LockKeys::None);
2086 assert_lock_keys_equal(&LockKeys::with_capacity(1), &LockKeys::None);
2087 assert_lock_keys_equal(&LockKeys::with_capacity(2), &LockKeys::Vec(Vec::with_capacity(2)));
2088 }
2089
2090 #[test]
2091 fn test_lock_keys_len() {
2092 assert_eq!(lock_keys![].len(), 0);
2093 assert_eq!(lock_keys![LOCK_KEY_1].len(), 1);
2094 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].len(), 2);
2095 }
2096
2097 #[test]
2098 fn test_lock_keys_contains() {
2099 assert_eq!(lock_keys![].contains(&LOCK_KEY_1), false);
2100 assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_1), true);
2101 assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_2), false);
2102 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_1), true);
2103 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_2), true);
2104 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_3), false);
2105 }
2106
2107 #[test]
2108 fn test_lock_keys_push() {
2109 let mut keys = lock_keys![];
2110 keys.push(LOCK_KEY_1);
2111 assert_lock_keys_equal(&keys, &LockKeys::Inline(LOCK_KEY_1));
2112 keys.push(LOCK_KEY_2);
2113 assert_lock_keys_equal(&keys, &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]));
2114 keys.push(LOCK_KEY_3);
2115 assert_lock_keys_equivalent(
2116 &keys,
2117 &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2, LOCK_KEY_3]),
2118 );
2119 }
2120
2121 #[test]
2122 fn test_lock_keys_sort_unstable() {
2123 let mut keys = lock_keys![];
2124 keys.sort_unstable();
2125 assert_lock_keys_equal(&keys, &lock_keys![]);
2126
2127 let mut keys = lock_keys![LOCK_KEY_1];
2128 keys.sort_unstable();
2129 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2130
2131 let mut keys = lock_keys![LOCK_KEY_2, LOCK_KEY_1];
2132 keys.sort_unstable();
2133 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2134 }
2135
2136 #[test]
2137 fn test_lock_keys_dedup() {
2138 let mut keys = lock_keys![];
2139 keys.dedup();
2140 assert_lock_keys_equal(&keys, &lock_keys![]);
2141
2142 let mut keys = lock_keys![LOCK_KEY_1];
2143 keys.dedup();
2144 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2145
2146 let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_1];
2147 keys.dedup();
2148 assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2149 }
2150
2151 #[test]
2152 fn test_lock_keys_truncate() {
2153 let mut keys = lock_keys![];
2154 keys.truncate(5);
2155 assert_lock_keys_equal(&keys, &lock_keys![]);
2156 keys.truncate(0);
2157 assert_lock_keys_equal(&keys, &lock_keys![]);
2158
2159 let mut keys = lock_keys![LOCK_KEY_1];
2160 keys.truncate(5);
2161 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2162 keys.truncate(0);
2163 assert_lock_keys_equal(&keys, &lock_keys![]);
2164
2165 let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_2];
2166 keys.truncate(5);
2167 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2168 keys.truncate(1);
2169 assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2171 }
2172
2173 #[test]
2174 fn test_lock_keys_iter() {
2175 assert_eq!(lock_keys![].iter().collect::<Vec<_>>(), Vec::<&LockKey>::new());
2176
2177 assert_eq!(lock_keys![LOCK_KEY_1].iter().collect::<Vec<_>>(), vec![&LOCK_KEY_1]);
2178
2179 assert_eq!(
2180 lock_keys![LOCK_KEY_1, LOCK_KEY_2].iter().collect::<Vec<_>>(),
2181 vec![&LOCK_KEY_1, &LOCK_KEY_2]
2182 );
2183 }
2184}