// fxfs/object_store/transaction.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::checksum::Checksum;
6use crate::filesystem::{FxFilesystem, TxnGuard};
7use crate::log::*;
8use crate::lsm_tree::types::Item;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::AttributeKey;
11use crate::object_store::allocator::{AllocatorItem, Reservation};
12use crate::object_store::object_manager::{ObjectManager, reserved_space_from_journal_usage};
13use crate::object_store::object_record::{
14    FxfsKey, FxfsKeyV40, FxfsKeyV49, ObjectItem, ObjectItemV40, ObjectItemV41, ObjectItemV43,
15    ObjectItemV46, ObjectItemV47, ObjectItemV49, ObjectItemV50, ObjectKey, ObjectKeyData,
16    ObjectValue, ProjectProperty,
17};
18use crate::serialized_types::{Migrate, Versioned, migrate_nodefault, migrate_to_version};
19use anyhow::Error;
20use either::{Either, Left, Right};
21use fprint::TypeFingerprint;
22use fuchsia_sync::Mutex;
23use futures::future::poll_fn;
24use futures::pin_mut;
25use rustc_hash::FxHashMap as HashMap;
26use scopeguard::ScopeGuard;
27use serde::{Deserialize, Serialize};
28use std::cell::UnsafeCell;
29use std::cmp::Ordering;
30use std::collections::BTreeSet;
31use std::collections::hash_map::Entry;
32use std::marker::PhantomPinned;
33use std::ops::{Deref, DerefMut, Range};
34use std::sync::Arc;
35use std::task::{Poll, Waker};
36use std::{fmt, mem};
37
/// Per-transaction commit options.
///
/// This allows for special handling of certain transactions such as deletes and the
/// extension of Journal extents. For most other use cases it is appropriate to use
/// `default()` here.
#[derive(Clone, Copy, Default)]
pub struct Options<'a> {
    /// If true, don't check for low journal space.  This should be true for any transactions that
    /// might alleviate journal space (i.e. compaction).
    pub skip_journal_checks: bool,

    /// If true, borrow metadata space from the metadata reservation.  This setting should be set to
    /// true for any transaction that will either not affect space usage after compaction
    /// (e.g. setting attributes), or reduce space usage (e.g. unlinking).  Otherwise, a transaction
    /// might fail with an out-of-space error.
    pub borrow_metadata_space: bool,

    /// If specified, a reservation to be used with the transaction.  If not set, any allocations
    /// that are part of this transaction will have to take their chances, and will fail if there is
    /// no free space.  The intention is that this should be used for things like the journal which
    /// require guaranteed space.
    pub allocator_reservation: Option<&'a Reservation>,

    /// An existing transaction guard to be used.
    pub txn_guard: Option<&'a TxnGuard<'a>>,
}
62
// This is the amount of space that we reserve for metadata when we are creating a new transaction.
// A transaction should not take more than this.  This is expressed in terms of space occupied in
// the journal; transactions must not take up more space in the journal than the number below.  The
// amount chosen here must be large enough for the maximum possible transaction that can be created,
// so transactions always need to be bounded which might involve splitting an operation up into
// smaller transactions.
/// Upper bound, in bytes, on the journal space a single transaction may occupy.
pub const TRANSACTION_MAX_JOURNAL_USAGE: u64 = 24_576;
/// Metadata space reserved per transaction, derived from `TRANSACTION_MAX_JOURNAL_USAGE`.
pub const TRANSACTION_METADATA_MAX_AMOUNT: u64 =
    reserved_space_from_journal_usage(TRANSACTION_MAX_JOURNAL_USAGE);
72
/// Wrapper around a `WriteGuard` holding locks acquired on behalf of a transaction.
/// Marked `#[must_use]` because dropping it releases the locks.
#[must_use]
pub struct TransactionLocks<'a>(pub WriteGuard<'a>);
75
/// The journal consists of these records which will be replayed at mount time.  Within a
/// transaction, these are stored as a set which allows some mutations to be deduplicated and found
/// (and we require custom comparison functions below).  For example, we need to be able to find
/// object size changes.
pub type Mutation = MutationV50;

#[derive(
    Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint, Versioned,
)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum MutationV50 {
    // A mutation applied to an object store's LSM tree.
    ObjectStore(ObjectStoreMutationV50),
    // An object-store mutation in serialized, encrypted form.
    EncryptedObjectStore(Box<[u8]>),
    // A mutation applied to the allocator (see `AllocatorMutation`).
    Allocator(AllocatorMutationV32),
    // Indicates the beginning of a flush.  This would typically involve sealing a tree.
    BeginFlush,
    // Indicates the end of a flush.  This would typically involve replacing the immutable layers
    // with compacted ones.
    EndFlush,
    // Volume has been deleted.  Requires we remove it from the set of managed ObjectStore.
    DeleteVolume,
    // Updates the amount of borrowed metadata space — presumably mirrors
    // `MetadataReservation::Borrowed` accounting; confirm against ObjectManager.
    UpdateBorrowed(u64),
    // Changes the key used to encrypt subsequent mutations.
    UpdateMutationsKey(UpdateMutationsKeyV49),
    // Records creation of an internal directory with the given object ID.
    CreateInternalDir(u64),
}
101
// Older serialized versions of `Mutation`.  Each derives `Migrate` to upgrade to the next version;
// only the payload types of some variants differ between versions.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV50)]
pub enum MutationV49 {
    ObjectStore(ObjectStoreMutationV49),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV49),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV49)]
pub enum MutationV47 {
    ObjectStore(ObjectStoreMutationV47),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV47)]
pub enum MutationV46 {
    ObjectStore(ObjectStoreMutationV46),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV46)]
pub enum MutationV43 {
    ObjectStore(ObjectStoreMutationV43),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV43)]
pub enum MutationV41 {
    ObjectStore(ObjectStoreMutationV41),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV41)]
pub enum MutationV40 {
    ObjectStore(ObjectStoreMutationV40),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}
185
186impl Mutation {
187    pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self {
188        Mutation::ObjectStore(ObjectStoreMutation {
189            item: Item::new(key, value),
190            op: Operation::Insert,
191        })
192    }
193
194    pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self {
195        Mutation::ObjectStore(ObjectStoreMutation {
196            item: Item::new(key, value),
197            op: Operation::ReplaceOrInsert,
198        })
199    }
200
201    pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self {
202        Mutation::ObjectStore(ObjectStoreMutation {
203            item: Item::new(key, value),
204            op: Operation::Merge,
205        })
206    }
207
208    pub fn update_mutations_key(key: FxfsKey) -> Self {
209        Mutation::UpdateMutationsKey(key.into())
210    }
211}
212
// We have custom comparison functions for mutations that just use the key, rather than the key and
// value that would be used by default so that we can deduplicate and find mutations (see
// get_object_mutation below).
pub type ObjectStoreMutation = ObjectStoreMutationV50;

#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct ObjectStoreMutationV50 {
    // The key/value pair to apply to the store's LSM tree.
    pub item: ObjectItemV50,
    // How the item is applied (insert / replace-or-insert / merge).
    pub op: OperationV32,
}
224
// Older serialized versions of `ObjectStoreMutation`; each migrates to the next version.
// Only the `ObjectItem` version changes.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV50)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV49 {
    pub item: ObjectItemV49,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV49)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV47 {
    pub item: ObjectItemV47,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV47)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV46 {
    pub item: ObjectItemV46,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV46)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV43 {
    pub item: ObjectItemV43,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV43)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV41 {
    pub item: ObjectItemV41,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_nodefault]
#[migrate_to_version(ObjectStoreMutationV41)]
pub struct ObjectStoreMutationV40 {
    pub item: ObjectItemV40,
    pub op: OperationV32,
}
272
/// The different LSM tree operations that can be performed as part of a mutation.
pub type Operation = OperationV32;

#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum OperationV32 {
    // Insert a new item; the caller expects no existing item with the same key.
    Insert,
    // Insert the item, replacing any existing item with the same key.
    ReplaceOrInsert,
    // Combine the item with any existing item using the tree's merge semantics.
    Merge,
}
283
284impl Ord for ObjectStoreMutation {
285    fn cmp(&self, other: &Self) -> Ordering {
286        self.item.key.cmp(&other.item.key)
287    }
288}
289
290impl PartialOrd for ObjectStoreMutation {
291    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
292        Some(self.cmp(other))
293    }
294}
295
296impl PartialEq for ObjectStoreMutation {
297    fn eq(&self, other: &Self) -> bool {
298        self.item.key.eq(&other.item.key)
299    }
300}
301
302impl Eq for ObjectStoreMutation {}
303
304impl Ord for AllocatorItem {
305    fn cmp(&self, other: &Self) -> Ordering {
306        self.key.cmp(&other.key)
307    }
308}
309
310impl PartialOrd for AllocatorItem {
311    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
312        Some(self.cmp(other))
313    }
314}
315
/// Same as std::ops::Range but with Ord and PartialOrd support, sorted first by start of the range,
/// then by the end.
pub type DeviceRange = DeviceRangeV32;

// Newtype over `Range<u64>`; `Deref`/`DerefMut` below expose the inner range's fields.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct DeviceRangeV32(pub Range<u64>);
323
324impl Deref for DeviceRange {
325    type Target = Range<u64>;
326
327    fn deref(&self) -> &Self::Target {
328        &self.0
329    }
330}
331
332impl DerefMut for DeviceRange {
333    fn deref_mut(&mut self) -> &mut Self::Target {
334        &mut self.0
335    }
336}
337
338impl From<Range<u64>> for DeviceRange {
339    fn from(range: Range<u64>) -> Self {
340        Self(range)
341    }
342}
343
344impl Into<Range<u64>> for DeviceRange {
345    fn into(self) -> Range<u64> {
346        self.0
347    }
348}
349
350impl Ord for DeviceRange {
351    fn cmp(&self, other: &Self) -> Ordering {
352        self.start.cmp(&other.start).then(self.end.cmp(&other.end))
353    }
354}
355
356impl PartialOrd for DeviceRange {
357    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
358        Some(self.cmp(other))
359    }
360}
361
pub type AllocatorMutation = AllocatorMutationV32;

/// Mutations applied to the allocator's tree.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorMutationV32 {
    // Allocates `device_range` on behalf of `owner_object_id`.
    Allocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    // Frees `device_range` previously allocated to `owner_object_id`.
    Deallocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    // Sets a byte limit for allocations owned by `owner_object_id`.
    SetLimit {
        owner_object_id: u64,
        bytes: u64,
    },
    /// Marks all extents with a given owner_object_id for deletion.
    /// Used to free space allocated to encrypted ObjectStore where we may not have the key.
    /// Note that the actual deletion time is undefined so this should never be used where an
    /// ObjectStore is still in use due to a high risk of corruption. Similarly, owner_object_id
    /// should never be reused for the same reasons.
    MarkForDeletion(u64),
}
386
pub type UpdateMutationsKey = UpdateMutationsKeyV49;

// Newtype around the key used to encrypt mutations.  Comparison impls below use pointer
// identity, not the wrapped key's contents.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
pub struct UpdateMutationsKeyV49(pub FxfsKeyV49);

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(UpdateMutationsKeyV49)]
pub struct UpdateMutationsKeyV40(pub FxfsKeyV40);
395
396impl From<UpdateMutationsKey> for FxfsKey {
397    fn from(outer: UpdateMutationsKey) -> Self {
398        outer.0
399    }
400}
401
402impl From<FxfsKey> for UpdateMutationsKey {
403    fn from(inner: FxfsKey) -> Self {
404        Self(inner)
405    }
406}
407
#[cfg(fuzz)]
impl<'a> arbitrary::Arbitrary<'a> for UpdateMutationsKey {
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        // Propagate the error rather than unwrapping: `Unstructured` routinely runs out of input
        // bytes during fuzzing, which should not panic the fuzzer.
        FxfsKey::arbitrary(u).map(UpdateMutationsKey::from)
    }
}
414
// NOTE: `UpdateMutationsKey` deliberately compares by *pointer identity*, not by the wrapped
// key's contents.  Distinct instances are therefore never equal, so (presumably) multiple
// `UpdateMutationsKey` mutations within one transaction's `BTreeSet` never deduplicate against
// each other.  The resulting order is arbitrary but self-consistent while the instances are
// alive.
impl Ord for UpdateMutationsKey {
    fn cmp(&self, other: &Self) -> Ordering {
        (self as *const UpdateMutationsKey).cmp(&(other as *const _))
    }
}

impl PartialOrd for UpdateMutationsKey {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for UpdateMutationsKey {}

impl PartialEq for UpdateMutationsKey {
    fn eq(&self, other: &Self) -> bool {
        // Pointer equality, consistent with `Ord` above.
        std::ptr::eq(self, other)
    }
}
434
/// When creating a transaction, locks typically need to be held to prevent two or more writers
/// trying to make conflicting mutations at the same time.  LockKeys are used for this.
/// NOTE: Ordering is important here!  The lock manager sorts the list of locks in a transaction
/// to acquire them in a consistent order, but there are special cases for the Filesystem lock and
/// the Flush lock.
/// The Filesystem lock is taken by every transaction and is done so first, as part of the TxnGuard.
/// The Flush lock is taken when we flush an LSM tree (e.g. an object store), and is held for
/// several transactions.  As such, it must come first in the lock acquisition ordering, so that
/// other transactions using the Flush lock have the same ordering as in flushing.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Copy)]
pub enum LockKey {
    /// Locks the entire filesystem.
    Filesystem,

    /// Used to lock flushing an object.
    Flush {
        object_id: u64,
    },

    /// Used to lock changes to a particular object attribute (e.g. writes).
    ObjectAttribute {
        store_object_id: u64,
        object_id: u64,
        attribute_id: u64,
    },

    /// Used to lock changes to a particular object (e.g. adding a child to a directory).
    Object {
        store_object_id: u64,
        object_id: u64,
    },

    /// Used to lock changes to a project's properties (e.g. its limit).
    ProjectId {
        store_object_id: u64,
        project_id: u64,
    },

    /// Used to lock any truncate operations for a file.
    Truncate {
        store_object_id: u64,
        object_id: u64,
    },
}
478
479impl LockKey {
480    pub const fn object_attribute(store_object_id: u64, object_id: u64, attribute_id: u64) -> Self {
481        LockKey::ObjectAttribute { store_object_id, object_id, attribute_id }
482    }
483
484    pub const fn object(store_object_id: u64, object_id: u64) -> Self {
485        LockKey::Object { store_object_id, object_id }
486    }
487
488    pub const fn flush(object_id: u64) -> Self {
489        LockKey::Flush { object_id }
490    }
491
492    pub const fn truncate(store_object_id: u64, object_id: u64) -> Self {
493        LockKey::Truncate { store_object_id, object_id }
494    }
495}
496
/// A container for holding `LockKey` objects. Can store a single `LockKey` inline.
#[derive(Clone, Debug)]
pub enum LockKeys {
    // No keys.
    None,
    // Exactly one key, stored without a heap allocation.
    Inline(LockKey),
    // Two or more keys (the `Vec` may also hold fewer after `truncate`).
    Vec(Vec<LockKey>),
}
504
505impl LockKeys {
506    pub fn with_capacity(capacity: usize) -> Self {
507        if capacity > 1 { LockKeys::Vec(Vec::with_capacity(capacity)) } else { LockKeys::None }
508    }
509
510    pub fn push(&mut self, key: LockKey) {
511        match self {
512            Self::None => *self = LockKeys::Inline(key),
513            Self::Inline(inline) => {
514                *self = LockKeys::Vec(vec![*inline, key]);
515            }
516            Self::Vec(vec) => vec.push(key),
517        }
518    }
519
520    pub fn truncate(&mut self, len: usize) {
521        match self {
522            Self::None => {}
523            Self::Inline(_) => {
524                if len == 0 {
525                    *self = Self::None;
526                }
527            }
528            Self::Vec(vec) => vec.truncate(len),
529        }
530    }
531
532    fn len(&self) -> usize {
533        match self {
534            Self::None => 0,
535            Self::Inline(_) => 1,
536            Self::Vec(vec) => vec.len(),
537        }
538    }
539
540    fn contains(&self, key: &LockKey) -> bool {
541        match self {
542            Self::None => false,
543            Self::Inline(single) => single == key,
544            Self::Vec(vec) => vec.contains(key),
545        }
546    }
547
548    fn sort_unstable(&mut self) {
549        match self {
550            Self::Vec(vec) => vec.sort_unstable(),
551            _ => {}
552        }
553    }
554
555    fn dedup(&mut self) {
556        match self {
557            Self::Vec(vec) => vec.dedup(),
558            _ => {}
559        }
560    }
561
562    fn iter(&self) -> LockKeysIter<'_> {
563        match self {
564            LockKeys::None => LockKeysIter::None,
565            LockKeys::Inline(key) => LockKeysIter::Inline(key),
566            LockKeys::Vec(keys) => LockKeysIter::Vec(keys.into_iter()),
567        }
568    }
569}
570
// Borrowing iterator over `LockKeys`, mirroring its three representations.
enum LockKeysIter<'a> {
    None,
    Inline(&'a LockKey),
    // Note: `<&Vec<LockKey> as IntoIterator>::IntoIter` is `std::slice::Iter<'a, LockKey>`.
    Vec(<&'a Vec<LockKey> as IntoIterator>::IntoIter),
}
576
577impl<'a> Iterator for LockKeysIter<'a> {
578    type Item = &'a LockKey;
579    fn next(&mut self) -> Option<Self::Item> {
580        match self {
581            Self::None => None,
582            Self::Inline(inline) => {
583                let next = *inline;
584                *self = Self::None;
585                Some(next)
586            }
587            Self::Vec(vec) => vec.next(),
588        }
589    }
590}
591
592impl Default for LockKeys {
593    fn default() -> Self {
594        LockKeys::None
595    }
596}
597
/// Builds a `LockKeys` from zero or more `LockKey` expressions, choosing the cheapest
/// representation: `None` for no keys, `Inline` for exactly one, `Vec` otherwise.
#[macro_export]
macro_rules! lock_keys {
    () => {
        $crate::object_store::transaction::LockKeys::None
    };
    ($lock_key:expr $(,)?) => {
        $crate::object_store::transaction::LockKeys::Inline($lock_key)
    };
    ($($lock_keys:expr),+ $(,)?) => {
        $crate::object_store::transaction::LockKeys::Vec(vec![$($lock_keys),+])
    };
}
pub use lock_keys;
611
/// Mutations in a transaction can be associated with an object so that when mutations are applied,
/// updates can be applied to in-memory structures. For example, we cache object sizes, so when a
/// size change is applied, we can update the cached object size.
pub trait AssociatedObject: Send + Sync {
    /// Called before `_mutation` (targeting `_object_id`) is applied.  The default does nothing.
    fn will_apply_mutation(&self, _mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
    }
}
619
/// An optional associated object for a mutation, either borrowed or owned.
pub enum AssocObj<'a> {
    // No associated object (e.g. during journal replay).
    None,
    Borrowed(&'a dyn AssociatedObject),
    Owned(Box<dyn AssociatedObject>),
}
625
626impl AssocObj<'_> {
627    pub fn map<R, F: FnOnce(&dyn AssociatedObject) -> R>(&self, f: F) -> Option<R> {
628        match self {
629            AssocObj::None => None,
630            AssocObj::Borrowed(b) => Some(f(*b)),
631            AssocObj::Owned(o) => Some(f(o.as_ref())),
632        }
633    }
634}
635
/// A single mutation staged in a transaction, together with its target and any associated object.
pub struct TxnMutation<'a> {
    // This, at time of writing, is either the object ID of an object store, or the object ID of the
    // allocator.  In the case of an object mutation, there's another object ID in the mutation
    // record that would be for the object actually being changed.
    pub object_id: u64,

    // The actual mutation.  This gets serialized to the journal.
    pub mutation: Mutation,

    // An optional associated object for the mutation.  During replay, there will always be no
    // associated object.
    pub associated_object: AssocObj<'a>,
}
649
650// We store TxnMutation in a set, and for that, we only use object_id and mutation and not the
651// associated object or checksum.
652impl Ord for TxnMutation<'_> {
653    fn cmp(&self, other: &Self) -> Ordering {
654        self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation))
655    }
656}
657
658impl PartialOrd for TxnMutation<'_> {
659    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
660        Some(self.cmp(other))
661    }
662}
663
664impl PartialEq for TxnMutation<'_> {
665    fn eq(&self, other: &Self) -> bool {
666        self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation)
667    }
668}
669
670impl Eq for TxnMutation<'_> {}
671
672impl std::fmt::Debug for TxnMutation<'_> {
673    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
674        f.debug_struct("TxnMutation")
675            .field("object_id", &self.object_id)
676            .field("mutation", &self.mutation)
677            .finish()
678    }
679}
680
/// Tracks where the metadata space for a transaction comes from.
pub enum MetadataReservation {
    // The state after a transaction has been dropped.
    None,

    // Metadata space for this transaction is being borrowed from ObjectManager's metadata
    // reservation.
    Borrowed,

    // A metadata reservation was made when the transaction was created.
    Reservation(Reservation),

    // The metadata space is being _held_ within `allocator_reservation`.
    Hold(u64),
}
695
/// A transaction groups mutation records to be committed as a group.
pub struct Transaction<'a> {
    // Guard tying this transaction to the filesystem; also counts it as in-flight.
    txn_guard: TxnGuard<'a>,

    // The mutations that make up this transaction.
    mutations: BTreeSet<TxnMutation<'a>>,

    // The locks that this transaction currently holds.
    txn_locks: LockKeys,

    /// If set, an allocator reservation that should be used for allocations.
    pub allocator_reservation: Option<&'a Reservation>,

    /// The reservation for the metadata for this transaction.
    pub metadata_reservation: MetadataReservation,

    // Keep track of objects explicitly created by this transaction. No locks are required for them.
    // Addressed by (owner_object_id, object_id).
    new_objects: BTreeSet<(u64, u64)>,

    /// Any data checksums which should be evaluated when replaying this transaction.
    checksums: Vec<(Range<u64>, Vec<Checksum>, bool)>,

    /// Set if this transaction contains data (i.e. includes any extent mutations).
    includes_write: bool,
}
722
723impl<'a> Transaction<'a> {
    /// Creates a new transaction.  `txn_locks` are read locks that can be upgraded to write locks
    /// at commit time.
    ///
    /// Registers the transaction with the filesystem, obtains the metadata/allocator reservations
    /// described by `options`, and then acquires the transaction locks.  On any failure before the
    /// `Transaction` is constructed, the registration is rolled back via the scope guard.
    pub async fn new(
        txn_guard: TxnGuard<'a>,
        options: Options<'a>,
        txn_locks: LockKeys,
    ) -> Result<Transaction<'a>, Error> {
        txn_guard.fs().add_transaction(options.skip_journal_checks).await;
        let fs = txn_guard.fs().clone();
        // Undo `add_transaction` if anything below fails before we build the Transaction.
        let guard = scopeguard::guard((), |_| fs.sub_transaction());
        let (metadata_reservation, allocator_reservation, hold) =
            txn_guard.fs().reservation_for_transaction(options).await?;

        // Acquire the locks and move the lock keys out of the guard so the Transaction owns them.
        let txn_locks = {
            let lock_manager = txn_guard.fs().lock_manager();
            let mut write_guard = lock_manager.txn_lock(txn_locks).await;
            std::mem::take(&mut write_guard.0.lock_keys)
        };
        let mut transaction = Transaction {
            txn_guard,
            mutations: BTreeSet::new(),
            txn_locks,
            allocator_reservation: None,
            metadata_reservation,
            new_objects: BTreeSet::new(),
            checksums: Vec::new(),
            includes_write: false,
        };

        // Success: defuse the rollback guard and keep any reservation hold alive.
        ScopeGuard::into_inner(guard);
        hold.map(|h| h.forget()); // Transaction takes ownership from here on.
        transaction.allocator_reservation = allocator_reservation;
        Ok(transaction)
    }
758
    /// Returns the transaction guard held by this transaction.
    pub fn txn_guard(&self) -> &TxnGuard<'_> {
        &self.txn_guard
    }

    /// Returns the set of mutations currently staged in this transaction.
    pub fn mutations(&self) -> &BTreeSet<TxnMutation<'a>> {
        &self.mutations
    }

    /// Takes all staged mutations, leaving the transaction's set empty, and clears the record of
    /// objects created by this transaction.
    pub fn take_mutations(&mut self) -> BTreeSet<TxnMutation<'a>> {
        self.new_objects.clear();
        mem::take(&mut self.mutations)
    }
771
    /// Adds a mutation to this transaction.  If the mutation already exists, it is replaced and the
    /// old mutation is returned.
    ///
    /// Convenience wrapper around `add_with_object` with no associated object.
    pub fn add(&mut self, object_id: u64, mutation: Mutation) -> Option<Mutation> {
        self.add_with_object(object_id, mutation, AssocObj::None)
    }
777
    /// Removes a mutation that matches `mutation`.
    pub fn remove(&mut self, object_id: u64, mutation: Mutation) {
        let txn_mutation = TxnMutation { object_id, mutation, associated_object: AssocObj::None };
        if self.mutations.remove(&txn_mutation) {
            // If the removed mutation was an object insertion, also forget the corresponding
            // `new_objects` entry so the object is no longer treated as created by this
            // transaction.
            if let Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    ObjectItem {
                        key: ObjectKey { object_id: new_object_id, data: ObjectKeyData::Object },
                        ..
                    },
                op: Operation::Insert,
            }) = txn_mutation.mutation
            {
                self.new_objects.remove(&(object_id, new_object_id));
            }
        }
    }
795
    /// Adds a mutation with an associated object. If the mutation already exists, it is replaced
    /// and the old mutation is returned.
    ///
    /// Panics if `object_id` is `INVALID_OBJECT_ID`.  Lock requirements for the mutation are
    /// checked via `verify_locks`.
    pub fn add_with_object(
        &mut self,
        object_id: u64,
        mutation: Mutation,
        associated_object: AssocObj<'a>,
    ) -> Option<Mutation> {
        assert!(object_id != INVALID_OBJECT_ID);
        // An extent mutation means this transaction writes data; record that in
        // `includes_write`.
        if let Mutation::ObjectStore(ObjectStoreMutation {
            item:
                Item {
                    key:
                        ObjectKey { data: ObjectKeyData::Attribute(_, AttributeKey::Extent(_)), .. },
                    ..
                },
            ..
        }) = &mutation
        {
            self.includes_write = true;
        }
        let txn_mutation = TxnMutation { object_id, mutation, associated_object };
        self.verify_locks(&txn_mutation);
        // `replace` (rather than `insert`) so the new mutation wins; the displaced one, if any,
        // is handed back to the caller.
        self.mutations.replace(txn_mutation).map(|m| m.mutation)
    }
821
    /// Records checksums for `range`; these are evaluated when the transaction is replayed.
    /// `first_write` flags the first write to the range — TODO confirm exact semantics with the
    /// journal replay code.
    pub fn add_checksum(&mut self, range: Range<u64>, checksums: Vec<Checksum>, first_write: bool) {
        self.checksums.push((range, checksums, first_write));
    }

    /// Returns true if this transaction contains data writes (any extent mutations).
    pub fn includes_write(&self) -> bool {
        self.includes_write
    }

    /// Returns the checksums recorded via `add_checksum`.
    pub fn checksums(&self) -> &[(Range<u64>, Vec<Checksum>, bool)] {
        &self.checksums
    }
833
834    pub fn take_checksums(&mut self) -> Vec<(Range<u64>, Vec<Checksum>, bool)> {
835        std::mem::replace(&mut self.checksums, Vec::new())
836    }
837
838    fn verify_locks(&mut self, mutation: &TxnMutation<'_>) {
839        // It was considered to change the locks from Vec to BTreeSet since we'll now be searching
840        // through it, but given the small set that these locks usually comprise, it probably isn't
841        // worth it.
842        match mutation {
843            TxnMutation {
844                mutation:
845                    Mutation::ObjectStore {
846                        0: ObjectStoreMutation { item: ObjectItem { key, .. }, op },
847                    },
848                object_id: store_object_id,
849                ..
850            } => {
851                match &key.data {
852                    ObjectKeyData::Attribute(..) => {
853                        // TODO(https://fxbug.dev/42073914): Check lock requirements.
854                    }
855                    ObjectKeyData::Child { .. }
856                    | ObjectKeyData::EncryptedChild(_)
857                    | ObjectKeyData::EncryptedCasefoldChild(_)
858                    | ObjectKeyData::CasefoldChild { .. } => {
859                        let id = key.object_id;
860                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
861                            && !self.new_objects.contains(&(*store_object_id, id))
862                        {
863                            debug_assert!(
864                                false,
865                                "Not holding required lock for object {id} \
866                                in store {store_object_id}"
867                            );
868                            error!(
869                                "Not holding required lock for object {id} in store \
870                                {store_object_id}"
871                            )
872                        }
873                    }
874                    ObjectKeyData::GraveyardEntry { .. } => {
875                        // TODO(https://fxbug.dev/42073911): Check lock requirements.
876                    }
877                    ObjectKeyData::GraveyardAttributeEntry { .. } => {
878                        // TODO(https://fxbug.dev/122974): Check lock requirements.
879                    }
880                    ObjectKeyData::Keys => {
881                        let id = key.object_id;
882                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
883                            && !self.new_objects.contains(&(*store_object_id, id))
884                        {
885                            debug_assert!(
886                                false,
887                                "Not holding required lock for object {id} \
888                                in store {store_object_id}"
889                            );
890                            error!(
891                                "Not holding required lock for object {id} in store \
892                                {store_object_id}"
893                            )
894                        }
895                    }
896                    ObjectKeyData::Object => match op {
897                        // Insert implies the caller expects no object with which to race
898                        Operation::Insert => {
899                            self.new_objects.insert((*store_object_id, key.object_id));
900                        }
901                        Operation::Merge | Operation::ReplaceOrInsert => {
902                            let id = key.object_id;
903                            if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
904                                && !self.new_objects.contains(&(*store_object_id, id))
905                            {
906                                debug_assert!(
907                                    false,
908                                    "Not holding required lock for object {id} \
909                                    in store {store_object_id}"
910                                );
911                                error!(
912                                    "Not holding required lock for object {id} in store \
913                                    {store_object_id}"
914                                )
915                            }
916                        }
917                    },
918                    ObjectKeyData::Project { project_id, property: ProjectProperty::Limit } => {
919                        if !self.txn_locks.contains(&LockKey::ProjectId {
920                            store_object_id: *store_object_id,
921                            project_id: *project_id,
922                        }) {
923                            debug_assert!(
924                                false,
925                                "Not holding required lock for project limit id {project_id} \
926                                in store {store_object_id}"
927                            );
928                            error!(
929                                "Not holding required lock for project limit id {project_id} in \
930                                store {store_object_id}"
931                            )
932                        }
933                    }
934                    ObjectKeyData::Project { property: ProjectProperty::Usage, .. } => match op {
935                        Operation::Insert | Operation::ReplaceOrInsert => {
936                            panic!(
937                                "Project usage is all handled by merging deltas, no inserts or \
938                                replacements should be used"
939                            );
940                        }
941                        // Merges are all handled like atomic +/- and serialized by the tree locks.
942                        Operation::Merge => {}
943                    },
944                    ObjectKeyData::ExtendedAttribute { .. } => {
945                        let id = key.object_id;
946                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
947                            && !self.new_objects.contains(&(*store_object_id, id))
948                        {
949                            debug_assert!(
950                                false,
951                                "Not holding required lock for object {id} \
952                                in store {store_object_id} while mutating extended attribute"
953                            );
954                            error!(
955                                "Not holding required lock for object {id} in store \
956                                {store_object_id} while mutating extended attribute"
957                            )
958                        }
959                    }
960                }
961            }
962            TxnMutation { mutation: Mutation::DeleteVolume, object_id, .. } => {
963                if !self.txn_locks.contains(&LockKey::flush(*object_id)) {
964                    debug_assert!(false, "Not holding required lock for DeleteVolume");
965                    error!("Not holding required lock for DeleteVolume");
966                }
967            }
968            _ => {}
969        }
970    }
971
    /// Returns true if this transaction has no mutations (i.e. nothing has been added since the
    /// transaction was created or last committed).
    pub fn is_empty(&self) -> bool {
        self.mutations.is_empty()
    }
976
977    /// Searches for an existing object mutation within the transaction that has the given key and
978    /// returns it if found.
979    pub fn get_object_mutation(
980        &self,
981        store_object_id: u64,
982        key: ObjectKey,
983    ) -> Option<&ObjectStoreMutation> {
984        if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) =
985            self.mutations.get(&TxnMutation {
986                object_id: store_object_id,
987                mutation: Mutation::insert_object(key, ObjectValue::None),
988                associated_object: AssocObj::None,
989            })
990        {
991            Some(mutation)
992        } else {
993            None
994        }
995    }
996
997    /// Commits a transaction.  If successful, returns the journal offset of the transaction.
998    pub async fn commit(mut self) -> Result<u64, Error> {
999        debug!(txn:? = &self; "Commit");
1000        self.txn_guard.fs().clone().commit_transaction(&mut self, &mut |_| {}).await
1001    }
1002
1003    /// Commits and then runs the callback whilst locks are held.  The callback accepts a single
1004    /// parameter which is the journal offset of the transaction.
1005    pub async fn commit_with_callback<R: Send>(
1006        mut self,
1007        f: impl FnOnce(u64) -> R + Send,
1008    ) -> Result<R, Error> {
1009        debug!(txn:? = &self; "Commit");
1010        // It's not possible to pass an FnOnce via a trait without boxing it, but we don't want to
1011        // do that (for performance reasons), hence the reason for the following.
1012        let mut f = Some(f);
1013        let mut result = None;
1014        self.txn_guard
1015            .fs()
1016            .clone()
1017            .commit_transaction(&mut self, &mut |offset| {
1018                result = Some(f.take().unwrap()(offset));
1019            })
1020            .await?;
1021        Ok(result.unwrap())
1022    }
1023
1024    /// Commits the transaction, but allows the transaction to be used again.  The locks are not
1025    /// dropped (but transaction locks will get downgraded to read locks).
1026    pub async fn commit_and_continue(&mut self) -> Result<(), Error> {
1027        debug!(txn:? = self; "Commit");
1028        self.txn_guard.fs().clone().commit_transaction(self, &mut |_| {}).await?;
1029        assert!(self.mutations.is_empty());
1030        self.txn_guard.fs().lock_manager().downgrade_locks(&self.txn_locks);
1031        Ok(())
1032    }
1033}
1034
1035impl Drop for Transaction<'_> {
1036    fn drop(&mut self) {
1037        // Call the filesystem implementation of drop_transaction which should, as a minimum, call
1038        // LockManager's drop_transaction to ensure the locks are released.
1039        debug!(txn:? = &self; "Drop");
1040        self.txn_guard.fs().clone().drop_transaction(self);
1041    }
1042}
1043
1044impl std::fmt::Debug for Transaction<'_> {
1045    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1046        f.debug_struct("Transaction")
1047            .field("mutations", &self.mutations)
1048            .field("txn_locks", &self.txn_locks)
1049            .field("reservation", &self.allocator_reservation)
1050            .finish()
1051    }
1052}
1053
/// A value that is either borrowed from the caller or owned, so that APIs can accept `T` with or
/// without taking ownership.
pub enum BorrowedOrOwned<'a, T> {
    Borrowed(&'a T),
    Owned(T),
}
1058
1059impl<T> Deref for BorrowedOrOwned<'_, T> {
1060    type Target = T;
1061
1062    fn deref(&self) -> &Self::Target {
1063        match self {
1064            BorrowedOrOwned::Borrowed(b) => b,
1065            BorrowedOrOwned::Owned(o) => &o,
1066        }
1067    }
1068}
1069
1070impl<'a, T> From<&'a T> for BorrowedOrOwned<'a, T> {
1071    fn from(value: &'a T) -> Self {
1072        BorrowedOrOwned::Borrowed(value)
1073    }
1074}
1075
1076impl<T> From<T> for BorrowedOrOwned<'_, T> {
1077    fn from(value: T) -> Self {
1078        BorrowedOrOwned::Owned(value)
1079    }
1080}
1081
/// LockManager holds the locks that transactions might have taken.  A TransactionManager
/// implementation would typically have one of these.
///
/// Three different kinds of locks are supported.  There are read locks and write locks, which are
/// as one would expect.  The third kind of lock is a _transaction_ lock (which is also known as an
/// upgradeable read lock).  When first acquired, these block other writes (including other
/// transaction locks) but do not block reads.  When it is time to commit a transaction, these locks
/// are upgraded to full write locks (without ever dropping the lock) and then dropped after
/// committing (unless commit_and_continue is used).  This way, reads are only blocked for the
/// shortest possible time.  It follows that write locks should be used sparingly.  Locks are
/// granted in order with one exception: when a lock is in the initial _transaction_ lock state
/// (LockState::Locked), all read locks are allowed even if there are other tasks waiting for the
/// lock.  The reason for this is because we allow read locks to be taken by tasks that have taken a
/// _transaction_ lock (i.e. recursion is allowed).  In other cases, such as when a writer is
/// waiting and there are only readers, readers will queue up behind the writer.
///
/// To summarize:
///
/// +-------------------------+-----------------+----------------+------------------+
/// |                         | While read_lock | While txn_lock | While write_lock |
/// |                         | is held         | is held        | is held          |
/// +-------------------------+-----------------+----------------+------------------+
/// | Can acquire read_lock?  | true            | true           | false            |
/// +-------------------------+-----------------+----------------+------------------+
/// | Can acquire txn_lock?   | true            | false          | false            |
/// +-------------------------+-----------------+----------------+------------------+
/// | Can acquire write_lock? | false           | false          | false            |
/// +-------------------------+-----------------+----------------+------------------+
pub struct LockManager {
    // Guards all lock state.  The raw pointers inside `LockEntry`/`LockWaker` may only be
    // dereferenced whilst this mutex is held.
    locks: Mutex<Locks>,
}
1113
// The state of every lock key currently held or waited upon.  Entries are removed once they have
// no holders and no waiters.
struct Locks {
    keys: HashMap<LockKey, LockEntry>,
}
1117
impl Locks {
    /// Releases one hold on `key` that was taken in `state`, waking any waiters that can now
    /// proceed and removing the entry from the map once it is idle.
    fn drop_lock(&mut self, key: LockKey, state: LockState) {
        if let Entry::Occupied(mut occupied) = self.keys.entry(key) {
            let entry = occupied.get_mut();
            let wake = match state {
                LockState::ReadLock => {
                    entry.read_count -= 1;
                    // Only the departure of the last reader can unblock a waiting writer.
                    entry.read_count == 0
                }
                // drop_write_locks currently depends on us treating Locked and WriteLock the same.
                LockState::Locked | LockState::WriteLock => {
                    entry.state = LockState::ReadLock;
                    true
                }
            };
            if wake {
                // SAFETY: The lock in `LockManager::locks` is held.
                unsafe {
                    entry.wake();
                }
                if entry.can_remove() {
                    occupied.remove_entry();
                }
            }
        } else {
            // The entry must exist because a lock is only dropped by a task that acquired it.
            unreachable!();
        }
    }

    /// Drops one read lock for each of `lock_keys`.
    fn drop_read_locks(&mut self, lock_keys: LockKeys) {
        for lock in lock_keys.iter() {
            self.drop_lock(*lock, LockState::ReadLock);
        }
    }

    /// Drops write (or transaction) locks for each of `lock_keys`.
    fn drop_write_locks(&mut self, lock_keys: LockKeys) {
        for lock in lock_keys.iter() {
            // This is a bit hacky, but this works for locks in either the Locked or WriteLock
            // states.
            self.drop_lock(*lock, LockState::WriteLock);
        }
    }

    // Downgrades locks from WriteLock to Locked.
    fn downgrade_locks(&mut self, lock_keys: &LockKeys) {
        for lock in lock_keys.iter() {
            // SAFETY: The lock in `LockManager::locks` is held.
            unsafe {
                self.keys.get_mut(lock).unwrap().downgrade_lock();
            }
        }
    }
}
1171
#[derive(Debug)]
struct LockEntry {
    // In the states that allow readers (ReadLock, Locked), this count can be non-zero
    // to indicate the number of active readers.
    read_count: u64,

    // The state of the lock (see below).
    state: LockState,

    // A doubly-linked list of wakers that should be woken when they have been granted the lock.
    // New wakers are usually chained on to tail, with the exception being the case where a lock in
    // state Locked is to be upgraded to WriteLock, but can't because there are readers.  It might
    // be possible to use intrusive-collections in the future.
    head: *const LockWaker,
    tail: *const LockWaker,
}

// SAFETY: LockEntry is only !Send because of the raw `LockWaker` pointers, and those are only
// dereferenced whilst `LockManager::locks` is held, which serialises access across threads.
unsafe impl Send for LockEntry {}
1190
// Represents a node in the waker list.  It is only safe to access the members wrapped by UnsafeCell
// when LockManager's `locks` member is locked.
struct LockWaker {
    // The next and previous pointers in the doubly-linked list.
    next: UnsafeCell<*const LockWaker>,
    prev: UnsafeCell<*const LockWaker>,

    // Holds the lock key for this waker.  This is required so that we can find the associated
    // `LockEntry`.
    key: LockKey,

    // The underlying waker that should be used to wake the task.
    waker: UnsafeCell<WakerState>,

    // The target state for this waker.
    target_state: LockState,

    // True if this is an upgrade (Locked -> WriteLock) rather than a fresh acquisition.
    is_upgrade: bool,

    // We need to be pinned because these form part of the linked list.
    _pin: PhantomPinned,
}
1214
// Tracks where a `LockWaker` is in its lifecycle.  Transitions only happen with
// `LockManager::locks` held.
enum WakerState {
    // This is the initial state before the waker has been first polled.
    Pending,

    // Once polled, this contains the actual waker.
    Registered(Waker),

    // The waker has been woken and has been granted the lock.
    Woken,
}
1225
1226impl WakerState {
1227    fn is_woken(&self) -> bool {
1228        matches!(self, WakerState::Woken)
1229    }
1230}
1231
// SAFETY: The UnsafeCell contents (the list pointers and the waker state) are only accessed
// whilst `LockManager::locks` is held, which serialises cross-thread access.
unsafe impl Send for LockWaker {}
unsafe impl Sync for LockWaker {}
1234
impl LockWaker {
    // Waits for the waker to be woken.
    async fn wait(&self, manager: &LockManager) {
        // We must guard against the future being dropped: if the lock was granted between polls
        // but this future is cancelled, the grant must be undone or other waiters would stall.
        let waker_guard = scopeguard::guard((), |_| {
            let mut locks = manager.locks.lock();
            // SAFETY: We've acquired the lock.
            unsafe {
                if (*self.waker.get()).is_woken() {
                    // We were woken, but didn't actually run, so we must drop the lock.
                    if self.is_upgrade {
                        // The upgrade to WriteLock was granted but never observed; revert it.
                        locks.keys.get_mut(&self.key).unwrap().downgrade_lock();
                    } else {
                        locks.drop_lock(self.key, self.target_state);
                    }
                } else {
                    // We haven't been woken but we've been dropped so we must remove ourself from
                    // the waker list.
                    locks.keys.get_mut(&self.key).unwrap().remove_waker(self);
                }
            }
        });

        poll_fn(|cx| {
            let _locks = manager.locks.lock();
            // SAFETY: We've acquired the lock.
            unsafe {
                if (*self.waker.get()).is_woken() {
                    Poll::Ready(())
                } else {
                    // Not granted yet: (re-)register the current task's waker and park.
                    *self.waker.get() = WakerState::Registered(cx.waker().clone());
                    Poll::Pending
                }
            }
        })
        .await;

        // Completed normally, so defuse the cancellation guard above.
        ScopeGuard::into_inner(waker_guard);
    }
}
1275
#[derive(Copy, Clone, Debug, PartialEq)]
enum LockState {
    // In this state, there are only readers.
    ReadLock,

    // This state is used for transactions to lock other writers (including other transactions), but
    // it still allows readers.  This is the initial state of a transaction (upgradeable read) lock.
    Locked,

    // A writer has exclusive access; all other readers and writers are blocked.
    WriteLock,
}
1288
1289impl LockManager {
1290    pub fn new() -> Self {
1291        LockManager { locks: Mutex::new(Locks { keys: HashMap::default() }) }
1292    }
1293
1294    /// Acquires the locks.  It is the caller's responsibility to ensure that drop_transaction is
1295    /// called when a transaction is dropped i.e. the filesystem's drop_transaction method should
1296    /// call LockManager's drop_transaction method.
1297    pub async fn txn_lock<'a>(&'a self, lock_keys: LockKeys) -> TransactionLocks<'a> {
1298        TransactionLocks(
1299            debug_assert_not_too_long!(self.lock(lock_keys, LockState::Locked)).right().unwrap(),
1300        )
1301    }
1302
1303    // `state` indicates the kind of lock required.  ReadLock means acquire a read lock.  Locked
1304    // means lock other writers, but still allow readers.  WriteLock means acquire a write lock.
1305    async fn lock<'a>(
1306        &'a self,
1307        mut lock_keys: LockKeys,
1308        target_state: LockState,
1309    ) -> Either<ReadGuard<'a>, WriteGuard<'a>> {
1310        let mut guard = match &target_state {
1311            LockState::ReadLock => Left(ReadGuard {
1312                manager: self.into(),
1313                lock_keys: LockKeys::with_capacity(lock_keys.len()),
1314            }),
1315            LockState::Locked | LockState::WriteLock => Right(WriteGuard {
1316                manager: self.into(),
1317                lock_keys: LockKeys::with_capacity(lock_keys.len()),
1318            }),
1319        };
1320        let guard_keys = match &mut guard {
1321            Left(g) => &mut g.lock_keys,
1322            Right(g) => &mut g.lock_keys,
1323        };
1324        lock_keys.sort_unstable();
1325        lock_keys.dedup();
1326        for lock in lock_keys.iter() {
1327            let lock_waker = None;
1328            pin_mut!(lock_waker);
1329            {
1330                let mut locks = self.locks.lock();
1331                match locks.keys.entry(*lock) {
1332                    Entry::Vacant(vacant) => {
1333                        vacant.insert(LockEntry {
1334                            read_count: if let LockState::ReadLock = target_state {
1335                                guard_keys.push(*lock);
1336                                1
1337                            } else {
1338                                guard_keys.push(*lock);
1339                                0
1340                            },
1341                            state: target_state,
1342                            head: std::ptr::null(),
1343                            tail: std::ptr::null(),
1344                        });
1345                    }
1346                    Entry::Occupied(mut occupied) => {
1347                        let entry = occupied.get_mut();
1348                        // SAFETY: We've acquired the lock.
1349                        if unsafe { entry.is_allowed(target_state, entry.head.is_null()) } {
1350                            if let LockState::ReadLock = target_state {
1351                                entry.read_count += 1;
1352                                guard_keys.push(*lock);
1353                            } else {
1354                                entry.state = target_state;
1355                                guard_keys.push(*lock);
1356                            }
1357                        } else {
1358                            // Initialise a waker and push it on the tail of the list.
1359                            // SAFETY: `lock_waker` isn't used prior to this point.
1360                            unsafe {
1361                                *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1362                                    next: UnsafeCell::new(std::ptr::null()),
1363                                    prev: UnsafeCell::new(entry.tail),
1364                                    key: *lock,
1365                                    waker: UnsafeCell::new(WakerState::Pending),
1366                                    target_state: target_state,
1367                                    is_upgrade: false,
1368                                    _pin: PhantomPinned,
1369                                });
1370                            }
1371                            let waker = (*lock_waker).as_ref().unwrap();
1372                            if entry.tail.is_null() {
1373                                entry.head = waker;
1374                            } else {
1375                                // SAFETY: We've acquired the lock.
1376                                unsafe {
1377                                    *(*entry.tail).next.get() = waker;
1378                                }
1379                            }
1380                            entry.tail = waker;
1381                        }
1382                    }
1383                }
1384            }
1385            if let Some(waker) = &*lock_waker {
1386                waker.wait(self).await;
1387                guard_keys.push(*lock);
1388            }
1389        }
1390        guard
1391    }
1392
1393    /// This should be called by the filesystem's drop_transaction implementation.
1394    pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
1395        let mut locks = self.locks.lock();
1396        locks.drop_write_locks(std::mem::take(&mut transaction.txn_locks));
1397    }
1398
1399    /// Prepares to commit by waiting for readers to finish.
1400    pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
1401        self.commit_prepare_keys(&transaction.txn_locks).await;
1402    }
1403
1404    async fn commit_prepare_keys(&self, lock_keys: &LockKeys) {
1405        for lock in lock_keys.iter() {
1406            let lock_waker = None;
1407            pin_mut!(lock_waker);
1408            {
1409                let mut locks = self.locks.lock();
1410                let entry = locks.keys.get_mut(lock).unwrap();
1411                assert_eq!(entry.state, LockState::Locked);
1412
1413                if entry.read_count == 0 {
1414                    entry.state = LockState::WriteLock;
1415                } else {
1416                    // Initialise a waker and push it on the head of the list.
1417                    // SAFETY: `lock_waker` isn't used prior to this point.
1418                    unsafe {
1419                        *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1420                            next: UnsafeCell::new(entry.head),
1421                            prev: UnsafeCell::new(std::ptr::null()),
1422                            key: *lock,
1423                            waker: UnsafeCell::new(WakerState::Pending),
1424                            target_state: LockState::WriteLock,
1425                            is_upgrade: true,
1426                            _pin: PhantomPinned,
1427                        });
1428                    }
1429                    let waker = (*lock_waker).as_ref().unwrap();
1430                    if entry.head.is_null() {
1431                        entry.tail = (*lock_waker).as_ref().unwrap();
1432                    } else {
1433                        // SAFETY: We've acquired the lock.
1434                        unsafe {
1435                            *(*entry.head).prev.get() = waker;
1436                        }
1437                    }
1438                    entry.head = waker;
1439                }
1440            }
1441
1442            if let Some(waker) = &*lock_waker {
1443                waker.wait(self).await;
1444            }
1445        }
1446    }
1447
1448    /// Acquires a read lock for the given keys.  Read locks are only blocked whilst a transaction
1449    /// is being committed for the same locks.  They are only necessary where consistency is
1450    /// required between different mutations within a transaction.  For example, a write might
1451    /// change the size and extents for an object, in which case a read lock is required so that
1452    /// observed size and extents are seen together or not at all.
1453    pub async fn read_lock<'a>(&'a self, lock_keys: LockKeys) -> ReadGuard<'a> {
1454        debug_assert_not_too_long!(self.lock(lock_keys, LockState::ReadLock)).left().unwrap()
1455    }
1456
1457    /// Acquires a write lock for the given keys.  Write locks provide exclusive access to the
1458    /// requested lock keys.
1459    pub async fn write_lock<'a>(&'a self, lock_keys: LockKeys) -> WriteGuard<'a> {
1460        debug_assert_not_too_long!(self.lock(lock_keys, LockState::WriteLock)).right().unwrap()
1461    }
1462
1463    /// Downgrades locks from the WriteLock state to Locked state.  This will panic if the locks are
1464    /// not in the WriteLock state.
1465    pub fn downgrade_locks(&self, lock_keys: &LockKeys) {
1466        self.locks.lock().downgrade_locks(lock_keys);
1467    }
1468}
1469
// These unsafe functions require that `locks` in LockManager is locked.
impl LockEntry {
    // Grants the lock to as many queued wakers as the entry's state allows, starting from the
    // head of the list.  Waking a writer ends the scan; otherwise consecutive readers continue to
    // be woken.
    unsafe fn wake(&mut self) {
        // If the lock's state is WriteLock, or there's nothing waiting, return early.
        if self.head.is_null() || self.state == LockState::WriteLock {
            return;
        }

        let waker = unsafe { &*self.head };

        if waker.is_upgrade {
            // An upgrade (Locked -> WriteLock) can only proceed once all readers have gone.
            if self.read_count > 0 {
                return;
            }
        } else if !unsafe { self.is_allowed(waker.target_state, true) } {
            return;
        }

        unsafe { self.pop_and_wake() };

        // If the waker was a write lock, we can't wake any more up, but otherwise, we can keep
        // waking up readers.
        if waker.target_state == LockState::WriteLock {
            return;
        }

        while !self.head.is_null() && unsafe { (*self.head).target_state } == LockState::ReadLock {
            unsafe { self.pop_and_wake() };
        }
    }

    // Unlinks the head waker, applies its target state to this entry (the lock is granted at this
    // point, even if the task hasn't run yet) and wakes the task if it has registered a waker.
    unsafe fn pop_and_wake(&mut self) {
        let waker = unsafe { &*self.head };

        // Pop the waker.
        self.head = unsafe { *waker.next.get() };
        if self.head.is_null() {
            self.tail = std::ptr::null()
        } else {
            unsafe { *(*self.head).prev.get() = std::ptr::null() };
        }

        // Adjust our state accordingly.
        if waker.target_state == LockState::ReadLock {
            self.read_count += 1;
        } else {
            self.state = waker.target_state;
        }

        // Now wake the task.
        if let WakerState::Registered(waker) =
            std::mem::replace(unsafe { &mut *waker.waker.get() }, WakerState::Woken)
        {
            waker.wake();
        }
    }

    // Returns true if the entry no longer records any lock holders and can be removed from the
    // map.
    fn can_remove(&self) -> bool {
        self.state == LockState::ReadLock && self.read_count == 0
    }

    // Unlinks `waker` (which has not been woken) from the doubly-linked list.
    unsafe fn remove_waker(&mut self, waker: &LockWaker) {
        unsafe {
            let is_first = (*waker.prev.get()).is_null();
            if is_first {
                self.head = *waker.next.get();
            } else {
                *(**waker.prev.get()).next.get() = *waker.next.get();
            }
            if (*waker.next.get()).is_null() {
                self.tail = *waker.prev.get();
            } else {
                *(**waker.next.get()).prev.get() = *waker.prev.get();
            }
            if is_first {
                // We must call wake in case we erased a pending write lock and readers can now
                // proceed.
                self.wake();
            }
        }
    }

    // Returns whether or not a lock with given `target_state` can proceed.  `is_head` should be
    // true if this is something at the head of the waker list (or the waker list is empty) and
    // false if there are other items on the waker list that are prior.
    unsafe fn is_allowed(&self, target_state: LockState, is_head: bool) -> bool {
        match self.state {
            LockState::ReadLock => {
                // Allow ReadLock and Locked so long as nothing else is waiting.
                (self.read_count == 0
                    || target_state == LockState::Locked
                    || target_state == LockState::ReadLock)
                    && is_head
            }
            LockState::Locked => {
                // Always allow reads unless there's an upgrade waiting.  We have to
                // always allow reads in this state because tasks that have locks in
                // the Locked state can later try and acquire ReadLock.
                target_state == LockState::ReadLock
                    && (is_head || unsafe { !(*self.head).is_upgrade })
            }
            LockState::WriteLock => false,
        }
    }

    // Converts WriteLock back to Locked, waking any readers that can now proceed.  Panics if the
    // lock is not currently in the WriteLock state.
    unsafe fn downgrade_lock(&mut self) {
        assert_eq!(std::mem::replace(&mut self.state, LockState::Locked), LockState::WriteLock);
        unsafe { self.wake() };
    }
}
1580
/// Holds read locks on a set of keys; dropping the guard releases them.
#[must_use]
pub struct ReadGuard<'a> {
    manager: LockManagerRef<'a>,
    lock_keys: LockKeys,
}
1586
1587impl ReadGuard<'_> {
1588    pub fn fs(&self) -> Option<&Arc<FxFilesystem>> {
1589        if let LockManagerRef::Owned(fs) = &self.manager { Some(fs) } else { None }
1590    }
1591
1592    pub fn into_owned(mut self, fs: Arc<FxFilesystem>) -> ReadGuard<'static> {
1593        ReadGuard {
1594            manager: LockManagerRef::Owned(fs),
1595            lock_keys: std::mem::replace(&mut self.lock_keys, LockKeys::None),
1596        }
1597    }
1598}
1599
1600impl Drop for ReadGuard<'_> {
1601    fn drop(&mut self) {
1602        let mut locks = self.manager.locks.lock();
1603        locks.drop_read_locks(std::mem::take(&mut self.lock_keys));
1604    }
1605}
1606
1607impl fmt::Debug for ReadGuard<'_> {
1608    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1609        f.debug_struct("ReadGuard")
1610            .field("manager", &(&self.manager as *const _))
1611            .field("lock_keys", &self.lock_keys)
1612            .finish()
1613    }
1614}
1615
/// Holds write (or transaction) locks on a set of keys; dropping the guard releases them.
#[must_use]
pub struct WriteGuard<'a> {
    manager: LockManagerRef<'a>,
    lock_keys: LockKeys,
}
1621
1622impl Drop for WriteGuard<'_> {
1623    fn drop(&mut self) {
1624        let mut locks = self.manager.locks.lock();
1625        locks.drop_write_locks(std::mem::take(&mut self.lock_keys));
1626    }
1627}
1628
1629impl fmt::Debug for WriteGuard<'_> {
1630    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1631        f.debug_struct("WriteGuard")
1632            .field("manager", &(&self.manager as *const _))
1633            .field("lock_keys", &self.lock_keys)
1634            .finish()
1635    }
1636}
1637
/// A reference to a [`LockManager`]: either borrowed directly, or reached through an owned
/// strong reference to the filesystem (which is what lets guards be made `'static`).
enum LockManagerRef<'a> {
    Borrowed(&'a LockManager),
    Owned(Arc<FxFilesystem>),
}
1642
1643impl Deref for LockManagerRef<'_> {
1644    type Target = LockManager;
1645
1646    fn deref(&self) -> &Self::Target {
1647        match self {
1648            LockManagerRef::Borrowed(m) => m,
1649            LockManagerRef::Owned(f) => f.lock_manager(),
1650        }
1651    }
1652}
1653
1654impl<'a> From<&'a LockManager> for LockManagerRef<'a> {
1655    fn from(value: &'a LockManager) -> Self {
1656        LockManagerRef::Borrowed(value)
1657    }
1658}
1659
1660#[cfg(test)]
1661mod tests {
1662    use super::{LockKey, LockKeys, LockManager, LockState, Mutation, Options};
1663    use crate::filesystem::FxFilesystem;
1664    use fuchsia_async as fasync;
1665    use fuchsia_sync::Mutex;
1666    use futures::channel::oneshot::channel;
1667    use futures::future::FutureExt;
1668    use futures::stream::FuturesUnordered;
1669    use futures::{StreamExt, join, pin_mut};
1670    use std::task::Poll;
1671    use std::time::Duration;
1672    use storage_device::DeviceHolder;
1673    use storage_device::fake_device::FakeDevice;
1674
1675    #[fuchsia::test]
1676    async fn test_simple() {
1677        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1678        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1679        let mut t = fs
1680            .clone()
1681            .new_transaction(lock_keys![], Options::default())
1682            .await
1683            .expect("new_transaction failed");
1684        t.add(1, Mutation::BeginFlush);
1685        assert!(!t.is_empty());
1686    }
1687
    /// Checks that transactions on the same lock key serialize (the third future must not
    /// complete while the first still holds the lock), while a transaction on a different key
    /// proceeds without blocking.
    #[fuchsia::test]
    async fn test_locks() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let (send3, recv3) = channel();
        // Set to true by the third future; the first future asserts it stays false while it
        // still holds the lock.
        let done = Mutex::new(false);
        let mut futures = FuturesUnordered::new();
        futures.push(
            async {
                // Takes the lock on key (1, 2, 3) and holds it for the rest of this future.
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send1.send(()).unwrap(); // Tell the next future to continue.
                send3.send(()).unwrap(); // Tell the last future to continue.
                recv2.await.unwrap();
                // This is a halting problem so all we can do is sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            }
            .boxed(),
        );
        futures.push(
            async {
                recv1.await.unwrap();
                // This should not block since it is a different key.
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(2, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                // Tell the first future to continue.
                send2.send(()).unwrap();
            }
            .boxed(),
        );
        futures.push(
            async {
                // This should block until the first future has completed.
                recv3.await.unwrap();
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await;
                *done.lock() = true;
            }
            .boxed(),
        );
        while let Some(()) = futures.next().await {}
    }
1750
    /// Checks that a read lock taken after an uncommitted write-lock transaction does not block,
    /// but that the transaction's commit is then blocked until the read lock is released.
    #[fuchsia::test]
    async fn test_read_lock_after_write_lock() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        // Set to true once the commit completes; the reader asserts it stays false while the
        // read lock is still held.
        let done = Mutex::new(false);
        join!(
            async {
                let t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send1.send(()).unwrap(); // Tell the next future to continue.
                recv2.await.unwrap();
                t.commit().await.expect("commit failed");
                *done.lock() = true;
            },
            async {
                recv1.await.unwrap();
                // Reads should not be blocked until the transaction is committed.
                let _guard = fs
                    .lock_manager()
                    .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
                    .await;
                // Tell the first future to continue.
                send2.send(()).unwrap();
                // It shouldn't proceed until we release our read lock, but it's a halting
                // problem, so sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            },
        );
    }
1789
    /// Checks that a transaction (write) lock can be taken while a read lock is held, but its
    /// commit is blocked until the read lock is released.
    #[fuchsia::test]
    async fn test_write_lock_after_read_lock() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        // Set to true once the commit completes; the reader asserts it stays false while the
        // read lock is still held.
        let done = Mutex::new(false);
        join!(
            async {
                // Reads should not be blocked until the transaction is committed.
                let _guard = fs
                    .lock_manager()
                    .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
                    .await;
                // Tell the next future to continue and then wait.
                send1.send(()).unwrap();
                recv2.await.unwrap();
                // It shouldn't proceed until we release our read lock, but it's a halting
                // problem, so sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            },
            async {
                recv1.await.unwrap();
                let t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send2.send(()).unwrap(); // Tell the first future to continue;
                t.commit().await.expect("commit failed");
                *done.lock() = true;
            },
        );
    }
1828
    /// Checks that dropping an uncommitted transaction releases its locks, both while a reader
    /// holds the same key and when no reader is present.
    #[fuchsia::test]
    async fn test_drop_uncommitted_transaction() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let key = lock_keys![LockKey::object(1, 1)];

        // Dropping while there's a reader.
        {
            let _write_lock = fs
                .clone()
                .new_transaction(key.clone(), Options::default())
                .await
                .expect("new_transaction failed");
            let _read_lock = fs.lock_manager().read_lock(key.clone()).await;
        }
        // Dropping while there's no reader.
        {
            let _write_lock = fs
                .clone()
                .new_transaction(key.clone(), Options::default())
                .await
                .expect("new_transaction failed");
        }
        // Make sure we can take the lock again (i.e. it was actually released).
        fs.clone()
            .new_transaction(key.clone(), Options::default())
            .await
            .expect("new_transaction failed");
    }
1858
1859    #[fuchsia::test]
1860    async fn test_drop_waiting_write_lock() {
1861        let manager = LockManager::new();
1862        let keys = lock_keys![LockKey::object(1, 1)];
1863        {
1864            let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1865            if let Poll::Ready(_) =
1866                futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1867            {
1868                assert!(false);
1869            }
1870        }
1871        let _ = manager.lock(keys, LockState::WriteLock).await;
1872    }
1873
1874    #[fuchsia::test]
1875    async fn test_write_lock_blocks_everything() {
1876        let manager = LockManager::new();
1877        let keys = lock_keys![LockKey::object(1, 1)];
1878        {
1879            let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1880            if let Poll::Ready(_) =
1881                futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1882            {
1883                assert!(false);
1884            }
1885            if let Poll::Ready(_) =
1886                futures::poll!(manager.lock(keys.clone(), LockState::ReadLock).boxed())
1887            {
1888                assert!(false);
1889            }
1890        }
1891        {
1892            let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1893        }
1894        {
1895            let _guard = manager.lock(keys, LockState::ReadLock).await;
1896        }
1897    }
1898
    /// Checks that downgrading an upgraded (commit-prepared) lock unblocks waiting readers, and
    /// that the blocked reader's waker is actually woken (not just ready on re-poll).
    #[fuchsia::test]
    async fn test_downgrade_locks() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.txn_lock(keys.clone()).await;
        manager.commit_prepare_keys(&keys).await;

        // Use FuturesUnordered so that we can check that the waker is woken.
        let mut read_lock: FuturesUnordered<_> =
            std::iter::once(manager.read_lock(keys.clone())).collect();

        // Trying to acquire a read lock now should be blocked.
        assert!(futures::poll!(read_lock.next()).is_pending());

        manager.downgrade_locks(&keys);

        // After downgrading, it should be possible to take a read lock.
        assert!(futures::poll!(read_lock.next()).is_ready());
    }
1918
    /// Checks that dropping a pending write lock wakes a read lock that was queued behind it.
    #[fuchsia::test]
    async fn test_dropped_write_lock_wakes() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
        let mut read_lock = FuturesUnordered::new();
        read_lock.push(manager.lock(keys.clone(), LockState::ReadLock));

        {
            let write_lock = manager.lock(keys, LockState::WriteLock);
            pin_mut!(write_lock);

            // The write lock should be blocked because of the read lock.
            assert!(futures::poll!(write_lock).is_pending());

            // Another read lock should be blocked because of the write lock.
            assert!(futures::poll!(read_lock.next()).is_pending());
        }

        // Dropping the write lock should allow the read lock to proceed.
        assert!(futures::poll!(read_lock.next()).is_ready());
    }
1941
    /// Checks that dropping a pending upgrade (commit_prepare) that is blocked by a reader
    /// leaves the lock in a state where a later upgrade still succeeds.
    #[fuchsia::test]
    async fn test_drop_upgrade() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.lock(keys.clone(), LockState::Locked).await;

        {
            let commit_prepare = manager.commit_prepare_keys(&keys);
            pin_mut!(commit_prepare);
            let _read_guard = manager.lock(keys.clone(), LockState::ReadLock).await;
            assert!(futures::poll!(commit_prepare).is_pending());

            // Now we test dropping read_guard which should wake commit_prepare and
            // then dropping commit_prepare.
        }

        // We should be able to still commit_prepare.
        manager.commit_prepare_keys(&keys).await;
    }
1961
    /// Checks that an upgrade which has been woken (by the blocking reader dropping) takes
    /// priority over queued readers: once the upgrade completes, later readers stay blocked
    /// until the resulting write lock is dropped.
    #[fasync::run_singlethreaded(test)]
    async fn test_woken_upgrade_blocks_reads() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        // Start with a transaction lock.
        let guard = manager.lock(keys.clone(), LockState::Locked).await;

        // Take a read lock.
        let read1 = manager.lock(keys.clone(), LockState::ReadLock).await;

        // Try and upgrade the transaction lock, which should not be possible because of the read.
        let commit_prepare = manager.commit_prepare_keys(&keys);
        pin_mut!(commit_prepare);
        assert!(futures::poll!(commit_prepare.as_mut()).is_pending());

        // Taking another read should also be blocked.
        let read2 = manager.lock(keys.clone(), LockState::ReadLock);
        pin_mut!(read2);
        assert!(futures::poll!(read2.as_mut()).is_pending());

        // Drop the first read and the upgrade should complete.
        std::mem::drop(read1);
        assert!(futures::poll!(commit_prepare).is_ready());

        // But the second read should still be blocked.
        assert!(futures::poll!(read2.as_mut()).is_pending());

        // If we drop the write lock now, the read should be unblocked.
        std::mem::drop(guard);
        assert!(futures::poll!(read2).is_ready());
    }
1993
    // Distinct keys shared by the LockKeys container tests below.
    static LOCK_KEY_1: LockKey = LockKey::flush(1);
    static LOCK_KEY_2: LockKey = LockKey::flush(2);
    static LOCK_KEY_3: LockKey = LockKey::flush(3);
1997
1998    // The keys, storage method, and capacity must all match.
1999    fn assert_lock_keys_equal(value: &LockKeys, expected: &LockKeys) {
2000        match (value, expected) {
2001            (LockKeys::None, LockKeys::None) => {}
2002            (LockKeys::Inline(key1), LockKeys::Inline(key2)) => {
2003                if key1 != key2 {
2004                    panic!("{key1:?} != {key2:?}");
2005                }
2006            }
2007            (LockKeys::Vec(vec1), LockKeys::Vec(vec2)) => {
2008                if vec1 != vec2 {
2009                    panic!("{vec1:?} != {vec2:?}");
2010                }
2011                if vec1.capacity() != vec2.capacity() {
2012                    panic!(
2013                        "LockKeys have different capacity: {} != {}",
2014                        vec1.capacity(),
2015                        vec2.capacity()
2016                    );
2017                }
2018            }
2019            (_, _) => panic!("{value:?} != {expected:?}"),
2020        }
2021    }
2022
2023    // Only the keys must match. Storage method and capacity don't matter.
2024    fn assert_lock_keys_equivalent(value: &LockKeys, expected: &LockKeys) {
2025        let value: Vec<_> = value.iter().collect();
2026        let expected: Vec<_> = expected.iter().collect();
2027        assert_eq!(value, expected);
2028    }
2029
    /// The `lock_keys!` macro picks the most compact storage: `None` for zero keys, `Inline`
    /// for one, and `Vec` for more.
    #[test]
    fn test_lock_keys_macro() {
        assert_lock_keys_equal(&lock_keys![], &LockKeys::None);
        assert_lock_keys_equal(&lock_keys![LOCK_KEY_1], &LockKeys::Inline(LOCK_KEY_1));
        assert_lock_keys_equal(
            &lock_keys![LOCK_KEY_1, LOCK_KEY_2],
            &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]),
        );
    }
2039
    /// `with_capacity` avoids allocating for 0 or 1 keys (the non-Vec variants) and only
    /// allocates a Vec for 2 or more.
    #[test]
    fn test_lock_keys_with_capacity() {
        assert_lock_keys_equal(&LockKeys::with_capacity(0), &LockKeys::None);
        assert_lock_keys_equal(&LockKeys::with_capacity(1), &LockKeys::None);
        assert_lock_keys_equal(&LockKeys::with_capacity(2), &LockKeys::Vec(Vec::with_capacity(2)));
    }
2046
    /// `len` reports the number of keys across all storage variants.
    #[test]
    fn test_lock_keys_len() {
        assert_eq!(lock_keys![].len(), 0);
        assert_eq!(lock_keys![LOCK_KEY_1].len(), 1);
        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].len(), 2);
    }
2053
2054    #[test]
2055    fn test_lock_keys_contains() {
2056        assert_eq!(lock_keys![].contains(&LOCK_KEY_1), false);
2057        assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_1), true);
2058        assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_2), false);
2059        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_1), true);
2060        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_2), true);
2061        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_3), false);
2062    }
2063
    /// `push` promotes the storage as keys are added: None -> Inline on the first key,
    /// Inline -> Vec on the second.
    #[test]
    fn test_lock_keys_push() {
        let mut keys = lock_keys![];
        keys.push(LOCK_KEY_1);
        assert_lock_keys_equal(&keys, &LockKeys::Inline(LOCK_KEY_1));
        keys.push(LOCK_KEY_2);
        assert_lock_keys_equal(&keys, &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]));
        keys.push(LOCK_KEY_3);
        // Capacity may differ from a freshly-built Vec after growth, so only compare keys.
        assert_lock_keys_equivalent(
            &keys,
            &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2, LOCK_KEY_3]),
        );
    }
2077
    /// `sort_unstable` orders the keys and works for all storage variants.
    #[test]
    fn test_lock_keys_sort_unstable() {
        let mut keys = lock_keys![];
        keys.sort_unstable();
        assert_lock_keys_equal(&keys, &lock_keys![]);

        let mut keys = lock_keys![LOCK_KEY_1];
        keys.sort_unstable();
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);

        let mut keys = lock_keys![LOCK_KEY_2, LOCK_KEY_1];
        keys.sort_unstable();
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
    }
2092
    /// `dedup` removes adjacent duplicate keys in all storage variants.
    #[test]
    fn test_lock_keys_dedup() {
        let mut keys = lock_keys![];
        keys.dedup();
        assert_lock_keys_equal(&keys, &lock_keys![]);

        let mut keys = lock_keys![LOCK_KEY_1];
        keys.dedup();
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);

        let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_1];
        keys.dedup();
        // Dedup may leave Vec storage even with one key remaining, so only compare keys.
        assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
    }
2107
    /// `truncate` shortens to the requested length; truncating beyond the current length is a
    /// no-op, and truncating Vec storage does not demote it back to Inline.
    #[test]
    fn test_lock_keys_truncate() {
        let mut keys = lock_keys![];
        keys.truncate(5);
        assert_lock_keys_equal(&keys, &lock_keys![]);
        keys.truncate(0);
        assert_lock_keys_equal(&keys, &lock_keys![]);

        let mut keys = lock_keys![LOCK_KEY_1];
        keys.truncate(5);
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
        keys.truncate(0);
        assert_lock_keys_equal(&keys, &lock_keys![]);

        let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_2];
        keys.truncate(5);
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
        keys.truncate(1);
        // Although there's only 1 key after truncate the key is not stored inline.
        assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
    }
2129
    /// `iter` yields references to the keys in order for all storage variants.
    #[test]
    fn test_lock_keys_iter() {
        assert_eq!(lock_keys![].iter().collect::<Vec<_>>(), Vec::<&LockKey>::new());

        assert_eq!(lock_keys![LOCK_KEY_1].iter().collect::<Vec<_>>(), vec![&LOCK_KEY_1]);

        assert_eq!(
            lock_keys![LOCK_KEY_1, LOCK_KEY_2].iter().collect::<Vec<_>>(),
            vec![&LOCK_KEY_1, &LOCK_KEY_2]
        );
    }
2141}