fxfs/object_store/
transaction.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::checksum::Checksum;
6use crate::filesystem::{FxFilesystem, TxnGuard};
7use crate::log::*;
8use crate::lsm_tree::types::Item;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::AttributeKey;
11use crate::object_store::allocator::{AllocatorItem, Reservation};
12use crate::object_store::object_manager::{ObjectManager, reserved_space_from_journal_usage};
13use crate::object_store::object_record::{
14    FxfsKey, FxfsKeyV40, FxfsKeyV49, ObjectItem, ObjectItemV40, ObjectItemV41, ObjectItemV43,
15    ObjectItemV46, ObjectItemV47, ObjectItemV49, ObjectKey, ObjectKeyData, ObjectValue,
16    ProjectProperty,
17};
18use crate::serialized_types::{Migrate, Versioned, migrate_nodefault, migrate_to_version};
19use anyhow::Error;
20use either::{Either, Left, Right};
21use fprint::TypeFingerprint;
22use fuchsia_sync::Mutex;
23use futures::future::poll_fn;
24use futures::pin_mut;
25use rustc_hash::FxHashMap as HashMap;
26use scopeguard::ScopeGuard;
27use serde::{Deserialize, Serialize};
28use std::cell::UnsafeCell;
29use std::cmp::Ordering;
30use std::collections::BTreeSet;
31use std::collections::hash_map::Entry;
32use std::marker::PhantomPinned;
33use std::ops::{Deref, DerefMut, Range};
34use std::sync::Arc;
35use std::task::{Poll, Waker};
36use std::{fmt, mem};
37
/// This allows for special handling of certain transactions such as deletes and the
/// extension of Journal extents. For most other use cases it is appropriate to use
/// `default()` here.
#[derive(Clone, Copy, Default)]
pub struct Options<'a> {
    /// If true, don't check for low journal space.  This should be true for any transactions that
    /// might alleviate journal space (i.e. compaction).
    pub skip_journal_checks: bool,

    /// If true, borrow metadata space from the metadata reservation.  This setting should be set to
    /// true for any transaction that will either not affect space usage after compaction
    /// (e.g. setting attributes), or reduce space usage (e.g. unlinking).  Otherwise, a transaction
    /// might fail with an out-of-space error.
    pub borrow_metadata_space: bool,

    /// If specified, a reservation to be used with the transaction.  If not set, any allocations
    /// that are part of this transaction will have to take their chances, and will fail if there is
    /// no free space.  The intention is that this should be used for things like the journal which
    /// require guaranteed space.
    pub allocator_reservation: Option<&'a Reservation>,

    /// An existing transaction guard to be used.  If not set, a new guard is taken when the
    /// transaction is created.
    pub txn_guard: Option<&'a TxnGuard<'a>>,
}
62
// This is the amount of space that we reserve for metadata when we are creating a new transaction.
// A transaction should not take more than this.  This is expressed in terms of space occupied in
// the journal; transactions must not take up more space in the journal than the number below.  The
// amount chosen here must be large enough for the maximum possible transaction that can be created,
// so transactions always need to be bounded which might involve splitting an operation up into
// smaller transactions.
/// Upper bound on the journal space a single transaction may occupy.
pub const TRANSACTION_MAX_JOURNAL_USAGE: u64 = 24_576;
/// The metadata space reserved per transaction, derived from the journal-usage bound above.
pub const TRANSACTION_METADATA_MAX_AMOUNT: u64 =
    reserved_space_from_journal_usage(TRANSACTION_MAX_JOURNAL_USAGE);
72
/// Wrapper around the `WriteGuard` that holds the locks for a transaction.
/// NOTE(review): presumably dropping the guard releases the locks, hence `#[must_use]` — confirm
/// against the lock manager.
#[must_use]
pub struct TransactionLocks<'a>(pub WriteGuard<'a>);
75
/// The journal consists of these records which will be replayed at mount time.  Within a
/// transaction, these are stored as a set which allows some mutations to be deduplicated and found
/// (and we require custom comparison functions below).  For example, we need to be able to find
/// object size changes.
pub type Mutation = MutationV49;

#[derive(
    Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint, Versioned,
)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum MutationV49 {
    // A single item insert/replace/merge in an object store's LSM tree.
    ObjectStore(ObjectStoreMutationV49),
    // Opaque serialized object-store mutations in encrypted form.
    // NOTE(review): inferred from the name; confirm the payload format.
    EncryptedObjectStore(Box<[u8]>),
    // A mutation applied to the allocator (allocate/deallocate/set-limit/mark-for-deletion).
    Allocator(AllocatorMutationV32),
    // Indicates the beginning of a flush.  This would typically involve sealing a tree.
    BeginFlush,
    // Indicates the end of a flush.  This would typically involve replacing the immutable layers
    // with compacted ones.
    EndFlush,
    // Volume has been deleted.  Requires we remove it from the set of managed ObjectStore.
    DeleteVolume,
    // Updates the amount of metadata space borrowed.
    // NOTE(review): inferred from `MetadataReservation::Borrowed` below; confirm.
    UpdateBorrowed(u64),
    // Replaces the key used for subsequent encrypted mutations.
    // NOTE(review): inferred from the name; confirm.
    UpdateMutationsKey(UpdateMutationsKeyV49),
    // Creates an internal directory with the given object ID.
    // NOTE(review): inferred from the name; confirm.
    CreateInternalDir(u64),
}
101
// The following are older serialized versions of `Mutation`, retained so that images written with
// previous layouts can still be read; each version migrates forward to the next one via the
// `Migrate` derive.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV49)]
pub enum MutationV47 {
    ObjectStore(ObjectStoreMutationV47),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV47)]
pub enum MutationV46 {
    ObjectStore(ObjectStoreMutationV46),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV46)]
pub enum MutationV43 {
    ObjectStore(ObjectStoreMutationV43),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV43)]
pub enum MutationV41 {
    ObjectStore(ObjectStoreMutationV41),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV41)]
pub enum MutationV40 {
    ObjectStore(ObjectStoreMutationV40),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}
171
172impl Mutation {
173    pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self {
174        Mutation::ObjectStore(ObjectStoreMutation {
175            item: Item::new(key, value),
176            op: Operation::Insert,
177        })
178    }
179
180    pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self {
181        Mutation::ObjectStore(ObjectStoreMutation {
182            item: Item::new(key, value),
183            op: Operation::ReplaceOrInsert,
184        })
185    }
186
187    pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self {
188        Mutation::ObjectStore(ObjectStoreMutation {
189            item: Item::new(key, value),
190            op: Operation::Merge,
191        })
192    }
193
194    pub fn update_mutations_key(key: FxfsKey) -> Self {
195        Mutation::UpdateMutationsKey(key.into())
196    }
197}
198
// We have custom comparison functions for mutations that just use the key, rather than the key and
// value that would be used by default so that we can deduplicate and find mutations (see
// get_object_mutation below).
pub type ObjectStoreMutation = ObjectStoreMutationV49;

/// A single item write into an object store's LSM tree.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct ObjectStoreMutationV49 {
    // The item being written.
    pub item: ObjectItemV49,
    // How the item combines with any existing item with the same key.
    pub op: OperationV32,
}
210
// Older serialized versions of `ObjectStoreMutation`, retained for migration of previously
// written data; each migrates forward to the next version via the `Migrate` derive.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV49)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV47 {
    pub item: ObjectItemV47,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV47)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV46 {
    pub item: ObjectItemV46,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV46)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV43 {
    pub item: ObjectItemV43,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV43)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV41 {
    pub item: ObjectItemV41,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_nodefault]
#[migrate_to_version(ObjectStoreMutationV41)]
pub struct ObjectStoreMutationV40 {
    pub item: ObjectItemV40,
    pub op: OperationV32,
}
250
/// The different LSM tree operations that can be performed as part of a mutation.
pub type Operation = OperationV32;

#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum OperationV32 {
    // Inserts a new item; the caller expects no existing item with the same key (see
    // `verify_locks` below).
    Insert,
    // Inserts the item, replacing any existing item with the same key.
    ReplaceOrInsert,
    // Combines the item with any existing item via the tree's merge logic.
    Merge,
}
261
262impl Ord for ObjectStoreMutation {
263    fn cmp(&self, other: &Self) -> Ordering {
264        self.item.key.cmp(&other.item.key)
265    }
266}
267
268impl PartialOrd for ObjectStoreMutation {
269    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
270        Some(self.cmp(other))
271    }
272}
273
274impl PartialEq for ObjectStoreMutation {
275    fn eq(&self, other: &Self) -> bool {
276        self.item.key.eq(&other.item.key)
277    }
278}
279
280impl Eq for ObjectStoreMutation {}
281
// AllocatorItems are compared by key only (ignoring the value), mirroring the key-based
// comparisons used for object-store mutations above.
impl Ord for AllocatorItem {
    fn cmp(&self, other: &Self) -> Ordering {
        self.key.cmp(&other.key)
    }
}

impl PartialOrd for AllocatorItem {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
293
/// Same as std::ops::Range but with Ord and PartialOrd support, sorted first by start of the range,
/// then by the end.
pub type DeviceRange = DeviceRangeV32;

/// Newtype over `Range<u64>` describing a range on the device.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct DeviceRangeV32(pub Range<u64>);
301
// Deref/DerefMut let a DeviceRange be used wherever a `Range<u64>` is expected
// (e.g. `.start`/`.end` access in the Ord impl below).
impl Deref for DeviceRange {
    type Target = Range<u64>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for DeviceRange {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl From<Range<u64>> for DeviceRange {
    fn from(range: Range<u64>) -> Self {
        Self(range)
    }
}
321
322impl Into<Range<u64>> for DeviceRange {
323    fn into(self) -> Range<u64> {
324        self.0
325    }
326}
327
328impl Ord for DeviceRange {
329    fn cmp(&self, other: &Self) -> Ordering {
330        self.start.cmp(&other.start).then(self.end.cmp(&other.end))
331    }
332}
333
334impl PartialOrd for DeviceRange {
335    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
336        Some(self.cmp(other))
337    }
338}
339
pub type AllocatorMutation = AllocatorMutationV32;

#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorMutationV32 {
    /// Allocates `device_range` on behalf of `owner_object_id`.
    Allocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    /// Frees `device_range` previously allocated to `owner_object_id`.
    Deallocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    /// Sets a byte limit for the given owner.
    SetLimit {
        owner_object_id: u64,
        bytes: u64,
    },
    /// Marks all extents with a given owner_object_id for deletion.
    /// Used to free space allocated to encrypted ObjectStore where we may not have the key.
    /// Note that the actual deletion time is undefined so this should never be used where an
    /// ObjectStore is still in use due to a high risk of corruption. Similarly, owner_object_id
    /// should never be reused for the same reasons.
    MarkForDeletion(u64),
}
364
pub type UpdateMutationsKey = UpdateMutationsKeyV49;

/// Wraps the key carried by a `Mutation::UpdateMutationsKey` record.
/// NOTE(review): presumably this is the key used to encrypt subsequent mutations — confirm.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
pub struct UpdateMutationsKeyV49(pub FxfsKeyV49);

// Older version, retained for migration.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(UpdateMutationsKeyV49)]
pub struct UpdateMutationsKeyV40(pub FxfsKeyV40);
373
// Conversions between the wrapper and the key it contains.
impl From<UpdateMutationsKey> for FxfsKey {
    fn from(outer: UpdateMutationsKey) -> Self {
        outer.0
    }
}

impl From<FxfsKey> for UpdateMutationsKey {
    fn from(inner: FxfsKey) -> Self {
        Self(inner)
    }
}
385
#[cfg(fuzz)]
impl<'a> arbitrary::Arbitrary<'a> for UpdateMutationsKey {
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        // Propagate the error (e.g. `NotEnoughData`) with `?` rather than panicking via
        // `unwrap()`, as the `Arbitrary` contract expects fallible generation.
        Ok(UpdateMutationsKey::from(FxfsKey::arbitrary(u)?))
    }
}
392
// Identity-based comparisons: two `UpdateMutationsKey` values compare equal only if they are
// literally the same object in memory (pointer equality), so distinct instances are never
// deduplicated when stored in the transaction's mutation set.
// NOTE(review): the resulting order is address-dependent and therefore not stable across runs;
// presumably only equality/uniqueness matters to the containing BTreeSet — confirm.
impl Ord for UpdateMutationsKey {
    fn cmp(&self, other: &Self) -> Ordering {
        (self as *const UpdateMutationsKey).cmp(&(other as *const _))
    }
}

impl PartialOrd for UpdateMutationsKey {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for UpdateMutationsKey {}

impl PartialEq for UpdateMutationsKey {
    fn eq(&self, other: &Self) -> bool {
        std::ptr::eq(self, other)
    }
}
412
/// When creating a transaction, locks typically need to be held to prevent two or more writers
/// trying to make conflicting mutations at the same time.  LockKeys are used for this.
/// NOTE: Ordering is important here!  The lock manager sorts the list of locks in a transaction
/// to acquire them in a consistent order, but there are special cases for the Filesystem lock and
/// the Flush lock.
/// The Filesystem lock is taken by every transaction and is done so first, as part of the TxnGuard.
/// The Flush lock is taken when we flush an LSM tree (e.g. an object store), and is held for
/// several transactions.  As such, it must come first in the lock acquisition ordering, so that
/// other transactions using the Flush lock have the same ordering as in flushing.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Copy)]
pub enum LockKey {
    /// Locks the entire filesystem.
    Filesystem,

    /// Used to lock flushing an object.
    Flush {
        object_id: u64,
    },

    /// Used to lock changes to a particular object attribute (e.g. writes).
    ObjectAttribute {
        store_object_id: u64,
        object_id: u64,
        attribute_id: u64,
    },

    /// Used to lock changes to a particular object (e.g. adding a child to a directory).
    Object {
        store_object_id: u64,
        object_id: u64,
    },

    /// Used to lock changes to a project (e.g. setting project limits — see `verify_locks`).
    ProjectId {
        store_object_id: u64,
        project_id: u64,
    },

    /// Used to lock any truncate operations for a file.
    Truncate {
        store_object_id: u64,
        object_id: u64,
    },
}
456
impl LockKey {
    /// Returns a key that locks `attribute_id` of `object_id` in the given store.
    pub const fn object_attribute(store_object_id: u64, object_id: u64, attribute_id: u64) -> Self {
        LockKey::ObjectAttribute { store_object_id, object_id, attribute_id }
    }

    /// Returns a key that locks `object_id` in the given store.
    pub const fn object(store_object_id: u64, object_id: u64) -> Self {
        LockKey::Object { store_object_id, object_id }
    }

    /// Returns a key that locks flushing of `object_id`.
    pub const fn flush(object_id: u64) -> Self {
        LockKey::Flush { object_id }
    }

    /// Returns a key that locks truncation of `object_id` in the given store.
    pub const fn truncate(store_object_id: u64, object_id: u64) -> Self {
        LockKey::Truncate { store_object_id, object_id }
    }
}
474
/// A container for holding `LockKey` objects. Can store a single `LockKey` inline.
#[derive(Clone, Debug)]
pub enum LockKeys {
    // No keys.
    None,
    // Exactly one key, stored without a heap allocation.
    Inline(LockKey),
    // Two or more keys (also produced by `with_capacity` for capacity > 1).
    Vec(Vec<LockKey>),
}
482
483impl LockKeys {
484    pub fn with_capacity(capacity: usize) -> Self {
485        if capacity > 1 { LockKeys::Vec(Vec::with_capacity(capacity)) } else { LockKeys::None }
486    }
487
488    pub fn push(&mut self, key: LockKey) {
489        match self {
490            Self::None => *self = LockKeys::Inline(key),
491            Self::Inline(inline) => {
492                *self = LockKeys::Vec(vec![*inline, key]);
493            }
494            Self::Vec(vec) => vec.push(key),
495        }
496    }
497
498    pub fn truncate(&mut self, len: usize) {
499        match self {
500            Self::None => {}
501            Self::Inline(_) => {
502                if len == 0 {
503                    *self = Self::None;
504                }
505            }
506            Self::Vec(vec) => vec.truncate(len),
507        }
508    }
509
510    fn len(&self) -> usize {
511        match self {
512            Self::None => 0,
513            Self::Inline(_) => 1,
514            Self::Vec(vec) => vec.len(),
515        }
516    }
517
518    fn contains(&self, key: &LockKey) -> bool {
519        match self {
520            Self::None => false,
521            Self::Inline(single) => single == key,
522            Self::Vec(vec) => vec.contains(key),
523        }
524    }
525
526    fn sort_unstable(&mut self) {
527        match self {
528            Self::Vec(vec) => vec.sort_unstable(),
529            _ => {}
530        }
531    }
532
533    fn dedup(&mut self) {
534        match self {
535            Self::Vec(vec) => vec.dedup(),
536            _ => {}
537        }
538    }
539
540    fn iter(&self) -> LockKeysIter<'_> {
541        match self {
542            LockKeys::None => LockKeysIter::None,
543            LockKeys::Inline(key) => LockKeysIter::Inline(key),
544            LockKeys::Vec(keys) => LockKeysIter::Vec(keys.into_iter()),
545        }
546    }
547}
548
/// Borrowing iterator over `LockKeys`, with one state per representation.
enum LockKeysIter<'a> {
    None,
    Inline(&'a LockKey),
    // `<&Vec<LockKey> as IntoIterator>::IntoIter` is `std::slice::Iter<'a, LockKey>`.
    Vec(<&'a Vec<LockKey> as IntoIterator>::IntoIter),
}

impl<'a> Iterator for LockKeysIter<'a> {
    type Item = &'a LockKey;
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::None => None,
            Self::Inline(inline) => {
                // Yield the single key, then transition to the exhausted state.
                let next = *inline;
                *self = Self::None;
                Some(next)
            }
            Self::Vec(vec) => vec.next(),
        }
    }
}
569
impl Default for LockKeys {
    /// The default is the empty set of keys.
    fn default() -> Self {
        LockKeys::None
    }
}
575
/// Constructs a `LockKeys` from the given keys, choosing the cheapest representation for the
/// number of arguments: `None` for zero, `Inline` for exactly one, `Vec` for two or more.
#[macro_export]
macro_rules! lock_keys {
    () => {
        $crate::object_store::transaction::LockKeys::None
    };
    ($lock_key:expr $(,)?) => {
        $crate::object_store::transaction::LockKeys::Inline($lock_key)
    };
    ($($lock_keys:expr),+ $(,)?) => {
        $crate::object_store::transaction::LockKeys::Vec(vec![$($lock_keys),+])
    };
}
pub use lock_keys;
589
/// Mutations in a transaction can be associated with an object so that when mutations are applied,
/// updates can be applied to in-memory structures. For example, we cache object sizes, so when a
/// size change is applied, we can update the cached object size.
pub trait AssociatedObject: Send + Sync {
    /// Hook invoked when `_mutation` is about to be applied for `_object_id`; the default is a
    /// no-op.
    fn will_apply_mutation(&self, _mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
    }
}
597
/// An optional reference to an [`AssociatedObject`], either borrowed or owned.
pub enum AssocObj<'a> {
    None,
    Borrowed(&'a dyn AssociatedObject),
    Owned(Box<dyn AssociatedObject>),
}
603
604impl AssocObj<'_> {
605    pub fn map<R, F: FnOnce(&dyn AssociatedObject) -> R>(&self, f: F) -> Option<R> {
606        match self {
607            AssocObj::None => None,
608            AssocObj::Borrowed(b) => Some(f(*b)),
609            AssocObj::Owned(o) => Some(f(o.as_ref())),
610        }
611    }
612}
613
/// A mutation recorded in a transaction: the target object's ID, the mutation itself, and an
/// optional associated object.
pub struct TxnMutation<'a> {
    // This, at time of writing, is either the object ID of an object store, or the object ID of the
    // allocator.  In the case of an object mutation, there's another object ID in the mutation
    // record that would be for the object actually being changed.
    pub object_id: u64,

    // The actual mutation.  This gets serialized to the journal.
    pub mutation: Mutation,

    // An optional associated object for the mutation.  During replay, there will always be no
    // associated object.
    pub associated_object: AssocObj<'a>,
}
627
// We store TxnMutation in a set, and for that, we only use object_id and mutation and not the
// associated object or checksum.
impl Ord for TxnMutation<'_> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation))
    }
}

impl PartialOrd for TxnMutation<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for TxnMutation<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation)
    }
}

impl Eq for TxnMutation<'_> {}
649
650impl std::fmt::Debug for TxnMutation<'_> {
651    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
652        f.debug_struct("TxnMutation")
653            .field("object_id", &self.object_id)
654            .field("mutation", &self.mutation)
655            .finish()
656    }
657}
658
/// Describes how the metadata space for a transaction is being reserved.
pub enum MetadataReservation {
    // The state after a transaction has been dropped.
    None,

    // Metadata space for this transaction is being borrowed from ObjectManager's metadata
    // reservation.
    Borrowed,

    // A metadata reservation was made when the transaction was created.
    Reservation(Reservation),

    // The metadata space is being _held_ within `allocator_reservation`.
    Hold(u64),
}
673
/// A transaction groups mutation records to be committed as a group.
pub struct Transaction<'a> {
    // Guard that keeps the filesystem lock held for the life of the transaction.
    txn_guard: TxnGuard<'a>,

    // The mutations that make up this transaction.
    mutations: BTreeSet<TxnMutation<'a>>,

    // The locks that this transaction currently holds.
    txn_locks: LockKeys,

    /// If set, an allocator reservation that should be used for allocations.
    pub allocator_reservation: Option<&'a Reservation>,

    /// The reservation for the metadata for this transaction.
    pub metadata_reservation: MetadataReservation,

    // Keep track of objects explicitly created by this transaction. No locks are required for them.
    // Addressed by (owner_object_id, object_id).
    new_objects: BTreeSet<(u64, u64)>,

    /// Any data checksums which should be evaluated when replaying this transaction.
    checksums: Vec<(Range<u64>, Vec<Checksum>, bool)>,

    /// Set if this transaction contains data (i.e. includes any extent mutations).
    includes_write: bool,
}
700
701impl<'a> Transaction<'a> {
702    /// Creates a new transaction.  `txn_locks` are read locks that can be upgraded to write locks
703    /// at commit time.
704    pub async fn new(
705        txn_guard: TxnGuard<'a>,
706        options: Options<'a>,
707        txn_locks: LockKeys,
708    ) -> Result<Transaction<'a>, Error> {
709        txn_guard.fs().add_transaction(options.skip_journal_checks).await;
710        let fs = txn_guard.fs().clone();
711        let guard = scopeguard::guard((), |_| fs.sub_transaction());
712        let (metadata_reservation, allocator_reservation, hold) =
713            txn_guard.fs().reservation_for_transaction(options).await?;
714
715        let txn_locks = {
716            let lock_manager = txn_guard.fs().lock_manager();
717            let mut write_guard = lock_manager.txn_lock(txn_locks).await;
718            std::mem::take(&mut write_guard.0.lock_keys)
719        };
720        let mut transaction = Transaction {
721            txn_guard,
722            mutations: BTreeSet::new(),
723            txn_locks,
724            allocator_reservation: None,
725            metadata_reservation,
726            new_objects: BTreeSet::new(),
727            checksums: Vec::new(),
728            includes_write: false,
729        };
730
731        ScopeGuard::into_inner(guard);
732        hold.map(|h| h.forget()); // Transaction takes ownership from here on.
733        transaction.allocator_reservation = allocator_reservation;
734        Ok(transaction)
735    }
736
    /// Returns the transaction guard held by this transaction.
    pub fn txn_guard(&self) -> &TxnGuard<'_> {
        &self.txn_guard
    }

    /// Returns the mutations recorded so far.
    pub fn mutations(&self) -> &BTreeSet<TxnMutation<'a>> {
        &self.mutations
    }

    /// Takes all mutations out of the transaction, leaving it empty; also clears the record of
    /// objects created by this transaction.
    pub fn take_mutations(&mut self) -> BTreeSet<TxnMutation<'a>> {
        self.new_objects.clear();
        mem::take(&mut self.mutations)
    }
749
    /// Adds a mutation to this transaction.  If the mutation already exists, it is replaced and the
    /// old mutation is returned.
    pub fn add(&mut self, object_id: u64, mutation: Mutation) -> Option<Mutation> {
        self.add_with_object(object_id, mutation, AssocObj::None)
    }

    /// Removes a mutation that matches `mutation`.
    pub fn remove(&mut self, object_id: u64, mutation: Mutation) {
        let txn_mutation = TxnMutation { object_id, mutation, associated_object: AssocObj::None };
        if self.mutations.remove(&txn_mutation) {
            // If what was removed was the insertion of a new object, also drop it from
            // `new_objects` so that lock verification no longer exempts that object.
            if let Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    ObjectItem {
                        key: ObjectKey { object_id: new_object_id, data: ObjectKeyData::Object },
                        ..
                    },
                op: Operation::Insert,
            }) = txn_mutation.mutation
            {
                self.new_objects.remove(&(object_id, new_object_id));
            }
        }
    }
773
    /// Adds a mutation with an associated object. If the mutation already exists, it is replaced
    /// and the old mutation is returned.
    pub fn add_with_object(
        &mut self,
        object_id: u64,
        mutation: Mutation,
        associated_object: AssocObj<'a>,
    ) -> Option<Mutation> {
        assert!(object_id != INVALID_OBJECT_ID);
        // Any extent mutation means this transaction writes data.
        if let Mutation::ObjectStore(ObjectStoreMutation {
            item:
                Item {
                    key:
                        ObjectKey { data: ObjectKeyData::Attribute(_, AttributeKey::Extent(_)), .. },
                    ..
                },
            ..
        }) = &mutation
        {
            self.includes_write = true;
        }
        let txn_mutation = TxnMutation { object_id, mutation, associated_object };
        // Validate (debug_assert + error log) that the locks required for this mutation are held.
        self.verify_locks(&txn_mutation);
        // `replace` uses the key-only comparisons above, so any existing mutation with the same
        // key is handed back to the caller.
        self.mutations.replace(txn_mutation).map(|m| m.mutation)
    }
799
    /// Records data checksums for `range`, to be evaluated when replaying this transaction.
    pub fn add_checksum(&mut self, range: Range<u64>, checksums: Vec<Checksum>, first_write: bool) {
        self.checksums.push((range, checksums, first_write));
    }

    /// Returns true if this transaction contains data (i.e. any extent mutations).
    pub fn includes_write(&self) -> bool {
        self.includes_write
    }

    /// Returns the checksums recorded so far.
    pub fn checksums(&self) -> &[(Range<u64>, Vec<Checksum>, bool)] {
        &self.checksums
    }
811
812    pub fn take_checksums(&mut self) -> Vec<(Range<u64>, Vec<Checksum>, bool)> {
813        std::mem::replace(&mut self.checksums, Vec::new())
814    }
815
816    fn verify_locks(&mut self, mutation: &TxnMutation<'_>) {
817        // It was considered to change the locks from Vec to BTreeSet since we'll now be searching
818        // through it, but given the small set that these locks usually comprise, it probably isn't
819        // worth it.
820        match mutation {
821            TxnMutation {
822                mutation:
823                    Mutation::ObjectStore {
824                        0: ObjectStoreMutation { item: ObjectItem { key, .. }, op },
825                    },
826                object_id: store_object_id,
827                ..
828            } => {
829                match &key.data {
830                    ObjectKeyData::Attribute(..) => {
831                        // TODO(https://fxbug.dev/42073914): Check lock requirements.
832                    }
833                    ObjectKeyData::Child { .. }
834                    | ObjectKeyData::EncryptedChild { .. }
835                    | ObjectKeyData::CasefoldChild { .. } => {
836                        let id = key.object_id;
837                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
838                            && !self.new_objects.contains(&(*store_object_id, id))
839                        {
840                            debug_assert!(
841                                false,
842                                "Not holding required lock for object {id} \
843                                in store {store_object_id}"
844                            );
845                            error!(
846                                "Not holding required lock for object {id} in store \
847                                {store_object_id}"
848                            )
849                        }
850                    }
851                    ObjectKeyData::GraveyardEntry { .. } => {
852                        // TODO(https://fxbug.dev/42073911): Check lock requirements.
853                    }
854                    ObjectKeyData::GraveyardAttributeEntry { .. } => {
855                        // TODO(https://fxbug.dev/122974): Check lock requirements.
856                    }
857                    ObjectKeyData::Keys => {
858                        let id = key.object_id;
859                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
860                            && !self.new_objects.contains(&(*store_object_id, id))
861                        {
862                            debug_assert!(
863                                false,
864                                "Not holding required lock for object {id} \
865                                in store {store_object_id}"
866                            );
867                            error!(
868                                "Not holding required lock for object {id} in store \
869                                {store_object_id}"
870                            )
871                        }
872                    }
873                    ObjectKeyData::Object => match op {
874                        // Insert implies the caller expects no object with which to race
875                        Operation::Insert => {
876                            self.new_objects.insert((*store_object_id, key.object_id));
877                        }
878                        Operation::Merge | Operation::ReplaceOrInsert => {
879                            let id = key.object_id;
880                            if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
881                                && !self.new_objects.contains(&(*store_object_id, id))
882                            {
883                                debug_assert!(
884                                    false,
885                                    "Not holding required lock for object {id} \
886                                    in store {store_object_id}"
887                                );
888                                error!(
889                                    "Not holding required lock for object {id} in store \
890                                    {store_object_id}"
891                                )
892                            }
893                        }
894                    },
895                    ObjectKeyData::Project { project_id, property: ProjectProperty::Limit } => {
896                        if !self.txn_locks.contains(&LockKey::ProjectId {
897                            store_object_id: *store_object_id,
898                            project_id: *project_id,
899                        }) {
900                            debug_assert!(
901                                false,
902                                "Not holding required lock for project limit id {project_id} \
903                                in store {store_object_id}"
904                            );
905                            error!(
906                                "Not holding required lock for project limit id {project_id} in \
907                                store {store_object_id}"
908                            )
909                        }
910                    }
911                    ObjectKeyData::Project { property: ProjectProperty::Usage, .. } => match op {
912                        Operation::Insert | Operation::ReplaceOrInsert => {
913                            panic!(
914                                "Project usage is all handled by merging deltas, no inserts or \
915                                replacements should be used"
916                            );
917                        }
918                        // Merges are all handled like atomic +/- and serialized by the tree locks.
919                        Operation::Merge => {}
920                    },
921                    ObjectKeyData::ExtendedAttribute { .. } => {
922                        let id = key.object_id;
923                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
924                            && !self.new_objects.contains(&(*store_object_id, id))
925                        {
926                            debug_assert!(
927                                false,
928                                "Not holding required lock for object {id} \
929                                in store {store_object_id} while mutating extended attribute"
930                            );
931                            error!(
932                                "Not holding required lock for object {id} in store \
933                                {store_object_id} while mutating extended attribute"
934                            )
935                        }
936                    }
937                }
938            }
939            TxnMutation { mutation: Mutation::DeleteVolume, object_id, .. } => {
940                if !self.txn_locks.contains(&LockKey::flush(*object_id)) {
941                    debug_assert!(false, "Not holding required lock for DeleteVolume");
942                    error!("Not holding required lock for DeleteVolume");
943                }
944            }
945            _ => {}
946        }
947    }
948
    /// Returns true if this transaction has no mutations.  Mutations are registered via `add`;
    /// a successful commit drains them (see `commit_and_continue`).
    pub fn is_empty(&self) -> bool {
        self.mutations.is_empty()
    }
953
954    /// Searches for an existing object mutation within the transaction that has the given key and
955    /// returns it if found.
956    pub fn get_object_mutation(
957        &self,
958        store_object_id: u64,
959        key: ObjectKey,
960    ) -> Option<&ObjectStoreMutation> {
961        if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) =
962            self.mutations.get(&TxnMutation {
963                object_id: store_object_id,
964                mutation: Mutation::insert_object(key, ObjectValue::None),
965                associated_object: AssocObj::None,
966            })
967        {
968            Some(mutation)
969        } else {
970            None
971        }
972    }
973
974    /// Commits a transaction.  If successful, returns the journal offset of the transaction.
975    pub async fn commit(mut self) -> Result<u64, Error> {
976        debug!(txn:? = &self; "Commit");
977        self.txn_guard.fs().clone().commit_transaction(&mut self, &mut |_| {}).await
978    }
979
980    /// Commits and then runs the callback whilst locks are held.  The callback accepts a single
981    /// parameter which is the journal offset of the transaction.
982    pub async fn commit_with_callback<R: Send>(
983        mut self,
984        f: impl FnOnce(u64) -> R + Send,
985    ) -> Result<R, Error> {
986        debug!(txn:? = &self; "Commit");
987        // It's not possible to pass an FnOnce via a trait without boxing it, but we don't want to
988        // do that (for performance reasons), hence the reason for the following.
989        let mut f = Some(f);
990        let mut result = None;
991        self.txn_guard
992            .fs()
993            .clone()
994            .commit_transaction(&mut self, &mut |offset| {
995                result = Some(f.take().unwrap()(offset));
996            })
997            .await?;
998        Ok(result.unwrap())
999    }
1000
1001    /// Commits the transaction, but allows the transaction to be used again.  The locks are not
1002    /// dropped (but transaction locks will get downgraded to read locks).
1003    pub async fn commit_and_continue(&mut self) -> Result<(), Error> {
1004        debug!(txn:? = self; "Commit");
1005        self.txn_guard.fs().clone().commit_transaction(self, &mut |_| {}).await?;
1006        assert!(self.mutations.is_empty());
1007        self.txn_guard.fs().lock_manager().downgrade_locks(&self.txn_locks);
1008        Ok(())
1009    }
1010}
1011
1012impl Drop for Transaction<'_> {
1013    fn drop(&mut self) {
1014        // Call the filesystem implementation of drop_transaction which should, as a minimum, call
1015        // LockManager's drop_transaction to ensure the locks are released.
1016        debug!(txn:? = &self; "Drop");
1017        self.txn_guard.fs().clone().drop_transaction(self);
1018    }
1019}
1020
1021impl std::fmt::Debug for Transaction<'_> {
1022    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1023        f.debug_struct("Transaction")
1024            .field("mutations", &self.mutations)
1025            .field("txn_locks", &self.txn_locks)
1026            .field("reservation", &self.allocator_reservation)
1027            .finish()
1028    }
1029}
1030
/// Holds either a borrowed or an owned `T`, exposing uniform read access via `Deref`.
///
/// This lets an API accept a caller-provided reference or take ownership of a value through the
/// same type; `From` impls are provided for both cases.
pub enum BorrowedOrOwned<'a, T> {
    Borrowed(&'a T),
    Owned(T),
}

impl<T> Deref for BorrowedOrOwned<'_, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        match self {
            BorrowedOrOwned::Borrowed(b) => b,
            // Match ergonomics already bind `o` as `&T`, so no extra borrow is needed (the
            // previous `&o` produced `&&T` and relied on deref coercion).
            BorrowedOrOwned::Owned(o) => o,
        }
    }
}

impl<'a, T> From<&'a T> for BorrowedOrOwned<'a, T> {
    fn from(value: &'a T) -> Self {
        BorrowedOrOwned::Borrowed(value)
    }
}

impl<T> From<T> for BorrowedOrOwned<'_, T> {
    fn from(value: T) -> Self {
        BorrowedOrOwned::Owned(value)
    }
}
1058
1059/// LockManager holds the locks that transactions might have taken.  A TransactionManager
1060/// implementation would typically have one of these.
1061///
1062/// Three different kinds of locks are supported.  There are read locks and write locks, which are
1063/// as one would expect.  The third kind of lock is a _transaction_ lock (which is also known as an
1064/// upgradeable read lock).  When first acquired, these block other writes (including other
1065/// transaction locks) but do not block reads.  When it is time to commit a transaction, these locks
1066/// are upgraded to full write locks (without ever dropping the lock) and then dropped after
1067/// committing (unless commit_and_continue is used).  This way, reads are only blocked for the
1068/// shortest possible time.  It follows that write locks should be used sparingly.  Locks are
1069/// granted in order with one exception: when a lock is in the initial _transaction_ lock state
1070/// (LockState::Locked), all read locks are allowed even if there are other tasks waiting for the
1071/// lock.  The reason for this is because we allow read locks to be taken by tasks that have taken a
1072/// _transaction_ lock (i.e. recursion is allowed).  In other cases, such as when a writer is
1073/// waiting and there are only readers, readers will queue up behind the writer.
1074///
1075/// To summarize:
1076///
1077/// +-------------------------+-----------------+----------------+------------------+
1078/// |                         | While read_lock | While txn_lock | While write_lock |
1079/// |                         | is held         | is held        | is held          |
1080/// +-------------------------+-----------------+----------------+------------------+
1081/// | Can acquire read_lock?  | true            | true           | false            |
1082/// +-------------------------+-----------------+----------------+------------------+
1083/// | Can acquire txn_lock?   | true            | false          | false            |
1084/// +-------------------------+-----------------+----------------+------------------+
1085/// | Can acquire write_lock? | false           | false          | false            |
1086/// +-------------------------+-----------------+----------------+------------------+
pub struct LockManager {
    // All lock state, keyed by `LockKey`.  The `UnsafeCell` members of `LockEntry`/`LockWaker`
    // may only be accessed whilst this mutex is held.
    locks: Mutex<Locks>,
}
1090
// The table of lock entries.  Entries are created on first acquisition and removed again once
// nothing holds or needs them (see `LockEntry::can_remove`).
struct Locks {
    keys: HashMap<LockKey, LockEntry>,
}
1094
impl Locks {
    // Releases a single lock previously held in `state`, waking any waiters that can now
    // proceed.  Panics (via `unreachable!`) if there is no entry for `key`, since that would
    // mean a release without a matching acquire.
    fn drop_lock(&mut self, key: LockKey, state: LockState) {
        if let Entry::Occupied(mut occupied) = self.keys.entry(key) {
            let entry = occupied.get_mut();
            // Work out whether dropping this lock might unblock waiters.
            let wake = match state {
                LockState::ReadLock => {
                    entry.read_count -= 1;
                    entry.read_count == 0
                }
                // drop_write_locks currently depends on us treating Locked and WriteLock the same.
                LockState::Locked | LockState::WriteLock => {
                    entry.state = LockState::ReadLock;
                    true
                }
            };
            if wake {
                // SAFETY: The lock in `LockManager::locks` is held.
                unsafe {
                    entry.wake();
                }
                // Reclaim the entry entirely if nothing holds the lock any more.
                if entry.can_remove() {
                    occupied.remove_entry();
                }
            }
        } else {
            unreachable!();
        }
    }

    // Releases a set of read locks.
    fn drop_read_locks(&mut self, lock_keys: LockKeys) {
        for lock in lock_keys.iter() {
            self.drop_lock(*lock, LockState::ReadLock);
        }
    }

    // Releases a set of locks held in either the Locked or WriteLock state.
    fn drop_write_locks(&mut self, lock_keys: LockKeys) {
        for lock in lock_keys.iter() {
            // This is a bit hacky, but this works for locks in either the Locked or WriteLock
            // states.
            self.drop_lock(*lock, LockState::WriteLock);
        }
    }

    // Downgrades locks from WriteLock to Locked.
    fn downgrade_locks(&mut self, lock_keys: &LockKeys) {
        for lock in lock_keys.iter() {
            // SAFETY: The lock in `LockManager::locks` is held.
            unsafe {
                self.keys.get_mut(lock).unwrap().downgrade_lock();
            }
        }
    }
}
1148
#[derive(Debug)]
struct LockEntry {
    // In the states that allow readers (ReadLock, Locked), this count can be non-zero
    // to indicate the number of active readers.
    read_count: u64,

    // The state of the lock (see below).
    state: LockState,

    // A doubly-linked list of wakers that should be woken when they have been granted the lock.
    // New wakers are usually chained on to tail, with the exception being the case where a lock in
    // state Locked is to be upgraded to WriteLock, but can't because there are readers.  It might
    // be possible to use intrusive-collections in the future.
    head: *const LockWaker,
    tail: *const LockWaker,
}

// SAFETY: The raw `LockWaker` pointers above are only dereferenced whilst `LockManager::locks`
// is held, so it is safe for entries to be sent between threads.
unsafe impl Send for LockEntry {}
1167
// Represents a node in the waker list.  It is only safe to access the members wrapped by UnsafeCell
// when LockManager's `locks` member is locked.  Nodes live on the waiting task's stack (pinned via
// `pin_mut!`), so a node must be unlinked before its future is dropped (see `LockWaker::wait`).
struct LockWaker {
    // The next and previous pointers in the doubly-linked list.
    next: UnsafeCell<*const LockWaker>,
    prev: UnsafeCell<*const LockWaker>,

    // Holds the lock key for this waker.  This is required so that we can find the associated
    // `LockEntry`.
    key: LockKey,

    // The underlying waker that should be used to wake the task.
    waker: UnsafeCell<WakerState>,

    // The target state for this waker.
    target_state: LockState,

    // True if this is an upgrade (Locked -> WriteLock); such wakers are queued at the head of
    // the list rather than the tail.
    is_upgrade: bool,

    // We need to be pinned because these form part of the linked list.
    _pin: PhantomPinned,
}
1191
// The lifecycle of a `LockWaker`'s task waker.  Transitions: Pending -> Registered (on first
// poll) -> Woken (when the lock is granted).
enum WakerState {
    // This is the initial state before the waker has been first polled.
    Pending,

    // Once polled, this contains the actual waker.
    Registered(Waker),

    // The waker has been woken and has been granted the lock.
    Woken,
}

impl WakerState {
    // Returns true once the lock has been granted to this waker.
    fn is_woken(&self) -> bool {
        matches!(self, WakerState::Woken)
    }
}
1208
// SAFETY: The `UnsafeCell` members of `LockWaker` are only accessed whilst `LockManager::locks`
// is held (see the comment on the struct), so sharing across threads is safe.
unsafe impl Send for LockWaker {}
unsafe impl Sync for LockWaker {}
1211
impl LockWaker {
    // Waits for the waker to be woken, i.e. for the lock to be granted.
    async fn wait(&self, manager: &LockManager) {
        // We must guard against the future being dropped.  If we were woken, we already hold the
        // lock (the granter transferred it to us) and must give it back; if not, we are still on
        // the waker list and must unlink ourself.
        let waker_guard = scopeguard::guard((), |_| {
            let mut locks = manager.locks.lock();
            // SAFETY: We've acquired the lock.
            unsafe {
                if (*self.waker.get()).is_woken() {
                    // We were woken, but didn't actually run, so we must drop the lock.
                    if self.is_upgrade {
                        locks.keys.get_mut(&self.key).unwrap().downgrade_lock();
                    } else {
                        locks.drop_lock(self.key, self.target_state);
                    }
                } else {
                    // We haven't been woken but we've been dropped so we must remove ourself from
                    // the waker list.
                    locks.keys.get_mut(&self.key).unwrap().remove_waker(self);
                }
            }
        });

        poll_fn(|cx| {
            let _locks = manager.locks.lock();
            // SAFETY: We've acquired the lock.
            unsafe {
                if (*self.waker.get()).is_woken() {
                    Poll::Ready(())
                } else {
                    // Not granted yet: (re-)register the current task's waker so that
                    // `LockEntry::pop_and_wake` can reach us, then sleep.
                    *self.waker.get() = WakerState::Registered(cx.waker().clone());
                    Poll::Pending
                }
            }
        })
        .await;

        // Normal completion: defuse the guard since the caller now owns the granted lock.
        ScopeGuard::into_inner(waker_guard);
    }
}
1252
// The state of a `LockEntry`.  See the compatibility table in `LockManager`'s documentation for
// how these states interact.
#[derive(Copy, Clone, Debug, PartialEq)]
enum LockState {
    // In this state, there are only readers.
    ReadLock,

    // This state is used for transactions to lock other writers (including other transactions), but
    // it still allows readers.
    Locked,

    // A writer has exclusive access; all other readers and writers are blocked.
    WriteLock,
}
1265
1266impl LockManager {
1267    pub fn new() -> Self {
1268        LockManager { locks: Mutex::new(Locks { keys: HashMap::default() }) }
1269    }
1270
1271    /// Acquires the locks.  It is the caller's responsibility to ensure that drop_transaction is
1272    /// called when a transaction is dropped i.e. the filesystem's drop_transaction method should
1273    /// call LockManager's drop_transaction method.
1274    pub async fn txn_lock<'a>(&'a self, lock_keys: LockKeys) -> TransactionLocks<'a> {
1275        TransactionLocks(
1276            debug_assert_not_too_long!(self.lock(lock_keys, LockState::Locked)).right().unwrap(),
1277        )
1278    }
1279
1280    // `state` indicates the kind of lock required.  ReadLock means acquire a read lock.  Locked
1281    // means lock other writers, but still allow readers.  WriteLock means acquire a write lock.
1282    async fn lock<'a>(
1283        &'a self,
1284        mut lock_keys: LockKeys,
1285        target_state: LockState,
1286    ) -> Either<ReadGuard<'a>, WriteGuard<'a>> {
1287        let mut guard = match &target_state {
1288            LockState::ReadLock => Left(ReadGuard {
1289                manager: self.into(),
1290                lock_keys: LockKeys::with_capacity(lock_keys.len()),
1291            }),
1292            LockState::Locked | LockState::WriteLock => Right(WriteGuard {
1293                manager: self.into(),
1294                lock_keys: LockKeys::with_capacity(lock_keys.len()),
1295            }),
1296        };
1297        let guard_keys = match &mut guard {
1298            Left(g) => &mut g.lock_keys,
1299            Right(g) => &mut g.lock_keys,
1300        };
1301        lock_keys.sort_unstable();
1302        lock_keys.dedup();
1303        for lock in lock_keys.iter() {
1304            let lock_waker = None;
1305            pin_mut!(lock_waker);
1306            {
1307                let mut locks = self.locks.lock();
1308                match locks.keys.entry(*lock) {
1309                    Entry::Vacant(vacant) => {
1310                        vacant.insert(LockEntry {
1311                            read_count: if let LockState::ReadLock = target_state {
1312                                guard_keys.push(*lock);
1313                                1
1314                            } else {
1315                                guard_keys.push(*lock);
1316                                0
1317                            },
1318                            state: target_state,
1319                            head: std::ptr::null(),
1320                            tail: std::ptr::null(),
1321                        });
1322                    }
1323                    Entry::Occupied(mut occupied) => {
1324                        let entry = occupied.get_mut();
1325                        // SAFETY: We've acquired the lock.
1326                        if unsafe { entry.is_allowed(target_state, entry.head.is_null()) } {
1327                            if let LockState::ReadLock = target_state {
1328                                entry.read_count += 1;
1329                                guard_keys.push(*lock);
1330                            } else {
1331                                entry.state = target_state;
1332                                guard_keys.push(*lock);
1333                            }
1334                        } else {
1335                            // Initialise a waker and push it on the tail of the list.
1336                            // SAFETY: `lock_waker` isn't used prior to this point.
1337                            unsafe {
1338                                *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1339                                    next: UnsafeCell::new(std::ptr::null()),
1340                                    prev: UnsafeCell::new(entry.tail),
1341                                    key: *lock,
1342                                    waker: UnsafeCell::new(WakerState::Pending),
1343                                    target_state: target_state,
1344                                    is_upgrade: false,
1345                                    _pin: PhantomPinned,
1346                                });
1347                            }
1348                            let waker = (*lock_waker).as_ref().unwrap();
1349                            if entry.tail.is_null() {
1350                                entry.head = waker;
1351                            } else {
1352                                // SAFETY: We've acquired the lock.
1353                                unsafe {
1354                                    *(*entry.tail).next.get() = waker;
1355                                }
1356                            }
1357                            entry.tail = waker;
1358                        }
1359                    }
1360                }
1361            }
1362            if let Some(waker) = &*lock_waker {
1363                waker.wait(self).await;
1364                guard_keys.push(*lock);
1365            }
1366        }
1367        guard
1368    }
1369
1370    /// This should be called by the filesystem's drop_transaction implementation.
1371    pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
1372        let mut locks = self.locks.lock();
1373        locks.drop_write_locks(std::mem::take(&mut transaction.txn_locks));
1374    }
1375
1376    /// Prepares to commit by waiting for readers to finish.
1377    pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
1378        self.commit_prepare_keys(&transaction.txn_locks).await;
1379    }
1380
1381    async fn commit_prepare_keys(&self, lock_keys: &LockKeys) {
1382        for lock in lock_keys.iter() {
1383            let lock_waker = None;
1384            pin_mut!(lock_waker);
1385            {
1386                let mut locks = self.locks.lock();
1387                let entry = locks.keys.get_mut(lock).unwrap();
1388                assert_eq!(entry.state, LockState::Locked);
1389
1390                if entry.read_count == 0 {
1391                    entry.state = LockState::WriteLock;
1392                } else {
1393                    // Initialise a waker and push it on the head of the list.
1394                    // SAFETY: `lock_waker` isn't used prior to this point.
1395                    unsafe {
1396                        *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1397                            next: UnsafeCell::new(entry.head),
1398                            prev: UnsafeCell::new(std::ptr::null()),
1399                            key: *lock,
1400                            waker: UnsafeCell::new(WakerState::Pending),
1401                            target_state: LockState::WriteLock,
1402                            is_upgrade: true,
1403                            _pin: PhantomPinned,
1404                        });
1405                    }
1406                    let waker = (*lock_waker).as_ref().unwrap();
1407                    if entry.head.is_null() {
1408                        entry.tail = (*lock_waker).as_ref().unwrap();
1409                    } else {
1410                        // SAFETY: We've acquired the lock.
1411                        unsafe {
1412                            *(*entry.head).prev.get() = waker;
1413                        }
1414                    }
1415                    entry.head = waker;
1416                }
1417            }
1418
1419            if let Some(waker) = &*lock_waker {
1420                waker.wait(self).await;
1421            }
1422        }
1423    }
1424
1425    /// Acquires a read lock for the given keys.  Read locks are only blocked whilst a transaction
1426    /// is being committed for the same locks.  They are only necessary where consistency is
1427    /// required between different mutations within a transaction.  For example, a write might
1428    /// change the size and extents for an object, in which case a read lock is required so that
1429    /// observed size and extents are seen together or not at all.
1430    pub async fn read_lock<'a>(&'a self, lock_keys: LockKeys) -> ReadGuard<'a> {
1431        debug_assert_not_too_long!(self.lock(lock_keys, LockState::ReadLock)).left().unwrap()
1432    }
1433
1434    /// Acquires a write lock for the given keys.  Write locks provide exclusive access to the
1435    /// requested lock keys.
1436    pub async fn write_lock<'a>(&'a self, lock_keys: LockKeys) -> WriteGuard<'a> {
1437        debug_assert_not_too_long!(self.lock(lock_keys, LockState::WriteLock)).right().unwrap()
1438    }
1439
1440    /// Downgrades locks from the WriteLock state to Locked state.  This will panic if the locks are
1441    /// not in the WriteLock state.
1442    pub fn downgrade_locks(&self, lock_keys: &LockKeys) {
1443        self.locks.lock().downgrade_locks(lock_keys);
1444    }
1445}
1446
// These unsafe functions require that `locks` in LockManager is locked.
impl LockEntry {
    // Wakes as many waiters from the head of the list as the current state permits.
    unsafe fn wake(&mut self) {
        // If the lock's state is WriteLock, or there's nothing waiting, return early.
        if self.head.is_null() || self.state == LockState::WriteLock {
            return;
        }

        let waker = unsafe { &*self.head };

        if waker.is_upgrade {
            // An upgrade (Locked -> WriteLock) can only proceed once all readers have gone.
            if self.read_count > 0 {
                return;
            }
        } else if !unsafe { self.is_allowed(waker.target_state, true) } {
            return;
        }

        unsafe { self.pop_and_wake() };

        // If the waker was a write lock, we can't wake any more up, but otherwise, we can keep
        // waking up readers.
        if waker.target_state == LockState::WriteLock {
            return;
        }

        while !self.head.is_null() && unsafe { (*self.head).target_state } == LockState::ReadLock {
            unsafe { self.pop_and_wake() };
        }
    }

    // Unlinks the head waker, transfers the lock to it by updating our state, and then wakes the
    // associated task.
    unsafe fn pop_and_wake(&mut self) {
        let waker = unsafe { &*self.head };

        // Pop the waker.
        self.head = unsafe { *waker.next.get() };
        if self.head.is_null() {
            self.tail = std::ptr::null()
        } else {
            unsafe { *(*self.head).prev.get() = std::ptr::null() };
        }

        // Adjust our state accordingly.
        if waker.target_state == LockState::ReadLock {
            self.read_count += 1;
        } else {
            self.state = waker.target_state;
        }

        // Now wake the task.
        if let WakerState::Registered(waker) =
            std::mem::replace(unsafe { &mut *waker.waker.get() }, WakerState::Woken)
        {
            waker.wake();
        }
    }

    // Returns true when the entry represents no held lock and can be removed from the table.
    fn can_remove(&self) -> bool {
        self.state == LockState::ReadLock && self.read_count == 0
    }

    // Unlinks `waker` from the doubly-linked list; used when a waiting future is dropped before
    // being granted the lock (see `LockWaker::wait`).
    unsafe fn remove_waker(&mut self, waker: &LockWaker) {
        unsafe {
            let is_first = (*waker.prev.get()).is_null();
            if is_first {
                self.head = *waker.next.get();
            } else {
                *(**waker.prev.get()).next.get() = *waker.next.get();
            }
            if (*waker.next.get()).is_null() {
                self.tail = *waker.prev.get();
            } else {
                *(**waker.next.get()).prev.get() = *waker.prev.get();
            }
            if is_first {
                // We must call wake in case we erased a pending write lock and readers can now
                // proceed.
                self.wake();
            }
        }
    }

    // Returns whether or not a lock with given `target_state` can proceed.  `is_head` should be
    // true if this is something at the head of the waker list (or the waker list is empty) and
    // false if there are other items on the waker list that are prior.
    unsafe fn is_allowed(&self, target_state: LockState, is_head: bool) -> bool {
        match self.state {
            LockState::ReadLock => {
                // Allow ReadLock and Locked so long as nothing else is waiting.
                (self.read_count == 0
                    || target_state == LockState::Locked
                    || target_state == LockState::ReadLock)
                    && is_head
            }
            LockState::Locked => {
                // Always allow reads unless there's an upgrade waiting.  We have to
                // always allow reads in this state because tasks that have locks in
                // the Locked state can later try and acquire ReadLock.
                target_state == LockState::ReadLock
                    && (is_head || unsafe { !(*self.head).is_upgrade })
            }
            LockState::WriteLock => false,
        }
    }

    // Drops a WriteLock back to Locked, waking any readers that may now proceed.  Panics if the
    // lock wasn't in the WriteLock state.
    unsafe fn downgrade_lock(&mut self) {
        assert_eq!(std::mem::replace(&mut self.state, LockState::Locked), LockState::WriteLock);
        unsafe { self.wake() };
    }
}
1557
/// Holds a set of read locks; the locks are released when this guard is dropped.
#[must_use]
pub struct ReadGuard<'a> {
    manager: LockManagerRef<'a>,
    lock_keys: LockKeys,
}
1563
1564impl ReadGuard<'_> {
1565    pub fn fs(&self) -> Option<&Arc<FxFilesystem>> {
1566        if let LockManagerRef::Owned(fs) = &self.manager { Some(fs) } else { None }
1567    }
1568
1569    pub fn into_owned(mut self, fs: Arc<FxFilesystem>) -> ReadGuard<'static> {
1570        ReadGuard {
1571            manager: LockManagerRef::Owned(fs),
1572            lock_keys: std::mem::replace(&mut self.lock_keys, LockKeys::None),
1573        }
1574    }
1575}
1576
1577impl Drop for ReadGuard<'_> {
1578    fn drop(&mut self) {
1579        let mut locks = self.manager.locks.lock();
1580        locks.drop_read_locks(std::mem::take(&mut self.lock_keys));
1581    }
1582}
1583
1584impl fmt::Debug for ReadGuard<'_> {
1585    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1586        f.debug_struct("ReadGuard")
1587            .field("manager", &(&self.manager as *const _))
1588            .field("lock_keys", &self.lock_keys)
1589            .finish()
1590    }
1591}
1592
/// Holds a set of locks in the Locked or WriteLock state; the locks are released when this guard
/// is dropped.
#[must_use]
pub struct WriteGuard<'a> {
    manager: LockManagerRef<'a>,
    lock_keys: LockKeys,
}
1598
1599impl Drop for WriteGuard<'_> {
1600    fn drop(&mut self) {
1601        let mut locks = self.manager.locks.lock();
1602        locks.drop_write_locks(std::mem::take(&mut self.lock_keys));
1603    }
1604}
1605
1606impl fmt::Debug for WriteGuard<'_> {
1607    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1608        f.debug_struct("WriteGuard")
1609            .field("manager", &(&self.manager as *const _))
1610            .field("lock_keys", &self.lock_keys)
1611            .finish()
1612    }
1613}
1614
// Either a plain reference to a `LockManager` or a filesystem that provides one.  The owned form
// lets a guard keep the filesystem alive (see `ReadGuard::into_owned`).
enum LockManagerRef<'a> {
    Borrowed(&'a LockManager),
    Owned(Arc<FxFilesystem>),
}
1619
1620impl Deref for LockManagerRef<'_> {
1621    type Target = LockManager;
1622
1623    fn deref(&self) -> &Self::Target {
1624        match self {
1625            LockManagerRef::Borrowed(m) => m,
1626            LockManagerRef::Owned(f) => f.lock_manager(),
1627        }
1628    }
1629}
1630
1631impl<'a> From<&'a LockManager> for LockManagerRef<'a> {
1632    fn from(value: &'a LockManager) -> Self {
1633        LockManagerRef::Borrowed(value)
1634    }
1635}
1636
#[cfg(test)]
mod tests {
    use super::{LockKey, LockKeys, LockManager, LockState, Mutation, Options};
    use crate::filesystem::FxFilesystem;
    use fuchsia_async as fasync;
    use fuchsia_sync::Mutex;
    use futures::channel::oneshot::channel;
    use futures::future::FutureExt;
    use futures::stream::FuturesUnordered;
    use futures::{StreamExt, join, pin_mut};
    use std::time::Duration;
    use storage_device::DeviceHolder;
    use storage_device::fake_device::FakeDevice;

    /// Checks that a mutation can be added to a new transaction and that the transaction then
    /// reports itself as non-empty.
    #[fuchsia::test]
    async fn test_simple() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let mut t = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        t.add(1, Mutation::BeginFlush);
        assert!(!t.is_empty());
    }

    /// Checks that transactions with the same lock key serialize against each other while
    /// transactions with different keys are able to proceed concurrently.
    #[fuchsia::test]
    async fn test_locks() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let (send3, recv3) = channel();
        let done = Mutex::new(false);
        let mut futures = FuturesUnordered::new();
        futures.push(
            async {
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send1.send(()).unwrap(); // Tell the next future to continue.
                send3.send(()).unwrap(); // Tell the last future to continue.
                recv2.await.unwrap();
                // This is a halting problem so all we can do is sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            }
            .boxed(),
        );
        futures.push(
            async {
                recv1.await.unwrap();
                // This should not block since it is a different key.
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(2, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                // Tell the first future to continue.
                send2.send(()).unwrap();
            }
            .boxed(),
        );
        futures.push(
            async {
                // This should block until the first future has completed.
                recv3.await.unwrap();
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await;
                *done.lock() = true;
            }
            .boxed(),
        );
        while let Some(()) = futures.next().await {}
    }

    /// Checks that read locks are only excluded once a transaction holding a write lock commits,
    /// and that a pending commit must wait for outstanding readers.
    #[fuchsia::test]
    async fn test_read_lock_after_write_lock() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let done = Mutex::new(false);
        join!(
            async {
                let t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send1.send(()).unwrap(); // Tell the next future to continue.
                recv2.await.unwrap();
                t.commit().await.expect("commit failed");
                *done.lock() = true;
            },
            async {
                recv1.await.unwrap();
                // Reads should not be blocked until the transaction is committed.
                let _guard = fs
                    .lock_manager()
                    .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
                    .await;
                // Tell the first future to continue.
                send2.send(()).unwrap();
                // It shouldn't proceed until we release our read lock, but it's a halting
                // problem, so sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            },
        );
    }

    /// Checks that a transaction commit is blocked while an existing read lock on the same key is
    /// outstanding.
    #[fuchsia::test]
    async fn test_write_lock_after_read_lock() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let done = Mutex::new(false);
        join!(
            async {
                // Reads should not be blocked until the transaction is committed.
                let _guard = fs
                    .lock_manager()
                    .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
                    .await;
                // Tell the next future to continue and then wait.
                send1.send(()).unwrap();
                recv2.await.unwrap();
                // It shouldn't proceed until we release our read lock, but it's a halting
                // problem, so sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            },
            async {
                recv1.await.unwrap();
                let t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send2.send(()).unwrap(); // Tell the first future to continue;
                t.commit().await.expect("commit failed");
                *done.lock() = true;
            },
        );
    }

    /// Checks that dropping an uncommitted transaction releases its locks, with and without a
    /// concurrent reader present.
    #[fuchsia::test]
    async fn test_drop_uncommitted_transaction() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let key = lock_keys![LockKey::object(1, 1)];

        // Dropping while there's a reader.
        {
            let _write_lock = fs
                .clone()
                .new_transaction(key.clone(), Options::default())
                .await
                .expect("new_transaction failed");
            let _read_lock = fs.lock_manager().read_lock(key.clone()).await;
        }
        // Dropping while there's no reader.
        {
            let _write_lock = fs
                .clone()
                .new_transaction(key.clone(), Options::default())
                .await
                .expect("new_transaction failed");
        }
        // Make sure we can take the lock again (i.e. it was actually released).
        fs.clone()
            .new_transaction(key.clone(), Options::default())
            .await
            .expect("new_transaction failed");
    }

    /// Checks that dropping a pending (never-granted) write-lock future doesn't poison the lock
    /// state: the lock can still be acquired afterwards.
    #[fuchsia::test]
    async fn test_drop_waiting_write_lock() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        {
            let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
            // The write lock should remain pending whilst the read lock is held.
            assert!(
                futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
                    .is_pending()
            );
        }
        let _ = manager.lock(keys, LockState::WriteLock).await;
    }

    /// Checks that a held write lock blocks both other writers and readers, and that both can
    /// proceed once it is released.
    #[fuchsia::test]
    async fn test_write_lock_blocks_everything() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        {
            let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
            // Another writer should be blocked.
            assert!(
                futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
                    .is_pending()
            );
            // A reader should be blocked too.
            assert!(
                futures::poll!(manager.lock(keys.clone(), LockState::ReadLock).boxed())
                    .is_pending()
            );
        }
        {
            let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
        }
        {
            let _guard = manager.lock(keys, LockState::ReadLock).await;
        }
    }

    /// Checks that downgrading a committed (write) lock back to an upgradeable lock wakes blocked
    /// readers.
    #[fuchsia::test]
    async fn test_downgrade_locks() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.txn_lock(keys.clone()).await;
        manager.commit_prepare_keys(&keys).await;

        // Use FuturesUnordered so that we can check that the waker is woken.
        let mut read_lock: FuturesUnordered<_> =
            std::iter::once(manager.read_lock(keys.clone())).collect();

        // Trying to acquire a read lock now should be blocked.
        assert!(futures::poll!(read_lock.next()).is_pending());

        manager.downgrade_locks(&keys);

        // After downgrading, it should be possible to take a read lock.
        assert!(futures::poll!(read_lock.next()).is_ready());
    }

    /// Checks that dropping a blocked write-lock future wakes readers queued behind it.
    #[fuchsia::test]
    async fn test_dropped_write_lock_wakes() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
        let mut read_lock = FuturesUnordered::new();
        read_lock.push(manager.lock(keys.clone(), LockState::ReadLock));

        {
            let write_lock = manager.lock(keys, LockState::WriteLock);
            pin_mut!(write_lock);

            // The write lock should be blocked because of the read lock.
            assert!(futures::poll!(write_lock).is_pending());

            // Another read lock should be blocked because of the write lock.
            assert!(futures::poll!(read_lock.next()).is_pending());
        }

        // Dropping the write lock should allow the read lock to proceed.
        assert!(futures::poll!(read_lock.next()).is_ready());
    }

    /// Checks that dropping an in-progress upgrade (commit_prepare) leaves the lock usable so a
    /// later upgrade can succeed.
    #[fuchsia::test]
    async fn test_drop_upgrade() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.lock(keys.clone(), LockState::Locked).await;

        {
            let commit_prepare = manager.commit_prepare_keys(&keys);
            pin_mut!(commit_prepare);
            let _read_guard = manager.lock(keys.clone(), LockState::ReadLock).await;
            assert!(futures::poll!(commit_prepare).is_pending());

            // Now we test dropping read_guard which should wake commit_prepare and
            // then dropping commit_prepare.
        }

        // We should be able to still commit_prepare.
        manager.commit_prepare_keys(&keys).await;
    }

    /// Checks that an upgrade woken by the last reader departing takes priority over new readers
    /// until the resulting write lock is released.
    #[fasync::run_singlethreaded(test)]
    async fn test_woken_upgrade_blocks_reads() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        // Start with a transaction lock.
        let guard = manager.lock(keys.clone(), LockState::Locked).await;

        // Take a read lock.
        let read1 = manager.lock(keys.clone(), LockState::ReadLock).await;

        // Try and upgrade the transaction lock, which should not be possible because of the read.
        let commit_prepare = manager.commit_prepare_keys(&keys);
        pin_mut!(commit_prepare);
        assert!(futures::poll!(commit_prepare.as_mut()).is_pending());

        // Taking another read should also be blocked.
        let read2 = manager.lock(keys.clone(), LockState::ReadLock);
        pin_mut!(read2);
        assert!(futures::poll!(read2.as_mut()).is_pending());

        // Drop the first read and the upgrade should complete.
        std::mem::drop(read1);
        assert!(futures::poll!(commit_prepare).is_ready());

        // But the second read should still be blocked.
        assert!(futures::poll!(read2.as_mut()).is_pending());

        // If we drop the write lock now, the read should be unblocked.
        std::mem::drop(guard);
        assert!(futures::poll!(read2).is_ready());
    }

    static LOCK_KEY_1: LockKey = LockKey::flush(1);
    static LOCK_KEY_2: LockKey = LockKey::flush(2);
    static LOCK_KEY_3: LockKey = LockKey::flush(3);

    // The keys, storage method, and capacity must all match.
    fn assert_lock_keys_equal(value: &LockKeys, expected: &LockKeys) {
        match (value, expected) {
            (LockKeys::None, LockKeys::None) => {}
            (LockKeys::Inline(key1), LockKeys::Inline(key2)) => {
                if key1 != key2 {
                    panic!("{key1:?} != {key2:?}");
                }
            }
            (LockKeys::Vec(vec1), LockKeys::Vec(vec2)) => {
                if vec1 != vec2 {
                    panic!("{vec1:?} != {vec2:?}");
                }
                if vec1.capacity() != vec2.capacity() {
                    panic!(
                        "LockKeys have different capacity: {} != {}",
                        vec1.capacity(),
                        vec2.capacity()
                    );
                }
            }
            (_, _) => panic!("{value:?} != {expected:?}"),
        }
    }

    // Only the keys must match. Storage method and capacity don't matter.
    fn assert_lock_keys_equivalent(value: &LockKeys, expected: &LockKeys) {
        let value: Vec<_> = value.iter().collect();
        let expected: Vec<_> = expected.iter().collect();
        assert_eq!(value, expected);
    }

    /// Checks the storage method chosen by `lock_keys!` for 0, 1, and multiple keys.
    #[test]
    fn test_lock_keys_macro() {
        assert_lock_keys_equal(&lock_keys![], &LockKeys::None);
        assert_lock_keys_equal(&lock_keys![LOCK_KEY_1], &LockKeys::Inline(LOCK_KEY_1));
        assert_lock_keys_equal(
            &lock_keys![LOCK_KEY_1, LOCK_KEY_2],
            &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]),
        );
    }

    /// Checks that `with_capacity` only allocates when more than one key is expected.
    #[test]
    fn test_lock_keys_with_capacity() {
        assert_lock_keys_equal(&LockKeys::with_capacity(0), &LockKeys::None);
        assert_lock_keys_equal(&LockKeys::with_capacity(1), &LockKeys::None);
        assert_lock_keys_equal(&LockKeys::with_capacity(2), &LockKeys::Vec(Vec::with_capacity(2)));
    }

    /// Checks `len` across all storage variants.
    #[test]
    fn test_lock_keys_len() {
        assert_eq!(lock_keys![].len(), 0);
        assert_eq!(lock_keys![LOCK_KEY_1].len(), 1);
        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].len(), 2);
    }

    /// Checks `contains` across all storage variants.
    #[test]
    fn test_lock_keys_contains() {
        assert_eq!(lock_keys![].contains(&LOCK_KEY_1), false);
        assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_1), true);
        assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_2), false);
        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_1), true);
        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_2), true);
        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_3), false);
    }

    /// Checks that `push` promotes the storage from None -> Inline -> Vec.
    #[test]
    fn test_lock_keys_push() {
        let mut keys = lock_keys![];
        keys.push(LOCK_KEY_1);
        assert_lock_keys_equal(&keys, &LockKeys::Inline(LOCK_KEY_1));
        keys.push(LOCK_KEY_2);
        assert_lock_keys_equal(&keys, &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]));
        keys.push(LOCK_KEY_3);
        assert_lock_keys_equivalent(
            &keys,
            &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2, LOCK_KEY_3]),
        );
    }

    /// Checks `sort_unstable` across all storage variants.
    #[test]
    fn test_lock_keys_sort_unstable() {
        let mut keys = lock_keys![];
        keys.sort_unstable();
        assert_lock_keys_equal(&keys, &lock_keys![]);

        let mut keys = lock_keys![LOCK_KEY_1];
        keys.sort_unstable();
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);

        let mut keys = lock_keys![LOCK_KEY_2, LOCK_KEY_1];
        keys.sort_unstable();
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
    }

    /// Checks `dedup` across all storage variants.
    #[test]
    fn test_lock_keys_dedup() {
        let mut keys = lock_keys![];
        keys.dedup();
        assert_lock_keys_equal(&keys, &lock_keys![]);

        let mut keys = lock_keys![LOCK_KEY_1];
        keys.dedup();
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);

        let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_1];
        keys.dedup();
        assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
    }

    /// Checks `truncate` across all storage variants, including no-op truncations.
    #[test]
    fn test_lock_keys_truncate() {
        let mut keys = lock_keys![];
        keys.truncate(5);
        assert_lock_keys_equal(&keys, &lock_keys![]);
        keys.truncate(0);
        assert_lock_keys_equal(&keys, &lock_keys![]);

        let mut keys = lock_keys![LOCK_KEY_1];
        keys.truncate(5);
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
        keys.truncate(0);
        assert_lock_keys_equal(&keys, &lock_keys![]);

        let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_2];
        keys.truncate(5);
        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
        keys.truncate(1);
        // Although there's only 1 key after truncate the key is not stored inline.
        assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
    }

    /// Checks `iter` across all storage variants.
    #[test]
    fn test_lock_keys_iter() {
        assert_eq!(lock_keys![].iter().collect::<Vec<_>>(), Vec::<&LockKey>::new());

        assert_eq!(lock_keys![LOCK_KEY_1].iter().collect::<Vec<_>>(), vec![&LOCK_KEY_1]);

        assert_eq!(
            lock_keys![LOCK_KEY_1, LOCK_KEY_2].iter().collect::<Vec<_>>(),
            vec![&LOCK_KEY_1, &LOCK_KEY_2]
        );
    }
}