// fxfs/object_store/transaction.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::checksum::Checksum;
6use crate::filesystem::{FxFilesystem, TxnGuard};
7use crate::log::*;
8use crate::lsm_tree::types::Item;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::AttributeKey;
11use crate::object_store::allocator::{AllocatorItem, Reservation};
12use crate::object_store::object_manager::{ObjectManager, reserved_space_from_journal_usage};
13use crate::object_store::object_record::{
14    FxfsKey, FxfsKeyV40, FxfsKeyV49, ObjectItem, ObjectItemV40, ObjectItemV41, ObjectItemV43,
15    ObjectItemV46, ObjectItemV47, ObjectItemV49, ObjectItemV50, ObjectItemV55, ObjectKey,
16    ObjectKeyData, ObjectValue, ProjectProperty,
17};
18use crate::serialized_types::{Migrate, Versioned, migrate_nodefault, migrate_to_version};
19use anyhow::Error;
20use either::{Either, Left, Right};
21use fprint::TypeFingerprint;
22use fuchsia_sync::Mutex;
23use futures::future::poll_fn;
24use futures::pin_mut;
25use rustc_hash::FxHashMap as HashMap;
26use scopeguard::ScopeGuard;
27use serde::{Deserialize, Serialize};
28use std::cell::UnsafeCell;
29use std::cmp::Ordering;
30use std::collections::BTreeSet;
31use std::collections::hash_map::Entry;
32use std::marker::PhantomPinned;
33use std::ops::{Deref, DerefMut, Range};
34use std::sync::Arc;
35use std::task::{Poll, Waker};
36use std::{fmt, mem};
37
/// Per-transaction options.  This allows for special handling of certain transactions such as
/// deletes and the extension of Journal extents.  For most other use cases it is appropriate to
/// use `default()` here (all checks enabled, no reservation, no existing guard).
#[derive(Clone, Copy, Default)]
pub struct Options<'a> {
    /// If true, don't check for low journal space.  This should be true for any transactions that
    /// might alleviate journal space (i.e. compaction).
    pub skip_journal_checks: bool,

    /// If true, borrow metadata space from the metadata reservation.  This setting should be set to
    /// true for any transaction that will either not affect space usage after compaction
    /// (e.g. setting attributes), or reduce space usage (e.g. unlinking).  Otherwise, a transaction
    /// might fail with an out-of-space error.
    pub borrow_metadata_space: bool,

    /// If specified, a reservation to be used with the transaction.  If not set, any allocations
    /// that are part of this transaction will have to take their chances, and will fail if there is
    /// no free space.  The intention is that this should be used for things like the journal which
    /// require guaranteed space.
    pub allocator_reservation: Option<&'a Reservation>,

    /// An existing transaction guard to be used.
    pub txn_guard: Option<&'a TxnGuard<'a>>,
}
62
// This is the amount of space that we reserve for metadata when we are creating a new transaction.
// A transaction should not take more than this.  This is expressed in terms of space occupied in
// the journal; transactions must not take up more space in the journal than the number below.  The
// amount chosen here must be large enough for the maximum possible transaction that can be created,
// so transactions always need to be bounded which might involve splitting an operation up into
// smaller transactions.
pub const TRANSACTION_MAX_JOURNAL_USAGE: u64 = 24_576;

/// Metadata reservation amount corresponding to `TRANSACTION_MAX_JOURNAL_USAGE` (see
/// `reserved_space_from_journal_usage` for the conversion).
pub const TRANSACTION_METADATA_MAX_AMOUNT: u64 =
    reserved_space_from_journal_usage(TRANSACTION_MAX_JOURNAL_USAGE);
72
/// Wrapper around the `WriteGuard` covering a set of transaction locks.  Marked `#[must_use]`
/// because discarding it discards the guard (which presumably releases the locks on drop — see
/// `WriteGuard`).
#[must_use]
pub struct TransactionLocks<'a>(pub WriteGuard<'a>);
75
/// The journal consists of these records which will be replayed at mount time.  Within a
/// transaction, these are stored as a set which allows some mutations to be deduplicated and found
/// (and we require custom comparison functions below).  For example, we need to be able to find
/// object size changes.
pub type Mutation = MutationV55;

#[derive(
    Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint, Versioned,
)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum MutationV55 {
    /// A mutation to a plaintext object store record.
    ObjectStore(ObjectStoreMutationV55),
    /// A serialized (encrypted) object store mutation; stored as raw bytes.
    EncryptedObjectStore(#[serde(with = "crate::zerocopy_serialization")] Box<[u8]>),
    /// A mutation to the allocator (allocate/deallocate/limits/mark-for-deletion).
    Allocator(AllocatorMutationV32),
    // Indicates the beginning of a flush.  This would typically involve sealing a tree.
    BeginFlush,
    // Indicates the end of a flush.  This would typically involve replacing the immutable layers
    // with compacted ones.
    EndFlush,
    // Volume has been deleted.  Requires we remove it from the set of managed ObjectStore.
    DeleteVolume,
    /// Updates the amount of metadata space borrowed (see `Options::borrow_metadata_space`).
    UpdateBorrowed(u64),
    /// Changes the key used for subsequent encrypted mutations.
    UpdateMutationsKey(UpdateMutationsKey),
    /// Records creation of an internal directory with the given object ID.
    CreateInternalDir(u64),
}
101
// Superseded versions of `Mutation`, retained so that journals written by older versions can be
// deserialized and migrated forward one step at a time via the `migrate_to_version` chain:
// V40 -> V41 -> V43 -> V46 -> V47 -> V49 -> V50 -> V54 -> V55.  These definitions describe
// serialized formats and must not be altered.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV55)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum MutationV54 {
    ObjectStore(ObjectStoreMutationV54),
    EncryptedObjectStore(#[serde(with = "crate::zerocopy_serialization")] Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKey),
    CreateInternalDir(u64),
}

#[derive(
    Migrate, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint, Versioned,
)]
#[migrate_to_version(MutationV54)]
pub enum MutationV50 {
    ObjectStore(ObjectStoreMutationV50),
    EncryptedObjectStore(#[serde(with = "crate::zerocopy_serialization")] Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV49),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV50)]
pub enum MutationV49 {
    ObjectStore(ObjectStoreMutationV49),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV49),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV49)]
pub enum MutationV47 {
    ObjectStore(ObjectStoreMutationV47),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV47)]
pub enum MutationV46 {
    ObjectStore(ObjectStoreMutationV46),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV46)]
pub enum MutationV43 {
    ObjectStore(ObjectStoreMutationV43),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV43)]
pub enum MutationV41 {
    ObjectStore(ObjectStoreMutationV41),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV41)]
pub enum MutationV40 {
    ObjectStore(ObjectStoreMutationV40),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}
216
217impl Mutation {
218    pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self {
219        Mutation::ObjectStore(ObjectStoreMutation {
220            item: Item::new(key, value),
221            op: Operation::Insert,
222        })
223    }
224
225    pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self {
226        Mutation::ObjectStore(ObjectStoreMutation {
227            item: Item::new(key, value),
228            op: Operation::ReplaceOrInsert,
229        })
230    }
231
232    pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self {
233        Mutation::ObjectStore(ObjectStoreMutation {
234            item: Item::new(key, value),
235            op: Operation::Merge,
236        })
237    }
238
239    pub fn update_mutations_key(key: FxfsKey) -> Self {
240        Mutation::UpdateMutationsKey(key.into())
241    }
242}
243
// We have custom comparison functions for mutations that just use the key, rather than the key and
// value that would be used by default so that we can deduplicate and find mutations (see
// get_object_mutation below).
pub type ObjectStoreMutation = ObjectStoreMutationV55;

/// An item (key + value) to apply to an object store's LSM tree, together with the operation
/// (insert / replace-or-insert / merge) to apply it with.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct ObjectStoreMutationV55 {
    pub item: ObjectItemV55,
    pub op: Operation,
}
255
// Superseded versions of `ObjectStoreMutation`, kept for journal migration
// (V40 -> V41 -> V43 -> V46 -> V47 -> V49 -> V50 -> V54 -> V55).  These describe serialized
// formats and must not be altered.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(ObjectStoreMutationV55)]
#[migrate_nodefault]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct ObjectStoreMutationV54 {
    pub item: crate::object_store::object_record::ObjectItemV54,
    pub op: Operation,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(ObjectStoreMutationV54)]
#[migrate_nodefault]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct ObjectStoreMutationV50 {
    pub item: ObjectItemV50,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV50)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV49 {
    pub item: ObjectItemV49,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV49)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV47 {
    pub item: ObjectItemV47,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV47)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV46 {
    pub item: ObjectItemV46,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV46)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV43 {
    pub item: ObjectItemV43,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV43)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV41 {
    pub item: ObjectItemV41,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_nodefault]
#[migrate_to_version(ObjectStoreMutationV41)]
pub struct ObjectStoreMutationV40 {
    pub item: ObjectItemV40,
    pub op: OperationV32,
}
321
/// The different LSM tree operations that can be performed as part of a mutation.
pub type Operation = OperationV32;

#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum OperationV32 {
    /// Insert a new record; the key must not already exist.
    Insert,
    /// Insert, replacing any existing record with the same key.
    ReplaceOrInsert,
    /// Combine with any existing record using the tree's merge function.
    Merge,
}
332
333impl Ord for ObjectStoreMutation {
334    fn cmp(&self, other: &Self) -> Ordering {
335        self.item.key.cmp(&other.item.key)
336    }
337}
338
339impl PartialOrd for ObjectStoreMutation {
340    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
341        Some(self.cmp(other))
342    }
343}
344
345impl PartialEq for ObjectStoreMutation {
346    fn eq(&self, other: &Self) -> bool {
347        self.item.key.eq(&other.item.key)
348    }
349}
350
351impl Eq for ObjectStoreMutation {}
352
353impl Ord for ObjectStoreMutationV50 {
354    fn cmp(&self, other: &Self) -> Ordering {
355        self.item.key.cmp(&other.item.key)
356    }
357}
358
359impl PartialOrd for ObjectStoreMutationV50 {
360    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
361        Some(self.cmp(other))
362    }
363}
364
365impl PartialEq for ObjectStoreMutationV50 {
366    fn eq(&self, other: &Self) -> bool {
367        self.item.key.eq(&other.item.key)
368    }
369}
370
371impl Eq for ObjectStoreMutationV50 {}
372
373impl Ord for AllocatorItem {
374    fn cmp(&self, other: &Self) -> Ordering {
375        self.key.cmp(&other.key)
376    }
377}
378
379impl PartialOrd for AllocatorItem {
380    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
381        Some(self.cmp(other))
382    }
383}
384
/// Same as std::ops::Range but with Ord and PartialOrd support, sorted first by start of the range,
/// then by the end.
pub type DeviceRange = DeviceRangeV32;

/// Newtype over `Range<u64>` (byte offsets on the device) so we can add ordering impls.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct DeviceRangeV32(pub Range<u64>);

// Deref to the inner range so callers can use `.start`, `.end`, `.contains`, etc. directly.
impl Deref for DeviceRange {
    type Target = Range<u64>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for DeviceRange {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
406
407impl From<Range<u64>> for DeviceRange {
408    fn from(range: Range<u64>) -> Self {
409        Self(range)
410    }
411}
412
413impl Into<Range<u64>> for DeviceRange {
414    fn into(self) -> Range<u64> {
415        self.0
416    }
417}
418
419impl Ord for DeviceRange {
420    fn cmp(&self, other: &Self) -> Ordering {
421        self.start.cmp(&other.start).then(self.end.cmp(&other.end))
422    }
423}
424
425impl PartialOrd for DeviceRange {
426    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
427        Some(self.cmp(other))
428    }
429}
430
pub type AllocatorMutation = AllocatorMutationV32;

/// Mutations applied to the allocator's state.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorMutationV32 {
    /// Marks `device_range` as allocated to the store identified by `owner_object_id`.
    Allocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    /// Releases `device_range` previously allocated to `owner_object_id`.
    Deallocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    /// Sets a byte limit for allocations owned by `owner_object_id`.
    SetLimit {
        owner_object_id: u64,
        bytes: u64,
    },
    /// Marks all extents with a given owner_object_id for deletion.
    /// Used to free space allocated to encrypted ObjectStore where we may not have the key.
    /// Note that the actual deletion time is undefined so this should never be used where an
    /// ObjectStore is still in use due to a high risk of corruption. Similarly, owner_object_id
    /// should never be reused for the same reasons.
    MarkForDeletion(u64),
}
455
pub type UpdateMutationsKey = UpdateMutationsKeyV49;

/// Newtype wrapper around the key used for encrypting mutations (see the identity-based
/// comparison impls below, which differ from the inner key's own semantics).
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
pub struct UpdateMutationsKeyV49(pub FxfsKeyV49);

// Prior version, retained for journal migration.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(UpdateMutationsKeyV49)]
pub struct UpdateMutationsKeyV40(pub FxfsKeyV40);
464
465impl From<UpdateMutationsKey> for FxfsKey {
466    fn from(outer: UpdateMutationsKey) -> Self {
467        outer.0
468    }
469}
470
471impl From<FxfsKey> for UpdateMutationsKey {
472    fn from(inner: FxfsKey) -> Self {
473        Self(inner)
474    }
475}
476
#[cfg(fuzz)]
impl<'a> arbitrary::Arbitrary<'a> for UpdateMutationsKey {
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        // Propagate generation errors (e.g. the fuzzer ran out of input bytes) with `?` instead
        // of panicking via `unwrap`, which would abort the fuzz run spuriously.
        Ok(UpdateMutationsKey::from(FxfsKey::arbitrary(u)?))
    }
}
483
// `UpdateMutationsKey` is ordered and compared by *instance identity* (its address), not by the
// wrapped key's value: an instance is only ever equal to itself, and two separately-created
// instances never compare equal.  NOTE(review): this appears intended to keep every
// `Mutation::UpdateMutationsKey` entry distinct within the transaction's mutation set — confirm
// against the set's deduplication logic before relying on value comparison here.
impl Ord for UpdateMutationsKey {
    fn cmp(&self, other: &Self) -> Ordering {
        (self as *const UpdateMutationsKey).cmp(&(other as *const _))
    }
}

impl PartialOrd for UpdateMutationsKey {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for UpdateMutationsKey {}

impl PartialEq for UpdateMutationsKey {
    fn eq(&self, other: &Self) -> bool {
        // Pointer equality: true only when `self` and `other` are the same instance.
        std::ptr::eq(self, other)
    }
}
503
/// When creating a transaction, locks typically need to be held to prevent two or more writers
/// trying to make conflicting mutations at the same time.  LockKeys are used for this.
/// NOTE: Ordering is important here!  The lock manager sorts the list of locks in a transaction
/// to acquire them in a consistent order, but there are special cases for the Filesystem lock and
/// the Flush lock.
/// The Filesystem lock is taken by every transaction and is done so first, as part of the TxnGuard.
/// The Flush lock is taken when we flush an LSM tree (e.g. an object store), and is held for
/// several transactions.  As such, it must come first in the lock acquisition ordering, so that
/// other transactions using the Flush lock have the same ordering as in flushing.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Copy)]
pub enum LockKey {
    /// Locks the entire filesystem.
    Filesystem,

    /// Used to lock flushing an object.
    Flush {
        object_id: u64,
    },

    /// Used to lock changes to a particular object attribute (e.g. writes).
    ObjectAttribute {
        store_object_id: u64,
        object_id: u64,
        attribute_id: u64,
    },

    /// Used to lock changes to a particular object (e.g. adding a child to a directory).
    Object {
        store_object_id: u64,
        object_id: u64,
    },

    /// Used to lock changes scoped to a particular project ID within a store.
    ProjectId {
        store_object_id: u64,
        project_id: u64,
    },

    /// Used to lock any truncate operations for a file.
    Truncate {
        store_object_id: u64,
        object_id: u64,
    },

    /// A lock used when getting or creating the internal directory.
    InternalDirectory {
        store_object_id: u64,
    },
}
552
553impl LockKey {
554    pub const fn object_attribute(store_object_id: u64, object_id: u64, attribute_id: u64) -> Self {
555        LockKey::ObjectAttribute { store_object_id, object_id, attribute_id }
556    }
557
558    pub const fn object(store_object_id: u64, object_id: u64) -> Self {
559        LockKey::Object { store_object_id, object_id }
560    }
561
562    pub const fn flush(object_id: u64) -> Self {
563        LockKey::Flush { object_id }
564    }
565
566    pub const fn truncate(store_object_id: u64, object_id: u64) -> Self {
567        LockKey::Truncate { store_object_id, object_id }
568    }
569}
570
/// A container for holding `LockKey` objects. Can store a single `LockKey` inline (avoiding a
/// heap allocation in the common one-lock case).
#[derive(Clone, Debug)]
pub enum LockKeys {
    /// No keys.
    None,
    /// Exactly one key, stored without allocating.
    Inline(LockKey),
    /// Two or more keys (or preallocated capacity for them).
    Vec(Vec<LockKey>),
}
578
579impl LockKeys {
580    pub fn with_capacity(capacity: usize) -> Self {
581        if capacity > 1 { LockKeys::Vec(Vec::with_capacity(capacity)) } else { LockKeys::None }
582    }
583
584    pub fn push(&mut self, key: LockKey) {
585        match self {
586            Self::None => *self = LockKeys::Inline(key),
587            Self::Inline(inline) => {
588                *self = LockKeys::Vec(vec![*inline, key]);
589            }
590            Self::Vec(vec) => vec.push(key),
591        }
592    }
593
594    pub fn truncate(&mut self, len: usize) {
595        match self {
596            Self::None => {}
597            Self::Inline(_) => {
598                if len == 0 {
599                    *self = Self::None;
600                }
601            }
602            Self::Vec(vec) => vec.truncate(len),
603        }
604    }
605
606    fn len(&self) -> usize {
607        match self {
608            Self::None => 0,
609            Self::Inline(_) => 1,
610            Self::Vec(vec) => vec.len(),
611        }
612    }
613
614    fn contains(&self, key: &LockKey) -> bool {
615        match self {
616            Self::None => false,
617            Self::Inline(single) => single == key,
618            Self::Vec(vec) => vec.contains(key),
619        }
620    }
621
622    fn sort_unstable(&mut self) {
623        match self {
624            Self::Vec(vec) => vec.sort_unstable(),
625            _ => {}
626        }
627    }
628
629    fn dedup(&mut self) {
630        match self {
631            Self::Vec(vec) => vec.dedup(),
632            _ => {}
633        }
634    }
635
636    fn iter(&self) -> LockKeysIter<'_> {
637        match self {
638            LockKeys::None => LockKeysIter::None,
639            LockKeys::Inline(key) => LockKeysIter::Inline(key),
640            LockKeys::Vec(keys) => LockKeysIter::Vec(keys.iter()),
641        }
642    }
643}
644
/// Borrowing iterator over a `LockKeys`, mirroring its three storage shapes.
enum LockKeysIter<'a> {
    None,
    // The single inline key, yielded once (the variant flips to `None` after yielding).
    Inline(&'a LockKey),
    Vec(std::slice::Iter<'a, LockKey>),
}
650
651impl<'a> Iterator for LockKeysIter<'a> {
652    type Item = &'a LockKey;
653    fn next(&mut self) -> Option<Self::Item> {
654        match self {
655            Self::None => None,
656            Self::Inline(inline) => {
657                let next = *inline;
658                *self = Self::None;
659                Some(next)
660            }
661            Self::Vec(vec) => vec.next(),
662        }
663    }
664}
665
666impl Default for LockKeys {
667    fn default() -> Self {
668        LockKeys::None
669    }
670}
671
/// Builds a `LockKeys` from zero or more `LockKey` expressions, picking the cheapest storage:
/// `None` for no keys, `Inline` for exactly one, and a `Vec` otherwise.
#[macro_export]
macro_rules! lock_keys {
    () => {
        $crate::object_store::transaction::LockKeys::None
    };
    ($lock_key:expr $(,)?) => {
        $crate::object_store::transaction::LockKeys::Inline($lock_key)
    };
    ($($lock_keys:expr),+ $(,)?) => {
        $crate::object_store::transaction::LockKeys::Vec(vec![$($lock_keys),+])
    };
}
pub use lock_keys;
685
/// Mutations in a transaction can be associated with an object so that when mutations are applied,
/// updates can be applied to in-memory structures. For example, we cache object sizes, so when a
/// size change is applied, we can update the cached object size.
pub trait AssociatedObject: Send + Sync {
    /// Called before `_mutation` is applied for object `_object_id`; the default does nothing.
    fn will_apply_mutation(&self, _mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
    }
}
693
/// Optional handle to an associated object, either borrowed for the transaction's lifetime or
/// owned by the mutation.
pub enum AssocObj<'a> {
    None,
    Borrowed(&'a dyn AssociatedObject),
    Owned(Box<dyn AssociatedObject>),
}
699
700impl AssocObj<'_> {
701    pub fn map<R, F: FnOnce(&dyn AssociatedObject) -> R>(&self, f: F) -> Option<R> {
702        match self {
703            AssocObj::None => None,
704            AssocObj::Borrowed(b) => Some(f(*b)),
705            AssocObj::Owned(o) => Some(f(o.as_ref())),
706        }
707    }
708}
709
/// A mutation as stored inside a transaction: the target object ID, the mutation record itself,
/// and an optional in-memory associated object.
pub struct TxnMutation<'a> {
    // This, at time of writing, is either the object ID of an object store, or the object ID of the
    // allocator.  In the case of an object mutation, there's another object ID in the mutation
    // record that would be for the object actually being changed.
    pub object_id: u64,

    // The actual mutation.  This gets serialized to the journal.
    pub mutation: Mutation,

    // An optional associated object for the mutation.  During replay, there will always be no
    // associated object.
    pub associated_object: AssocObj<'a>,
}
723
724// We store TxnMutation in a set, and for that, we only use object_id and mutation and not the
725// associated object or checksum.
726impl Ord for TxnMutation<'_> {
727    fn cmp(&self, other: &Self) -> Ordering {
728        self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation))
729    }
730}
731
732impl PartialOrd for TxnMutation<'_> {
733    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
734        Some(self.cmp(other))
735    }
736}
737
738impl PartialEq for TxnMutation<'_> {
739    fn eq(&self, other: &Self) -> bool {
740        self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation)
741    }
742}
743
744impl Eq for TxnMutation<'_> {}
745
746impl std::fmt::Debug for TxnMutation<'_> {
747    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
748        f.debug_struct("TxnMutation")
749            .field("object_id", &self.object_id)
750            .field("mutation", &self.mutation)
751            .finish()
752    }
753}
754
/// Where a transaction's metadata space comes from.
pub enum MetadataReservation {
    // The state after a transaction has been dropped.
    None,

    // Metadata space for this transaction is being borrowed from ObjectManager's metadata
    // reservation.
    Borrowed,

    // A metadata reservation was made when the transaction was created.
    Reservation(Reservation),

    // The metadata space is being _held_ within `allocator_reservation`.
    Hold(u64),
}
769
/// A transaction groups mutation records to be committed as a group.
pub struct Transaction<'a> {
    // Guard keeping the filesystem-level transaction accounting alive for this transaction.
    txn_guard: TxnGuard<'a>,

    // The mutations that make up this transaction.
    mutations: BTreeSet<TxnMutation<'a>>,

    // The locks that this transaction currently holds.
    txn_locks: LockKeys,

    /// If set, an allocator reservation that should be used for allocations.
    pub allocator_reservation: Option<&'a Reservation>,

    /// The reservation for the metadata for this transaction.
    pub metadata_reservation: MetadataReservation,

    // Keep track of objects explicitly created by this transaction. No locks are required for them.
    // Addressed by (owner_object_id, object_id).
    new_objects: BTreeSet<(u64, u64)>,

    /// Any data checksums which should be evaluated when replaying this transaction.
    checksums: Vec<(Range<u64>, Vec<Checksum>, bool)>,

    /// Set if this transaction contains data (i.e. includes any extent mutations).
    includes_write: bool,
}
796
797impl<'a> Transaction<'a> {
798    /// Creates a new transaction.  `txn_locks` are read locks that can be upgraded to write locks
799    /// at commit time.
800    pub async fn new(
801        txn_guard: TxnGuard<'a>,
802        options: Options<'a>,
803        txn_locks: LockKeys,
804    ) -> Result<Transaction<'a>, Error> {
805        txn_guard.fs().add_transaction(options.skip_journal_checks).await;
806        let fs = txn_guard.fs().clone();
807        let guard = scopeguard::guard((), |_| fs.sub_transaction());
808        let (metadata_reservation, allocator_reservation, hold) =
809            txn_guard.fs().reservation_for_transaction(options).await?;
810
811        let txn_locks = {
812            let lock_manager = txn_guard.fs().lock_manager();
813            let mut write_guard = lock_manager.txn_lock(txn_locks).await;
814            std::mem::take(&mut write_guard.0.lock_keys)
815        };
816        let mut transaction = Transaction {
817            txn_guard,
818            mutations: BTreeSet::new(),
819            txn_locks,
820            allocator_reservation: None,
821            metadata_reservation,
822            new_objects: BTreeSet::new(),
823            checksums: Vec::new(),
824            includes_write: false,
825        };
826
827        ScopeGuard::into_inner(guard);
828        hold.map(|h| h.forget()); // Transaction takes ownership from here on.
829        transaction.allocator_reservation = allocator_reservation;
830        Ok(transaction)
831    }
832
    /// Returns the filesystem transaction guard held by this transaction.
    pub fn txn_guard(&self) -> &TxnGuard<'_> {
        &self.txn_guard
    }
836
    /// Returns the set of mutations accumulated so far.
    pub fn mutations(&self) -> &BTreeSet<TxnMutation<'a>> {
        &self.mutations
    }
840
    /// Takes ownership of all mutations, leaving the transaction's set empty.  The new-object
    /// tracking is cleared as well since it refers to the mutations being taken.
    pub fn take_mutations(&mut self) -> BTreeSet<TxnMutation<'a>> {
        self.new_objects.clear();
        mem::take(&mut self.mutations)
    }
845
    /// Adds a mutation to this transaction.  If the mutation already exists, it is replaced and the
    /// old mutation is returned.  Equivalent to `add_with_object` with no associated object.
    pub fn add(&mut self, object_id: u64, mutation: Mutation) -> Option<Mutation> {
        self.add_with_object(object_id, mutation, AssocObj::None)
    }
851
    /// Removes a mutation that matches `mutation`.  Matching uses `TxnMutation`'s custom ordering,
    /// i.e. `object_id` plus the mutation's key (values are not compared).
    pub fn remove(&mut self, object_id: u64, mutation: Mutation) {
        let txn_mutation = TxnMutation { object_id, mutation, associated_object: AssocObj::None };
        if self.mutations.remove(&txn_mutation) {
            // If the removed mutation was the insertion of an object, that object is no longer
            // "new" to this transaction, so drop it from the new-object tracking too (keyed by
            // (owner_object_id, object_id)).
            if let Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    ObjectItem {
                        key: ObjectKey { object_id: new_object_id, data: ObjectKeyData::Object },
                        ..
                    },
                op: Operation::Insert,
            }) = txn_mutation.mutation
            {
                self.new_objects.remove(&(object_id, new_object_id));
            }
        }
    }
869
    /// Adds a mutation with an associated object. If the mutation already exists, it is replaced
    /// and the old mutation is returned.  Panics if `object_id` is `INVALID_OBJECT_ID`.
    pub fn add_with_object(
        &mut self,
        object_id: u64,
        mutation: Mutation,
        associated_object: AssocObj<'a>,
    ) -> Option<Mutation> {
        assert!(object_id != INVALID_OBJECT_ID);
        // Any extent mutation means this transaction carries data; remember that so
        // `includes_write` reports it.
        if let Mutation::ObjectStore(ObjectStoreMutation {
            item:
                Item {
                    key:
                        ObjectKey { data: ObjectKeyData::Attribute(_, AttributeKey::Extent(_)), .. },
                    ..
                },
            ..
        }) = &mutation
        {
            self.includes_write = true;
        }
        let txn_mutation = TxnMutation { object_id, mutation, associated_object };
        // Check (debug-assert/log) that the locks this mutation requires are actually held.
        self.verify_locks(&txn_mutation);
        // `replace` (not `insert`) so the new associated object wins; the displaced mutation, if
        // any, is handed back to the caller.
        self.mutations.replace(txn_mutation).map(|m| m.mutation)
    }
895
    /// Records checksums for data written to `range`; `first_write` is stored verbatim alongside
    /// the checksums and surfaced again by `checksums`/`take_checksums`.
    pub fn add_checksum(&mut self, range: Range<u64>, checksums: Vec<Checksum>, first_write: bool) {
        self.checksums.push((range, checksums, first_write));
    }
899
    /// Returns true if this transaction contains any extent mutations (set by `add_with_object`).
    pub fn includes_write(&self) -> bool {
        self.includes_write
    }
903
    /// Returns the checksums recorded so far via `add_checksum`.
    pub fn checksums(&self) -> &[(Range<u64>, Vec<Checksum>, bool)] {
        &self.checksums
    }
907
908    pub fn take_checksums(&mut self) -> Vec<(Range<u64>, Vec<Checksum>, bool)> {
909        std::mem::replace(&mut self.checksums, Vec::new())
910    }
911
912    fn verify_locks(&mut self, mutation: &TxnMutation<'_>) {
913        // It was considered to change the locks from Vec to BTreeSet since we'll now be searching
914        // through it, but given the small set that these locks usually comprise, it probably isn't
915        // worth it.
916        match mutation {
917            TxnMutation {
918                mutation:
919                    Mutation::ObjectStore {
920                        0: ObjectStoreMutation { item: ObjectItem { key, .. }, op },
921                    },
922                object_id: store_object_id,
923                ..
924            } => {
925                match &key.data {
926                    ObjectKeyData::Attribute(..) => {
927                        // TODO(https://fxbug.dev/42073914): Check lock requirements.
928                    }
929                    ObjectKeyData::Child { .. }
930                    | ObjectKeyData::EncryptedChild(_)
931                    | ObjectKeyData::EncryptedCasefoldChild(_)
932                    | ObjectKeyData::CasefoldChild { .. }
933                    | ObjectKeyData::LegacyCasefoldChild(_) => {
934                        let id = key.object_id;
935                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
936                            && !self.new_objects.contains(&(*store_object_id, id))
937                        {
938                            debug_assert!(
939                                false,
940                                "Not holding required lock for object {id} \
941                                in store {store_object_id}"
942                            );
943                            error!(
944                                "Not holding required lock for object {id} in store \
945                                {store_object_id}"
946                            )
947                        }
948                    }
949                    ObjectKeyData::GraveyardEntry { .. } => {
950                        // TODO(https://fxbug.dev/42073911): Check lock requirements.
951                    }
952                    ObjectKeyData::GraveyardAttributeEntry { .. } => {
953                        // TODO(https://fxbug.dev/122974): Check lock requirements.
954                    }
955                    ObjectKeyData::Keys => {
956                        let id = key.object_id;
957                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
958                            && !self.new_objects.contains(&(*store_object_id, id))
959                        {
960                            debug_assert!(
961                                false,
962                                "Not holding required lock for object {id} \
963                                in store {store_object_id}"
964                            );
965                            error!(
966                                "Not holding required lock for object {id} in store \
967                                {store_object_id}"
968                            )
969                        }
970                    }
971                    ObjectKeyData::Object => match op {
972                        // Insert implies the caller expects no object with which to race
973                        Operation::Insert => {
974                            self.new_objects.insert((*store_object_id, key.object_id));
975                        }
976                        Operation::Merge | Operation::ReplaceOrInsert => {
977                            let id = key.object_id;
978                            if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
979                                && !self.new_objects.contains(&(*store_object_id, id))
980                            {
981                                debug_assert!(
982                                    false,
983                                    "Not holding required lock for object {id} \
984                                    in store {store_object_id}"
985                                );
986                                error!(
987                                    "Not holding required lock for object {id} in store \
988                                    {store_object_id}"
989                                )
990                            }
991                        }
992                    },
993                    ObjectKeyData::Project { project_id, property: ProjectProperty::Limit } => {
994                        if !self.txn_locks.contains(&LockKey::ProjectId {
995                            store_object_id: *store_object_id,
996                            project_id: *project_id,
997                        }) {
998                            debug_assert!(
999                                false,
1000                                "Not holding required lock for project limit id {project_id} \
1001                                in store {store_object_id}"
1002                            );
1003                            error!(
1004                                "Not holding required lock for project limit id {project_id} in \
1005                                store {store_object_id}"
1006                            )
1007                        }
1008                    }
1009                    ObjectKeyData::Project { property: ProjectProperty::Usage, .. } => match op {
1010                        Operation::Insert | Operation::ReplaceOrInsert => {
1011                            panic!(
1012                                "Project usage is all handled by merging deltas, no inserts or \
1013                                replacements should be used"
1014                            );
1015                        }
1016                        // Merges are all handled like atomic +/- and serialized by the tree locks.
1017                        Operation::Merge => {}
1018                    },
1019                    ObjectKeyData::ExtendedAttribute { .. } => {
1020                        let id = key.object_id;
1021                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
1022                            && !self.new_objects.contains(&(*store_object_id, id))
1023                        {
1024                            debug_assert!(
1025                                false,
1026                                "Not holding required lock for object {id} \
1027                                in store {store_object_id} while mutating extended attribute"
1028                            );
1029                            error!(
1030                                "Not holding required lock for object {id} in store \
1031                                {store_object_id} while mutating extended attribute"
1032                            )
1033                        }
1034                    }
1035                }
1036            }
1037            TxnMutation { mutation: Mutation::DeleteVolume, object_id, .. } => {
1038                if !self.txn_locks.contains(&LockKey::flush(*object_id)) {
1039                    debug_assert!(false, "Not holding required lock for DeleteVolume");
1040                    error!("Not holding required lock for DeleteVolume");
1041                }
1042            }
1043            _ => {}
1044        }
1045    }
1046
    /// Returns true if this transaction has no mutations.  Note that recorded checksums alone do
    /// not make a transaction non-empty.
    pub fn is_empty(&self) -> bool {
        self.mutations.is_empty()
    }
1051
1052    /// Searches for an existing object mutation within the transaction that has the given key and
1053    /// returns it if found.
1054    pub fn get_object_mutation(
1055        &self,
1056        store_object_id: u64,
1057        key: ObjectKey,
1058    ) -> Option<&ObjectStoreMutation> {
1059        if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) =
1060            self.mutations.get(&TxnMutation {
1061                object_id: store_object_id,
1062                mutation: Mutation::insert_object(key, ObjectValue::None),
1063                associated_object: AssocObj::None,
1064            })
1065        {
1066            Some(mutation)
1067        } else {
1068            None
1069        }
1070    }
1071
    /// Commits a transaction.  If successful, returns the journal offset of the transaction.
    pub async fn commit(mut self) -> Result<u64, Error> {
        debug!(txn:? = &self; "Commit");
        // The identity closure means no extra work runs whilst locks are held; the journal offset
        // is passed straight through as the return value.
        self.txn_guard.fs().clone().commit_transaction(&mut self, |x| x).await
    }
1077
    /// Commits and then runs the callback whilst locks are held.  The callback accepts a single
    /// parameter which is the journal offset of the transaction.
    pub async fn commit_with_callback<R: Send>(
        mut self,
        f: impl FnOnce(u64) -> R + Send,
    ) -> Result<R, Error> {
        debug!(txn:? = &self; "Commit");
        // `f` is invoked inside `commit_transaction`, i.e. before this transaction's locks are
        // released.
        self.txn_guard.fs().clone().commit_transaction(&mut self, f).await
    }
1087
    /// Commits the transaction, but allows the transaction to be used again.  The locks are not
    /// dropped; write locks are downgraded back to (upgradeable) transaction locks.
    pub async fn commit_and_continue(&mut self) -> Result<(), Error> {
        debug!(txn:? = self; "Commit");
        self.txn_guard.fs().clone().commit_transaction(self, |_| {}).await?;
        // A successful commit must have consumed all of the mutations.
        assert!(self.mutations.is_empty());
        // Downgrade so that the transaction can accumulate new mutations whilst still excluding
        // other writers; see `LockManager::downgrade_locks`.
        self.txn_guard.fs().lock_manager().downgrade_locks(&self.txn_locks);
        Ok(())
    }
1097}
1098
impl Drop for Transaction<'_> {
    // Runs whether or not the transaction was committed.
    fn drop(&mut self) {
        // Call the filesystem implementation of drop_transaction which should, as a minimum, call
        // LockManager's drop_transaction to ensure the locks are released.
        debug!(txn:? = &self; "Drop");
        self.txn_guard.fs().clone().drop_transaction(self);
    }
}
1107
1108impl std::fmt::Debug for Transaction<'_> {
1109    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1110        f.debug_struct("Transaction")
1111            .field("mutations", &self.mutations)
1112            .field("txn_locks", &self.txn_locks)
1113            .field("reservation", &self.allocator_reservation)
1114            .finish()
1115    }
1116}
1117
/// Holds either a borrowed or an owned `T`; both cases dereference to `T` via the `Deref` impl.
pub enum BorrowedOrOwned<'a, T> {
    Borrowed(&'a T),
    Owned(T),
}
1122
1123impl<T> Deref for BorrowedOrOwned<'_, T> {
1124    type Target = T;
1125
1126    fn deref(&self) -> &Self::Target {
1127        match self {
1128            BorrowedOrOwned::Borrowed(b) => b,
1129            BorrowedOrOwned::Owned(o) => &o,
1130        }
1131    }
1132}
1133
// Wraps a reference without taking ownership.
impl<'a, T> From<&'a T> for BorrowedOrOwned<'a, T> {
    fn from(value: &'a T) -> Self {
        BorrowedOrOwned::Borrowed(value)
    }
}
1139
// Takes ownership of the value.
impl<T> From<T> for BorrowedOrOwned<'_, T> {
    fn from(value: T) -> Self {
        BorrowedOrOwned::Owned(value)
    }
}
1145
/// LockManager holds the locks that transactions might have taken.  A TransactionManager
/// implementation would typically have one of these.
///
/// Three different kinds of locks are supported.  There are read locks and write locks, which are
/// as one would expect.  The third kind of lock is a _transaction_ lock (which is also known as an
/// upgradeable read lock).  When first acquired, these block other writes (including other
/// transaction locks) but do not block reads.  When it is time to commit a transaction, these locks
/// are upgraded to full write locks (without ever dropping the lock) and then dropped after
/// committing (unless commit_and_continue is used).  This way, reads are only blocked for the
/// shortest possible time.  It follows that write locks should be used sparingly.  Locks are
/// granted in order with one exception: when a lock is in the initial _transaction_ lock state
/// (LockState::Locked), all read locks are allowed even if there are other tasks waiting for the
/// lock.  The reason for this is because we allow read locks to be taken by tasks that have taken a
/// _transaction_ lock (i.e. recursion is allowed).  In other cases, such as when a writer is
/// waiting and there are only readers, readers will queue up behind the writer.
///
/// To summarize:
///
/// +-------------------------+-----------------+----------------+------------------+
/// |                         | While read_lock | While txn_lock | While write_lock |
/// |                         | is held         | is held        | is held          |
/// +-------------------------+-----------------+----------------+------------------+
/// | Can acquire read_lock?  | true            | true           | false            |
/// +-------------------------+-----------------+----------------+------------------+
/// | Can acquire txn_lock?   | true            | false          | false            |
/// +-------------------------+-----------------+----------------+------------------+
/// | Can acquire write_lock? | false           | false          | false            |
/// +-------------------------+-----------------+----------------+------------------+
pub struct LockManager {
    // All lock state lives behind this single mutex; the `unsafe` waker-list manipulation in this
    // file relies on it being held (see the SAFETY comments at each site).
    locks: Mutex<Locks>,
}
1177
// The table of currently held (or waited-upon) locks, keyed by `LockKey`.  Entries are removed in
// `drop_lock` once `LockEntry::can_remove` reports they are no longer needed.
struct Locks {
    keys: HashMap<LockKey, LockEntry>,
}
1181
impl Locks {
    // Releases one grant of the lock for `key` that was held in `state`, waking any waiters that
    // can now proceed and removing the entry once it is unused.  Panics (`unreachable!`) if there
    // is no entry for `key`, since that indicates a release without a matching acquire.
    fn drop_lock(&mut self, key: LockKey, state: LockState) {
        if let Entry::Occupied(mut occupied) = self.keys.entry(key) {
            let entry = occupied.get_mut();
            let wake = match state {
                LockState::ReadLock => {
                    entry.read_count -= 1;
                    entry.read_count == 0
                }
                // drop_write_locks currently depends on us treating Locked and WriteLock the same.
                LockState::Locked | LockState::WriteLock => {
                    entry.state = LockState::ReadLock;
                    true
                }
            };
            if wake {
                // SAFETY: The lock in `LockManager::locks` is held.
                unsafe {
                    entry.wake();
                }
                if entry.can_remove() {
                    occupied.remove_entry();
                }
            }
        } else {
            unreachable!();
        }
    }

    // Releases one read grant for each key in `lock_keys`.
    fn drop_read_locks(&mut self, lock_keys: LockKeys) {
        for lock in lock_keys.iter() {
            self.drop_lock(*lock, LockState::ReadLock);
        }
    }

    // Releases the write-side grant for each key in `lock_keys`.
    fn drop_write_locks(&mut self, lock_keys: LockKeys) {
        for lock in lock_keys.iter() {
            // This is a bit hacky, but this works for locks in either the Locked or WriteLock
            // states.
            self.drop_lock(*lock, LockState::WriteLock);
        }
    }

    // Downgrades locks from WriteLock to Locked.
    fn downgrade_locks(&mut self, lock_keys: &LockKeys) {
        for lock in lock_keys.iter() {
            // SAFETY: The lock in `LockManager::locks` is held.
            unsafe {
                self.keys.get_mut(lock).unwrap().downgrade_lock();
            }
        }
    }
}
1235
#[derive(Debug)]
struct LockEntry {
    // In the states that allow readers (ReadLock, Locked), this count can be non-zero
    // to indicate the number of active readers.
    read_count: u64,

    // The state of the lock (see below).
    state: LockState,

    // A doubly-linked list of wakers that should be woken when they have been granted the lock.
    // New wakers are usually chained on to tail, with the exception being the case where a lock in
    // state Locked is to be upgraded to WriteLock, but can't because there are readers.  It might
    // be possible to use intrusive-collections in the future.
    // These raw pointers are only dereferenced whilst `LockManager::locks` is held; the pointees
    // are pinned `LockWaker`s living on the stacks of the waiting futures.
    head: *const LockWaker,
    tail: *const LockWaker,
}
1252
// SAFETY: `LockEntry` contains raw pointers into the waker list, which are only dereferenced
// whilst `LockManager::locks` is held (see the SAFETY comments at the dereference sites), so the
// entry can be moved between threads.
unsafe impl Send for LockEntry {}
1254
// Represents a node in the waker list.  It is only safe to access the members wrapped by UnsafeCell
// when LockManager's `locks` member is locked.
//
// Instances live on the stack of the future that is waiting for the lock (see `LockManager::lock`
// and `commit_prepare_keys`); `LockWaker::wait` unlinks or releases them if that future is dropped.
struct LockWaker {
    // The next and previous pointers in the doubly-linked list.
    next: UnsafeCell<*const LockWaker>,
    prev: UnsafeCell<*const LockWaker>,

    // Holds the lock key for this waker.  This is required so that we can find the associated
    // `LockEntry`.
    key: LockKey,

    // The underlying waker that should be used to wake the task.
    waker: UnsafeCell<WakerState>,

    // The target state for this waker.
    target_state: LockState,

    // True if this is an upgrade.
    is_upgrade: bool,

    // We need to be pinned because these form part of the linked list.
    _pin: PhantomPinned,
}
1278
// Lifecycle of a `LockWaker`'s waker slot.  All transitions happen whilst `LockManager::locks` is
// held; the slot ends in `Woken` whether or not a waker was ever registered.
enum WakerState {
    // This is the initial state before the waker has been first polled.
    Pending,

    // Once polled, this contains the actual waker.
    Registered(Waker),

    // The waker has been woken and has been granted the lock.
    Woken,
}
1289
impl WakerState {
    // True once this waker has been granted the lock (set via `LockEntry::pop_and_wake`).
    fn is_woken(&self) -> bool {
        matches!(self, WakerState::Woken)
    }
}
1295
// SAFETY: The `UnsafeCell` members of `LockWaker` are only accessed whilst `LockManager::locks` is
// held, which serialises all access across threads (see the SAFETY comments at each site).
unsafe impl Send for LockWaker {}
unsafe impl Sync for LockWaker {}
1298
impl LockWaker {
    // Waits for the waker to be woken.
    async fn wait(&self, manager: &LockManager) {
        // We must guard against the future being dropped.
        let waker_guard = scopeguard::guard((), |_| {
            let mut locks = manager.locks.lock();
            // SAFETY: We've acquired the lock.
            unsafe {
                if (*self.waker.get()).is_woken() {
                    // We were woken, but didn't actually run, so we must drop the lock.
                    if self.is_upgrade {
                        locks.keys.get_mut(&self.key).unwrap().downgrade_lock();
                    } else {
                        locks.drop_lock(self.key, self.target_state);
                    }
                } else {
                    // We haven't been woken but we've been dropped so we must remove ourself from
                    // the waker list.
                    locks.keys.get_mut(&self.key).unwrap().remove_waker(self);
                }
            }
        });

        poll_fn(|cx| {
            let _locks = manager.locks.lock();
            // SAFETY: We've acquired the lock.
            unsafe {
                if (*self.waker.get()).is_woken() {
                    Poll::Ready(())
                } else {
                    // Re-register on every poll since the executor may poll with a new waker.
                    *self.waker.get() = WakerState::Registered(cx.waker().clone());
                    Poll::Pending
                }
            }
        })
        .await;

        // Completed normally, so defuse the guard: the granted lock must be kept.
        ScopeGuard::into_inner(waker_guard);
    }
}
1339
// The state of a `LockEntry`; see the table on `LockManager` for which acquisitions each state
// permits.
#[derive(Copy, Clone, Debug, PartialEq)]
enum LockState {
    // In this state, there are only readers.
    ReadLock,

    // This state is used for transactions to lock other writers (including other transactions), but
    // it still allows readers.
    Locked,

    // A writer has exclusive access; all other readers and writers are blocked.
    WriteLock,
}
1352
impl LockManager {
    /// Creates an empty lock manager.
    pub fn new() -> Self {
        LockManager { locks: Mutex::new(Locks { keys: HashMap::default() }) }
    }

    /// Acquires the locks.  It is the caller's responsibility to ensure that drop_transaction is
    /// called when a transaction is dropped i.e. the filesystem's drop_transaction method should
    /// call LockManager's drop_transaction method.
    pub async fn txn_lock<'a>(&'a self, lock_keys: LockKeys) -> TransactionLocks<'a> {
        TransactionLocks(
            // `Locked` always produces the `Right` (write-guard) variant, so `unwrap` is safe.
            debug_assert_not_too_long!(self.lock(lock_keys, LockState::Locked)).right().unwrap(),
        )
    }

    // `state` indicates the kind of lock required.  ReadLock means acquire a read lock.  Locked
    // means lock other writers, but still allow readers.  WriteLock means acquire a write lock.
    async fn lock<'a>(
        &'a self,
        mut lock_keys: LockKeys,
        target_state: LockState,
    ) -> Either<ReadGuard<'a>, WriteGuard<'a>> {
        // The guard is built up incrementally so that, if this future is dropped part-way,
        // whatever keys have been granted so far are released by the guard's Drop impl.
        let mut guard = match &target_state {
            LockState::ReadLock => Left(ReadGuard {
                manager: self.into(),
                lock_keys: LockKeys::with_capacity(lock_keys.len()),
            }),
            LockState::Locked | LockState::WriteLock => Right(WriteGuard {
                manager: self.into(),
                lock_keys: LockKeys::with_capacity(lock_keys.len()),
            }),
        };
        let guard_keys = match &mut guard {
            Left(g) => &mut g.lock_keys,
            Right(g) => &mut g.lock_keys,
        };
        // Acquire keys in a canonical order so concurrent tasks taking overlapping sets cannot
        // deadlock, and collapse duplicates so each key is only locked once.
        lock_keys.sort_unstable();
        lock_keys.dedup();
        for lock in lock_keys.iter() {
            // Stack-allocated, pinned node for the intrusive waker list (see `LockWaker`).
            let lock_waker = None;
            pin_mut!(lock_waker);
            {
                let mut locks = self.locks.lock();
                match locks.keys.entry(*lock) {
                    Entry::Vacant(vacant) => {
                        // No contention: create the entry already granted to us.
                        vacant.insert(LockEntry {
                            read_count: if let LockState::ReadLock = target_state {
                                guard_keys.push(*lock);
                                1
                            } else {
                                guard_keys.push(*lock);
                                0
                            },
                            state: target_state,
                            head: std::ptr::null(),
                            tail: std::ptr::null(),
                        });
                    }
                    Entry::Occupied(mut occupied) => {
                        let entry = occupied.get_mut();
                        // SAFETY: We've acquired the lock.
                        if unsafe { entry.is_allowed(target_state, entry.head.is_null()) } {
                            if let LockState::ReadLock = target_state {
                                entry.read_count += 1;
                                guard_keys.push(*lock);
                            } else {
                                entry.state = target_state;
                                guard_keys.push(*lock);
                            }
                        } else {
                            // Initialise a waker and push it on the tail of the list.
                            // SAFETY: `lock_waker` isn't used prior to this point.
                            unsafe {
                                *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
                                    next: UnsafeCell::new(std::ptr::null()),
                                    prev: UnsafeCell::new(entry.tail),
                                    key: *lock,
                                    waker: UnsafeCell::new(WakerState::Pending),
                                    target_state: target_state,
                                    is_upgrade: false,
                                    _pin: PhantomPinned,
                                });
                            }
                            let waker = (*lock_waker).as_ref().unwrap();
                            if entry.tail.is_null() {
                                entry.head = waker;
                            } else {
                                // SAFETY: We've acquired the lock.
                                unsafe {
                                    *(*entry.tail).next.get() = waker;
                                }
                            }
                            entry.tail = waker;
                        }
                    }
                }
            }
            // `lock_waker` is only `Some` when we had to queue; waiting happens outside the mutex.
            if let Some(waker) = &*lock_waker {
                waker.wait(self).await;
                guard_keys.push(*lock);
            }
        }
        guard
    }

    /// This should be called by the filesystem's drop_transaction implementation.
    pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
        let mut locks = self.locks.lock();
        locks.drop_write_locks(std::mem::take(&mut transaction.txn_locks));
    }

    /// Prepares to commit by waiting for readers to finish.
    pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
        self.commit_prepare_keys(&transaction.txn_locks).await;
    }

    // Upgrades each key from Locked to WriteLock, waiting out any active readers.
    async fn commit_prepare_keys(&self, lock_keys: &LockKeys) {
        for lock in lock_keys.iter() {
            let lock_waker = None;
            pin_mut!(lock_waker);
            {
                let mut locks = self.locks.lock();
                let entry = locks.keys.get_mut(lock).unwrap();
                assert_eq!(entry.state, LockState::Locked);

                if entry.read_count == 0 {
                    entry.state = LockState::WriteLock;
                } else {
                    // Initialise a waker and push it on the head of the list.
                    // SAFETY: `lock_waker` isn't used prior to this point.
                    unsafe {
                        *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
                            next: UnsafeCell::new(entry.head),
                            prev: UnsafeCell::new(std::ptr::null()),
                            key: *lock,
                            waker: UnsafeCell::new(WakerState::Pending),
                            target_state: LockState::WriteLock,
                            is_upgrade: true,
                            _pin: PhantomPinned,
                        });
                    }
                    let waker = (*lock_waker).as_ref().unwrap();
                    if entry.head.is_null() {
                        entry.tail = (*lock_waker).as_ref().unwrap();
                    } else {
                        // SAFETY: We've acquired the lock.
                        unsafe {
                            *(*entry.head).prev.get() = waker;
                        }
                    }
                    entry.head = waker;
                }
            }

            if let Some(waker) = &*lock_waker {
                waker.wait(self).await;
            }
        }
    }

    /// Acquires a read lock for the given keys.  Read locks are only blocked whilst a transaction
    /// is being committed for the same locks.  They are only necessary where consistency is
    /// required between different mutations within a transaction.  For example, a write might
    /// change the size and extents for an object, in which case a read lock is required so that
    /// observed size and extents are seen together or not at all.
    pub async fn read_lock<'a>(&'a self, lock_keys: LockKeys) -> ReadGuard<'a> {
        debug_assert_not_too_long!(self.lock(lock_keys, LockState::ReadLock)).left().unwrap()
    }

    /// Acquires a write lock for the given keys.  Write locks provide exclusive access to the
    /// requested lock keys.
    pub async fn write_lock<'a>(&'a self, lock_keys: LockKeys) -> WriteGuard<'a> {
        debug_assert_not_too_long!(self.lock(lock_keys, LockState::WriteLock)).right().unwrap()
    }

    /// Downgrades locks from the WriteLock state to Locked state.  This will panic if the locks are
    /// not in the WriteLock state.
    pub fn downgrade_locks(&self, lock_keys: &LockKeys) {
        self.locks.lock().downgrade_locks(lock_keys);
    }
}
1533
// These unsafe functions require that `locks` in LockManager is locked.
impl LockEntry {
    // Wakes as many queued wakers as the entry's current state permits, starting at the head of
    // the list.  A run of consecutive readers can all be woken together.
    unsafe fn wake(&mut self) {
        // If the lock's state is WriteLock, or there's nothing waiting, return early.
        if self.head.is_null() || self.state == LockState::WriteLock {
            return;
        }

        let waker = unsafe { &*self.head };

        if waker.is_upgrade {
            // An upgrade (Locked -> WriteLock) can only proceed once all readers are gone.
            if self.read_count > 0 {
                return;
            }
        } else if !unsafe { self.is_allowed(waker.target_state, true) } {
            return;
        }

        unsafe { self.pop_and_wake() };

        // If the waker was a write lock, we can't wake any more up, but otherwise, we can keep
        // waking up readers.
        if waker.target_state == LockState::WriteLock {
            return;
        }

        while !self.head.is_null() && unsafe { (*self.head).target_state } == LockState::ReadLock {
            unsafe { self.pop_and_wake() };
        }
    }

    // Unlinks the head waker, records its grant in our state and wakes its task.
    unsafe fn pop_and_wake(&mut self) {
        let waker = unsafe { &*self.head };

        // Pop the waker.
        self.head = unsafe { *waker.next.get() };
        if self.head.is_null() {
            self.tail = std::ptr::null()
        } else {
            unsafe { *(*self.head).prev.get() = std::ptr::null() };
        }

        // Adjust our state accordingly.
        if waker.target_state == LockState::ReadLock {
            self.read_count += 1;
        } else {
            self.state = waker.target_state;
        }

        // Now wake the task.  Marking the slot `Woken` transfers the grant even if the task was
        // never polled (see `LockWaker::wait`'s drop guard).
        if let WakerState::Registered(waker) =
            std::mem::replace(unsafe { &mut *waker.waker.get() }, WakerState::Woken)
        {
            waker.wake();
        }
    }

    // True once nothing holds or waits on this entry (wakers imply state != plain ReadLock/0
    // because a granted waker updates the state before the entry is dropped).
    fn can_remove(&self) -> bool {
        self.state == LockState::ReadLock && self.read_count == 0
    }

    // Unlinks `waker` from the doubly-linked list (used when a waiting future is dropped before
    // being granted the lock).
    unsafe fn remove_waker(&mut self, waker: &LockWaker) {
        unsafe {
            let is_first = (*waker.prev.get()).is_null();
            if is_first {
                self.head = *waker.next.get();
            } else {
                *(**waker.prev.get()).next.get() = *waker.next.get();
            }
            if (*waker.next.get()).is_null() {
                self.tail = *waker.prev.get();
            } else {
                *(**waker.next.get()).prev.get() = *waker.prev.get();
            }
            if is_first {
                // We must call wake in case we erased a pending write lock and readers can now
                // proceed.
                self.wake();
            }
        }
    }

    // Returns whether or not a lock with given `target_state` can proceed.  `is_head` should be
    // true if this is something at the head of the waker list (or the waker list is empty) and
    // false if there are other items on the waker list that are prior.
    unsafe fn is_allowed(&self, target_state: LockState, is_head: bool) -> bool {
        match self.state {
            LockState::ReadLock => {
                // Allow ReadLock and Locked so long as nothing else is waiting.
                (self.read_count == 0
                    || target_state == LockState::Locked
                    || target_state == LockState::ReadLock)
                    && is_head
            }
            LockState::Locked => {
                // Always allow reads unless there's an upgrade waiting.  We have to
                // always allow reads in this state because tasks that have locks in
                // the Locked state can later try and acquire ReadLock.
                target_state == LockState::ReadLock
                    && (is_head || unsafe { !(*self.head).is_upgrade })
            }
            LockState::WriteLock => false,
        }
    }

    // Reverts WriteLock to Locked and wakes any readers that this unblocks; panics if the entry
    // was not in the WriteLock state.
    unsafe fn downgrade_lock(&mut self) {
        assert_eq!(std::mem::replace(&mut self.state, LockState::Locked), LockState::WriteLock);
        unsafe { self.wake() };
    }
}
1644
/// Guard for a set of read locks; the keys are released back to the manager
/// when the guard is dropped.
#[must_use]
pub struct ReadGuard<'a> {
    // The manager that issued the locks; used to release them on drop.
    manager: LockManagerRef<'a>,
    // Keys held by this guard.  Emptied (set to `LockKeys::None`) when
    // ownership is transferred via `into_owned`.
    lock_keys: LockKeys,
}
1650
1651impl ReadGuard<'_> {
1652    pub fn fs(&self) -> Option<&Arc<FxFilesystem>> {
1653        if let LockManagerRef::Owned(fs) = &self.manager { Some(fs) } else { None }
1654    }
1655
1656    pub fn into_owned(mut self, fs: Arc<FxFilesystem>) -> ReadGuard<'static> {
1657        ReadGuard {
1658            manager: LockManagerRef::Owned(fs),
1659            lock_keys: std::mem::replace(&mut self.lock_keys, LockKeys::None),
1660        }
1661    }
1662}
1663
1664impl Drop for ReadGuard<'_> {
1665    fn drop(&mut self) {
1666        let mut locks = self.manager.locks.lock();
1667        locks.drop_read_locks(std::mem::take(&mut self.lock_keys));
1668    }
1669}
1670
1671impl fmt::Debug for ReadGuard<'_> {
1672    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1673        f.debug_struct("ReadGuard")
1674            .field("manager", &(&self.manager as *const _))
1675            .field("lock_keys", &self.lock_keys)
1676            .finish()
1677    }
1678}
1679
/// Guard for a set of write locks; the keys are released back to the manager
/// when the guard is dropped.
#[must_use]
pub struct WriteGuard<'a> {
    // The manager that issued the locks; used to release them on drop.
    manager: LockManagerRef<'a>,
    // Keys held by this guard.
    lock_keys: LockKeys,
}
1685
1686impl Drop for WriteGuard<'_> {
1687    fn drop(&mut self) {
1688        let mut locks = self.manager.locks.lock();
1689        locks.drop_write_locks(std::mem::take(&mut self.lock_keys));
1690    }
1691}
1692
1693impl fmt::Debug for WriteGuard<'_> {
1694    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1695        f.debug_struct("WriteGuard")
1696            .field("manager", &(&self.manager as *const _))
1697            .field("lock_keys", &self.lock_keys)
1698            .finish()
1699    }
1700}
1701
/// Reference to a [`LockManager`]: either a plain borrow, or an owned strong
/// reference to the filesystem (whose lock manager is used), keeping the
/// filesystem alive for `'static` guards.
enum LockManagerRef<'a> {
    Borrowed(&'a LockManager),
    Owned(Arc<FxFilesystem>),
}
1706
1707impl Deref for LockManagerRef<'_> {
1708    type Target = LockManager;
1709
1710    fn deref(&self) -> &Self::Target {
1711        match self {
1712            LockManagerRef::Borrowed(m) => m,
1713            LockManagerRef::Owned(f) => f.lock_manager(),
1714        }
1715    }
1716}
1717
1718impl<'a> From<&'a LockManager> for LockManagerRef<'a> {
1719    fn from(value: &'a LockManager) -> Self {
1720        LockManagerRef::Borrowed(value)
1721    }
1722}
1723
1724#[cfg(test)]
1725mod tests {
1726    use super::{LockKey, LockKeys, LockManager, LockState, Mutation, Options};
1727    use crate::filesystem::FxFilesystem;
1728    use fuchsia_async as fasync;
1729    use fuchsia_sync::Mutex;
1730    use futures::channel::oneshot::channel;
1731    use futures::future::FutureExt;
1732    use futures::stream::FuturesUnordered;
1733    use futures::{StreamExt, join, pin_mut};
1734    use std::task::Poll;
1735    use std::time::Duration;
1736    use storage_device::DeviceHolder;
1737    use storage_device::fake_device::FakeDevice;
1738
    // Verifies that adding a mutation marks a transaction as non-empty.
    #[fuchsia::test]
    async fn test_simple() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let mut t = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        t.add(1, Mutation::BeginFlush);
        assert!(!t.is_empty());
    }
1751
    // Verifies transaction lock exclusion: a transaction on the same key must wait until the
    // holder is dropped, while a transaction on a different key is not blocked.
    #[fuchsia::test]
    async fn test_locks() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let (send3, recv3) = channel();
        // Set by the third future once it manages to take the contended lock.
        let done = Mutex::new(false);
        let mut futures = FuturesUnordered::new();
        futures.push(
            async {
                // Holds the lock on (1, 2, 3) until this future finishes.
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send1.send(()).unwrap(); // Tell the next future to continue.
                send3.send(()).unwrap(); // Tell the last future to continue.
                recv2.await.unwrap();
                // This is a halting problem so all we can do is sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            }
            .boxed(),
        );
        futures.push(
            async {
                recv1.await.unwrap();
                // This should not block since it is a different key.
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(2, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                // Tell the first future to continue.
                send2.send(()).unwrap();
            }
            .boxed(),
        );
        futures.push(
            async {
                // This should block until the first future has completed.
                recv3.await.unwrap();
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await;
                *done.lock() = true;
            }
            .boxed(),
        );
        while let Some(()) = futures.next().await {}
    }
1814
    // Verifies that a transaction lock does not block readers until the transaction commits,
    // and that commit then waits for outstanding readers.
    #[fuchsia::test]
    async fn test_read_lock_after_write_lock() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let done = Mutex::new(false);
        join!(
            async {
                let t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send1.send(()).unwrap(); // Tell the next future to continue.
                recv2.await.unwrap();
                t.commit().await.expect("commit failed");
                *done.lock() = true;
            },
            async {
                recv1.await.unwrap();
                // Reads should not be blocked until the transaction is committed.
                let _guard = fs
                    .lock_manager()
                    .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
                    .await;
                // Tell the first future to continue.
                send2.send(()).unwrap();
                // It shouldn't proceed until we release our read lock, but it's a halting
                // problem, so sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            },
        );
    }
1853
    // Mirror of test_read_lock_after_write_lock: a reader acquired first does not block a new
    // transaction, but commit must wait for the reader to release.
    #[fuchsia::test]
    async fn test_write_lock_after_read_lock() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let done = Mutex::new(false);
        join!(
            async {
                // Reads should not be blocked until the transaction is committed.
                let _guard = fs
                    .lock_manager()
                    .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
                    .await;
                // Tell the next future to continue and then wait.
                send1.send(()).unwrap();
                recv2.await.unwrap();
                // It shouldn't proceed until we release our read lock, but it's a halting
                // problem, so sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            },
            async {
                recv1.await.unwrap();
                let t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send2.send(()).unwrap(); // Tell the first future to continue;
                t.commit().await.expect("commit failed");
                *done.lock() = true;
            },
        );
    }
1892
    // Verifies that dropping an uncommitted transaction releases its locks, both with and
    // without a concurrent reader on the same key.
    #[fuchsia::test]
    async fn test_drop_uncommitted_transaction() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let key = lock_keys![LockKey::object(1, 1)];

        // Dropping while there's a reader.
        {
            let _write_lock = fs
                .clone()
                .new_transaction(key.clone(), Options::default())
                .await
                .expect("new_transaction failed");
            let _read_lock = fs.lock_manager().read_lock(key.clone()).await;
        }
        // Dropping while there's no reader.
        {
            let _write_lock = fs
                .clone()
                .new_transaction(key.clone(), Options::default())
                .await
                .expect("new_transaction failed");
        }
        // Make sure we can take the lock again (i.e. it was actually released).
        fs.clone()
            .new_transaction(key.clone(), Options::default())
            .await
            .expect("new_transaction failed");
    }
1922
1923    #[fuchsia::test]
1924    async fn test_drop_waiting_write_lock() {
1925        let manager = LockManager::new();
1926        let keys = lock_keys![LockKey::object(1, 1)];
1927        {
1928            let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1929            if let Poll::Ready(_) =
1930                futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1931            {
1932                assert!(false);
1933            }
1934        }
1935        let _ = manager.lock(keys, LockState::WriteLock).await;
1936    }
1937
1938    #[fuchsia::test]
1939    async fn test_write_lock_blocks_everything() {
1940        let manager = LockManager::new();
1941        let keys = lock_keys![LockKey::object(1, 1)];
1942        {
1943            let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1944            if let Poll::Ready(_) =
1945                futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1946            {
1947                assert!(false);
1948            }
1949            if let Poll::Ready(_) =
1950                futures::poll!(manager.lock(keys.clone(), LockState::ReadLock).boxed())
1951            {
1952                assert!(false);
1953            }
1954        }
1955        {
1956            let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1957        }
1958        {
1959            let _guard = manager.lock(keys, LockState::ReadLock).await;
1960        }
1961    }
1962
    // Verifies that downgrading an upgraded (commit-prepared) lock re-admits readers.
    #[fuchsia::test]
    async fn test_downgrade_locks() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.txn_lock(keys.clone()).await;
        manager.commit_prepare_keys(&keys).await;

        // Use FuturesUnordered so that we can check that the waker is woken.
        let mut read_lock: FuturesUnordered<_> =
            std::iter::once(manager.read_lock(keys.clone())).collect();

        // Trying to acquire a read lock now should be blocked.
        assert!(futures::poll!(read_lock.next()).is_pending());

        manager.downgrade_locks(&keys);

        // After downgrading, it should be possible to take a read lock.
        assert!(futures::poll!(read_lock.next()).is_ready());
    }
1982
    // Verifies that dropping a queued write-lock future wakes read lockers queued behind it.
    #[fuchsia::test]
    async fn test_dropped_write_lock_wakes() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
        let mut read_lock = FuturesUnordered::new();
        read_lock.push(manager.lock(keys.clone(), LockState::ReadLock));

        {
            let write_lock = manager.lock(keys, LockState::WriteLock);
            pin_mut!(write_lock);

            // The write lock should be blocked because of the read lock.
            assert!(futures::poll!(write_lock).is_pending());

            // Another read lock should be blocked because of the write lock.
            assert!(futures::poll!(read_lock.next()).is_pending());
        }

        // Dropping the write lock should allow the read lock to proceed.
        assert!(futures::poll!(read_lock.next()).is_ready());
    }
2005
    // Verifies that dropping a pending commit-prepare (upgrade) future leaves the lock usable:
    // a later commit_prepare on the same keys still succeeds.
    #[fuchsia::test]
    async fn test_drop_upgrade() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.lock(keys.clone(), LockState::Locked).await;

        {
            let commit_prepare = manager.commit_prepare_keys(&keys);
            pin_mut!(commit_prepare);
            let _read_guard = manager.lock(keys.clone(), LockState::ReadLock).await;
            assert!(futures::poll!(commit_prepare).is_pending());

            // Now we test dropping read_guard which should wake commit_prepare and
            // then dropping commit_prepare.
        }

        // We should be able to still commit_prepare.
        manager.commit_prepare_keys(&keys).await;
    }
2025
    // Verifies that an upgrade (commit-prepare) that was woken after the last reader released
    // still blocks new readers until the resulting write lock is dropped.
    #[fasync::run_singlethreaded(test)]
    async fn test_woken_upgrade_blocks_reads() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        // Start with a transaction lock.
        let guard = manager.lock(keys.clone(), LockState::Locked).await;

        // Take a read lock.
        let read1 = manager.lock(keys.clone(), LockState::ReadLock).await;

        // Try and upgrade the transaction lock, which should not be possible because of the read.
        let commit_prepare = manager.commit_prepare_keys(&keys);
        pin_mut!(commit_prepare);
        assert!(futures::poll!(commit_prepare.as_mut()).is_pending());

        // Taking another read should also be blocked.
        let read2 = manager.lock(keys.clone(), LockState::ReadLock);
        pin_mut!(read2);
        assert!(futures::poll!(read2.as_mut()).is_pending());

        // Drop the first read and the upgrade should complete.
        std::mem::drop(read1);
        assert!(futures::poll!(commit_prepare).is_ready());

        // But the second read should still be blocked.
        assert!(futures::poll!(read2.as_mut()).is_pending());

        // If we drop the write lock now, the read should be unblocked.
        std::mem::drop(guard);
        assert!(futures::poll!(read2).is_ready());
    }
2057
    // Distinct keys used by the LockKeys unit tests below.
    static LOCK_KEY_1: LockKey = LockKey::flush(1);
    static LOCK_KEY_2: LockKey = LockKey::flush(2);
    static LOCK_KEY_3: LockKey = LockKey::flush(3);
2061
2062    // The keys, storage method, and capacity must all match.
2063    fn assert_lock_keys_equal(value: &LockKeys, expected: &LockKeys) {
2064        match (value, expected) {
2065            (LockKeys::None, LockKeys::None) => {}
2066            (LockKeys::Inline(key1), LockKeys::Inline(key2)) => {
2067                if key1 != key2 {
2068                    panic!("{key1:?} != {key2:?}");
2069                }
2070            }
2071            (LockKeys::Vec(vec1), LockKeys::Vec(vec2)) => {
2072                if vec1 != vec2 {
2073                    panic!("{vec1:?} != {vec2:?}");
2074                }
2075                if vec1.capacity() != vec2.capacity() {
2076                    panic!(
2077                        "LockKeys have different capacity: {} != {}",
2078                        vec1.capacity(),
2079                        vec2.capacity()
2080                    );
2081                }
2082            }
2083            (_, _) => panic!("{value:?} != {expected:?}"),
2084        }
2085    }
2086
2087    // Only the keys must match. Storage method and capacity don't matter.
2088    fn assert_lock_keys_equivalent(value: &LockKeys, expected: &LockKeys) {
2089        let value: Vec<_> = value.iter().collect();
2090        let expected: Vec<_> = expected.iter().collect();
2091        assert_eq!(value, expected);
2092    }
2093
2094    #[test]
2095    fn test_lock_keys_macro() {
2096        assert_lock_keys_equal(&lock_keys![], &LockKeys::None);
2097        assert_lock_keys_equal(&lock_keys![LOCK_KEY_1], &LockKeys::Inline(LOCK_KEY_1));
2098        assert_lock_keys_equal(
2099            &lock_keys![LOCK_KEY_1, LOCK_KEY_2],
2100            &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]),
2101        );
2102    }
2103
2104    #[test]
2105    fn test_lock_keys_with_capacity() {
2106        assert_lock_keys_equal(&LockKeys::with_capacity(0), &LockKeys::None);
2107        assert_lock_keys_equal(&LockKeys::with_capacity(1), &LockKeys::None);
2108        assert_lock_keys_equal(&LockKeys::with_capacity(2), &LockKeys::Vec(Vec::with_capacity(2)));
2109    }
2110
2111    #[test]
2112    fn test_lock_keys_len() {
2113        assert_eq!(lock_keys![].len(), 0);
2114        assert_eq!(lock_keys![LOCK_KEY_1].len(), 1);
2115        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].len(), 2);
2116    }
2117
2118    #[test]
2119    fn test_lock_keys_contains() {
2120        assert_eq!(lock_keys![].contains(&LOCK_KEY_1), false);
2121        assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_1), true);
2122        assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_2), false);
2123        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_1), true);
2124        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_2), true);
2125        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_3), false);
2126    }
2127
2128    #[test]
2129    fn test_lock_keys_push() {
2130        let mut keys = lock_keys![];
2131        keys.push(LOCK_KEY_1);
2132        assert_lock_keys_equal(&keys, &LockKeys::Inline(LOCK_KEY_1));
2133        keys.push(LOCK_KEY_2);
2134        assert_lock_keys_equal(&keys, &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]));
2135        keys.push(LOCK_KEY_3);
2136        assert_lock_keys_equivalent(
2137            &keys,
2138            &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2, LOCK_KEY_3]),
2139        );
2140    }
2141
2142    #[test]
2143    fn test_lock_keys_sort_unstable() {
2144        let mut keys = lock_keys![];
2145        keys.sort_unstable();
2146        assert_lock_keys_equal(&keys, &lock_keys![]);
2147
2148        let mut keys = lock_keys![LOCK_KEY_1];
2149        keys.sort_unstable();
2150        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2151
2152        let mut keys = lock_keys![LOCK_KEY_2, LOCK_KEY_1];
2153        keys.sort_unstable();
2154        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2155    }
2156
2157    #[test]
2158    fn test_lock_keys_dedup() {
2159        let mut keys = lock_keys![];
2160        keys.dedup();
2161        assert_lock_keys_equal(&keys, &lock_keys![]);
2162
2163        let mut keys = lock_keys![LOCK_KEY_1];
2164        keys.dedup();
2165        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2166
2167        let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_1];
2168        keys.dedup();
2169        assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2170    }
2171
2172    #[test]
2173    fn test_lock_keys_truncate() {
2174        let mut keys = lock_keys![];
2175        keys.truncate(5);
2176        assert_lock_keys_equal(&keys, &lock_keys![]);
2177        keys.truncate(0);
2178        assert_lock_keys_equal(&keys, &lock_keys![]);
2179
2180        let mut keys = lock_keys![LOCK_KEY_1];
2181        keys.truncate(5);
2182        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2183        keys.truncate(0);
2184        assert_lock_keys_equal(&keys, &lock_keys![]);
2185
2186        let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_2];
2187        keys.truncate(5);
2188        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2189        keys.truncate(1);
2190        // Although there's only 1 key after truncate the key is not stored inline.
2191        assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2192    }
2193
2194    #[test]
2195    fn test_lock_keys_iter() {
2196        assert_eq!(lock_keys![].iter().collect::<Vec<_>>(), Vec::<&LockKey>::new());
2197
2198        assert_eq!(lock_keys![LOCK_KEY_1].iter().collect::<Vec<_>>(), vec![&LOCK_KEY_1]);
2199
2200        assert_eq!(
2201            lock_keys![LOCK_KEY_1, LOCK_KEY_2].iter().collect::<Vec<_>>(),
2202            vec![&LOCK_KEY_1, &LOCK_KEY_2]
2203        );
2204    }
2205}