1use crate::checksum::Checksum;
6use crate::filesystem::{FxFilesystem, TxnGuard};
7use crate::log::*;
8use crate::lsm_tree::types::Item;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::AttributeKey;
11use crate::object_store::allocator::{AllocatorItem, Reservation};
12use crate::object_store::object_manager::{ObjectManager, reserved_space_from_journal_usage};
13use crate::object_store::object_record::{
14 FxfsKey, FxfsKeyV40, FxfsKeyV49, ObjectItem, ObjectItemV40, ObjectItemV41, ObjectItemV43,
15 ObjectItemV46, ObjectItemV47, ObjectItemV49, ObjectItemV50, ObjectKey, ObjectKeyData,
16 ObjectValue, ProjectProperty,
17};
18use crate::serialized_types::{Migrate, Versioned, migrate_nodefault, migrate_to_version};
19use anyhow::Error;
20use either::{Either, Left, Right};
21use fprint::TypeFingerprint;
22use fuchsia_sync::Mutex;
23use futures::future::poll_fn;
24use futures::pin_mut;
25use rustc_hash::FxHashMap as HashMap;
26use scopeguard::ScopeGuard;
27use serde::{Deserialize, Serialize};
28use std::cell::UnsafeCell;
29use std::cmp::Ordering;
30use std::collections::BTreeSet;
31use std::collections::hash_map::Entry;
32use std::marker::PhantomPinned;
33use std::ops::{Deref, DerefMut, Range};
34use std::sync::Arc;
35use std::task::{Poll, Waker};
36use std::{fmt, mem};
37
38#[derive(Clone, Copy, Default)]
42pub struct Options<'a> {
43 pub skip_journal_checks: bool,
46
47 pub borrow_metadata_space: bool,
52
53 pub allocator_reservation: Option<&'a Reservation>,
58
59 pub txn_guard: Option<&'a TxnGuard<'a>>,
61}
62
63pub const TRANSACTION_MAX_JOURNAL_USAGE: u64 = 24_576;
70pub const TRANSACTION_METADATA_MAX_AMOUNT: u64 =
71 reserved_space_from_journal_usage(TRANSACTION_MAX_JOURNAL_USAGE);
72
73#[must_use]
74pub struct TransactionLocks<'a>(pub WriteGuard<'a>);
75
76pub type Mutation = MutationV54;
81
82#[derive(
83 Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint, Versioned,
84)]
85#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
86pub enum MutationV54 {
87 ObjectStore(ObjectStoreMutationV54),
88 EncryptedObjectStore(#[serde(with = "crate::zerocopy_serialization")] Box<[u8]>),
89 Allocator(AllocatorMutationV32),
90 BeginFlush,
91 EndFlush,
92 DeleteVolume,
93 UpdateBorrowed(u64),
94 UpdateMutationsKey(UpdateMutationsKey),
95 CreateInternalDir(u64),
96}
97
98#[derive(
99 Migrate,
100 Clone,
101 Debug,
102 Eq,
103 Ord,
104 PartialEq,
105 PartialOrd,
106 Serialize,
107 Deserialize,
108 TypeFingerprint,
109 Versioned,
110)]
111#[migrate_to_version(MutationV54)]
112#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
113pub enum MutationV50 {
114 ObjectStore(ObjectStoreMutationV50),
115 EncryptedObjectStore(#[serde(with = "crate::zerocopy_serialization")] Box<[u8]>),
116 Allocator(AllocatorMutationV32),
117 BeginFlush,
119 EndFlush,
122 DeleteVolume,
124 UpdateBorrowed(u64),
125 UpdateMutationsKey(UpdateMutationsKeyV49),
126 CreateInternalDir(u64),
127}
128
129#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
130#[migrate_to_version(MutationV50)]
131pub enum MutationV49 {
132 ObjectStore(ObjectStoreMutationV49),
133 EncryptedObjectStore(Box<[u8]>),
134 Allocator(AllocatorMutationV32),
135 BeginFlush,
136 EndFlush,
137 DeleteVolume,
138 UpdateBorrowed(u64),
139 UpdateMutationsKey(UpdateMutationsKeyV49),
140 CreateInternalDir(u64),
141}
142
143#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
144#[migrate_to_version(MutationV49)]
145pub enum MutationV47 {
146 ObjectStore(ObjectStoreMutationV47),
147 EncryptedObjectStore(Box<[u8]>),
148 Allocator(AllocatorMutationV32),
149 BeginFlush,
150 EndFlush,
151 DeleteVolume,
152 UpdateBorrowed(u64),
153 UpdateMutationsKey(UpdateMutationsKeyV40),
154 CreateInternalDir(u64),
155}
156
157#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
158#[migrate_to_version(MutationV47)]
159pub enum MutationV46 {
160 ObjectStore(ObjectStoreMutationV46),
161 EncryptedObjectStore(Box<[u8]>),
162 Allocator(AllocatorMutationV32),
163 BeginFlush,
164 EndFlush,
165 DeleteVolume,
166 UpdateBorrowed(u64),
167 UpdateMutationsKey(UpdateMutationsKeyV40),
168 CreateInternalDir(u64),
169}
170
171#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
172#[migrate_to_version(MutationV46)]
173pub enum MutationV43 {
174 ObjectStore(ObjectStoreMutationV43),
175 EncryptedObjectStore(Box<[u8]>),
176 Allocator(AllocatorMutationV32),
177 BeginFlush,
178 EndFlush,
179 DeleteVolume,
180 UpdateBorrowed(u64),
181 UpdateMutationsKey(UpdateMutationsKeyV40),
182 CreateInternalDir(u64),
183}
184
185#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
186#[migrate_to_version(MutationV43)]
187pub enum MutationV41 {
188 ObjectStore(ObjectStoreMutationV41),
189 EncryptedObjectStore(Box<[u8]>),
190 Allocator(AllocatorMutationV32),
191 BeginFlush,
192 EndFlush,
193 DeleteVolume,
194 UpdateBorrowed(u64),
195 UpdateMutationsKey(UpdateMutationsKeyV40),
196 CreateInternalDir(u64),
197}
198
199#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
200#[migrate_to_version(MutationV41)]
201pub enum MutationV40 {
202 ObjectStore(ObjectStoreMutationV40),
203 EncryptedObjectStore(Box<[u8]>),
204 Allocator(AllocatorMutationV32),
205 BeginFlush,
206 EndFlush,
207 DeleteVolume,
208 UpdateBorrowed(u64),
209 UpdateMutationsKey(UpdateMutationsKeyV40),
210 CreateInternalDir(u64),
211}
212
213impl Mutation {
214 pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self {
215 Mutation::ObjectStore(ObjectStoreMutation {
216 item: Item::new(key, value),
217 op: Operation::Insert,
218 })
219 }
220
221 pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self {
222 Mutation::ObjectStore(ObjectStoreMutation {
223 item: Item::new(key, value),
224 op: Operation::ReplaceOrInsert,
225 })
226 }
227
228 pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self {
229 Mutation::ObjectStore(ObjectStoreMutation {
230 item: Item::new(key, value),
231 op: Operation::Merge,
232 })
233 }
234
235 pub fn update_mutations_key(key: FxfsKey) -> Self {
236 Mutation::UpdateMutationsKey(key.into())
237 }
238}
239
240pub type ObjectStoreMutation = ObjectStoreMutationV54;
244
245#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
246#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
247pub struct ObjectStoreMutationV54 {
248 pub item: crate::object_store::object_record::ObjectItem,
249 pub op: Operation,
250}
251
252#[derive(Migrate, Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
253#[migrate_to_version(ObjectStoreMutationV54)]
254#[migrate_nodefault]
255#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
256pub struct ObjectStoreMutationV50 {
257 pub item: ObjectItemV50,
258 pub op: OperationV32,
259}
260
261#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
262#[migrate_to_version(ObjectStoreMutationV50)]
263#[migrate_nodefault]
264pub struct ObjectStoreMutationV49 {
265 pub item: ObjectItemV49,
266 pub op: OperationV32,
267}
268
269#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
270#[migrate_to_version(ObjectStoreMutationV49)]
271#[migrate_nodefault]
272pub struct ObjectStoreMutationV47 {
273 pub item: ObjectItemV47,
274 pub op: OperationV32,
275}
276
277#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
278#[migrate_to_version(ObjectStoreMutationV47)]
279#[migrate_nodefault]
280pub struct ObjectStoreMutationV46 {
281 pub item: ObjectItemV46,
282 pub op: OperationV32,
283}
284
285#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
286#[migrate_to_version(ObjectStoreMutationV46)]
287#[migrate_nodefault]
288pub struct ObjectStoreMutationV43 {
289 pub item: ObjectItemV43,
290 pub op: OperationV32,
291}
292
293#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
294#[migrate_to_version(ObjectStoreMutationV43)]
295#[migrate_nodefault]
296pub struct ObjectStoreMutationV41 {
297 pub item: ObjectItemV41,
298 pub op: OperationV32,
299}
300
301#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
302#[migrate_nodefault]
303#[migrate_to_version(ObjectStoreMutationV41)]
304pub struct ObjectStoreMutationV40 {
305 pub item: ObjectItemV40,
306 pub op: OperationV32,
307}
308
309pub type Operation = OperationV32;
311
312#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
313#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
314pub enum OperationV32 {
315 Insert,
316 ReplaceOrInsert,
317 Merge,
318}
319
320impl Ord for ObjectStoreMutation {
321 fn cmp(&self, other: &Self) -> Ordering {
322 self.item.key.cmp(&other.item.key)
323 }
324}
325
326impl PartialOrd for ObjectStoreMutation {
327 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
328 Some(self.cmp(other))
329 }
330}
331
332impl PartialEq for ObjectStoreMutation {
333 fn eq(&self, other: &Self) -> bool {
334 self.item.key.eq(&other.item.key)
335 }
336}
337
338impl Eq for ObjectStoreMutation {}
339
340impl Ord for ObjectStoreMutationV50 {
341 fn cmp(&self, other: &Self) -> Ordering {
342 self.item.key.cmp(&other.item.key)
343 }
344}
345
346impl PartialOrd for ObjectStoreMutationV50 {
347 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
348 Some(self.cmp(other))
349 }
350}
351
352impl PartialEq for ObjectStoreMutationV50 {
353 fn eq(&self, other: &Self) -> bool {
354 self.item.key.eq(&other.item.key)
355 }
356}
357
358impl Eq for ObjectStoreMutationV50 {}
359
360impl Ord for AllocatorItem {
361 fn cmp(&self, other: &Self) -> Ordering {
362 self.key.cmp(&other.key)
363 }
364}
365
366impl PartialOrd for AllocatorItem {
367 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
368 Some(self.cmp(other))
369 }
370}
371
372pub type DeviceRange = DeviceRangeV32;
375
376#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TypeFingerprint)]
377#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
378pub struct DeviceRangeV32(pub Range<u64>);
379
380impl Deref for DeviceRange {
381 type Target = Range<u64>;
382
383 fn deref(&self) -> &Self::Target {
384 &self.0
385 }
386}
387
388impl DerefMut for DeviceRange {
389 fn deref_mut(&mut self) -> &mut Self::Target {
390 &mut self.0
391 }
392}
393
394impl From<Range<u64>> for DeviceRange {
395 fn from(range: Range<u64>) -> Self {
396 Self(range)
397 }
398}
399
400impl Into<Range<u64>> for DeviceRange {
401 fn into(self) -> Range<u64> {
402 self.0
403 }
404}
405
406impl Ord for DeviceRange {
407 fn cmp(&self, other: &Self) -> Ordering {
408 self.start.cmp(&other.start).then(self.end.cmp(&other.end))
409 }
410}
411
412impl PartialOrd for DeviceRange {
413 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
414 Some(self.cmp(other))
415 }
416}
417
418pub type AllocatorMutation = AllocatorMutationV32;
419
420#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint)]
421#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
422pub enum AllocatorMutationV32 {
423 Allocate {
424 device_range: DeviceRangeV32,
425 owner_object_id: u64,
426 },
427 Deallocate {
428 device_range: DeviceRangeV32,
429 owner_object_id: u64,
430 },
431 SetLimit {
432 owner_object_id: u64,
433 bytes: u64,
434 },
435 MarkForDeletion(u64),
441}
442
443pub type UpdateMutationsKey = UpdateMutationsKeyV49;
444
445#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
446pub struct UpdateMutationsKeyV49(pub FxfsKeyV49);
447
448#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
449#[migrate_to_version(UpdateMutationsKeyV49)]
450pub struct UpdateMutationsKeyV40(pub FxfsKeyV40);
451
452impl From<UpdateMutationsKey> for FxfsKey {
453 fn from(outer: UpdateMutationsKey) -> Self {
454 outer.0
455 }
456}
457
458impl From<FxfsKey> for UpdateMutationsKey {
459 fn from(inner: FxfsKey) -> Self {
460 Self(inner)
461 }
462}
463
464#[cfg(fuzz)]
465impl<'a> arbitrary::Arbitrary<'a> for UpdateMutationsKey {
466 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
467 Ok(UpdateMutationsKey::from(FxfsKey::arbitrary(u).unwrap()))
468 }
469}
470
471impl Ord for UpdateMutationsKey {
472 fn cmp(&self, other: &Self) -> Ordering {
473 (self as *const UpdateMutationsKey).cmp(&(other as *const _))
474 }
475}
476
477impl PartialOrd for UpdateMutationsKey {
478 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
479 Some(self.cmp(other))
480 }
481}
482
483impl Eq for UpdateMutationsKey {}
484
485impl PartialEq for UpdateMutationsKey {
486 fn eq(&self, other: &Self) -> bool {
487 std::ptr::eq(self, other)
488 }
489}
490
491#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Copy)]
501pub enum LockKey {
502 Filesystem,
504
505 Flush {
507 object_id: u64,
508 },
509
510 ObjectAttribute {
512 store_object_id: u64,
513 object_id: u64,
514 attribute_id: u64,
515 },
516
517 Object {
519 store_object_id: u64,
520 object_id: u64,
521 },
522
523 ProjectId {
524 store_object_id: u64,
525 project_id: u64,
526 },
527
528 Truncate {
530 store_object_id: u64,
531 object_id: u64,
532 },
533
534 InternalDirectory {
536 store_object_id: u64,
537 },
538}
539
540impl LockKey {
541 pub const fn object_attribute(store_object_id: u64, object_id: u64, attribute_id: u64) -> Self {
542 LockKey::ObjectAttribute { store_object_id, object_id, attribute_id }
543 }
544
545 pub const fn object(store_object_id: u64, object_id: u64) -> Self {
546 LockKey::Object { store_object_id, object_id }
547 }
548
549 pub const fn flush(object_id: u64) -> Self {
550 LockKey::Flush { object_id }
551 }
552
553 pub const fn truncate(store_object_id: u64, object_id: u64) -> Self {
554 LockKey::Truncate { store_object_id, object_id }
555 }
556}
557
558#[derive(Clone, Debug)]
560pub enum LockKeys {
561 None,
562 Inline(LockKey),
563 Vec(Vec<LockKey>),
564}
565
566impl LockKeys {
567 pub fn with_capacity(capacity: usize) -> Self {
568 if capacity > 1 { LockKeys::Vec(Vec::with_capacity(capacity)) } else { LockKeys::None }
569 }
570
571 pub fn push(&mut self, key: LockKey) {
572 match self {
573 Self::None => *self = LockKeys::Inline(key),
574 Self::Inline(inline) => {
575 *self = LockKeys::Vec(vec![*inline, key]);
576 }
577 Self::Vec(vec) => vec.push(key),
578 }
579 }
580
581 pub fn truncate(&mut self, len: usize) {
582 match self {
583 Self::None => {}
584 Self::Inline(_) => {
585 if len == 0 {
586 *self = Self::None;
587 }
588 }
589 Self::Vec(vec) => vec.truncate(len),
590 }
591 }
592
593 fn len(&self) -> usize {
594 match self {
595 Self::None => 0,
596 Self::Inline(_) => 1,
597 Self::Vec(vec) => vec.len(),
598 }
599 }
600
601 fn contains(&self, key: &LockKey) -> bool {
602 match self {
603 Self::None => false,
604 Self::Inline(single) => single == key,
605 Self::Vec(vec) => vec.contains(key),
606 }
607 }
608
609 fn sort_unstable(&mut self) {
610 match self {
611 Self::Vec(vec) => vec.sort_unstable(),
612 _ => {}
613 }
614 }
615
616 fn dedup(&mut self) {
617 match self {
618 Self::Vec(vec) => vec.dedup(),
619 _ => {}
620 }
621 }
622
623 fn iter(&self) -> LockKeysIter<'_> {
624 match self {
625 LockKeys::None => LockKeysIter::None,
626 LockKeys::Inline(key) => LockKeysIter::Inline(key),
627 LockKeys::Vec(keys) => LockKeysIter::Vec(keys.iter()),
628 }
629 }
630}
631
632enum LockKeysIter<'a> {
633 None,
634 Inline(&'a LockKey),
635 Vec(std::slice::Iter<'a, LockKey>),
636}
637
638impl<'a> Iterator for LockKeysIter<'a> {
639 type Item = &'a LockKey;
640 fn next(&mut self) -> Option<Self::Item> {
641 match self {
642 Self::None => None,
643 Self::Inline(inline) => {
644 let next = *inline;
645 *self = Self::None;
646 Some(next)
647 }
648 Self::Vec(vec) => vec.next(),
649 }
650 }
651}
652
653impl Default for LockKeys {
654 fn default() -> Self {
655 LockKeys::None
656 }
657}
658
659#[macro_export]
660macro_rules! lock_keys {
661 () => {
662 $crate::object_store::transaction::LockKeys::None
663 };
664 ($lock_key:expr $(,)?) => {
665 $crate::object_store::transaction::LockKeys::Inline($lock_key)
666 };
667 ($($lock_keys:expr),+ $(,)?) => {
668 $crate::object_store::transaction::LockKeys::Vec(vec![$($lock_keys),+])
669 };
670}
671pub use lock_keys;
672
673pub trait AssociatedObject: Send + Sync {
677 fn will_apply_mutation(&self, _mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
678 }
679}
680
681pub enum AssocObj<'a> {
682 None,
683 Borrowed(&'a dyn AssociatedObject),
684 Owned(Box<dyn AssociatedObject>),
685}
686
687impl AssocObj<'_> {
688 pub fn map<R, F: FnOnce(&dyn AssociatedObject) -> R>(&self, f: F) -> Option<R> {
689 match self {
690 AssocObj::None => None,
691 AssocObj::Borrowed(b) => Some(f(*b)),
692 AssocObj::Owned(o) => Some(f(o.as_ref())),
693 }
694 }
695}
696
697pub struct TxnMutation<'a> {
698 pub object_id: u64,
702
703 pub mutation: Mutation,
705
706 pub associated_object: AssocObj<'a>,
709}
710
711impl Ord for TxnMutation<'_> {
714 fn cmp(&self, other: &Self) -> Ordering {
715 self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation))
716 }
717}
718
719impl PartialOrd for TxnMutation<'_> {
720 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
721 Some(self.cmp(other))
722 }
723}
724
725impl PartialEq for TxnMutation<'_> {
726 fn eq(&self, other: &Self) -> bool {
727 self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation)
728 }
729}
730
731impl Eq for TxnMutation<'_> {}
732
733impl std::fmt::Debug for TxnMutation<'_> {
734 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
735 f.debug_struct("TxnMutation")
736 .field("object_id", &self.object_id)
737 .field("mutation", &self.mutation)
738 .finish()
739 }
740}
741
742pub enum MetadataReservation {
743 None,
745
746 Borrowed,
749
750 Reservation(Reservation),
752
753 Hold(u64),
755}
756
757pub struct Transaction<'a> {
759 txn_guard: TxnGuard<'a>,
760
761 mutations: BTreeSet<TxnMutation<'a>>,
763
764 txn_locks: LockKeys,
766
767 pub allocator_reservation: Option<&'a Reservation>,
769
770 pub metadata_reservation: MetadataReservation,
772
773 new_objects: BTreeSet<(u64, u64)>,
776
777 checksums: Vec<(Range<u64>, Vec<Checksum>, bool)>,
779
780 includes_write: bool,
782}
783
784impl<'a> Transaction<'a> {
785 pub async fn new(
788 txn_guard: TxnGuard<'a>,
789 options: Options<'a>,
790 txn_locks: LockKeys,
791 ) -> Result<Transaction<'a>, Error> {
792 txn_guard.fs().add_transaction(options.skip_journal_checks).await;
793 let fs = txn_guard.fs().clone();
794 let guard = scopeguard::guard((), |_| fs.sub_transaction());
795 let (metadata_reservation, allocator_reservation, hold) =
796 txn_guard.fs().reservation_for_transaction(options).await?;
797
798 let txn_locks = {
799 let lock_manager = txn_guard.fs().lock_manager();
800 let mut write_guard = lock_manager.txn_lock(txn_locks).await;
801 std::mem::take(&mut write_guard.0.lock_keys)
802 };
803 let mut transaction = Transaction {
804 txn_guard,
805 mutations: BTreeSet::new(),
806 txn_locks,
807 allocator_reservation: None,
808 metadata_reservation,
809 new_objects: BTreeSet::new(),
810 checksums: Vec::new(),
811 includes_write: false,
812 };
813
814 ScopeGuard::into_inner(guard);
815 hold.map(|h| h.forget()); transaction.allocator_reservation = allocator_reservation;
817 Ok(transaction)
818 }
819
820 pub fn txn_guard(&self) -> &TxnGuard<'_> {
821 &self.txn_guard
822 }
823
824 pub fn mutations(&self) -> &BTreeSet<TxnMutation<'a>> {
825 &self.mutations
826 }
827
828 pub fn take_mutations(&mut self) -> BTreeSet<TxnMutation<'a>> {
829 self.new_objects.clear();
830 mem::take(&mut self.mutations)
831 }
832
833 pub fn add(&mut self, object_id: u64, mutation: Mutation) -> Option<Mutation> {
836 self.add_with_object(object_id, mutation, AssocObj::None)
837 }
838
839 pub fn remove(&mut self, object_id: u64, mutation: Mutation) {
841 let txn_mutation = TxnMutation { object_id, mutation, associated_object: AssocObj::None };
842 if self.mutations.remove(&txn_mutation) {
843 if let Mutation::ObjectStore(ObjectStoreMutation {
844 item:
845 ObjectItem {
846 key: ObjectKey { object_id: new_object_id, data: ObjectKeyData::Object },
847 ..
848 },
849 op: Operation::Insert,
850 }) = txn_mutation.mutation
851 {
852 self.new_objects.remove(&(object_id, new_object_id));
853 }
854 }
855 }
856
857 pub fn add_with_object(
860 &mut self,
861 object_id: u64,
862 mutation: Mutation,
863 associated_object: AssocObj<'a>,
864 ) -> Option<Mutation> {
865 assert!(object_id != INVALID_OBJECT_ID);
866 if let Mutation::ObjectStore(ObjectStoreMutation {
867 item:
868 Item {
869 key:
870 ObjectKey { data: ObjectKeyData::Attribute(_, AttributeKey::Extent(_)), .. },
871 ..
872 },
873 ..
874 }) = &mutation
875 {
876 self.includes_write = true;
877 }
878 let txn_mutation = TxnMutation { object_id, mutation, associated_object };
879 self.verify_locks(&txn_mutation);
880 self.mutations.replace(txn_mutation).map(|m| m.mutation)
881 }
882
883 pub fn add_checksum(&mut self, range: Range<u64>, checksums: Vec<Checksum>, first_write: bool) {
884 self.checksums.push((range, checksums, first_write));
885 }
886
887 pub fn includes_write(&self) -> bool {
888 self.includes_write
889 }
890
891 pub fn checksums(&self) -> &[(Range<u64>, Vec<Checksum>, bool)] {
892 &self.checksums
893 }
894
895 pub fn take_checksums(&mut self) -> Vec<(Range<u64>, Vec<Checksum>, bool)> {
896 std::mem::replace(&mut self.checksums, Vec::new())
897 }
898
899 fn verify_locks(&mut self, mutation: &TxnMutation<'_>) {
900 match mutation {
904 TxnMutation {
905 mutation:
906 Mutation::ObjectStore {
907 0: ObjectStoreMutation { item: ObjectItem { key, .. }, op },
908 },
909 object_id: store_object_id,
910 ..
911 } => {
912 match &key.data {
913 ObjectKeyData::Attribute(..) => {
914 }
916 ObjectKeyData::Child { .. }
917 | ObjectKeyData::EncryptedChild(_)
918 | ObjectKeyData::EncryptedCasefoldChild(_)
919 | ObjectKeyData::CasefoldChild { .. }
920 | ObjectKeyData::LegacyCasefoldChild(_) => {
921 let id = key.object_id;
922 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
923 && !self.new_objects.contains(&(*store_object_id, id))
924 {
925 debug_assert!(
926 false,
927 "Not holding required lock for object {id} \
928 in store {store_object_id}"
929 );
930 error!(
931 "Not holding required lock for object {id} in store \
932 {store_object_id}"
933 )
934 }
935 }
936 ObjectKeyData::GraveyardEntry { .. } => {
937 }
939 ObjectKeyData::GraveyardAttributeEntry { .. } => {
940 }
942 ObjectKeyData::Keys => {
943 let id = key.object_id;
944 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
945 && !self.new_objects.contains(&(*store_object_id, id))
946 {
947 debug_assert!(
948 false,
949 "Not holding required lock for object {id} \
950 in store {store_object_id}"
951 );
952 error!(
953 "Not holding required lock for object {id} in store \
954 {store_object_id}"
955 )
956 }
957 }
958 ObjectKeyData::Object => match op {
959 Operation::Insert => {
961 self.new_objects.insert((*store_object_id, key.object_id));
962 }
963 Operation::Merge | Operation::ReplaceOrInsert => {
964 let id = key.object_id;
965 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
966 && !self.new_objects.contains(&(*store_object_id, id))
967 {
968 debug_assert!(
969 false,
970 "Not holding required lock for object {id} \
971 in store {store_object_id}"
972 );
973 error!(
974 "Not holding required lock for object {id} in store \
975 {store_object_id}"
976 )
977 }
978 }
979 },
980 ObjectKeyData::Project { project_id, property: ProjectProperty::Limit } => {
981 if !self.txn_locks.contains(&LockKey::ProjectId {
982 store_object_id: *store_object_id,
983 project_id: *project_id,
984 }) {
985 debug_assert!(
986 false,
987 "Not holding required lock for project limit id {project_id} \
988 in store {store_object_id}"
989 );
990 error!(
991 "Not holding required lock for project limit id {project_id} in \
992 store {store_object_id}"
993 )
994 }
995 }
996 ObjectKeyData::Project { property: ProjectProperty::Usage, .. } => match op {
997 Operation::Insert | Operation::ReplaceOrInsert => {
998 panic!(
999 "Project usage is all handled by merging deltas, no inserts or \
1000 replacements should be used"
1001 );
1002 }
1003 Operation::Merge => {}
1005 },
1006 ObjectKeyData::ExtendedAttribute { .. } => {
1007 let id = key.object_id;
1008 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
1009 && !self.new_objects.contains(&(*store_object_id, id))
1010 {
1011 debug_assert!(
1012 false,
1013 "Not holding required lock for object {id} \
1014 in store {store_object_id} while mutating extended attribute"
1015 );
1016 error!(
1017 "Not holding required lock for object {id} in store \
1018 {store_object_id} while mutating extended attribute"
1019 )
1020 }
1021 }
1022 }
1023 }
1024 TxnMutation { mutation: Mutation::DeleteVolume, object_id, .. } => {
1025 if !self.txn_locks.contains(&LockKey::flush(*object_id)) {
1026 debug_assert!(false, "Not holding required lock for DeleteVolume");
1027 error!("Not holding required lock for DeleteVolume");
1028 }
1029 }
1030 _ => {}
1031 }
1032 }
1033
1034 pub fn is_empty(&self) -> bool {
1036 self.mutations.is_empty()
1037 }
1038
1039 pub fn get_object_mutation(
1042 &self,
1043 store_object_id: u64,
1044 key: ObjectKey,
1045 ) -> Option<&ObjectStoreMutation> {
1046 if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) =
1047 self.mutations.get(&TxnMutation {
1048 object_id: store_object_id,
1049 mutation: Mutation::insert_object(key, ObjectValue::None),
1050 associated_object: AssocObj::None,
1051 })
1052 {
1053 Some(mutation)
1054 } else {
1055 None
1056 }
1057 }
1058
1059 pub async fn commit(mut self) -> Result<u64, Error> {
1061 debug!(txn:? = &self; "Commit");
1062 self.txn_guard.fs().clone().commit_transaction(&mut self, &mut |_| {}).await
1063 }
1064
1065 pub async fn commit_with_callback<R: Send>(
1068 mut self,
1069 f: impl FnOnce(u64) -> R + Send,
1070 ) -> Result<R, Error> {
1071 debug!(txn:? = &self; "Commit");
1072 let mut f = Some(f);
1075 let mut result = None;
1076 self.txn_guard
1077 .fs()
1078 .clone()
1079 .commit_transaction(&mut self, &mut |offset| {
1080 result = Some(f.take().unwrap()(offset));
1081 })
1082 .await?;
1083 Ok(result.unwrap())
1084 }
1085
1086 pub async fn commit_and_continue(&mut self) -> Result<(), Error> {
1089 debug!(txn:? = self; "Commit");
1090 self.txn_guard.fs().clone().commit_transaction(self, &mut |_| {}).await?;
1091 assert!(self.mutations.is_empty());
1092 self.txn_guard.fs().lock_manager().downgrade_locks(&self.txn_locks);
1093 Ok(())
1094 }
1095}
1096
1097impl Drop for Transaction<'_> {
1098 fn drop(&mut self) {
1099 debug!(txn:? = &self; "Drop");
1102 self.txn_guard.fs().clone().drop_transaction(self);
1103 }
1104}
1105
1106impl std::fmt::Debug for Transaction<'_> {
1107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1108 f.debug_struct("Transaction")
1109 .field("mutations", &self.mutations)
1110 .field("txn_locks", &self.txn_locks)
1111 .field("reservation", &self.allocator_reservation)
1112 .finish()
1113 }
1114}
1115
1116pub enum BorrowedOrOwned<'a, T> {
1117 Borrowed(&'a T),
1118 Owned(T),
1119}
1120
1121impl<T> Deref for BorrowedOrOwned<'_, T> {
1122 type Target = T;
1123
1124 fn deref(&self) -> &Self::Target {
1125 match self {
1126 BorrowedOrOwned::Borrowed(b) => b,
1127 BorrowedOrOwned::Owned(o) => &o,
1128 }
1129 }
1130}
1131
1132impl<'a, T> From<&'a T> for BorrowedOrOwned<'a, T> {
1133 fn from(value: &'a T) -> Self {
1134 BorrowedOrOwned::Borrowed(value)
1135 }
1136}
1137
1138impl<T> From<T> for BorrowedOrOwned<'_, T> {
1139 fn from(value: T) -> Self {
1140 BorrowedOrOwned::Owned(value)
1141 }
1142}
1143
1144pub struct LockManager {
1173 locks: Mutex<Locks>,
1174}
1175
1176struct Locks {
1177 keys: HashMap<LockKey, LockEntry>,
1178}
1179
1180impl Locks {
1181 fn drop_lock(&mut self, key: LockKey, state: LockState) {
1182 if let Entry::Occupied(mut occupied) = self.keys.entry(key) {
1183 let entry = occupied.get_mut();
1184 let wake = match state {
1185 LockState::ReadLock => {
1186 entry.read_count -= 1;
1187 entry.read_count == 0
1188 }
1189 LockState::Locked | LockState::WriteLock => {
1191 entry.state = LockState::ReadLock;
1192 true
1193 }
1194 };
1195 if wake {
1196 unsafe {
1198 entry.wake();
1199 }
1200 if entry.can_remove() {
1201 occupied.remove_entry();
1202 }
1203 }
1204 } else {
1205 unreachable!();
1206 }
1207 }
1208
1209 fn drop_read_locks(&mut self, lock_keys: LockKeys) {
1210 for lock in lock_keys.iter() {
1211 self.drop_lock(*lock, LockState::ReadLock);
1212 }
1213 }
1214
1215 fn drop_write_locks(&mut self, lock_keys: LockKeys) {
1216 for lock in lock_keys.iter() {
1217 self.drop_lock(*lock, LockState::WriteLock);
1220 }
1221 }
1222
1223 fn downgrade_locks(&mut self, lock_keys: &LockKeys) {
1225 for lock in lock_keys.iter() {
1226 unsafe {
1228 self.keys.get_mut(lock).unwrap().downgrade_lock();
1229 }
1230 }
1231 }
1232}
1233
1234#[derive(Debug)]
1235struct LockEntry {
1236 read_count: u64,
1239
1240 state: LockState,
1242
1243 head: *const LockWaker,
1248 tail: *const LockWaker,
1249}
1250
1251unsafe impl Send for LockEntry {}
1252
1253struct LockWaker {
1256 next: UnsafeCell<*const LockWaker>,
1258 prev: UnsafeCell<*const LockWaker>,
1259
1260 key: LockKey,
1263
1264 waker: UnsafeCell<WakerState>,
1266
1267 target_state: LockState,
1269
1270 is_upgrade: bool,
1272
1273 _pin: PhantomPinned,
1275}
1276
1277enum WakerState {
1278 Pending,
1280
1281 Registered(Waker),
1283
1284 Woken,
1286}
1287
1288impl WakerState {
1289 fn is_woken(&self) -> bool {
1290 matches!(self, WakerState::Woken)
1291 }
1292}
1293
1294unsafe impl Send for LockWaker {}
1295unsafe impl Sync for LockWaker {}
1296
1297impl LockWaker {
1298 async fn wait(&self, manager: &LockManager) {
1300 let waker_guard = scopeguard::guard((), |_| {
1302 let mut locks = manager.locks.lock();
1303 unsafe {
1305 if (*self.waker.get()).is_woken() {
1306 if self.is_upgrade {
1308 locks.keys.get_mut(&self.key).unwrap().downgrade_lock();
1309 } else {
1310 locks.drop_lock(self.key, self.target_state);
1311 }
1312 } else {
1313 locks.keys.get_mut(&self.key).unwrap().remove_waker(self);
1316 }
1317 }
1318 });
1319
1320 poll_fn(|cx| {
1321 let _locks = manager.locks.lock();
1322 unsafe {
1324 if (*self.waker.get()).is_woken() {
1325 Poll::Ready(())
1326 } else {
1327 *self.waker.get() = WakerState::Registered(cx.waker().clone());
1328 Poll::Pending
1329 }
1330 }
1331 })
1332 .await;
1333
1334 ScopeGuard::into_inner(waker_guard);
1335 }
1336}
1337
1338#[derive(Copy, Clone, Debug, PartialEq)]
1339enum LockState {
1340 ReadLock,
1342
1343 Locked,
1346
1347 WriteLock,
1349}
1350
1351impl LockManager {
1352 pub fn new() -> Self {
1353 LockManager { locks: Mutex::new(Locks { keys: HashMap::default() }) }
1354 }
1355
1356 pub async fn txn_lock<'a>(&'a self, lock_keys: LockKeys) -> TransactionLocks<'a> {
1360 TransactionLocks(
1361 debug_assert_not_too_long!(self.lock(lock_keys, LockState::Locked)).right().unwrap(),
1362 )
1363 }
1364
1365 async fn lock<'a>(
1368 &'a self,
1369 mut lock_keys: LockKeys,
1370 target_state: LockState,
1371 ) -> Either<ReadGuard<'a>, WriteGuard<'a>> {
1372 let mut guard = match &target_state {
1373 LockState::ReadLock => Left(ReadGuard {
1374 manager: self.into(),
1375 lock_keys: LockKeys::with_capacity(lock_keys.len()),
1376 }),
1377 LockState::Locked | LockState::WriteLock => Right(WriteGuard {
1378 manager: self.into(),
1379 lock_keys: LockKeys::with_capacity(lock_keys.len()),
1380 }),
1381 };
1382 let guard_keys = match &mut guard {
1383 Left(g) => &mut g.lock_keys,
1384 Right(g) => &mut g.lock_keys,
1385 };
1386 lock_keys.sort_unstable();
1387 lock_keys.dedup();
1388 for lock in lock_keys.iter() {
1389 let lock_waker = None;
1390 pin_mut!(lock_waker);
1391 {
1392 let mut locks = self.locks.lock();
1393 match locks.keys.entry(*lock) {
1394 Entry::Vacant(vacant) => {
1395 vacant.insert(LockEntry {
1396 read_count: if let LockState::ReadLock = target_state {
1397 guard_keys.push(*lock);
1398 1
1399 } else {
1400 guard_keys.push(*lock);
1401 0
1402 },
1403 state: target_state,
1404 head: std::ptr::null(),
1405 tail: std::ptr::null(),
1406 });
1407 }
1408 Entry::Occupied(mut occupied) => {
1409 let entry = occupied.get_mut();
1410 if unsafe { entry.is_allowed(target_state, entry.head.is_null()) } {
1412 if let LockState::ReadLock = target_state {
1413 entry.read_count += 1;
1414 guard_keys.push(*lock);
1415 } else {
1416 entry.state = target_state;
1417 guard_keys.push(*lock);
1418 }
1419 } else {
1420 unsafe {
1423 *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1424 next: UnsafeCell::new(std::ptr::null()),
1425 prev: UnsafeCell::new(entry.tail),
1426 key: *lock,
1427 waker: UnsafeCell::new(WakerState::Pending),
1428 target_state: target_state,
1429 is_upgrade: false,
1430 _pin: PhantomPinned,
1431 });
1432 }
1433 let waker = (*lock_waker).as_ref().unwrap();
1434 if entry.tail.is_null() {
1435 entry.head = waker;
1436 } else {
1437 unsafe {
1439 *(*entry.tail).next.get() = waker;
1440 }
1441 }
1442 entry.tail = waker;
1443 }
1444 }
1445 }
1446 }
1447 if let Some(waker) = &*lock_waker {
1448 waker.wait(self).await;
1449 guard_keys.push(*lock);
1450 }
1451 }
1452 guard
1453 }
1454
1455 pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
1457 let mut locks = self.locks.lock();
1458 locks.drop_write_locks(std::mem::take(&mut transaction.txn_locks));
1459 }
1460
1461 pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
1463 self.commit_prepare_keys(&transaction.txn_locks).await;
1464 }
1465
1466 async fn commit_prepare_keys(&self, lock_keys: &LockKeys) {
1467 for lock in lock_keys.iter() {
1468 let lock_waker = None;
1469 pin_mut!(lock_waker);
1470 {
1471 let mut locks = self.locks.lock();
1472 let entry = locks.keys.get_mut(lock).unwrap();
1473 assert_eq!(entry.state, LockState::Locked);
1474
1475 if entry.read_count == 0 {
1476 entry.state = LockState::WriteLock;
1477 } else {
1478 unsafe {
1481 *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1482 next: UnsafeCell::new(entry.head),
1483 prev: UnsafeCell::new(std::ptr::null()),
1484 key: *lock,
1485 waker: UnsafeCell::new(WakerState::Pending),
1486 target_state: LockState::WriteLock,
1487 is_upgrade: true,
1488 _pin: PhantomPinned,
1489 });
1490 }
1491 let waker = (*lock_waker).as_ref().unwrap();
1492 if entry.head.is_null() {
1493 entry.tail = (*lock_waker).as_ref().unwrap();
1494 } else {
1495 unsafe {
1497 *(*entry.head).prev.get() = waker;
1498 }
1499 }
1500 entry.head = waker;
1501 }
1502 }
1503
1504 if let Some(waker) = &*lock_waker {
1505 waker.wait(self).await;
1506 }
1507 }
1508 }
1509
1510 pub async fn read_lock<'a>(&'a self, lock_keys: LockKeys) -> ReadGuard<'a> {
1516 debug_assert_not_too_long!(self.lock(lock_keys, LockState::ReadLock)).left().unwrap()
1517 }
1518
1519 pub async fn write_lock<'a>(&'a self, lock_keys: LockKeys) -> WriteGuard<'a> {
1522 debug_assert_not_too_long!(self.lock(lock_keys, LockState::WriteLock)).right().unwrap()
1523 }
1524
1525 pub fn downgrade_locks(&self, lock_keys: &LockKeys) {
1528 self.locks.lock().downgrade_locks(lock_keys);
1529 }
1530}
1531
1532impl LockEntry {
1534 unsafe fn wake(&mut self) {
1535 if self.head.is_null() || self.state == LockState::WriteLock {
1537 return;
1538 }
1539
1540 let waker = unsafe { &*self.head };
1541
1542 if waker.is_upgrade {
1543 if self.read_count > 0 {
1544 return;
1545 }
1546 } else if !unsafe { self.is_allowed(waker.target_state, true) } {
1547 return;
1548 }
1549
1550 unsafe { self.pop_and_wake() };
1551
1552 if waker.target_state == LockState::WriteLock {
1555 return;
1556 }
1557
1558 while !self.head.is_null() && unsafe { (*self.head).target_state } == LockState::ReadLock {
1559 unsafe { self.pop_and_wake() };
1560 }
1561 }
1562
1563 unsafe fn pop_and_wake(&mut self) {
1564 let waker = unsafe { &*self.head };
1565
1566 self.head = unsafe { *waker.next.get() };
1568 if self.head.is_null() {
1569 self.tail = std::ptr::null()
1570 } else {
1571 unsafe { *(*self.head).prev.get() = std::ptr::null() };
1572 }
1573
1574 if waker.target_state == LockState::ReadLock {
1576 self.read_count += 1;
1577 } else {
1578 self.state = waker.target_state;
1579 }
1580
1581 if let WakerState::Registered(waker) =
1583 std::mem::replace(unsafe { &mut *waker.waker.get() }, WakerState::Woken)
1584 {
1585 waker.wake();
1586 }
1587 }
1588
1589 fn can_remove(&self) -> bool {
1590 self.state == LockState::ReadLock && self.read_count == 0
1591 }
1592
1593 unsafe fn remove_waker(&mut self, waker: &LockWaker) {
1594 unsafe {
1595 let is_first = (*waker.prev.get()).is_null();
1596 if is_first {
1597 self.head = *waker.next.get();
1598 } else {
1599 *(**waker.prev.get()).next.get() = *waker.next.get();
1600 }
1601 if (*waker.next.get()).is_null() {
1602 self.tail = *waker.prev.get();
1603 } else {
1604 *(**waker.next.get()).prev.get() = *waker.prev.get();
1605 }
1606 if is_first {
1607 self.wake();
1610 }
1611 }
1612 }
1613
1614 unsafe fn is_allowed(&self, target_state: LockState, is_head: bool) -> bool {
1618 match self.state {
1619 LockState::ReadLock => {
1620 (self.read_count == 0
1622 || target_state == LockState::Locked
1623 || target_state == LockState::ReadLock)
1624 && is_head
1625 }
1626 LockState::Locked => {
1627 target_state == LockState::ReadLock
1631 && (is_head || unsafe { !(*self.head).is_upgrade })
1632 }
1633 LockState::WriteLock => false,
1634 }
1635 }
1636
1637 unsafe fn downgrade_lock(&mut self) {
1638 assert_eq!(std::mem::replace(&mut self.state, LockState::Locked), LockState::WriteLock);
1639 unsafe { self.wake() };
1640 }
1641}
1642
1643#[must_use]
1644pub struct ReadGuard<'a> {
1645 manager: LockManagerRef<'a>,
1646 lock_keys: LockKeys,
1647}
1648
1649impl ReadGuard<'_> {
1650 pub fn fs(&self) -> Option<&Arc<FxFilesystem>> {
1651 if let LockManagerRef::Owned(fs) = &self.manager { Some(fs) } else { None }
1652 }
1653
1654 pub fn into_owned(mut self, fs: Arc<FxFilesystem>) -> ReadGuard<'static> {
1655 ReadGuard {
1656 manager: LockManagerRef::Owned(fs),
1657 lock_keys: std::mem::replace(&mut self.lock_keys, LockKeys::None),
1658 }
1659 }
1660}
1661
1662impl Drop for ReadGuard<'_> {
1663 fn drop(&mut self) {
1664 let mut locks = self.manager.locks.lock();
1665 locks.drop_read_locks(std::mem::take(&mut self.lock_keys));
1666 }
1667}
1668
1669impl fmt::Debug for ReadGuard<'_> {
1670 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1671 f.debug_struct("ReadGuard")
1672 .field("manager", &(&self.manager as *const _))
1673 .field("lock_keys", &self.lock_keys)
1674 .finish()
1675 }
1676}
1677
1678#[must_use]
1679pub struct WriteGuard<'a> {
1680 manager: LockManagerRef<'a>,
1681 lock_keys: LockKeys,
1682}
1683
1684impl Drop for WriteGuard<'_> {
1685 fn drop(&mut self) {
1686 let mut locks = self.manager.locks.lock();
1687 locks.drop_write_locks(std::mem::take(&mut self.lock_keys));
1688 }
1689}
1690
1691impl fmt::Debug for WriteGuard<'_> {
1692 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1693 f.debug_struct("WriteGuard")
1694 .field("manager", &(&self.manager as *const _))
1695 .field("lock_keys", &self.lock_keys)
1696 .finish()
1697 }
1698}
1699
1700enum LockManagerRef<'a> {
1701 Borrowed(&'a LockManager),
1702 Owned(Arc<FxFilesystem>),
1703}
1704
1705impl Deref for LockManagerRef<'_> {
1706 type Target = LockManager;
1707
1708 fn deref(&self) -> &Self::Target {
1709 match self {
1710 LockManagerRef::Borrowed(m) => m,
1711 LockManagerRef::Owned(f) => f.lock_manager(),
1712 }
1713 }
1714}
1715
1716impl<'a> From<&'a LockManager> for LockManagerRef<'a> {
1717 fn from(value: &'a LockManager) -> Self {
1718 LockManagerRef::Borrowed(value)
1719 }
1720}
1721
1722#[cfg(test)]
1723mod tests {
1724 use super::{LockKey, LockKeys, LockManager, LockState, Mutation, Options};
1725 use crate::filesystem::FxFilesystem;
1726 use fuchsia_async as fasync;
1727 use fuchsia_sync::Mutex;
1728 use futures::channel::oneshot::channel;
1729 use futures::future::FutureExt;
1730 use futures::stream::FuturesUnordered;
1731 use futures::{StreamExt, join, pin_mut};
1732 use std::task::Poll;
1733 use std::time::Duration;
1734 use storage_device::DeviceHolder;
1735 use storage_device::fake_device::FakeDevice;
1736
1737 #[fuchsia::test]
1738 async fn test_simple() {
1739 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1740 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1741 let mut t = fs
1742 .clone()
1743 .new_transaction(lock_keys![], Options::default())
1744 .await
1745 .expect("new_transaction failed");
1746 t.add(1, Mutation::BeginFlush);
1747 assert!(!t.is_empty());
1748 }
1749
1750 #[fuchsia::test]
1751 async fn test_locks() {
1752 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1753 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1754 let (send1, recv1) = channel();
1755 let (send2, recv2) = channel();
1756 let (send3, recv3) = channel();
1757 let done = Mutex::new(false);
1758 let mut futures = FuturesUnordered::new();
1759 futures.push(
1760 async {
1761 let _t = fs
1762 .clone()
1763 .new_transaction(
1764 lock_keys![LockKey::object_attribute(1, 2, 3)],
1765 Options::default(),
1766 )
1767 .await
1768 .expect("new_transaction failed");
1769 send1.send(()).unwrap(); send3.send(()).unwrap(); recv2.await.unwrap();
1772 fasync::Timer::new(Duration::from_millis(100)).await;
1774 assert!(!*done.lock());
1775 }
1776 .boxed(),
1777 );
1778 futures.push(
1779 async {
1780 recv1.await.unwrap();
1781 let _t = fs
1783 .clone()
1784 .new_transaction(
1785 lock_keys![LockKey::object_attribute(2, 2, 3)],
1786 Options::default(),
1787 )
1788 .await
1789 .expect("new_transaction failed");
1790 send2.send(()).unwrap();
1792 }
1793 .boxed(),
1794 );
1795 futures.push(
1796 async {
1797 recv3.await.unwrap();
1799 let _t = fs
1800 .clone()
1801 .new_transaction(
1802 lock_keys![LockKey::object_attribute(1, 2, 3)],
1803 Options::default(),
1804 )
1805 .await;
1806 *done.lock() = true;
1807 }
1808 .boxed(),
1809 );
1810 while let Some(()) = futures.next().await {}
1811 }
1812
1813 #[fuchsia::test]
1814 async fn test_read_lock_after_write_lock() {
1815 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1816 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1817 let (send1, recv1) = channel();
1818 let (send2, recv2) = channel();
1819 let done = Mutex::new(false);
1820 join!(
1821 async {
1822 let t = fs
1823 .clone()
1824 .new_transaction(
1825 lock_keys![LockKey::object_attribute(1, 2, 3)],
1826 Options::default(),
1827 )
1828 .await
1829 .expect("new_transaction failed");
1830 send1.send(()).unwrap(); recv2.await.unwrap();
1832 t.commit().await.expect("commit failed");
1833 *done.lock() = true;
1834 },
1835 async {
1836 recv1.await.unwrap();
1837 let _guard = fs
1839 .lock_manager()
1840 .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
1841 .await;
1842 send2.send(()).unwrap();
1844 fasync::Timer::new(Duration::from_millis(100)).await;
1847 assert!(!*done.lock());
1848 },
1849 );
1850 }
1851
1852 #[fuchsia::test]
1853 async fn test_write_lock_after_read_lock() {
1854 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1855 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1856 let (send1, recv1) = channel();
1857 let (send2, recv2) = channel();
1858 let done = Mutex::new(false);
1859 join!(
1860 async {
1861 let _guard = fs
1863 .lock_manager()
1864 .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
1865 .await;
1866 send1.send(()).unwrap();
1868 recv2.await.unwrap();
1869 fasync::Timer::new(Duration::from_millis(100)).await;
1872 assert!(!*done.lock());
1873 },
1874 async {
1875 recv1.await.unwrap();
1876 let t = fs
1877 .clone()
1878 .new_transaction(
1879 lock_keys![LockKey::object_attribute(1, 2, 3)],
1880 Options::default(),
1881 )
1882 .await
1883 .expect("new_transaction failed");
1884 send2.send(()).unwrap(); t.commit().await.expect("commit failed");
1886 *done.lock() = true;
1887 },
1888 );
1889 }
1890
1891 #[fuchsia::test]
1892 async fn test_drop_uncommitted_transaction() {
1893 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1894 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1895 let key = lock_keys![LockKey::object(1, 1)];
1896
1897 {
1899 let _write_lock = fs
1900 .clone()
1901 .new_transaction(key.clone(), Options::default())
1902 .await
1903 .expect("new_transaction failed");
1904 let _read_lock = fs.lock_manager().read_lock(key.clone()).await;
1905 }
1906 {
1908 let _write_lock = fs
1909 .clone()
1910 .new_transaction(key.clone(), Options::default())
1911 .await
1912 .expect("new_transaction failed");
1913 }
1914 fs.clone()
1916 .new_transaction(key.clone(), Options::default())
1917 .await
1918 .expect("new_transaction failed");
1919 }
1920
1921 #[fuchsia::test]
1922 async fn test_drop_waiting_write_lock() {
1923 let manager = LockManager::new();
1924 let keys = lock_keys![LockKey::object(1, 1)];
1925 {
1926 let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1927 if let Poll::Ready(_) =
1928 futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1929 {
1930 assert!(false);
1931 }
1932 }
1933 let _ = manager.lock(keys, LockState::WriteLock).await;
1934 }
1935
1936 #[fuchsia::test]
1937 async fn test_write_lock_blocks_everything() {
1938 let manager = LockManager::new();
1939 let keys = lock_keys![LockKey::object(1, 1)];
1940 {
1941 let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1942 if let Poll::Ready(_) =
1943 futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1944 {
1945 assert!(false);
1946 }
1947 if let Poll::Ready(_) =
1948 futures::poll!(manager.lock(keys.clone(), LockState::ReadLock).boxed())
1949 {
1950 assert!(false);
1951 }
1952 }
1953 {
1954 let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1955 }
1956 {
1957 let _guard = manager.lock(keys, LockState::ReadLock).await;
1958 }
1959 }
1960
1961 #[fuchsia::test]
1962 async fn test_downgrade_locks() {
1963 let manager = LockManager::new();
1964 let keys = lock_keys![LockKey::object(1, 1)];
1965 let _guard = manager.txn_lock(keys.clone()).await;
1966 manager.commit_prepare_keys(&keys).await;
1967
1968 let mut read_lock: FuturesUnordered<_> =
1970 std::iter::once(manager.read_lock(keys.clone())).collect();
1971
1972 assert!(futures::poll!(read_lock.next()).is_pending());
1974
1975 manager.downgrade_locks(&keys);
1976
1977 assert!(futures::poll!(read_lock.next()).is_ready());
1979 }
1980
1981 #[fuchsia::test]
1982 async fn test_dropped_write_lock_wakes() {
1983 let manager = LockManager::new();
1984 let keys = lock_keys![LockKey::object(1, 1)];
1985 let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1986 let mut read_lock = FuturesUnordered::new();
1987 read_lock.push(manager.lock(keys.clone(), LockState::ReadLock));
1988
1989 {
1990 let write_lock = manager.lock(keys, LockState::WriteLock);
1991 pin_mut!(write_lock);
1992
1993 assert!(futures::poll!(write_lock).is_pending());
1995
1996 assert!(futures::poll!(read_lock.next()).is_pending());
1998 }
1999
2000 assert!(futures::poll!(read_lock.next()).is_ready());
2002 }
2003
2004 #[fuchsia::test]
2005 async fn test_drop_upgrade() {
2006 let manager = LockManager::new();
2007 let keys = lock_keys![LockKey::object(1, 1)];
2008 let _guard = manager.lock(keys.clone(), LockState::Locked).await;
2009
2010 {
2011 let commit_prepare = manager.commit_prepare_keys(&keys);
2012 pin_mut!(commit_prepare);
2013 let _read_guard = manager.lock(keys.clone(), LockState::ReadLock).await;
2014 assert!(futures::poll!(commit_prepare).is_pending());
2015
2016 }
2019
2020 manager.commit_prepare_keys(&keys).await;
2022 }
2023
2024 #[fasync::run_singlethreaded(test)]
2025 async fn test_woken_upgrade_blocks_reads() {
2026 let manager = LockManager::new();
2027 let keys = lock_keys![LockKey::object(1, 1)];
2028 let guard = manager.lock(keys.clone(), LockState::Locked).await;
2030
2031 let read1 = manager.lock(keys.clone(), LockState::ReadLock).await;
2033
2034 let commit_prepare = manager.commit_prepare_keys(&keys);
2036 pin_mut!(commit_prepare);
2037 assert!(futures::poll!(commit_prepare.as_mut()).is_pending());
2038
2039 let read2 = manager.lock(keys.clone(), LockState::ReadLock);
2041 pin_mut!(read2);
2042 assert!(futures::poll!(read2.as_mut()).is_pending());
2043
2044 std::mem::drop(read1);
2046 assert!(futures::poll!(commit_prepare).is_ready());
2047
2048 assert!(futures::poll!(read2.as_mut()).is_pending());
2050
2051 std::mem::drop(guard);
2053 assert!(futures::poll!(read2).is_ready());
2054 }
2055
2056 static LOCK_KEY_1: LockKey = LockKey::flush(1);
2057 static LOCK_KEY_2: LockKey = LockKey::flush(2);
2058 static LOCK_KEY_3: LockKey = LockKey::flush(3);
2059
2060 fn assert_lock_keys_equal(value: &LockKeys, expected: &LockKeys) {
2062 match (value, expected) {
2063 (LockKeys::None, LockKeys::None) => {}
2064 (LockKeys::Inline(key1), LockKeys::Inline(key2)) => {
2065 if key1 != key2 {
2066 panic!("{key1:?} != {key2:?}");
2067 }
2068 }
2069 (LockKeys::Vec(vec1), LockKeys::Vec(vec2)) => {
2070 if vec1 != vec2 {
2071 panic!("{vec1:?} != {vec2:?}");
2072 }
2073 if vec1.capacity() != vec2.capacity() {
2074 panic!(
2075 "LockKeys have different capacity: {} != {}",
2076 vec1.capacity(),
2077 vec2.capacity()
2078 );
2079 }
2080 }
2081 (_, _) => panic!("{value:?} != {expected:?}"),
2082 }
2083 }
2084
2085 fn assert_lock_keys_equivalent(value: &LockKeys, expected: &LockKeys) {
2087 let value: Vec<_> = value.iter().collect();
2088 let expected: Vec<_> = expected.iter().collect();
2089 assert_eq!(value, expected);
2090 }
2091
2092 #[test]
2093 fn test_lock_keys_macro() {
2094 assert_lock_keys_equal(&lock_keys![], &LockKeys::None);
2095 assert_lock_keys_equal(&lock_keys![LOCK_KEY_1], &LockKeys::Inline(LOCK_KEY_1));
2096 assert_lock_keys_equal(
2097 &lock_keys![LOCK_KEY_1, LOCK_KEY_2],
2098 &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]),
2099 );
2100 }
2101
2102 #[test]
2103 fn test_lock_keys_with_capacity() {
2104 assert_lock_keys_equal(&LockKeys::with_capacity(0), &LockKeys::None);
2105 assert_lock_keys_equal(&LockKeys::with_capacity(1), &LockKeys::None);
2106 assert_lock_keys_equal(&LockKeys::with_capacity(2), &LockKeys::Vec(Vec::with_capacity(2)));
2107 }
2108
2109 #[test]
2110 fn test_lock_keys_len() {
2111 assert_eq!(lock_keys![].len(), 0);
2112 assert_eq!(lock_keys![LOCK_KEY_1].len(), 1);
2113 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].len(), 2);
2114 }
2115
2116 #[test]
2117 fn test_lock_keys_contains() {
2118 assert_eq!(lock_keys![].contains(&LOCK_KEY_1), false);
2119 assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_1), true);
2120 assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_2), false);
2121 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_1), true);
2122 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_2), true);
2123 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_3), false);
2124 }
2125
2126 #[test]
2127 fn test_lock_keys_push() {
2128 let mut keys = lock_keys![];
2129 keys.push(LOCK_KEY_1);
2130 assert_lock_keys_equal(&keys, &LockKeys::Inline(LOCK_KEY_1));
2131 keys.push(LOCK_KEY_2);
2132 assert_lock_keys_equal(&keys, &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]));
2133 keys.push(LOCK_KEY_3);
2134 assert_lock_keys_equivalent(
2135 &keys,
2136 &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2, LOCK_KEY_3]),
2137 );
2138 }
2139
2140 #[test]
2141 fn test_lock_keys_sort_unstable() {
2142 let mut keys = lock_keys![];
2143 keys.sort_unstable();
2144 assert_lock_keys_equal(&keys, &lock_keys![]);
2145
2146 let mut keys = lock_keys![LOCK_KEY_1];
2147 keys.sort_unstable();
2148 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2149
2150 let mut keys = lock_keys![LOCK_KEY_2, LOCK_KEY_1];
2151 keys.sort_unstable();
2152 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2153 }
2154
2155 #[test]
2156 fn test_lock_keys_dedup() {
2157 let mut keys = lock_keys![];
2158 keys.dedup();
2159 assert_lock_keys_equal(&keys, &lock_keys![]);
2160
2161 let mut keys = lock_keys![LOCK_KEY_1];
2162 keys.dedup();
2163 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2164
2165 let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_1];
2166 keys.dedup();
2167 assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2168 }
2169
2170 #[test]
2171 fn test_lock_keys_truncate() {
2172 let mut keys = lock_keys![];
2173 keys.truncate(5);
2174 assert_lock_keys_equal(&keys, &lock_keys![]);
2175 keys.truncate(0);
2176 assert_lock_keys_equal(&keys, &lock_keys![]);
2177
2178 let mut keys = lock_keys![LOCK_KEY_1];
2179 keys.truncate(5);
2180 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2181 keys.truncate(0);
2182 assert_lock_keys_equal(&keys, &lock_keys![]);
2183
2184 let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_2];
2185 keys.truncate(5);
2186 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2187 keys.truncate(1);
2188 assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2190 }
2191
2192 #[test]
2193 fn test_lock_keys_iter() {
2194 assert_eq!(lock_keys![].iter().collect::<Vec<_>>(), Vec::<&LockKey>::new());
2195
2196 assert_eq!(lock_keys![LOCK_KEY_1].iter().collect::<Vec<_>>(), vec![&LOCK_KEY_1]);
2197
2198 assert_eq!(
2199 lock_keys![LOCK_KEY_1, LOCK_KEY_2].iter().collect::<Vec<_>>(),
2200 vec![&LOCK_KEY_1, &LOCK_KEY_2]
2201 );
2202 }
2203}