1use crate::checksum::Checksum;
6use crate::filesystem::{FxFilesystem, TxnGuard};
7use crate::log::*;
8use crate::lsm_tree::types::Item;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::AttributeKey;
11use crate::object_store::allocator::{AllocatorItem, Reservation};
12use crate::object_store::object_manager::{ObjectManager, reserved_space_from_journal_usage};
13use crate::object_store::object_record::{
14 FxfsKey, FxfsKeyV40, FxfsKeyV49, ObjectItem, ObjectItemV40, ObjectItemV41, ObjectItemV43,
15 ObjectItemV46, ObjectItemV47, ObjectItemV49, ObjectItemV50, ObjectItemV55, ObjectKey,
16 ObjectKeyData, ObjectValue, ProjectProperty,
17};
18use crate::serialized_types::{Migrate, Versioned, migrate_nodefault, migrate_to_version};
19use anyhow::Error;
20use either::{Either, Left, Right};
21use fprint::TypeFingerprint;
22use fuchsia_sync::Mutex;
23use futures::future::poll_fn;
24use futures::pin_mut;
25use rustc_hash::FxHashMap as HashMap;
26use scopeguard::ScopeGuard;
27use serde::{Deserialize, Serialize};
28use std::cell::UnsafeCell;
29use std::cmp::Ordering;
30use std::collections::BTreeSet;
31use std::collections::hash_map::Entry;
32use std::marker::PhantomPinned;
33use std::ops::{Deref, DerefMut, Range};
34use std::sync::Arc;
35use std::task::{Poll, Waker};
36use std::{fmt, mem};
37
38#[derive(Clone, Copy, Default)]
42pub struct Options<'a> {
43 pub skip_journal_checks: bool,
46
47 pub borrow_metadata_space: bool,
52
53 pub allocator_reservation: Option<&'a Reservation>,
58
59 pub txn_guard: Option<&'a TxnGuard<'a>>,
61}
62
63pub const TRANSACTION_MAX_JOURNAL_USAGE: u64 = 24_576;
70pub const TRANSACTION_METADATA_MAX_AMOUNT: u64 =
71 reserved_space_from_journal_usage(TRANSACTION_MAX_JOURNAL_USAGE);
72
73#[must_use]
74pub struct TransactionLocks<'a>(pub WriteGuard<'a>);
75
76pub type Mutation = MutationV55;
81
82#[derive(
83 Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint, Versioned,
84)]
85#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
86pub enum MutationV55 {
87 ObjectStore(ObjectStoreMutationV55),
88 EncryptedObjectStore(#[serde(with = "crate::zerocopy_serialization")] Box<[u8]>),
89 Allocator(AllocatorMutationV32),
90 BeginFlush,
92 EndFlush,
95 DeleteVolume,
97 UpdateBorrowed(u64),
98 UpdateMutationsKey(UpdateMutationsKey),
99 CreateInternalDir(u64),
100}
101
102#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
103#[migrate_to_version(MutationV55)]
104#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
105pub enum MutationV54 {
106 ObjectStore(ObjectStoreMutationV54),
107 EncryptedObjectStore(#[serde(with = "crate::zerocopy_serialization")] Box<[u8]>),
108 Allocator(AllocatorMutationV32),
109 BeginFlush,
110 EndFlush,
111 DeleteVolume,
112 UpdateBorrowed(u64),
113 UpdateMutationsKey(UpdateMutationsKey),
114 CreateInternalDir(u64),
115}
116
117#[derive(
118 Migrate, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint, Versioned,
119)]
120#[migrate_to_version(MutationV54)]
121pub enum MutationV50 {
122 ObjectStore(ObjectStoreMutationV50),
123 EncryptedObjectStore(#[serde(with = "crate::zerocopy_serialization")] Box<[u8]>),
124 Allocator(AllocatorMutationV32),
125 BeginFlush,
126 EndFlush,
127 DeleteVolume,
128 UpdateBorrowed(u64),
129 UpdateMutationsKey(UpdateMutationsKeyV49),
130 CreateInternalDir(u64),
131}
132
133#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
134#[migrate_to_version(MutationV50)]
135pub enum MutationV49 {
136 ObjectStore(ObjectStoreMutationV49),
137 EncryptedObjectStore(Box<[u8]>),
138 Allocator(AllocatorMutationV32),
139 BeginFlush,
140 EndFlush,
141 DeleteVolume,
142 UpdateBorrowed(u64),
143 UpdateMutationsKey(UpdateMutationsKeyV49),
144 CreateInternalDir(u64),
145}
146
147#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
148#[migrate_to_version(MutationV49)]
149pub enum MutationV47 {
150 ObjectStore(ObjectStoreMutationV47),
151 EncryptedObjectStore(Box<[u8]>),
152 Allocator(AllocatorMutationV32),
153 BeginFlush,
154 EndFlush,
155 DeleteVolume,
156 UpdateBorrowed(u64),
157 UpdateMutationsKey(UpdateMutationsKeyV40),
158 CreateInternalDir(u64),
159}
160
161#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
162#[migrate_to_version(MutationV47)]
163pub enum MutationV46 {
164 ObjectStore(ObjectStoreMutationV46),
165 EncryptedObjectStore(Box<[u8]>),
166 Allocator(AllocatorMutationV32),
167 BeginFlush,
168 EndFlush,
169 DeleteVolume,
170 UpdateBorrowed(u64),
171 UpdateMutationsKey(UpdateMutationsKeyV40),
172 CreateInternalDir(u64),
173}
174
175#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
176#[migrate_to_version(MutationV46)]
177pub enum MutationV43 {
178 ObjectStore(ObjectStoreMutationV43),
179 EncryptedObjectStore(Box<[u8]>),
180 Allocator(AllocatorMutationV32),
181 BeginFlush,
182 EndFlush,
183 DeleteVolume,
184 UpdateBorrowed(u64),
185 UpdateMutationsKey(UpdateMutationsKeyV40),
186 CreateInternalDir(u64),
187}
188
189#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
190#[migrate_to_version(MutationV43)]
191pub enum MutationV41 {
192 ObjectStore(ObjectStoreMutationV41),
193 EncryptedObjectStore(Box<[u8]>),
194 Allocator(AllocatorMutationV32),
195 BeginFlush,
196 EndFlush,
197 DeleteVolume,
198 UpdateBorrowed(u64),
199 UpdateMutationsKey(UpdateMutationsKeyV40),
200 CreateInternalDir(u64),
201}
202
203#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
204#[migrate_to_version(MutationV41)]
205pub enum MutationV40 {
206 ObjectStore(ObjectStoreMutationV40),
207 EncryptedObjectStore(Box<[u8]>),
208 Allocator(AllocatorMutationV32),
209 BeginFlush,
210 EndFlush,
211 DeleteVolume,
212 UpdateBorrowed(u64),
213 UpdateMutationsKey(UpdateMutationsKeyV40),
214 CreateInternalDir(u64),
215}
216
217impl Mutation {
218 pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self {
219 Mutation::ObjectStore(ObjectStoreMutation {
220 item: Item::new(key, value),
221 op: Operation::Insert,
222 })
223 }
224
225 pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self {
226 Mutation::ObjectStore(ObjectStoreMutation {
227 item: Item::new(key, value),
228 op: Operation::ReplaceOrInsert,
229 })
230 }
231
232 pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self {
233 Mutation::ObjectStore(ObjectStoreMutation {
234 item: Item::new(key, value),
235 op: Operation::Merge,
236 })
237 }
238
239 pub fn update_mutations_key(key: FxfsKey) -> Self {
240 Mutation::UpdateMutationsKey(key.into())
241 }
242}
243
244pub type ObjectStoreMutation = ObjectStoreMutationV55;
248
249#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
250#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
251pub struct ObjectStoreMutationV55 {
252 pub item: ObjectItemV55,
253 pub op: Operation,
254}
255
256#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
257#[migrate_to_version(ObjectStoreMutationV55)]
258#[migrate_nodefault]
259#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
260pub struct ObjectStoreMutationV54 {
261 pub item: crate::object_store::object_record::ObjectItemV54,
262 pub op: Operation,
263}
264
265#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
266#[migrate_to_version(ObjectStoreMutationV54)]
267#[migrate_nodefault]
268#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
269pub struct ObjectStoreMutationV50 {
270 pub item: ObjectItemV50,
271 pub op: OperationV32,
272}
273
274#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
275#[migrate_to_version(ObjectStoreMutationV50)]
276#[migrate_nodefault]
277pub struct ObjectStoreMutationV49 {
278 pub item: ObjectItemV49,
279 pub op: OperationV32,
280}
281
282#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
283#[migrate_to_version(ObjectStoreMutationV49)]
284#[migrate_nodefault]
285pub struct ObjectStoreMutationV47 {
286 pub item: ObjectItemV47,
287 pub op: OperationV32,
288}
289
290#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
291#[migrate_to_version(ObjectStoreMutationV47)]
292#[migrate_nodefault]
293pub struct ObjectStoreMutationV46 {
294 pub item: ObjectItemV46,
295 pub op: OperationV32,
296}
297
298#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
299#[migrate_to_version(ObjectStoreMutationV46)]
300#[migrate_nodefault]
301pub struct ObjectStoreMutationV43 {
302 pub item: ObjectItemV43,
303 pub op: OperationV32,
304}
305
306#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
307#[migrate_to_version(ObjectStoreMutationV43)]
308#[migrate_nodefault]
309pub struct ObjectStoreMutationV41 {
310 pub item: ObjectItemV41,
311 pub op: OperationV32,
312}
313
314#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
315#[migrate_nodefault]
316#[migrate_to_version(ObjectStoreMutationV41)]
317pub struct ObjectStoreMutationV40 {
318 pub item: ObjectItemV40,
319 pub op: OperationV32,
320}
321
322pub type Operation = OperationV32;
324
325#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
326#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
327pub enum OperationV32 {
328 Insert,
329 ReplaceOrInsert,
330 Merge,
331}
332
333impl Ord for ObjectStoreMutation {
334 fn cmp(&self, other: &Self) -> Ordering {
335 self.item.key.cmp(&other.item.key)
336 }
337}
338
339impl PartialOrd for ObjectStoreMutation {
340 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
341 Some(self.cmp(other))
342 }
343}
344
345impl PartialEq for ObjectStoreMutation {
346 fn eq(&self, other: &Self) -> bool {
347 self.item.key.eq(&other.item.key)
348 }
349}
350
351impl Eq for ObjectStoreMutation {}
352
353impl Ord for ObjectStoreMutationV50 {
354 fn cmp(&self, other: &Self) -> Ordering {
355 self.item.key.cmp(&other.item.key)
356 }
357}
358
359impl PartialOrd for ObjectStoreMutationV50 {
360 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
361 Some(self.cmp(other))
362 }
363}
364
365impl PartialEq for ObjectStoreMutationV50 {
366 fn eq(&self, other: &Self) -> bool {
367 self.item.key.eq(&other.item.key)
368 }
369}
370
371impl Eq for ObjectStoreMutationV50 {}
372
373impl Ord for AllocatorItem {
374 fn cmp(&self, other: &Self) -> Ordering {
375 self.key.cmp(&other.key)
376 }
377}
378
379impl PartialOrd for AllocatorItem {
380 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
381 Some(self.cmp(other))
382 }
383}
384
385pub type DeviceRange = DeviceRangeV32;
388
389#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TypeFingerprint)]
390#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
391pub struct DeviceRangeV32(pub Range<u64>);
392
393impl Deref for DeviceRange {
394 type Target = Range<u64>;
395
396 fn deref(&self) -> &Self::Target {
397 &self.0
398 }
399}
400
401impl DerefMut for DeviceRange {
402 fn deref_mut(&mut self) -> &mut Self::Target {
403 &mut self.0
404 }
405}
406
407impl From<Range<u64>> for DeviceRange {
408 fn from(range: Range<u64>) -> Self {
409 Self(range)
410 }
411}
412
413impl Into<Range<u64>> for DeviceRange {
414 fn into(self) -> Range<u64> {
415 self.0
416 }
417}
418
419impl Ord for DeviceRange {
420 fn cmp(&self, other: &Self) -> Ordering {
421 self.start.cmp(&other.start).then(self.end.cmp(&other.end))
422 }
423}
424
425impl PartialOrd for DeviceRange {
426 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
427 Some(self.cmp(other))
428 }
429}
430
431pub type AllocatorMutation = AllocatorMutationV32;
432
433#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint)]
434#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
435pub enum AllocatorMutationV32 {
436 Allocate {
437 device_range: DeviceRangeV32,
438 owner_object_id: u64,
439 },
440 Deallocate {
441 device_range: DeviceRangeV32,
442 owner_object_id: u64,
443 },
444 SetLimit {
445 owner_object_id: u64,
446 bytes: u64,
447 },
448 MarkForDeletion(u64),
454}
455
456pub type UpdateMutationsKey = UpdateMutationsKeyV49;
457
458#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
459pub struct UpdateMutationsKeyV49(pub FxfsKeyV49);
460
461#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
462#[migrate_to_version(UpdateMutationsKeyV49)]
463pub struct UpdateMutationsKeyV40(pub FxfsKeyV40);
464
465impl From<UpdateMutationsKey> for FxfsKey {
466 fn from(outer: UpdateMutationsKey) -> Self {
467 outer.0
468 }
469}
470
471impl From<FxfsKey> for UpdateMutationsKey {
472 fn from(inner: FxfsKey) -> Self {
473 Self(inner)
474 }
475}
476
477#[cfg(fuzz)]
478impl<'a> arbitrary::Arbitrary<'a> for UpdateMutationsKey {
479 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
480 Ok(UpdateMutationsKey::from(FxfsKey::arbitrary(u).unwrap()))
481 }
482}
483
484impl Ord for UpdateMutationsKey {
485 fn cmp(&self, other: &Self) -> Ordering {
486 (self as *const UpdateMutationsKey).cmp(&(other as *const _))
487 }
488}
489
490impl PartialOrd for UpdateMutationsKey {
491 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
492 Some(self.cmp(other))
493 }
494}
495
496impl Eq for UpdateMutationsKey {}
497
498impl PartialEq for UpdateMutationsKey {
499 fn eq(&self, other: &Self) -> bool {
500 std::ptr::eq(self, other)
501 }
502}
503
504#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Copy)]
514pub enum LockKey {
515 Filesystem,
517
518 Flush {
520 object_id: u64,
521 },
522
523 ObjectAttribute {
525 store_object_id: u64,
526 object_id: u64,
527 attribute_id: u64,
528 },
529
530 Object {
532 store_object_id: u64,
533 object_id: u64,
534 },
535
536 ProjectId {
537 store_object_id: u64,
538 project_id: u64,
539 },
540
541 Truncate {
543 store_object_id: u64,
544 object_id: u64,
545 },
546
547 InternalDirectory {
549 store_object_id: u64,
550 },
551}
552
553impl LockKey {
554 pub const fn object_attribute(store_object_id: u64, object_id: u64, attribute_id: u64) -> Self {
555 LockKey::ObjectAttribute { store_object_id, object_id, attribute_id }
556 }
557
558 pub const fn object(store_object_id: u64, object_id: u64) -> Self {
559 LockKey::Object { store_object_id, object_id }
560 }
561
562 pub const fn flush(object_id: u64) -> Self {
563 LockKey::Flush { object_id }
564 }
565
566 pub const fn truncate(store_object_id: u64, object_id: u64) -> Self {
567 LockKey::Truncate { store_object_id, object_id }
568 }
569}
570
571#[derive(Clone, Debug)]
573pub enum LockKeys {
574 None,
575 Inline(LockKey),
576 Vec(Vec<LockKey>),
577}
578
579impl LockKeys {
580 pub fn with_capacity(capacity: usize) -> Self {
581 if capacity > 1 { LockKeys::Vec(Vec::with_capacity(capacity)) } else { LockKeys::None }
582 }
583
584 pub fn push(&mut self, key: LockKey) {
585 match self {
586 Self::None => *self = LockKeys::Inline(key),
587 Self::Inline(inline) => {
588 *self = LockKeys::Vec(vec![*inline, key]);
589 }
590 Self::Vec(vec) => vec.push(key),
591 }
592 }
593
594 pub fn truncate(&mut self, len: usize) {
595 match self {
596 Self::None => {}
597 Self::Inline(_) => {
598 if len == 0 {
599 *self = Self::None;
600 }
601 }
602 Self::Vec(vec) => vec.truncate(len),
603 }
604 }
605
606 fn len(&self) -> usize {
607 match self {
608 Self::None => 0,
609 Self::Inline(_) => 1,
610 Self::Vec(vec) => vec.len(),
611 }
612 }
613
614 fn contains(&self, key: &LockKey) -> bool {
615 match self {
616 Self::None => false,
617 Self::Inline(single) => single == key,
618 Self::Vec(vec) => vec.contains(key),
619 }
620 }
621
622 fn sort_unstable(&mut self) {
623 match self {
624 Self::Vec(vec) => vec.sort_unstable(),
625 _ => {}
626 }
627 }
628
629 fn dedup(&mut self) {
630 match self {
631 Self::Vec(vec) => vec.dedup(),
632 _ => {}
633 }
634 }
635
636 fn iter(&self) -> LockKeysIter<'_> {
637 match self {
638 LockKeys::None => LockKeysIter::None,
639 LockKeys::Inline(key) => LockKeysIter::Inline(key),
640 LockKeys::Vec(keys) => LockKeysIter::Vec(keys.iter()),
641 }
642 }
643}
644
645enum LockKeysIter<'a> {
646 None,
647 Inline(&'a LockKey),
648 Vec(std::slice::Iter<'a, LockKey>),
649}
650
651impl<'a> Iterator for LockKeysIter<'a> {
652 type Item = &'a LockKey;
653 fn next(&mut self) -> Option<Self::Item> {
654 match self {
655 Self::None => None,
656 Self::Inline(inline) => {
657 let next = *inline;
658 *self = Self::None;
659 Some(next)
660 }
661 Self::Vec(vec) => vec.next(),
662 }
663 }
664}
665
666impl Default for LockKeys {
667 fn default() -> Self {
668 LockKeys::None
669 }
670}
671
672#[macro_export]
673macro_rules! lock_keys {
674 () => {
675 $crate::object_store::transaction::LockKeys::None
676 };
677 ($lock_key:expr $(,)?) => {
678 $crate::object_store::transaction::LockKeys::Inline($lock_key)
679 };
680 ($($lock_keys:expr),+ $(,)?) => {
681 $crate::object_store::transaction::LockKeys::Vec(vec![$($lock_keys),+])
682 };
683}
684pub use lock_keys;
685
686pub trait AssociatedObject: Send + Sync {
690 fn will_apply_mutation(&self, _mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
691 }
692}
693
694pub enum AssocObj<'a> {
695 None,
696 Borrowed(&'a dyn AssociatedObject),
697 Owned(Box<dyn AssociatedObject>),
698}
699
700impl AssocObj<'_> {
701 pub fn map<R, F: FnOnce(&dyn AssociatedObject) -> R>(&self, f: F) -> Option<R> {
702 match self {
703 AssocObj::None => None,
704 AssocObj::Borrowed(b) => Some(f(*b)),
705 AssocObj::Owned(o) => Some(f(o.as_ref())),
706 }
707 }
708}
709
710pub struct TxnMutation<'a> {
711 pub object_id: u64,
715
716 pub mutation: Mutation,
718
719 pub associated_object: AssocObj<'a>,
722}
723
724impl Ord for TxnMutation<'_> {
727 fn cmp(&self, other: &Self) -> Ordering {
728 self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation))
729 }
730}
731
732impl PartialOrd for TxnMutation<'_> {
733 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
734 Some(self.cmp(other))
735 }
736}
737
738impl PartialEq for TxnMutation<'_> {
739 fn eq(&self, other: &Self) -> bool {
740 self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation)
741 }
742}
743
744impl Eq for TxnMutation<'_> {}
745
746impl std::fmt::Debug for TxnMutation<'_> {
747 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
748 f.debug_struct("TxnMutation")
749 .field("object_id", &self.object_id)
750 .field("mutation", &self.mutation)
751 .finish()
752 }
753}
754
755pub enum MetadataReservation {
756 None,
758
759 Borrowed,
762
763 Reservation(Reservation),
765
766 Hold(u64),
768}
769
770pub struct Transaction<'a> {
772 txn_guard: TxnGuard<'a>,
773
774 mutations: BTreeSet<TxnMutation<'a>>,
776
777 txn_locks: LockKeys,
779
780 pub allocator_reservation: Option<&'a Reservation>,
782
783 pub metadata_reservation: MetadataReservation,
785
786 new_objects: BTreeSet<(u64, u64)>,
789
790 checksums: Vec<(Range<u64>, Vec<Checksum>, bool)>,
792
793 includes_write: bool,
795}
796
797impl<'a> Transaction<'a> {
798 pub async fn new(
801 txn_guard: TxnGuard<'a>,
802 options: Options<'a>,
803 txn_locks: LockKeys,
804 ) -> Result<Transaction<'a>, Error> {
805 txn_guard.fs().add_transaction(options.skip_journal_checks).await;
806 let fs = txn_guard.fs().clone();
807 let guard = scopeguard::guard((), |_| fs.sub_transaction());
808 let (metadata_reservation, allocator_reservation, hold) =
809 txn_guard.fs().reservation_for_transaction(options).await?;
810
811 let txn_locks = {
812 let lock_manager = txn_guard.fs().lock_manager();
813 let mut write_guard = lock_manager.txn_lock(txn_locks).await;
814 std::mem::take(&mut write_guard.0.lock_keys)
815 };
816 let mut transaction = Transaction {
817 txn_guard,
818 mutations: BTreeSet::new(),
819 txn_locks,
820 allocator_reservation: None,
821 metadata_reservation,
822 new_objects: BTreeSet::new(),
823 checksums: Vec::new(),
824 includes_write: false,
825 };
826
827 ScopeGuard::into_inner(guard);
828 hold.map(|h| h.forget()); transaction.allocator_reservation = allocator_reservation;
830 Ok(transaction)
831 }
832
833 pub fn txn_guard(&self) -> &TxnGuard<'_> {
834 &self.txn_guard
835 }
836
837 pub fn mutations(&self) -> &BTreeSet<TxnMutation<'a>> {
838 &self.mutations
839 }
840
841 pub fn take_mutations(&mut self) -> BTreeSet<TxnMutation<'a>> {
842 self.new_objects.clear();
843 mem::take(&mut self.mutations)
844 }
845
846 pub fn add(&mut self, object_id: u64, mutation: Mutation) -> Option<Mutation> {
849 self.add_with_object(object_id, mutation, AssocObj::None)
850 }
851
852 pub fn remove(&mut self, object_id: u64, mutation: Mutation) {
854 let txn_mutation = TxnMutation { object_id, mutation, associated_object: AssocObj::None };
855 if self.mutations.remove(&txn_mutation) {
856 if let Mutation::ObjectStore(ObjectStoreMutation {
857 item:
858 ObjectItem {
859 key: ObjectKey { object_id: new_object_id, data: ObjectKeyData::Object },
860 ..
861 },
862 op: Operation::Insert,
863 }) = txn_mutation.mutation
864 {
865 self.new_objects.remove(&(object_id, new_object_id));
866 }
867 }
868 }
869
870 pub fn add_with_object(
873 &mut self,
874 object_id: u64,
875 mutation: Mutation,
876 associated_object: AssocObj<'a>,
877 ) -> Option<Mutation> {
878 assert!(object_id != INVALID_OBJECT_ID);
879 if let Mutation::ObjectStore(ObjectStoreMutation {
880 item:
881 Item {
882 key:
883 ObjectKey { data: ObjectKeyData::Attribute(_, AttributeKey::Extent(_)), .. },
884 ..
885 },
886 ..
887 }) = &mutation
888 {
889 self.includes_write = true;
890 }
891 let txn_mutation = TxnMutation { object_id, mutation, associated_object };
892 self.verify_locks(&txn_mutation);
893 self.mutations.replace(txn_mutation).map(|m| m.mutation)
894 }
895
896 pub fn add_checksum(&mut self, range: Range<u64>, checksums: Vec<Checksum>, first_write: bool) {
897 self.checksums.push((range, checksums, first_write));
898 }
899
900 pub fn includes_write(&self) -> bool {
901 self.includes_write
902 }
903
904 pub fn checksums(&self) -> &[(Range<u64>, Vec<Checksum>, bool)] {
905 &self.checksums
906 }
907
908 pub fn take_checksums(&mut self) -> Vec<(Range<u64>, Vec<Checksum>, bool)> {
909 std::mem::replace(&mut self.checksums, Vec::new())
910 }
911
912 fn verify_locks(&mut self, mutation: &TxnMutation<'_>) {
913 match mutation {
917 TxnMutation {
918 mutation:
919 Mutation::ObjectStore {
920 0: ObjectStoreMutation { item: ObjectItem { key, .. }, op },
921 },
922 object_id: store_object_id,
923 ..
924 } => {
925 match &key.data {
926 ObjectKeyData::Attribute(..) => {
927 }
929 ObjectKeyData::Child { .. }
930 | ObjectKeyData::EncryptedChild(_)
931 | ObjectKeyData::EncryptedCasefoldChild(_)
932 | ObjectKeyData::CasefoldChild { .. }
933 | ObjectKeyData::LegacyCasefoldChild(_) => {
934 let id = key.object_id;
935 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
936 && !self.new_objects.contains(&(*store_object_id, id))
937 {
938 debug_assert!(
939 false,
940 "Not holding required lock for object {id} \
941 in store {store_object_id}"
942 );
943 error!(
944 "Not holding required lock for object {id} in store \
945 {store_object_id}"
946 )
947 }
948 }
949 ObjectKeyData::GraveyardEntry { .. } => {
950 }
952 ObjectKeyData::GraveyardAttributeEntry { .. } => {
953 }
955 ObjectKeyData::Keys => {
956 let id = key.object_id;
957 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
958 && !self.new_objects.contains(&(*store_object_id, id))
959 {
960 debug_assert!(
961 false,
962 "Not holding required lock for object {id} \
963 in store {store_object_id}"
964 );
965 error!(
966 "Not holding required lock for object {id} in store \
967 {store_object_id}"
968 )
969 }
970 }
971 ObjectKeyData::Object => match op {
972 Operation::Insert => {
974 self.new_objects.insert((*store_object_id, key.object_id));
975 }
976 Operation::Merge | Operation::ReplaceOrInsert => {
977 let id = key.object_id;
978 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
979 && !self.new_objects.contains(&(*store_object_id, id))
980 {
981 debug_assert!(
982 false,
983 "Not holding required lock for object {id} \
984 in store {store_object_id}"
985 );
986 error!(
987 "Not holding required lock for object {id} in store \
988 {store_object_id}"
989 )
990 }
991 }
992 },
993 ObjectKeyData::Project { project_id, property: ProjectProperty::Limit } => {
994 if !self.txn_locks.contains(&LockKey::ProjectId {
995 store_object_id: *store_object_id,
996 project_id: *project_id,
997 }) {
998 debug_assert!(
999 false,
1000 "Not holding required lock for project limit id {project_id} \
1001 in store {store_object_id}"
1002 );
1003 error!(
1004 "Not holding required lock for project limit id {project_id} in \
1005 store {store_object_id}"
1006 )
1007 }
1008 }
1009 ObjectKeyData::Project { property: ProjectProperty::Usage, .. } => match op {
1010 Operation::Insert | Operation::ReplaceOrInsert => {
1011 panic!(
1012 "Project usage is all handled by merging deltas, no inserts or \
1013 replacements should be used"
1014 );
1015 }
1016 Operation::Merge => {}
1018 },
1019 ObjectKeyData::ExtendedAttribute { .. } => {
1020 let id = key.object_id;
1021 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
1022 && !self.new_objects.contains(&(*store_object_id, id))
1023 {
1024 debug_assert!(
1025 false,
1026 "Not holding required lock for object {id} \
1027 in store {store_object_id} while mutating extended attribute"
1028 );
1029 error!(
1030 "Not holding required lock for object {id} in store \
1031 {store_object_id} while mutating extended attribute"
1032 )
1033 }
1034 }
1035 }
1036 }
1037 TxnMutation { mutation: Mutation::DeleteVolume, object_id, .. } => {
1038 if !self.txn_locks.contains(&LockKey::flush(*object_id)) {
1039 debug_assert!(false, "Not holding required lock for DeleteVolume");
1040 error!("Not holding required lock for DeleteVolume");
1041 }
1042 }
1043 _ => {}
1044 }
1045 }
1046
1047 pub fn is_empty(&self) -> bool {
1049 self.mutations.is_empty()
1050 }
1051
1052 pub fn get_object_mutation(
1055 &self,
1056 store_object_id: u64,
1057 key: ObjectKey,
1058 ) -> Option<&ObjectStoreMutation> {
1059 if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) =
1060 self.mutations.get(&TxnMutation {
1061 object_id: store_object_id,
1062 mutation: Mutation::insert_object(key, ObjectValue::None),
1063 associated_object: AssocObj::None,
1064 })
1065 {
1066 Some(mutation)
1067 } else {
1068 None
1069 }
1070 }
1071
1072 pub async fn commit(mut self) -> Result<u64, Error> {
1074 debug!(txn:? = &self; "Commit");
1075 self.txn_guard.fs().clone().commit_transaction(&mut self, |x| x).await
1076 }
1077
1078 pub async fn commit_with_callback<R: Send>(
1081 mut self,
1082 f: impl FnOnce(u64) -> R + Send,
1083 ) -> Result<R, Error> {
1084 debug!(txn:? = &self; "Commit");
1085 self.txn_guard.fs().clone().commit_transaction(&mut self, f).await
1086 }
1087
1088 pub async fn commit_and_continue(&mut self) -> Result<(), Error> {
1091 debug!(txn:? = self; "Commit");
1092 self.txn_guard.fs().clone().commit_transaction(self, |_| {}).await?;
1093 assert!(self.mutations.is_empty());
1094 self.txn_guard.fs().lock_manager().downgrade_locks(&self.txn_locks);
1095 Ok(())
1096 }
1097}
1098
1099impl Drop for Transaction<'_> {
1100 fn drop(&mut self) {
1101 debug!(txn:? = &self; "Drop");
1104 self.txn_guard.fs().clone().drop_transaction(self);
1105 }
1106}
1107
1108impl std::fmt::Debug for Transaction<'_> {
1109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1110 f.debug_struct("Transaction")
1111 .field("mutations", &self.mutations)
1112 .field("txn_locks", &self.txn_locks)
1113 .field("reservation", &self.allocator_reservation)
1114 .finish()
1115 }
1116}
1117
1118pub enum BorrowedOrOwned<'a, T> {
1119 Borrowed(&'a T),
1120 Owned(T),
1121}
1122
1123impl<T> Deref for BorrowedOrOwned<'_, T> {
1124 type Target = T;
1125
1126 fn deref(&self) -> &Self::Target {
1127 match self {
1128 BorrowedOrOwned::Borrowed(b) => b,
1129 BorrowedOrOwned::Owned(o) => &o,
1130 }
1131 }
1132}
1133
1134impl<'a, T> From<&'a T> for BorrowedOrOwned<'a, T> {
1135 fn from(value: &'a T) -> Self {
1136 BorrowedOrOwned::Borrowed(value)
1137 }
1138}
1139
1140impl<T> From<T> for BorrowedOrOwned<'_, T> {
1141 fn from(value: T) -> Self {
1142 BorrowedOrOwned::Owned(value)
1143 }
1144}
1145
1146pub struct LockManager {
1175 locks: Mutex<Locks>,
1176}
1177
1178struct Locks {
1179 keys: HashMap<LockKey, LockEntry>,
1180}
1181
1182impl Locks {
1183 fn drop_lock(&mut self, key: LockKey, state: LockState) {
1184 if let Entry::Occupied(mut occupied) = self.keys.entry(key) {
1185 let entry = occupied.get_mut();
1186 let wake = match state {
1187 LockState::ReadLock => {
1188 entry.read_count -= 1;
1189 entry.read_count == 0
1190 }
1191 LockState::Locked | LockState::WriteLock => {
1193 entry.state = LockState::ReadLock;
1194 true
1195 }
1196 };
1197 if wake {
1198 unsafe {
1200 entry.wake();
1201 }
1202 if entry.can_remove() {
1203 occupied.remove_entry();
1204 }
1205 }
1206 } else {
1207 unreachable!();
1208 }
1209 }
1210
1211 fn drop_read_locks(&mut self, lock_keys: LockKeys) {
1212 for lock in lock_keys.iter() {
1213 self.drop_lock(*lock, LockState::ReadLock);
1214 }
1215 }
1216
1217 fn drop_write_locks(&mut self, lock_keys: LockKeys) {
1218 for lock in lock_keys.iter() {
1219 self.drop_lock(*lock, LockState::WriteLock);
1222 }
1223 }
1224
1225 fn downgrade_locks(&mut self, lock_keys: &LockKeys) {
1227 for lock in lock_keys.iter() {
1228 unsafe {
1230 self.keys.get_mut(lock).unwrap().downgrade_lock();
1231 }
1232 }
1233 }
1234}
1235
1236#[derive(Debug)]
1237struct LockEntry {
1238 read_count: u64,
1241
1242 state: LockState,
1244
1245 head: *const LockWaker,
1250 tail: *const LockWaker,
1251}
1252
1253unsafe impl Send for LockEntry {}
1254
1255struct LockWaker {
1258 next: UnsafeCell<*const LockWaker>,
1260 prev: UnsafeCell<*const LockWaker>,
1261
1262 key: LockKey,
1265
1266 waker: UnsafeCell<WakerState>,
1268
1269 target_state: LockState,
1271
1272 is_upgrade: bool,
1274
1275 _pin: PhantomPinned,
1277}
1278
1279enum WakerState {
1280 Pending,
1282
1283 Registered(Waker),
1285
1286 Woken,
1288}
1289
1290impl WakerState {
1291 fn is_woken(&self) -> bool {
1292 matches!(self, WakerState::Woken)
1293 }
1294}
1295
1296unsafe impl Send for LockWaker {}
1297unsafe impl Sync for LockWaker {}
1298
1299impl LockWaker {
1300 async fn wait(&self, manager: &LockManager) {
1302 let waker_guard = scopeguard::guard((), |_| {
1304 let mut locks = manager.locks.lock();
1305 unsafe {
1307 if (*self.waker.get()).is_woken() {
1308 if self.is_upgrade {
1310 locks.keys.get_mut(&self.key).unwrap().downgrade_lock();
1311 } else {
1312 locks.drop_lock(self.key, self.target_state);
1313 }
1314 } else {
1315 locks.keys.get_mut(&self.key).unwrap().remove_waker(self);
1318 }
1319 }
1320 });
1321
1322 poll_fn(|cx| {
1323 let _locks = manager.locks.lock();
1324 unsafe {
1326 if (*self.waker.get()).is_woken() {
1327 Poll::Ready(())
1328 } else {
1329 *self.waker.get() = WakerState::Registered(cx.waker().clone());
1330 Poll::Pending
1331 }
1332 }
1333 })
1334 .await;
1335
1336 ScopeGuard::into_inner(waker_guard);
1337 }
1338}
1339
1340#[derive(Copy, Clone, Debug, PartialEq)]
1341enum LockState {
1342 ReadLock,
1344
1345 Locked,
1348
1349 WriteLock,
1351}
1352
1353impl LockManager {
1354 pub fn new() -> Self {
1355 LockManager { locks: Mutex::new(Locks { keys: HashMap::default() }) }
1356 }
1357
1358 pub async fn txn_lock<'a>(&'a self, lock_keys: LockKeys) -> TransactionLocks<'a> {
1362 TransactionLocks(
1363 debug_assert_not_too_long!(self.lock(lock_keys, LockState::Locked)).right().unwrap(),
1364 )
1365 }
1366
1367 async fn lock<'a>(
1370 &'a self,
1371 mut lock_keys: LockKeys,
1372 target_state: LockState,
1373 ) -> Either<ReadGuard<'a>, WriteGuard<'a>> {
1374 let mut guard = match &target_state {
1375 LockState::ReadLock => Left(ReadGuard {
1376 manager: self.into(),
1377 lock_keys: LockKeys::with_capacity(lock_keys.len()),
1378 }),
1379 LockState::Locked | LockState::WriteLock => Right(WriteGuard {
1380 manager: self.into(),
1381 lock_keys: LockKeys::with_capacity(lock_keys.len()),
1382 }),
1383 };
1384 let guard_keys = match &mut guard {
1385 Left(g) => &mut g.lock_keys,
1386 Right(g) => &mut g.lock_keys,
1387 };
1388 lock_keys.sort_unstable();
1389 lock_keys.dedup();
1390 for lock in lock_keys.iter() {
1391 let lock_waker = None;
1392 pin_mut!(lock_waker);
1393 {
1394 let mut locks = self.locks.lock();
1395 match locks.keys.entry(*lock) {
1396 Entry::Vacant(vacant) => {
1397 vacant.insert(LockEntry {
1398 read_count: if let LockState::ReadLock = target_state {
1399 guard_keys.push(*lock);
1400 1
1401 } else {
1402 guard_keys.push(*lock);
1403 0
1404 },
1405 state: target_state,
1406 head: std::ptr::null(),
1407 tail: std::ptr::null(),
1408 });
1409 }
1410 Entry::Occupied(mut occupied) => {
1411 let entry = occupied.get_mut();
1412 if unsafe { entry.is_allowed(target_state, entry.head.is_null()) } {
1414 if let LockState::ReadLock = target_state {
1415 entry.read_count += 1;
1416 guard_keys.push(*lock);
1417 } else {
1418 entry.state = target_state;
1419 guard_keys.push(*lock);
1420 }
1421 } else {
1422 unsafe {
1425 *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1426 next: UnsafeCell::new(std::ptr::null()),
1427 prev: UnsafeCell::new(entry.tail),
1428 key: *lock,
1429 waker: UnsafeCell::new(WakerState::Pending),
1430 target_state: target_state,
1431 is_upgrade: false,
1432 _pin: PhantomPinned,
1433 });
1434 }
1435 let waker = (*lock_waker).as_ref().unwrap();
1436 if entry.tail.is_null() {
1437 entry.head = waker;
1438 } else {
1439 unsafe {
1441 *(*entry.tail).next.get() = waker;
1442 }
1443 }
1444 entry.tail = waker;
1445 }
1446 }
1447 }
1448 }
1449 if let Some(waker) = &*lock_waker {
1450 waker.wait(self).await;
1451 guard_keys.push(*lock);
1452 }
1453 }
1454 guard
1455 }
1456
1457 pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
1459 let mut locks = self.locks.lock();
1460 locks.drop_write_locks(std::mem::take(&mut transaction.txn_locks));
1461 }
1462
1463 pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
1465 self.commit_prepare_keys(&transaction.txn_locks).await;
1466 }
1467
1468 async fn commit_prepare_keys(&self, lock_keys: &LockKeys) {
1469 for lock in lock_keys.iter() {
1470 let lock_waker = None;
1471 pin_mut!(lock_waker);
1472 {
1473 let mut locks = self.locks.lock();
1474 let entry = locks.keys.get_mut(lock).unwrap();
1475 assert_eq!(entry.state, LockState::Locked);
1476
1477 if entry.read_count == 0 {
1478 entry.state = LockState::WriteLock;
1479 } else {
1480 unsafe {
1483 *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1484 next: UnsafeCell::new(entry.head),
1485 prev: UnsafeCell::new(std::ptr::null()),
1486 key: *lock,
1487 waker: UnsafeCell::new(WakerState::Pending),
1488 target_state: LockState::WriteLock,
1489 is_upgrade: true,
1490 _pin: PhantomPinned,
1491 });
1492 }
1493 let waker = (*lock_waker).as_ref().unwrap();
1494 if entry.head.is_null() {
1495 entry.tail = (*lock_waker).as_ref().unwrap();
1496 } else {
1497 unsafe {
1499 *(*entry.head).prev.get() = waker;
1500 }
1501 }
1502 entry.head = waker;
1503 }
1504 }
1505
1506 if let Some(waker) = &*lock_waker {
1507 waker.wait(self).await;
1508 }
1509 }
1510 }
1511
1512 pub async fn read_lock<'a>(&'a self, lock_keys: LockKeys) -> ReadGuard<'a> {
1518 debug_assert_not_too_long!(self.lock(lock_keys, LockState::ReadLock)).left().unwrap()
1519 }
1520
1521 pub async fn write_lock<'a>(&'a self, lock_keys: LockKeys) -> WriteGuard<'a> {
1524 debug_assert_not_too_long!(self.lock(lock_keys, LockState::WriteLock)).right().unwrap()
1525 }
1526
1527 pub fn downgrade_locks(&self, lock_keys: &LockKeys) {
1530 self.locks.lock().downgrade_locks(lock_keys);
1531 }
1532}
1533
1534impl LockEntry {
1536 unsafe fn wake(&mut self) {
1537 if self.head.is_null() || self.state == LockState::WriteLock {
1539 return;
1540 }
1541
1542 let waker = unsafe { &*self.head };
1543
1544 if waker.is_upgrade {
1545 if self.read_count > 0 {
1546 return;
1547 }
1548 } else if !unsafe { self.is_allowed(waker.target_state, true) } {
1549 return;
1550 }
1551
1552 unsafe { self.pop_and_wake() };
1553
1554 if waker.target_state == LockState::WriteLock {
1557 return;
1558 }
1559
1560 while !self.head.is_null() && unsafe { (*self.head).target_state } == LockState::ReadLock {
1561 unsafe { self.pop_and_wake() };
1562 }
1563 }
1564
1565 unsafe fn pop_and_wake(&mut self) {
1566 let waker = unsafe { &*self.head };
1567
1568 self.head = unsafe { *waker.next.get() };
1570 if self.head.is_null() {
1571 self.tail = std::ptr::null()
1572 } else {
1573 unsafe { *(*self.head).prev.get() = std::ptr::null() };
1574 }
1575
1576 if waker.target_state == LockState::ReadLock {
1578 self.read_count += 1;
1579 } else {
1580 self.state = waker.target_state;
1581 }
1582
1583 if let WakerState::Registered(waker) =
1585 std::mem::replace(unsafe { &mut *waker.waker.get() }, WakerState::Woken)
1586 {
1587 waker.wake();
1588 }
1589 }
1590
1591 fn can_remove(&self) -> bool {
1592 self.state == LockState::ReadLock && self.read_count == 0
1593 }
1594
1595 unsafe fn remove_waker(&mut self, waker: &LockWaker) {
1596 unsafe {
1597 let is_first = (*waker.prev.get()).is_null();
1598 if is_first {
1599 self.head = *waker.next.get();
1600 } else {
1601 *(**waker.prev.get()).next.get() = *waker.next.get();
1602 }
1603 if (*waker.next.get()).is_null() {
1604 self.tail = *waker.prev.get();
1605 } else {
1606 *(**waker.next.get()).prev.get() = *waker.prev.get();
1607 }
1608 if is_first {
1609 self.wake();
1612 }
1613 }
1614 }
1615
1616 unsafe fn is_allowed(&self, target_state: LockState, is_head: bool) -> bool {
1620 match self.state {
1621 LockState::ReadLock => {
1622 (self.read_count == 0
1624 || target_state == LockState::Locked
1625 || target_state == LockState::ReadLock)
1626 && is_head
1627 }
1628 LockState::Locked => {
1629 target_state == LockState::ReadLock
1633 && (is_head || unsafe { !(*self.head).is_upgrade })
1634 }
1635 LockState::WriteLock => false,
1636 }
1637 }
1638
1639 unsafe fn downgrade_lock(&mut self) {
1640 assert_eq!(std::mem::replace(&mut self.state, LockState::Locked), LockState::WriteLock);
1641 unsafe { self.wake() };
1642 }
1643}
1644
1645#[must_use]
1646pub struct ReadGuard<'a> {
1647 manager: LockManagerRef<'a>,
1648 lock_keys: LockKeys,
1649}
1650
1651impl ReadGuard<'_> {
1652 pub fn fs(&self) -> Option<&Arc<FxFilesystem>> {
1653 if let LockManagerRef::Owned(fs) = &self.manager { Some(fs) } else { None }
1654 }
1655
1656 pub fn into_owned(mut self, fs: Arc<FxFilesystem>) -> ReadGuard<'static> {
1657 ReadGuard {
1658 manager: LockManagerRef::Owned(fs),
1659 lock_keys: std::mem::replace(&mut self.lock_keys, LockKeys::None),
1660 }
1661 }
1662}
1663
1664impl Drop for ReadGuard<'_> {
1665 fn drop(&mut self) {
1666 let mut locks = self.manager.locks.lock();
1667 locks.drop_read_locks(std::mem::take(&mut self.lock_keys));
1668 }
1669}
1670
1671impl fmt::Debug for ReadGuard<'_> {
1672 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1673 f.debug_struct("ReadGuard")
1674 .field("manager", &(&self.manager as *const _))
1675 .field("lock_keys", &self.lock_keys)
1676 .finish()
1677 }
1678}
1679
1680#[must_use]
1681pub struct WriteGuard<'a> {
1682 manager: LockManagerRef<'a>,
1683 lock_keys: LockKeys,
1684}
1685
1686impl Drop for WriteGuard<'_> {
1687 fn drop(&mut self) {
1688 let mut locks = self.manager.locks.lock();
1689 locks.drop_write_locks(std::mem::take(&mut self.lock_keys));
1690 }
1691}
1692
1693impl fmt::Debug for WriteGuard<'_> {
1694 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1695 f.debug_struct("WriteGuard")
1696 .field("manager", &(&self.manager as *const _))
1697 .field("lock_keys", &self.lock_keys)
1698 .finish()
1699 }
1700}
1701
1702enum LockManagerRef<'a> {
1703 Borrowed(&'a LockManager),
1704 Owned(Arc<FxFilesystem>),
1705}
1706
1707impl Deref for LockManagerRef<'_> {
1708 type Target = LockManager;
1709
1710 fn deref(&self) -> &Self::Target {
1711 match self {
1712 LockManagerRef::Borrowed(m) => m,
1713 LockManagerRef::Owned(f) => f.lock_manager(),
1714 }
1715 }
1716}
1717
1718impl<'a> From<&'a LockManager> for LockManagerRef<'a> {
1719 fn from(value: &'a LockManager) -> Self {
1720 LockManagerRef::Borrowed(value)
1721 }
1722}
1723
1724#[cfg(test)]
1725mod tests {
1726 use super::{LockKey, LockKeys, LockManager, LockState, Mutation, Options};
1727 use crate::filesystem::FxFilesystem;
1728 use fuchsia_async as fasync;
1729 use fuchsia_sync::Mutex;
1730 use futures::channel::oneshot::channel;
1731 use futures::future::FutureExt;
1732 use futures::stream::FuturesUnordered;
1733 use futures::{StreamExt, join, pin_mut};
1734 use std::task::Poll;
1735 use std::time::Duration;
1736 use storage_device::DeviceHolder;
1737 use storage_device::fake_device::FakeDevice;
1738
1739 #[fuchsia::test]
1740 async fn test_simple() {
1741 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1742 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1743 let mut t = fs
1744 .clone()
1745 .new_transaction(lock_keys![], Options::default())
1746 .await
1747 .expect("new_transaction failed");
1748 t.add(1, Mutation::BeginFlush);
1749 assert!(!t.is_empty());
1750 }
1751
1752 #[fuchsia::test]
1753 async fn test_locks() {
1754 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1755 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1756 let (send1, recv1) = channel();
1757 let (send2, recv2) = channel();
1758 let (send3, recv3) = channel();
1759 let done = Mutex::new(false);
1760 let mut futures = FuturesUnordered::new();
1761 futures.push(
1762 async {
1763 let _t = fs
1764 .clone()
1765 .new_transaction(
1766 lock_keys![LockKey::object_attribute(1, 2, 3)],
1767 Options::default(),
1768 )
1769 .await
1770 .expect("new_transaction failed");
1771 send1.send(()).unwrap(); send3.send(()).unwrap(); recv2.await.unwrap();
1774 fasync::Timer::new(Duration::from_millis(100)).await;
1776 assert!(!*done.lock());
1777 }
1778 .boxed(),
1779 );
1780 futures.push(
1781 async {
1782 recv1.await.unwrap();
1783 let _t = fs
1785 .clone()
1786 .new_transaction(
1787 lock_keys![LockKey::object_attribute(2, 2, 3)],
1788 Options::default(),
1789 )
1790 .await
1791 .expect("new_transaction failed");
1792 send2.send(()).unwrap();
1794 }
1795 .boxed(),
1796 );
1797 futures.push(
1798 async {
1799 recv3.await.unwrap();
1801 let _t = fs
1802 .clone()
1803 .new_transaction(
1804 lock_keys![LockKey::object_attribute(1, 2, 3)],
1805 Options::default(),
1806 )
1807 .await;
1808 *done.lock() = true;
1809 }
1810 .boxed(),
1811 );
1812 while let Some(()) = futures.next().await {}
1813 }
1814
1815 #[fuchsia::test]
1816 async fn test_read_lock_after_write_lock() {
1817 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1818 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1819 let (send1, recv1) = channel();
1820 let (send2, recv2) = channel();
1821 let done = Mutex::new(false);
1822 join!(
1823 async {
1824 let t = fs
1825 .clone()
1826 .new_transaction(
1827 lock_keys![LockKey::object_attribute(1, 2, 3)],
1828 Options::default(),
1829 )
1830 .await
1831 .expect("new_transaction failed");
1832 send1.send(()).unwrap(); recv2.await.unwrap();
1834 t.commit().await.expect("commit failed");
1835 *done.lock() = true;
1836 },
1837 async {
1838 recv1.await.unwrap();
1839 let _guard = fs
1841 .lock_manager()
1842 .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
1843 .await;
1844 send2.send(()).unwrap();
1846 fasync::Timer::new(Duration::from_millis(100)).await;
1849 assert!(!*done.lock());
1850 },
1851 );
1852 }
1853
1854 #[fuchsia::test]
1855 async fn test_write_lock_after_read_lock() {
1856 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1857 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1858 let (send1, recv1) = channel();
1859 let (send2, recv2) = channel();
1860 let done = Mutex::new(false);
1861 join!(
1862 async {
1863 let _guard = fs
1865 .lock_manager()
1866 .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
1867 .await;
1868 send1.send(()).unwrap();
1870 recv2.await.unwrap();
1871 fasync::Timer::new(Duration::from_millis(100)).await;
1874 assert!(!*done.lock());
1875 },
1876 async {
1877 recv1.await.unwrap();
1878 let t = fs
1879 .clone()
1880 .new_transaction(
1881 lock_keys![LockKey::object_attribute(1, 2, 3)],
1882 Options::default(),
1883 )
1884 .await
1885 .expect("new_transaction failed");
1886 send2.send(()).unwrap(); t.commit().await.expect("commit failed");
1888 *done.lock() = true;
1889 },
1890 );
1891 }
1892
1893 #[fuchsia::test]
1894 async fn test_drop_uncommitted_transaction() {
1895 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1896 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1897 let key = lock_keys![LockKey::object(1, 1)];
1898
1899 {
1901 let _write_lock = fs
1902 .clone()
1903 .new_transaction(key.clone(), Options::default())
1904 .await
1905 .expect("new_transaction failed");
1906 let _read_lock = fs.lock_manager().read_lock(key.clone()).await;
1907 }
1908 {
1910 let _write_lock = fs
1911 .clone()
1912 .new_transaction(key.clone(), Options::default())
1913 .await
1914 .expect("new_transaction failed");
1915 }
1916 fs.clone()
1918 .new_transaction(key.clone(), Options::default())
1919 .await
1920 .expect("new_transaction failed");
1921 }
1922
1923 #[fuchsia::test]
1924 async fn test_drop_waiting_write_lock() {
1925 let manager = LockManager::new();
1926 let keys = lock_keys![LockKey::object(1, 1)];
1927 {
1928 let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1929 if let Poll::Ready(_) =
1930 futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1931 {
1932 assert!(false);
1933 }
1934 }
1935 let _ = manager.lock(keys, LockState::WriteLock).await;
1936 }
1937
1938 #[fuchsia::test]
1939 async fn test_write_lock_blocks_everything() {
1940 let manager = LockManager::new();
1941 let keys = lock_keys![LockKey::object(1, 1)];
1942 {
1943 let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1944 if let Poll::Ready(_) =
1945 futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1946 {
1947 assert!(false);
1948 }
1949 if let Poll::Ready(_) =
1950 futures::poll!(manager.lock(keys.clone(), LockState::ReadLock).boxed())
1951 {
1952 assert!(false);
1953 }
1954 }
1955 {
1956 let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1957 }
1958 {
1959 let _guard = manager.lock(keys, LockState::ReadLock).await;
1960 }
1961 }
1962
1963 #[fuchsia::test]
1964 async fn test_downgrade_locks() {
1965 let manager = LockManager::new();
1966 let keys = lock_keys![LockKey::object(1, 1)];
1967 let _guard = manager.txn_lock(keys.clone()).await;
1968 manager.commit_prepare_keys(&keys).await;
1969
1970 let mut read_lock: FuturesUnordered<_> =
1972 std::iter::once(manager.read_lock(keys.clone())).collect();
1973
1974 assert!(futures::poll!(read_lock.next()).is_pending());
1976
1977 manager.downgrade_locks(&keys);
1978
1979 assert!(futures::poll!(read_lock.next()).is_ready());
1981 }
1982
1983 #[fuchsia::test]
1984 async fn test_dropped_write_lock_wakes() {
1985 let manager = LockManager::new();
1986 let keys = lock_keys![LockKey::object(1, 1)];
1987 let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1988 let mut read_lock = FuturesUnordered::new();
1989 read_lock.push(manager.lock(keys.clone(), LockState::ReadLock));
1990
1991 {
1992 let write_lock = manager.lock(keys, LockState::WriteLock);
1993 pin_mut!(write_lock);
1994
1995 assert!(futures::poll!(write_lock).is_pending());
1997
1998 assert!(futures::poll!(read_lock.next()).is_pending());
2000 }
2001
2002 assert!(futures::poll!(read_lock.next()).is_ready());
2004 }
2005
2006 #[fuchsia::test]
2007 async fn test_drop_upgrade() {
2008 let manager = LockManager::new();
2009 let keys = lock_keys![LockKey::object(1, 1)];
2010 let _guard = manager.lock(keys.clone(), LockState::Locked).await;
2011
2012 {
2013 let commit_prepare = manager.commit_prepare_keys(&keys);
2014 pin_mut!(commit_prepare);
2015 let _read_guard = manager.lock(keys.clone(), LockState::ReadLock).await;
2016 assert!(futures::poll!(commit_prepare).is_pending());
2017
2018 }
2021
2022 manager.commit_prepare_keys(&keys).await;
2024 }
2025
2026 #[fasync::run_singlethreaded(test)]
2027 async fn test_woken_upgrade_blocks_reads() {
2028 let manager = LockManager::new();
2029 let keys = lock_keys![LockKey::object(1, 1)];
2030 let guard = manager.lock(keys.clone(), LockState::Locked).await;
2032
2033 let read1 = manager.lock(keys.clone(), LockState::ReadLock).await;
2035
2036 let commit_prepare = manager.commit_prepare_keys(&keys);
2038 pin_mut!(commit_prepare);
2039 assert!(futures::poll!(commit_prepare.as_mut()).is_pending());
2040
2041 let read2 = manager.lock(keys.clone(), LockState::ReadLock);
2043 pin_mut!(read2);
2044 assert!(futures::poll!(read2.as_mut()).is_pending());
2045
2046 std::mem::drop(read1);
2048 assert!(futures::poll!(commit_prepare).is_ready());
2049
2050 assert!(futures::poll!(read2.as_mut()).is_pending());
2052
2053 std::mem::drop(guard);
2055 assert!(futures::poll!(read2).is_ready());
2056 }
2057
2058 static LOCK_KEY_1: LockKey = LockKey::flush(1);
2059 static LOCK_KEY_2: LockKey = LockKey::flush(2);
2060 static LOCK_KEY_3: LockKey = LockKey::flush(3);
2061
2062 fn assert_lock_keys_equal(value: &LockKeys, expected: &LockKeys) {
2064 match (value, expected) {
2065 (LockKeys::None, LockKeys::None) => {}
2066 (LockKeys::Inline(key1), LockKeys::Inline(key2)) => {
2067 if key1 != key2 {
2068 panic!("{key1:?} != {key2:?}");
2069 }
2070 }
2071 (LockKeys::Vec(vec1), LockKeys::Vec(vec2)) => {
2072 if vec1 != vec2 {
2073 panic!("{vec1:?} != {vec2:?}");
2074 }
2075 if vec1.capacity() != vec2.capacity() {
2076 panic!(
2077 "LockKeys have different capacity: {} != {}",
2078 vec1.capacity(),
2079 vec2.capacity()
2080 );
2081 }
2082 }
2083 (_, _) => panic!("{value:?} != {expected:?}"),
2084 }
2085 }
2086
2087 fn assert_lock_keys_equivalent(value: &LockKeys, expected: &LockKeys) {
2089 let value: Vec<_> = value.iter().collect();
2090 let expected: Vec<_> = expected.iter().collect();
2091 assert_eq!(value, expected);
2092 }
2093
2094 #[test]
2095 fn test_lock_keys_macro() {
2096 assert_lock_keys_equal(&lock_keys![], &LockKeys::None);
2097 assert_lock_keys_equal(&lock_keys![LOCK_KEY_1], &LockKeys::Inline(LOCK_KEY_1));
2098 assert_lock_keys_equal(
2099 &lock_keys![LOCK_KEY_1, LOCK_KEY_2],
2100 &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]),
2101 );
2102 }
2103
2104 #[test]
2105 fn test_lock_keys_with_capacity() {
2106 assert_lock_keys_equal(&LockKeys::with_capacity(0), &LockKeys::None);
2107 assert_lock_keys_equal(&LockKeys::with_capacity(1), &LockKeys::None);
2108 assert_lock_keys_equal(&LockKeys::with_capacity(2), &LockKeys::Vec(Vec::with_capacity(2)));
2109 }
2110
2111 #[test]
2112 fn test_lock_keys_len() {
2113 assert_eq!(lock_keys![].len(), 0);
2114 assert_eq!(lock_keys![LOCK_KEY_1].len(), 1);
2115 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].len(), 2);
2116 }
2117
2118 #[test]
2119 fn test_lock_keys_contains() {
2120 assert_eq!(lock_keys![].contains(&LOCK_KEY_1), false);
2121 assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_1), true);
2122 assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_2), false);
2123 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_1), true);
2124 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_2), true);
2125 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_3), false);
2126 }
2127
2128 #[test]
2129 fn test_lock_keys_push() {
2130 let mut keys = lock_keys![];
2131 keys.push(LOCK_KEY_1);
2132 assert_lock_keys_equal(&keys, &LockKeys::Inline(LOCK_KEY_1));
2133 keys.push(LOCK_KEY_2);
2134 assert_lock_keys_equal(&keys, &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]));
2135 keys.push(LOCK_KEY_3);
2136 assert_lock_keys_equivalent(
2137 &keys,
2138 &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2, LOCK_KEY_3]),
2139 );
2140 }
2141
2142 #[test]
2143 fn test_lock_keys_sort_unstable() {
2144 let mut keys = lock_keys![];
2145 keys.sort_unstable();
2146 assert_lock_keys_equal(&keys, &lock_keys![]);
2147
2148 let mut keys = lock_keys![LOCK_KEY_1];
2149 keys.sort_unstable();
2150 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2151
2152 let mut keys = lock_keys![LOCK_KEY_2, LOCK_KEY_1];
2153 keys.sort_unstable();
2154 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2155 }
2156
2157 #[test]
2158 fn test_lock_keys_dedup() {
2159 let mut keys = lock_keys![];
2160 keys.dedup();
2161 assert_lock_keys_equal(&keys, &lock_keys![]);
2162
2163 let mut keys = lock_keys![LOCK_KEY_1];
2164 keys.dedup();
2165 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2166
2167 let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_1];
2168 keys.dedup();
2169 assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2170 }
2171
2172 #[test]
2173 fn test_lock_keys_truncate() {
2174 let mut keys = lock_keys![];
2175 keys.truncate(5);
2176 assert_lock_keys_equal(&keys, &lock_keys![]);
2177 keys.truncate(0);
2178 assert_lock_keys_equal(&keys, &lock_keys![]);
2179
2180 let mut keys = lock_keys![LOCK_KEY_1];
2181 keys.truncate(5);
2182 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2183 keys.truncate(0);
2184 assert_lock_keys_equal(&keys, &lock_keys![]);
2185
2186 let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_2];
2187 keys.truncate(5);
2188 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2189 keys.truncate(1);
2190 assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2192 }
2193
2194 #[test]
2195 fn test_lock_keys_iter() {
2196 assert_eq!(lock_keys![].iter().collect::<Vec<_>>(), Vec::<&LockKey>::new());
2197
2198 assert_eq!(lock_keys![LOCK_KEY_1].iter().collect::<Vec<_>>(), vec![&LOCK_KEY_1]);
2199
2200 assert_eq!(
2201 lock_keys![LOCK_KEY_1, LOCK_KEY_2].iter().collect::<Vec<_>>(),
2202 vec![&LOCK_KEY_1, &LOCK_KEY_2]
2203 );
2204 }
2205}