1use crate::checksum::Checksum;
6use crate::filesystem::FxFilesystem;
7use crate::log::*;
8use crate::lsm_tree::types::Item;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::allocator::{AllocatorItem, Reservation};
11use crate::object_store::object_manager::{ObjectManager, reserved_space_from_journal_usage};
12use crate::object_store::object_record::{
13 FxfsKey, FxfsKeyV40, FxfsKeyV49, ObjectItem, ObjectItemV40, ObjectItemV41, ObjectItemV43,
14 ObjectItemV46, ObjectItemV47, ObjectItemV49, ObjectItemV50, ObjectItemV55, ObjectKey,
15 ObjectKeyData, ObjectValue, ProjectProperty,
16};
17use crate::object_store::{AttributeId, AttributeKey, ProjectId};
18use crate::serialized_types::{Migrate, Versioned, migrate_nodefault, migrate_to_version};
19use anyhow::Error;
20use either::{Either, Left, Right};
21use fprint::TypeFingerprint;
22use fuchsia_sync::Mutex;
23use futures::future::poll_fn;
24use futures::pin_mut;
25use rustc_hash::FxHashMap as HashMap;
26use scopeguard::ScopeGuard;
27use serde::{Deserialize, Serialize};
28use std::cell::UnsafeCell;
29use std::cmp::Ordering;
30use std::collections::BTreeSet;
31use std::collections::hash_map::Entry;
32use std::marker::PhantomPinned;
33use std::ops::{Deref, DerefMut, Range};
34use std::sync::Arc;
35use std::task::{Poll, Waker};
36use std::{fmt, mem};
37
38#[derive(Clone, Copy, Default)]
42pub struct Options<'a> {
43 pub skip_journal_checks: bool,
46
47 pub skip_key_roll: bool,
49
50 pub borrow_metadata_space: bool,
55
56 pub allocator_reservation: Option<&'a Reservation>,
61}
62
63pub const TRANSACTION_MAX_JOURNAL_USAGE: u64 = 24_576;
70pub const TRANSACTION_METADATA_MAX_AMOUNT: u64 =
71 reserved_space_from_journal_usage(TRANSACTION_MAX_JOURNAL_USAGE);
72
73#[must_use]
74pub struct TransactionLocks<'a>(pub WriteGuard<'a>);
75
76pub type Mutation = MutationV55;
81
82#[derive(
83 Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint, Versioned,
84)]
85#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
86pub enum MutationV55 {
87 ObjectStore(ObjectStoreMutationV55),
88 EncryptedObjectStore(#[serde(with = "crate::zerocopy_serialization")] Box<[u8]>),
89 Allocator(AllocatorMutationV32),
90 BeginFlush,
92 EndFlush,
95 DeleteVolume,
97 UpdateBorrowed(u64),
98 UpdateMutationsKey(UpdateMutationsKey),
99 CreateInternalDir(u64),
100}
101
102#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
103#[migrate_to_version(MutationV55)]
104pub enum MutationV54 {
105 ObjectStore(ObjectStoreMutationV54),
106 EncryptedObjectStore(#[serde(with = "crate::zerocopy_serialization")] Box<[u8]>),
107 Allocator(AllocatorMutationV32),
108 BeginFlush,
109 EndFlush,
110 DeleteVolume,
111 UpdateBorrowed(u64),
112 UpdateMutationsKey(UpdateMutationsKey),
113 CreateInternalDir(u64),
114}
115
116#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
117#[migrate_to_version(MutationV54)]
118pub enum MutationV50 {
119 ObjectStore(ObjectStoreMutationV50),
120 EncryptedObjectStore(#[serde(with = "crate::zerocopy_serialization")] Box<[u8]>),
121 Allocator(AllocatorMutationV32),
122 BeginFlush,
123 EndFlush,
124 DeleteVolume,
125 UpdateBorrowed(u64),
126 UpdateMutationsKey(UpdateMutationsKeyV49),
127 CreateInternalDir(u64),
128}
129
130#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
131#[migrate_to_version(MutationV50)]
132pub enum MutationV49 {
133 ObjectStore(ObjectStoreMutationV49),
134 EncryptedObjectStore(Box<[u8]>),
135 Allocator(AllocatorMutationV32),
136 BeginFlush,
137 EndFlush,
138 DeleteVolume,
139 UpdateBorrowed(u64),
140 UpdateMutationsKey(UpdateMutationsKeyV49),
141 CreateInternalDir(u64),
142}
143
144#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
145#[migrate_to_version(MutationV49)]
146pub enum MutationV47 {
147 ObjectStore(ObjectStoreMutationV47),
148 EncryptedObjectStore(Box<[u8]>),
149 Allocator(AllocatorMutationV32),
150 BeginFlush,
151 EndFlush,
152 DeleteVolume,
153 UpdateBorrowed(u64),
154 UpdateMutationsKey(UpdateMutationsKeyV40),
155 CreateInternalDir(u64),
156}
157
158#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
159#[migrate_to_version(MutationV47)]
160pub enum MutationV46 {
161 ObjectStore(ObjectStoreMutationV46),
162 EncryptedObjectStore(Box<[u8]>),
163 Allocator(AllocatorMutationV32),
164 BeginFlush,
165 EndFlush,
166 DeleteVolume,
167 UpdateBorrowed(u64),
168 UpdateMutationsKey(UpdateMutationsKeyV40),
169 CreateInternalDir(u64),
170}
171
172#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
173#[migrate_to_version(MutationV46)]
174pub enum MutationV43 {
175 ObjectStore(ObjectStoreMutationV43),
176 EncryptedObjectStore(Box<[u8]>),
177 Allocator(AllocatorMutationV32),
178 BeginFlush,
179 EndFlush,
180 DeleteVolume,
181 UpdateBorrowed(u64),
182 UpdateMutationsKey(UpdateMutationsKeyV40),
183 CreateInternalDir(u64),
184}
185
186#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
187#[migrate_to_version(MutationV43)]
188pub enum MutationV41 {
189 ObjectStore(ObjectStoreMutationV41),
190 EncryptedObjectStore(Box<[u8]>),
191 Allocator(AllocatorMutationV32),
192 BeginFlush,
193 EndFlush,
194 DeleteVolume,
195 UpdateBorrowed(u64),
196 UpdateMutationsKey(UpdateMutationsKeyV40),
197 CreateInternalDir(u64),
198}
199
200#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
201#[migrate_to_version(MutationV41)]
202pub enum MutationV40 {
203 ObjectStore(ObjectStoreMutationV40),
204 EncryptedObjectStore(Box<[u8]>),
205 Allocator(AllocatorMutationV32),
206 BeginFlush,
207 EndFlush,
208 DeleteVolume,
209 UpdateBorrowed(u64),
210 UpdateMutationsKey(UpdateMutationsKeyV40),
211 CreateInternalDir(u64),
212}
213
214impl Mutation {
215 pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self {
216 Mutation::ObjectStore(ObjectStoreMutation {
217 item: Item::new(key, value),
218 op: Operation::Insert,
219 })
220 }
221
222 pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self {
223 Mutation::ObjectStore(ObjectStoreMutation {
224 item: Item::new(key, value),
225 op: Operation::ReplaceOrInsert,
226 })
227 }
228
229 pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self {
230 Mutation::ObjectStore(ObjectStoreMutation {
231 item: Item::new(key, value),
232 op: Operation::Merge,
233 })
234 }
235
236 pub fn update_mutations_key(key: FxfsKey) -> Self {
237 Mutation::UpdateMutationsKey(key.into())
238 }
239}
240
241pub type ObjectStoreMutation = ObjectStoreMutationV55;
245
246#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
247#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
248pub struct ObjectStoreMutationV55 {
249 pub item: ObjectItemV55,
250 pub op: Operation,
251}
252
253#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
254#[migrate_to_version(ObjectStoreMutationV55)]
255#[migrate_nodefault]
256pub struct ObjectStoreMutationV54 {
257 pub item: crate::object_store::object_record::ObjectItemV54,
258 pub op: Operation,
259}
260
261#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
262#[migrate_to_version(ObjectStoreMutationV54)]
263#[migrate_nodefault]
264pub struct ObjectStoreMutationV50 {
265 pub item: ObjectItemV50,
266 pub op: OperationV32,
267}
268
269#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
270#[migrate_to_version(ObjectStoreMutationV50)]
271#[migrate_nodefault]
272pub struct ObjectStoreMutationV49 {
273 pub item: ObjectItemV49,
274 pub op: OperationV32,
275}
276
277#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
278#[migrate_to_version(ObjectStoreMutationV49)]
279#[migrate_nodefault]
280pub struct ObjectStoreMutationV47 {
281 pub item: ObjectItemV47,
282 pub op: OperationV32,
283}
284
285#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
286#[migrate_to_version(ObjectStoreMutationV47)]
287#[migrate_nodefault]
288pub struct ObjectStoreMutationV46 {
289 pub item: ObjectItemV46,
290 pub op: OperationV32,
291}
292
293#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
294#[migrate_to_version(ObjectStoreMutationV46)]
295#[migrate_nodefault]
296pub struct ObjectStoreMutationV43 {
297 pub item: ObjectItemV43,
298 pub op: OperationV32,
299}
300
301#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
302#[migrate_to_version(ObjectStoreMutationV43)]
303#[migrate_nodefault]
304pub struct ObjectStoreMutationV41 {
305 pub item: ObjectItemV41,
306 pub op: OperationV32,
307}
308
309#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
310#[migrate_nodefault]
311#[migrate_to_version(ObjectStoreMutationV41)]
312pub struct ObjectStoreMutationV40 {
313 pub item: ObjectItemV40,
314 pub op: OperationV32,
315}
316
317pub type Operation = OperationV32;
319
320#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
321#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
322pub enum OperationV32 {
323 Insert,
324 ReplaceOrInsert,
325 Merge,
326}
327
328impl Ord for ObjectStoreMutation {
329 fn cmp(&self, other: &Self) -> Ordering {
330 self.item.key.cmp(&other.item.key)
331 }
332}
333
334impl PartialOrd for ObjectStoreMutation {
335 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
336 Some(self.cmp(other))
337 }
338}
339
340impl PartialEq for ObjectStoreMutation {
341 fn eq(&self, other: &Self) -> bool {
342 self.item.key.eq(&other.item.key)
343 }
344}
345
346impl Eq for ObjectStoreMutation {}
347
348impl Ord for AllocatorItem {
349 fn cmp(&self, other: &Self) -> Ordering {
350 self.key.cmp(&other.key)
351 }
352}
353
354impl PartialOrd for AllocatorItem {
355 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
356 Some(self.cmp(other))
357 }
358}
359
360pub type DeviceRange = DeviceRangeV32;
363
364#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TypeFingerprint)]
365#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
366pub struct DeviceRangeV32(pub Range<u64>);
367
368impl Deref for DeviceRange {
369 type Target = Range<u64>;
370
371 fn deref(&self) -> &Self::Target {
372 &self.0
373 }
374}
375
376impl DerefMut for DeviceRange {
377 fn deref_mut(&mut self) -> &mut Self::Target {
378 &mut self.0
379 }
380}
381
382impl From<Range<u64>> for DeviceRange {
383 fn from(range: Range<u64>) -> Self {
384 Self(range)
385 }
386}
387
388impl Into<Range<u64>> for DeviceRange {
389 fn into(self) -> Range<u64> {
390 self.0
391 }
392}
393
394impl Ord for DeviceRange {
395 fn cmp(&self, other: &Self) -> Ordering {
396 self.start.cmp(&other.start).then(self.end.cmp(&other.end))
397 }
398}
399
400impl PartialOrd for DeviceRange {
401 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
402 Some(self.cmp(other))
403 }
404}
405
406pub type AllocatorMutation = AllocatorMutationV32;
407
408#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint)]
409#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
410pub enum AllocatorMutationV32 {
411 Allocate {
412 device_range: DeviceRangeV32,
413 owner_object_id: u64,
414 },
415 Deallocate {
416 device_range: DeviceRangeV32,
417 owner_object_id: u64,
418 },
419 SetLimit {
420 owner_object_id: u64,
421 bytes: u64,
422 },
423 MarkForDeletion(u64),
429}
430
431pub type UpdateMutationsKey = UpdateMutationsKeyV49;
432
433#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
434pub struct UpdateMutationsKeyV49(pub FxfsKeyV49);
435
436#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
437#[migrate_to_version(UpdateMutationsKeyV49)]
438pub struct UpdateMutationsKeyV40(pub FxfsKeyV40);
439
440impl From<UpdateMutationsKey> for FxfsKey {
441 fn from(outer: UpdateMutationsKey) -> Self {
442 outer.0
443 }
444}
445
446impl From<FxfsKey> for UpdateMutationsKey {
447 fn from(inner: FxfsKey) -> Self {
448 Self(inner)
449 }
450}
451
452#[cfg(fuzz)]
453impl<'a> arbitrary::Arbitrary<'a> for UpdateMutationsKey {
454 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
455 Ok(UpdateMutationsKey::from(FxfsKey::arbitrary(u).unwrap()))
456 }
457}
458
459impl Ord for UpdateMutationsKey {
460 fn cmp(&self, other: &Self) -> Ordering {
461 (self as *const UpdateMutationsKey).cmp(&(other as *const _))
462 }
463}
464
465impl PartialOrd for UpdateMutationsKey {
466 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
467 Some(self.cmp(other))
468 }
469}
470
471impl Eq for UpdateMutationsKey {}
472
473impl PartialEq for UpdateMutationsKey {
474 fn eq(&self, other: &Self) -> bool {
475 std::ptr::eq(self, other)
476 }
477}
478
479#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Copy)]
487pub enum LockKey {
488 Flush {
490 object_id: u64,
491 },
492
493 ObjectAttribute {
495 store_object_id: u64,
496 object_id: u64,
497 attribute_id: AttributeId,
498 },
499
500 Object {
502 store_object_id: u64,
503 object_id: u64,
504 },
505
506 ProjectId {
507 store_object_id: u64,
508 project_id: ProjectId,
509 },
510
511 Truncate {
513 store_object_id: u64,
514 object_id: u64,
515 },
516
517 InternalDirectory {
519 store_object_id: u64,
520 },
521
522 MutationsKeyRoll {
524 store_object_id: u64,
525 },
526
527 PreCacheKeys {
531 store_object_id: u64,
532 },
533}
534
535impl LockKey {
536 pub const fn object_attribute(
537 store_object_id: u64,
538 object_id: u64,
539 attribute_id: AttributeId,
540 ) -> Self {
541 LockKey::ObjectAttribute { store_object_id, object_id, attribute_id }
542 }
543
544 pub const fn object(store_object_id: u64, object_id: u64) -> Self {
545 LockKey::Object { store_object_id, object_id }
546 }
547
548 pub const fn flush(object_id: u64) -> Self {
549 LockKey::Flush { object_id }
550 }
551
552 pub const fn truncate(store_object_id: u64, object_id: u64) -> Self {
553 LockKey::Truncate { store_object_id, object_id }
554 }
555
556 pub const fn mutations_key_roll(store_object_id: u64) -> Self {
557 LockKey::MutationsKeyRoll { store_object_id }
558 }
559
560 pub const fn pre_cache_keys(store_object_id: u64) -> Self {
561 LockKey::PreCacheKeys { store_object_id }
562 }
563}
564
565#[derive(Clone, Debug)]
567pub enum LockKeys {
568 None,
569 Inline(LockKey),
570 Vec(Vec<LockKey>),
571}
572
573impl LockKeys {
574 pub fn with_capacity(capacity: usize) -> Self {
575 if capacity > 1 { LockKeys::Vec(Vec::with_capacity(capacity)) } else { LockKeys::None }
576 }
577
578 pub fn push(&mut self, key: LockKey) {
579 match self {
580 Self::None => *self = LockKeys::Inline(key),
581 Self::Inline(inline) => {
582 *self = LockKeys::Vec(vec![*inline, key]);
583 }
584 Self::Vec(vec) => vec.push(key),
585 }
586 }
587
588 pub fn truncate(&mut self, len: usize) {
589 match self {
590 Self::None => {}
591 Self::Inline(_) => {
592 if len == 0 {
593 *self = Self::None;
594 }
595 }
596 Self::Vec(vec) => vec.truncate(len),
597 }
598 }
599
600 fn len(&self) -> usize {
601 match self {
602 Self::None => 0,
603 Self::Inline(_) => 1,
604 Self::Vec(vec) => vec.len(),
605 }
606 }
607
608 fn contains(&self, key: &LockKey) -> bool {
609 match self {
610 Self::None => false,
611 Self::Inline(single) => single == key,
612 Self::Vec(vec) => vec.contains(key),
613 }
614 }
615
616 fn sort_unstable(&mut self) {
617 match self {
618 Self::Vec(vec) => vec.sort_unstable(),
619 _ => {}
620 }
621 }
622
623 fn dedup(&mut self) {
624 match self {
625 Self::Vec(vec) => vec.dedup(),
626 _ => {}
627 }
628 }
629
630 fn iter(&self) -> LockKeysIter<'_> {
631 match self {
632 LockKeys::None => LockKeysIter::None,
633 LockKeys::Inline(key) => LockKeysIter::Inline(key),
634 LockKeys::Vec(keys) => LockKeysIter::Vec(keys.iter()),
635 }
636 }
637}
638
639enum LockKeysIter<'a> {
640 None,
641 Inline(&'a LockKey),
642 Vec(std::slice::Iter<'a, LockKey>),
643}
644
645impl<'a> Iterator for LockKeysIter<'a> {
646 type Item = &'a LockKey;
647 fn next(&mut self) -> Option<Self::Item> {
648 match self {
649 Self::None => None,
650 Self::Inline(inline) => {
651 let next = *inline;
652 *self = Self::None;
653 Some(next)
654 }
655 Self::Vec(vec) => vec.next(),
656 }
657 }
658}
659
660impl Default for LockKeys {
661 fn default() -> Self {
662 LockKeys::None
663 }
664}
665
666#[macro_export]
667macro_rules! lock_keys {
668 () => {
669 $crate::object_store::transaction::LockKeys::None
670 };
671 ($lock_key:expr $(,)?) => {
672 $crate::object_store::transaction::LockKeys::Inline($lock_key)
673 };
674 ($($lock_keys:expr),+ $(,)?) => {
675 $crate::object_store::transaction::LockKeys::Vec(vec![$($lock_keys),+])
676 };
677}
678pub use lock_keys;
679
680pub trait AssociatedObject: Send + Sync {
684 fn will_apply_mutation(&self, _mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
685 }
686}
687
688pub enum AssocObj<'a> {
689 None,
690 Borrowed(&'a dyn AssociatedObject),
691 Owned(Box<dyn AssociatedObject>),
692}
693
694impl AssocObj<'_> {
695 pub fn map<R, F: FnOnce(&dyn AssociatedObject) -> R>(&self, f: F) -> Option<R> {
696 match self {
697 AssocObj::None => None,
698 AssocObj::Borrowed(b) => Some(f(*b)),
699 AssocObj::Owned(o) => Some(f(o.as_ref())),
700 }
701 }
702}
703
704pub struct TxnMutation<'a> {
705 pub object_id: u64,
709
710 pub mutation: Mutation,
712
713 pub associated_object: AssocObj<'a>,
716}
717
718impl Ord for TxnMutation<'_> {
726 fn cmp(&self, other: &Self) -> Ordering {
727 self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation))
728 }
729}
730
731impl PartialOrd for TxnMutation<'_> {
732 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
733 Some(self.cmp(other))
734 }
735}
736
737impl PartialEq for TxnMutation<'_> {
738 fn eq(&self, other: &Self) -> bool {
739 self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation)
740 }
741}
742
743impl Eq for TxnMutation<'_> {}
744
745impl std::fmt::Debug for TxnMutation<'_> {
746 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
747 f.debug_struct("TxnMutation")
748 .field("object_id", &self.object_id)
749 .field("mutation", &self.mutation)
750 .finish()
751 }
752}
753
754pub enum MetadataReservation {
755 None,
757
758 Borrowed,
761
762 Reservation(Reservation),
764
765 Hold(u64),
767}
768
769pub struct Transaction<'a> {
771 fs: Arc<FxFilesystem>,
772
773 mutations: BTreeSet<TxnMutation<'a>>,
775
776 txn_locks: LockKeys,
778
779 pub allocator_reservation: Option<&'a Reservation>,
781
782 pub metadata_reservation: MetadataReservation,
784
785 new_objects: BTreeSet<(u64, u64)>,
788
789 checksums: Vec<(Range<u64>, Vec<Checksum>, bool)>,
791
792 includes_write: bool,
794}
795
796impl<'a> Transaction<'a> {
797 pub async fn new(
800 fs: Arc<FxFilesystem>,
801 options: Options<'a>,
802 txn_locks: LockKeys,
803 ) -> Result<Transaction<'a>, Error> {
804 fs.add_transaction(options.skip_journal_checks).await;
805 let fs_clone = fs.clone();
806 let guard = scopeguard::guard((), |_| fs_clone.sub_transaction());
807 let (metadata_reservation, allocator_reservation, hold) =
808 fs.reservation_for_transaction(options).await?;
809
810 let txn_locks = {
811 let lock_manager = fs.lock_manager();
812 let mut write_guard = lock_manager.txn_lock(txn_locks).await;
813 std::mem::take(&mut write_guard.0.lock_keys)
814 };
815 let mut transaction = Transaction {
816 fs,
817 mutations: BTreeSet::new(),
818 txn_locks,
819 allocator_reservation: None,
820 metadata_reservation,
821 new_objects: BTreeSet::new(),
822 checksums: Vec::new(),
823 includes_write: false,
824 };
825
826 ScopeGuard::into_inner(guard);
827 hold.map(|h| h.forget()); transaction.allocator_reservation = allocator_reservation;
829 Ok(transaction)
830 }
831
832 pub fn mutations(&self) -> &BTreeSet<TxnMutation<'a>> {
833 &self.mutations
834 }
835
836 pub fn take_mutations(&mut self) -> BTreeSet<TxnMutation<'a>> {
837 self.new_objects.clear();
838 mem::take(&mut self.mutations)
839 }
840
841 pub fn add(&mut self, object_id: u64, mutation: Mutation) -> Option<Mutation> {
844 self.add_with_object(object_id, mutation, AssocObj::None)
845 }
846
847 pub fn remove(&mut self, object_id: u64, mutation: Mutation) {
849 let txn_mutation = TxnMutation { object_id, mutation, associated_object: AssocObj::None };
850 if self.mutations.remove(&txn_mutation) {
851 if let Mutation::ObjectStore(ObjectStoreMutation {
852 item:
853 ObjectItem {
854 key: ObjectKey { object_id: new_object_id, data: ObjectKeyData::Object },
855 ..
856 },
857 op: Operation::Insert,
858 }) = txn_mutation.mutation
859 {
860 self.new_objects.remove(&(object_id, new_object_id));
861 }
862 }
863 }
864
865 pub fn add_with_object(
868 &mut self,
869 object_id: u64,
870 mutation: Mutation,
871 associated_object: AssocObj<'a>,
872 ) -> Option<Mutation> {
873 assert!(object_id != INVALID_OBJECT_ID);
874 if let Mutation::ObjectStore(ObjectStoreMutation {
875 item:
876 Item {
877 key:
878 ObjectKey { data: ObjectKeyData::Attribute(_, AttributeKey::Extent(_)), .. },
879 ..
880 },
881 ..
882 }) = &mutation
883 {
884 self.includes_write = true;
885 }
886 let txn_mutation = TxnMutation { object_id, mutation, associated_object };
887 self.verify_locks(&txn_mutation);
888 self.mutations.replace(txn_mutation).map(|m| m.mutation)
889 }
890
891 pub fn add_checksum(&mut self, range: Range<u64>, checksums: Vec<Checksum>, first_write: bool) {
892 self.checksums.push((range, checksums, first_write));
893 }
894
895 pub fn includes_write(&self) -> bool {
896 self.includes_write
897 }
898
899 pub fn checksums(&self) -> &[(Range<u64>, Vec<Checksum>, bool)] {
900 &self.checksums
901 }
902
903 pub fn take_checksums(&mut self) -> Vec<(Range<u64>, Vec<Checksum>, bool)> {
904 std::mem::replace(&mut self.checksums, Vec::new())
905 }
906
907 fn verify_locks(&mut self, mutation: &TxnMutation<'_>) {
908 match mutation {
912 TxnMutation {
913 mutation:
914 Mutation::ObjectStore {
915 0: ObjectStoreMutation { item: ObjectItem { key, .. }, op },
916 },
917 object_id: store_object_id,
918 ..
919 } => {
920 match &key.data {
921 ObjectKeyData::Attribute(..) => {
922 }
924 ObjectKeyData::Child { .. }
925 | ObjectKeyData::EncryptedChild(_)
926 | ObjectKeyData::EncryptedCasefoldChild(_)
927 | ObjectKeyData::CasefoldChild { .. }
928 | ObjectKeyData::LegacyCasefoldChild(_) => {
929 let id = key.object_id;
930 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
931 && !self.new_objects.contains(&(*store_object_id, id))
932 {
933 debug_assert!(
934 false,
935 "Not holding required lock for object {id} \
936 in store {store_object_id}"
937 );
938 error!(
939 "Not holding required lock for object {id} in store \
940 {store_object_id}"
941 )
942 }
943 }
944 ObjectKeyData::GraveyardEntry { .. } => {
945 }
947 ObjectKeyData::GraveyardAttributeEntry { .. } => {
948 }
950 ObjectKeyData::Keys => {
951 let id = key.object_id;
952 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
953 && !self.new_objects.contains(&(*store_object_id, id))
954 {
955 debug_assert!(
956 false,
957 "Not holding required lock for object {id} \
958 in store {store_object_id}"
959 );
960 error!(
961 "Not holding required lock for object {id} in store \
962 {store_object_id}"
963 )
964 }
965 }
966 ObjectKeyData::Object => match op {
967 Operation::Insert => {
969 self.new_objects.insert((*store_object_id, key.object_id));
970 }
971 Operation::Merge | Operation::ReplaceOrInsert => {
972 let id = key.object_id;
973 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
974 && !self.new_objects.contains(&(*store_object_id, id))
975 {
976 debug_assert!(
977 false,
978 "Not holding required lock for object {id} \
979 in store {store_object_id}"
980 );
981 error!(
982 "Not holding required lock for object {id} in store \
983 {store_object_id}"
984 )
985 }
986 }
987 },
988 ObjectKeyData::Project { project_id, property: ProjectProperty::Limit } => {
989 if !self.txn_locks.contains(&LockKey::ProjectId {
990 store_object_id: *store_object_id,
991 project_id: *project_id,
992 }) {
993 debug_assert!(
994 false,
995 "Not holding required lock for project limit id {project_id} \
996 in store {store_object_id}"
997 );
998 error!(
999 "Not holding required lock for project limit id {project_id} in \
1000 store {store_object_id}"
1001 )
1002 }
1003 }
1004 ObjectKeyData::Project { property: ProjectProperty::Usage, .. } => match op {
1005 Operation::Insert | Operation::ReplaceOrInsert => {
1006 panic!(
1007 "Project usage is all handled by merging deltas, no inserts or \
1008 replacements should be used"
1009 );
1010 }
1011 Operation::Merge => {}
1013 },
1014 ObjectKeyData::ExtendedAttribute { .. } => {
1015 let id = key.object_id;
1016 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
1017 && !self.new_objects.contains(&(*store_object_id, id))
1018 {
1019 debug_assert!(
1020 false,
1021 "Not holding required lock for object {id} \
1022 in store {store_object_id} while mutating extended attribute"
1023 );
1024 error!(
1025 "Not holding required lock for object {id} in store \
1026 {store_object_id} while mutating extended attribute"
1027 )
1028 }
1029 }
1030 }
1031 }
1032 TxnMutation { mutation: Mutation::DeleteVolume, object_id, .. } => {
1033 if !self.txn_locks.contains(&LockKey::flush(*object_id)) {
1034 debug_assert!(false, "Not holding required lock for DeleteVolume");
1035 error!("Not holding required lock for DeleteVolume");
1036 }
1037 }
1038 _ => {}
1039 }
1040 }
1041
1042 pub fn is_empty(&self) -> bool {
1044 self.mutations.is_empty()
1045 }
1046
1047 pub fn get_object_mutation(
1050 &self,
1051 store_object_id: u64,
1052 key: ObjectKey,
1053 ) -> Option<&ObjectStoreMutation> {
1054 if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) =
1055 self.mutations.get(&TxnMutation {
1056 object_id: store_object_id,
1057 mutation: Mutation::insert_object(key, ObjectValue::None),
1058 associated_object: AssocObj::None,
1059 })
1060 {
1061 Some(mutation)
1062 } else {
1063 None
1064 }
1065 }
1066
1067 pub async fn commit(mut self) -> Result<u64, Error> {
1069 debug!(txn:? = &self; "Commit");
1070 self.fs.clone().commit_transaction(&mut self, |x| x).await
1071 }
1072
1073 pub async fn commit_with_callback<R: Send>(
1076 mut self,
1077 f: impl FnOnce(u64) -> R + Send,
1078 ) -> Result<R, Error> {
1079 debug!(txn:? = &self; "Commit");
1080 self.fs.clone().commit_transaction(&mut self, f).await
1081 }
1082
1083 pub async fn commit_and_continue(&mut self) -> Result<(), Error> {
1086 debug!(txn:? = self; "Commit");
1087 self.fs.clone().commit_transaction(self, |_| {}).await?;
1088 assert!(self.mutations.is_empty());
1089 self.fs.lock_manager().downgrade_locks(&self.txn_locks);
1090 Ok(())
1091 }
1092}
1093
1094impl Drop for Transaction<'_> {
1095 fn drop(&mut self) {
1096 debug!(txn:? = &self; "Drop");
1099 self.fs.clone().drop_transaction(self);
1100 }
1101}
1102
1103impl std::fmt::Debug for Transaction<'_> {
1104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1105 f.debug_struct("Transaction")
1106 .field("mutations", &self.mutations)
1107 .field("txn_locks", &self.txn_locks)
1108 .field("reservation", &self.allocator_reservation)
1109 .finish()
1110 }
1111}
1112
1113pub enum BorrowedOrOwned<'a, T> {
1114 Borrowed(&'a T),
1115 Owned(T),
1116}
1117
1118impl<T> Deref for BorrowedOrOwned<'_, T> {
1119 type Target = T;
1120
1121 fn deref(&self) -> &Self::Target {
1122 match self {
1123 BorrowedOrOwned::Borrowed(b) => b,
1124 BorrowedOrOwned::Owned(o) => &o,
1125 }
1126 }
1127}
1128
1129impl<'a, T> From<&'a T> for BorrowedOrOwned<'a, T> {
1130 fn from(value: &'a T) -> Self {
1131 BorrowedOrOwned::Borrowed(value)
1132 }
1133}
1134
1135impl<T> From<T> for BorrowedOrOwned<'_, T> {
1136 fn from(value: T) -> Self {
1137 BorrowedOrOwned::Owned(value)
1138 }
1139}
1140
1141pub struct LockManager {
1170 locks: Mutex<Locks>,
1171}
1172
1173struct Locks {
1174 keys: HashMap<LockKey, LockEntry>,
1175}
1176
1177impl Locks {
1178 fn drop_lock(&mut self, key: LockKey, state: LockState) {
1179 if let Entry::Occupied(mut occupied) = self.keys.entry(key) {
1180 let entry = occupied.get_mut();
1181 let wake = match state {
1182 LockState::ReadLock => {
1183 entry.read_count -= 1;
1184 entry.read_count == 0
1185 }
1186 LockState::Locked | LockState::WriteLock => {
1188 entry.state = LockState::ReadLock;
1189 true
1190 }
1191 };
1192 if wake {
1193 unsafe {
1195 entry.wake();
1196 }
1197 if entry.can_remove() {
1198 occupied.remove_entry();
1199 }
1200 }
1201 } else {
1202 unreachable!();
1203 }
1204 }
1205
1206 fn drop_read_locks(&mut self, lock_keys: LockKeys) {
1207 for lock in lock_keys.iter() {
1208 self.drop_lock(*lock, LockState::ReadLock);
1209 }
1210 }
1211
1212 fn drop_write_locks(&mut self, lock_keys: LockKeys) {
1213 for lock in lock_keys.iter() {
1214 self.drop_lock(*lock, LockState::WriteLock);
1217 }
1218 }
1219
1220 fn downgrade_locks(&mut self, lock_keys: &LockKeys) {
1222 for lock in lock_keys.iter() {
1223 unsafe {
1225 self.keys.get_mut(lock).unwrap().downgrade_lock();
1226 }
1227 }
1228 }
1229}
1230
1231#[derive(Debug)]
1232struct LockEntry {
1233 read_count: u64,
1236
1237 state: LockState,
1239
1240 head: *const LockWaker,
1245 tail: *const LockWaker,
1246}
1247
1248unsafe impl Send for LockEntry {}
1249
1250struct LockWaker {
1253 next: UnsafeCell<*const LockWaker>,
1255 prev: UnsafeCell<*const LockWaker>,
1256
1257 key: LockKey,
1260
1261 waker: UnsafeCell<WakerState>,
1263
1264 target_state: LockState,
1266
1267 is_upgrade: bool,
1269
1270 _pin: PhantomPinned,
1272}
1273
1274enum WakerState {
1275 Pending,
1277
1278 Registered(Waker),
1280
1281 Woken,
1283}
1284
1285impl WakerState {
1286 fn is_woken(&self) -> bool {
1287 matches!(self, WakerState::Woken)
1288 }
1289}
1290
1291unsafe impl Send for LockWaker {}
1292unsafe impl Sync for LockWaker {}
1293
1294impl LockWaker {
1295 async fn wait(&self, manager: &LockManager) {
1297 let waker_guard = scopeguard::guard((), |_| {
1299 let mut locks = manager.locks.lock();
1300 unsafe {
1302 if (*self.waker.get()).is_woken() {
1303 if self.is_upgrade {
1305 locks.keys.get_mut(&self.key).unwrap().downgrade_lock();
1306 } else {
1307 locks.drop_lock(self.key, self.target_state);
1308 }
1309 } else {
1310 locks.keys.get_mut(&self.key).unwrap().remove_waker(self);
1313 }
1314 }
1315 });
1316
1317 poll_fn(|cx| {
1318 let _locks = manager.locks.lock();
1319 unsafe {
1321 if (*self.waker.get()).is_woken() {
1322 Poll::Ready(())
1323 } else {
1324 *self.waker.get() = WakerState::Registered(cx.waker().clone());
1325 Poll::Pending
1326 }
1327 }
1328 })
1329 .await;
1330
1331 ScopeGuard::into_inner(waker_guard);
1332 }
1333}
1334
1335#[derive(Copy, Clone, Debug, PartialEq)]
1336enum LockState {
1337 ReadLock,
1339
1340 Locked,
1343
1344 WriteLock,
1346}
1347
1348impl LockManager {
1349 pub fn new() -> Self {
1350 LockManager { locks: Mutex::new(Locks { keys: HashMap::default() }) }
1351 }
1352
1353 pub async fn txn_lock<'a>(&'a self, lock_keys: LockKeys) -> TransactionLocks<'a> {
1357 TransactionLocks(
1358 debug_assert_not_too_long!(self.lock(lock_keys, LockState::Locked)).right().unwrap(),
1359 )
1360 }
1361
1362 async fn lock<'a>(
1365 &'a self,
1366 mut lock_keys: LockKeys,
1367 target_state: LockState,
1368 ) -> Either<ReadGuard<'a>, WriteGuard<'a>> {
1369 let mut guard = match &target_state {
1370 LockState::ReadLock => Left(ReadGuard {
1371 manager: self.into(),
1372 lock_keys: LockKeys::with_capacity(lock_keys.len()),
1373 }),
1374 LockState::Locked | LockState::WriteLock => Right(WriteGuard {
1375 manager: self.into(),
1376 lock_keys: LockKeys::with_capacity(lock_keys.len()),
1377 }),
1378 };
1379 let guard_keys = match &mut guard {
1380 Left(g) => &mut g.lock_keys,
1381 Right(g) => &mut g.lock_keys,
1382 };
1383 lock_keys.sort_unstable();
1384 lock_keys.dedup();
1385 for lock in lock_keys.iter() {
1386 let lock_waker = None;
1387 pin_mut!(lock_waker);
1388 {
1389 let mut locks = self.locks.lock();
1390 match locks.keys.entry(*lock) {
1391 Entry::Vacant(vacant) => {
1392 vacant.insert(LockEntry {
1393 read_count: if let LockState::ReadLock = target_state {
1394 guard_keys.push(*lock);
1395 1
1396 } else {
1397 guard_keys.push(*lock);
1398 0
1399 },
1400 state: target_state,
1401 head: std::ptr::null(),
1402 tail: std::ptr::null(),
1403 });
1404 }
1405 Entry::Occupied(mut occupied) => {
1406 let entry = occupied.get_mut();
1407 if unsafe { entry.is_allowed(target_state, entry.head.is_null()) } {
1409 if let LockState::ReadLock = target_state {
1410 entry.read_count += 1;
1411 guard_keys.push(*lock);
1412 } else {
1413 entry.state = target_state;
1414 guard_keys.push(*lock);
1415 }
1416 } else {
1417 unsafe {
1420 *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1421 next: UnsafeCell::new(std::ptr::null()),
1422 prev: UnsafeCell::new(entry.tail),
1423 key: *lock,
1424 waker: UnsafeCell::new(WakerState::Pending),
1425 target_state: target_state,
1426 is_upgrade: false,
1427 _pin: PhantomPinned,
1428 });
1429 }
1430 let waker = (*lock_waker).as_ref().unwrap();
1431 if entry.tail.is_null() {
1432 entry.head = waker;
1433 } else {
1434 unsafe {
1436 *(*entry.tail).next.get() = waker;
1437 }
1438 }
1439 entry.tail = waker;
1440 }
1441 }
1442 }
1443 }
1444 if let Some(waker) = &*lock_waker {
1445 waker.wait(self).await;
1446 guard_keys.push(*lock);
1447 }
1448 }
1449 guard
1450 }
1451
1452 pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
1454 let mut locks = self.locks.lock();
1455 locks.drop_write_locks(std::mem::take(&mut transaction.txn_locks));
1456 }
1457
1458 pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
1460 self.commit_prepare_keys(&transaction.txn_locks).await;
1461 }
1462
1463 async fn commit_prepare_keys(&self, lock_keys: &LockKeys) {
1464 for lock in lock_keys.iter() {
1465 let lock_waker = None;
1466 pin_mut!(lock_waker);
1467 {
1468 let mut locks = self.locks.lock();
1469 let entry = locks.keys.get_mut(lock).unwrap();
1470 assert_eq!(entry.state, LockState::Locked);
1471
1472 if entry.read_count == 0 {
1473 entry.state = LockState::WriteLock;
1474 } else {
1475 unsafe {
1478 *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1479 next: UnsafeCell::new(entry.head),
1480 prev: UnsafeCell::new(std::ptr::null()),
1481 key: *lock,
1482 waker: UnsafeCell::new(WakerState::Pending),
1483 target_state: LockState::WriteLock,
1484 is_upgrade: true,
1485 _pin: PhantomPinned,
1486 });
1487 }
1488 let waker = (*lock_waker).as_ref().unwrap();
1489 if entry.head.is_null() {
1490 entry.tail = (*lock_waker).as_ref().unwrap();
1491 } else {
1492 unsafe {
1494 *(*entry.head).prev.get() = waker;
1495 }
1496 }
1497 entry.head = waker;
1498 }
1499 }
1500
1501 if let Some(waker) = &*lock_waker {
1502 waker.wait(self).await;
1503 }
1504 }
1505 }
1506
1507 pub async fn read_lock<'a>(&'a self, lock_keys: LockKeys) -> ReadGuard<'a> {
1513 debug_assert_not_too_long!(self.lock(lock_keys, LockState::ReadLock)).left().unwrap()
1514 }
1515
1516 pub async fn write_lock<'a>(&'a self, lock_keys: LockKeys) -> WriteGuard<'a> {
1519 debug_assert_not_too_long!(self.lock(lock_keys, LockState::WriteLock)).right().unwrap()
1520 }
1521
1522 pub fn downgrade_locks(&self, lock_keys: &LockKeys) {
1525 self.locks.lock().downgrade_locks(lock_keys);
1526 }
1527}
1528
1529impl LockEntry {
1531 unsafe fn wake(&mut self) {
1532 if self.head.is_null() || self.state == LockState::WriteLock {
1534 return;
1535 }
1536
1537 let waker = unsafe { &*self.head };
1538
1539 if waker.is_upgrade {
1540 if self.read_count > 0 {
1541 return;
1542 }
1543 } else if !unsafe { self.is_allowed(waker.target_state, true) } {
1544 return;
1545 }
1546
1547 unsafe { self.pop_and_wake() };
1548
1549 if waker.target_state == LockState::WriteLock {
1552 return;
1553 }
1554
1555 while !self.head.is_null() && unsafe { (*self.head).target_state } == LockState::ReadLock {
1556 unsafe { self.pop_and_wake() };
1557 }
1558 }
1559
1560 unsafe fn pop_and_wake(&mut self) {
1561 let waker = unsafe { &*self.head };
1562
1563 self.head = unsafe { *waker.next.get() };
1565 if self.head.is_null() {
1566 self.tail = std::ptr::null()
1567 } else {
1568 unsafe { *(*self.head).prev.get() = std::ptr::null() };
1569 }
1570
1571 if waker.target_state == LockState::ReadLock {
1573 self.read_count += 1;
1574 } else {
1575 self.state = waker.target_state;
1576 }
1577
1578 if let WakerState::Registered(waker) =
1580 std::mem::replace(unsafe { &mut *waker.waker.get() }, WakerState::Woken)
1581 {
1582 waker.wake();
1583 }
1584 }
1585
1586 fn can_remove(&self) -> bool {
1587 self.state == LockState::ReadLock && self.read_count == 0
1588 }
1589
1590 unsafe fn remove_waker(&mut self, waker: &LockWaker) {
1591 unsafe {
1592 let is_first = (*waker.prev.get()).is_null();
1593 if is_first {
1594 self.head = *waker.next.get();
1595 } else {
1596 *(**waker.prev.get()).next.get() = *waker.next.get();
1597 }
1598 if (*waker.next.get()).is_null() {
1599 self.tail = *waker.prev.get();
1600 } else {
1601 *(**waker.next.get()).prev.get() = *waker.prev.get();
1602 }
1603 if is_first {
1604 self.wake();
1607 }
1608 }
1609 }
1610
1611 unsafe fn is_allowed(&self, target_state: LockState, is_head: bool) -> bool {
1615 match self.state {
1616 LockState::ReadLock => {
1617 (self.read_count == 0
1619 || target_state == LockState::Locked
1620 || target_state == LockState::ReadLock)
1621 && is_head
1622 }
1623 LockState::Locked => {
1624 target_state == LockState::ReadLock
1628 && (is_head || unsafe { !(*self.head).is_upgrade })
1629 }
1630 LockState::WriteLock => false,
1631 }
1632 }
1633
1634 unsafe fn downgrade_lock(&mut self) {
1635 assert_eq!(std::mem::replace(&mut self.state, LockState::Locked), LockState::WriteLock);
1636 unsafe { self.wake() };
1637 }
1638}
1639
1640#[must_use]
1641pub struct ReadGuard<'a> {
1642 manager: LockManagerRef<'a>,
1643 lock_keys: LockKeys,
1644}
1645
1646impl ReadGuard<'_> {
1647 pub fn fs(&self) -> Option<&Arc<FxFilesystem>> {
1648 if let LockManagerRef::Owned(fs) = &self.manager { Some(fs) } else { None }
1649 }
1650
1651 pub fn into_owned(mut self, fs: Arc<FxFilesystem>) -> ReadGuard<'static> {
1652 ReadGuard {
1653 manager: LockManagerRef::Owned(fs),
1654 lock_keys: std::mem::replace(&mut self.lock_keys, LockKeys::None),
1655 }
1656 }
1657}
1658
1659impl Drop for ReadGuard<'_> {
1660 fn drop(&mut self) {
1661 let mut locks = self.manager.locks.lock();
1662 locks.drop_read_locks(std::mem::take(&mut self.lock_keys));
1663 }
1664}
1665
1666impl fmt::Debug for ReadGuard<'_> {
1667 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1668 f.debug_struct("ReadGuard")
1669 .field("manager", &(&self.manager as *const _))
1670 .field("lock_keys", &self.lock_keys)
1671 .finish()
1672 }
1673}
1674
1675#[must_use]
1676pub struct WriteGuard<'a> {
1677 manager: LockManagerRef<'a>,
1678 lock_keys: LockKeys,
1679}
1680
1681impl Drop for WriteGuard<'_> {
1682 fn drop(&mut self) {
1683 let mut locks = self.manager.locks.lock();
1684 locks.drop_write_locks(std::mem::take(&mut self.lock_keys));
1685 }
1686}
1687
1688impl fmt::Debug for WriteGuard<'_> {
1689 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1690 f.debug_struct("WriteGuard")
1691 .field("manager", &(&self.manager as *const _))
1692 .field("lock_keys", &self.lock_keys)
1693 .finish()
1694 }
1695}
1696
1697enum LockManagerRef<'a> {
1698 Borrowed(&'a LockManager),
1699 Owned(Arc<FxFilesystem>),
1700}
1701
1702impl Deref for LockManagerRef<'_> {
1703 type Target = LockManager;
1704
1705 fn deref(&self) -> &Self::Target {
1706 match self {
1707 LockManagerRef::Borrowed(m) => m,
1708 LockManagerRef::Owned(f) => f.lock_manager(),
1709 }
1710 }
1711}
1712
1713impl<'a> From<&'a LockManager> for LockManagerRef<'a> {
1714 fn from(value: &'a LockManager) -> Self {
1715 LockManagerRef::Borrowed(value)
1716 }
1717}
1718
1719#[cfg(test)]
1720mod tests {
1721 use super::{AttributeId, LockKey, LockKeys, LockManager, LockState, Mutation, Options};
1722 use crate::filesystem::FxFilesystem;
1723 use fuchsia_async as fasync;
1724 use fuchsia_sync::Mutex;
1725 use futures::channel::oneshot::channel;
1726 use futures::future::FutureExt;
1727 use futures::stream::FuturesUnordered;
1728 use futures::{StreamExt, join, pin_mut};
1729 use std::task::Poll;
1730 use std::time::Duration;
1731 use storage_device::DeviceHolder;
1732 use storage_device::fake_device::FakeDevice;
1733
1734 #[fuchsia::test]
1735 async fn test_simple() {
1736 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1737 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1738 let mut t = fs
1739 .root_store()
1740 .new_transaction(lock_keys![], Options::default())
1741 .await
1742 .expect("new_transaction failed");
1743 t.add(1, Mutation::BeginFlush);
1744 assert!(!t.is_empty());
1745 }
1746
1747 #[fuchsia::test]
1748 async fn test_locks() {
1749 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1750 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1751 let (send1, recv1) = channel();
1752 let (send2, recv2) = channel();
1753 let (send3, recv3) = channel();
1754 let done = Mutex::new(false);
1755 let mut futures = FuturesUnordered::new();
1756 futures.push(
1757 async {
1758 let _t = fs
1759 .root_store()
1760 .new_transaction(
1761 lock_keys![LockKey::object_attribute(1, 2, AttributeId::TEST_ID)],
1762 Options::default(),
1763 )
1764 .await
1765 .expect("new_transaction failed");
1766 send1.send(()).unwrap(); send3.send(()).unwrap(); recv2.await.unwrap();
1769 fasync::Timer::new(Duration::from_millis(100)).await;
1771 assert!(!*done.lock());
1772 }
1773 .boxed(),
1774 );
1775 futures.push(
1776 async {
1777 recv1.await.unwrap();
1778 let _t = fs
1780 .root_store()
1781 .new_transaction(
1782 lock_keys![LockKey::object_attribute(2, 2, AttributeId::TEST_ID)],
1783 Options::default(),
1784 )
1785 .await
1786 .expect("new_transaction failed");
1787 send2.send(()).unwrap();
1789 }
1790 .boxed(),
1791 );
1792 futures.push(
1793 async {
1794 recv3.await.unwrap();
1796 let _t = fs
1797 .root_store()
1798 .new_transaction(
1799 lock_keys![LockKey::object_attribute(1, 2, AttributeId::TEST_ID)],
1800 Options::default(),
1801 )
1802 .await;
1803 *done.lock() = true;
1804 }
1805 .boxed(),
1806 );
1807 while let Some(()) = futures.next().await {}
1808 }
1809
1810 #[fuchsia::test]
1811 async fn test_read_lock_after_write_lock() {
1812 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1813 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1814 let (send1, recv1) = channel();
1815 let (send2, recv2) = channel();
1816 let done = Mutex::new(false);
1817 join!(
1818 async {
1819 let t = fs
1820 .root_store()
1821 .new_transaction(
1822 lock_keys![LockKey::object_attribute(1, 2, AttributeId::TEST_ID)],
1823 Options::default(),
1824 )
1825 .await
1826 .expect("new_transaction failed");
1827 send1.send(()).unwrap(); recv2.await.unwrap();
1829 t.commit().await.expect("commit failed");
1830 *done.lock() = true;
1831 },
1832 async {
1833 recv1.await.unwrap();
1834 let _guard = fs
1836 .lock_manager()
1837 .read_lock(lock_keys![LockKey::object_attribute(1, 2, AttributeId::TEST_ID)])
1838 .await;
1839 send2.send(()).unwrap();
1841 fasync::Timer::new(Duration::from_millis(100)).await;
1844 assert!(!*done.lock());
1845 },
1846 );
1847 }
1848
1849 #[fuchsia::test]
1850 async fn test_write_lock_after_read_lock() {
1851 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1852 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1853 let (send1, recv1) = channel();
1854 let (send2, recv2) = channel();
1855 let done = Mutex::new(false);
1856 join!(
1857 async {
1858 let _guard = fs
1860 .lock_manager()
1861 .read_lock(lock_keys![LockKey::object_attribute(1, 2, AttributeId::TEST_ID)])
1862 .await;
1863 send1.send(()).unwrap();
1865 recv2.await.unwrap();
1866 fasync::Timer::new(Duration::from_millis(100)).await;
1869 assert!(!*done.lock());
1870 },
1871 async {
1872 recv1.await.unwrap();
1873 let t = fs
1874 .root_store()
1875 .new_transaction(
1876 lock_keys![LockKey::object_attribute(1, 2, AttributeId::TEST_ID)],
1877 Options::default(),
1878 )
1879 .await
1880 .expect("new_transaction failed");
1881 send2.send(()).unwrap(); t.commit().await.expect("commit failed");
1883 *done.lock() = true;
1884 },
1885 );
1886 }
1887
1888 #[fuchsia::test]
1889 async fn test_drop_uncommitted_transaction() {
1890 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1891 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1892 let key = lock_keys![LockKey::object(1, 1)];
1893
1894 {
1896 let _write_lock = fs
1897 .root_store()
1898 .new_transaction(key.clone(), Options::default())
1899 .await
1900 .expect("new_transaction failed");
1901 let _read_lock = fs.lock_manager().read_lock(key.clone()).await;
1902 }
1903 {
1905 let _write_lock = fs
1906 .root_store()
1907 .new_transaction(key.clone(), Options::default())
1908 .await
1909 .expect("new_transaction failed");
1910 }
1911 fs.root_store()
1913 .new_transaction(key.clone(), Options::default())
1914 .await
1915 .expect("new_transaction failed");
1916 }
1917
1918 #[fuchsia::test]
1919 async fn test_drop_waiting_write_lock() {
1920 let manager = LockManager::new();
1921 let keys = lock_keys![LockKey::object(1, 1)];
1922 {
1923 let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1924 if let Poll::Ready(_) =
1925 futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1926 {
1927 assert!(false);
1928 }
1929 }
1930 let _ = manager.lock(keys, LockState::WriteLock).await;
1931 }
1932
1933 #[fuchsia::test]
1934 async fn test_write_lock_blocks_everything() {
1935 let manager = LockManager::new();
1936 let keys = lock_keys![LockKey::object(1, 1)];
1937 {
1938 let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1939 if let Poll::Ready(_) =
1940 futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1941 {
1942 assert!(false);
1943 }
1944 if let Poll::Ready(_) =
1945 futures::poll!(manager.lock(keys.clone(), LockState::ReadLock).boxed())
1946 {
1947 assert!(false);
1948 }
1949 }
1950 {
1951 let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1952 }
1953 {
1954 let _guard = manager.lock(keys, LockState::ReadLock).await;
1955 }
1956 }
1957
1958 #[fuchsia::test]
1959 async fn test_downgrade_locks() {
1960 let manager = LockManager::new();
1961 let keys = lock_keys![LockKey::object(1, 1)];
1962 let _guard = manager.txn_lock(keys.clone()).await;
1963 manager.commit_prepare_keys(&keys).await;
1964
1965 let mut read_lock: FuturesUnordered<_> =
1967 std::iter::once(manager.read_lock(keys.clone())).collect();
1968
1969 assert!(futures::poll!(read_lock.next()).is_pending());
1971
1972 manager.downgrade_locks(&keys);
1973
1974 assert!(futures::poll!(read_lock.next()).is_ready());
1976 }
1977
1978 #[fuchsia::test]
1979 async fn test_dropped_write_lock_wakes() {
1980 let manager = LockManager::new();
1981 let keys = lock_keys![LockKey::object(1, 1)];
1982 let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1983 let mut read_lock = FuturesUnordered::new();
1984 read_lock.push(manager.lock(keys.clone(), LockState::ReadLock));
1985
1986 {
1987 let write_lock = manager.lock(keys, LockState::WriteLock);
1988 pin_mut!(write_lock);
1989
1990 assert!(futures::poll!(write_lock).is_pending());
1992
1993 assert!(futures::poll!(read_lock.next()).is_pending());
1995 }
1996
1997 assert!(futures::poll!(read_lock.next()).is_ready());
1999 }
2000
2001 #[fuchsia::test]
2002 async fn test_drop_upgrade() {
2003 let manager = LockManager::new();
2004 let keys = lock_keys![LockKey::object(1, 1)];
2005 let _guard = manager.lock(keys.clone(), LockState::Locked).await;
2006
2007 {
2008 let commit_prepare = manager.commit_prepare_keys(&keys);
2009 pin_mut!(commit_prepare);
2010 let _read_guard = manager.lock(keys.clone(), LockState::ReadLock).await;
2011 assert!(futures::poll!(commit_prepare).is_pending());
2012
2013 }
2016
2017 manager.commit_prepare_keys(&keys).await;
2019 }
2020
2021 #[fasync::run_singlethreaded(test)]
2022 async fn test_woken_upgrade_blocks_reads() {
2023 let manager = LockManager::new();
2024 let keys = lock_keys![LockKey::object(1, 1)];
2025 let guard = manager.lock(keys.clone(), LockState::Locked).await;
2027
2028 let read1 = manager.lock(keys.clone(), LockState::ReadLock).await;
2030
2031 let commit_prepare = manager.commit_prepare_keys(&keys);
2033 pin_mut!(commit_prepare);
2034 assert!(futures::poll!(commit_prepare.as_mut()).is_pending());
2035
2036 let read2 = manager.lock(keys.clone(), LockState::ReadLock);
2038 pin_mut!(read2);
2039 assert!(futures::poll!(read2.as_mut()).is_pending());
2040
2041 std::mem::drop(read1);
2043 assert!(futures::poll!(commit_prepare).is_ready());
2044
2045 assert!(futures::poll!(read2.as_mut()).is_pending());
2047
2048 std::mem::drop(guard);
2050 assert!(futures::poll!(read2).is_ready());
2051 }
2052
2053 static LOCK_KEY_1: LockKey = LockKey::flush(1);
2054 static LOCK_KEY_2: LockKey = LockKey::flush(2);
2055 static LOCK_KEY_3: LockKey = LockKey::flush(3);
2056
2057 fn assert_lock_keys_equal(value: &LockKeys, expected: &LockKeys) {
2059 match (value, expected) {
2060 (LockKeys::None, LockKeys::None) => {}
2061 (LockKeys::Inline(key1), LockKeys::Inline(key2)) => {
2062 if key1 != key2 {
2063 panic!("{key1:?} != {key2:?}");
2064 }
2065 }
2066 (LockKeys::Vec(vec1), LockKeys::Vec(vec2)) => {
2067 if vec1 != vec2 {
2068 panic!("{vec1:?} != {vec2:?}");
2069 }
2070 if vec1.capacity() != vec2.capacity() {
2071 panic!(
2072 "LockKeys have different capacity: {} != {}",
2073 vec1.capacity(),
2074 vec2.capacity()
2075 );
2076 }
2077 }
2078 (_, _) => panic!("{value:?} != {expected:?}"),
2079 }
2080 }
2081
2082 fn assert_lock_keys_equivalent(value: &LockKeys, expected: &LockKeys) {
2084 let value: Vec<_> = value.iter().collect();
2085 let expected: Vec<_> = expected.iter().collect();
2086 assert_eq!(value, expected);
2087 }
2088
2089 #[test]
2090 fn test_lock_keys_macro() {
2091 assert_lock_keys_equal(&lock_keys![], &LockKeys::None);
2092 assert_lock_keys_equal(&lock_keys![LOCK_KEY_1], &LockKeys::Inline(LOCK_KEY_1));
2093 assert_lock_keys_equal(
2094 &lock_keys![LOCK_KEY_1, LOCK_KEY_2],
2095 &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]),
2096 );
2097 }
2098
2099 #[test]
2100 fn test_lock_keys_with_capacity() {
2101 assert_lock_keys_equal(&LockKeys::with_capacity(0), &LockKeys::None);
2102 assert_lock_keys_equal(&LockKeys::with_capacity(1), &LockKeys::None);
2103 assert_lock_keys_equal(&LockKeys::with_capacity(2), &LockKeys::Vec(Vec::with_capacity(2)));
2104 }
2105
2106 #[test]
2107 fn test_lock_keys_len() {
2108 assert_eq!(lock_keys![].len(), 0);
2109 assert_eq!(lock_keys![LOCK_KEY_1].len(), 1);
2110 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].len(), 2);
2111 }
2112
2113 #[test]
2114 fn test_lock_keys_contains() {
2115 assert_eq!(lock_keys![].contains(&LOCK_KEY_1), false);
2116 assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_1), true);
2117 assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_2), false);
2118 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_1), true);
2119 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_2), true);
2120 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_3), false);
2121 }
2122
2123 #[test]
2124 fn test_lock_keys_push() {
2125 let mut keys = lock_keys![];
2126 keys.push(LOCK_KEY_1);
2127 assert_lock_keys_equal(&keys, &LockKeys::Inline(LOCK_KEY_1));
2128 keys.push(LOCK_KEY_2);
2129 assert_lock_keys_equal(&keys, &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]));
2130 keys.push(LOCK_KEY_3);
2131 assert_lock_keys_equivalent(
2132 &keys,
2133 &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2, LOCK_KEY_3]),
2134 );
2135 }
2136
2137 #[test]
2138 fn test_lock_keys_sort_unstable() {
2139 let mut keys = lock_keys![];
2140 keys.sort_unstable();
2141 assert_lock_keys_equal(&keys, &lock_keys![]);
2142
2143 let mut keys = lock_keys![LOCK_KEY_1];
2144 keys.sort_unstable();
2145 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2146
2147 let mut keys = lock_keys![LOCK_KEY_2, LOCK_KEY_1];
2148 keys.sort_unstable();
2149 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2150 }
2151
2152 #[test]
2153 fn test_lock_keys_dedup() {
2154 let mut keys = lock_keys![];
2155 keys.dedup();
2156 assert_lock_keys_equal(&keys, &lock_keys![]);
2157
2158 let mut keys = lock_keys![LOCK_KEY_1];
2159 keys.dedup();
2160 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2161
2162 let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_1];
2163 keys.dedup();
2164 assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2165 }
2166
2167 #[test]
2168 fn test_lock_keys_truncate() {
2169 let mut keys = lock_keys![];
2170 keys.truncate(5);
2171 assert_lock_keys_equal(&keys, &lock_keys![]);
2172 keys.truncate(0);
2173 assert_lock_keys_equal(&keys, &lock_keys![]);
2174
2175 let mut keys = lock_keys![LOCK_KEY_1];
2176 keys.truncate(5);
2177 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2178 keys.truncate(0);
2179 assert_lock_keys_equal(&keys, &lock_keys![]);
2180
2181 let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_2];
2182 keys.truncate(5);
2183 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2184 keys.truncate(1);
2185 assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2187 }
2188
2189 #[test]
2190 fn test_lock_keys_iter() {
2191 assert_eq!(lock_keys![].iter().collect::<Vec<_>>(), Vec::<&LockKey>::new());
2192
2193 assert_eq!(lock_keys![LOCK_KEY_1].iter().collect::<Vec<_>>(), vec![&LOCK_KEY_1]);
2194
2195 assert_eq!(
2196 lock_keys![LOCK_KEY_1, LOCK_KEY_2].iter().collect::<Vec<_>>(),
2197 vec![&LOCK_KEY_1, &LOCK_KEY_2]
2198 );
2199 }
2200}