Skip to main content

fxfs/object_store/
transaction.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::checksum::Checksum;
6use crate::filesystem::FxFilesystem;
7use crate::log::*;
8use crate::lsm_tree::types::Item;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::allocator::{AllocatorItem, Reservation};
11use crate::object_store::object_manager::{ObjectManager, reserved_space_from_journal_usage};
12use crate::object_store::object_record::{
13    FxfsKey, FxfsKeyV40, FxfsKeyV49, ObjectItem, ObjectItemV40, ObjectItemV41, ObjectItemV43,
14    ObjectItemV46, ObjectItemV47, ObjectItemV49, ObjectItemV50, ObjectItemV55, ObjectKey,
15    ObjectKeyData, ObjectValue, ProjectProperty,
16};
17use crate::object_store::{AttributeId, AttributeKey, ProjectId};
18use crate::serialized_types::{Migrate, Versioned, migrate_nodefault, migrate_to_version};
19use anyhow::Error;
20use either::{Either, Left, Right};
21use fprint::TypeFingerprint;
22use fuchsia_sync::Mutex;
23use futures::future::poll_fn;
24use futures::pin_mut;
25use rustc_hash::FxHashMap as HashMap;
26use scopeguard::ScopeGuard;
27use serde::{Deserialize, Serialize};
28use std::cell::UnsafeCell;
29use std::cmp::Ordering;
30use std::collections::BTreeSet;
31use std::collections::hash_map::Entry;
32use std::marker::PhantomPinned;
33use std::ops::{Deref, DerefMut, Range};
34use std::sync::Arc;
35use std::task::{Poll, Waker};
36use std::{fmt, mem};
37
/// Options controlling how a [`Transaction`] is created.
///
/// This allows for special handling of certain transactions such as deletes and the extension of
/// Journal extents.  For most other use cases it is appropriate to use `default()` here, which
/// leaves every flag `false` and sets no allocator reservation.  The struct is `Copy`, so it is
/// passed around by value (e.g. `Transaction::new` reads `skip_journal_checks` and then hands the
/// whole value to the filesystem's `reservation_for_transaction`).
#[derive(Clone, Copy, Default)]
pub struct Options<'a> {
    /// If true, don't check for low journal space.  This should be true for any transactions that
    /// might alleviate journal space (i.e. compaction).
    pub skip_journal_checks: bool,

    /// If true, don't check if we need to roll the mutations key.
    pub skip_key_roll: bool,

    /// If true, borrow metadata space from the metadata reservation.  This setting should be set to
    /// true for any transaction that will either not affect space usage after compaction
    /// (e.g. setting attributes), or reduce space usage (e.g. unlinking).  Otherwise, a transaction
    /// might fail with an out-of-space error.
    pub borrow_metadata_space: bool,

    /// If specified, a reservation to be used with the transaction.  If not set, any allocations
    /// that are part of this transaction will have to take their chances, and will fail if there is
    /// no free space.  The intention is that this should be used for things like the journal which
    /// require guaranteed space.
    pub allocator_reservation: Option<&'a Reservation>,
}
62
63// This is the amount of space that we reserve for metadata when we are creating a new transaction.
64// A transaction should not take more than this.  This is expressed in terms of space occupied in
65// the journal; transactions must not take up more space in the journal than the number below.  The
66// amount chosen here must be large enough for the maximum possible transaction that can be created,
67// so transactions always need to be bounded which might involve splitting an operation up into
68// smaller transactions.
69pub const TRANSACTION_MAX_JOURNAL_USAGE: u64 = 24_576;
70pub const TRANSACTION_METADATA_MAX_AMOUNT: u64 =
71    reserved_space_from_journal_usage(TRANSACTION_MAX_JOURNAL_USAGE);
72
/// Locks acquired for a transaction, wrapping a [`WriteGuard`].
///
/// `Transaction::new` obtains a value from the lock manager's `txn_lock` whose `.0.lock_keys` it
/// takes ownership of.  Presumably that value is this type; confirm against `txn_lock`.
/// NOTE(review): `#[must_use]` suggests that dropping this value releases the locks it holds.
/// Confirm against `WriteGuard`'s `Drop` implementation.
#[must_use]
pub struct TransactionLocks<'a>(pub WriteGuard<'a>);
75
/// The journal consists of these records which will be replayed at mount time.  Within a
/// transaction, these are stored as a set which allows some mutations to be deduplicated and found
/// (and we require custom comparison functions below).  For example, we need to be able to find
/// object size changes.
pub type Mutation = MutationV55;

// Current serialized layout of `Mutation`.  The older layouts that follow migrate forward to it.
// The derived `Ord`/`PartialEq` compare the variant first (in declaration order) and then the
// payload.  Within a variant, equality is therefore decided by the payload type's own comparison
// (e.g. `ObjectStoreMutation` compares keys only).
#[derive(
    Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint, Versioned,
)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum MutationV55 {
    // An insert, replace-or-insert or merge of an object record.  These are built by
    // `Mutation::insert_object`, `replace_or_insert_object` and `merge_object`.
    ObjectStore(ObjectStoreMutationV55),
    // An opaque byte payload, presumably an encrypted serialized object-store mutation (confirm).
    EncryptedObjectStore(#[serde(with = "crate::zerocopy_serialization")] Box<[u8]>),
    // A change to the allocator (see `AllocatorMutationV32`).
    Allocator(AllocatorMutationV32),
    // Indicates the beginning of a flush.  This would typically involve sealing a tree.
    BeginFlush,
    // Indicates the end of a flush.  This would typically involve replacing the immutable layers
    // with compacted ones.
    EndFlush,
    // Volume has been deleted.  Requires we remove it from the set of managed ObjectStore.
    DeleteVolume,
    // Presumably records an update to the amount of borrowed metadata space (confirm).
    UpdateBorrowed(u64),
    // Records a new mutations key (built by `Mutation::update_mutations_key`).
    UpdateMutationsKey(UpdateMutationsKey),
    // Presumably records the object ID of a store's internal directory (confirm).
    CreateInternalDir(u64),
}
101
// `V54` layout of `Mutation`.  It has the same variants as `MutationV55` except for the
// `ObjectStore` payload.  `#[migrate_to_version]` upgrades it to `MutationV55`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV55)]
pub enum MutationV54 {
    ObjectStore(ObjectStoreMutationV54),
    EncryptedObjectStore(#[serde(with = "crate::zerocopy_serialization")] Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKey),
    CreateInternalDir(u64),
}

// `V50` layout of `Mutation`, upgraded to `MutationV54`.  Only the `ObjectStore` payload differs
// (`UpdateMutationsKeyV49` is the same type as the current `UpdateMutationsKey`).
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV54)]
pub enum MutationV50 {
    ObjectStore(ObjectStoreMutationV50),
    EncryptedObjectStore(#[serde(with = "crate::zerocopy_serialization")] Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV49),
    CreateInternalDir(u64),
}

// `V49` layout of `Mutation`, upgraded to `MutationV50`.  Unlike later layouts,
// `EncryptedObjectStore` uses serde's default encoding for `Box<[u8]>` rather than
// `crate::zerocopy_serialization`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV50)]
pub enum MutationV49 {
    ObjectStore(ObjectStoreMutationV49),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV49),
    CreateInternalDir(u64),
}

// `V47` layout of `Mutation`, upgraded to `MutationV49`.  It still carries the older
// `UpdateMutationsKeyV40` payload.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV49)]
pub enum MutationV47 {
    ObjectStore(ObjectStoreMutationV47),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

// `V46` layout of `Mutation`, upgraded to `MutationV47`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV47)]
pub enum MutationV46 {
    ObjectStore(ObjectStoreMutationV46),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

// `V43` layout of `Mutation`, upgraded to `MutationV46`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV46)]
pub enum MutationV43 {
    ObjectStore(ObjectStoreMutationV43),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

// `V41` layout of `Mutation`, upgraded to `MutationV43`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV43)]
pub enum MutationV41 {
    ObjectStore(ObjectStoreMutationV41),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

// `V40` layout of `Mutation` (the oldest kept here), upgraded to `MutationV41`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV41)]
pub enum MutationV40 {
    ObjectStore(ObjectStoreMutationV40),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}
213
214impl Mutation {
215    pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self {
216        Mutation::ObjectStore(ObjectStoreMutation {
217            item: Item::new(key, value),
218            op: Operation::Insert,
219        })
220    }
221
222    pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self {
223        Mutation::ObjectStore(ObjectStoreMutation {
224            item: Item::new(key, value),
225            op: Operation::ReplaceOrInsert,
226        })
227    }
228
229    pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self {
230        Mutation::ObjectStore(ObjectStoreMutation {
231            item: Item::new(key, value),
232            op: Operation::Merge,
233        })
234    }
235
236    pub fn update_mutations_key(key: FxfsKey) -> Self {
237        Mutation::UpdateMutationsKey(key.into())
238    }
239}
240
// We have custom comparison functions for mutations that just use the key, rather than the key and
// value that would be used by default so that we can deduplicate and find mutations (see
// get_object_mutation below).
/// A single change to an object store's LSM tree: an item (key and value) plus the [`Operation`]
/// used to apply it.
pub type ObjectStoreMutation = ObjectStoreMutationV55;

// Current serialized layout of `ObjectStoreMutation`.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct ObjectStoreMutationV55 {
    // The record's key and value.  Only the key takes part in comparisons.
    pub item: ObjectItemV55,
    // How the item is applied to the tree.
    pub op: Operation,
}
252
// `V54` layout of `ObjectStoreMutation`.  `#[migrate_to_version]` upgrades it to
// `ObjectStoreMutationV55`.  The item type is spelled with its full path (`ObjectItemV54` is not
// in this file's `use` list).
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(ObjectStoreMutationV55)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV54 {
    pub item: crate::object_store::object_record::ObjectItemV54,
    pub op: Operation,
}

// `V50` layout of `ObjectStoreMutation`, upgraded to `ObjectStoreMutationV54`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(ObjectStoreMutationV54)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV50 {
    pub item: ObjectItemV50,
    pub op: OperationV32,
}

// `V49` layout of `ObjectStoreMutation`, upgraded to `ObjectStoreMutationV50`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV50)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV49 {
    pub item: ObjectItemV49,
    pub op: OperationV32,
}

// `V47` layout of `ObjectStoreMutation`, upgraded to `ObjectStoreMutationV49`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV49)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV47 {
    pub item: ObjectItemV47,
    pub op: OperationV32,
}

// `V46` layout of `ObjectStoreMutation`, upgraded to `ObjectStoreMutationV47`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV47)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV46 {
    pub item: ObjectItemV46,
    pub op: OperationV32,
}

// `V43` layout of `ObjectStoreMutation`, upgraded to `ObjectStoreMutationV46`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV46)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV43 {
    pub item: ObjectItemV43,
    pub op: OperationV32,
}

// `V41` layout of `ObjectStoreMutation`, upgraded to `ObjectStoreMutationV43`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV43)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV41 {
    pub item: ObjectItemV41,
    pub op: OperationV32,
}

// `V40` layout of `ObjectStoreMutation` (the oldest kept here), upgraded to
// `ObjectStoreMutationV41`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_nodefault]
#[migrate_to_version(ObjectStoreMutationV41)]
pub struct ObjectStoreMutationV40 {
    pub item: ObjectItemV40,
    pub op: OperationV32,
}
316
/// The different LSM tree operations that can be performed as part of a mutation.
pub type Operation = OperationV32;

// Serialized as part of every object-store mutation in the journal.
// NOTE(review): presumably the variant order is part of the encoding (serde identifies enum
// variants by index in most binary formats), so do not reorder without a new version. Confirm.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum OperationV32 {
    // Built by `Mutation::insert_object`.
    Insert,
    // Built by `Mutation::replace_or_insert_object`.
    ReplaceOrInsert,
    // Built by `Mutation::merge_object`.  The merge semantics are defined by the tree, not here.
    Merge,
}
327
328impl Ord for ObjectStoreMutation {
329    fn cmp(&self, other: &Self) -> Ordering {
330        self.item.key.cmp(&other.item.key)
331    }
332}
333
334impl PartialOrd for ObjectStoreMutation {
335    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
336        Some(self.cmp(other))
337    }
338}
339
340impl PartialEq for ObjectStoreMutation {
341    fn eq(&self, other: &Self) -> bool {
342        self.item.key.eq(&other.item.key)
343    }
344}
345
346impl Eq for ObjectStoreMutation {}
347
348impl Ord for AllocatorItem {
349    fn cmp(&self, other: &Self) -> Ordering {
350        self.key.cmp(&other.key)
351    }
352}
353
354impl PartialOrd for AllocatorItem {
355    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
356        Some(self.cmp(other))
357    }
358}
359
/// Same as std::ops::Range but with Ord and PartialOrd support, sorted first by start of the range,
/// then by the end.
pub type DeviceRange = DeviceRangeV32;

// Serialized layout.  This is a newtype around `Range<u64>` because std's `Range` does not
// implement `Ord`, which this file adds for `DeviceRange`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct DeviceRangeV32(pub Range<u64>);
367
368impl Deref for DeviceRange {
369    type Target = Range<u64>;
370
371    fn deref(&self) -> &Self::Target {
372        &self.0
373    }
374}
375
376impl DerefMut for DeviceRange {
377    fn deref_mut(&mut self) -> &mut Self::Target {
378        &mut self.0
379    }
380}
381
382impl From<Range<u64>> for DeviceRange {
383    fn from(range: Range<u64>) -> Self {
384        Self(range)
385    }
386}
387
388impl Into<Range<u64>> for DeviceRange {
389    fn into(self) -> Range<u64> {
390        self.0
391    }
392}
393
394impl Ord for DeviceRange {
395    fn cmp(&self, other: &Self) -> Ordering {
396        self.start.cmp(&other.start).then(self.end.cmp(&other.end))
397    }
398}
399
400impl PartialOrd for DeviceRange {
401    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
402        Some(self.cmp(other))
403    }
404}
405
/// A change to the allocator's state, carried in the journal as `Mutation::Allocator`.
pub type AllocatorMutation = AllocatorMutationV32;

#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorMutationV32 {
    /// Allocation of `device_range` on behalf of `owner_object_id`.
    Allocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    /// Deallocation of `device_range`, which belongs to `owner_object_id`.
    Deallocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    /// Sets a limit of `bytes` for `owner_object_id`.  Presumably this caps how much that owner
    /// may allocate (confirm).
    SetLimit {
        owner_object_id: u64,
        bytes: u64,
    },
    /// Marks all extents with a given owner_object_id for deletion.
    /// Used to free space allocated to encrypted ObjectStore where we may not have the key.
    /// Note that the actual deletion time is undefined so this should never be used where an
    /// ObjectStore is still in use due to a high risk of corruption. Similarly, owner_object_id
    /// should never be reused for the same reasons.
    MarkForDeletion(u64),
}
430
/// The payload of `Mutation::UpdateMutationsKey`: a new mutations key.  Presumably this is the key
/// used to encrypt a store's mutations (confirm).
pub type UpdateMutationsKey = UpdateMutationsKeyV49;

// Current serialized layout, wrapping an `FxfsKeyV49`.  Comparison traits are implemented by hand
// (by identity) rather than derived.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
pub struct UpdateMutationsKeyV49(pub FxfsKeyV49);

// Legacy layout wrapping an `FxfsKeyV40`, upgraded to `UpdateMutationsKeyV49`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(UpdateMutationsKeyV49)]
pub struct UpdateMutationsKeyV40(pub FxfsKeyV40);
439
440impl From<UpdateMutationsKey> for FxfsKey {
441    fn from(outer: UpdateMutationsKey) -> Self {
442        outer.0
443    }
444}
445
446impl From<FxfsKey> for UpdateMutationsKey {
447    fn from(inner: FxfsKey) -> Self {
448        Self(inner)
449    }
450}
451
#[cfg(fuzz)]
impl<'a> arbitrary::Arbitrary<'a> for UpdateMutationsKey {
    /// Generates an arbitrary key and wraps it.
    ///
    /// Errors from the inner key's generator are propagated instead of unwrapped.  A short or
    /// malformed fuzz input then surfaces as an `arbitrary::Error` rather than a panic, which the
    /// fuzzer would otherwise report as a crash.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        Ok(UpdateMutationsKey::from(FxfsKey::arbitrary(u)?))
    }
}
458
// `UpdateMutationsKey` compares by identity (memory address), not by contents.  A value is equal
// only to itself, and distinct values are ordered by where they currently live in memory.
// `Mutation` derives its comparisons, so these impls decide how two `Mutation::UpdateMutationsKey`
// mutations compare.  Two of them never compare equal, not even a clone and its original.  As a
// result, a transaction's mutation set never replaces one with another, and `Transaction::remove`
// (which compares against a freshly built value) cannot match one.
//
// NOTE(review): an address is only meaningful while the value is not moved.  When these values
// sit inside a `BTreeSet` (via `TxnMutation`), the set may move elements as it changes shape.  The
// relative order of two such entries with the same `object_id` is therefore not guaranteed to stay
// consistent with `Ord`.  Presumably this is acceptable because a transaction seldom carries more
// than one per store. Confirm.
impl Ord for UpdateMutationsKey {
    fn cmp(&self, other: &Self) -> Ordering {
        (self as *const UpdateMutationsKey).cmp(&(other as *const _))
    }
}

impl PartialOrd for UpdateMutationsKey {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for UpdateMutationsKey {}

impl PartialEq for UpdateMutationsKey {
    fn eq(&self, other: &Self) -> bool {
        std::ptr::eq(self, other)
    }
}
478
/// When creating a transaction, locks typically need to be held to prevent two or more writers
/// trying to make conflicting mutations at the same time.  LockKeys are used for this.
/// NOTE: Ordering is important here!  The lock manager sorts the list of locks in a transaction
/// to acquire them in a consistent order, but there is a special case for the Flush lock.
/// The Flush lock is taken when we flush an LSM tree (e.g. an object store), and is held for
/// several transactions.  As such, it must come first in the lock acquisition ordering, so that
/// other transactions using the Flush lock have the same ordering as in flushing.
///
/// The ordering comes from the derived `Ord`, which compares variants in declaration order before
/// comparing their fields.  Reordering the variants below therefore changes the lock order.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Copy)]
pub enum LockKey {
    /// Used to lock flushing an object.
    Flush {
        object_id: u64,
    },

    /// Used to lock changes to a particular object attribute (e.g. writes).
    ObjectAttribute {
        store_object_id: u64,
        object_id: u64,
        attribute_id: AttributeId,
    },

    /// Used to lock changes to a particular object (e.g. adding a child to a directory).
    Object {
        store_object_id: u64,
        object_id: u64,
    },

    /// A lock keyed by a project ID within a store.  Presumably it serializes changes to that
    /// project's properties or usage (confirm).
    ProjectId {
        store_object_id: u64,
        project_id: ProjectId,
    },

    /// Used to lock any truncate operations for a file.
    Truncate {
        store_object_id: u64,
        object_id: u64,
    },

    /// A lock used when getting or creating the internal directory.
    InternalDirectory {
        store_object_id: u64,
    },

    /// Used to lock mutations key roll.
    MutationsKeyRoll {
        store_object_id: u64,
    },

    /// Used to serialize pre caching of keys.  The lock ordering is different for this: it is
    /// acquired in `Filesystem::commit_transaction` and happens *after* other keys have been
    /// promoted to write locks, but before the commit lock.
    PreCacheKeys {
        store_object_id: u64,
    },
}
534
535impl LockKey {
536    pub const fn object_attribute(
537        store_object_id: u64,
538        object_id: u64,
539        attribute_id: AttributeId,
540    ) -> Self {
541        LockKey::ObjectAttribute { store_object_id, object_id, attribute_id }
542    }
543
544    pub const fn object(store_object_id: u64, object_id: u64) -> Self {
545        LockKey::Object { store_object_id, object_id }
546    }
547
548    pub const fn flush(object_id: u64) -> Self {
549        LockKey::Flush { object_id }
550    }
551
552    pub const fn truncate(store_object_id: u64, object_id: u64) -> Self {
553        LockKey::Truncate { store_object_id, object_id }
554    }
555
556    pub const fn mutations_key_roll(store_object_id: u64) -> Self {
557        LockKey::MutationsKeyRoll { store_object_id }
558    }
559
560    pub const fn pre_cache_keys(store_object_id: u64) -> Self {
561        LockKey::PreCacheKeys { store_object_id }
562    }
563}
564
565/// A container for holding `LockKey` objects. Can store a single `LockKey` inline.
566#[derive(Clone, Debug)]
567pub enum LockKeys {
568    None,
569    Inline(LockKey),
570    Vec(Vec<LockKey>),
571}
572
573impl LockKeys {
574    pub fn with_capacity(capacity: usize) -> Self {
575        if capacity > 1 { LockKeys::Vec(Vec::with_capacity(capacity)) } else { LockKeys::None }
576    }
577
578    pub fn push(&mut self, key: LockKey) {
579        match self {
580            Self::None => *self = LockKeys::Inline(key),
581            Self::Inline(inline) => {
582                *self = LockKeys::Vec(vec![*inline, key]);
583            }
584            Self::Vec(vec) => vec.push(key),
585        }
586    }
587
588    pub fn truncate(&mut self, len: usize) {
589        match self {
590            Self::None => {}
591            Self::Inline(_) => {
592                if len == 0 {
593                    *self = Self::None;
594                }
595            }
596            Self::Vec(vec) => vec.truncate(len),
597        }
598    }
599
600    fn len(&self) -> usize {
601        match self {
602            Self::None => 0,
603            Self::Inline(_) => 1,
604            Self::Vec(vec) => vec.len(),
605        }
606    }
607
608    fn contains(&self, key: &LockKey) -> bool {
609        match self {
610            Self::None => false,
611            Self::Inline(single) => single == key,
612            Self::Vec(vec) => vec.contains(key),
613        }
614    }
615
616    fn sort_unstable(&mut self) {
617        match self {
618            Self::Vec(vec) => vec.sort_unstable(),
619            _ => {}
620        }
621    }
622
623    fn dedup(&mut self) {
624        match self {
625            Self::Vec(vec) => vec.dedup(),
626            _ => {}
627        }
628    }
629
630    fn iter(&self) -> LockKeysIter<'_> {
631        match self {
632            LockKeys::None => LockKeysIter::None,
633            LockKeys::Inline(key) => LockKeysIter::Inline(key),
634            LockKeys::Vec(keys) => LockKeysIter::Vec(keys.iter()),
635        }
636    }
637}
638
639enum LockKeysIter<'a> {
640    None,
641    Inline(&'a LockKey),
642    Vec(std::slice::Iter<'a, LockKey>),
643}
644
645impl<'a> Iterator for LockKeysIter<'a> {
646    type Item = &'a LockKey;
647    fn next(&mut self) -> Option<Self::Item> {
648        match self {
649            Self::None => None,
650            Self::Inline(inline) => {
651                let next = *inline;
652                *self = Self::None;
653                Some(next)
654            }
655            Self::Vec(vec) => vec.next(),
656        }
657    }
658}
659
660impl Default for LockKeys {
661    fn default() -> Self {
662        LockKeys::None
663    }
664}
665
/// Builds a [`LockKeys`] from zero or more [`LockKey`] expressions, using the most compact
/// representation.  No keys give `LockKeys::None`, one key gives `LockKeys::Inline`, and two or
/// more give `LockKeys::Vec`.  A trailing comma is accepted.
#[macro_export]
macro_rules! lock_keys {
    // No keys.
    () => {
        $crate::object_store::transaction::LockKeys::None
    };
    // Exactly one key.  This arm must stay before the repetition arm below, which would also
    // match a single key.
    ($lock_key:expr $(,)?) => {
        $crate::object_store::transaction::LockKeys::Inline($lock_key)
    };
    // Two or more keys (a single key is caught by the arm above).
    ($($lock_keys:expr),+ $(,)?) => {
        $crate::object_store::transaction::LockKeys::Vec(vec![$($lock_keys),+])
    };
}
// Re-exported so the macro can also be named by path through this module
// (`crate::object_store::transaction::lock_keys!`).
pub use lock_keys;
679
/// Mutations in a transaction can be associated with an object so that when mutations are applied,
/// updates can be applied to in-memory structures. For example, we cache object sizes, so when a
/// size change is applied, we can update the cached object size.
///
/// Implementors must be `Send + Sync`.
pub trait AssociatedObject: Send + Sync {
    /// Hook for reacting to `mutation`, which is applied on behalf of `object_id` (presumably the
    /// `TxnMutation::object_id` it was queued with; confirm against the caller).  Judging by the
    /// name, it runs before the mutation takes effect (confirm).  The default implementation does
    /// nothing.
    fn will_apply_mutation(&self, _mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
    }
}
687
688pub enum AssocObj<'a> {
689    None,
690    Borrowed(&'a dyn AssociatedObject),
691    Owned(Box<dyn AssociatedObject>),
692}
693
694impl AssocObj<'_> {
695    pub fn map<R, F: FnOnce(&dyn AssociatedObject) -> R>(&self, f: F) -> Option<R> {
696        match self {
697            AssocObj::None => None,
698            AssocObj::Borrowed(b) => Some(f(*b)),
699            AssocObj::Owned(o) => Some(f(o.as_ref())),
700        }
701    }
702}
703
/// A single mutation queued in a [`Transaction`].  It carries the ID of the object it is recorded
/// against and an optional in-memory object to notify when it is applied.
pub struct TxnMutation<'a> {
    /// At time of writing, this is either the object ID of an object store, or the object ID of
    /// the allocator.  In the case of an object mutation, there's another object ID in the mutation
    /// record that would be for the object actually being changed.
    pub object_id: u64,

    /// The actual mutation.  This gets serialized to the journal.
    pub mutation: Mutation,

    /// An optional associated object for the mutation.  During replay, there will always be no
    /// associated object.
    pub associated_object: AssocObj<'a>,
}
717
718// We store TxnMutation in a set, and for that, we only use object_id and mutation and not the
719// associated object or checksum.
720//
721// WARNING: It is critical that `object_id` (which corresponds to the store ID for store mutations)
722// remains the primary key for sorting. The commit pipeline (`commit_transaction`) relies on
723// the mutations being sorted by `object_id` to acquire store locks in a consistent order,
724// preventing deadlocks.
725impl Ord for TxnMutation<'_> {
726    fn cmp(&self, other: &Self) -> Ordering {
727        self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation))
728    }
729}
730
731impl PartialOrd for TxnMutation<'_> {
732    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
733        Some(self.cmp(other))
734    }
735}
736
737impl PartialEq for TxnMutation<'_> {
738    fn eq(&self, other: &Self) -> bool {
739        self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation)
740    }
741}
742
743impl Eq for TxnMutation<'_> {}
744
745impl std::fmt::Debug for TxnMutation<'_> {
746    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
747        f.debug_struct("TxnMutation")
748            .field("object_id", &self.object_id)
749            .field("mutation", &self.mutation)
750            .finish()
751    }
752}
753
/// How the metadata space for a [`Transaction`] is accounted for.
pub enum MetadataReservation {
    /// The state after a transaction has been dropped.
    None,

    /// Metadata space for this transaction is being borrowed from ObjectManager's metadata
    /// reservation.
    Borrowed,

    /// A metadata reservation was made when the transaction was created.
    Reservation(Reservation),

    /// The metadata space is being _held_ within `allocator_reservation`.  The value is presumably
    /// the amount held (confirm).
    Hold(u64),
}
768
/// A transaction groups mutation records to be committed as a group.
pub struct Transaction<'a> {
    /// The filesystem this transaction was created for.
    fs: Arc<FxFilesystem>,

    /// The mutations that make up this transaction, ordered by `(object_id, mutation)`.
    mutations: BTreeSet<TxnMutation<'a>>,

    /// The locks that this transaction currently holds.
    txn_locks: LockKeys,

    /// If set, an allocator reservation that should be used for allocations.
    pub allocator_reservation: Option<&'a Reservation>,

    /// The reservation for the metadata for this transaction.
    pub metadata_reservation: MetadataReservation,

    /// Objects explicitly created by this transaction, addressed by
    /// `(owner_object_id, object_id)`.  No locks are required for them.
    new_objects: BTreeSet<(u64, u64)>,

    /// Any data checksums which should be evaluated when replaying this transaction.  Each entry
    /// is the `(range, checksums, first_write)` triple passed to `add_checksum`.
    checksums: Vec<(Range<u64>, Vec<Checksum>, bool)>,

    /// Set if this transaction contains data (i.e. includes any extent mutations).
    includes_write: bool,
}
795
796impl<'a> Transaction<'a> {
797    /// Creates a new transaction.  `txn_locks` are read locks that can be upgraded to write locks
798    /// at commit time.
799    pub async fn new(
800        fs: Arc<FxFilesystem>,
801        options: Options<'a>,
802        txn_locks: LockKeys,
803    ) -> Result<Transaction<'a>, Error> {
804        fs.add_transaction(options.skip_journal_checks).await;
805        let fs_clone = fs.clone();
806        let guard = scopeguard::guard((), |_| fs_clone.sub_transaction());
807        let (metadata_reservation, allocator_reservation, hold) =
808            fs.reservation_for_transaction(options).await?;
809
810        let txn_locks = {
811            let lock_manager = fs.lock_manager();
812            let mut write_guard = lock_manager.txn_lock(txn_locks).await;
813            std::mem::take(&mut write_guard.0.lock_keys)
814        };
815        let mut transaction = Transaction {
816            fs,
817            mutations: BTreeSet::new(),
818            txn_locks,
819            allocator_reservation: None,
820            metadata_reservation,
821            new_objects: BTreeSet::new(),
822            checksums: Vec::new(),
823            includes_write: false,
824        };
825
826        ScopeGuard::into_inner(guard);
827        hold.map(|h| h.forget()); // Transaction takes ownership from here on.
828        transaction.allocator_reservation = allocator_reservation;
829        Ok(transaction)
830    }
831
832    pub fn mutations(&self) -> &BTreeSet<TxnMutation<'a>> {
833        &self.mutations
834    }
835
836    pub fn take_mutations(&mut self) -> BTreeSet<TxnMutation<'a>> {
837        self.new_objects.clear();
838        mem::take(&mut self.mutations)
839    }
840
841    /// Adds a mutation to this transaction.  If the mutation already exists, it is replaced and the
842    /// old mutation is returned.
843    pub fn add(&mut self, object_id: u64, mutation: Mutation) -> Option<Mutation> {
844        self.add_with_object(object_id, mutation, AssocObj::None)
845    }
846
847    /// Removes a mutation that matches `mutation`.
848    pub fn remove(&mut self, object_id: u64, mutation: Mutation) {
849        let txn_mutation = TxnMutation { object_id, mutation, associated_object: AssocObj::None };
850        if self.mutations.remove(&txn_mutation) {
851            if let Mutation::ObjectStore(ObjectStoreMutation {
852                item:
853                    ObjectItem {
854                        key: ObjectKey { object_id: new_object_id, data: ObjectKeyData::Object },
855                        ..
856                    },
857                op: Operation::Insert,
858            }) = txn_mutation.mutation
859            {
860                self.new_objects.remove(&(object_id, new_object_id));
861            }
862        }
863    }
864
865    /// Adds a mutation with an associated object. If the mutation already exists, it is replaced
866    /// and the old mutation is returned.
867    pub fn add_with_object(
868        &mut self,
869        object_id: u64,
870        mutation: Mutation,
871        associated_object: AssocObj<'a>,
872    ) -> Option<Mutation> {
873        assert!(object_id != INVALID_OBJECT_ID);
874        if let Mutation::ObjectStore(ObjectStoreMutation {
875            item:
876                Item {
877                    key:
878                        ObjectKey { data: ObjectKeyData::Attribute(_, AttributeKey::Extent(_)), .. },
879                    ..
880                },
881            ..
882        }) = &mutation
883        {
884            self.includes_write = true;
885        }
886        let txn_mutation = TxnMutation { object_id, mutation, associated_object };
887        self.verify_locks(&txn_mutation);
888        self.mutations.replace(txn_mutation).map(|m| m.mutation)
889    }
890
891    pub fn add_checksum(&mut self, range: Range<u64>, checksums: Vec<Checksum>, first_write: bool) {
892        self.checksums.push((range, checksums, first_write));
893    }
894
895    pub fn includes_write(&self) -> bool {
896        self.includes_write
897    }
898
899    pub fn checksums(&self) -> &[(Range<u64>, Vec<Checksum>, bool)] {
900        &self.checksums
901    }
902
903    pub fn take_checksums(&mut self) -> Vec<(Range<u64>, Vec<Checksum>, bool)> {
904        std::mem::replace(&mut self.checksums, Vec::new())
905    }
906
907    fn verify_locks(&mut self, mutation: &TxnMutation<'_>) {
908        // It was considered to change the locks from Vec to BTreeSet since we'll now be searching
909        // through it, but given the small set that these locks usually comprise, it probably isn't
910        // worth it.
911        match mutation {
912            TxnMutation {
913                mutation:
914                    Mutation::ObjectStore {
915                        0: ObjectStoreMutation { item: ObjectItem { key, .. }, op },
916                    },
917                object_id: store_object_id,
918                ..
919            } => {
920                match &key.data {
921                    ObjectKeyData::Attribute(..) => {
922                        // TODO(https://fxbug.dev/42073914): Check lock requirements.
923                    }
924                    ObjectKeyData::Child { .. }
925                    | ObjectKeyData::EncryptedChild(_)
926                    | ObjectKeyData::EncryptedCasefoldChild(_)
927                    | ObjectKeyData::CasefoldChild { .. }
928                    | ObjectKeyData::LegacyCasefoldChild(_) => {
929                        let id = key.object_id;
930                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
931                            && !self.new_objects.contains(&(*store_object_id, id))
932                        {
933                            debug_assert!(
934                                false,
935                                "Not holding required lock for object {id} \
936                                in store {store_object_id}"
937                            );
938                            error!(
939                                "Not holding required lock for object {id} in store \
940                                {store_object_id}"
941                            )
942                        }
943                    }
944                    ObjectKeyData::GraveyardEntry { .. } => {
945                        // TODO(https://fxbug.dev/42073911): Check lock requirements.
946                    }
947                    ObjectKeyData::GraveyardAttributeEntry { .. } => {
948                        // TODO(https://fxbug.dev/122974): Check lock requirements.
949                    }
950                    ObjectKeyData::Keys => {
951                        let id = key.object_id;
952                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
953                            && !self.new_objects.contains(&(*store_object_id, id))
954                        {
955                            debug_assert!(
956                                false,
957                                "Not holding required lock for object {id} \
958                                in store {store_object_id}"
959                            );
960                            error!(
961                                "Not holding required lock for object {id} in store \
962                                {store_object_id}"
963                            )
964                        }
965                    }
966                    ObjectKeyData::Object => match op {
967                        // Insert implies the caller expects no object with which to race
968                        Operation::Insert => {
969                            self.new_objects.insert((*store_object_id, key.object_id));
970                        }
971                        Operation::Merge | Operation::ReplaceOrInsert => {
972                            let id = key.object_id;
973                            if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
974                                && !self.new_objects.contains(&(*store_object_id, id))
975                            {
976                                debug_assert!(
977                                    false,
978                                    "Not holding required lock for object {id} \
979                                    in store {store_object_id}"
980                                );
981                                error!(
982                                    "Not holding required lock for object {id} in store \
983                                    {store_object_id}"
984                                )
985                            }
986                        }
987                    },
988                    ObjectKeyData::Project { project_id, property: ProjectProperty::Limit } => {
989                        if !self.txn_locks.contains(&LockKey::ProjectId {
990                            store_object_id: *store_object_id,
991                            project_id: *project_id,
992                        }) {
993                            debug_assert!(
994                                false,
995                                "Not holding required lock for project limit id {project_id} \
996                                in store {store_object_id}"
997                            );
998                            error!(
999                                "Not holding required lock for project limit id {project_id} in \
1000                                store {store_object_id}"
1001                            )
1002                        }
1003                    }
1004                    ObjectKeyData::Project { property: ProjectProperty::Usage, .. } => match op {
1005                        Operation::Insert | Operation::ReplaceOrInsert => {
1006                            panic!(
1007                                "Project usage is all handled by merging deltas, no inserts or \
1008                                replacements should be used"
1009                            );
1010                        }
1011                        // Merges are all handled like atomic +/- and serialized by the tree locks.
1012                        Operation::Merge => {}
1013                    },
1014                    ObjectKeyData::ExtendedAttribute { .. } => {
1015                        let id = key.object_id;
1016                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
1017                            && !self.new_objects.contains(&(*store_object_id, id))
1018                        {
1019                            debug_assert!(
1020                                false,
1021                                "Not holding required lock for object {id} \
1022                                in store {store_object_id} while mutating extended attribute"
1023                            );
1024                            error!(
1025                                "Not holding required lock for object {id} in store \
1026                                {store_object_id} while mutating extended attribute"
1027                            )
1028                        }
1029                    }
1030                }
1031            }
1032            TxnMutation { mutation: Mutation::DeleteVolume, object_id, .. } => {
1033                if !self.txn_locks.contains(&LockKey::flush(*object_id)) {
1034                    debug_assert!(false, "Not holding required lock for DeleteVolume");
1035                    error!("Not holding required lock for DeleteVolume");
1036                }
1037            }
1038            _ => {}
1039        }
1040    }
1041
1042    /// Returns true if this transaction has no mutations.
1043    pub fn is_empty(&self) -> bool {
1044        self.mutations.is_empty()
1045    }
1046
1047    /// Searches for an existing object mutation within the transaction that has the given key and
1048    /// returns it if found.
1049    pub fn get_object_mutation(
1050        &self,
1051        store_object_id: u64,
1052        key: ObjectKey,
1053    ) -> Option<&ObjectStoreMutation> {
1054        if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) =
1055            self.mutations.get(&TxnMutation {
1056                object_id: store_object_id,
1057                mutation: Mutation::insert_object(key, ObjectValue::None),
1058                associated_object: AssocObj::None,
1059            })
1060        {
1061            Some(mutation)
1062        } else {
1063            None
1064        }
1065    }
1066
1067    /// Commits a transaction.  If successful, returns the journal offset of the transaction.
1068    pub async fn commit(mut self) -> Result<u64, Error> {
1069        debug!(txn:? = &self; "Commit");
1070        self.fs.clone().commit_transaction(&mut self, |x| x).await
1071    }
1072
1073    /// Commits and then runs the callback whilst locks are held.  The callback accepts a single
1074    /// parameter which is the journal offset of the transaction.
1075    pub async fn commit_with_callback<R: Send>(
1076        mut self,
1077        f: impl FnOnce(u64) -> R + Send,
1078    ) -> Result<R, Error> {
1079        debug!(txn:? = &self; "Commit");
1080        self.fs.clone().commit_transaction(&mut self, f).await
1081    }
1082
1083    /// Commits the transaction, but allows the transaction to be used again.  The locks are not
1084    /// dropped (but transaction locks will get downgraded to read locks).
1085    pub async fn commit_and_continue(&mut self) -> Result<(), Error> {
1086        debug!(txn:? = self; "Commit");
1087        self.fs.clone().commit_transaction(self, |_| {}).await?;
1088        assert!(self.mutations.is_empty());
1089        self.fs.lock_manager().downgrade_locks(&self.txn_locks);
1090        Ok(())
1091    }
1092}
1093
1094impl Drop for Transaction<'_> {
1095    fn drop(&mut self) {
1096        // Call the filesystem implementation of drop_transaction which should, as a minimum, call
1097        // LockManager's drop_transaction to ensure the locks are released.
1098        debug!(txn:? = &self; "Drop");
1099        self.fs.clone().drop_transaction(self);
1100    }
1101}
1102
1103impl std::fmt::Debug for Transaction<'_> {
1104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1105        f.debug_struct("Transaction")
1106            .field("mutations", &self.mutations)
1107            .field("txn_locks", &self.txn_locks)
1108            .field("reservation", &self.allocator_reservation)
1109            .finish()
1110    }
1111}
1112
/// A `T` that is either borrowed for `'a` or owned outright.  Both forms dereference to `&T`.
pub enum BorrowedOrOwned<'a, T> {
    Borrowed(&'a T),
    Owned(T),
}

impl<T> Deref for BorrowedOrOwned<'_, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        // With match ergonomics, `b` binds as `&&T` and `o` as `&T`.  Produce `&T` directly
        // rather than relying on deref coercion of an extra borrow.
        match self {
            BorrowedOrOwned::Borrowed(b) => *b,
            BorrowedOrOwned::Owned(o) => o,
        }
    }
}

impl<'a, T> From<&'a T> for BorrowedOrOwned<'a, T> {
    fn from(value: &'a T) -> Self {
        BorrowedOrOwned::Borrowed(value)
    }
}

impl<T> From<T> for BorrowedOrOwned<'_, T> {
    fn from(value: T) -> Self {
        BorrowedOrOwned::Owned(value)
    }
}
1140
1141/// LockManager holds the locks that transactions might have taken.  A TransactionManager
1142/// implementation would typically have one of these.
1143///
1144/// Three different kinds of locks are supported.  There are read locks and write locks, which are
1145/// as one would expect.  The third kind of lock is a _transaction_ lock (which is also known as an
1146/// upgradeable read lock).  When first acquired, these block other writes (including other
1147/// transaction locks) but do not block reads.  When it is time to commit a transaction, these locks
1148/// are upgraded to full write locks (without ever dropping the lock) and then dropped after
1149/// committing (unless commit_and_continue is used).  This way, reads are only blocked for the
1150/// shortest possible time.  It follows that write locks should be used sparingly.  Locks are
1151/// granted in order with one exception: when a lock is in the initial _transaction_ lock state
1152/// (LockState::Locked), all read locks are allowed even if there are other tasks waiting for the
1153/// lock.  The reason for this is because we allow read locks to be taken by tasks that have taken a
1154/// _transaction_ lock (i.e. recursion is allowed).  In other cases, such as when a writer is
1155/// waiting and there are only readers, readers will queue up behind the writer.
1156///
1157/// To summarize:
1158///
1159/// +-------------------------+-----------------+----------------+------------------+
1160/// |                         | While read_lock | While txn_lock | While write_lock |
1161/// |                         | is held         | is held        | is held          |
1162/// +-------------------------+-----------------+----------------+------------------+
1163/// | Can acquire read_lock?  | true            | true           | false            |
1164/// +-------------------------+-----------------+----------------+------------------+
1165/// | Can acquire txn_lock?   | true            | false          | false            |
1166/// +-------------------------+-----------------+----------------+------------------+
1167/// | Can acquire write_lock? | false           | false          | false            |
1168/// +-------------------------+-----------------+----------------+------------------+
1169pub struct LockManager {
1170    locks: Mutex<Locks>,
1171}
1172
1173struct Locks {
1174    keys: HashMap<LockKey, LockEntry>,
1175}
1176
1177impl Locks {
1178    fn drop_lock(&mut self, key: LockKey, state: LockState) {
1179        if let Entry::Occupied(mut occupied) = self.keys.entry(key) {
1180            let entry = occupied.get_mut();
1181            let wake = match state {
1182                LockState::ReadLock => {
1183                    entry.read_count -= 1;
1184                    entry.read_count == 0
1185                }
1186                // drop_write_locks currently depends on us treating Locked and WriteLock the same.
1187                LockState::Locked | LockState::WriteLock => {
1188                    entry.state = LockState::ReadLock;
1189                    true
1190                }
1191            };
1192            if wake {
1193                // SAFETY: The lock in `LockManager::locks` is held.
1194                unsafe {
1195                    entry.wake();
1196                }
1197                if entry.can_remove() {
1198                    occupied.remove_entry();
1199                }
1200            }
1201        } else {
1202            unreachable!();
1203        }
1204    }
1205
1206    fn drop_read_locks(&mut self, lock_keys: LockKeys) {
1207        for lock in lock_keys.iter() {
1208            self.drop_lock(*lock, LockState::ReadLock);
1209        }
1210    }
1211
1212    fn drop_write_locks(&mut self, lock_keys: LockKeys) {
1213        for lock in lock_keys.iter() {
1214            // This is a bit hacky, but this works for locks in either the Locked or WriteLock
1215            // states.
1216            self.drop_lock(*lock, LockState::WriteLock);
1217        }
1218    }
1219
1220    // Downgrades locks from WriteLock to Locked.
1221    fn downgrade_locks(&mut self, lock_keys: &LockKeys) {
1222        for lock in lock_keys.iter() {
1223            // SAFETY: The lock in `LockManager::locks` is held.
1224            unsafe {
1225                self.keys.get_mut(lock).unwrap().downgrade_lock();
1226            }
1227        }
1228    }
1229}
1230
1231#[derive(Debug)]
1232struct LockEntry {
1233    // In the states that allow readers (ReadLock, Locked), this count can be non-zero
1234    // to indicate the number of active readers.
1235    read_count: u64,
1236
1237    // The state of the lock (see below).
1238    state: LockState,
1239
1240    // A doubly-linked list of wakers that should be woken when they have been granted the lock.
1241    // New wakers are usually chained on to tail, with the exception being the case where a lock in
1242    // state Locked is to be upgraded to WriteLock, but can't because there are readers.  It might
1243    // be possible to use intrusive-collections in the future.
1244    head: *const LockWaker,
1245    tail: *const LockWaker,
1246}
1247
1248unsafe impl Send for LockEntry {}
1249
1250// Represents a node in the waker list.  It is only safe to access the members wrapped by UnsafeCell
1251// when LockManager's `locks` member is locked.
1252struct LockWaker {
1253    // The next and previous pointers in the doubly-linked list.
1254    next: UnsafeCell<*const LockWaker>,
1255    prev: UnsafeCell<*const LockWaker>,
1256
1257    // Holds the lock key for this waker.  This is required so that we can find the associated
1258    // `LockEntry`.
1259    key: LockKey,
1260
1261    // The underlying waker that should be used to wake the task.
1262    waker: UnsafeCell<WakerState>,
1263
1264    // The target state for this waker.
1265    target_state: LockState,
1266
1267    // True if this is an upgrade.
1268    is_upgrade: bool,
1269
1270    // We need to be pinned because these form part of the linked list.
1271    _pin: PhantomPinned,
1272}
1273
/// Progress of a `LockWaker` towards being granted its lock.
enum WakerState {
    /// Not yet polled, so no task waker is available.
    Pending,

    /// Polled at least once; holds the waker for the waiting task.
    Registered(Waker),

    /// The lock has been granted to this waiter (and its task woken, if it had registered).
    Woken,
}

impl WakerState {
    /// True once the lock has been granted to this waiter.
    fn is_woken(&self) -> bool {
        match self {
            WakerState::Woken => true,
            WakerState::Pending | WakerState::Registered(_) => false,
        }
    }
}
1290
// SAFETY: The raw pointers and `UnsafeCell` members of `LockWaker` are only accessed while
// LockManager's `locks` mutex is held, which serializes all access to them across threads.
unsafe impl Send for LockWaker {}
unsafe impl Sync for LockWaker {}
1293
impl LockWaker {
    // Waits for the waker to be woken.
    //
    // Completes once this waiter has been granted the lock (its state becomes `Woken`).  If the
    // future is dropped before completing, the scope guard undoes the waiter's effect on the
    // `LockEntry`.  A waiter that had already been granted the lock gives it back: an upgrade is
    // downgraded to `Locked`, anything else is dropped.  A waiter that had not been granted the
    // lock is unlinked from the waiter list.
    async fn wait(&self, manager: &LockManager) {
        // We must guard against the future being dropped.
        let waker_guard = scopeguard::guard((), |_| {
            let mut locks = manager.locks.lock();
            // SAFETY: We've acquired the lock.
            unsafe {
                if (*self.waker.get()).is_woken() {
                    // We were woken, but didn't actually run, so we must drop the lock.
                    if self.is_upgrade {
                        locks.keys.get_mut(&self.key).unwrap().downgrade_lock();
                    } else {
                        locks.drop_lock(self.key, self.target_state);
                    }
                } else {
                    // We haven't been woken but we've been dropped so we must remove ourself from
                    // the waker list.
                    locks.keys.get_mut(&self.key).unwrap().remove_waker(self);
                }
            }
        });

        // The state is checked and the task waker registered under the same mutex that
        // `pop_and_wake` holds, so a wake-up cannot slip in between the check and registration.
        poll_fn(|cx| {
            let _locks = manager.locks.lock();
            // SAFETY: We've acquired the lock.
            unsafe {
                if (*self.waker.get()).is_woken() {
                    Poll::Ready(())
                } else {
                    *self.waker.get() = WakerState::Registered(cx.waker().clone());
                    Poll::Pending
                }
            }
        })
        .await;

        // The lock now belongs to the caller, so defuse the cleanup guard.
        ScopeGuard::into_inner(waker_guard);
    }
}
1334
/// The mode a lock key is currently held in.
#[derive(Copy, Clone, Debug, PartialEq)]
enum LockState {
    /// Only readers (possibly none) hold the lock.
    ReadLock,

    /// A transaction holds the lock: other writers (including other transactions) are excluded,
    /// but readers are still admitted.
    Locked,

    /// A single writer holds the lock exclusively; all readers and writers are blocked.
    WriteLock,
}
1347
1348impl LockManager {
1349    pub fn new() -> Self {
1350        LockManager { locks: Mutex::new(Locks { keys: HashMap::default() }) }
1351    }
1352
1353    /// Acquires the locks.  It is the caller's responsibility to ensure that drop_transaction is
1354    /// called when a transaction is dropped i.e. the filesystem's drop_transaction method should
1355    /// call LockManager's drop_transaction method.
1356    pub async fn txn_lock<'a>(&'a self, lock_keys: LockKeys) -> TransactionLocks<'a> {
1357        TransactionLocks(
1358            debug_assert_not_too_long!(self.lock(lock_keys, LockState::Locked)).right().unwrap(),
1359        )
1360    }
1361
1362    // `state` indicates the kind of lock required.  ReadLock means acquire a read lock.  Locked
1363    // means lock other writers, but still allow readers.  WriteLock means acquire a write lock.
1364    async fn lock<'a>(
1365        &'a self,
1366        mut lock_keys: LockKeys,
1367        target_state: LockState,
1368    ) -> Either<ReadGuard<'a>, WriteGuard<'a>> {
1369        let mut guard = match &target_state {
1370            LockState::ReadLock => Left(ReadGuard {
1371                manager: self.into(),
1372                lock_keys: LockKeys::with_capacity(lock_keys.len()),
1373            }),
1374            LockState::Locked | LockState::WriteLock => Right(WriteGuard {
1375                manager: self.into(),
1376                lock_keys: LockKeys::with_capacity(lock_keys.len()),
1377            }),
1378        };
1379        let guard_keys = match &mut guard {
1380            Left(g) => &mut g.lock_keys,
1381            Right(g) => &mut g.lock_keys,
1382        };
1383        lock_keys.sort_unstable();
1384        lock_keys.dedup();
1385        for lock in lock_keys.iter() {
1386            let lock_waker = None;
1387            pin_mut!(lock_waker);
1388            {
1389                let mut locks = self.locks.lock();
1390                match locks.keys.entry(*lock) {
1391                    Entry::Vacant(vacant) => {
1392                        vacant.insert(LockEntry {
1393                            read_count: if let LockState::ReadLock = target_state {
1394                                guard_keys.push(*lock);
1395                                1
1396                            } else {
1397                                guard_keys.push(*lock);
1398                                0
1399                            },
1400                            state: target_state,
1401                            head: std::ptr::null(),
1402                            tail: std::ptr::null(),
1403                        });
1404                    }
1405                    Entry::Occupied(mut occupied) => {
1406                        let entry = occupied.get_mut();
1407                        // SAFETY: We've acquired the lock.
1408                        if unsafe { entry.is_allowed(target_state, entry.head.is_null()) } {
1409                            if let LockState::ReadLock = target_state {
1410                                entry.read_count += 1;
1411                                guard_keys.push(*lock);
1412                            } else {
1413                                entry.state = target_state;
1414                                guard_keys.push(*lock);
1415                            }
1416                        } else {
1417                            // Initialise a waker and push it on the tail of the list.
1418                            // SAFETY: `lock_waker` isn't used prior to this point.
1419                            unsafe {
1420                                *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1421                                    next: UnsafeCell::new(std::ptr::null()),
1422                                    prev: UnsafeCell::new(entry.tail),
1423                                    key: *lock,
1424                                    waker: UnsafeCell::new(WakerState::Pending),
1425                                    target_state: target_state,
1426                                    is_upgrade: false,
1427                                    _pin: PhantomPinned,
1428                                });
1429                            }
1430                            let waker = (*lock_waker).as_ref().unwrap();
1431                            if entry.tail.is_null() {
1432                                entry.head = waker;
1433                            } else {
1434                                // SAFETY: We've acquired the lock.
1435                                unsafe {
1436                                    *(*entry.tail).next.get() = waker;
1437                                }
1438                            }
1439                            entry.tail = waker;
1440                        }
1441                    }
1442                }
1443            }
1444            if let Some(waker) = &*lock_waker {
1445                waker.wait(self).await;
1446                guard_keys.push(*lock);
1447            }
1448        }
1449        guard
1450    }
1451
1452    /// This should be called by the filesystem's drop_transaction implementation.
1453    pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
1454        let mut locks = self.locks.lock();
1455        locks.drop_write_locks(std::mem::take(&mut transaction.txn_locks));
1456    }
1457
1458    /// Prepares to commit by waiting for readers to finish.
1459    pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
1460        self.commit_prepare_keys(&transaction.txn_locks).await;
1461    }
1462
1463    async fn commit_prepare_keys(&self, lock_keys: &LockKeys) {
1464        for lock in lock_keys.iter() {
1465            let lock_waker = None;
1466            pin_mut!(lock_waker);
1467            {
1468                let mut locks = self.locks.lock();
1469                let entry = locks.keys.get_mut(lock).unwrap();
1470                assert_eq!(entry.state, LockState::Locked);
1471
1472                if entry.read_count == 0 {
1473                    entry.state = LockState::WriteLock;
1474                } else {
1475                    // Initialise a waker and push it on the head of the list.
1476                    // SAFETY: `lock_waker` isn't used prior to this point.
1477                    unsafe {
1478                        *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1479                            next: UnsafeCell::new(entry.head),
1480                            prev: UnsafeCell::new(std::ptr::null()),
1481                            key: *lock,
1482                            waker: UnsafeCell::new(WakerState::Pending),
1483                            target_state: LockState::WriteLock,
1484                            is_upgrade: true,
1485                            _pin: PhantomPinned,
1486                        });
1487                    }
1488                    let waker = (*lock_waker).as_ref().unwrap();
1489                    if entry.head.is_null() {
1490                        entry.tail = (*lock_waker).as_ref().unwrap();
1491                    } else {
1492                        // SAFETY: We've acquired the lock.
1493                        unsafe {
1494                            *(*entry.head).prev.get() = waker;
1495                        }
1496                    }
1497                    entry.head = waker;
1498                }
1499            }
1500
1501            if let Some(waker) = &*lock_waker {
1502                waker.wait(self).await;
1503            }
1504        }
1505    }
1506
1507    /// Acquires a read lock for the given keys.  Read locks are only blocked whilst a transaction
1508    /// is being committed for the same locks.  They are only necessary where consistency is
1509    /// required between different mutations within a transaction.  For example, a write might
1510    /// change the size and extents for an object, in which case a read lock is required so that
1511    /// observed size and extents are seen together or not at all.
1512    pub async fn read_lock<'a>(&'a self, lock_keys: LockKeys) -> ReadGuard<'a> {
1513        debug_assert_not_too_long!(self.lock(lock_keys, LockState::ReadLock)).left().unwrap()
1514    }
1515
1516    /// Acquires a write lock for the given keys.  Write locks provide exclusive access to the
1517    /// requested lock keys.
1518    pub async fn write_lock<'a>(&'a self, lock_keys: LockKeys) -> WriteGuard<'a> {
1519        debug_assert_not_too_long!(self.lock(lock_keys, LockState::WriteLock)).right().unwrap()
1520    }
1521
1522    /// Downgrades locks from the WriteLock state to Locked state.  This will panic if the locks are
1523    /// not in the WriteLock state.
1524    pub fn downgrade_locks(&self, lock_keys: &LockKeys) {
1525        self.locks.lock().downgrade_locks(lock_keys);
1526    }
1527}
1528
// These unsafe functions require that `locks` in LockManager is locked.
impl LockEntry {
    // Grants the lock to as many queued waiters as the current state permits, starting at the
    // head of the waiter list.  Nothing is granted in the WriteLock state.  An upgrade waiter at
    // the head is granted only when there are no readers; any other head waiter is granted only
    // if `is_allowed` permits it.  After granting a non-WriteLock waiter, the ReadLock waiters
    // that immediately follow are granted too.  Granting stops at the first waiter that wants
    // something else.
    //
    // # Safety
    //
    // `LockManager::locks` must be held, and every waiter linked into the list must still be
    // alive.
    unsafe fn wake(&mut self) {
        // If the lock's state is WriteLock, or there's nothing waiting, return early.
        if self.head.is_null() || self.state == LockState::WriteLock {
            return;
        }

        let waker = unsafe { &*self.head };

        if waker.is_upgrade {
            if self.read_count > 0 {
                return;
            }
        } else if !unsafe { self.is_allowed(waker.target_state, true) } {
            return;
        }

        unsafe { self.pop_and_wake() };

        // If the waker was a write lock, we can't wake any more up, but otherwise, we can keep
        // waking up readers.
        if waker.target_state == LockState::WriteLock {
            return;
        }

        while !self.head.is_null() && unsafe { (*self.head).target_state } == LockState::ReadLock {
            unsafe { self.pop_and_wake() };
        }
    }

    // Unlinks the head waiter and grants it the lock.  A reader increments `read_count`; any
    // other waiter moves the entry to its target state.  The waiter is marked `Woken`, and its
    // task is woken if it has registered a waker.
    //
    // # Safety
    //
    // `LockManager::locks` must be held and the waiter list must be non-empty (`head` is
    // dereferenced unconditionally).
    unsafe fn pop_and_wake(&mut self) {
        let waker = unsafe { &*self.head };

        // Pop the waker.
        self.head = unsafe { *waker.next.get() };
        if self.head.is_null() {
            self.tail = std::ptr::null()
        } else {
            unsafe { *(*self.head).prev.get() = std::ptr::null() };
        }

        // Adjust our state accordingly.
        if waker.target_state == LockState::ReadLock {
            self.read_count += 1;
        } else {
            self.state = waker.target_state;
        }

        // Now wake the task.
        if let WakerState::Registered(waker) =
            std::mem::replace(unsafe { &mut *waker.waker.get() }, WakerState::Woken)
        {
            waker.wake();
        }
    }

    // True when nothing holds the lock: the entry is in ReadLock state with no readers.
    fn can_remove(&self) -> bool {
        self.state == LockState::ReadLock && self.read_count == 0
    }

    // Unlinks `waker` from the waiter list.  If it was at the head, `wake` is called so that the
    // waiters behind it get a chance to proceed.
    //
    // # Safety
    //
    // `LockManager::locks` must be held and `waker` must currently be linked into this entry's
    // list.
    unsafe fn remove_waker(&mut self, waker: &LockWaker) {
        unsafe {
            let is_first = (*waker.prev.get()).is_null();
            if is_first {
                self.head = *waker.next.get();
            } else {
                *(**waker.prev.get()).next.get() = *waker.next.get();
            }
            if (*waker.next.get()).is_null() {
                self.tail = *waker.prev.get();
            } else {
                *(**waker.next.get()).prev.get() = *waker.prev.get();
            }
            if is_first {
                // We must call wake in case we erased a pending write lock and readers can now
                // proceed.
                self.wake();
            }
        }
    }

    // Returns whether or not a lock with given `target_state` can proceed.  `is_head` should be
    // true if this is something at the head of the waker list (or the waker list is empty) and
    // false if there are other items on the waker list that are prior.
    //
    // # Safety
    //
    // `LockManager::locks` must be held.  If `is_head` is false, `head` must be non-null, because
    // it is dereferenced in the Locked state.
    unsafe fn is_allowed(&self, target_state: LockState, is_head: bool) -> bool {
        match self.state {
            LockState::ReadLock => {
                // Allow ReadLock and Locked so long as nothing else is waiting.
                (self.read_count == 0
                    || target_state == LockState::Locked
                    || target_state == LockState::ReadLock)
                    && is_head
            }
            LockState::Locked => {
                // Always allow reads unless there's an upgrade waiting.  We have to
                // always allow reads in this state because tasks that have locks in
                // the Locked state can later try and acquire ReadLock.
                target_state == LockState::ReadLock
                    && (is_head || unsafe { !(*self.head).is_upgrade })
            }
            LockState::WriteLock => false,
        }
    }

    // Moves the entry from WriteLock back to Locked, then lets waiters that are now allowed
    // (e.g. readers) proceed.  Panics if the entry was not in the WriteLock state.
    //
    // # Safety
    //
    // `LockManager::locks` must be held.
    unsafe fn downgrade_lock(&mut self) {
        assert_eq!(std::mem::replace(&mut self.state, LockState::Locked), LockState::WriteLock);
        unsafe { self.wake() };
    }
}
1639
/// Guard for read locks held on a set of `LockKeys`; the locks are released when it is dropped.
#[must_use]
pub struct ReadGuard<'a> {
    // The lock manager the keys were locked in, either borrowed or reached through an owned
    // filesystem handle.
    manager: LockManagerRef<'a>,
    // Keys whose read locks this guard releases on drop.
    lock_keys: LockKeys,
}
1645
1646impl ReadGuard<'_> {
1647    pub fn fs(&self) -> Option<&Arc<FxFilesystem>> {
1648        if let LockManagerRef::Owned(fs) = &self.manager { Some(fs) } else { None }
1649    }
1650
1651    pub fn into_owned(mut self, fs: Arc<FxFilesystem>) -> ReadGuard<'static> {
1652        ReadGuard {
1653            manager: LockManagerRef::Owned(fs),
1654            lock_keys: std::mem::replace(&mut self.lock_keys, LockKeys::None),
1655        }
1656    }
1657}
1658
1659impl Drop for ReadGuard<'_> {
1660    fn drop(&mut self) {
1661        let mut locks = self.manager.locks.lock();
1662        locks.drop_read_locks(std::mem::take(&mut self.lock_keys));
1663    }
1664}
1665
1666impl fmt::Debug for ReadGuard<'_> {
1667    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1668        f.debug_struct("ReadGuard")
1669            .field("manager", &(&self.manager as *const _))
1670            .field("lock_keys", &self.lock_keys)
1671            .finish()
1672    }
1673}
1674
/// Guard for write locks held on a set of `LockKeys`; the locks are released when it is dropped.
#[must_use]
pub struct WriteGuard<'a> {
    // The lock manager the keys were locked in.
    manager: LockManagerRef<'a>,
    // Keys whose write locks this guard releases on drop.
    lock_keys: LockKeys,
}
1680
1681impl Drop for WriteGuard<'_> {
1682    fn drop(&mut self) {
1683        let mut locks = self.manager.locks.lock();
1684        locks.drop_write_locks(std::mem::take(&mut self.lock_keys));
1685    }
1686}
1687
1688impl fmt::Debug for WriteGuard<'_> {
1689    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1690        f.debug_struct("WriteGuard")
1691            .field("manager", &(&self.manager as *const _))
1692            .field("lock_keys", &self.lock_keys)
1693            .finish()
1694    }
1695}
1696
/// How a lock guard reaches its `LockManager`: either a plain borrow, or a strong reference to
/// the filesystem that owns the manager (which is what lets a guard become `'static`).
enum LockManagerRef<'a> {
    // Borrows the lock manager for `'a`.
    Borrowed(&'a LockManager),
    // Keeps the filesystem alive; the manager is reached via `FxFilesystem::lock_manager`.
    Owned(Arc<FxFilesystem>),
}
1701
1702impl Deref for LockManagerRef<'_> {
1703    type Target = LockManager;
1704
1705    fn deref(&self) -> &Self::Target {
1706        match self {
1707            LockManagerRef::Borrowed(m) => m,
1708            LockManagerRef::Owned(f) => f.lock_manager(),
1709        }
1710    }
1711}
1712
1713impl<'a> From<&'a LockManager> for LockManagerRef<'a> {
1714    fn from(value: &'a LockManager) -> Self {
1715        LockManagerRef::Borrowed(value)
1716    }
1717}
1718
#[cfg(test)]
mod tests {
    use super::{AttributeId, LockKey, LockKeys, LockManager, LockState, Mutation, Options};
    use crate::filesystem::FxFilesystem;
    use fuchsia_async as fasync;
    use fuchsia_sync::Mutex;
    use futures::channel::oneshot::channel;
    use futures::future::FutureExt;
    use futures::stream::FuturesUnordered;
    use futures::{StreamExt, join, pin_mut};
    use std::time::Duration;
    use storage_device::DeviceHolder;
    use storage_device::fake_device::FakeDevice;

    #[fuchsia::test]
    async fn test_simple() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let mut t = fs
            .root_store()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        t.add(1, Mutation::BeginFlush);
        assert!(!t.is_empty());
    }

    #[fuchsia::test]
    async fn test_locks() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let (send3, recv3) = channel();
        let done = Mutex::new(false);
        let mut futures = FuturesUnordered::new();
        futures.push(
            async {
                let _t = fs
                    .root_store()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, AttributeId::TEST_ID)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send1.send(()).unwrap(); // Tell the next future to continue.
                send3.send(()).unwrap(); // Tell the last future to continue.
                recv2.await.unwrap();
                // This is a halting problem so all we can do is sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            }
            .boxed(),
        );
        futures.push(
            async {
                recv1.await.unwrap();
                // This should not block since it is a different key.
                let _t = fs
                    .root_store()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(2, 2, AttributeId::TEST_ID)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                // Tell the first future to continue.
                send2.send(()).unwrap();
            }
            .boxed(),
        );
        futures.push(
            async {
                // This should block until the first future has completed.
                recv3.await.unwrap();
                let _t = fs
                    .root_store()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, AttributeId::TEST_ID)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                *done.lock() = true;
            }
            .boxed(),
        );
        while let Some(()) = futures.next().await {}
    }

    #[fuchsia::test]
    async fn test_read_lock_after_write_lock() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let done = Mutex::new(false);
        join!(
            async {
                let t = fs
                    .root_store()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, AttributeId::TEST_ID)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send1.send(()).unwrap(); // Tell the next future to continue.
                recv2.await.unwrap();
                t.commit().await.expect("commit failed");
                *done.lock() = true;
            },
            async {
                recv1.await.unwrap();
                // Reads should not be blocked until the transaction is committed.
                let _guard = fs
                    .lock_manager()
                    .read_lock(lock_keys![LockKey::object_attribute(1, 2, AttributeId::TEST_ID)])
                    .await;
                // Tell the first future to continue.
                send2.send(()).unwrap();
                // It shouldn't proceed until we release our read lock, but it's a halting
                // problem, so sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            },
        );
    }

    #[fuchsia::test]
    async fn test_write_lock_after_read_lock() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let done = Mutex::new(false);
        join!(
            async {
                // Reads should not be blocked until the transaction is committed.
                let _guard = fs
                    .lock_manager()
                    .read_lock(lock_keys![LockKey::object_attribute(1, 2, AttributeId::TEST_ID)])
                    .await;
                // Tell the next future to continue and then wait.
                send1.send(()).unwrap();
                recv2.await.unwrap();
                // It shouldn't proceed until we release our read lock, but it's a halting
                // problem, so sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            },
            async {
                recv1.await.unwrap();
                let t = fs
                    .root_store()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, AttributeId::TEST_ID)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send2.send(()).unwrap(); // Tell the first future to continue;
                t.commit().await.expect("commit failed");
                *done.lock() = true;
            },
        );
    }

    #[fuchsia::test]
    async fn test_drop_uncommitted_transaction() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let key = lock_keys![LockKey::object(1, 1)];

        // Dropping while there's a reader.
        {
            let _write_lock = fs
                .root_store()
                .new_transaction(key.clone(), Options::default())
                .await
                .expect("new_transaction failed");
            let _read_lock = fs.lock_manager().read_lock(key.clone()).await;
        }
        // Dropping while there's no reader.
        {
            let _write_lock = fs
                .root_store()
                .new_transaction(key.clone(), Options::default())
                .await
                .expect("new_transaction failed");
        }
        // Make sure we can take the lock again (i.e. it was actually released).
        fs.root_store()
            .new_transaction(key.clone(), Options::default())
            .await
            .expect("new_transaction failed");
    }

    #[fuchsia::test]
    async fn test_drop_waiting_write_lock() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        {
            let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
            // The write lock must wait for the read lock; the pending request is dropped here.
            assert!(
                futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
                    .is_pending()
            );
        }
        // Dropping the pending request must not leave the entry wedged.
        let _ = manager.lock(keys, LockState::WriteLock).await;
    }

    #[fuchsia::test]
    async fn test_write_lock_blocks_everything() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        {
            let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
            assert!(
                futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
                    .is_pending()
            );
            assert!(
                futures::poll!(manager.lock(keys.clone(), LockState::ReadLock).boxed())
                    .is_pending()
            );
        }
        {
            let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
        }
        {
            let _guard = manager.lock(keys, LockState::ReadLock).await;
        }
    }

    #[fuchsia::test]
    async fn test_downgrade_locks() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.txn_lock(keys.clone()).await;
        manager.commit_prepare_keys(&keys).await;

        // Use FuturesUnordered so that we can check that the waker is woken.
        let mut read_lock: FuturesUnordered<_> =
            std::iter::once(manager.read_lock(keys.clone())).collect();

        // Trying to acquire a read lock now should be blocked.
        assert!(futures::poll!(read_lock.next()).is_pending());

        manager.downgrade_locks(&keys);

        // After downgrading, it should be possible to take a read lock.
        assert!(futures::poll!(read_lock.next()).is_ready());
    }

    #[fuchsia::test]
    async fn test_dropped_write_lock_wakes() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
        let mut read_lock = FuturesUnordered::new();
        read_lock.push(manager.lock(keys.clone(), LockState::ReadLock));

        {
            let write_lock = manager.lock(keys, LockState::WriteLock);
            pin_mut!(write_lock);

            // The write lock should be blocked because of the read lock.
            assert!(futures::poll!(write_lock).is_pending());

            // Another read lock should be blocked because of the write lock.
            assert!(futures::poll!(read_lock.next()).is_pending());
        }

        // Dropping the write lock should allow the read lock to proceed.
        assert!(futures::poll!(read_lock.next()).is_ready());
    }

    #[fuchsia::test]
    async fn test_drop_upgrade() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.lock(keys.clone(), LockState::Locked).await;

        {
            let commit_prepare = manager.commit_prepare_keys(&keys);
            pin_mut!(commit_prepare);
            let _read_guard = manager.lock(keys.clone(), LockState::ReadLock).await;
            assert!(futures::poll!(commit_prepare).is_pending());

            // Now we test dropping read_guard which should wake commit_prepare and
            // then dropping commit_prepare.
        }

        // We should be able to still commit_prepare.
        manager.commit_prepare_keys(&keys).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_woken_upgrade_blocks_reads() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        // Start with a transaction lock.
        let guard = manager.lock(keys.clone(), LockState::Locked).await;

        // Take a read lock.
        let read1 = manager.lock(keys.clone(), LockState::ReadLock).await;

        // Try and upgrade the transaction lock, which should not be possible because of the read.
        let commit_prepare = manager.commit_prepare_keys(&keys);
        pin_mut!(commit_prepare);
        assert!(futures::poll!(commit_prepare.as_mut()).is_pending());

        // Taking another read should also be blocked.
        let read2 = manager.lock(keys.clone(), LockState::ReadLock);
        pin_mut!(read2);
        assert!(futures::poll!(read2.as_mut()).is_pending());

        // Drop the first read and the upgrade should complete.
        std::mem::drop(read1);
        assert!(futures::poll!(commit_prepare).is_ready());

        // But the second read should still be blocked.
        assert!(futures::poll!(read2.as_mut()).is_pending());

        // If we drop the write lock now, the read should be unblocked.
        std::mem::drop(guard);
        assert!(futures::poll!(read2).is_ready());
    }

    static LOCK_KEY_1: LockKey = LockKey::flush(1);
    static LOCK_KEY_2: LockKey = LockKey::flush(2);
    static LOCK_KEY_3: LockKey = LockKey::flush(3);

    // The keys, storage method, and capacity must all match.
    fn assert_lock_keys_equal(value: &LockKeys, expected: &LockKeys) {
        match (value, expected) {
            (LockKeys::None, LockKeys::None) => {}
            (LockKeys::Inline(key1), LockKeys::Inline(key2)) => {
                if key1 != key2 {
                    panic!("{key1:?} != {key2:?}");
                }
            }
            (LockKeys::Vec(vec1), LockKeys::Vec(vec2)) => {
                if vec1 != vec2 {
                    panic!("{vec1:?} != {vec2:?}");
                }
                if vec1.capacity() != vec2.capacity() {
                    panic!(
                        "LockKeys have different capacity: {} != {}",
                        vec1.capacity(),
                        vec2.capacity()
                    );
                }
            }
            (_, _) => panic!("{value:?} != {expected:?}"),
        }
    }

    // Only the keys must match. Storage method and capacity don't matter.
    fn assert_lock_keys_equivalent(value: &LockKeys, expected: &LockKeys) {
        let value: Vec<_> = value.iter().collect();
        let expected: Vec<_> = expected.iter().collect();
        assert_eq!(value, expected);
    }

    #[test]
    fn test_lock_keys_macro() {
        assert_lock_keys_equal(&lock_keys![], &LockKeys::None);
        assert_lock_keys_equal(&lock_keys![LOCK_KEY_1], &LockKeys::Inline(LOCK_KEY_1));
        assert_lock_keys_equal(
            &lock_keys![LOCK_KEY_1, LOCK_KEY_2],
            &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]),
        );
    }

    #[test]
    fn test_lock_keys_with_capacity() {
        assert_lock_keys_equal(&LockKeys::with_capacity(0), &LockKeys::None);
        assert_lock_keys_equal(&LockKeys::with_capacity(1), &LockKeys::None);
        assert_lock_keys_equal(&LockKeys::with_capacity(2), &LockKeys::Vec(Vec::with_capacity(2)));
    }

    #[test]
    fn test_lock_keys_len() {
        assert_eq!(lock_keys![].len(), 0);
        assert_eq!(lock_keys![LOCK_KEY_1].len(), 1);
        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].len(), 2);
    }

    #[test]
    fn test_lock_keys_contains() {
        assert!(!lock_keys![].contains(&LOCK_KEY_1));
        assert!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_1));
        assert!(!lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_2));
        assert!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_1));
        assert!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_2));
        assert!(!lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_3));
    }

    #[test]
    fn test_lock_keys_push() {
        let mut keys = lock_keys![];
        keys.push(LOCK_KEY_1);
        assert_lock_keys_equal(&keys, &LockKeys::Inline(LOCK_KEY_1));
        keys.push(LOCK_KEY_2);
        assert_lock_keys_equal(&keys, &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]));
        keys.push(LOCK_KEY_3);
        assert_lock_keys_equivalent(
            &keys,
            &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2, LOCK_KEY_3]),
        );
    }

    #[test]
    fn test_lock_keys_sort_unstable() {
        let mut keys = lock_keys![];
        keys.sort_unstable();
        assert_lock_keys_equal(&keys, &lock_keys![]);

        let mut keys = lock_keys![LOCK_KEY_1];
        keys.sort_unstable();
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);

        let mut keys = lock_keys![LOCK_KEY_2, LOCK_KEY_1];
        keys.sort_unstable();
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
    }

    #[test]
    fn test_lock_keys_dedup() {
        let mut keys = lock_keys![];
        keys.dedup();
        assert_lock_keys_equal(&keys, &lock_keys![]);

        let mut keys = lock_keys![LOCK_KEY_1];
        keys.dedup();
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);

        let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_1];
        keys.dedup();
        assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
    }

    #[test]
    fn test_lock_keys_truncate() {
        let mut keys = lock_keys![];
        keys.truncate(5);
        assert_lock_keys_equal(&keys, &lock_keys![]);
        keys.truncate(0);
        assert_lock_keys_equal(&keys, &lock_keys![]);

        let mut keys = lock_keys![LOCK_KEY_1];
        keys.truncate(5);
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
        keys.truncate(0);
        assert_lock_keys_equal(&keys, &lock_keys![]);

        let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_2];
        keys.truncate(5);
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
        keys.truncate(1);
        // Although there's only 1 key after truncate the key is not stored inline.
        assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
    }

    #[test]
    fn test_lock_keys_iter() {
        assert_eq!(lock_keys![].iter().collect::<Vec<_>>(), Vec::<&LockKey>::new());

        assert_eq!(lock_keys![LOCK_KEY_1].iter().collect::<Vec<_>>(), vec![&LOCK_KEY_1]);

        assert_eq!(
            lock_keys![LOCK_KEY_1, LOCK_KEY_2].iter().collect::<Vec<_>>(),
            vec![&LOCK_KEY_1, &LOCK_KEY_2]
        );
    }
}