fxfs/object_store/
transaction.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::checksum::Checksum;
6use crate::filesystem::{FxFilesystem, TxnGuard};
7use crate::log::*;
8use crate::lsm_tree::types::Item;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::AttributeKey;
11use crate::object_store::allocator::{AllocatorItem, Reservation};
12use crate::object_store::object_manager::{ObjectManager, reserved_space_from_journal_usage};
13use crate::object_store::object_record::{
14    FxfsKey, FxfsKeyV40, FxfsKeyV49, ObjectItem, ObjectItemV40, ObjectItemV41, ObjectItemV43,
15    ObjectItemV46, ObjectItemV47, ObjectItemV49, ObjectItemV50, ObjectKey, ObjectKeyData,
16    ObjectValue, ProjectProperty,
17};
18use crate::serialized_types::{Migrate, Versioned, migrate_nodefault, migrate_to_version};
19use anyhow::Error;
20use either::{Either, Left, Right};
21use fprint::TypeFingerprint;
22use fuchsia_sync::Mutex;
23use futures::future::poll_fn;
24use futures::pin_mut;
25use rustc_hash::FxHashMap as HashMap;
26use scopeguard::ScopeGuard;
27use serde::{Deserialize, Serialize};
28use std::cell::UnsafeCell;
29use std::cmp::Ordering;
30use std::collections::BTreeSet;
31use std::collections::hash_map::Entry;
32use std::marker::PhantomPinned;
33use std::ops::{Deref, DerefMut, Range};
34use std::sync::Arc;
35use std::task::{Poll, Waker};
36use std::{fmt, mem};
37
/// This allows for special handling of certain transactions such as deletes and the
/// extension of Journal extents. For most other use cases it is appropriate to use
/// `default()` here.
///
/// `Options` is `Copy`, so it can be passed around and reused cheaply.
#[derive(Clone, Copy, Default)]
pub struct Options<'a> {
    /// If true, don't check for low journal space.  This should be true for any transactions that
    /// might alleviate journal space (i.e. compaction).
    pub skip_journal_checks: bool,

    /// If true, borrow metadata space from the metadata reservation.  This setting should be set to
    /// true for any transaction that will either not affect space usage after compaction
    /// (e.g. setting attributes), or reduce space usage (e.g. unlinking).  Otherwise, a transaction
    /// might fail with an out-of-space error.
    pub borrow_metadata_space: bool,

    /// If specified, a reservation to be used with the transaction.  If not set, any allocations
    /// that are part of this transaction will have to take their chances, and will fail if there is
    /// no free space.  The intention is that this should be used for things like the journal which
    /// require guaranteed space.
    pub allocator_reservation: Option<&'a Reservation>,

    /// An existing transaction guard to be used.
    pub txn_guard: Option<&'a TxnGuard<'a>>,
}
62
// This is the amount of space that we reserve for metadata when we are creating a new transaction.
// A transaction should not take more than this.  This is expressed in terms of space occupied in
// the journal; transactions must not take up more space in the journal than the number below.  The
// amount chosen here must be large enough for the maximum possible transaction that can be created,
// so transactions always need to be bounded which might involve splitting an operation up into
// smaller transactions.
pub const TRANSACTION_MAX_JOURNAL_USAGE: u64 = 24_576;
// The same limit expressed as reserved space rather than journal bytes; see
// `reserved_space_from_journal_usage` for the conversion.
pub const TRANSACTION_METADATA_MAX_AMOUNT: u64 =
    reserved_space_from_journal_usage(TRANSACTION_MAX_JOURNAL_USAGE);
72
/// Wraps the lock guard for a set of transaction locks (see `Transaction::new`, whose `txn_locks`
/// are read locks that can be upgraded to write locks at commit time).  Marked `#[must_use]`
/// because dropping the guard presumably releases the locks — holding it is the whole point.
#[must_use]
pub struct TransactionLocks<'a>(pub WriteGuard<'a>);
75
/// The journal consists of these records which will be replayed at mount time.  Within a
/// transaction, these are stored as a set which allows some mutations to be deduplicated and found
/// (and we require custom comparison functions below).  For example, we need to be able to find
/// object size changes.
pub type Mutation = MutationV50;

#[derive(
    Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint, Versioned,
)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum MutationV50 {
    // A mutation to a record in an object store's LSM tree.
    ObjectStore(ObjectStoreMutationV50),
    // An object-store mutation in serialized, encrypted form.
    // NOTE(review): presumably used when the store's keys are unavailable — confirm against the
    // journal replay code.
    EncryptedObjectStore(Box<[u8]>),
    // A mutation applied to the allocator; see `AllocatorMutation` for the operations.
    Allocator(AllocatorMutationV32),
    // Indicates the beginning of a flush.  This would typically involve sealing a tree.
    BeginFlush,
    // Indicates the end of a flush.  This would typically involve replacing the immutable layers
    // with compacted ones.
    EndFlush,
    // Volume has been deleted.  Requires we remove it from the set of managed ObjectStore.
    DeleteVolume,
    // Updates the amount of borrowed metadata space.
    // NOTE(review): inferred from the name and `Options::borrow_metadata_space` — confirm.
    UpdateBorrowed(u64),
    // Replaces the key used for mutations; compared by pointer identity (see the
    // `UpdateMutationsKey` impls below).
    UpdateMutationsKey(UpdateMutationsKeyV49),
    // An internal directory was created for the given object ID.
    // NOTE(review): inferred from the name; see also `LockKey::InternalDirectory`.
    CreateInternalDir(u64),
}
101
// Older versions of `Mutation`, retained so that previously-serialized journals can still be
// deserialized.  Each version migrates to the next via the `Migrate` derive
// (`#[migrate_to_version(...)]`), forming a chain up to the current `MutationV50`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV50)]
pub enum MutationV49 {
    ObjectStore(ObjectStoreMutationV49),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV49),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV49)]
pub enum MutationV47 {
    ObjectStore(ObjectStoreMutationV47),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV47)]
pub enum MutationV46 {
    ObjectStore(ObjectStoreMutationV46),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV46)]
pub enum MutationV43 {
    ObjectStore(ObjectStoreMutationV43),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV43)]
pub enum MutationV41 {
    ObjectStore(ObjectStoreMutationV41),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV41)]
pub enum MutationV40 {
    ObjectStore(ObjectStoreMutationV40),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}
185
186impl Mutation {
187    pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self {
188        Mutation::ObjectStore(ObjectStoreMutation {
189            item: Item::new(key, value),
190            op: Operation::Insert,
191        })
192    }
193
194    pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self {
195        Mutation::ObjectStore(ObjectStoreMutation {
196            item: Item::new(key, value),
197            op: Operation::ReplaceOrInsert,
198        })
199    }
200
201    pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self {
202        Mutation::ObjectStore(ObjectStoreMutation {
203            item: Item::new(key, value),
204            op: Operation::Merge,
205        })
206    }
207
208    pub fn update_mutations_key(key: FxfsKey) -> Self {
209        Mutation::UpdateMutationsKey(key.into())
210    }
211}
212
// We have custom comparison functions for mutations that just use the key, rather than the key and
// value that would be used by default so that we can deduplicate and find mutations (see
// get_object_mutation below).
pub type ObjectStoreMutation = ObjectStoreMutationV50;

/// An item (key/value pair) to apply to an object store's LSM tree, plus the operation
/// (insert / replace-or-insert / merge) to apply it with.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct ObjectStoreMutationV50 {
    pub item: ObjectItemV50,
    pub op: OperationV32,
}
224
// Older versions of `ObjectStoreMutation`, retained for deserializing old journals.  Each
// migrates to the next via the `Migrate` derive, up to the current `ObjectStoreMutationV50`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV50)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV49 {
    pub item: ObjectItemV49,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV49)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV47 {
    pub item: ObjectItemV47,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV47)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV46 {
    pub item: ObjectItemV46,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV46)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV43 {
    pub item: ObjectItemV43,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV43)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV41 {
    pub item: ObjectItemV41,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_nodefault]
#[migrate_to_version(ObjectStoreMutationV41)]
pub struct ObjectStoreMutationV40 {
    pub item: ObjectItemV40,
    pub op: OperationV32,
}
272
/// The different LSM tree operations that can be performed as part of a mutation.
pub type Operation = OperationV32;

#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum OperationV32 {
    // Insert a new item; used when the caller expects no existing item with the same key
    // (see `Transaction::verify_locks`, which treats `Insert` as creating a new object).
    Insert,
    // Insert the item, replacing any existing item with the same key.
    ReplaceOrInsert,
    // Merge the item with an existing item with the same key (the merge semantics live in the
    // LSM tree's merge logic, not here).
    Merge,
}
283
284impl Ord for ObjectStoreMutation {
285    fn cmp(&self, other: &Self) -> Ordering {
286        self.item.key.cmp(&other.item.key)
287    }
288}
289
290impl PartialOrd for ObjectStoreMutation {
291    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
292        Some(self.cmp(other))
293    }
294}
295
296impl PartialEq for ObjectStoreMutation {
297    fn eq(&self, other: &Self) -> bool {
298        self.item.key.eq(&other.item.key)
299    }
300}
301
302impl Eq for ObjectStoreMutation {}
303
304impl Ord for AllocatorItem {
305    fn cmp(&self, other: &Self) -> Ordering {
306        self.key.cmp(&other.key)
307    }
308}
309
310impl PartialOrd for AllocatorItem {
311    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
312        Some(self.cmp(other))
313    }
314}
315
/// Same as std::ops::Range but with Ord and PartialOrd support, sorted first by start of the range,
/// then by the end.
pub type DeviceRange = DeviceRangeV32;

// Newtype over `Range<u64>`; `Deref`/`DerefMut` below make it transparently usable as a range.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct DeviceRangeV32(pub Range<u64>);
323
324impl Deref for DeviceRange {
325    type Target = Range<u64>;
326
327    fn deref(&self) -> &Self::Target {
328        &self.0
329    }
330}
331
332impl DerefMut for DeviceRange {
333    fn deref_mut(&mut self) -> &mut Self::Target {
334        &mut self.0
335    }
336}
337
338impl From<Range<u64>> for DeviceRange {
339    fn from(range: Range<u64>) -> Self {
340        Self(range)
341    }
342}
343
344impl Into<Range<u64>> for DeviceRange {
345    fn into(self) -> Range<u64> {
346        self.0
347    }
348}
349
350impl Ord for DeviceRange {
351    fn cmp(&self, other: &Self) -> Ordering {
352        self.start.cmp(&other.start).then(self.end.cmp(&other.end))
353    }
354}
355
356impl PartialOrd for DeviceRange {
357    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
358        Some(self.cmp(other))
359    }
360}
361
pub type AllocatorMutation = AllocatorMutationV32;

/// Mutations applied to the allocator's state.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorMutationV32 {
    /// Allocates `device_range` on behalf of the store identified by `owner_object_id`.
    Allocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    /// Releases `device_range` previously allocated to `owner_object_id`.
    Deallocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    /// Sets a limit of `bytes` for allocations owned by `owner_object_id`.
    SetLimit {
        owner_object_id: u64,
        bytes: u64,
    },
    /// Marks all extents with a given owner_object_id for deletion.
    /// Used to free space allocated to encrypted ObjectStore where we may not have the key.
    /// Note that the actual deletion time is undefined so this should never be used where an
    /// ObjectStore is still in use due to a high risk of corruption. Similarly, owner_object_id
    /// should never be reused for the same reasons.
    MarkForDeletion(u64),
}
386
pub type UpdateMutationsKey = UpdateMutationsKeyV49;

/// Newtype around an `FxfsKey`.  NOTE: the `Ord`/`Eq` impls below compare by pointer identity,
/// not by the wrapped key's value.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
pub struct UpdateMutationsKeyV49(pub FxfsKeyV49);

// Older version, migrated forward via the `Migrate` derive.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(UpdateMutationsKeyV49)]
pub struct UpdateMutationsKeyV40(pub FxfsKeyV40);
395
396impl From<UpdateMutationsKey> for FxfsKey {
397    fn from(outer: UpdateMutationsKey) -> Self {
398        outer.0
399    }
400}
401
402impl From<FxfsKey> for UpdateMutationsKey {
403    fn from(inner: FxfsKey) -> Self {
404        Self(inner)
405    }
406}
407
#[cfg(fuzz)]
impl<'a> arbitrary::Arbitrary<'a> for UpdateMutationsKey {
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        // Propagate the error instead of unwrapping: `Unstructured` can legitimately fail
        // (e.g. when the fuzzer's input is exhausted), which should end the case cleanly
        // rather than panicking.
        FxfsKey::arbitrary(u).map(UpdateMutationsKey::from)
    }
}
414
// NOTE: `UpdateMutationsKey` deliberately compares by *pointer identity*, not by the wrapped
// key's value.  Mutations are stored in a `BTreeSet` within a transaction; address-based
// comparison guarantees two distinct `UpdateMutationsKey` instances never compare equal, so
// they never deduplicate against each other.  A consequence is that the relative order of two
// instances is allocation-dependent and therefore not deterministic across runs.
impl Ord for UpdateMutationsKey {
    fn cmp(&self, other: &Self) -> Ordering {
        (self as *const UpdateMutationsKey).cmp(&(other as *const _))
    }
}

impl PartialOrd for UpdateMutationsKey {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for UpdateMutationsKey {}

impl PartialEq for UpdateMutationsKey {
    // Equal only if `self` and `other` are literally the same instance.
    fn eq(&self, other: &Self) -> bool {
        std::ptr::eq(self, other)
    }
}
434
/// When creating a transaction, locks typically need to be held to prevent two or more writers
/// trying to make conflicting mutations at the same time.  LockKeys are used for this.
/// NOTE: Ordering is important here!  The lock manager sorts the list of locks in a transaction
/// to acquire them in a consistent order, but there are special cases for the Filesystem lock and
/// the Flush lock.
/// The Filesystem lock is taken by every transaction and is done so first, as part of the TxnGuard.
/// The Flush lock is taken when we flush an LSM tree (e.g. an object store), and is held for
/// several transactions.  As such, it must come first in the lock acquisition ordering, so that
/// other transactions using the Flush lock have the same ordering as in flushing.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Copy)]
pub enum LockKey {
    /// Locks the entire filesystem.
    Filesystem,

    /// Used to lock flushing an object.
    Flush {
        object_id: u64,
    },

    /// Used to lock changes to a particular object attribute (e.g. writes).
    ObjectAttribute {
        store_object_id: u64,
        object_id: u64,
        attribute_id: u64,
    },

    /// Used to lock changes to a particular object (e.g. adding a child to a directory).
    Object {
        store_object_id: u64,
        object_id: u64,
    },

    ProjectId {
        store_object_id: u64,
        project_id: u64,
    },

    /// Used to lock any truncate operations for a file.
    Truncate {
        store_object_id: u64,
        object_id: u64,
    },

    /// A lock used when getting or creating the internal directory.
    InternalDirectory {
        store_object_id: u64,
    },
}

impl LockKey {
    /// Returns a `Flush` key for `object_id`.
    pub const fn flush(object_id: u64) -> Self {
        Self::Flush { object_id }
    }

    /// Returns an `ObjectAttribute` key for the given store/object/attribute triple.
    pub const fn object_attribute(store_object_id: u64, object_id: u64, attribute_id: u64) -> Self {
        Self::ObjectAttribute { store_object_id, object_id, attribute_id }
    }

    /// Returns an `Object` key for `object_id` within `store_object_id`.
    pub const fn object(store_object_id: u64, object_id: u64) -> Self {
        Self::Object { store_object_id, object_id }
    }

    /// Returns a `Truncate` key for `object_id` within `store_object_id`.
    pub const fn truncate(store_object_id: u64, object_id: u64) -> Self {
        Self::Truncate { store_object_id, object_id }
    }
}
501
/// A container for holding `LockKey` objects. Can store a single `LockKey` inline.
#[derive(Clone, Debug)]
pub enum LockKeys {
    // No keys.
    None,
    // Exactly one key, stored without heap allocation.
    Inline(LockKey),
    // Two or more keys (also produced by `with_capacity` for capacities above one).
    Vec(Vec<LockKey>),
}
509
510impl LockKeys {
511    pub fn with_capacity(capacity: usize) -> Self {
512        if capacity > 1 { LockKeys::Vec(Vec::with_capacity(capacity)) } else { LockKeys::None }
513    }
514
515    pub fn push(&mut self, key: LockKey) {
516        match self {
517            Self::None => *self = LockKeys::Inline(key),
518            Self::Inline(inline) => {
519                *self = LockKeys::Vec(vec![*inline, key]);
520            }
521            Self::Vec(vec) => vec.push(key),
522        }
523    }
524
525    pub fn truncate(&mut self, len: usize) {
526        match self {
527            Self::None => {}
528            Self::Inline(_) => {
529                if len == 0 {
530                    *self = Self::None;
531                }
532            }
533            Self::Vec(vec) => vec.truncate(len),
534        }
535    }
536
537    fn len(&self) -> usize {
538        match self {
539            Self::None => 0,
540            Self::Inline(_) => 1,
541            Self::Vec(vec) => vec.len(),
542        }
543    }
544
545    fn contains(&self, key: &LockKey) -> bool {
546        match self {
547            Self::None => false,
548            Self::Inline(single) => single == key,
549            Self::Vec(vec) => vec.contains(key),
550        }
551    }
552
553    fn sort_unstable(&mut self) {
554        match self {
555            Self::Vec(vec) => vec.sort_unstable(),
556            _ => {}
557        }
558    }
559
560    fn dedup(&mut self) {
561        match self {
562            Self::Vec(vec) => vec.dedup(),
563            _ => {}
564        }
565    }
566
567    fn iter(&self) -> LockKeysIter<'_> {
568        match self {
569            LockKeys::None => LockKeysIter::None,
570            LockKeys::Inline(key) => LockKeysIter::Inline(key),
571            LockKeys::Vec(keys) => LockKeysIter::Vec(keys.into_iter()),
572        }
573    }
574}
575
576enum LockKeysIter<'a> {
577    None,
578    Inline(&'a LockKey),
579    Vec(<&'a Vec<LockKey> as IntoIterator>::IntoIter),
580}
581
582impl<'a> Iterator for LockKeysIter<'a> {
583    type Item = &'a LockKey;
584    fn next(&mut self) -> Option<Self::Item> {
585        match self {
586            Self::None => None,
587            Self::Inline(inline) => {
588                let next = *inline;
589                *self = Self::None;
590                Some(next)
591            }
592            Self::Vec(vec) => vec.next(),
593        }
594    }
595}
596
597impl Default for LockKeys {
598    fn default() -> Self {
599        LockKeys::None
600    }
601}
602
/// Builds a `LockKeys` from zero or more `LockKey` expressions, picking the cheapest
/// representation: `None` for no keys, `Inline` for exactly one, and a heap-allocated `Vec`
/// otherwise.
#[macro_export]
macro_rules! lock_keys {
    // No keys.
    () => {
        $crate::object_store::transaction::LockKeys::None
    };
    // Exactly one key: stored inline, no allocation.
    ($lock_key:expr $(,)?) => {
        $crate::object_store::transaction::LockKeys::Inline($lock_key)
    };
    // Two or more keys.
    ($($lock_keys:expr),+ $(,)?) => {
        $crate::object_store::transaction::LockKeys::Vec(vec![$($lock_keys),+])
    };
}
pub use lock_keys;
616
/// Mutations in a transaction can be associated with an object so that when mutations are applied,
/// updates can be applied to in-memory structures. For example, we cache object sizes, so when a
/// size change is applied, we can update the cached object size.
pub trait AssociatedObject: Send + Sync {
    /// Hook invoked for the associated object when `_mutation` is about to be applied for
    /// `_object_id`.  The default implementation does nothing.
    fn will_apply_mutation(&self, _mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
    }
}
624
/// An optional reference to an `AssociatedObject`, either borrowed or owned.
pub enum AssocObj<'a> {
    None,
    Borrowed(&'a dyn AssociatedObject),
    Owned(Box<dyn AssociatedObject>),
}
630
631impl AssocObj<'_> {
632    pub fn map<R, F: FnOnce(&dyn AssociatedObject) -> R>(&self, f: F) -> Option<R> {
633        match self {
634            AssocObj::None => None,
635            AssocObj::Borrowed(b) => Some(f(*b)),
636            AssocObj::Owned(o) => Some(f(o.as_ref())),
637        }
638    }
639}
640
/// A single mutation within a transaction, tagged with the ID of the object it applies to.
pub struct TxnMutation<'a> {
    // This, at time of writing, is either the object ID of an object store, or the object ID of the
    // allocator.  In the case of an object mutation, there's another object ID in the mutation
    // record that would be for the object actually being changed.
    pub object_id: u64,

    // The actual mutation.  This gets serialized to the journal.
    pub mutation: Mutation,

    // An optional associated object for the mutation.  During replay, there will always be no
    // associated object.
    pub associated_object: AssocObj<'a>,
}
654
655// We store TxnMutation in a set, and for that, we only use object_id and mutation and not the
656// associated object or checksum.
657impl Ord for TxnMutation<'_> {
658    fn cmp(&self, other: &Self) -> Ordering {
659        self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation))
660    }
661}
662
663impl PartialOrd for TxnMutation<'_> {
664    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
665        Some(self.cmp(other))
666    }
667}
668
669impl PartialEq for TxnMutation<'_> {
670    fn eq(&self, other: &Self) -> bool {
671        self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation)
672    }
673}
674
675impl Eq for TxnMutation<'_> {}
676
// Manual `Debug` because `associated_object` holds trait objects without a `Debug` bound; only
// the identifying fields are printed.
impl std::fmt::Debug for TxnMutation<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TxnMutation")
            .field("object_id", &self.object_id)
            .field("mutation", &self.mutation)
            .finish()
    }
}
685
/// Tracks where the metadata space for a transaction comes from.
pub enum MetadataReservation {
    // The state after a transaction has been dropped.
    None,

    // Metadata space for this transaction is being borrowed from ObjectManager's metadata
    // reservation.
    Borrowed,

    // A metadata reservation was made when the transaction was created.
    Reservation(Reservation),

    // The metadata space is being _held_ within `allocator_reservation`; the u64 is the held
    // amount.
    Hold(u64),
}
700
/// A transaction groups mutation records to be committed as a group.
pub struct Transaction<'a> {
    // Keeps the filesystem alive/usable for the lifetime of the transaction.
    txn_guard: TxnGuard<'a>,

    // The mutations that make up this transaction.  Stored as a set so equal mutations
    // (compared by object_id + mutation, see `TxnMutation`'s impls) deduplicate.
    mutations: BTreeSet<TxnMutation<'a>>,

    // The locks that this transaction currently holds.
    txn_locks: LockKeys,

    /// If set, an allocator reservation that should be used for allocations.
    pub allocator_reservation: Option<&'a Reservation>,

    /// The reservation for the metadata for this transaction.
    pub metadata_reservation: MetadataReservation,

    // Keep track of objects explicitly created by this transaction. No locks are required for them.
    // Addressed by (owner_object_id, object_id).
    new_objects: BTreeSet<(u64, u64)>,

    /// Any data checksums which should be evaluated when replaying this transaction.
    /// Each entry is `(device range, checksums, first_write)` (see `add_checksum`).
    checksums: Vec<(Range<u64>, Vec<Checksum>, bool)>,

    /// Set if this transaction contains data (i.e. includes any extent mutations).
    includes_write: bool,
}
727
728impl<'a> Transaction<'a> {
    /// Creates a new transaction.  `txn_locks` are read locks that can be upgraded to write locks
    /// at commit time.
    ///
    /// Registers the transaction with the filesystem, takes whatever reservations `options`
    /// dictate, and acquires `txn_locks` through the lock manager.  On any failure, the
    /// registration is undone via the scope guard below.
    pub async fn new(
        txn_guard: TxnGuard<'a>,
        options: Options<'a>,
        txn_locks: LockKeys,
    ) -> Result<Transaction<'a>, Error> {
        txn_guard.fs().add_transaction(options.skip_journal_checks).await;
        let fs = txn_guard.fs().clone();
        // Undo `add_transaction` if anything below returns early (e.g. the reservation `?`).
        let guard = scopeguard::guard((), |_| fs.sub_transaction());
        let (metadata_reservation, allocator_reservation, hold) =
            txn_guard.fs().reservation_for_transaction(options).await?;

        // Acquire the locks, then take ownership of the lock keys out of the write guard.
        let txn_locks = {
            let lock_manager = txn_guard.fs().lock_manager();
            let mut write_guard = lock_manager.txn_lock(txn_locks).await;
            std::mem::take(&mut write_guard.0.lock_keys)
        };
        let mut transaction = Transaction {
            txn_guard,
            mutations: BTreeSet::new(),
            txn_locks,
            allocator_reservation: None,
            metadata_reservation,
            new_objects: BTreeSet::new(),
            checksums: Vec::new(),
            includes_write: false,
        };

        // Success: disarm the cleanup guard without running it.
        ScopeGuard::into_inner(guard);
        hold.map(|h| h.forget()); // Transaction takes ownership from here on.
        transaction.allocator_reservation = allocator_reservation;
        Ok(transaction)
    }
763
764    pub fn txn_guard(&self) -> &TxnGuard<'_> {
765        &self.txn_guard
766    }
767
768    pub fn mutations(&self) -> &BTreeSet<TxnMutation<'a>> {
769        &self.mutations
770    }
771
772    pub fn take_mutations(&mut self) -> BTreeSet<TxnMutation<'a>> {
773        self.new_objects.clear();
774        mem::take(&mut self.mutations)
775    }
776
    /// Adds a mutation to this transaction.  If the mutation already exists, it is replaced and the
    /// old mutation is returned.
    pub fn add(&mut self, object_id: u64, mutation: Mutation) -> Option<Mutation> {
        // Convenience wrapper: same as `add_with_object` with no associated object.
        self.add_with_object(object_id, mutation, AssocObj::None)
    }
782
783    /// Removes a mutation that matches `mutation`.
784    pub fn remove(&mut self, object_id: u64, mutation: Mutation) {
785        let txn_mutation = TxnMutation { object_id, mutation, associated_object: AssocObj::None };
786        if self.mutations.remove(&txn_mutation) {
787            if let Mutation::ObjectStore(ObjectStoreMutation {
788                item:
789                    ObjectItem {
790                        key: ObjectKey { object_id: new_object_id, data: ObjectKeyData::Object },
791                        ..
792                    },
793                op: Operation::Insert,
794            }) = txn_mutation.mutation
795            {
796                self.new_objects.remove(&(object_id, new_object_id));
797            }
798        }
799    }
800
801    /// Adds a mutation with an associated object. If the mutation already exists, it is replaced
802    /// and the old mutation is returned.
803    pub fn add_with_object(
804        &mut self,
805        object_id: u64,
806        mutation: Mutation,
807        associated_object: AssocObj<'a>,
808    ) -> Option<Mutation> {
809        assert!(object_id != INVALID_OBJECT_ID);
810        if let Mutation::ObjectStore(ObjectStoreMutation {
811            item:
812                Item {
813                    key:
814                        ObjectKey { data: ObjectKeyData::Attribute(_, AttributeKey::Extent(_)), .. },
815                    ..
816                },
817            ..
818        }) = &mutation
819        {
820            self.includes_write = true;
821        }
822        let txn_mutation = TxnMutation { object_id, mutation, associated_object };
823        self.verify_locks(&txn_mutation);
824        self.mutations.replace(txn_mutation).map(|m| m.mutation)
825    }
826
827    pub fn add_checksum(&mut self, range: Range<u64>, checksums: Vec<Checksum>, first_write: bool) {
828        self.checksums.push((range, checksums, first_write));
829    }
830
831    pub fn includes_write(&self) -> bool {
832        self.includes_write
833    }
834
835    pub fn checksums(&self) -> &[(Range<u64>, Vec<Checksum>, bool)] {
836        &self.checksums
837    }
838
839    pub fn take_checksums(&mut self) -> Vec<(Range<u64>, Vec<Checksum>, bool)> {
840        std::mem::replace(&mut self.checksums, Vec::new())
841    }
842
843    fn verify_locks(&mut self, mutation: &TxnMutation<'_>) {
844        // It was considered to change the locks from Vec to BTreeSet since we'll now be searching
845        // through it, but given the small set that these locks usually comprise, it probably isn't
846        // worth it.
847        match mutation {
848            TxnMutation {
849                mutation:
850                    Mutation::ObjectStore {
851                        0: ObjectStoreMutation { item: ObjectItem { key, .. }, op },
852                    },
853                object_id: store_object_id,
854                ..
855            } => {
856                match &key.data {
857                    ObjectKeyData::Attribute(..) => {
858                        // TODO(https://fxbug.dev/42073914): Check lock requirements.
859                    }
860                    ObjectKeyData::Child { .. }
861                    | ObjectKeyData::EncryptedChild(_)
862                    | ObjectKeyData::EncryptedCasefoldChild(_)
863                    | ObjectKeyData::CasefoldChild { .. } => {
864                        let id = key.object_id;
865                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
866                            && !self.new_objects.contains(&(*store_object_id, id))
867                        {
868                            debug_assert!(
869                                false,
870                                "Not holding required lock for object {id} \
871                                in store {store_object_id}"
872                            );
873                            error!(
874                                "Not holding required lock for object {id} in store \
875                                {store_object_id}"
876                            )
877                        }
878                    }
879                    ObjectKeyData::GraveyardEntry { .. } => {
880                        // TODO(https://fxbug.dev/42073911): Check lock requirements.
881                    }
882                    ObjectKeyData::GraveyardAttributeEntry { .. } => {
883                        // TODO(https://fxbug.dev/122974): Check lock requirements.
884                    }
885                    ObjectKeyData::Keys => {
886                        let id = key.object_id;
887                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
888                            && !self.new_objects.contains(&(*store_object_id, id))
889                        {
890                            debug_assert!(
891                                false,
892                                "Not holding required lock for object {id} \
893                                in store {store_object_id}"
894                            );
895                            error!(
896                                "Not holding required lock for object {id} in store \
897                                {store_object_id}"
898                            )
899                        }
900                    }
901                    ObjectKeyData::Object => match op {
902                        // Insert implies the caller expects no object with which to race
903                        Operation::Insert => {
904                            self.new_objects.insert((*store_object_id, key.object_id));
905                        }
906                        Operation::Merge | Operation::ReplaceOrInsert => {
907                            let id = key.object_id;
908                            if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
909                                && !self.new_objects.contains(&(*store_object_id, id))
910                            {
911                                debug_assert!(
912                                    false,
913                                    "Not holding required lock for object {id} \
914                                    in store {store_object_id}"
915                                );
916                                error!(
917                                    "Not holding required lock for object {id} in store \
918                                    {store_object_id}"
919                                )
920                            }
921                        }
922                    },
923                    ObjectKeyData::Project { project_id, property: ProjectProperty::Limit } => {
924                        if !self.txn_locks.contains(&LockKey::ProjectId {
925                            store_object_id: *store_object_id,
926                            project_id: *project_id,
927                        }) {
928                            debug_assert!(
929                                false,
930                                "Not holding required lock for project limit id {project_id} \
931                                in store {store_object_id}"
932                            );
933                            error!(
934                                "Not holding required lock for project limit id {project_id} in \
935                                store {store_object_id}"
936                            )
937                        }
938                    }
939                    ObjectKeyData::Project { property: ProjectProperty::Usage, .. } => match op {
940                        Operation::Insert | Operation::ReplaceOrInsert => {
941                            panic!(
942                                "Project usage is all handled by merging deltas, no inserts or \
943                                replacements should be used"
944                            );
945                        }
946                        // Merges are all handled like atomic +/- and serialized by the tree locks.
947                        Operation::Merge => {}
948                    },
949                    ObjectKeyData::ExtendedAttribute { .. } => {
950                        let id = key.object_id;
951                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
952                            && !self.new_objects.contains(&(*store_object_id, id))
953                        {
954                            debug_assert!(
955                                false,
956                                "Not holding required lock for object {id} \
957                                in store {store_object_id} while mutating extended attribute"
958                            );
959                            error!(
960                                "Not holding required lock for object {id} in store \
961                                {store_object_id} while mutating extended attribute"
962                            )
963                        }
964                    }
965                }
966            }
967            TxnMutation { mutation: Mutation::DeleteVolume, object_id, .. } => {
968                if !self.txn_locks.contains(&LockKey::flush(*object_id)) {
969                    debug_assert!(false, "Not holding required lock for DeleteVolume");
970                    error!("Not holding required lock for DeleteVolume");
971                }
972            }
973            _ => {}
974        }
975    }
976
977    /// Returns true if this transaction has no mutations.
978    pub fn is_empty(&self) -> bool {
979        self.mutations.is_empty()
980    }
981
982    /// Searches for an existing object mutation within the transaction that has the given key and
983    /// returns it if found.
984    pub fn get_object_mutation(
985        &self,
986        store_object_id: u64,
987        key: ObjectKey,
988    ) -> Option<&ObjectStoreMutation> {
989        if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) =
990            self.mutations.get(&TxnMutation {
991                object_id: store_object_id,
992                mutation: Mutation::insert_object(key, ObjectValue::None),
993                associated_object: AssocObj::None,
994            })
995        {
996            Some(mutation)
997        } else {
998            None
999        }
1000    }
1001
1002    /// Commits a transaction.  If successful, returns the journal offset of the transaction.
1003    pub async fn commit(mut self) -> Result<u64, Error> {
1004        debug!(txn:? = &self; "Commit");
1005        self.txn_guard.fs().clone().commit_transaction(&mut self, &mut |_| {}).await
1006    }
1007
1008    /// Commits and then runs the callback whilst locks are held.  The callback accepts a single
1009    /// parameter which is the journal offset of the transaction.
1010    pub async fn commit_with_callback<R: Send>(
1011        mut self,
1012        f: impl FnOnce(u64) -> R + Send,
1013    ) -> Result<R, Error> {
1014        debug!(txn:? = &self; "Commit");
1015        // It's not possible to pass an FnOnce via a trait without boxing it, but we don't want to
1016        // do that (for performance reasons), hence the reason for the following.
1017        let mut f = Some(f);
1018        let mut result = None;
1019        self.txn_guard
1020            .fs()
1021            .clone()
1022            .commit_transaction(&mut self, &mut |offset| {
1023                result = Some(f.take().unwrap()(offset));
1024            })
1025            .await?;
1026        Ok(result.unwrap())
1027    }
1028
1029    /// Commits the transaction, but allows the transaction to be used again.  The locks are not
1030    /// dropped (but transaction locks will get downgraded to read locks).
1031    pub async fn commit_and_continue(&mut self) -> Result<(), Error> {
1032        debug!(txn:? = self; "Commit");
1033        self.txn_guard.fs().clone().commit_transaction(self, &mut |_| {}).await?;
1034        assert!(self.mutations.is_empty());
1035        self.txn_guard.fs().lock_manager().downgrade_locks(&self.txn_locks);
1036        Ok(())
1037    }
1038}
1039
1040impl Drop for Transaction<'_> {
1041    fn drop(&mut self) {
1042        // Call the filesystem implementation of drop_transaction which should, as a minimum, call
1043        // LockManager's drop_transaction to ensure the locks are released.
1044        debug!(txn:? = &self; "Drop");
1045        self.txn_guard.fs().clone().drop_transaction(self);
1046    }
1047}
1048
1049impl std::fmt::Debug for Transaction<'_> {
1050    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1051        f.debug_struct("Transaction")
1052            .field("mutations", &self.mutations)
1053            .field("txn_locks", &self.txn_locks)
1054            .field("reservation", &self.allocator_reservation)
1055            .finish()
1056    }
1057}
1058
/// Holds either a borrowed `T` or an owned `T`, so callers can supply whichever is convenient.
/// Access goes through the `Deref` impl; values are constructed via the `From` impls.
pub enum BorrowedOrOwned<'a, T> {
    Borrowed(&'a T),
    Owned(T),
}
1063
1064impl<T> Deref for BorrowedOrOwned<'_, T> {
1065    type Target = T;
1066
1067    fn deref(&self) -> &Self::Target {
1068        match self {
1069            BorrowedOrOwned::Borrowed(b) => b,
1070            BorrowedOrOwned::Owned(o) => &o,
1071        }
1072    }
1073}
1074
impl<'a, T> From<&'a T> for BorrowedOrOwned<'a, T> {
    /// Wraps a borrowed reference without taking ownership.
    fn from(value: &'a T) -> Self {
        BorrowedOrOwned::Borrowed(value)
    }
}
1080
impl<T> From<T> for BorrowedOrOwned<'_, T> {
    /// Takes ownership of the value.
    fn from(value: T) -> Self {
        BorrowedOrOwned::Owned(value)
    }
}
1086
/// LockManager holds the locks that transactions might have taken.  A TransactionManager
/// implementation would typically have one of these.
///
/// Three different kinds of locks are supported.  There are read locks and write locks, which are
/// as one would expect.  The third kind of lock is a _transaction_ lock (which is also known as an
/// upgradeable read lock).  When first acquired, these block other writes (including other
/// transaction locks) but do not block reads.  When it is time to commit a transaction, these locks
/// are upgraded to full write locks (without ever dropping the lock) and then dropped after
/// committing (unless commit_and_continue is used).  This way, reads are only blocked for the
/// shortest possible time.  It follows that write locks should be used sparingly.  Locks are
/// granted in order with one exception: when a lock is in the initial _transaction_ lock state
/// (LockState::Locked), all read locks are allowed even if there are other tasks waiting for the
/// lock.  The reason for this is because we allow read locks to be taken by tasks that have taken a
/// _transaction_ lock (i.e. recursion is allowed).  In other cases, such as when a writer is
/// waiting and there are only readers, readers will queue up behind the writer.
///
/// To summarize:
///
/// +-------------------------+-----------------+----------------+------------------+
/// |                         | While read_lock | While txn_lock | While write_lock |
/// |                         | is held         | is held        | is held          |
/// +-------------------------+-----------------+----------------+------------------+
/// | Can acquire read_lock?  | true            | true           | false            |
/// +-------------------------+-----------------+----------------+------------------+
/// | Can acquire txn_lock?   | true            | false          | false            |
/// +-------------------------+-----------------+----------------+------------------+
/// | Can acquire write_lock? | false           | false          | false            |
/// +-------------------------+-----------------+----------------+------------------+
pub struct LockManager {
    // All lock state keyed by `LockKey`.  Entries are created on first use and removed again by
    // `Locks::drop_lock` once no longer needed (see `LockEntry::can_remove`).
    locks: Mutex<Locks>,
}
1118
// The table of currently-held locks.  Only ever accessed with `LockManager::locks` held, which is
// what makes the `unsafe` waker-list accesses below sound.
struct Locks {
    keys: HashMap<LockKey, LockEntry>,
}
1122
impl Locks {
    // Releases one hold on `key` that was held in the given `state`, waking any waiters that can
    // now proceed and removing the entry entirely once nothing holds or waits on it.
    fn drop_lock(&mut self, key: LockKey, state: LockState) {
        if let Entry::Occupied(mut occupied) = self.keys.entry(key) {
            let entry = occupied.get_mut();
            let wake = match state {
                LockState::ReadLock => {
                    entry.read_count -= 1;
                    // Only the departure of the last reader can unblock anything.
                    entry.read_count == 0
                }
                // drop_write_locks currently depends on us treating Locked and WriteLock the same.
                LockState::Locked | LockState::WriteLock => {
                    entry.state = LockState::ReadLock;
                    true
                }
            };
            if wake {
                // SAFETY: The lock in `LockManager::locks` is held.
                unsafe {
                    entry.wake();
                }
                if entry.can_remove() {
                    occupied.remove_entry();
                }
            }
        } else {
            // Dropping a lock that has no entry indicates a bookkeeping bug.
            unreachable!();
        }
    }

    fn drop_read_locks(&mut self, lock_keys: LockKeys) {
        for lock in lock_keys.iter() {
            self.drop_lock(*lock, LockState::ReadLock);
        }
    }

    fn drop_write_locks(&mut self, lock_keys: LockKeys) {
        for lock in lock_keys.iter() {
            // This is a bit hacky, but this works for locks in either the Locked or WriteLock
            // states.
            self.drop_lock(*lock, LockState::WriteLock);
        }
    }

    // Downgrades locks from WriteLock to Locked.
    fn downgrade_locks(&mut self, lock_keys: &LockKeys) {
        for lock in lock_keys.iter() {
            // SAFETY: The lock in `LockManager::locks` is held.
            unsafe {
                self.keys.get_mut(lock).unwrap().downgrade_lock();
            }
        }
    }
}
1176
// Per-key lock state.  Instances only live inside `Locks::keys` and are therefore always accessed
// with `LockManager::locks` held.
#[derive(Debug)]
struct LockEntry {
    // In the states that allow readers (ReadLock, Locked), this count can be non-zero
    // to indicate the number of active readers.
    read_count: u64,

    // The state of the lock (see below).
    state: LockState,

    // A doubly-linked list of wakers that should be woken when they have been granted the lock.
    // New wakers are usually chained on to tail, with the exception being the case where a lock in
    // state Locked is to be upgraded to WriteLock, but can't because there are readers.  It might
    // be possible to use intrusive-collections in the future.
    // Both pointers are null when the list is empty.
    head: *const LockWaker,
    tail: *const LockWaker,
}
1193
1194unsafe impl Send for LockEntry {}
1195
// Represents a node in the waker list.  It is only safe to access the members wrapped by UnsafeCell
// when LockManager's `locks` member is locked.
struct LockWaker {
    // The next and previous pointers in the doubly-linked list.
    next: UnsafeCell<*const LockWaker>,
    prev: UnsafeCell<*const LockWaker>,

    // Holds the lock key for this waker.  This is required so that we can find the associated
    // `LockEntry`.
    key: LockKey,

    // The underlying waker that should be used to wake the task.
    waker: UnsafeCell<WakerState>,

    // The target state for this waker.
    target_state: LockState,

    // True if this is an upgrade (Locked -> WriteLock; see `commit_prepare_keys`).
    is_upgrade: bool,

    // We need to be pinned because these form part of the linked list.
    _pin: PhantomPinned,
}
1219
// The registration state of a `LockWaker`'s underlying task waker.
enum WakerState {
    // This is the initial state before the waker has been first polled.
    Pending,

    // Once polled, this contains the actual waker.
    Registered(Waker),

    // The waker has been woken and has been granted the lock.
    Woken,
}
1230
1231impl WakerState {
1232    fn is_woken(&self) -> bool {
1233        matches!(self, WakerState::Woken)
1234    }
1235}
1236
// SAFETY: The raw pointers and `UnsafeCell` members of `LockWaker` are only accessed whilst
// `LockManager::locks` is held (see the comment on `LockWaker`), so all access is serialized.
unsafe impl Send for LockWaker {}
unsafe impl Sync for LockWaker {}
1239
impl LockWaker {
    // Waits for the waker to be woken.
    async fn wait(&self, manager: &LockManager) {
        // We must guard against the future being dropped.  There are two cases to handle:
        //   1. We were woken (i.e. granted the lock) but the task never ran, so the grant must
        //      be returned (downgraded for upgrades, dropped otherwise).
        //   2. We were never woken, so this node must be unlinked from the waker list before its
        //      memory goes away.
        let waker_guard = scopeguard::guard((), |_| {
            let mut locks = manager.locks.lock();
            // SAFETY: We've acquired the lock.
            unsafe {
                if (*self.waker.get()).is_woken() {
                    // We were woken, but didn't actually run, so we must drop the lock.
                    if self.is_upgrade {
                        locks.keys.get_mut(&self.key).unwrap().downgrade_lock();
                    } else {
                        locks.drop_lock(self.key, self.target_state);
                    }
                } else {
                    // We haven't been woken but we've been dropped so we must remove ourself from
                    // the waker list.
                    locks.keys.get_mut(&self.key).unwrap().remove_waker(self);
                }
            }
        });

        // Each poll re-registers the current task's waker so the most recent one is always the
        // one that gets woken.
        poll_fn(|cx| {
            let _locks = manager.locks.lock();
            // SAFETY: We've acquired the lock.
            unsafe {
                if (*self.waker.get()).is_woken() {
                    Poll::Ready(())
                } else {
                    *self.waker.get() = WakerState::Registered(cx.waker().clone());
                    Poll::Pending
                }
            }
        })
        .await;

        // We completed normally, so defuse the guard; the caller now owns the granted lock.
        ScopeGuard::into_inner(waker_guard);
    }
}
1280
// The state of a `LockEntry`.  See the compatibility table on `LockManager` for which
// acquisitions are permitted in each state.
#[derive(Copy, Clone, Debug, PartialEq)]
enum LockState {
    // In this state, there are only readers.
    ReadLock,

    // This state is used for transactions to lock other writers (including other transactions), but
    // it still allows readers.
    Locked,

    // A writer has exclusive access; all other readers and writers are blocked.
    WriteLock,
}
1293
impl LockManager {
    pub fn new() -> Self {
        LockManager { locks: Mutex::new(Locks { keys: HashMap::default() }) }
    }

    /// Acquires the locks.  It is the caller's responsibility to ensure that drop_transaction is
    /// called when a transaction is dropped i.e. the filesystem's drop_transaction method should
    /// call LockManager's drop_transaction method.
    pub async fn txn_lock<'a>(&'a self, lock_keys: LockKeys) -> TransactionLocks<'a> {
        TransactionLocks(
            debug_assert_not_too_long!(self.lock(lock_keys, LockState::Locked)).right().unwrap(),
        )
    }

    // `state` indicates the kind of lock required.  ReadLock means acquire a read lock.  Locked
    // means lock other writers, but still allow readers.  WriteLock means acquire a write lock.
    async fn lock<'a>(
        &'a self,
        mut lock_keys: LockKeys,
        target_state: LockState,
    ) -> Either<ReadGuard<'a>, WriteGuard<'a>> {
        // The guard is built up as keys are acquired so that, if this future is dropped
        // part-way, the guard's Drop releases exactly the keys acquired so far.
        let mut guard = match &target_state {
            LockState::ReadLock => Left(ReadGuard {
                manager: self.into(),
                lock_keys: LockKeys::with_capacity(lock_keys.len()),
            }),
            LockState::Locked | LockState::WriteLock => Right(WriteGuard {
                manager: self.into(),
                lock_keys: LockKeys::with_capacity(lock_keys.len()),
            }),
        };
        let guard_keys = match &mut guard {
            Left(g) => &mut g.lock_keys,
            Right(g) => &mut g.lock_keys,
        };
        // Sort and dedup so keys are always acquired in a consistent order.
        lock_keys.sort_unstable();
        lock_keys.dedup();
        for lock in lock_keys.iter() {
            // The waker node must be pinned because the lock entry's list will hold raw
            // pointers into it.
            let lock_waker = None;
            pin_mut!(lock_waker);
            {
                let mut locks = self.locks.lock();
                match locks.keys.entry(*lock) {
                    Entry::Vacant(vacant) => {
                        // No contention: create a fresh entry already held in the requested
                        // state.
                        vacant.insert(LockEntry {
                            read_count: if let LockState::ReadLock = target_state {
                                guard_keys.push(*lock);
                                1
                            } else {
                                guard_keys.push(*lock);
                                0
                            },
                            state: target_state,
                            head: std::ptr::null(),
                            tail: std::ptr::null(),
                        });
                    }
                    Entry::Occupied(mut occupied) => {
                        let entry = occupied.get_mut();
                        // SAFETY: We've acquired the lock.
                        if unsafe { entry.is_allowed(target_state, entry.head.is_null()) } {
                            if let LockState::ReadLock = target_state {
                                entry.read_count += 1;
                                guard_keys.push(*lock);
                            } else {
                                entry.state = target_state;
                                guard_keys.push(*lock);
                            }
                        } else {
                            // Initialise a waker and push it on the tail of the list.
                            // SAFETY: `lock_waker` isn't used prior to this point.
                            unsafe {
                                *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
                                    next: UnsafeCell::new(std::ptr::null()),
                                    prev: UnsafeCell::new(entry.tail),
                                    key: *lock,
                                    waker: UnsafeCell::new(WakerState::Pending),
                                    target_state: target_state,
                                    is_upgrade: false,
                                    _pin: PhantomPinned,
                                });
                            }
                            let waker = (*lock_waker).as_ref().unwrap();
                            if entry.tail.is_null() {
                                entry.head = waker;
                            } else {
                                // SAFETY: We've acquired the lock.
                                unsafe {
                                    *(*entry.tail).next.get() = waker;
                                }
                            }
                            entry.tail = waker;
                        }
                    }
                }
            }
            // If we had to queue, wait (outside the mutex) until the lock is granted.
            if let Some(waker) = &*lock_waker {
                waker.wait(self).await;
                guard_keys.push(*lock);
            }
        }
        guard
    }

    /// This should be called by the filesystem's drop_transaction implementation.
    pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
        let mut locks = self.locks.lock();
        locks.drop_write_locks(std::mem::take(&mut transaction.txn_locks));
    }

    /// Prepares to commit by waiting for readers to finish.
    pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
        self.commit_prepare_keys(&transaction.txn_locks).await;
    }

    // Upgrades each key from Locked to WriteLock, waiting for outstanding readers to drain.
    async fn commit_prepare_keys(&self, lock_keys: &LockKeys) {
        for lock in lock_keys.iter() {
            let lock_waker = None;
            pin_mut!(lock_waker);
            {
                let mut locks = self.locks.lock();
                let entry = locks.keys.get_mut(lock).unwrap();
                assert_eq!(entry.state, LockState::Locked);

                if entry.read_count == 0 {
                    entry.state = LockState::WriteLock;
                } else {
                    // Initialise a waker and push it on the head of the list.
                    // SAFETY: `lock_waker` isn't used prior to this point.
                    unsafe {
                        *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
                            next: UnsafeCell::new(entry.head),
                            prev: UnsafeCell::new(std::ptr::null()),
                            key: *lock,
                            waker: UnsafeCell::new(WakerState::Pending),
                            target_state: LockState::WriteLock,
                            is_upgrade: true,
                            _pin: PhantomPinned,
                        });
                    }
                    let waker = (*lock_waker).as_ref().unwrap();
                    if entry.head.is_null() {
                        entry.tail = (*lock_waker).as_ref().unwrap();
                    } else {
                        // SAFETY: We've acquired the lock.
                        unsafe {
                            *(*entry.head).prev.get() = waker;
                        }
                    }
                    entry.head = waker;
                }
            }

            if let Some(waker) = &*lock_waker {
                waker.wait(self).await;
            }
        }
    }

    /// Acquires a read lock for the given keys.  Read locks are only blocked whilst a transaction
    /// is being committed for the same locks.  They are only necessary where consistency is
    /// required between different mutations within a transaction.  For example, a write might
    /// change the size and extents for an object, in which case a read lock is required so that
    /// observed size and extents are seen together or not at all.
    pub async fn read_lock<'a>(&'a self, lock_keys: LockKeys) -> ReadGuard<'a> {
        debug_assert_not_too_long!(self.lock(lock_keys, LockState::ReadLock)).left().unwrap()
    }

    /// Acquires a write lock for the given keys.  Write locks provide exclusive access to the
    /// requested lock keys.
    pub async fn write_lock<'a>(&'a self, lock_keys: LockKeys) -> WriteGuard<'a> {
        debug_assert_not_too_long!(self.lock(lock_keys, LockState::WriteLock)).right().unwrap()
    }

    /// Downgrades locks from the WriteLock state to Locked state.  This will panic if the locks are
    /// not in the WriteLock state.
    pub fn downgrade_locks(&self, lock_keys: &LockKeys) {
        self.locks.lock().downgrade_locks(lock_keys);
    }
}
1474
// These unsafe functions require that `locks` in LockManager is locked.
impl LockEntry {
    // Wakes as many queued wakers as the entry's current state permits, starting from the head of
    // the list.
    unsafe fn wake(&mut self) {
        // If the lock's state is WriteLock, or there's nothing waiting, return early.
        if self.head.is_null() || self.state == LockState::WriteLock {
            return;
        }

        let waker = unsafe { &*self.head };

        if waker.is_upgrade {
            // An upgrade can only proceed once all readers have gone.
            if self.read_count > 0 {
                return;
            }
        } else if !unsafe { self.is_allowed(waker.target_state, true) } {
            return;
        }

        unsafe { self.pop_and_wake() };

        // If the waker was a write lock, we can't wake any more up, but otherwise, we can keep
        // waking up readers.
        if waker.target_state == LockState::WriteLock {
            return;
        }

        while !self.head.is_null() && unsafe { (*self.head).target_state } == LockState::ReadLock {
            unsafe { self.pop_and_wake() };
        }
    }

    // Unlinks the head waker, grants it the lock (the grant happens here, before the task runs;
    // `LockWaker::wait`'s guard returns the grant if the future is dropped), and wakes its task.
    unsafe fn pop_and_wake(&mut self) {
        let waker = unsafe { &*self.head };

        // Pop the waker.
        self.head = unsafe { *waker.next.get() };
        if self.head.is_null() {
            self.tail = std::ptr::null()
        } else {
            unsafe { *(*self.head).prev.get() = std::ptr::null() };
        }

        // Adjust our state accordingly.
        if waker.target_state == LockState::ReadLock {
            self.read_count += 1;
        } else {
            self.state = waker.target_state;
        }

        // Now wake the task.
        if let WakerState::Registered(waker) =
            std::mem::replace(unsafe { &mut *waker.waker.get() }, WakerState::Woken)
        {
            waker.wake();
        }
    }

    // True once nothing holds this entry and nothing is waiting on it.
    fn can_remove(&self) -> bool {
        self.state == LockState::ReadLock && self.read_count == 0
    }

    // Unlinks `waker` from the doubly-linked list (used when a waiting future is dropped before
    // being woken).
    unsafe fn remove_waker(&mut self, waker: &LockWaker) {
        unsafe {
            let is_first = (*waker.prev.get()).is_null();
            if is_first {
                self.head = *waker.next.get();
            } else {
                *(**waker.prev.get()).next.get() = *waker.next.get();
            }
            if (*waker.next.get()).is_null() {
                self.tail = *waker.prev.get();
            } else {
                *(**waker.next.get()).prev.get() = *waker.prev.get();
            }
            if is_first {
                // We must call wake in case we erased a pending write lock and readers can now
                // proceed.
                self.wake();
            }
        }
    }

    // Returns whether or not a lock with given `target_state` can proceed.  `is_head` should be
    // true if this is something at the head of the waker list (or the waker list is empty) and
    // false if there are other items on the waker list that are prior.
    unsafe fn is_allowed(&self, target_state: LockState, is_head: bool) -> bool {
        match self.state {
            LockState::ReadLock => {
                // Allow ReadLock and Locked so long as nothing else is waiting.
                (self.read_count == 0
                    || target_state == LockState::Locked
                    || target_state == LockState::ReadLock)
                    && is_head
            }
            LockState::Locked => {
                // Always allow reads unless there's an upgrade waiting.  We have to
                // always allow reads in this state because tasks that have locks in
                // the Locked state can later try and acquire ReadLock.
                target_state == LockState::ReadLock
                    && (is_head || unsafe { !(*self.head).is_upgrade })
            }
            LockState::WriteLock => false,
        }
    }

    // Moves the entry from WriteLock back to Locked, waking any readers that can now proceed.
    // Panics if the entry is not in the WriteLock state.
    unsafe fn downgrade_lock(&mut self) {
        assert_eq!(std::mem::replace(&mut self.state, LockState::Locked), LockState::WriteLock);
        unsafe { self.wake() };
    }
}
1585
/// A guard holding read locks on `lock_keys`; the locks are released when the guard is dropped.
#[must_use]
pub struct ReadGuard<'a> {
    manager: LockManagerRef<'a>,
    lock_keys: LockKeys,
}
1591
1592impl ReadGuard<'_> {
1593    pub fn fs(&self) -> Option<&Arc<FxFilesystem>> {
1594        if let LockManagerRef::Owned(fs) = &self.manager { Some(fs) } else { None }
1595    }
1596
1597    pub fn into_owned(mut self, fs: Arc<FxFilesystem>) -> ReadGuard<'static> {
1598        ReadGuard {
1599            manager: LockManagerRef::Owned(fs),
1600            lock_keys: std::mem::replace(&mut self.lock_keys, LockKeys::None),
1601        }
1602    }
1603}
1604
1605impl Drop for ReadGuard<'_> {
1606    fn drop(&mut self) {
1607        let mut locks = self.manager.locks.lock();
1608        locks.drop_read_locks(std::mem::take(&mut self.lock_keys));
1609    }
1610}
1611
1612impl fmt::Debug for ReadGuard<'_> {
1613    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1614        f.debug_struct("ReadGuard")
1615            .field("manager", &(&self.manager as *const _))
1616            .field("lock_keys", &self.lock_keys)
1617            .finish()
1618    }
1619}
1620
/// A guard holding write locks on `lock_keys`; the locks are released when the guard is dropped.
#[must_use]
pub struct WriteGuard<'a> {
    manager: LockManagerRef<'a>,
    lock_keys: LockKeys,
}
1626
1627impl Drop for WriteGuard<'_> {
1628    fn drop(&mut self) {
1629        let mut locks = self.manager.locks.lock();
1630        locks.drop_write_locks(std::mem::take(&mut self.lock_keys));
1631    }
1632}
1633
1634impl fmt::Debug for WriteGuard<'_> {
1635    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1636        f.debug_struct("WriteGuard")
1637            .field("manager", &(&self.manager as *const _))
1638            .field("lock_keys", &self.lock_keys)
1639            .finish()
1640    }
1641}
1642
/// A reference to a `LockManager`: either borrowed directly, or kept alive
/// through an owned `FxFilesystem` reference (which allows `'static` guards;
/// see `ReadGuard::into_owned`).
enum LockManagerRef<'a> {
    // Borrows the manager; a guard holding this cannot outlive it.
    Borrowed(&'a LockManager),
    // Owns the filesystem and reaches the manager via `fs.lock_manager()`.
    Owned(Arc<FxFilesystem>),
}
1647
1648impl Deref for LockManagerRef<'_> {
1649    type Target = LockManager;
1650
1651    fn deref(&self) -> &Self::Target {
1652        match self {
1653            LockManagerRef::Borrowed(m) => m,
1654            LockManagerRef::Owned(f) => f.lock_manager(),
1655        }
1656    }
1657}
1658
1659impl<'a> From<&'a LockManager> for LockManagerRef<'a> {
1660    fn from(value: &'a LockManager) -> Self {
1661        LockManagerRef::Borrowed(value)
1662    }
1663}
1664
// Unit tests for the lock manager and for the `LockKeys` container.  The async
// tests below rely on precise poll/wake ordering (manual `futures::poll!`,
// oneshot signalling, and short sleeps for "can't-prove-it-halts" cases), so
// statement order matters throughout.
#[cfg(test)]
mod tests {
    use super::{LockKey, LockKeys, LockManager, LockState, Mutation, Options};
    use crate::filesystem::FxFilesystem;
    use fuchsia_async as fasync;
    use fuchsia_sync::Mutex;
    use futures::channel::oneshot::channel;
    use futures::future::FutureExt;
    use futures::stream::FuturesUnordered;
    use futures::{StreamExt, join, pin_mut};
    use std::task::Poll;
    use std::time::Duration;
    use storage_device::DeviceHolder;
    use storage_device::fake_device::FakeDevice;

    // Adding a mutation to a fresh transaction makes it non-empty.
    #[fuchsia::test]
    async fn test_simple() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let mut t = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        t.add(1, Mutation::BeginFlush);
        assert!(!t.is_empty());
    }

    // Transactions with the same key block each other, while transactions with
    // different keys can proceed concurrently.
    #[fuchsia::test]
    async fn test_locks() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let (send3, recv3) = channel();
        let done = Mutex::new(false);
        let mut futures = FuturesUnordered::new();
        futures.push(
            async {
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send1.send(()).unwrap(); // Tell the next future to continue.
                send3.send(()).unwrap(); // Tell the last future to continue.
                recv2.await.unwrap();
                // This is a halting problem so all we can do is sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            }
            .boxed(),
        );
        futures.push(
            async {
                recv1.await.unwrap();
                // This should not block since it is a different key.
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(2, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                // Tell the first future to continue.
                send2.send(()).unwrap();
            }
            .boxed(),
        );
        futures.push(
            async {
                // This should block until the first future has completed.
                recv3.await.unwrap();
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await;
                *done.lock() = true;
            }
            .boxed(),
        );
        while let Some(()) = futures.next().await {}
    }

    // A read lock taken while a transaction holds the same key succeeds
    // immediately, but the transaction's commit blocks until the read lock is
    // released.
    #[fuchsia::test]
    async fn test_read_lock_after_write_lock() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let done = Mutex::new(false);
        join!(
            async {
                let t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send1.send(()).unwrap(); // Tell the next future to continue.
                recv2.await.unwrap();
                t.commit().await.expect("commit failed");
                *done.lock() = true;
            },
            async {
                recv1.await.unwrap();
                // Reads should not be blocked until the transaction is committed.
                let _guard = fs
                    .lock_manager()
                    .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
                    .await;
                // Tell the first future to continue.
                send2.send(()).unwrap();
                // It shouldn't proceed until we release our read lock, but it's a halting
                // problem, so sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            },
        );
    }

    // The mirror image of the previous test: a transaction can be created
    // while a read lock is held, but its commit blocks on the read lock.
    #[fuchsia::test]
    async fn test_write_lock_after_read_lock() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let done = Mutex::new(false);
        join!(
            async {
                // Reads should not be blocked until the transaction is committed.
                let _guard = fs
                    .lock_manager()
                    .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
                    .await;
                // Tell the next future to continue and then wait.
                send1.send(()).unwrap();
                recv2.await.unwrap();
                // It shouldn't proceed until we release our read lock, but it's a halting
                // problem, so sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            },
            async {
                recv1.await.unwrap();
                let t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send2.send(()).unwrap(); // Tell the first future to continue;
                t.commit().await.expect("commit failed");
                *done.lock() = true;
            },
        );
    }

    // Dropping a transaction without committing must release its locks,
    // whether or not a reader is present at the time.
    #[fuchsia::test]
    async fn test_drop_uncommitted_transaction() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let key = lock_keys![LockKey::object(1, 1)];

        // Dropping while there's a reader.
        {
            let _write_lock = fs
                .clone()
                .new_transaction(key.clone(), Options::default())
                .await
                .expect("new_transaction failed");
            let _read_lock = fs.lock_manager().read_lock(key.clone()).await;
        }
        // Dropping while there's no reader.
        {
            let _write_lock = fs
                .clone()
                .new_transaction(key.clone(), Options::default())
                .await
                .expect("new_transaction failed");
        }
        // Make sure we can take the lock again (i.e. it was actually released).
        fs.clone()
            .new_transaction(key.clone(), Options::default())
            .await
            .expect("new_transaction failed");
    }

    // Dropping a pending (never-granted) write-lock future must not corrupt
    // the waiter list: the lock can still be taken afterwards.
    #[fuchsia::test]
    async fn test_drop_waiting_write_lock() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        {
            let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
            if let Poll::Ready(_) =
                futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
            {
                assert!(false);
            }
        }
        let _ = manager.lock(keys, LockState::WriteLock).await;
    }

    // A held write lock blocks both writers and readers, and releasing it
    // lets either kind of lock through.
    #[fuchsia::test]
    async fn test_write_lock_blocks_everything() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        {
            let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
            if let Poll::Ready(_) =
                futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
            {
                assert!(false);
            }
            if let Poll::Ready(_) =
                futures::poll!(manager.lock(keys.clone(), LockState::ReadLock).boxed())
            {
                assert!(false);
            }
        }
        {
            let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
        }
        {
            let _guard = manager.lock(keys, LockState::ReadLock).await;
        }
    }

    // After commit_prepare upgrades a transaction lock, downgrade_locks must
    // wake pending readers.
    #[fuchsia::test]
    async fn test_downgrade_locks() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.txn_lock(keys.clone()).await;
        manager.commit_prepare_keys(&keys).await;

        // Use FuturesUnordered so that we can check that the waker is woken.
        let mut read_lock: FuturesUnordered<_> =
            std::iter::once(manager.read_lock(keys.clone())).collect();

        // Trying to acquire a read lock now should be blocked.
        assert!(futures::poll!(read_lock.next()).is_pending());

        manager.downgrade_locks(&keys);

        // After downgrading, it should be possible to take a read lock.
        assert!(futures::poll!(read_lock.next()).is_ready());
    }

    // Dropping a pending write-lock future must wake readers queued behind it.
    #[fuchsia::test]
    async fn test_dropped_write_lock_wakes() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
        let mut read_lock = FuturesUnordered::new();
        read_lock.push(manager.lock(keys.clone(), LockState::ReadLock));

        {
            let write_lock = manager.lock(keys, LockState::WriteLock);
            pin_mut!(write_lock);

            // The write lock should be blocked because of the read lock.
            assert!(futures::poll!(write_lock).is_pending());

            // Another read lock should be blocked because of the write lock.
            assert!(futures::poll!(read_lock.next()).is_pending());
        }

        // Dropping the write lock should allow the read lock to proceed.
        assert!(futures::poll!(read_lock.next()).is_ready());
    }

    // Dropping a pending commit_prepare (upgrade) future must leave the lock
    // usable: a later commit_prepare on the same keys still succeeds.
    #[fuchsia::test]
    async fn test_drop_upgrade() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.lock(keys.clone(), LockState::Locked).await;

        {
            let commit_prepare = manager.commit_prepare_keys(&keys);
            pin_mut!(commit_prepare);
            let _read_guard = manager.lock(keys.clone(), LockState::ReadLock).await;
            assert!(futures::poll!(commit_prepare).is_pending());

            // Now we test dropping read_guard which should wake commit_prepare and
            // then dropping commit_prepare.
        }

        // We should be able to still commit_prepare.
        manager.commit_prepare_keys(&keys).await;
    }

    // A pending upgrade must take priority over later read-lock requests: once
    // woken, it blocks new readers until the resulting write lock is dropped.
    #[fasync::run_singlethreaded(test)]
    async fn test_woken_upgrade_blocks_reads() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        // Start with a transaction lock.
        let guard = manager.lock(keys.clone(), LockState::Locked).await;

        // Take a read lock.
        let read1 = manager.lock(keys.clone(), LockState::ReadLock).await;

        // Try and upgrade the transaction lock, which should not be possible because of the read.
        let commit_prepare = manager.commit_prepare_keys(&keys);
        pin_mut!(commit_prepare);
        assert!(futures::poll!(commit_prepare.as_mut()).is_pending());

        // Taking another read should also be blocked.
        let read2 = manager.lock(keys.clone(), LockState::ReadLock);
        pin_mut!(read2);
        assert!(futures::poll!(read2.as_mut()).is_pending());

        // Drop the first read and the upgrade should complete.
        std::mem::drop(read1);
        assert!(futures::poll!(commit_prepare).is_ready());

        // But the second read should still be blocked.
        assert!(futures::poll!(read2.as_mut()).is_pending());

        // If we drop the write lock now, the read should be unblocked.
        std::mem::drop(guard);
        assert!(futures::poll!(read2).is_ready());
    }

    // Shared keys for the LockKeys container tests below.
    static LOCK_KEY_1: LockKey = LockKey::flush(1);
    static LOCK_KEY_2: LockKey = LockKey::flush(2);
    static LOCK_KEY_3: LockKey = LockKey::flush(3);

    // The keys, storage method, and capacity must all match.
    fn assert_lock_keys_equal(value: &LockKeys, expected: &LockKeys) {
        match (value, expected) {
            (LockKeys::None, LockKeys::None) => {}
            (LockKeys::Inline(key1), LockKeys::Inline(key2)) => {
                if key1 != key2 {
                    panic!("{key1:?} != {key2:?}");
                }
            }
            (LockKeys::Vec(vec1), LockKeys::Vec(vec2)) => {
                if vec1 != vec2 {
                    panic!("{vec1:?} != {vec2:?}");
                }
                if vec1.capacity() != vec2.capacity() {
                    panic!(
                        "LockKeys have different capacity: {} != {}",
                        vec1.capacity(),
                        vec2.capacity()
                    );
                }
            }
            (_, _) => panic!("{value:?} != {expected:?}"),
        }
    }

    // Only the keys must match. Storage method and capacity don't matter.
    fn assert_lock_keys_equivalent(value: &LockKeys, expected: &LockKeys) {
        let value: Vec<_> = value.iter().collect();
        let expected: Vec<_> = expected.iter().collect();
        assert_eq!(value, expected);
    }

    // lock_keys! picks the cheapest representation for its arity.
    #[test]
    fn test_lock_keys_macro() {
        assert_lock_keys_equal(&lock_keys![], &LockKeys::None);
        assert_lock_keys_equal(&lock_keys![LOCK_KEY_1], &LockKeys::Inline(LOCK_KEY_1));
        assert_lock_keys_equal(
            &lock_keys![LOCK_KEY_1, LOCK_KEY_2],
            &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]),
        );
    }

    // with_capacity only allocates a Vec for capacities that can't be inline.
    #[test]
    fn test_lock_keys_with_capacity() {
        assert_lock_keys_equal(&LockKeys::with_capacity(0), &LockKeys::None);
        assert_lock_keys_equal(&LockKeys::with_capacity(1), &LockKeys::None);
        assert_lock_keys_equal(&LockKeys::with_capacity(2), &LockKeys::Vec(Vec::with_capacity(2)));
    }

    #[test]
    fn test_lock_keys_len() {
        assert_eq!(lock_keys![].len(), 0);
        assert_eq!(lock_keys![LOCK_KEY_1].len(), 1);
        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].len(), 2);
    }

    #[test]
    fn test_lock_keys_contains() {
        assert_eq!(lock_keys![].contains(&LOCK_KEY_1), false);
        assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_1), true);
        assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_2), false);
        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_1), true);
        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_2), true);
        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_3), false);
    }

    // push upgrades the representation: None -> Inline -> Vec.
    #[test]
    fn test_lock_keys_push() {
        let mut keys = lock_keys![];
        keys.push(LOCK_KEY_1);
        assert_lock_keys_equal(&keys, &LockKeys::Inline(LOCK_KEY_1));
        keys.push(LOCK_KEY_2);
        assert_lock_keys_equal(&keys, &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]));
        keys.push(LOCK_KEY_3);
        assert_lock_keys_equivalent(
            &keys,
            &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2, LOCK_KEY_3]),
        );
    }

    #[test]
    fn test_lock_keys_sort_unstable() {
        let mut keys = lock_keys![];
        keys.sort_unstable();
        assert_lock_keys_equal(&keys, &lock_keys![]);

        let mut keys = lock_keys![LOCK_KEY_1];
        keys.sort_unstable();
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);

        let mut keys = lock_keys![LOCK_KEY_2, LOCK_KEY_1];
        keys.sort_unstable();
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
    }

    #[test]
    fn test_lock_keys_dedup() {
        let mut keys = lock_keys![];
        keys.dedup();
        assert_lock_keys_equal(&keys, &lock_keys![]);

        let mut keys = lock_keys![LOCK_KEY_1];
        keys.dedup();
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);

        let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_1];
        keys.dedup();
        assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
    }

    // truncate never switches representation (a Vec stays a Vec).
    #[test]
    fn test_lock_keys_truncate() {
        let mut keys = lock_keys![];
        keys.truncate(5);
        assert_lock_keys_equal(&keys, &lock_keys![]);
        keys.truncate(0);
        assert_lock_keys_equal(&keys, &lock_keys![]);

        let mut keys = lock_keys![LOCK_KEY_1];
        keys.truncate(5);
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
        keys.truncate(0);
        assert_lock_keys_equal(&keys, &lock_keys![]);

        let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_2];
        keys.truncate(5);
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
        keys.truncate(1);
        // Although there's only 1 key after truncate the key is not stored inline.
        assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
    }

    #[test]
    fn test_lock_keys_iter() {
        assert_eq!(lock_keys![].iter().collect::<Vec<_>>(), Vec::<&LockKey>::new());

        assert_eq!(lock_keys![LOCK_KEY_1].iter().collect::<Vec<_>>(), vec![&LOCK_KEY_1]);

        assert_eq!(
            lock_keys![LOCK_KEY_1, LOCK_KEY_2].iter().collect::<Vec<_>>(),
            vec![&LOCK_KEY_1, &LOCK_KEY_2]
        );
    }
}