// fxfs/object_store/transaction.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::checksum::Checksum;
6use crate::filesystem::{FxFilesystem, TxnGuard};
7use crate::log::*;
8use crate::lsm_tree::types::Item;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::AttributeKey;
11use crate::object_store::allocator::{AllocatorItem, Reservation};
12use crate::object_store::object_manager::{ObjectManager, reserved_space_from_journal_usage};
13use crate::object_store::object_record::{
14    FxfsKey, FxfsKeyV40, FxfsKeyV49, ObjectItem, ObjectItemV40, ObjectItemV41, ObjectItemV43,
15    ObjectItemV46, ObjectItemV47, ObjectItemV49, ObjectItemV50, ObjectKey, ObjectKeyData,
16    ObjectValue, ProjectProperty,
17};
18use crate::serialized_types::{Migrate, Versioned, migrate_nodefault, migrate_to_version};
19use anyhow::Error;
20use either::{Either, Left, Right};
21use fprint::TypeFingerprint;
22use fuchsia_sync::Mutex;
23use futures::future::poll_fn;
24use futures::pin_mut;
25use rustc_hash::FxHashMap as HashMap;
26use scopeguard::ScopeGuard;
27use serde::{Deserialize, Serialize};
28use std::cell::UnsafeCell;
29use std::cmp::Ordering;
30use std::collections::BTreeSet;
31use std::collections::hash_map::Entry;
32use std::marker::PhantomPinned;
33use std::ops::{Deref, DerefMut, Range};
34use std::sync::Arc;
35use std::task::{Poll, Waker};
36use std::{fmt, mem};
37
/// This allows for special handling of certain transactions such as deletes and the
/// extension of Journal extents. For most other use cases it is appropriate to use
/// `default()` here.
#[derive(Clone, Copy, Default)]
pub struct Options<'a> {
    /// If true, don't check for low journal space.  This should be true for any transactions that
    /// might alleviate journal space (i.e. compaction).
    pub skip_journal_checks: bool,

    /// If true, borrow metadata space from the metadata reservation.  This setting should be set to
    /// true for any transaction that will either not affect space usage after compaction
    /// (e.g. setting attributes), or reduce space usage (e.g. unlinking).  Otherwise, a transaction
    /// might fail with an out-of-space error.
    pub borrow_metadata_space: bool,

    /// If specified, a reservation to be used with the transaction.  If not set, any allocations
    /// that are part of this transaction will have to take their chances, and will fail if there is
    /// no free space.  The intention is that this should be used for things like the journal which
    /// require guaranteed space.
    pub allocator_reservation: Option<&'a Reservation>,

    /// An existing transaction guard to be used.  If not set, a new guard is obtained when the
    /// transaction is created.
    pub txn_guard: Option<&'a TxnGuard<'a>>,
}
62
// This is the amount of space that we reserve for metadata when we are creating a new transaction.
// A transaction should not take more than this.  This is expressed in terms of space occupied in
// the journal; transactions must not take up more space in the journal than the number below.  The
// amount chosen here must be large enough for the maximum possible transaction that can be created,
// so transactions always need to be bounded which might involve splitting an operation up into
// smaller transactions.
/// Upper bound, in journal bytes, on the size of a single transaction.
pub const TRANSACTION_MAX_JOURNAL_USAGE: u64 = 24_576;
/// The journal usage bound translated into reserved bytes of metadata space.
pub const TRANSACTION_METADATA_MAX_AMOUNT: u64 =
    reserved_space_from_journal_usage(TRANSACTION_MAX_JOURNAL_USAGE);
72
/// Holds a set of transaction locks (via the contained `WriteGuard`) without an associated
/// transaction.  Marked `#[must_use]` because dropping it releases the locks.
#[must_use]
pub struct TransactionLocks<'a>(pub WriteGuard<'a>);
75
/// The journal consists of these records which will be replayed at mount time.  Within a
/// transaction, these are stored as a set which allows some mutations to be deduplicated and found
/// (and we require custom comparison functions below).  For example, we need to be able to find
/// object size changes.
pub type Mutation = MutationV50;

#[derive(
    Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint, Versioned,
)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum MutationV50 {
    // A mutation to an object store's records.
    ObjectStore(ObjectStoreMutationV50),
    // Opaque serialized bytes for an object store mutation (presumably encrypted — confirm
    // against the journal replay code).
    EncryptedObjectStore(Box<[u8]>),
    // A mutation to the allocator (allocate, deallocate, limits, mark-for-deletion).
    Allocator(AllocatorMutationV32),
    // Indicates the beginning of a flush.  This would typically involve sealing a tree.
    BeginFlush,
    // Indicates the end of a flush.  This would typically involve replacing the immutable layers
    // with compacted ones.
    EndFlush,
    // Volume has been deleted.  Requires we remove it from the set of managed ObjectStore.
    DeleteVolume,
    // Updates the borrowed byte count (see MetadataReservation::Borrowed).
    UpdateBorrowed(u64),
    // Installs a new key for mutations (see UpdateMutationsKey below).
    UpdateMutationsKey(UpdateMutationsKeyV49),
    // Creates an internal directory with the given object ID.
    CreateInternalDir(u64),
}
101
// The following enums are the frozen, historical serialized forms of `Mutation`.  Each version
// differs only in the versioned types it references; the `Migrate` derive generates the conversion
// to the next version so that journals written by older versions can still be replayed.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV50)]
pub enum MutationV49 {
    ObjectStore(ObjectStoreMutationV49),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV49),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV49)]
pub enum MutationV47 {
    ObjectStore(ObjectStoreMutationV47),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV47)]
pub enum MutationV46 {
    ObjectStore(ObjectStoreMutationV46),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV46)]
pub enum MutationV43 {
    ObjectStore(ObjectStoreMutationV43),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV43)]
pub enum MutationV41 {
    ObjectStore(ObjectStoreMutationV41),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV41)]
pub enum MutationV40 {
    ObjectStore(ObjectStoreMutationV40),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}
185
186impl Mutation {
187    pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self {
188        Mutation::ObjectStore(ObjectStoreMutation {
189            item: Item::new(key, value),
190            op: Operation::Insert,
191        })
192    }
193
194    pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self {
195        Mutation::ObjectStore(ObjectStoreMutation {
196            item: Item::new(key, value),
197            op: Operation::ReplaceOrInsert,
198        })
199    }
200
201    pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self {
202        Mutation::ObjectStore(ObjectStoreMutation {
203            item: Item::new(key, value),
204            op: Operation::Merge,
205        })
206    }
207
208    pub fn update_mutations_key(key: FxfsKey) -> Self {
209        Mutation::UpdateMutationsKey(key.into())
210    }
211}
212
// We have custom comparison functions for mutations that just use the key, rather than the key and
// value that would be used by default so that we can deduplicate and find mutations (see
// get_object_mutation below).
pub type ObjectStoreMutation = ObjectStoreMutationV50;

#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct ObjectStoreMutationV50 {
    // The item (key + value) being written to the store's tree.
    pub item: ObjectItemV50,
    // How the item is applied to the tree (insert/replace/merge).
    pub op: OperationV32,
}

// Historical serialized forms of `ObjectStoreMutation`; `Migrate` generates the conversion chain
// up to the current version.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV50)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV49 {
    pub item: ObjectItemV49,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV49)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV47 {
    pub item: ObjectItemV47,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV47)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV46 {
    pub item: ObjectItemV46,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV46)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV43 {
    pub item: ObjectItemV43,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV43)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV41 {
    pub item: ObjectItemV41,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_nodefault]
#[migrate_to_version(ObjectStoreMutationV41)]
pub struct ObjectStoreMutationV40 {
    pub item: ObjectItemV40,
    pub op: OperationV32,
}
272
/// The different LSM tree operations that can be performed as part of a mutation.
pub type Operation = OperationV32;

#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum OperationV32 {
    // Insert a new item; the caller expects no existing item with the same key.
    Insert,
    // Insert, replacing any existing item with the same key.
    ReplaceOrInsert,
    // Merge with existing records for the key (tree-specific merge semantics).
    Merge,
}
283
// Ordering/equality for ObjectStoreMutation deliberately considers only the key (not the value or
// op) so that mutations for the same key deduplicate/replace each other within a transaction's
// mutation set.
impl Ord for ObjectStoreMutation {
    fn cmp(&self, other: &Self) -> Ordering {
        self.item.key.cmp(&other.item.key)
    }
}

impl PartialOrd for ObjectStoreMutation {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for ObjectStoreMutation {
    fn eq(&self, other: &Self) -> bool {
        self.item.key.eq(&other.item.key)
    }
}

impl Eq for ObjectStoreMutation {}
303
// Like ObjectStoreMutation, AllocatorItem ordering is key-only so items with the same key compare
// equal regardless of value.
impl Ord for AllocatorItem {
    fn cmp(&self, other: &Self) -> Ordering {
        self.key.cmp(&other.key)
    }
}

impl PartialOrd for AllocatorItem {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
315
/// Same as std::ops::Range but with Ord and PartialOrd support, sorted first by start of the range,
/// then by the end.
pub type DeviceRange = DeviceRangeV32;

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct DeviceRangeV32(pub Range<u64>);

// Deref/DerefMut let callers use a DeviceRange anywhere a Range<u64> is expected.
impl Deref for DeviceRange {
    type Target = Range<u64>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for DeviceRange {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl From<Range<u64>> for DeviceRange {
    fn from(range: Range<u64>) -> Self {
        Self(range)
    }
}
343
344impl Into<Range<u64>> for DeviceRange {
345    fn into(self) -> Range<u64> {
346        self.0
347    }
348}
349
// Sorts by range start, breaking ties with the range end (see the type-level comment above).
impl Ord for DeviceRange {
    fn cmp(&self, other: &Self) -> Ordering {
        self.start.cmp(&other.start).then(self.end.cmp(&other.end))
    }
}

impl PartialOrd for DeviceRange {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
361
/// Mutations applied to the allocator's tree.
pub type AllocatorMutation = AllocatorMutationV32;

#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorMutationV32 {
    /// Marks `device_range` as allocated to the store identified by `owner_object_id`.
    Allocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    /// Releases `device_range` previously allocated to `owner_object_id`.
    Deallocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    /// Sets a byte limit for allocations owned by `owner_object_id`.
    SetLimit {
        owner_object_id: u64,
        bytes: u64,
    },
    /// Marks all extents with a given owner_object_id for deletion.
    /// Used to free space allocated to encrypted ObjectStore where we may not have the key.
    /// Note that the actual deletion time is undefined so this should never be used where an
    /// ObjectStore is still in use due to a high risk of corruption. Similarly, owner_object_id
    /// should never be reused for the same reasons.
    MarkForDeletion(u64),
}
386
/// Newtype wrapping the key used for mutations; see the identity-based Ord/Eq impls below for why
/// this is not a plain `FxfsKey`.
pub type UpdateMutationsKey = UpdateMutationsKeyV49;

#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
pub struct UpdateMutationsKeyV49(pub FxfsKeyV49);

// Historical serialized form; migrated to V49 on replay.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(UpdateMutationsKeyV49)]
pub struct UpdateMutationsKeyV40(pub FxfsKeyV40);
395
396impl From<UpdateMutationsKey> for FxfsKey {
397    fn from(outer: UpdateMutationsKey) -> Self {
398        outer.0
399    }
400}
401
402impl From<FxfsKey> for UpdateMutationsKey {
403    fn from(inner: FxfsKey) -> Self {
404        Self(inner)
405    }
406}
407
#[cfg(fuzz)]
impl<'a> arbitrary::Arbitrary<'a> for UpdateMutationsKey {
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        // Propagate generation failure (e.g. the fuzzer running out of input bytes) instead of
        // panicking with `unwrap`.
        FxfsKey::arbitrary(u).map(UpdateMutationsKey::from)
    }
}
414
// `UpdateMutationsKey` deliberately compares by *instance identity* (pointer address) rather than
// by the wrapped key's contents, so two mutations wrapping equal keys remain distinct entries in a
// transaction's mutation set — TODO(review): confirm callers rely only on set membership, since
// this ordering is not stable across runs.
impl Ord for UpdateMutationsKey {
    fn cmp(&self, other: &Self) -> Ordering {
        (self as *const UpdateMutationsKey).cmp(&(other as *const _))
    }
}

impl PartialOrd for UpdateMutationsKey {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for UpdateMutationsKey {}

impl PartialEq for UpdateMutationsKey {
    fn eq(&self, other: &Self) -> bool {
        // Identity comparison, consistent with `Ord` above.
        std::ptr::eq(self, other)
    }
}
434
/// When creating a transaction, locks typically need to be held to prevent two or more writers
/// trying to make conflicting mutations at the same time.  LockKeys are used for this.
/// NOTE: Ordering is important here!  The lock manager sorts the list of locks in a transaction
/// to acquire them in a consistent order, but there are special cases for the Filesystem lock and
/// the Flush lock.
/// The Filesystem lock is taken by every transaction and is done so first, as part of the TxnGuard.
/// The Flush lock is taken when we flush an LSM tree (e.g. an object store), and is held for
/// several transactions.  As such, it must come first in the lock acquisition ordering, so that
/// other transactions using the Flush lock have the same ordering as in flushing.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Copy)]
pub enum LockKey {
    /// Locks the entire filesystem.
    Filesystem,

    /// Used to lock flushing an object.
    Flush {
        object_id: u64,
    },

    /// Used to lock changes to a particular object attribute (e.g. writes).
    ObjectAttribute {
        store_object_id: u64,
        object_id: u64,
        attribute_id: u64,
    },

    /// Used to lock changes to a particular object (e.g. adding a child to a directory).
    Object {
        store_object_id: u64,
        object_id: u64,
    },

    /// Used to lock changes to a project's properties (e.g. its limit; see
    /// ObjectKeyData::Project handling in verify_locks).
    ProjectId {
        store_object_id: u64,
        project_id: u64,
    },

    /// Used to lock any truncate operations for a file.
    Truncate {
        store_object_id: u64,
        object_id: u64,
    },
}
478
479impl LockKey {
480    pub const fn object_attribute(store_object_id: u64, object_id: u64, attribute_id: u64) -> Self {
481        LockKey::ObjectAttribute { store_object_id, object_id, attribute_id }
482    }
483
484    pub const fn object(store_object_id: u64, object_id: u64) -> Self {
485        LockKey::Object { store_object_id, object_id }
486    }
487
488    pub const fn flush(object_id: u64) -> Self {
489        LockKey::Flush { object_id }
490    }
491
492    pub const fn truncate(store_object_id: u64, object_id: u64) -> Self {
493        LockKey::Truncate { store_object_id, object_id }
494    }
495}
496
/// A container for holding `LockKey` objects. Can store a single `LockKey` inline, avoiding a heap
/// allocation for the common one-key case.
#[derive(Clone, Debug)]
pub enum LockKeys {
    None,
    Inline(LockKey),
    Vec(Vec<LockKey>),
}
504
505impl LockKeys {
506    pub fn with_capacity(capacity: usize) -> Self {
507        if capacity > 1 { LockKeys::Vec(Vec::with_capacity(capacity)) } else { LockKeys::None }
508    }
509
510    pub fn push(&mut self, key: LockKey) {
511        match self {
512            Self::None => *self = LockKeys::Inline(key),
513            Self::Inline(inline) => {
514                *self = LockKeys::Vec(vec![*inline, key]);
515            }
516            Self::Vec(vec) => vec.push(key),
517        }
518    }
519
520    pub fn truncate(&mut self, len: usize) {
521        match self {
522            Self::None => {}
523            Self::Inline(_) => {
524                if len == 0 {
525                    *self = Self::None;
526                }
527            }
528            Self::Vec(vec) => vec.truncate(len),
529        }
530    }
531
532    fn len(&self) -> usize {
533        match self {
534            Self::None => 0,
535            Self::Inline(_) => 1,
536            Self::Vec(vec) => vec.len(),
537        }
538    }
539
540    fn contains(&self, key: &LockKey) -> bool {
541        match self {
542            Self::None => false,
543            Self::Inline(single) => single == key,
544            Self::Vec(vec) => vec.contains(key),
545        }
546    }
547
548    fn sort_unstable(&mut self) {
549        match self {
550            Self::Vec(vec) => vec.sort_unstable(),
551            _ => {}
552        }
553    }
554
555    fn dedup(&mut self) {
556        match self {
557            Self::Vec(vec) => vec.dedup(),
558            _ => {}
559        }
560    }
561
562    fn iter(&self) -> LockKeysIter<'_> {
563        match self {
564            LockKeys::None => LockKeysIter::None,
565            LockKeys::Inline(key) => LockKeysIter::Inline(key),
566            LockKeys::Vec(keys) => LockKeysIter::Vec(keys.into_iter()),
567        }
568    }
569}
570
// Iterator over `LockKeys`, mirroring its three representations.
enum LockKeysIter<'a> {
    None,
    Inline(&'a LockKey),
    Vec(<&'a Vec<LockKey> as IntoIterator>::IntoIter),
}
576
577impl<'a> Iterator for LockKeysIter<'a> {
578    type Item = &'a LockKey;
579    fn next(&mut self) -> Option<Self::Item> {
580        match self {
581            Self::None => None,
582            Self::Inline(inline) => {
583                let next = *inline;
584                *self = Self::None;
585                Some(next)
586            }
587            Self::Vec(vec) => vec.next(),
588        }
589    }
590}
591
592impl Default for LockKeys {
593    fn default() -> Self {
594        LockKeys::None
595    }
596}
597
/// Convenience constructor for `LockKeys`: `lock_keys![]` yields `None`, a single expression
/// yields the `Inline` variant, and two or more expressions yield the `Vec` variant.
#[macro_export]
macro_rules! lock_keys {
    () => {
        $crate::object_store::transaction::LockKeys::None
    };
    ($lock_key:expr $(,)?) => {
        $crate::object_store::transaction::LockKeys::Inline($lock_key)
    };
    ($($lock_keys:expr),+ $(,)?) => {
        $crate::object_store::transaction::LockKeys::Vec(vec![$($lock_keys),+])
    };
}
pub use lock_keys;
611
/// Mutations in a transaction can be associated with an object so that when mutations are applied,
/// updates can be applied to in-memory structures. For example, we cache object sizes, so when a
/// size change is applied, we can update the cached object size.
pub trait AssociatedObject: Send + Sync {
    /// Called just before `mutation` is applied; the default does nothing.
    fn will_apply_mutation(&self, _mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
    }
}
619
/// An optional associated object, either borrowed for the transaction's lifetime or owned.
pub enum AssocObj<'a> {
    None,
    Borrowed(&'a dyn AssociatedObject),
    Owned(Box<dyn AssociatedObject>),
}
625
626impl AssocObj<'_> {
627    pub fn map<R, F: FnOnce(&dyn AssociatedObject) -> R>(&self, f: F) -> Option<R> {
628        match self {
629            AssocObj::None => None,
630            AssocObj::Borrowed(b) => Some(f(*b)),
631            AssocObj::Owned(o) => Some(f(o.as_ref())),
632        }
633    }
634}
635
/// A single mutation within a transaction, pairing the target object ID with the mutation record
/// and an optional in-memory associated object.
pub struct TxnMutation<'a> {
    // This, at time of writing, is either the object ID of an object store, or the object ID of the
    // allocator.  In the case of an object mutation, there's another object ID in the mutation
    // record that would be for the object actually being changed.
    pub object_id: u64,

    // The actual mutation.  This gets serialized to the journal.
    pub mutation: Mutation,

    // An optional associated object for the mutation.  During replay, there will always be no
    // associated object.
    pub associated_object: AssocObj<'a>,
}
649
// We store TxnMutation in a set, and for that, we only use object_id and mutation and not the
// associated object or checksum.
impl Ord for TxnMutation<'_> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation))
    }
}

impl PartialOrd for TxnMutation<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for TxnMutation<'_> {
    fn eq(&self, other: &Self) -> bool {
        // Must stay consistent with `Ord` above: associated_object is ignored.
        self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation)
    }
}

impl Eq for TxnMutation<'_> {}
671
// Manual Debug because `associated_object` (a dyn trait) has no Debug impl; it is omitted.
impl std::fmt::Debug for TxnMutation<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TxnMutation")
            .field("object_id", &self.object_id)
            .field("mutation", &self.mutation)
            .finish()
    }
}
680
/// Tracks how the metadata space for a transaction is accounted for.
pub enum MetadataReservation {
    // The state after a transaction has been dropped.
    None,

    // Metadata space for this transaction is being borrowed from ObjectManager's metadata
    // reservation.
    Borrowed,

    // A metadata reservation was made when the transaction was created.
    Reservation(Reservation),

    // The metadata space is being _held_ within `allocator_reservation`.
    Hold(u64),
}
695
/// A transaction groups mutation records to be committed as a group.
pub struct Transaction<'a> {
    // Keeps the filesystem alive/қuiescent for the duration of the transaction.
    txn_guard: TxnGuard<'a>,

    // The mutations that make up this transaction.
    mutations: BTreeSet<TxnMutation<'a>>,

    // The locks that this transaction currently holds.
    txn_locks: LockKeys,

    /// If set, an allocator reservation that should be used for allocations.
    pub allocator_reservation: Option<&'a Reservation>,

    /// The reservation for the metadata for this transaction.
    pub metadata_reservation: MetadataReservation,

    // Keep track of objects explicitly created by this transaction. No locks are required for them.
    // Addressed by (owner_object_id, object_id).
    new_objects: BTreeSet<(u64, u64)>,

    /// Any data checksums which should be evaluated when replaying this transaction.
    checksums: Vec<(Range<u64>, Vec<Checksum>, bool)>,

    /// Set if this transaction contains data (i.e. includes any extent mutations).
    includes_write: bool,
}
722
723impl<'a> Transaction<'a> {
724    /// Creates a new transaction.  `txn_locks` are read locks that can be upgraded to write locks
725    /// at commit time.
726    pub async fn new(
727        txn_guard: TxnGuard<'a>,
728        options: Options<'a>,
729        txn_locks: LockKeys,
730    ) -> Result<Transaction<'a>, Error> {
731        txn_guard.fs().add_transaction(options.skip_journal_checks).await;
732        let fs = txn_guard.fs().clone();
733        let guard = scopeguard::guard((), |_| fs.sub_transaction());
734        let (metadata_reservation, allocator_reservation, hold) =
735            txn_guard.fs().reservation_for_transaction(options).await?;
736
737        let txn_locks = {
738            let lock_manager = txn_guard.fs().lock_manager();
739            let mut write_guard = lock_manager.txn_lock(txn_locks).await;
740            std::mem::take(&mut write_guard.0.lock_keys)
741        };
742        let mut transaction = Transaction {
743            txn_guard,
744            mutations: BTreeSet::new(),
745            txn_locks,
746            allocator_reservation: None,
747            metadata_reservation,
748            new_objects: BTreeSet::new(),
749            checksums: Vec::new(),
750            includes_write: false,
751        };
752
753        ScopeGuard::into_inner(guard);
754        hold.map(|h| h.forget()); // Transaction takes ownership from here on.
755        transaction.allocator_reservation = allocator_reservation;
756        Ok(transaction)
757    }
758
    /// Returns the transaction guard held by this transaction.
    pub fn txn_guard(&self) -> &TxnGuard<'_> {
        &self.txn_guard
    }
762
    /// Returns the set of mutations currently staged in this transaction.
    pub fn mutations(&self) -> &BTreeSet<TxnMutation<'a>> {
        &self.mutations
    }
766
    /// Takes the staged mutations, leaving the transaction's set empty.  Also clears the record of
    /// newly-created objects, since it only exists to validate further mutations.
    pub fn take_mutations(&mut self) -> BTreeSet<TxnMutation<'a>> {
        self.new_objects.clear();
        mem::take(&mut self.mutations)
    }
771
    /// Adds a mutation to this transaction.  If the mutation already exists, it is replaced and the
    /// old mutation is returned.
    pub fn add(&mut self, object_id: u64, mutation: Mutation) -> Option<Mutation> {
        self.add_with_object(object_id, mutation, AssocObj::None)
    }
777
    /// Removes a mutation that matches `mutation`.  If the removed mutation was an object
    /// insertion, the corresponding entry in `new_objects` is dropped as well so lock verification
    /// stays accurate.
    pub fn remove(&mut self, object_id: u64, mutation: Mutation) {
        let txn_mutation = TxnMutation { object_id, mutation, associated_object: AssocObj::None };
        if self.mutations.remove(&txn_mutation) {
            if let Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    ObjectItem {
                        key: ObjectKey { object_id: new_object_id, data: ObjectKeyData::Object },
                        ..
                    },
                op: Operation::Insert,
            }) = txn_mutation.mutation
            {
                self.new_objects.remove(&(object_id, new_object_id));
            }
        }
    }
795
796    /// Adds a mutation with an associated object. If the mutation already exists, it is replaced
797    /// and the old mutation is returned.
798    pub fn add_with_object(
799        &mut self,
800        object_id: u64,
801        mutation: Mutation,
802        associated_object: AssocObj<'a>,
803    ) -> Option<Mutation> {
804        assert!(object_id != INVALID_OBJECT_ID);
805        if let Mutation::ObjectStore(ObjectStoreMutation {
806            item:
807                Item {
808                    key:
809                        ObjectKey { data: ObjectKeyData::Attribute(_, AttributeKey::Extent(_)), .. },
810                    ..
811                },
812            ..
813        }) = &mutation
814        {
815            self.includes_write = true;
816        }
817        let txn_mutation = TxnMutation { object_id, mutation, associated_object };
818        self.verify_locks(&txn_mutation);
819        self.mutations.replace(txn_mutation).map(|m| m.mutation)
820    }
821
    /// Records data checksums for `range` to be evaluated when replaying this transaction.
    /// `first_write` indicates whether this is the first write to the range.
    pub fn add_checksum(&mut self, range: Range<u64>, checksums: Vec<Checksum>, first_write: bool) {
        self.checksums.push((range, checksums, first_write));
    }
825
    /// Returns true if this transaction contains data (i.e. any extent mutations were added).
    pub fn includes_write(&self) -> bool {
        self.includes_write
    }
829
    /// Returns the checksum entries recorded so far.
    pub fn checksums(&self) -> &[(Range<u64>, Vec<Checksum>, bool)] {
        &self.checksums
    }
833
834    pub fn take_checksums(&mut self) -> Vec<(Range<u64>, Vec<Checksum>, bool)> {
835        std::mem::replace(&mut self.checksums, Vec::new())
836    }
837
838    fn verify_locks(&mut self, mutation: &TxnMutation<'_>) {
839        // It was considered to change the locks from Vec to BTreeSet since we'll now be searching
840        // through it, but given the small set that these locks usually comprise, it probably isn't
841        // worth it.
842        match mutation {
843            TxnMutation {
844                mutation:
845                    Mutation::ObjectStore {
846                        0: ObjectStoreMutation { item: ObjectItem { key, .. }, op },
847                    },
848                object_id: store_object_id,
849                ..
850            } => {
851                match &key.data {
852                    ObjectKeyData::Attribute(..) => {
853                        // TODO(https://fxbug.dev/42073914): Check lock requirements.
854                    }
855                    ObjectKeyData::Child { .. }
856                    | ObjectKeyData::EncryptedChild { .. }
857                    | ObjectKeyData::CasefoldChild { .. } => {
858                        let id = key.object_id;
859                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
860                            && !self.new_objects.contains(&(*store_object_id, id))
861                        {
862                            debug_assert!(
863                                false,
864                                "Not holding required lock for object {id} \
865                                in store {store_object_id}"
866                            );
867                            error!(
868                                "Not holding required lock for object {id} in store \
869                                {store_object_id}"
870                            )
871                        }
872                    }
873                    ObjectKeyData::GraveyardEntry { .. } => {
874                        // TODO(https://fxbug.dev/42073911): Check lock requirements.
875                    }
876                    ObjectKeyData::GraveyardAttributeEntry { .. } => {
877                        // TODO(https://fxbug.dev/122974): Check lock requirements.
878                    }
879                    ObjectKeyData::Keys => {
880                        let id = key.object_id;
881                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
882                            && !self.new_objects.contains(&(*store_object_id, id))
883                        {
884                            debug_assert!(
885                                false,
886                                "Not holding required lock for object {id} \
887                                in store {store_object_id}"
888                            );
889                            error!(
890                                "Not holding required lock for object {id} in store \
891                                {store_object_id}"
892                            )
893                        }
894                    }
895                    ObjectKeyData::Object => match op {
896                        // Insert implies the caller expects no object with which to race
897                        Operation::Insert => {
898                            self.new_objects.insert((*store_object_id, key.object_id));
899                        }
900                        Operation::Merge | Operation::ReplaceOrInsert => {
901                            let id = key.object_id;
902                            if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
903                                && !self.new_objects.contains(&(*store_object_id, id))
904                            {
905                                debug_assert!(
906                                    false,
907                                    "Not holding required lock for object {id} \
908                                    in store {store_object_id}"
909                                );
910                                error!(
911                                    "Not holding required lock for object {id} in store \
912                                    {store_object_id}"
913                                )
914                            }
915                        }
916                    },
917                    ObjectKeyData::Project { project_id, property: ProjectProperty::Limit } => {
918                        if !self.txn_locks.contains(&LockKey::ProjectId {
919                            store_object_id: *store_object_id,
920                            project_id: *project_id,
921                        }) {
922                            debug_assert!(
923                                false,
924                                "Not holding required lock for project limit id {project_id} \
925                                in store {store_object_id}"
926                            );
927                            error!(
928                                "Not holding required lock for project limit id {project_id} in \
929                                store {store_object_id}"
930                            )
931                        }
932                    }
933                    ObjectKeyData::Project { property: ProjectProperty::Usage, .. } => match op {
934                        Operation::Insert | Operation::ReplaceOrInsert => {
935                            panic!(
936                                "Project usage is all handled by merging deltas, no inserts or \
937                                replacements should be used"
938                            );
939                        }
940                        // Merges are all handled like atomic +/- and serialized by the tree locks.
941                        Operation::Merge => {}
942                    },
943                    ObjectKeyData::ExtendedAttribute { .. } => {
944                        let id = key.object_id;
945                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
946                            && !self.new_objects.contains(&(*store_object_id, id))
947                        {
948                            debug_assert!(
949                                false,
950                                "Not holding required lock for object {id} \
951                                in store {store_object_id} while mutating extended attribute"
952                            );
953                            error!(
954                                "Not holding required lock for object {id} in store \
955                                {store_object_id} while mutating extended attribute"
956                            )
957                        }
958                    }
959                }
960            }
961            TxnMutation { mutation: Mutation::DeleteVolume, object_id, .. } => {
962                if !self.txn_locks.contains(&LockKey::flush(*object_id)) {
963                    debug_assert!(false, "Not holding required lock for DeleteVolume");
964                    error!("Not holding required lock for DeleteVolume");
965                }
966            }
967            _ => {}
968        }
969    }
970
    /// Returns true if this transaction has no mutations.
    ///
    /// (An empty transaction contributes nothing when committed; see `commit_and_continue`,
    /// which asserts emptiness after a successful commit.)
    pub fn is_empty(&self) -> bool {
        self.mutations.is_empty()
    }
975
976    /// Searches for an existing object mutation within the transaction that has the given key and
977    /// returns it if found.
978    pub fn get_object_mutation(
979        &self,
980        store_object_id: u64,
981        key: ObjectKey,
982    ) -> Option<&ObjectStoreMutation> {
983        if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) =
984            self.mutations.get(&TxnMutation {
985                object_id: store_object_id,
986                mutation: Mutation::insert_object(key, ObjectValue::None),
987                associated_object: AssocObj::None,
988            })
989        {
990            Some(mutation)
991        } else {
992            None
993        }
994    }
995
996    /// Commits a transaction.  If successful, returns the journal offset of the transaction.
997    pub async fn commit(mut self) -> Result<u64, Error> {
998        debug!(txn:? = &self; "Commit");
999        self.txn_guard.fs().clone().commit_transaction(&mut self, &mut |_| {}).await
1000    }
1001
1002    /// Commits and then runs the callback whilst locks are held.  The callback accepts a single
1003    /// parameter which is the journal offset of the transaction.
1004    pub async fn commit_with_callback<R: Send>(
1005        mut self,
1006        f: impl FnOnce(u64) -> R + Send,
1007    ) -> Result<R, Error> {
1008        debug!(txn:? = &self; "Commit");
1009        // It's not possible to pass an FnOnce via a trait without boxing it, but we don't want to
1010        // do that (for performance reasons), hence the reason for the following.
1011        let mut f = Some(f);
1012        let mut result = None;
1013        self.txn_guard
1014            .fs()
1015            .clone()
1016            .commit_transaction(&mut self, &mut |offset| {
1017                result = Some(f.take().unwrap()(offset));
1018            })
1019            .await?;
1020        Ok(result.unwrap())
1021    }
1022
1023    /// Commits the transaction, but allows the transaction to be used again.  The locks are not
1024    /// dropped (but transaction locks will get downgraded to read locks).
1025    pub async fn commit_and_continue(&mut self) -> Result<(), Error> {
1026        debug!(txn:? = self; "Commit");
1027        self.txn_guard.fs().clone().commit_transaction(self, &mut |_| {}).await?;
1028        assert!(self.mutations.is_empty());
1029        self.txn_guard.fs().lock_manager().downgrade_locks(&self.txn_locks);
1030        Ok(())
1031    }
1032}
1033
impl Drop for Transaction<'_> {
    // Runs whether or not the transaction was committed (commit paths empty the mutation list
    // first).
    fn drop(&mut self) {
        // Call the filesystem implementation of drop_transaction which should, as a minimum, call
        // LockManager's drop_transaction to ensure the locks are released.
        debug!(txn:? = &self; "Drop");
        self.txn_guard.fs().clone().drop_transaction(self);
    }
}
1042
1043impl std::fmt::Debug for Transaction<'_> {
1044    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1045        f.debug_struct("Transaction")
1046            .field("mutations", &self.mutations)
1047            .field("txn_locks", &self.txn_locks)
1048            .field("reservation", &self.allocator_reservation)
1049            .finish()
1050    }
1051}
1052
/// Holds either a borrowed or an owned `T`; dereferences to `T` in both cases (see the `Deref`
/// impl below).  `From` impls allow either form to be passed where this type is expected.
pub enum BorrowedOrOwned<'a, T> {
    Borrowed(&'a T),
    Owned(T),
}
1057
1058impl<T> Deref for BorrowedOrOwned<'_, T> {
1059    type Target = T;
1060
1061    fn deref(&self) -> &Self::Target {
1062        match self {
1063            BorrowedOrOwned::Borrowed(b) => b,
1064            BorrowedOrOwned::Owned(o) => &o,
1065        }
1066    }
1067}
1068
1069impl<'a, T> From<&'a T> for BorrowedOrOwned<'a, T> {
1070    fn from(value: &'a T) -> Self {
1071        BorrowedOrOwned::Borrowed(value)
1072    }
1073}
1074
1075impl<T> From<T> for BorrowedOrOwned<'_, T> {
1076    fn from(value: T) -> Self {
1077        BorrowedOrOwned::Owned(value)
1078    }
1079}
1080
1081/// LockManager holds the locks that transactions might have taken.  A TransactionManager
1082/// implementation would typically have one of these.
1083///
1084/// Three different kinds of locks are supported.  There are read locks and write locks, which are
1085/// as one would expect.  The third kind of lock is a _transaction_ lock (which is also known as an
1086/// upgradeable read lock).  When first acquired, these block other writes (including other
1087/// transaction locks) but do not block reads.  When it is time to commit a transaction, these locks
1088/// are upgraded to full write locks (without ever dropping the lock) and then dropped after
1089/// committing (unless commit_and_continue is used).  This way, reads are only blocked for the
1090/// shortest possible time.  It follows that write locks should be used sparingly.  Locks are
1091/// granted in order with one exception: when a lock is in the initial _transaction_ lock state
1092/// (LockState::Locked), all read locks are allowed even if there are other tasks waiting for the
1093/// lock.  The reason for this is because we allow read locks to be taken by tasks that have taken a
1094/// _transaction_ lock (i.e. recursion is allowed).  In other cases, such as when a writer is
1095/// waiting and there are only readers, readers will queue up behind the writer.
1096///
1097/// To summarize:
1098///
1099/// +-------------------------+-----------------+----------------+------------------+
1100/// |                         | While read_lock | While txn_lock | While write_lock |
1101/// |                         | is held         | is held        | is held          |
1102/// +-------------------------+-----------------+----------------+------------------+
1103/// | Can acquire read_lock?  | true            | true           | false            |
1104/// +-------------------------+-----------------+----------------+------------------+
1105/// | Can acquire txn_lock?   | true            | false          | false            |
1106/// +-------------------------+-----------------+----------------+------------------+
1107/// | Can acquire write_lock? | false           | false          | false            |
1108/// +-------------------------+-----------------+----------------+------------------+
pub struct LockManager {
    // Protects all lock state.  The SAFETY comments throughout this module rely on this mutex
    // being held whenever the intrusive waker lists hanging off each `LockEntry` are touched.
    locks: Mutex<Locks>,
}
1112
struct Locks {
    // One entry per key that currently has holders or waiters; entries are removed in
    // `drop_lock` once `LockEntry::can_remove` reports they are unused.
    keys: HashMap<LockKey, LockEntry>,
}
1116
impl Locks {
    // Releases one hold on `key` taken in the given `state`.  If that makes the lock available
    // (last reader gone, or a Locked/WriteLock holder released), waiting tasks are woken and the
    // entry is removed from the map once nothing references it.
    fn drop_lock(&mut self, key: LockKey, state: LockState) {
        if let Entry::Occupied(mut occupied) = self.keys.entry(key) {
            let entry = occupied.get_mut();
            let wake = match state {
                LockState::ReadLock => {
                    entry.read_count -= 1;
                    entry.read_count == 0
                }
                // drop_write_locks currently depends on us treating Locked and WriteLock the same.
                LockState::Locked | LockState::WriteLock => {
                    // Readers may still be present (read_count can be non-zero), so fall back to
                    // the readers-only state rather than removing the entry outright.
                    entry.state = LockState::ReadLock;
                    true
                }
            };
            if wake {
                // SAFETY: The lock in `LockManager::locks` is held.
                unsafe {
                    entry.wake();
                }
                if entry.can_remove() {
                    occupied.remove_entry();
                }
            }
        } else {
            // Dropping a key that has no entry would indicate a bookkeeping bug elsewhere.
            unreachable!();
        }
    }

    // Releases a read lock for each of `lock_keys`.
    fn drop_read_locks(&mut self, lock_keys: LockKeys) {
        for lock in lock_keys.iter() {
            self.drop_lock(*lock, LockState::ReadLock);
        }
    }

    // Releases a transaction (Locked) or write lock for each of `lock_keys`.
    fn drop_write_locks(&mut self, lock_keys: LockKeys) {
        for lock in lock_keys.iter() {
            // This is a bit hacky, but this works for locks in either the Locked or WriteLock
            // states.
            self.drop_lock(*lock, LockState::WriteLock);
        }
    }

    // Downgrades locks from WriteLock to Locked.
    fn downgrade_locks(&mut self, lock_keys: &LockKeys) {
        for lock in lock_keys.iter() {
            // SAFETY: The lock in `LockManager::locks` is held.
            unsafe {
                self.keys.get_mut(lock).unwrap().downgrade_lock();
            }
        }
    }
}
1170
// Per-key lock state: who currently holds the lock and an intrusive list of who is waiting.
#[derive(Debug)]
struct LockEntry {
    // In the states that allow readers (ReadLock, Locked), this count can be non-zero
    // to indicate the number of active readers.
    read_count: u64,

    // The state of the lock (see below).
    state: LockState,

    // A doubly-linked list of wakers that should be woken when they have been granted the lock.
    // New wakers are usually chained on to tail, with the exception being the case where a lock in
    // state Locked is to be upgraded to WriteLock, but can't because there are readers.  It might
    // be possible to use intrusive-collections in the future.
    // The nodes live on the stack of the waiting futures (see `LockManager::lock`), which is why
    // they must be pinned and unlinked on drop.
    head: *const LockWaker,
    tail: *const LockWaker,
}
1187
// SAFETY: The raw `LockWaker` pointers are only ever dereferenced whilst `LockManager::locks` is
// held (see the SAFETY comments at each use), which serializes access across threads.
unsafe impl Send for LockEntry {}
1189
// Represents a node in the waker list.  It is only safe to access the members wrapped by UnsafeCell
// when LockManager's `locks` member is locked.
struct LockWaker {
    // The next and previous pointers in the doubly-linked list.
    next: UnsafeCell<*const LockWaker>,
    prev: UnsafeCell<*const LockWaker>,

    // Holds the lock key for this waker.  This is required so that we can find the associated
    // `LockEntry`.
    key: LockKey,

    // The underlying waker that should be used to wake the task.
    waker: UnsafeCell<WakerState>,

    // The target state for this waker.
    target_state: LockState,

    // True if this is an upgrade (Locked -> WriteLock; queued by `commit_prepare_keys`).
    is_upgrade: bool,

    // We need to be pinned because these form part of the linked list.
    _pin: PhantomPinned,
}
1213
// Lifecycle of a `LockWaker`'s wakeup, guarded by `LockManager::locks`.
enum WakerState {
    // This is the initial state before the waker has been first polled.
    Pending,

    // Once polled, this contains the actual waker.
    Registered(Waker),

    // The waker has been woken and has been granted the lock.
    Woken,
}
1224
1225impl WakerState {
1226    fn is_woken(&self) -> bool {
1227        matches!(self, WakerState::Woken)
1228    }
1229}
1230
// SAFETY: The `UnsafeCell` members are only accessed whilst `LockManager::locks` is held (see the
// SAFETY comments at each access), which serializes them across threads.
unsafe impl Send for LockWaker {}
unsafe impl Sync for LockWaker {}
1233
impl LockWaker {
    // Waits for the waker to be woken.
    async fn wait(&self, manager: &LockManager) {
        // We must guard against the future being dropped.
        let waker_guard = scopeguard::guard((), |_| {
            // This closure only runs if the future is dropped before completing (the guard is
            // defused via `into_inner` below on the normal path).
            let mut locks = manager.locks.lock();
            // SAFETY: We've acquired the lock.
            unsafe {
                if (*self.waker.get()).is_woken() {
                    // We were woken, but didn't actually run, so we must drop the lock.
                    if self.is_upgrade {
                        locks.keys.get_mut(&self.key).unwrap().downgrade_lock();
                    } else {
                        locks.drop_lock(self.key, self.target_state);
                    }
                } else {
                    // We haven't been woken but we've been dropped so we must remove ourself from
                    // the waker list.
                    locks.keys.get_mut(&self.key).unwrap().remove_waker(self);
                }
            }
        });

        poll_fn(|cx| {
            let _locks = manager.locks.lock();
            // SAFETY: We've acquired the lock.
            unsafe {
                if (*self.waker.get()).is_woken() {
                    Poll::Ready(())
                } else {
                    // Re-register on every poll in case the task's waker has changed.
                    *self.waker.get() = WakerState::Registered(cx.waker().clone());
                    Poll::Pending
                }
            }
        })
        .await;

        // Completed normally: defuse the drop guard so the granted lock is kept.
        ScopeGuard::into_inner(waker_guard);
    }
}
1274
// The state of a lock for a particular key (see the compatibility table on `LockManager`).
#[derive(Copy, Clone, Debug, PartialEq)]
enum LockState {
    // In this state, there are only readers.
    ReadLock,

    // This state is used for transactions to lock other writers (including other transactions), but
    // it still allows readers.
    Locked,

    // A writer has exclusive access; all other readers and writers are blocked.
    WriteLock,
}
1287
impl LockManager {
    pub fn new() -> Self {
        LockManager { locks: Mutex::new(Locks { keys: HashMap::default() }) }
    }

    /// Acquires the locks.  It is the caller's responsibility to ensure that drop_transaction is
    /// called when a transaction is dropped i.e. the filesystem's drop_transaction method should
    /// call LockManager's drop_transaction method.
    pub async fn txn_lock<'a>(&'a self, lock_keys: LockKeys) -> TransactionLocks<'a> {
        // `lock` returns the `Right` (WriteGuard) branch for LockState::Locked, so the unwrap
        // cannot fail.
        TransactionLocks(
            debug_assert_not_too_long!(self.lock(lock_keys, LockState::Locked)).right().unwrap(),
        )
    }

    // `state` indicates the kind of lock required.  ReadLock means acquire a read lock.  Locked
    // means lock other writers, but still allow readers.  WriteLock means acquire a write lock.
    async fn lock<'a>(
        &'a self,
        mut lock_keys: LockKeys,
        target_state: LockState,
    ) -> Either<ReadGuard<'a>, WriteGuard<'a>> {
        // The guard is created up front (with empty keys) so that keys acquired so far are
        // released if this future is dropped part-way through.
        let mut guard = match &target_state {
            LockState::ReadLock => Left(ReadGuard {
                manager: self.into(),
                lock_keys: LockKeys::with_capacity(lock_keys.len()),
            }),
            LockState::Locked | LockState::WriteLock => Right(WriteGuard {
                manager: self.into(),
                lock_keys: LockKeys::with_capacity(lock_keys.len()),
            }),
        };
        let guard_keys = match &mut guard {
            Left(g) => &mut g.lock_keys,
            Right(g) => &mut g.lock_keys,
        };
        // Take the keys in a canonical (sorted, deduplicated) order so that overlapping
        // acquisitions cannot deadlock against each other.
        lock_keys.sort_unstable();
        lock_keys.dedup();
        for lock in lock_keys.iter() {
            // If the key is contested, a waker node is created on this future's stack and linked
            // into the entry's intrusive list, hence the pinning.
            let lock_waker = None;
            pin_mut!(lock_waker);
            {
                let mut locks = self.locks.lock();
                match locks.keys.entry(*lock) {
                    Entry::Vacant(vacant) => {
                        // Uncontested: create the entry already in the target state.
                        vacant.insert(LockEntry {
                            read_count: if let LockState::ReadLock = target_state {
                                guard_keys.push(*lock);
                                1
                            } else {
                                guard_keys.push(*lock);
                                0
                            },
                            state: target_state,
                            head: std::ptr::null(),
                            tail: std::ptr::null(),
                        });
                    }
                    Entry::Occupied(mut occupied) => {
                        let entry = occupied.get_mut();
                        // SAFETY: We've acquired the lock.
                        if unsafe { entry.is_allowed(target_state, entry.head.is_null()) } {
                            if let LockState::ReadLock = target_state {
                                entry.read_count += 1;
                                guard_keys.push(*lock);
                            } else {
                                entry.state = target_state;
                                guard_keys.push(*lock);
                            }
                        } else {
                            // Initialise a waker and push it on the tail of the list.
                            // SAFETY: `lock_waker` isn't used prior to this point.
                            unsafe {
                                *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
                                    next: UnsafeCell::new(std::ptr::null()),
                                    prev: UnsafeCell::new(entry.tail),
                                    key: *lock,
                                    waker: UnsafeCell::new(WakerState::Pending),
                                    target_state: target_state,
                                    is_upgrade: false,
                                    _pin: PhantomPinned,
                                });
                            }
                            let waker = (*lock_waker).as_ref().unwrap();
                            if entry.tail.is_null() {
                                entry.head = waker;
                            } else {
                                // SAFETY: We've acquired the lock.
                                unsafe {
                                    *(*entry.tail).next.get() = waker;
                                }
                            }
                            entry.tail = waker;
                        }
                    }
                }
            }
            // `locks` is released here, but the waker (if any) stays linked into the entry's
            // list; `LockWaker::wait` handles both being granted the lock and being dropped
            // before that happens.
            if let Some(waker) = &*lock_waker {
                waker.wait(self).await;
                guard_keys.push(*lock);
            }
        }
        guard
    }

    /// This should be called by the filesystem's drop_transaction implementation.
    pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
        let mut locks = self.locks.lock();
        locks.drop_write_locks(std::mem::take(&mut transaction.txn_locks));
    }

    /// Prepares to commit by waiting for readers to finish.
    pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
        self.commit_prepare_keys(&transaction.txn_locks).await;
    }

    // Upgrades each key from Locked to WriteLock, waiting for outstanding readers where
    // necessary.  Panics (via assert) if any key is not currently in the Locked state.
    async fn commit_prepare_keys(&self, lock_keys: &LockKeys) {
        for lock in lock_keys.iter() {
            let lock_waker = None;
            pin_mut!(lock_waker);
            {
                let mut locks = self.locks.lock();
                let entry = locks.keys.get_mut(lock).unwrap();
                assert_eq!(entry.state, LockState::Locked);

                if entry.read_count == 0 {
                    // No readers: upgrade immediately without queuing.
                    entry.state = LockState::WriteLock;
                } else {
                    // Initialise a waker and push it on the head of the list.
                    // (Upgrades go on the head, not the tail, so they take priority over any
                    // queued waiters once the readers drain.)
                    // SAFETY: `lock_waker` isn't used prior to this point.
                    unsafe {
                        *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
                            next: UnsafeCell::new(entry.head),
                            prev: UnsafeCell::new(std::ptr::null()),
                            key: *lock,
                            waker: UnsafeCell::new(WakerState::Pending),
                            target_state: LockState::WriteLock,
                            is_upgrade: true,
                            _pin: PhantomPinned,
                        });
                    }
                    let waker = (*lock_waker).as_ref().unwrap();
                    if entry.head.is_null() {
                        entry.tail = (*lock_waker).as_ref().unwrap();
                    } else {
                        // SAFETY: We've acquired the lock.
                        unsafe {
                            *(*entry.head).prev.get() = waker;
                        }
                    }
                    entry.head = waker;
                }
            }

            if let Some(waker) = &*lock_waker {
                waker.wait(self).await;
            }
        }
    }

    /// Acquires a read lock for the given keys.  Read locks are only blocked whilst a transaction
    /// is being committed for the same locks.  They are only necessary where consistency is
    /// required between different mutations within a transaction.  For example, a write might
    /// change the size and extents for an object, in which case a read lock is required so that
    /// observed size and extents are seen together or not at all.
    pub async fn read_lock<'a>(&'a self, lock_keys: LockKeys) -> ReadGuard<'a> {
        debug_assert_not_too_long!(self.lock(lock_keys, LockState::ReadLock)).left().unwrap()
    }

    /// Acquires a write lock for the given keys.  Write locks provide exclusive access to the
    /// requested lock keys.
    pub async fn write_lock<'a>(&'a self, lock_keys: LockKeys) -> WriteGuard<'a> {
        debug_assert_not_too_long!(self.lock(lock_keys, LockState::WriteLock)).right().unwrap()
    }

    /// Downgrades locks from the WriteLock state to Locked state.  This will panic if the locks are
    /// not in the WriteLock state.
    pub fn downgrade_locks(&self, lock_keys: &LockKeys) {
        self.locks.lock().downgrade_locks(lock_keys);
    }
}
1468
// These unsafe functions require that `locks` in LockManager is locked.
impl LockEntry {
    // Grants the lock to as many queued wakers as the current state allows, starting from the
    // head of the list.
    unsafe fn wake(&mut self) {
        // If the lock's state is WriteLock, or there's nothing waiting, return early.
        if self.head.is_null() || self.state == LockState::WriteLock {
            return;
        }

        let waker = unsafe { &*self.head };

        if waker.is_upgrade {
            // An upgrade (Locked -> WriteLock) can only proceed once every reader has gone.
            if self.read_count > 0 {
                return;
            }
        } else if !unsafe { self.is_allowed(waker.target_state, true) } {
            return;
        }

        unsafe { self.pop_and_wake() };

        // If the waker was a write lock, we can't wake any more up, but otherwise, we can keep
        // waking up readers.
        if waker.target_state == LockState::WriteLock {
            return;
        }

        while !self.head.is_null() && unsafe { (*self.head).target_state } == LockState::ReadLock {
            unsafe { self.pop_and_wake() };
        }
    }

    // Unlinks the head waker, grants it the lock (by updating `read_count`/`state`) and wakes its
    // task if it has already been polled (registered).
    unsafe fn pop_and_wake(&mut self) {
        let waker = unsafe { &*self.head };

        // Pop the waker.
        self.head = unsafe { *waker.next.get() };
        if self.head.is_null() {
            self.tail = std::ptr::null()
        } else {
            unsafe { *(*self.head).prev.get() = std::ptr::null() };
        }

        // Adjust our state accordingly.
        if waker.target_state == LockState::ReadLock {
            self.read_count += 1;
        } else {
            self.state = waker.target_state;
        }

        // Now wake the task.
        // Marking the state Woken before calling wake() means the woken task (or its drop guard)
        // always observes that it owns the lock.
        if let WakerState::Registered(waker) =
            std::mem::replace(unsafe { &mut *waker.waker.get() }, WakerState::Woken)
        {
            waker.wake();
        }
    }

    // True once the entry holds no lock at all and can be removed from the map.
    fn can_remove(&self) -> bool {
        self.state == LockState::ReadLock && self.read_count == 0
    }

    // Unlinks `waker` from the doubly-linked list; used when a waiting future is dropped before
    // it was woken (see `LockWaker::wait`).
    unsafe fn remove_waker(&mut self, waker: &LockWaker) {
        unsafe {
            let is_first = (*waker.prev.get()).is_null();
            if is_first {
                self.head = *waker.next.get();
            } else {
                *(**waker.prev.get()).next.get() = *waker.next.get();
            }
            if (*waker.next.get()).is_null() {
                self.tail = *waker.prev.get();
            } else {
                *(**waker.next.get()).prev.get() = *waker.prev.get();
            }
            if is_first {
                // We must call wake in case we erased a pending write lock and readers can now
                // proceed.
                self.wake();
            }
        }
    }

    // Returns whether or not a lock with given `target_state` can proceed.  `is_head` should be
    // true if this is something at the head of the waker list (or the waker list is empty) and
    // false if there are other items on the waker list that are prior.
    unsafe fn is_allowed(&self, target_state: LockState, is_head: bool) -> bool {
        match self.state {
            LockState::ReadLock => {
                // Allow ReadLock and Locked so long as nothing else is waiting.
                (self.read_count == 0
                    || target_state == LockState::Locked
                    || target_state == LockState::ReadLock)
                    && is_head
            }
            LockState::Locked => {
                // Always allow reads unless there's an upgrade waiting.  We have to
                // always allow reads in this state because tasks that have locks in
                // the Locked state can later try and acquire ReadLock.
                target_state == LockState::ReadLock
                    && (is_head || unsafe { !(*self.head).is_upgrade })
            }
            LockState::WriteLock => false,
        }
    }

    // Moves the lock from WriteLock back to Locked (panics if not in WriteLock) and wakes any
    // readers that can now proceed.
    unsafe fn downgrade_lock(&mut self) {
        assert_eq!(std::mem::replace(&mut self.state, LockState::Locked), LockState::WriteLock);
        unsafe { self.wake() };
    }
}
1579
#[must_use]
pub struct ReadGuard<'a> {
    // Where the locks are returned on drop: either a borrowed `LockManager` or one reached via an
    // owned filesystem (see `into_owned`).
    manager: LockManagerRef<'a>,
    // The keys this guard holds read locks on; released via `Locks::drop_read_locks` on drop.
    lock_keys: LockKeys,
}
1585
1586impl ReadGuard<'_> {
1587    pub fn fs(&self) -> Option<&Arc<FxFilesystem>> {
1588        if let LockManagerRef::Owned(fs) = &self.manager { Some(fs) } else { None }
1589    }
1590
1591    pub fn into_owned(mut self, fs: Arc<FxFilesystem>) -> ReadGuard<'static> {
1592        ReadGuard {
1593            manager: LockManagerRef::Owned(fs),
1594            lock_keys: std::mem::replace(&mut self.lock_keys, LockKeys::None),
1595        }
1596    }
1597}
1598
1599impl Drop for ReadGuard<'_> {
1600    fn drop(&mut self) {
1601        let mut locks = self.manager.locks.lock();
1602        locks.drop_read_locks(std::mem::take(&mut self.lock_keys));
1603    }
1604}
1605
1606impl fmt::Debug for ReadGuard<'_> {
1607    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1608        f.debug_struct("ReadGuard")
1609            .field("manager", &(&self.manager as *const _))
1610            .field("lock_keys", &self.lock_keys)
1611            .finish()
1612    }
1613}
1614
#[must_use]
pub struct WriteGuard<'a> {
    // Where the locks are returned on drop (mirrors `ReadGuard`).
    manager: LockManagerRef<'a>,
    // The keys this guard holds write/transaction locks on; released via
    // `Locks::drop_write_locks` on drop.
    lock_keys: LockKeys,
}
1620
1621impl Drop for WriteGuard<'_> {
1622    fn drop(&mut self) {
1623        let mut locks = self.manager.locks.lock();
1624        locks.drop_write_locks(std::mem::take(&mut self.lock_keys));
1625    }
1626}
1627
1628impl fmt::Debug for WriteGuard<'_> {
1629    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1630        f.debug_struct("WriteGuard")
1631            .field("manager", &(&self.manager as *const _))
1632            .field("lock_keys", &self.lock_keys)
1633            .finish()
1634    }
1635}
1636
/// Reference to a [`LockManager`] that either borrows it directly or keeps the whole
/// filesystem alive via an owned `Arc` (used by guards that must outlive the borrow,
/// e.g. `ReadGuard<'static>`).
enum LockManagerRef<'a> {
    Borrowed(&'a LockManager),
    Owned(Arc<FxFilesystem>),
}
1641
1642impl Deref for LockManagerRef<'_> {
1643    type Target = LockManager;
1644
1645    fn deref(&self) -> &Self::Target {
1646        match self {
1647            LockManagerRef::Borrowed(m) => m,
1648            LockManagerRef::Owned(f) => f.lock_manager(),
1649        }
1650    }
1651}
1652
1653impl<'a> From<&'a LockManager> for LockManagerRef<'a> {
1654    fn from(value: &'a LockManager) -> Self {
1655        LockManagerRef::Borrowed(value)
1656    }
1657}
1658
#[cfg(test)]
mod tests {
    use super::{LockKey, LockKeys, LockManager, LockState, Mutation, Options};
    use crate::filesystem::FxFilesystem;
    use fuchsia_async as fasync;
    use fuchsia_sync::Mutex;
    use futures::channel::oneshot::channel;
    use futures::future::FutureExt;
    use futures::stream::FuturesUnordered;
    use futures::{StreamExt, join, pin_mut};
    use std::time::Duration;
    use storage_device::DeviceHolder;
    use storage_device::fake_device::FakeDevice;

    /// A transaction with a mutation added should report itself as non-empty.
    #[fuchsia::test]
    async fn test_simple() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let mut t = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        t.add(1, Mutation::BeginFlush);
        assert!(!t.is_empty());
    }

    /// Transactions on different keys proceed concurrently; a transaction on the same key
    /// blocks until the first is dropped.
    #[fuchsia::test]
    async fn test_locks() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let (send3, recv3) = channel();
        let done = Mutex::new(false);
        let mut futures = FuturesUnordered::new();
        futures.push(
            async {
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send1.send(()).unwrap(); // Tell the next future to continue.
                send3.send(()).unwrap(); // Tell the last future to continue.
                recv2.await.unwrap();
                // This is a halting problem so all we can do is sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            }
            .boxed(),
        );
        futures.push(
            async {
                recv1.await.unwrap();
                // This should not block since it is a different key.
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(2, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                // Tell the first future to continue.
                send2.send(()).unwrap();
            }
            .boxed(),
        );
        futures.push(
            async {
                // This should block until the first future has completed.
                recv3.await.unwrap();
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await;
                *done.lock() = true;
            }
            .boxed(),
        );
        while let Some(()) = futures.next().await {}
    }

    /// A read lock taken while a transaction holds the key is only blocked once the
    /// transaction tries to commit.
    #[fuchsia::test]
    async fn test_read_lock_after_write_lock() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let done = Mutex::new(false);
        join!(
            async {
                let t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send1.send(()).unwrap(); // Tell the next future to continue.
                recv2.await.unwrap();
                t.commit().await.expect("commit failed");
                *done.lock() = true;
            },
            async {
                recv1.await.unwrap();
                // Reads should not be blocked until the transaction is committed.
                let _guard = fs
                    .lock_manager()
                    .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
                    .await;
                // Tell the first future to continue.
                send2.send(()).unwrap();
                // It shouldn't proceed until we release our read lock, but it's a halting
                // problem, so sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            },
        );
    }

    /// Committing a transaction is blocked while an outstanding read lock exists on one
    /// of its keys.
    #[fuchsia::test]
    async fn test_write_lock_after_read_lock() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let done = Mutex::new(false);
        join!(
            async {
                // Reads should not be blocked until the transaction is committed.
                let _guard = fs
                    .lock_manager()
                    .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
                    .await;
                // Tell the next future to continue and then wait.
                send1.send(()).unwrap();
                recv2.await.unwrap();
                // It shouldn't proceed until we release our read lock, but it's a halting
                // problem, so sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            },
            async {
                recv1.await.unwrap();
                let t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send2.send(()).unwrap(); // Tell the first future to continue;
                t.commit().await.expect("commit failed");
                *done.lock() = true;
            },
        );
    }

    /// Dropping a transaction without committing must release its locks, whether or not
    /// a reader is present at the time of the drop.
    #[fuchsia::test]
    async fn test_drop_uncommitted_transaction() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let key = lock_keys![LockKey::object(1, 1)];

        // Dropping while there's a reader.
        {
            let _write_lock = fs
                .clone()
                .new_transaction(key.clone(), Options::default())
                .await
                .expect("new_transaction failed");
            let _read_lock = fs.lock_manager().read_lock(key.clone()).await;
        }
        // Dropping while there's no reader.
        {
            let _write_lock = fs
                .clone()
                .new_transaction(key.clone(), Options::default())
                .await
                .expect("new_transaction failed");
        }
        // Make sure we can take the lock again (i.e. it was actually released).
        fs.clone()
            .new_transaction(key.clone(), Options::default())
            .await
            .expect("new_transaction failed");
    }

    /// Dropping a pending (never-granted) write-lock future must not wedge the lock; a
    /// later write lock on the same key still succeeds.
    #[fuchsia::test]
    async fn test_drop_waiting_write_lock() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        {
            let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
            // The write lock must be blocked by the outstanding read lock.
            assert!(
                futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
                    .is_pending()
            );
        }
        let _ = manager.lock(keys, LockState::WriteLock).await;
    }

    /// A held write lock blocks both writers and readers; once released, either can be
    /// taken again.
    #[fuchsia::test]
    async fn test_write_lock_blocks_everything() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        {
            let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
            // Another writer must be blocked...
            assert!(
                futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
                    .is_pending()
            );
            // ...and so must a reader.
            assert!(
                futures::poll!(manager.lock(keys.clone(), LockState::ReadLock).boxed())
                    .is_pending()
            );
        }
        {
            let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
        }
        {
            let _guard = manager.lock(keys, LockState::ReadLock).await;
        }
    }

    /// Downgrading a committed (write-locked) key wakes blocked readers.
    #[fuchsia::test]
    async fn test_downgrade_locks() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.txn_lock(keys.clone()).await;
        manager.commit_prepare_keys(&keys).await;

        // Use FuturesUnordered so that we can check that the waker is woken.
        let mut read_lock: FuturesUnordered<_> =
            std::iter::once(manager.read_lock(keys.clone())).collect();

        // Trying to acquire a read lock now should be blocked.
        assert!(futures::poll!(read_lock.next()).is_pending());

        manager.downgrade_locks(&keys);

        // After downgrading, it should be possible to take a read lock.
        assert!(futures::poll!(read_lock.next()).is_ready());
    }

    /// Dropping a pending write-lock future wakes readers that were queued behind it.
    #[fuchsia::test]
    async fn test_dropped_write_lock_wakes() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
        let mut read_lock = FuturesUnordered::new();
        read_lock.push(manager.lock(keys.clone(), LockState::ReadLock));

        {
            let write_lock = manager.lock(keys, LockState::WriteLock);
            pin_mut!(write_lock);

            // The write lock should be blocked because of the read lock.
            assert!(futures::poll!(write_lock).is_pending());

            // Another read lock should be blocked because of the write lock.
            assert!(futures::poll!(read_lock.next()).is_pending());
        }

        // Dropping the write lock should allow the read lock to proceed.
        assert!(futures::poll!(read_lock.next()).is_ready());
    }

    /// Dropping a pending upgrade (commit_prepare) future must leave the lock usable: a
    /// later upgrade on the same keys still succeeds.
    #[fuchsia::test]
    async fn test_drop_upgrade() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.lock(keys.clone(), LockState::Locked).await;

        {
            let commit_prepare = manager.commit_prepare_keys(&keys);
            pin_mut!(commit_prepare);
            let _read_guard = manager.lock(keys.clone(), LockState::ReadLock).await;
            assert!(futures::poll!(commit_prepare).is_pending());

            // Now we test dropping read_guard which should wake commit_prepare and
            // then dropping commit_prepare.
        }

        // We should be able to still commit_prepare.
        manager.commit_prepare_keys(&keys).await;
    }

    /// Once an upgrade has been granted (after a blocking read is dropped), later readers
    /// stay blocked until the write lock itself is released.
    #[fasync::run_singlethreaded(test)]
    async fn test_woken_upgrade_blocks_reads() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        // Start with a transaction lock.
        let guard = manager.lock(keys.clone(), LockState::Locked).await;

        // Take a read lock.
        let read1 = manager.lock(keys.clone(), LockState::ReadLock).await;

        // Try and upgrade the transaction lock, which should not be possible because of the read.
        let commit_prepare = manager.commit_prepare_keys(&keys);
        pin_mut!(commit_prepare);
        assert!(futures::poll!(commit_prepare.as_mut()).is_pending());

        // Taking another read should also be blocked.
        let read2 = manager.lock(keys.clone(), LockState::ReadLock);
        pin_mut!(read2);
        assert!(futures::poll!(read2.as_mut()).is_pending());

        // Drop the first read and the upgrade should complete.
        std::mem::drop(read1);
        assert!(futures::poll!(commit_prepare).is_ready());

        // But the second read should still be blocked.
        assert!(futures::poll!(read2.as_mut()).is_pending());

        // If we drop the write lock now, the read should be unblocked.
        std::mem::drop(guard);
        assert!(futures::poll!(read2).is_ready());
    }

    static LOCK_KEY_1: LockKey = LockKey::flush(1);
    static LOCK_KEY_2: LockKey = LockKey::flush(2);
    static LOCK_KEY_3: LockKey = LockKey::flush(3);

    // The keys, storage method, and capacity must all match.
    fn assert_lock_keys_equal(value: &LockKeys, expected: &LockKeys) {
        match (value, expected) {
            (LockKeys::None, LockKeys::None) => {}
            (LockKeys::Inline(key1), LockKeys::Inline(key2)) => {
                if key1 != key2 {
                    panic!("{key1:?} != {key2:?}");
                }
            }
            (LockKeys::Vec(vec1), LockKeys::Vec(vec2)) => {
                if vec1 != vec2 {
                    panic!("{vec1:?} != {vec2:?}");
                }
                if vec1.capacity() != vec2.capacity() {
                    panic!(
                        "LockKeys have different capacity: {} != {}",
                        vec1.capacity(),
                        vec2.capacity()
                    );
                }
            }
            (_, _) => panic!("{value:?} != {expected:?}"),
        }
    }

    // Only the keys must match. Storage method and capacity don't matter.
    fn assert_lock_keys_equivalent(value: &LockKeys, expected: &LockKeys) {
        let value: Vec<_> = value.iter().collect();
        let expected: Vec<_> = expected.iter().collect();
        assert_eq!(value, expected);
    }

    #[test]
    fn test_lock_keys_macro() {
        assert_lock_keys_equal(&lock_keys![], &LockKeys::None);
        assert_lock_keys_equal(&lock_keys![LOCK_KEY_1], &LockKeys::Inline(LOCK_KEY_1));
        assert_lock_keys_equal(
            &lock_keys![LOCK_KEY_1, LOCK_KEY_2],
            &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]),
        );
    }

    #[test]
    fn test_lock_keys_with_capacity() {
        assert_lock_keys_equal(&LockKeys::with_capacity(0), &LockKeys::None);
        assert_lock_keys_equal(&LockKeys::with_capacity(1), &LockKeys::None);
        assert_lock_keys_equal(&LockKeys::with_capacity(2), &LockKeys::Vec(Vec::with_capacity(2)));
    }

    #[test]
    fn test_lock_keys_len() {
        assert_eq!(lock_keys![].len(), 0);
        assert_eq!(lock_keys![LOCK_KEY_1].len(), 1);
        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].len(), 2);
    }

    #[test]
    fn test_lock_keys_contains() {
        assert_eq!(lock_keys![].contains(&LOCK_KEY_1), false);
        assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_1), true);
        assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_2), false);
        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_1), true);
        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_2), true);
        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_3), false);
    }

    #[test]
    fn test_lock_keys_push() {
        let mut keys = lock_keys![];
        keys.push(LOCK_KEY_1);
        assert_lock_keys_equal(&keys, &LockKeys::Inline(LOCK_KEY_1));
        keys.push(LOCK_KEY_2);
        assert_lock_keys_equal(&keys, &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]));
        keys.push(LOCK_KEY_3);
        assert_lock_keys_equivalent(
            &keys,
            &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2, LOCK_KEY_3]),
        );
    }

    #[test]
    fn test_lock_keys_sort_unstable() {
        let mut keys = lock_keys![];
        keys.sort_unstable();
        assert_lock_keys_equal(&keys, &lock_keys![]);

        let mut keys = lock_keys![LOCK_KEY_1];
        keys.sort_unstable();
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);

        let mut keys = lock_keys![LOCK_KEY_2, LOCK_KEY_1];
        keys.sort_unstable();
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
    }

    #[test]
    fn test_lock_keys_dedup() {
        let mut keys = lock_keys![];
        keys.dedup();
        assert_lock_keys_equal(&keys, &lock_keys![]);

        let mut keys = lock_keys![LOCK_KEY_1];
        keys.dedup();
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);

        let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_1];
        keys.dedup();
        assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
    }

    #[test]
    fn test_lock_keys_truncate() {
        let mut keys = lock_keys![];
        keys.truncate(5);
        assert_lock_keys_equal(&keys, &lock_keys![]);
        keys.truncate(0);
        assert_lock_keys_equal(&keys, &lock_keys![]);

        let mut keys = lock_keys![LOCK_KEY_1];
        keys.truncate(5);
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
        keys.truncate(0);
        assert_lock_keys_equal(&keys, &lock_keys![]);

        let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_2];
        keys.truncate(5);
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
        keys.truncate(1);
        // Although there's only 1 key after truncate the key is not stored inline.
        assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
    }

    #[test]
    fn test_lock_keys_iter() {
        assert_eq!(lock_keys![].iter().collect::<Vec<_>>(), Vec::<&LockKey>::new());

        assert_eq!(lock_keys![LOCK_KEY_1].iter().collect::<Vec<_>>(), vec![&LOCK_KEY_1]);

        assert_eq!(
            lock_keys![LOCK_KEY_1, LOCK_KEY_2].iter().collect::<Vec<_>>(),
            vec![&LOCK_KEY_1, &LOCK_KEY_2]
        );
    }
}