fxfs/object_store/
transaction.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::checksum::Checksum;
6use crate::debug_assert_not_too_long;
7use crate::filesystem::{FxFilesystem, TxnGuard};
8use crate::log::*;
9use crate::lsm_tree::types::Item;
10use crate::object_handle::INVALID_OBJECT_ID;
11use crate::object_store::allocator::{AllocatorItem, Reservation};
12use crate::object_store::object_manager::{reserved_space_from_journal_usage, ObjectManager};
13use crate::object_store::object_record::{
14    ObjectItem, ObjectItemV40, ObjectItemV41, ObjectItemV43, ObjectKey, ObjectKeyData, ObjectValue,
15    ProjectProperty,
16};
17use crate::serialized_types::{migrate_nodefault, migrate_to_version, Migrate, Versioned};
18use anyhow::Error;
19use either::{Either, Left, Right};
20use fprint::TypeFingerprint;
21use fuchsia_sync::Mutex;
22use futures::future::poll_fn;
23use futures::pin_mut;
24use fxfs_crypto::{WrappedKey, WrappedKeyV32, WrappedKeyV40};
25use rustc_hash::FxHashMap as HashMap;
26use scopeguard::ScopeGuard;
27use serde::{Deserialize, Serialize};
28use std::cell::UnsafeCell;
29use std::cmp::Ordering;
30use std::collections::hash_map::Entry;
31use std::collections::BTreeSet;
32use std::marker::PhantomPinned;
33use std::ops::{Deref, DerefMut, Range};
34use std::sync::Arc;
35use std::task::{Poll, Waker};
36use std::{fmt, mem};
37
/// Per-transaction options.  This allows for special handling of certain transactions such as
/// deletes and the extension of Journal extents.  For most other use cases it is appropriate to
/// use `default()` here.
#[derive(Clone, Copy, Default)]
pub struct Options<'a> {
    /// If true, don't check for low journal space.  This should be true for any transactions that
    /// might alleviate journal space (i.e. compaction).
    pub skip_journal_checks: bool,

    /// If true, borrow metadata space from the metadata reservation.  This setting should be set to
    /// true for any transaction that will either not affect space usage after compaction
    /// (e.g. setting attributes), or reduce space usage (e.g. unlinking).  Otherwise, a transaction
    /// might fail with an out-of-space error.
    pub borrow_metadata_space: bool,

    /// If specified, a reservation to be used with the transaction.  If not set, any allocations
    /// that are part of this transaction will have to take their chances, and will fail if there is
    /// no free space.  The intention is that this should be used for things like the journal which
    /// require guaranteed space.
    pub allocator_reservation: Option<&'a Reservation>,

    /// An existing transaction guard to be used.
    pub txn_guard: Option<&'a TxnGuard<'a>>,
}
62
// This is the amount of space that we reserve for metadata when we are creating a new transaction.
// A transaction should not take more than this.  This is expressed in terms of space occupied in
// the journal; transactions must not take up more space in the journal than the number below.  The
// amount chosen here must be large enough for the maximum possible transaction that can be created,
// so transactions always need to be bounded which might involve splitting an operation up into
// smaller transactions.
pub const TRANSACTION_MAX_JOURNAL_USAGE: u64 = 24_576;

// The metadata reservation corresponding to the journal-usage bound above (see
// `reserved_space_from_journal_usage` for the conversion).
pub const TRANSACTION_METADATA_MAX_AMOUNT: u64 =
    reserved_space_from_journal_usage(TRANSACTION_MAX_JOURNAL_USAGE);
72
/// Wrapper around the lock manager's `WriteGuard`, holding locks taken on behalf of a
/// transaction.  Marked `#[must_use]` because discarding the value releases the guard.
#[must_use]
pub struct TransactionLocks<'a>(pub WriteGuard<'a>);
75
/// The journal consists of these records which will be replayed at mount time.  Within a
/// transaction, these are stored as a set which allows some mutations to be deduplicated and found
/// (and we require custom comparison functions below).  For example, we need to be able to find
/// object size changes.
pub type Mutation = MutationV43;

#[derive(
    Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint, Versioned,
)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum MutationV43 {
    // A mutation (insert/replace/merge of an item) applied to an object store's tree.
    ObjectStore(ObjectStoreMutationV43),
    // An object-store mutation kept as opaque bytes (used when the store is encrypted).
    EncryptedObjectStore(Box<[u8]>),
    // A mutation applied to the allocator (allocate/deallocate/limits/mark-for-deletion).
    Allocator(AllocatorMutationV32),
    // Indicates the beginning of a flush.  This would typically involve sealing a tree.
    BeginFlush,
    // Indicates the end of a flush.  This would typically involve replacing the immutable layers
    // with compacted ones.
    EndFlush,
    // Volume has been deleted.  Requires we remove it from the set of managed ObjectStore.
    DeleteVolume,
    // Updates the amount of borrowed metadata space (see ObjectManager's metadata reservation).
    UpdateBorrowed(u64),
    // Replaces the wrapped key used for subsequent mutations (see UpdateMutationsKey below).
    UpdateMutationsKey(UpdateMutationsKeyV40),
    // Creates an internal directory with the given object ID.
    CreateInternalDir(u64),
}
101
// Version 41 of `Mutation`.  Superseded by `MutationV43`; retained (with the `Migrate` derive)
// so that older serialized journals can still be replayed.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV43)]
pub enum MutationV41 {
    ObjectStore(ObjectStoreMutationV41),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}
115
// Version 40 of `Mutation`.  Superseded by `MutationV41`; retained (with the `Migrate` derive)
// so that older serialized journals can still be replayed.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV41)]
pub enum MutationV40 {
    ObjectStore(ObjectStoreMutationV40),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    // Indicates the beginning of a flush.  This would typically involve sealing a tree.
    BeginFlush,
    // Indicates the end of a flush.  This would typically involve replacing the immutable layers
    // with compacted ones.
    EndFlush,
    // Volume has been deleted.  Requires we remove it from the set of managed ObjectStore.
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}
133
134impl Mutation {
135    pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self {
136        Mutation::ObjectStore(ObjectStoreMutation {
137            item: Item::new(key, value),
138            op: Operation::Insert,
139        })
140    }
141
142    pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self {
143        Mutation::ObjectStore(ObjectStoreMutation {
144            item: Item::new(key, value),
145            op: Operation::ReplaceOrInsert,
146        })
147    }
148
149    pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self {
150        Mutation::ObjectStore(ObjectStoreMutation {
151            item: Item::new(key, value),
152            op: Operation::Merge,
153        })
154    }
155
156    pub fn update_mutations_key(key: WrappedKey) -> Self {
157        Mutation::UpdateMutationsKey(key.into())
158    }
159}
160
// We have custom comparison functions for mutations that just use the key, rather than the key and
// value that would be used by default so that we can deduplicate and find mutations (see
// get_object_mutation below).
pub type ObjectStoreMutation = ObjectStoreMutationV43;

/// A single LSM-tree operation (see `Operation`) applied to an item in an object store.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_nodefault]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct ObjectStoreMutationV43 {
    // The item being inserted/replaced/merged.
    pub item: ObjectItemV43,
    // How the item is applied to the tree.
    pub op: OperationV32,
}
173
// Version 41 of `ObjectStoreMutation`; migrates to `ObjectStoreMutationV43`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(ObjectStoreMutationV43)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV41 {
    pub item: ObjectItemV41,
    pub op: OperationV32,
}
181
// Version 40 of `ObjectStoreMutation`; migrates to `ObjectStoreMutationV41`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_nodefault]
#[migrate_to_version(ObjectStoreMutationV41)]
pub struct ObjectStoreMutationV40 {
    pub item: ObjectItemV40,
    pub op: OperationV32,
}
189
/// The different LSM tree operations that can be performed as part of a mutation.
pub type Operation = OperationV32;

#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum OperationV32 {
    // Insert a new item; the caller expects no existing item with the same key (see verify_locks).
    Insert,
    // Insert the item, replacing any existing item with the same key.
    ReplaceOrInsert,
    // Combine the item with any existing item via the tree's merge logic.
    Merge,
}
200
// Key-only comparison: the item's value and the op are deliberately ignored so that mutations for
// the same key deduplicate/replace each other within a transaction's mutation set, and so that a
// mutation can be looked up by key alone (see get_object_mutation).
impl Ord for ObjectStoreMutation {
    fn cmp(&self, other: &Self) -> Ordering {
        self.item.key.cmp(&other.item.key)
    }
}

impl PartialOrd for ObjectStoreMutation {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for ObjectStoreMutation {
    fn eq(&self, other: &Self) -> bool {
        self.item.key.eq(&other.item.key)
    }
}

impl Eq for ObjectStoreMutation {}
220
// Key-only ordering for allocator items, mirroring the ObjectStoreMutation comparisons above.
impl Ord for AllocatorItem {
    fn cmp(&self, other: &Self) -> Ordering {
        self.key.cmp(&other.key)
    }
}

impl PartialOrd for AllocatorItem {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
232
/// Same as std::ops::Range but with Ord and PartialOrd support, sorted first by start of the range,
/// then by the end.
pub type DeviceRange = DeviceRangeV32;

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct DeviceRangeV32(pub Range<u64>);
240
// Deref to the inner Range so a DeviceRange can be used anywhere a `Range<u64>` is read (e.g.
// `.start` / `.end` below).
impl Deref for DeviceRange {
    type Target = Range<u64>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for DeviceRange {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
254
255impl From<Range<u64>> for DeviceRange {
256    fn from(range: Range<u64>) -> Self {
257        Self(range)
258    }
259}
260
261impl Into<Range<u64>> for DeviceRange {
262    fn into(self) -> Range<u64> {
263        self.0
264    }
265}
266
// Order by start of the range, then by end (see the DeviceRange type-alias docs).
impl Ord for DeviceRange {
    fn cmp(&self, other: &Self) -> Ordering {
        self.start.cmp(&other.start).then(self.end.cmp(&other.end))
    }
}

impl PartialOrd for DeviceRange {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
278
/// Mutations applied to the allocator's tree.
pub type AllocatorMutation = AllocatorMutationV32;

#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorMutationV32 {
    // Allocates `device_range` on behalf of `owner_object_id`.
    Allocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    // Frees `device_range` previously allocated to `owner_object_id`.
    Deallocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    // Sets a byte limit for `owner_object_id`.
    SetLimit {
        owner_object_id: u64,
        bytes: u64,
    },
    /// Marks all extents with a given owner_object_id for deletion.
    /// Used to free space allocated to encrypted ObjectStore where we may not have the key.
    /// Note that the actual deletion time is undefined so this should never be used where an
    /// ObjectStore is still in use due to a high risk of corruption. Similarly, owner_object_id
    /// should never be reused for the same reasons.
    MarkForDeletion(u64),
}
303
/// Newtype carried by `Mutation::UpdateMutationsKey`, wrapping the new wrapped key.
pub type UpdateMutationsKey = UpdateMutationsKeyV40;

#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
pub struct UpdateMutationsKeyV40(pub WrappedKeyV40);

// Older on-disk version; migrated to V40 below.
#[derive(Serialize, Deserialize, TypeFingerprint)]
pub struct UpdateMutationsKeyV32(pub WrappedKeyV32);

impl From<UpdateMutationsKeyV32> for UpdateMutationsKeyV40 {
    fn from(value: UpdateMutationsKeyV32) -> Self {
        Self(value.0.into())
    }
}

// Unwrap/wrap conversions between the newtype and the key it carries.
impl From<UpdateMutationsKey> for WrappedKey {
    fn from(outer: UpdateMutationsKey) -> Self {
        outer.0
    }
}

impl From<WrappedKey> for UpdateMutationsKey {
    fn from(inner: WrappedKey) -> Self {
        Self(inner)
    }
}
329
// Fuzzing support: generate an UpdateMutationsKey with an arbitrary wrapping-key id but a
// fixed (default) key payload.
#[cfg(fuzz)]
impl<'a> arbitrary::Arbitrary<'a> for UpdateMutationsKey {
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        <u128>::arbitrary(u).map(|wrapping_key_id| {
            UpdateMutationsKey::from(WrappedKey {
                wrapping_key_id,
                // There doesn't seem to be much point to randomly generate crypto keys.
                key: fxfs_crypto::WrappedKeyBytes::default(),
            })
        })
    }
}
342
// Identity (pointer) comparison: two distinct UpdateMutationsKey instances never compare equal,
// so these mutations are not deduplicated when stored in a transaction's mutation set.  The
// resulting order is based on addresses and is therefore not stable across runs; it only needs to
// be internally consistent for use in the set.
impl Ord for UpdateMutationsKey {
    fn cmp(&self, other: &Self) -> Ordering {
        (self as *const UpdateMutationsKey).cmp(&(other as *const _))
    }
}

impl PartialOrd for UpdateMutationsKey {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for UpdateMutationsKey {}

impl PartialEq for UpdateMutationsKey {
    fn eq(&self, other: &Self) -> bool {
        std::ptr::eq(self, other)
    }
}
362
/// When creating a transaction, locks typically need to be held to prevent two or more writers
/// trying to make conflicting mutations at the same time.  LockKeys are used for this.
/// NOTE: Ordering is important here!  The lock manager sorts the list of locks in a transaction
/// to acquire them in a consistent order, but there are special cases for the Filesystem lock and
/// the Flush lock.
/// The Filesystem lock is taken by every transaction and is done so first, as part of the TxnGuard.
/// The Flush lock is taken when we flush an LSM tree (e.g. an object store), and is held for
/// several transactions.  As such, it must come first in the lock acquisition ordering, so that
/// other transactions using the Flush lock have the same ordering as in flushing.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Copy)]
pub enum LockKey {
    /// Locks the entire filesystem.
    Filesystem,

    /// Used to lock flushing an object.
    Flush {
        object_id: u64,
    },

    /// Used to lock changes to a particular object attribute (e.g. writes).
    ObjectAttribute {
        store_object_id: u64,
        object_id: u64,
        attribute_id: u64,
    },

    /// Used to lock changes to a particular object (e.g. adding a child to a directory).
    Object {
        store_object_id: u64,
        object_id: u64,
    },

    /// Used to lock changes associated with a particular project (e.g. project limit updates;
    /// see verify_locks).
    ProjectId {
        store_object_id: u64,
        project_id: u64,
    },

    /// Used to lock any truncate operations for a file.
    Truncate {
        store_object_id: u64,
        object_id: u64,
    },
}
406
impl LockKey {
    /// Returns a key locking `attribute_id` of `object_id` in the given store.
    pub const fn object_attribute(store_object_id: u64, object_id: u64, attribute_id: u64) -> Self {
        LockKey::ObjectAttribute { store_object_id, object_id, attribute_id }
    }

    /// Returns a key locking `object_id` in the given store.
    pub const fn object(store_object_id: u64, object_id: u64) -> Self {
        LockKey::Object { store_object_id, object_id }
    }

    /// Returns a key locking flushing of `object_id`.
    pub const fn flush(object_id: u64) -> Self {
        LockKey::Flush { object_id }
    }

    /// Returns a key locking truncates of `object_id` in the given store.
    pub const fn truncate(store_object_id: u64, object_id: u64) -> Self {
        LockKey::Truncate { store_object_id, object_id }
    }
}
424
/// A container for holding `LockKey` objects. Can store a single `LockKey` inline (avoiding a
/// heap allocation for the common one-lock case).
#[derive(Clone, Debug)]
pub enum LockKeys {
    None,
    Inline(LockKey),
    Vec(Vec<LockKey>),
}
432
433impl LockKeys {
434    pub fn with_capacity(capacity: usize) -> Self {
435        if capacity > 1 {
436            LockKeys::Vec(Vec::with_capacity(capacity))
437        } else {
438            LockKeys::None
439        }
440    }
441
442    pub fn push(&mut self, key: LockKey) {
443        match self {
444            Self::None => *self = LockKeys::Inline(key),
445            Self::Inline(inline) => {
446                *self = LockKeys::Vec(vec![*inline, key]);
447            }
448            Self::Vec(vec) => vec.push(key),
449        }
450    }
451
452    pub fn truncate(&mut self, len: usize) {
453        match self {
454            Self::None => {}
455            Self::Inline(_) => {
456                if len == 0 {
457                    *self = Self::None;
458                }
459            }
460            Self::Vec(vec) => vec.truncate(len),
461        }
462    }
463
464    fn len(&self) -> usize {
465        match self {
466            Self::None => 0,
467            Self::Inline(_) => 1,
468            Self::Vec(vec) => vec.len(),
469        }
470    }
471
472    fn contains(&self, key: &LockKey) -> bool {
473        match self {
474            Self::None => false,
475            Self::Inline(single) => single == key,
476            Self::Vec(vec) => vec.contains(key),
477        }
478    }
479
480    fn sort_unstable(&mut self) {
481        match self {
482            Self::Vec(vec) => vec.sort_unstable(),
483            _ => {}
484        }
485    }
486
487    fn dedup(&mut self) {
488        match self {
489            Self::Vec(vec) => vec.dedup(),
490            _ => {}
491        }
492    }
493
494    fn iter(&self) -> LockKeysIter<'_> {
495        match self {
496            LockKeys::None => LockKeysIter::None,
497            LockKeys::Inline(key) => LockKeysIter::Inline(key),
498            LockKeys::Vec(keys) => LockKeysIter::Vec(keys.into_iter()),
499        }
500    }
501}
502
// Borrowing iterator over LockKeys, matching its three representations.
enum LockKeysIter<'a> {
    None,
    // Holds the single inline key until it has been yielded.
    Inline(&'a LockKey),
    Vec(<&'a Vec<LockKey> as IntoIterator>::IntoIter),
}

impl<'a> Iterator for LockKeysIter<'a> {
    type Item = &'a LockKey;
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::None => None,
            Self::Inline(inline) => {
                // Yield the single key, then transition to the exhausted state.
                let next = *inline;
                *self = Self::None;
                Some(next)
            }
            Self::Vec(vec) => vec.next(),
        }
    }
}
523
// The default container holds no keys.
impl Default for LockKeys {
    fn default() -> Self {
        LockKeys::None
    }
}
529
/// Builds a `LockKeys` from a list of `LockKey` expressions, picking the most compact
/// representation for the number of keys given (empty, inline, or heap-backed).
#[macro_export]
macro_rules! lock_keys {
    () => {
        $crate::object_store::transaction::LockKeys::None
    };
    ($lock_key:expr $(,)?) => {
        $crate::object_store::transaction::LockKeys::Inline($lock_key)
    };
    ($($lock_keys:expr),+ $(,)?) => {
        $crate::object_store::transaction::LockKeys::Vec(vec![$($lock_keys),+])
    };
}
pub use lock_keys;
543
/// Mutations in a transaction can be associated with an object so that when mutations are applied,
/// updates can be applied to in-memory structures. For example, we cache object sizes, so when a
/// size change is applied, we can update the cached object size.
pub trait AssociatedObject: Send + Sync {
    /// Called before `_mutation` (targeting `_object_id`) is applied.  The default does nothing.
    fn will_apply_mutation(&self, _mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
    }
}
551
/// An optional associated object: absent, borrowed for the transaction's lifetime, or owned.
pub enum AssocObj<'a> {
    None,
    Borrowed(&'a (dyn AssociatedObject)),
    Owned(Box<dyn AssociatedObject>),
}
557
558impl AssocObj<'_> {
559    pub fn map<R, F: FnOnce(&dyn AssociatedObject) -> R>(&self, f: F) -> Option<R> {
560        match self {
561            AssocObj::None => None,
562            AssocObj::Borrowed(ref b) => Some(f(*b)),
563            AssocObj::Owned(ref o) => Some(f(o.as_ref())),
564        }
565    }
566}
567
/// A mutation as staged within a transaction, pairing the mutation record with the ID of the
/// store/allocator it targets and an optional associated object.
pub struct TxnMutation<'a> {
    // This, at time of writing, is either the object ID of an object store, or the object ID of the
    // allocator.  In the case of an object mutation, there's another object ID in the mutation
    // record that would be for the object actually being changed.
    pub object_id: u64,

    // The actual mutation.  This gets serialized to the journal.
    pub mutation: Mutation,

    // An optional associated object for the mutation.  During replay, there will always be no
    // associated object.
    pub associated_object: AssocObj<'a>,
}
581
// We store TxnMutation in a set, and for that, we only use object_id and mutation and not the
// associated object or checksum.
impl Ord for TxnMutation<'_> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation))
    }
}

impl PartialOrd for TxnMutation<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for TxnMutation<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation)
    }
}

impl Eq for TxnMutation<'_> {}

// Manual Debug: the associated object is deliberately omitted from the output.
impl std::fmt::Debug for TxnMutation<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TxnMutation")
            .field("object_id", &self.object_id)
            .field("mutation", &self.mutation)
            .finish()
    }
}
612
/// How the metadata space for a transaction is accounted for.
pub enum MetadataReservation {
    // The state after a transaction has been dropped.
    None,

    // Metadata space for this transaction is being borrowed from ObjectManager's metadata
    // reservation.
    Borrowed,

    // A metadata reservation was made when the transaction was created.
    Reservation(Reservation),

    // The metadata space is being _held_ within `allocator_reservation`.
    Hold(u64),
}
627
/// A transaction groups mutation records to be committed as a group.
pub struct Transaction<'a> {
    // Guard obtained from the filesystem for the lifetime of this transaction.
    txn_guard: TxnGuard<'a>,

    // The mutations that make up this transaction.
    mutations: BTreeSet<TxnMutation<'a>>,

    // The locks that this transaction currently holds.
    txn_locks: LockKeys,

    /// If set, an allocator reservation that should be used for allocations.
    pub allocator_reservation: Option<&'a Reservation>,

    /// The reservation for the metadata for this transaction.
    pub metadata_reservation: MetadataReservation,

    // Keep track of objects explicitly created by this transaction. No locks are required for them.
    // Addressed by (owner_object_id, object_id).
    new_objects: BTreeSet<(u64, u64)>,

    /// Any data checksums which should be evaluated when replaying this transaction.
    checksums: Vec<(Range<u64>, Vec<Checksum>, bool)>,
}
651
652impl<'a> Transaction<'a> {
    /// Creates a new transaction.  `txn_locks` are read locks that can be upgraded to write locks
    /// at commit time.
    pub async fn new(
        txn_guard: TxnGuard<'a>,
        options: Options<'a>,
        txn_locks: LockKeys,
    ) -> Result<Transaction<'a>, Error> {
        txn_guard.fs().add_transaction(options.skip_journal_checks).await;
        let fs = txn_guard.fs().clone();
        // If reservation or locking below fails, make sure the transaction count added above is
        // decremented again.
        let guard = scopeguard::guard((), |_| fs.sub_transaction());
        let (metadata_reservation, allocator_reservation, hold) =
            txn_guard.fs().reservation_for_transaction(options).await?;

        // Acquire the locks, then adopt the lock keys back out of the write guard.
        let txn_locks = {
            let lock_manager = txn_guard.fs().lock_manager();
            let mut write_guard = lock_manager.txn_lock(txn_locks).await;
            std::mem::take(&mut write_guard.0.lock_keys)
        };
        let mut transaction = Transaction {
            txn_guard,
            mutations: BTreeSet::new(),
            txn_locks,
            allocator_reservation: None,
            metadata_reservation,
            new_objects: BTreeSet::new(),
            checksums: Vec::new(),
        };

        // Success: defuse the cleanup guard without running it.
        ScopeGuard::into_inner(guard);
        hold.map(|h| h.forget()); // Transaction takes ownership from here on.
        transaction.allocator_reservation = allocator_reservation;
        Ok(transaction)
    }
686
    /// Returns this transaction's `TxnGuard`.
    pub fn txn_guard(&self) -> &TxnGuard<'_> {
        &self.txn_guard
    }

    /// Returns the mutations currently staged in this transaction.
    pub fn mutations(&self) -> &BTreeSet<TxnMutation<'a>> {
        &self.mutations
    }

    /// Takes all staged mutations out of the transaction, leaving it empty.  The record of objects
    /// created by this transaction is cleared at the same time.
    pub fn take_mutations(&mut self) -> BTreeSet<TxnMutation<'a>> {
        self.new_objects.clear();
        mem::take(&mut self.mutations)
    }
699
    /// Adds a mutation to this transaction.  If the mutation already exists, it is replaced and the
    /// old mutation is returned.
    pub fn add(&mut self, object_id: u64, mutation: Mutation) -> Option<Mutation> {
        self.add_with_object(object_id, mutation, AssocObj::None)
    }
705
    /// Removes a mutation that matches `mutation`.
    pub fn remove(&mut self, object_id: u64, mutation: Mutation) {
        let txn_mutation = TxnMutation { object_id, mutation, associated_object: AssocObj::None };
        if self.mutations.remove(&txn_mutation) {
            // If the removed mutation was the insertion of a new object, the object is no longer
            // created by this transaction, so also drop it from `new_objects` (which verify_locks
            // consults in lieu of a lock).
            if let Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    ObjectItem {
                        key: ObjectKey { object_id: new_object_id, data: ObjectKeyData::Object },
                        ..
                    },
                op: Operation::Insert,
            }) = txn_mutation.mutation
            {
                self.new_objects.remove(&(object_id, new_object_id));
            }
        }
    }
723
    /// Adds a mutation with an associated object. If the mutation already exists, it is replaced
    /// and the old mutation is returned.
    pub fn add_with_object(
        &mut self,
        object_id: u64,
        mutation: Mutation,
        associated_object: AssocObj<'a>,
    ) -> Option<Mutation> {
        assert!(object_id != INVALID_OBJECT_ID);
        let txn_mutation = TxnMutation { object_id, mutation, associated_object };
        // Debug-time check that the locks this mutation requires are actually held.
        self.verify_locks(&txn_mutation);
        self.mutations.replace(txn_mutation).map(|m| m.mutation)
    }
737
    /// Records checksums for `range`, to be evaluated if this transaction is replayed.
    pub fn add_checksum(&mut self, range: Range<u64>, checksums: Vec<Checksum>, first_write: bool) {
        self.checksums.push((range, checksums, first_write));
    }

    /// Returns the checksums recorded so far via `add_checksum`.
    pub fn checksums(&self) -> &[(Range<u64>, Vec<Checksum>, bool)] {
        &self.checksums
    }
745
746    pub fn take_checksums(&mut self) -> Vec<(Range<u64>, Vec<Checksum>, bool)> {
747        std::mem::replace(&mut self.checksums, Vec::new())
748    }
749
750    fn verify_locks(&mut self, mutation: &TxnMutation<'_>) {
751        // It was considered to change the locks from Vec to BTreeSet since we'll now be searching
752        // through it, but given the small set that these locks usually comprise, it probably isn't
753        // worth it.
754        if let TxnMutation {
755            mutation:
756                Mutation::ObjectStore { 0: ObjectStoreMutation { item: ObjectItem { key, .. }, op } },
757            object_id: store_object_id,
758            ..
759        } = mutation
760        {
761            match &key.data {
762                ObjectKeyData::Attribute(..) => {
763                    // TODO(https://fxbug.dev/42073914): Check lock requirements.
764                }
765                ObjectKeyData::Child { .. }
766                | ObjectKeyData::EncryptedChild { .. }
767                | ObjectKeyData::CasefoldChild { .. } => {
768                    let id = key.object_id;
769                    if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
770                        && !self.new_objects.contains(&(*store_object_id, id))
771                    {
772                        debug_assert!(
773                            false,
774                            "Not holding required lock for object {id} \
775                                in store {store_object_id}"
776                        );
777                        error!(
778                            "Not holding required lock for object {id} in store \
779                                {store_object_id}"
780                        )
781                    }
782                }
783                ObjectKeyData::GraveyardEntry { .. } => {
784                    // TODO(https://fxbug.dev/42073911): Check lock requirements.
785                }
786                ObjectKeyData::GraveyardAttributeEntry { .. } => {
787                    // TODO(https://fxbug.dev/122974): Check lock requirements.
788                }
789                ObjectKeyData::Keys => {
790                    let id = key.object_id;
791                    if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
792                        && !self.new_objects.contains(&(*store_object_id, id))
793                    {
794                        debug_assert!(
795                            false,
796                            "Not holding required lock for object {id} \
797                                in store {store_object_id}"
798                        );
799                        error!(
800                            "Not holding required lock for object {id} in store \
801                                {store_object_id}"
802                        )
803                    }
804                }
805                ObjectKeyData::Object => match op {
806                    // Insert implies the caller expects no object with which to race
807                    Operation::Insert => {
808                        self.new_objects.insert((*store_object_id, key.object_id));
809                    }
810                    Operation::Merge | Operation::ReplaceOrInsert => {
811                        let id = key.object_id;
812                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
813                            && !self.new_objects.contains(&(*store_object_id, id))
814                        {
815                            debug_assert!(
816                                false,
817                                "Not holding required lock for object {id} \
818                                    in store {store_object_id}"
819                            );
820                            error!(
821                                "Not holding required lock for object {id} in store \
822                                    {store_object_id}"
823                            )
824                        }
825                    }
826                },
827                ObjectKeyData::Project { project_id, property: ProjectProperty::Limit } => {
828                    if !self.txn_locks.contains(&LockKey::ProjectId {
829                        store_object_id: *store_object_id,
830                        project_id: *project_id,
831                    }) {
832                        debug_assert!(
833                            false,
834                            "Not holding required lock for project limit id {project_id} \
835                                in store {store_object_id}"
836                        );
837                        error!(
838                            "Not holding required lock for project limit id {project_id} in \
839                                store {store_object_id}"
840                        )
841                    }
842                }
843                ObjectKeyData::Project { property: ProjectProperty::Usage, .. } => match op {
844                    Operation::Insert | Operation::ReplaceOrInsert => {
845                        panic!(
846                            "Project usage is all handled by merging deltas, no inserts or \
847                                replacements should be used"
848                        );
849                    }
850                    // Merges are all handled like atomic +/- and serialized by the tree locks.
851                    Operation::Merge => {}
852                },
853                ObjectKeyData::ExtendedAttribute { .. } => {
854                    let id = key.object_id;
855                    if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
856                        && !self.new_objects.contains(&(*store_object_id, id))
857                    {
858                        debug_assert!(
859                            false,
860                            "Not holding required lock for object {id} \
861                                in store {store_object_id} while mutating extended attribute"
862                        );
863                        error!(
864                            "Not holding required lock for object {id} in store \
865                                {store_object_id} while mutating extended attribute"
866                        )
867                    }
868                }
869            }
870        }
871    }
872
873    /// Returns true if this transaction has no mutations.
874    pub fn is_empty(&self) -> bool {
875        self.mutations.is_empty()
876    }
877
878    /// Searches for an existing object mutation within the transaction that has the given key and
879    /// returns it if found.
880    pub fn get_object_mutation(
881        &self,
882        store_object_id: u64,
883        key: ObjectKey,
884    ) -> Option<&ObjectStoreMutation> {
885        if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) =
886            self.mutations.get(&TxnMutation {
887                object_id: store_object_id,
888                mutation: Mutation::insert_object(key, ObjectValue::None),
889                associated_object: AssocObj::None,
890            })
891        {
892            Some(mutation)
893        } else {
894            None
895        }
896    }
897
898    /// Commits a transaction.  If successful, returns the journal offset of the transaction.
899    pub async fn commit(mut self) -> Result<u64, Error> {
900        debug!(txn:? = &self; "Commit");
901        self.txn_guard.fs().clone().commit_transaction(&mut self, &mut |_| {}).await
902    }
903
904    /// Commits and then runs the callback whilst locks are held.  The callback accepts a single
905    /// parameter which is the journal offset of the transaction.
906    pub async fn commit_with_callback<R: Send>(
907        mut self,
908        f: impl FnOnce(u64) -> R + Send,
909    ) -> Result<R, Error> {
910        debug!(txn:? = &self; "Commit");
911        // It's not possible to pass an FnOnce via a trait without boxing it, but we don't want to
912        // do that (for performance reasons), hence the reason for the following.
913        let mut f = Some(f);
914        let mut result = None;
915        self.txn_guard
916            .fs()
917            .clone()
918            .commit_transaction(&mut self, &mut |offset| {
919                result = Some(f.take().unwrap()(offset));
920            })
921            .await?;
922        Ok(result.unwrap())
923    }
924
925    /// Commits the transaction, but allows the transaction to be used again.  The locks are not
926    /// dropped (but transaction locks will get downgraded to read locks).
927    pub async fn commit_and_continue(&mut self) -> Result<(), Error> {
928        debug!(txn:? = self; "Commit");
929        self.txn_guard.fs().clone().commit_transaction(self, &mut |_| {}).await?;
930        assert!(self.mutations.is_empty());
931        self.txn_guard.fs().lock_manager().downgrade_locks(&self.txn_locks);
932        Ok(())
933    }
934}
935
936impl Drop for Transaction<'_> {
937    fn drop(&mut self) {
938        // Call the filesystem implementation of drop_transaction which should, as a minimum, call
939        // LockManager's drop_transaction to ensure the locks are released.
940        debug!(txn:? = &self; "Drop");
941        self.txn_guard.fs().clone().drop_transaction(self);
942    }
943}
944
945impl std::fmt::Debug for Transaction<'_> {
946    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
947        f.debug_struct("Transaction")
948            .field("mutations", &self.mutations)
949            .field("txn_locks", &self.txn_locks)
950            .field("reservation", &self.allocator_reservation)
951            .finish()
952    }
953}
954
/// Holds either a borrowed reference to a `T` or an owned `T`.  `Deref` provides uniform access
/// to the underlying value in both cases.
pub enum BorrowedOrOwned<'a, T> {
    Borrowed(&'a T),
    Owned(T),
}
959
960impl<T> Deref for BorrowedOrOwned<'_, T> {
961    type Target = T;
962
963    fn deref(&self) -> &Self::Target {
964        match self {
965            BorrowedOrOwned::Borrowed(b) => b,
966            BorrowedOrOwned::Owned(o) => &o,
967        }
968    }
969}
970
971impl<'a, T> From<&'a T> for BorrowedOrOwned<'a, T> {
972    fn from(value: &'a T) -> Self {
973        BorrowedOrOwned::Borrowed(value)
974    }
975}
976
977impl<T> From<T> for BorrowedOrOwned<'_, T> {
978    fn from(value: T) -> Self {
979        BorrowedOrOwned::Owned(value)
980    }
981}
982
983/// LockManager holds the locks that transactions might have taken.  A TransactionManager
984/// implementation would typically have one of these.
985///
986/// Three different kinds of locks are supported.  There are read locks and write locks, which are
987/// as one would expect.  The third kind of lock is a _transaction_ lock (which is also known as an
988/// upgradeable read lock).  When first acquired, these block other writes (including other
989/// transaction locks) but do not block reads.  When it is time to commit a transaction, these locks
990/// are upgraded to full write locks (without ever dropping the lock) and then dropped after
991/// committing (unless commit_and_continue is used).  This way, reads are only blocked for the
992/// shortest possible time.  It follows that write locks should be used sparingly.  Locks are
993/// granted in order with one exception: when a lock is in the initial _transaction_ lock state
994/// (LockState::Locked), all read locks are allowed even if there are other tasks waiting for the
995/// lock.  The reason for this is because we allow read locks to be taken by tasks that have taken a
996/// _transaction_ lock (i.e. recursion is allowed).  In other cases, such as when a writer is
997/// waiting and there are only readers, readers will queue up behind the writer.
998///
999/// To summarize:
1000///
1001/// +-------------------------+-----------------+----------------+------------------+
1002/// |                         | While read_lock | While txn_lock | While write_lock |
1003/// |                         | is held         | is held        | is held          |
1004/// +-------------------------+-----------------+----------------+------------------+
1005/// | Can acquire read_lock?  | true            | true           | false            |
1006/// +-------------------------+-----------------+----------------+------------------+
1007/// | Can acquire txn_lock?   | true            | false          | false            |
1008/// +-------------------------+-----------------+----------------+------------------+
1009/// | Can acquire write_lock? | false           | false          | false            |
1010/// +-------------------------+-----------------+----------------+------------------+
pub struct LockManager {
    // All lock state, keyed by `LockKey`.  This mutex must be held for any access to the entries,
    // including the unsafe waker-list operations below, which rely on it for soundness.
    locks: Mutex<Locks>,
}
1014
// The table of currently held locks; protected by the mutex in `LockManager`.
struct Locks {
    keys: HashMap<LockKey, LockEntry>,
}
1018
impl Locks {
    // Releases a single hold on `key`.  `state` is the kind of hold being released: ReadLock
    // decrements the reader count, whilst Locked/WriteLock revert the entry's state to ReadLock.
    // Wakes any waiters that can now proceed, and prunes the entry once nothing holds it.
    fn drop_lock(&mut self, key: LockKey, state: LockState) {
        if let Entry::Occupied(mut occupied) = self.keys.entry(key) {
            let entry = occupied.get_mut();
            let wake = match state {
                LockState::ReadLock => {
                    entry.read_count -= 1;
                    entry.read_count == 0
                }
                // drop_write_locks currently depends on us treating Locked and WriteLock the same.
                LockState::Locked | LockState::WriteLock => {
                    entry.state = LockState::ReadLock;
                    true
                }
            };
            if wake {
                // SAFETY: The lock in `LockManager::locks` is held.
                unsafe {
                    entry.wake();
                }
                // Prune entries that no longer hold any lock.
                if entry.can_remove() {
                    occupied.remove_entry();
                }
            }
        } else {
            // Dropping a lock that was never recorded indicates a bookkeeping bug.
            unreachable!();
        }
    }

    // Releases one read hold for each of `lock_keys`.
    fn drop_read_locks(&mut self, lock_keys: LockKeys) {
        for lock in lock_keys.iter() {
            self.drop_lock(*lock, LockState::ReadLock);
        }
    }

    // Releases write (or transaction) holds for each of `lock_keys`.
    fn drop_write_locks(&mut self, lock_keys: LockKeys) {
        for lock in lock_keys.iter() {
            // This is a bit hacky, but this works for locks in either the Locked or WriteLock
            // states.
            self.drop_lock(*lock, LockState::WriteLock);
        }
    }

    // Downgrades locks from WriteLock to Locked.
    fn downgrade_locks(&mut self, lock_keys: &LockKeys) {
        for lock in lock_keys.iter() {
            // SAFETY: The lock in `LockManager::locks` is held.
            unsafe {
                self.keys.get_mut(lock).unwrap().downgrade_lock();
            }
        }
    }
}
1072
#[derive(Debug)]
struct LockEntry {
    // In the states that allow readers (ReadLock, Locked), this count can be non-zero
    // to indicate the number of active readers.
    read_count: u64,

    // The state of the lock (see below).
    state: LockState,

    // A doubly-linked list of wakers that should be woken when they have been granted the lock.
    // New wakers are usually chained on to tail, with the exception being the case where a lock in
    // state Locked is to be upgraded to WriteLock, but can't because there are readers.  It might
    // be possible to use intrusive-collections in the future.
    // Both pointers are null when the list is empty; they point at `LockWaker`s pinned in the
    // futures that are waiting.
    head: *const LockWaker,
    tail: *const LockWaker,
}
1089
// SAFETY: `LockEntry` contains raw pointers into the waker list, which makes it `!Send` by
// default.  The pointers are only dereferenced whilst `LockManager::locks` is held (see the
// SAFETY comments on the unsafe functions below) — NOTE(review): confirm no access escapes the
// mutex.
unsafe impl Send for LockEntry {}
1091
// Represents a node in the waker list.  It is only safe to access the members wrapped by UnsafeCell
// when LockManager's `locks` member is locked.
struct LockWaker {
    // The next and previous pointers in the doubly-linked list.
    next: UnsafeCell<*const LockWaker>,
    prev: UnsafeCell<*const LockWaker>,

    // Holds the lock key for this waker.  This is required so that we can find the associated
    // `LockEntry`.
    key: LockKey,

    // The underlying waker that should be used to wake the task.
    waker: UnsafeCell<WakerState>,

    // The target state for this waker.
    target_state: LockState,

    // True if this is an upgrade (Locked -> WriteLock); upgrades are queued at the head of the
    // list rather than the tail.
    is_upgrade: bool,

    // We need to be pinned because these form part of the linked list.
    _pin: PhantomPinned,
}
1115
// The lifecycle of a `LockWaker`'s task waker; see `LockWaker::waker`.
enum WakerState {
    // This is the initial state before the waker has been first polled.
    Pending,

    // Once polled, this contains the actual waker.
    Registered(Waker),

    // The waker has been woken and has been granted the lock.
    Woken,
}
1126
1127impl WakerState {
1128    fn is_woken(&self) -> bool {
1129        matches!(self, WakerState::Woken)
1130    }
1131}
1132
// SAFETY: The `UnsafeCell` members (and the raw pointers they contain) are only accessed whilst
// `LockManager::locks` is held, which serialises access across threads — NOTE(review): confirm
// every access site takes the mutex first.
unsafe impl Send for LockWaker {}
unsafe impl Sync for LockWaker {}
1135
impl LockWaker {
    // Waits for the waker to be woken.
    async fn wait(&self, manager: &LockManager) {
        // We must guard against the future being dropped.  If we were woken but never ran, the
        // lock was already granted to us, so it must be released; if we were never woken, we must
        // unlink ourself from the waker list so the entry doesn't point at freed memory.
        let waker_guard = scopeguard::guard((), |_| {
            let mut locks = manager.locks.lock();
            // SAFETY: We've acquired the lock.
            unsafe {
                if (*self.waker.get()).is_woken() {
                    // We were woken, but didn't actually run, so we must drop the lock.
                    if self.is_upgrade {
                        locks.keys.get_mut(&self.key).unwrap().downgrade_lock();
                    } else {
                        locks.drop_lock(self.key, self.target_state);
                    }
                } else {
                    // We haven't been woken but we've been dropped so we must remove ourself from
                    // the waker list.
                    locks.keys.get_mut(&self.key).unwrap().remove_waker(self);
                }
            }
        });

        poll_fn(|cx| {
            let _locks = manager.locks.lock();
            // SAFETY: We've acquired the lock.
            unsafe {
                if (*self.waker.get()).is_woken() {
                    Poll::Ready(())
                } else {
                    // Not woken yet: (re-)register the current task's waker and park.
                    *self.waker.get() = WakerState::Registered(cx.waker().clone());
                    Poll::Pending
                }
            }
        })
        .await;

        // Completed normally, so defuse the drop guard above.
        ScopeGuard::into_inner(waker_guard);
    }
}
1176
/// The state of a lock entry; see `LockManager` for the compatibility matrix.
// `Eq` is derived alongside `PartialEq` since equality for this fieldless enum is total
// (previously only `PartialEq` was derived).
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum LockState {
    // In this state, there are only readers.
    ReadLock,

    // This state is used for transactions to lock other writers (including other transactions), but
    // it still allows readers.
    Locked,

    // A writer has exclusive access; all other readers and writers are blocked.
    WriteLock,
}
1189
1190impl LockManager {
1191    pub fn new() -> Self {
1192        LockManager { locks: Mutex::new(Locks { keys: HashMap::default() }) }
1193    }
1194
1195    /// Acquires the locks.  It is the caller's responsibility to ensure that drop_transaction is
1196    /// called when a transaction is dropped i.e. the filesystem's drop_transaction method should
1197    /// call LockManager's drop_transaction method.
1198    pub async fn txn_lock<'a>(&'a self, lock_keys: LockKeys) -> TransactionLocks<'a> {
1199        TransactionLocks(
1200            debug_assert_not_too_long!(self.lock(lock_keys, LockState::Locked)).right().unwrap(),
1201        )
1202    }
1203
1204    // `state` indicates the kind of lock required.  ReadLock means acquire a read lock.  Locked
1205    // means lock other writers, but still allow readers.  WriteLock means acquire a write lock.
1206    async fn lock<'a>(
1207        &'a self,
1208        mut lock_keys: LockKeys,
1209        target_state: LockState,
1210    ) -> Either<ReadGuard<'a>, WriteGuard<'a>> {
1211        let mut guard = match &target_state {
1212            LockState::ReadLock => Left(ReadGuard {
1213                manager: self.into(),
1214                lock_keys: LockKeys::with_capacity(lock_keys.len()),
1215            }),
1216            LockState::Locked | LockState::WriteLock => Right(WriteGuard {
1217                manager: self.into(),
1218                lock_keys: LockKeys::with_capacity(lock_keys.len()),
1219            }),
1220        };
1221        let guard_keys = match &mut guard {
1222            Left(g) => &mut g.lock_keys,
1223            Right(g) => &mut g.lock_keys,
1224        };
1225        lock_keys.sort_unstable();
1226        lock_keys.dedup();
1227        for lock in lock_keys.iter() {
1228            let lock_waker = None;
1229            pin_mut!(lock_waker);
1230            {
1231                let mut locks = self.locks.lock();
1232                match locks.keys.entry(*lock) {
1233                    Entry::Vacant(vacant) => {
1234                        vacant.insert(LockEntry {
1235                            read_count: if let LockState::ReadLock = target_state {
1236                                guard_keys.push(*lock);
1237                                1
1238                            } else {
1239                                guard_keys.push(*lock);
1240                                0
1241                            },
1242                            state: target_state,
1243                            head: std::ptr::null(),
1244                            tail: std::ptr::null(),
1245                        });
1246                    }
1247                    Entry::Occupied(mut occupied) => {
1248                        let entry = occupied.get_mut();
1249                        // SAFETY: We've acquired the lock.
1250                        if unsafe { entry.is_allowed(target_state, entry.head.is_null()) } {
1251                            if let LockState::ReadLock = target_state {
1252                                entry.read_count += 1;
1253                                guard_keys.push(*lock);
1254                            } else {
1255                                entry.state = target_state;
1256                                guard_keys.push(*lock);
1257                            }
1258                        } else {
1259                            // Initialise a waker and push it on the tail of the list.
1260                            // SAFETY: `lock_waker` isn't used prior to this point.
1261                            unsafe {
1262                                *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1263                                    next: UnsafeCell::new(std::ptr::null()),
1264                                    prev: UnsafeCell::new(entry.tail),
1265                                    key: *lock,
1266                                    waker: UnsafeCell::new(WakerState::Pending),
1267                                    target_state: target_state,
1268                                    is_upgrade: false,
1269                                    _pin: PhantomPinned,
1270                                });
1271                            }
1272                            let waker = (*lock_waker).as_ref().unwrap();
1273                            if entry.tail.is_null() {
1274                                entry.head = waker;
1275                            } else {
1276                                // SAFETY: We've acquired the lock.
1277                                unsafe {
1278                                    *(*entry.tail).next.get() = waker;
1279                                }
1280                            }
1281                            entry.tail = waker;
1282                        }
1283                    }
1284                }
1285            }
1286            if let Some(waker) = &*lock_waker {
1287                waker.wait(self).await;
1288                guard_keys.push(*lock);
1289            }
1290        }
1291        guard
1292    }
1293
1294    /// This should be called by the filesystem's drop_transaction implementation.
1295    pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
1296        let mut locks = self.locks.lock();
1297        locks.drop_write_locks(std::mem::take(&mut transaction.txn_locks));
1298    }
1299
1300    /// Prepares to commit by waiting for readers to finish.
1301    pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
1302        self.commit_prepare_keys(&transaction.txn_locks).await;
1303    }
1304
1305    async fn commit_prepare_keys(&self, lock_keys: &LockKeys) {
1306        for lock in lock_keys.iter() {
1307            let lock_waker = None;
1308            pin_mut!(lock_waker);
1309            {
1310                let mut locks = self.locks.lock();
1311                let entry = locks.keys.get_mut(lock).unwrap();
1312                assert_eq!(entry.state, LockState::Locked);
1313
1314                if entry.read_count == 0 {
1315                    entry.state = LockState::WriteLock;
1316                } else {
1317                    // Initialise a waker and push it on the head of the list.
1318                    // SAFETY: `lock_waker` isn't used prior to this point.
1319                    unsafe {
1320                        *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1321                            next: UnsafeCell::new(entry.head),
1322                            prev: UnsafeCell::new(std::ptr::null()),
1323                            key: *lock,
1324                            waker: UnsafeCell::new(WakerState::Pending),
1325                            target_state: LockState::WriteLock,
1326                            is_upgrade: true,
1327                            _pin: PhantomPinned,
1328                        });
1329                    }
1330                    let waker = (*lock_waker).as_ref().unwrap();
1331                    if entry.head.is_null() {
1332                        entry.tail = (*lock_waker).as_ref().unwrap();
1333                    } else {
1334                        // SAFETY: We've acquired the lock.
1335                        unsafe {
1336                            *(*entry.head).prev.get() = waker;
1337                        }
1338                    }
1339                    entry.head = waker;
1340                }
1341            }
1342
1343            if let Some(waker) = &*lock_waker {
1344                waker.wait(self).await;
1345            }
1346        }
1347    }
1348
1349    /// Acquires a read lock for the given keys.  Read locks are only blocked whilst a transaction
1350    /// is being committed for the same locks.  They are only necessary where consistency is
1351    /// required between different mutations within a transaction.  For example, a write might
1352    /// change the size and extents for an object, in which case a read lock is required so that
1353    /// observed size and extents are seen together or not at all.
1354    pub async fn read_lock<'a>(&'a self, lock_keys: LockKeys) -> ReadGuard<'a> {
1355        debug_assert_not_too_long!(self.lock(lock_keys, LockState::ReadLock)).left().unwrap()
1356    }
1357
1358    /// Acquires a write lock for the given keys.  Write locks provide exclusive access to the
1359    /// requested lock keys.
1360    pub async fn write_lock<'a>(&'a self, lock_keys: LockKeys) -> WriteGuard<'a> {
1361        debug_assert_not_too_long!(self.lock(lock_keys, LockState::WriteLock)).right().unwrap()
1362    }
1363
1364    /// Downgrades locks from the WriteLock state to Locked state.  This will panic if the locks are
1365    /// not in the WriteLock state.
1366    pub fn downgrade_locks(&self, lock_keys: &LockKeys) {
1367        self.locks.lock().downgrade_locks(lock_keys);
1368    }
1369}
1370
// These unsafe functions require that `locks` in LockManager is locked.
impl LockEntry {
    // Wakes queued wakers (starting at the head) that are now able to take the lock, updating the
    // entry's state/read count as each one is granted.
    unsafe fn wake(&mut self) {
        // If the lock's state is WriteLock, or there's nothing waiting, return early.
        if self.head.is_null() || self.state == LockState::WriteLock {
            return;
        }

        let waker = &*self.head;

        if waker.is_upgrade {
            // An upgrade can only proceed once all readers have finished.
            if self.read_count > 0 {
                return;
            }
        } else if !self.is_allowed(waker.target_state, true) {
            return;
        }

        self.pop_and_wake();

        // If the waker was a write lock, we can't wake any more up, but otherwise, we can keep
        // waking up readers.
        if waker.target_state == LockState::WriteLock {
            return;
        }

        while !self.head.is_null() && (*self.head).target_state == LockState::ReadLock {
            self.pop_and_wake();
        }
    }

    // Unlinks the head waker from the queue, grants it the lock and wakes its task.
    unsafe fn pop_and_wake(&mut self) {
        let waker = &*self.head;

        // Pop the waker.
        self.head = *waker.next.get();
        if self.head.is_null() {
            self.tail = std::ptr::null()
        } else {
            *(*self.head).prev.get() = std::ptr::null();
        }

        // Adjust our state accordingly.
        if waker.target_state == LockState::ReadLock {
            self.read_count += 1;
        } else {
            self.state = waker.target_state;
        }

        // Now wake the task.  If the task hasn't been polled yet (WakerState::Pending), setting
        // the state to Woken is sufficient; it will see it on first poll.
        if let WakerState::Registered(waker) =
            std::mem::replace(&mut *waker.waker.get(), WakerState::Woken)
        {
            waker.wake();
        }
    }

    // Returns true if the entry no longer holds any lock (and so can be removed from the map).
    fn can_remove(&self) -> bool {
        self.state == LockState::ReadLock && self.read_count == 0
    }

    // Unlinks `waker` (which has not yet been woken) from this entry's doubly-linked waker list.
    // Used when a waiting future is dropped before being granted the lock.
    unsafe fn remove_waker(&mut self, waker: &LockWaker) {
        let is_first = (*waker.prev.get()).is_null();
        if is_first {
            self.head = *waker.next.get();
        } else {
            *(**waker.prev.get()).next.get() = *waker.next.get();
        }
        if (*waker.next.get()).is_null() {
            self.tail = *waker.prev.get();
        } else {
            *(**waker.next.get()).prev.get() = *waker.prev.get();
        }
        if is_first {
            // We must call wake in case we erased a pending write lock and readers can now proceed.
            self.wake();
        }
    }

    // Returns whether or not a lock with given `target_state` can proceed.  `is_head` should be
    // true if this is something at the head of the waker list (or the waker list is empty) and
    // false if there are other items on the waker list that are prior.
    unsafe fn is_allowed(&self, target_state: LockState, is_head: bool) -> bool {
        match self.state {
            LockState::ReadLock => {
                // Allow ReadLock and Locked so long as nothing else is waiting.
                (self.read_count == 0
                    || target_state == LockState::Locked
                    || target_state == LockState::ReadLock)
                    && is_head
            }
            LockState::Locked => {
                // Always allow reads unless there's an upgrade waiting.  We have to
                // always allow reads in this state because tasks that have locks in
                // the Locked state can later try and acquire ReadLock.
                target_state == LockState::ReadLock && (is_head || !(*self.head).is_upgrade)
            }
            LockState::WriteLock => false,
        }
    }

    // Moves the entry from WriteLock back to Locked (panicking if it wasn't WriteLock) and wakes
    // any readers that can now proceed.
    unsafe fn downgrade_lock(&mut self) {
        assert_eq!(std::mem::replace(&mut self.state, LockState::Locked), LockState::WriteLock);
        self.wake();
    }
}
1477
/// Guard that releases its read locks (one per entry in `lock_keys`) when dropped.
#[must_use]
pub struct ReadGuard<'a> {
    manager: LockManagerRef<'a>,
    lock_keys: LockKeys,
}
1483
1484impl ReadGuard<'_> {
1485    pub fn fs(&self) -> Option<&Arc<FxFilesystem>> {
1486        if let LockManagerRef::Owned(fs) = &self.manager {
1487            Some(fs)
1488        } else {
1489            None
1490        }
1491    }
1492
1493    pub fn into_owned(mut self, fs: Arc<FxFilesystem>) -> ReadGuard<'static> {
1494        ReadGuard {
1495            manager: LockManagerRef::Owned(fs),
1496            lock_keys: std::mem::replace(&mut self.lock_keys, LockKeys::None),
1497        }
1498    }
1499}
1500
1501impl Drop for ReadGuard<'_> {
1502    fn drop(&mut self) {
1503        let mut locks = self.manager.locks.lock();
1504        locks.drop_read_locks(std::mem::take(&mut self.lock_keys));
1505    }
1506}
1507
1508impl fmt::Debug for ReadGuard<'_> {
1509    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1510        f.debug_struct("ReadGuard")
1511            .field("manager", &(&self.manager as *const _))
1512            .field("lock_keys", &self.lock_keys)
1513            .finish()
1514    }
1515}
1516
/// Guard that releases its write (or transaction) locks when dropped.
#[must_use]
pub struct WriteGuard<'a> {
    manager: LockManagerRef<'a>,
    lock_keys: LockKeys,
}
1522
1523impl Drop for WriteGuard<'_> {
1524    fn drop(&mut self) {
1525        let mut locks = self.manager.locks.lock();
1526        locks.drop_write_locks(std::mem::take(&mut self.lock_keys));
1527    }
1528}
1529
1530impl fmt::Debug for WriteGuard<'_> {
1531    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1532        f.debug_struct("WriteGuard")
1533            .field("manager", &(&self.manager as *const _))
1534            .field("lock_keys", &self.lock_keys)
1535            .finish()
1536    }
1537}
1538
// Either a borrowed `LockManager` or a filesystem that owns one.  The owned form lets guards be
// converted to a `'static` lifetime (see `ReadGuard::into_owned`).
enum LockManagerRef<'a> {
    Borrowed(&'a LockManager),
    Owned(Arc<FxFilesystem>),
}
1543
1544impl Deref for LockManagerRef<'_> {
1545    type Target = LockManager;
1546
1547    fn deref(&self) -> &Self::Target {
1548        match self {
1549            LockManagerRef::Borrowed(m) => m,
1550            LockManagerRef::Owned(f) => f.lock_manager(),
1551        }
1552    }
1553}
1554
1555impl<'a> From<&'a LockManager> for LockManagerRef<'a> {
1556    fn from(value: &'a LockManager) -> Self {
1557        LockManagerRef::Borrowed(value)
1558    }
1559}
1560
1561#[cfg(test)]
1562mod tests {
1563    use super::{LockKey, LockKeys, LockManager, LockState, Mutation, Options};
1564    use crate::filesystem::FxFilesystem;
1565    use fuchsia_async as fasync;
1566    use fuchsia_sync::Mutex;
1567    use futures::channel::oneshot::channel;
1568    use futures::future::FutureExt;
1569    use futures::stream::FuturesUnordered;
1570    use futures::{join, pin_mut, StreamExt};
1571    use std::task::Poll;
1572    use std::time::Duration;
1573    use storage_device::fake_device::FakeDevice;
1574    use storage_device::DeviceHolder;
1575
1576    #[fuchsia::test]
1577    async fn test_simple() {
1578        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1579        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1580        let mut t = fs
1581            .clone()
1582            .new_transaction(lock_keys![], Options::default())
1583            .await
1584            .expect("new_transaction failed");
1585        t.add(1, Mutation::BeginFlush);
1586        assert!(!t.is_empty());
1587    }
1588
    #[fuchsia::test]
    async fn test_locks() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let (send3, recv3) = channel();
        let done = Mutex::new(false);
        let mut futures = FuturesUnordered::new();
        futures.push(
            async {
                // Takes the lock on object (1, 2, 3) and holds it until this future finishes.
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send1.send(()).unwrap(); // Tell the next future to continue.
                send3.send(()).unwrap(); // Tell the last future to continue.
                recv2.await.unwrap();
                // This is a halting problem so all we can do is sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                // The third future contends on our lock, so it should not have finished.
                assert!(!*done.lock());
            }
            .boxed(),
        );
        futures.push(
            async {
                recv1.await.unwrap();
                // This should not block since it is a different key.
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(2, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                // Tell the first future to continue.
                send2.send(()).unwrap();
            }
            .boxed(),
        );
        futures.push(
            async {
                // This should block until the first future has completed.
                recv3.await.unwrap();
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await;
                *done.lock() = true;
            }
            .boxed(),
        );
        while let Some(()) = futures.next().await {}
    }
1651
    // Checks that a read lock can be taken while a transaction merely holds the key, but that
    // committing the transaction must wait for the read lock to be released.
    #[fuchsia::test]
    async fn test_read_lock_after_write_lock() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let done = Mutex::new(false);
        join!(
            // Future 1: holds the transaction lock, then tries to commit.
            async {
                let t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send1.send(()).unwrap(); // Tell the next future to continue.
                recv2.await.unwrap();
                // Commit must wait for the read lock held by the other future.
                t.commit().await.expect("commit failed");
                *done.lock() = true;
            },
            // Future 2: takes a read lock while the transaction is held but not committing.
            async {
                recv1.await.unwrap();
                // Reads should not be blocked until the transaction is committed.
                let _guard = fs
                    .lock_manager()
                    .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
                    .await;
                // Tell the first future to continue.
                send2.send(()).unwrap();
                // It shouldn't proceed until we release our read lock, but it's a halting
                // problem, so sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            },
        );
    }
1690
    // Mirror of test_read_lock_after_write_lock: the read lock is taken first, and a
    // subsequently created transaction can be acquired but cannot commit until the read
    // lock is released.
    #[fuchsia::test]
    async fn test_write_lock_after_read_lock() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let done = Mutex::new(false);
        join!(
            // Future 1: holds a read lock for the duration of the test.
            async {
                // Reads should not be blocked until the transaction is committed.
                let _guard = fs
                    .lock_manager()
                    .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
                    .await;
                // Tell the next future to continue and then wait.
                send1.send(()).unwrap();
                recv2.await.unwrap();
                // It shouldn't proceed until we release our read lock, but it's a halting
                // problem, so sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            },
            // Future 2: can create the transaction, but commit must block on the read lock.
            async {
                recv1.await.unwrap();
                let t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send2.send(()).unwrap(); // Tell the first future to continue;
                t.commit().await.expect("commit failed");
                *done.lock() = true;
            },
        );
    }
1729
1730    #[fuchsia::test]
1731    async fn test_drop_uncommitted_transaction() {
1732        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1733        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1734        let key = lock_keys![LockKey::object(1, 1)];
1735
1736        // Dropping while there's a reader.
1737        {
1738            let _write_lock = fs
1739                .clone()
1740                .new_transaction(key.clone(), Options::default())
1741                .await
1742                .expect("new_transaction failed");
1743            let _read_lock = fs.lock_manager().read_lock(key.clone()).await;
1744        }
1745        // Dropping while there's no reader.
1746        {
1747            let _write_lock = fs
1748                .clone()
1749                .new_transaction(key.clone(), Options::default())
1750                .await
1751                .expect("new_transaction failed");
1752        }
1753        // Make sure we can take the lock again (i.e. it was actually released).
1754        fs.clone()
1755            .new_transaction(key.clone(), Options::default())
1756            .await
1757            .expect("new_transaction failed");
1758    }
1759
1760    #[fuchsia::test]
1761    async fn test_drop_waiting_write_lock() {
1762        let manager = LockManager::new();
1763        let keys = lock_keys![LockKey::object(1, 1)];
1764        {
1765            let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1766            if let Poll::Ready(_) =
1767                futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1768            {
1769                assert!(false);
1770            }
1771        }
1772        let _ = manager.lock(keys, LockState::WriteLock).await;
1773    }
1774
1775    #[fuchsia::test]
1776    async fn test_write_lock_blocks_everything() {
1777        let manager = LockManager::new();
1778        let keys = lock_keys![LockKey::object(1, 1)];
1779        {
1780            let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1781            if let Poll::Ready(_) =
1782                futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1783            {
1784                assert!(false);
1785            }
1786            if let Poll::Ready(_) =
1787                futures::poll!(manager.lock(keys.clone(), LockState::ReadLock).boxed())
1788            {
1789                assert!(false);
1790            }
1791        }
1792        {
1793            let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1794        }
1795        {
1796            let _guard = manager.lock(keys, LockState::ReadLock).await;
1797        }
1798    }
1799
1800    #[fuchsia::test]
1801    async fn test_downgrade_locks() {
1802        let manager = LockManager::new();
1803        let keys = lock_keys![LockKey::object(1, 1)];
1804        let _guard = manager.txn_lock(keys.clone()).await;
1805        manager.commit_prepare_keys(&keys).await;
1806
1807        // Use FuturesUnordered so that we can check that the waker is woken.
1808        let mut read_lock: FuturesUnordered<_> =
1809            std::iter::once(manager.read_lock(keys.clone())).collect();
1810
1811        // Trying to acquire a read lock now should be blocked.
1812        assert!(futures::poll!(read_lock.next()).is_pending());
1813
1814        manager.downgrade_locks(&keys);
1815
1816        // After downgrading, it should be possible to take a read lock.
1817        assert!(futures::poll!(read_lock.next()).is_ready());
1818    }
1819
1820    #[fuchsia::test]
1821    async fn test_dropped_write_lock_wakes() {
1822        let manager = LockManager::new();
1823        let keys = lock_keys![LockKey::object(1, 1)];
1824        let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1825        let mut read_lock = FuturesUnordered::new();
1826        read_lock.push(manager.lock(keys.clone(), LockState::ReadLock));
1827
1828        {
1829            let write_lock = manager.lock(keys, LockState::WriteLock);
1830            pin_mut!(write_lock);
1831
1832            // The write lock should be blocked because of the read lock.
1833            assert!(futures::poll!(write_lock).is_pending());
1834
1835            // Another read lock should be blocked because of the write lock.
1836            assert!(futures::poll!(read_lock.next()).is_pending());
1837        }
1838
1839        // Dropping the write lock should allow the read lock to proceed.
1840        assert!(futures::poll!(read_lock.next()).is_ready());
1841    }
1842
1843    #[fuchsia::test]
1844    async fn test_drop_upgrade() {
1845        let manager = LockManager::new();
1846        let keys = lock_keys![LockKey::object(1, 1)];
1847        let _guard = manager.lock(keys.clone(), LockState::Locked).await;
1848
1849        {
1850            let commit_prepare = manager.commit_prepare_keys(&keys);
1851            pin_mut!(commit_prepare);
1852            let _read_guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1853            assert!(futures::poll!(commit_prepare).is_pending());
1854
1855            // Now we test dropping read_guard which should wake commit_prepare and
1856            // then dropping commit_prepare.
1857        }
1858
1859        // We should be able to still commit_prepare.
1860        manager.commit_prepare_keys(&keys).await;
1861    }
1862
1863    #[fasync::run_singlethreaded(test)]
1864    async fn test_woken_upgrade_blocks_reads() {
1865        let manager = LockManager::new();
1866        let keys = lock_keys![LockKey::object(1, 1)];
1867        // Start with a transaction lock.
1868        let guard = manager.lock(keys.clone(), LockState::Locked).await;
1869
1870        // Take a read lock.
1871        let read1 = manager.lock(keys.clone(), LockState::ReadLock).await;
1872
1873        // Try and upgrade the transaction lock, which should not be possible because of the read.
1874        let commit_prepare = manager.commit_prepare_keys(&keys);
1875        pin_mut!(commit_prepare);
1876        assert!(futures::poll!(commit_prepare.as_mut()).is_pending());
1877
1878        // Taking another read should also be blocked.
1879        let read2 = manager.lock(keys.clone(), LockState::ReadLock);
1880        pin_mut!(read2);
1881        assert!(futures::poll!(read2.as_mut()).is_pending());
1882
1883        // Drop the first read and the upgrade should complete.
1884        std::mem::drop(read1);
1885        assert!(futures::poll!(commit_prepare).is_ready());
1886
1887        // But the second read should still be blocked.
1888        assert!(futures::poll!(read2.as_mut()).is_pending());
1889
1890        // If we drop the write lock now, the read should be unblocked.
1891        std::mem::drop(guard);
1892        assert!(futures::poll!(read2).is_ready());
1893    }
1894
    // Shared keys used by the LockKeys container tests below.
    static LOCK_KEY_1: LockKey = LockKey::flush(1);
    static LOCK_KEY_2: LockKey = LockKey::flush(2);
    static LOCK_KEY_3: LockKey = LockKey::flush(3);
1898
1899    // The keys, storage method, and capacity must all match.
1900    fn assert_lock_keys_equal(value: &LockKeys, expected: &LockKeys) {
1901        match (value, expected) {
1902            (LockKeys::None, LockKeys::None) => {}
1903            (LockKeys::Inline(key1), LockKeys::Inline(key2)) => {
1904                if key1 != key2 {
1905                    panic!("{key1:?} != {key2:?}");
1906                }
1907            }
1908            (LockKeys::Vec(vec1), LockKeys::Vec(vec2)) => {
1909                if vec1 != vec2 {
1910                    panic!("{vec1:?} != {vec2:?}");
1911                }
1912                if vec1.capacity() != vec2.capacity() {
1913                    panic!(
1914                        "LockKeys have different capacity: {} != {}",
1915                        vec1.capacity(),
1916                        vec2.capacity()
1917                    );
1918                }
1919            }
1920            (_, _) => panic!("{value:?} != {expected:?}"),
1921        }
1922    }
1923
1924    // Only the keys must match. Storage method and capacity don't matter.
1925    fn assert_lock_keys_equivalent(value: &LockKeys, expected: &LockKeys) {
1926        let value: Vec<_> = value.iter().collect();
1927        let expected: Vec<_> = expected.iter().collect();
1928        assert_eq!(value, expected);
1929    }
1930
1931    #[test]
1932    fn test_lock_keys_macro() {
1933        assert_lock_keys_equal(&lock_keys![], &LockKeys::None);
1934        assert_lock_keys_equal(&lock_keys![LOCK_KEY_1], &LockKeys::Inline(LOCK_KEY_1));
1935        assert_lock_keys_equal(
1936            &lock_keys![LOCK_KEY_1, LOCK_KEY_2],
1937            &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]),
1938        );
1939    }
1940
1941    #[test]
1942    fn test_lock_keys_with_capacity() {
1943        assert_lock_keys_equal(&LockKeys::with_capacity(0), &LockKeys::None);
1944        assert_lock_keys_equal(&LockKeys::with_capacity(1), &LockKeys::None);
1945        assert_lock_keys_equal(&LockKeys::with_capacity(2), &LockKeys::Vec(Vec::with_capacity(2)));
1946    }
1947
1948    #[test]
1949    fn test_lock_keys_len() {
1950        assert_eq!(lock_keys![].len(), 0);
1951        assert_eq!(lock_keys![LOCK_KEY_1].len(), 1);
1952        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].len(), 2);
1953    }
1954
1955    #[test]
1956    fn test_lock_keys_contains() {
1957        assert_eq!(lock_keys![].contains(&LOCK_KEY_1), false);
1958        assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_1), true);
1959        assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_2), false);
1960        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_1), true);
1961        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_2), true);
1962        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_3), false);
1963    }
1964
1965    #[test]
1966    fn test_lock_keys_push() {
1967        let mut keys = lock_keys![];
1968        keys.push(LOCK_KEY_1);
1969        assert_lock_keys_equal(&keys, &LockKeys::Inline(LOCK_KEY_1));
1970        keys.push(LOCK_KEY_2);
1971        assert_lock_keys_equal(&keys, &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]));
1972        keys.push(LOCK_KEY_3);
1973        assert_lock_keys_equivalent(
1974            &keys,
1975            &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2, LOCK_KEY_3]),
1976        );
1977    }
1978
1979    #[test]
1980    fn test_lock_keys_sort_unstable() {
1981        let mut keys = lock_keys![];
1982        keys.sort_unstable();
1983        assert_lock_keys_equal(&keys, &lock_keys![]);
1984
1985        let mut keys = lock_keys![LOCK_KEY_1];
1986        keys.sort_unstable();
1987        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
1988
1989        let mut keys = lock_keys![LOCK_KEY_2, LOCK_KEY_1];
1990        keys.sort_unstable();
1991        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
1992    }
1993
1994    #[test]
1995    fn test_lock_keys_dedup() {
1996        let mut keys = lock_keys![];
1997        keys.dedup();
1998        assert_lock_keys_equal(&keys, &lock_keys![]);
1999
2000        let mut keys = lock_keys![LOCK_KEY_1];
2001        keys.dedup();
2002        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2003
2004        let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_1];
2005        keys.dedup();
2006        assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2007    }
2008
2009    #[test]
2010    fn test_lock_keys_truncate() {
2011        let mut keys = lock_keys![];
2012        keys.truncate(5);
2013        assert_lock_keys_equal(&keys, &lock_keys![]);
2014        keys.truncate(0);
2015        assert_lock_keys_equal(&keys, &lock_keys![]);
2016
2017        let mut keys = lock_keys![LOCK_KEY_1];
2018        keys.truncate(5);
2019        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2020        keys.truncate(0);
2021        assert_lock_keys_equal(&keys, &lock_keys![]);
2022
2023        let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_2];
2024        keys.truncate(5);
2025        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2026        keys.truncate(1);
2027        // Although there's only 1 key after truncate the key is not stored inline.
2028        assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2029    }
2030
2031    #[test]
2032    fn test_lock_keys_iter() {
2033        assert_eq!(lock_keys![].iter().collect::<Vec<_>>(), Vec::<&LockKey>::new());
2034
2035        assert_eq!(lock_keys![LOCK_KEY_1].iter().collect::<Vec<_>>(), vec![&LOCK_KEY_1]);
2036
2037        assert_eq!(
2038            lock_keys![LOCK_KEY_1, LOCK_KEY_2].iter().collect::<Vec<_>>(),
2039            vec![&LOCK_KEY_1, &LOCK_KEY_2]
2040        );
2041    }
2042}