// fxfs/object_store/transaction.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::checksum::Checksum;
6use crate::filesystem::{FxFilesystem, TxnGuard};
7use crate::log::*;
8use crate::lsm_tree::types::Item;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::AttributeKey;
11use crate::object_store::allocator::{AllocatorItem, Reservation};
12use crate::object_store::object_manager::{ObjectManager, reserved_space_from_journal_usage};
13use crate::object_store::object_record::{
14    FxfsKey, FxfsKeyV40, FxfsKeyV49, ObjectItem, ObjectItemV40, ObjectItemV41, ObjectItemV43,
15    ObjectItemV46, ObjectItemV47, ObjectItemV49, ObjectItemV50, ObjectKey, ObjectKeyData,
16    ObjectValue, ProjectProperty,
17};
18use crate::serialized_types::{Migrate, Versioned, migrate_nodefault, migrate_to_version};
19use anyhow::Error;
20use either::{Either, Left, Right};
21use fprint::TypeFingerprint;
22use fuchsia_sync::Mutex;
23use futures::future::poll_fn;
24use futures::pin_mut;
25use rustc_hash::FxHashMap as HashMap;
26use scopeguard::ScopeGuard;
27use serde::{Deserialize, Serialize};
28use std::cell::UnsafeCell;
29use std::cmp::Ordering;
30use std::collections::BTreeSet;
31use std::collections::hash_map::Entry;
32use std::marker::PhantomPinned;
33use std::ops::{Deref, DerefMut, Range};
34use std::sync::Arc;
35use std::task::{Poll, Waker};
36use std::{fmt, mem};
37
/// This allows for special handling of certain transactions such as deletes and the
/// extension of Journal extents. For most other use cases it is appropriate to use
/// `default()` here.
#[derive(Clone, Copy, Default)]
pub struct Options<'a> {
    /// If true, don't check for low journal space.  This should be true for any transactions that
    /// might alleviate journal space (i.e. compaction).
    pub skip_journal_checks: bool,

    /// If true, borrow metadata space from the metadata reservation.  This setting should be set to
    /// true for any transaction that will either not affect space usage after compaction
    /// (e.g. setting attributes), or reduce space usage (e.g. unlinking).  Otherwise, a transaction
    /// might fail with an out-of-space error.
    pub borrow_metadata_space: bool,

    /// If specified, a reservation to be used with the transaction.  If not set, any allocations
    /// that are part of this transaction will have to take their chances, and will fail if there is
    /// no free space.  The intention is that this should be used for things like the journal which
    /// require guaranteed space.
    pub allocator_reservation: Option<&'a Reservation>,

    /// An existing transaction guard to be used.
    pub txn_guard: Option<&'a TxnGuard<'a>>,
}
62
// This is the amount of space that we reserve for metadata when we are creating a new transaction.
// A transaction should not take more than this.  This is expressed in terms of space occupied in
// the journal; transactions must not take up more space in the journal than the number below.  The
// amount chosen here must be large enough for the maximum possible transaction that can be created,
// so transactions always need to be bounded which might involve splitting an operation up into
// smaller transactions.
pub const TRANSACTION_MAX_JOURNAL_USAGE: u64 = 24_576;
// The device-space reservation corresponding to the maximum journal usage above.
pub const TRANSACTION_METADATA_MAX_AMOUNT: u64 =
    reserved_space_from_journal_usage(TRANSACTION_MAX_JOURNAL_USAGE);

/// Wrapper around a lock-manager guard taken for a transaction.  `#[must_use]` because dropping
/// the guard presumably releases the held locks — confirm against the lock manager.
#[must_use]
pub struct TransactionLocks<'a>(pub WriteGuard<'a>);
75
/// The journal consists of these records which will be replayed at mount time.  Within a
/// transaction, these are stored as a set which allows some mutations to be deduplicated and found
/// (and we require custom comparison functions below).  For example, we need to be able to find
/// object size changes.
pub type Mutation = MutationV54;

#[derive(
    Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint, Versioned,
)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum MutationV54 {
    ObjectStore(ObjectStoreMutationV54),
    EncryptedObjectStore(#[serde(with = "crate::zerocopy_serialization")] Box<[u8]>),
    Allocator(AllocatorMutationV32),
    // Indicates the beginning of a flush.  This would typically involve sealing a tree.
    BeginFlush,
    // Indicates the end of a flush.  This would typically involve replacing the immutable layers
    // with compacted ones.
    EndFlush,
    // Volume has been deleted.  Requires we remove it from the set of managed ObjectStore.
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKey),
    CreateInternalDir(u64),
}
97
// Older versions of `Mutation`, retained so previously-serialized data can be migrated forward,
// one version at a time, via the `Migrate` derives and `migrate_to_version` attributes below.
#[derive(
    Migrate,
    Clone,
    Debug,
    Eq,
    Ord,
    PartialEq,
    PartialOrd,
    Serialize,
    Deserialize,
    TypeFingerprint,
    Versioned,
)]
#[migrate_to_version(MutationV54)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum MutationV50 {
    ObjectStore(ObjectStoreMutationV50),
    EncryptedObjectStore(#[serde(with = "crate::zerocopy_serialization")] Box<[u8]>),
    Allocator(AllocatorMutationV32),
    // Indicates the beginning of a flush.  This would typically involve sealing a tree.
    BeginFlush,
    // Indicates the end of a flush.  This would typically involve replacing the immutable layers
    // with compacted ones.
    EndFlush,
    // Volume has been deleted.  Requires we remove it from the set of managed ObjectStore.
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV49),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV50)]
pub enum MutationV49 {
    ObjectStore(ObjectStoreMutationV49),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV49),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV49)]
pub enum MutationV47 {
    ObjectStore(ObjectStoreMutationV47),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV47)]
pub enum MutationV46 {
    ObjectStore(ObjectStoreMutationV46),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV46)]
pub enum MutationV43 {
    ObjectStore(ObjectStoreMutationV43),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV43)]
pub enum MutationV41 {
    ObjectStore(ObjectStoreMutationV41),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV41)]
pub enum MutationV40 {
    ObjectStore(ObjectStoreMutationV40),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}
212
213impl Mutation {
214    pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self {
215        Mutation::ObjectStore(ObjectStoreMutation {
216            item: Item::new(key, value),
217            op: Operation::Insert,
218        })
219    }
220
221    pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self {
222        Mutation::ObjectStore(ObjectStoreMutation {
223            item: Item::new(key, value),
224            op: Operation::ReplaceOrInsert,
225        })
226    }
227
228    pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self {
229        Mutation::ObjectStore(ObjectStoreMutation {
230            item: Item::new(key, value),
231            op: Operation::Merge,
232        })
233    }
234
235    pub fn update_mutations_key(key: FxfsKey) -> Self {
236        Mutation::UpdateMutationsKey(key.into())
237    }
238}
239
// We have custom comparison functions for mutations that just use the key, rather than the key and
// value that would be used by default so that we can deduplicate and find mutations (see
// get_object_mutation below).
pub type ObjectStoreMutation = ObjectStoreMutationV54;

#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct ObjectStoreMutationV54 {
    /// The item the mutation applies to the store's LSM tree.
    pub item: crate::object_store::object_record::ObjectItem,
    /// How the item is applied: insert, replace-or-insert, or merge.
    pub op: Operation,
}

// Older versions of `ObjectStoreMutation`, retained for forward migration via the `Migrate`
// derives below.
#[derive(Migrate, Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(ObjectStoreMutationV54)]
#[migrate_nodefault]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct ObjectStoreMutationV50 {
    pub item: ObjectItemV50,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV50)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV49 {
    pub item: ObjectItemV49,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV49)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV47 {
    pub item: ObjectItemV47,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV47)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV46 {
    pub item: ObjectItemV46,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV46)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV43 {
    pub item: ObjectItemV43,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV43)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV41 {
    pub item: ObjectItemV41,
    pub op: OperationV32,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_nodefault]
#[migrate_to_version(ObjectStoreMutationV41)]
pub struct ObjectStoreMutationV40 {
    pub item: ObjectItemV40,
    pub op: OperationV32,
}
308
/// The different LSM tree operations that can be performed as part of a mutation.
pub type Operation = OperationV32;

#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum OperationV32 {
    /// Insert a new item; see the tree's insert semantics.
    Insert,
    /// Insert, replacing any existing item with the same key.
    ReplaceOrInsert,
    /// Combine with any existing item using the tree's merge rules.
    Merge,
}
319
320impl Ord for ObjectStoreMutation {
321    fn cmp(&self, other: &Self) -> Ordering {
322        self.item.key.cmp(&other.item.key)
323    }
324}
325
326impl PartialOrd for ObjectStoreMutation {
327    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
328        Some(self.cmp(other))
329    }
330}
331
332impl PartialEq for ObjectStoreMutation {
333    fn eq(&self, other: &Self) -> bool {
334        self.item.key.eq(&other.item.key)
335    }
336}
337
338impl Eq for ObjectStoreMutation {}
339
340impl Ord for ObjectStoreMutationV50 {
341    fn cmp(&self, other: &Self) -> Ordering {
342        self.item.key.cmp(&other.item.key)
343    }
344}
345
346impl PartialOrd for ObjectStoreMutationV50 {
347    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
348        Some(self.cmp(other))
349    }
350}
351
352impl PartialEq for ObjectStoreMutationV50 {
353    fn eq(&self, other: &Self) -> bool {
354        self.item.key.eq(&other.item.key)
355    }
356}
357
358impl Eq for ObjectStoreMutationV50 {}
359
360impl Ord for AllocatorItem {
361    fn cmp(&self, other: &Self) -> Ordering {
362        self.key.cmp(&other.key)
363    }
364}
365
366impl PartialOrd for AllocatorItem {
367    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
368        Some(self.cmp(other))
369    }
370}
371
/// Same as std::ops::Range but with Ord and PartialOrd support, sorted first by start of the range,
/// then by the end.
pub type DeviceRange = DeviceRangeV32;

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct DeviceRangeV32(pub Range<u64>);
379
// Deref/DerefMut let a `DeviceRange` be used anywhere a `Range<u64>` is expected
// (e.g. `.start`, `.end`, `.contains(..)`).
impl Deref for DeviceRange {
    type Target = Range<u64>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for DeviceRange {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

// Conversion from a plain range.
impl From<Range<u64>> for DeviceRange {
    fn from(range: Range<u64>) -> Self {
        Self(range)
    }
}
399
400impl Into<Range<u64>> for DeviceRange {
401    fn into(self) -> Range<u64> {
402        self.0
403    }
404}
405
406impl Ord for DeviceRange {
407    fn cmp(&self, other: &Self) -> Ordering {
408        self.start.cmp(&other.start).then(self.end.cmp(&other.end))
409    }
410}
411
412impl PartialOrd for DeviceRange {
413    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
414        Some(self.cmp(other))
415    }
416}
417
pub type AllocatorMutation = AllocatorMutationV32;

#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorMutationV32 {
    /// Allocates `device_range` on behalf of `owner_object_id`.
    Allocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    /// Frees `device_range` previously allocated to `owner_object_id`.
    Deallocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    /// Sets a byte limit for allocations owned by `owner_object_id`.
    SetLimit {
        owner_object_id: u64,
        bytes: u64,
    },
    /// Marks all extents with a given owner_object_id for deletion.
    /// Used to free space allocated to encrypted ObjectStore where we may not have the key.
    /// Note that the actual deletion time is undefined so this should never be used where an
    /// ObjectStore is still in use due to a high risk of corruption. Similarly, owner_object_id
    /// should never be reused for the same reasons.
    MarkForDeletion(u64),
}
442
pub type UpdateMutationsKey = UpdateMutationsKeyV49;

/// Newtype around [`FxfsKeyV49`]; note that the comparison impls further below use pointer
/// identity rather than the wrapped value.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
pub struct UpdateMutationsKeyV49(pub FxfsKeyV49);

/// Previous version, migrated forward to [`UpdateMutationsKeyV49`].
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(UpdateMutationsKeyV49)]
pub struct UpdateMutationsKeyV40(pub FxfsKeyV40);
451
452impl From<UpdateMutationsKey> for FxfsKey {
453    fn from(outer: UpdateMutationsKey) -> Self {
454        outer.0
455    }
456}
457
458impl From<FxfsKey> for UpdateMutationsKey {
459    fn from(inner: FxfsKey) -> Self {
460        Self(inner)
461    }
462}
463
464#[cfg(fuzz)]
465impl<'a> arbitrary::Arbitrary<'a> for UpdateMutationsKey {
466    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
467        Ok(UpdateMutationsKey::from(FxfsKey::arbitrary(u).unwrap()))
468    }
469}
470
// `UpdateMutationsKey` is compared by pointer identity, not by value: every instance is distinct,
// so these mutations never compare equal and are never deduplicated in the transaction's mutation
// set.  NOTE(review): this makes the ordering depend on addresses, which is presumably acceptable
// for the set's purposes here — confirm that nothing relies on a stable, value-based order.
impl Ord for UpdateMutationsKey {
    fn cmp(&self, other: &Self) -> Ordering {
        (self as *const UpdateMutationsKey).cmp(&(other as *const _))
    }
}

impl PartialOrd for UpdateMutationsKey {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for UpdateMutationsKey {}

impl PartialEq for UpdateMutationsKey {
    fn eq(&self, other: &Self) -> bool {
        // Same identity-based semantics as `cmp` above.
        std::ptr::eq(self, other)
    }
}
490
/// When creating a transaction, locks typically need to be held to prevent two or more writers
/// trying to make conflicting mutations at the same time.  LockKeys are used for this.
/// NOTE: Ordering is important here!  The lock manager sorts the list of locks in a transaction
/// to acquire them in a consistent order, but there are special cases for the Filesystem lock and
/// the Flush lock.
/// The Filesystem lock is taken by every transaction and is done so first, as part of the TxnGuard.
/// The Flush lock is taken when we flush an LSM tree (e.g. an object store), and is held for
/// several transactions.  As such, it must come first in the lock acquisition ordering, so that
/// other transactions using the Flush lock have the same ordering as in flushing.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Copy)]
pub enum LockKey {
    /// Locks the entire filesystem.
    Filesystem,

    /// Used to lock flushing an object.
    Flush {
        object_id: u64,
    },

    /// Used to lock changes to a particular object attribute (e.g. writes).
    ObjectAttribute {
        store_object_id: u64,
        object_id: u64,
        attribute_id: u64,
    },

    /// Used to lock changes to a particular object (e.g. adding a child to a directory).
    Object {
        store_object_id: u64,
        object_id: u64,
    },

    /// Used to serialize changes relating to a particular project id within a store.
    ProjectId {
        store_object_id: u64,
        project_id: u64,
    },

    /// Used to lock any truncate operations for a file.
    Truncate {
        store_object_id: u64,
        object_id: u64,
    },

    /// A lock used when getting or creating the internal directory.
    InternalDirectory {
        store_object_id: u64,
    },
}
539
impl LockKey {
    /// Returns a key locking attribute `attribute_id` of object `object_id` in store
    /// `store_object_id`.
    pub const fn object_attribute(store_object_id: u64, object_id: u64, attribute_id: u64) -> Self {
        LockKey::ObjectAttribute { store_object_id, object_id, attribute_id }
    }

    /// Returns a key locking object `object_id` in store `store_object_id`.
    pub const fn object(store_object_id: u64, object_id: u64) -> Self {
        LockKey::Object { store_object_id, object_id }
    }

    /// Returns a key locking flushing of object `object_id`.
    pub const fn flush(object_id: u64) -> Self {
        LockKey::Flush { object_id }
    }

    /// Returns a key locking truncate operations on object `object_id` in store `store_object_id`.
    pub const fn truncate(store_object_id: u64, object_id: u64) -> Self {
        LockKey::Truncate { store_object_id, object_id }
    }
}
557
/// A container for holding `LockKey` objects. Can store a single `LockKey` inline.
#[derive(Clone, Debug)]
pub enum LockKeys {
    /// No keys.
    None,
    /// Exactly one key, stored without a heap allocation.
    Inline(LockKey),
    /// Two or more keys (also produced by `with_capacity` for capacity > 1).
    Vec(Vec<LockKey>),
}
565
566impl LockKeys {
567    pub fn with_capacity(capacity: usize) -> Self {
568        if capacity > 1 { LockKeys::Vec(Vec::with_capacity(capacity)) } else { LockKeys::None }
569    }
570
571    pub fn push(&mut self, key: LockKey) {
572        match self {
573            Self::None => *self = LockKeys::Inline(key),
574            Self::Inline(inline) => {
575                *self = LockKeys::Vec(vec![*inline, key]);
576            }
577            Self::Vec(vec) => vec.push(key),
578        }
579    }
580
581    pub fn truncate(&mut self, len: usize) {
582        match self {
583            Self::None => {}
584            Self::Inline(_) => {
585                if len == 0 {
586                    *self = Self::None;
587                }
588            }
589            Self::Vec(vec) => vec.truncate(len),
590        }
591    }
592
593    fn len(&self) -> usize {
594        match self {
595            Self::None => 0,
596            Self::Inline(_) => 1,
597            Self::Vec(vec) => vec.len(),
598        }
599    }
600
601    fn contains(&self, key: &LockKey) -> bool {
602        match self {
603            Self::None => false,
604            Self::Inline(single) => single == key,
605            Self::Vec(vec) => vec.contains(key),
606        }
607    }
608
609    fn sort_unstable(&mut self) {
610        match self {
611            Self::Vec(vec) => vec.sort_unstable(),
612            _ => {}
613        }
614    }
615
616    fn dedup(&mut self) {
617        match self {
618            Self::Vec(vec) => vec.dedup(),
619            _ => {}
620        }
621    }
622
623    fn iter(&self) -> LockKeysIter<'_> {
624        match self {
625            LockKeys::None => LockKeysIter::None,
626            LockKeys::Inline(key) => LockKeysIter::Inline(key),
627            LockKeys::Vec(keys) => LockKeysIter::Vec(keys.iter()),
628        }
629    }
630}
631
632enum LockKeysIter<'a> {
633    None,
634    Inline(&'a LockKey),
635    Vec(std::slice::Iter<'a, LockKey>),
636}
637
638impl<'a> Iterator for LockKeysIter<'a> {
639    type Item = &'a LockKey;
640    fn next(&mut self) -> Option<Self::Item> {
641        match self {
642            Self::None => None,
643            Self::Inline(inline) => {
644                let next = *inline;
645                *self = Self::None;
646                Some(next)
647            }
648            Self::Vec(vec) => vec.next(),
649        }
650    }
651}
652
653impl Default for LockKeys {
654    fn default() -> Self {
655        LockKeys::None
656    }
657}
658
/// Constructs a `LockKeys` from zero or more `LockKey` expressions, picking the most compact
/// representation (`None` / `Inline` / `Vec`) based on the number of arguments.
#[macro_export]
macro_rules! lock_keys {
    () => {
        $crate::object_store::transaction::LockKeys::None
    };
    ($lock_key:expr $(,)?) => {
        $crate::object_store::transaction::LockKeys::Inline($lock_key)
    };
    ($($lock_keys:expr),+ $(,)?) => {
        $crate::object_store::transaction::LockKeys::Vec(vec![$($lock_keys),+])
    };
}
pub use lock_keys;
672
/// Mutations in a transaction can be associated with an object so that when mutations are applied,
/// updates can be applied to in-memory structures. For example, we cache object sizes, so when a
/// size change is applied, we can update the cached object size.
pub trait AssociatedObject: Send + Sync {
    /// Hook invoked for the associated object when `_mutation` is about to be applied.  The
    /// default implementation does nothing.
    fn will_apply_mutation(&self, _mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
    }
}

/// An optional reference to an [`AssociatedObject`], either borrowed or owned.
pub enum AssocObj<'a> {
    None,
    Borrowed(&'a dyn AssociatedObject),
    Owned(Box<dyn AssociatedObject>),
}
686
687impl AssocObj<'_> {
688    pub fn map<R, F: FnOnce(&dyn AssociatedObject) -> R>(&self, f: F) -> Option<R> {
689        match self {
690            AssocObj::None => None,
691            AssocObj::Borrowed(b) => Some(f(*b)),
692            AssocObj::Owned(o) => Some(f(o.as_ref())),
693        }
694    }
695}
696
/// A single mutation recorded in a transaction, tagged with the id of the store or allocator it
/// applies to.
pub struct TxnMutation<'a> {
    // This, at time of writing, is either the object ID of an object store, or the object ID of the
    // allocator.  In the case of an object mutation, there's another object ID in the mutation
    // record that would be for the object actually being changed.
    pub object_id: u64,

    // The actual mutation.  This gets serialized to the journal.
    pub mutation: Mutation,

    // An optional associated object for the mutation.  During replay, there will always be no
    // associated object.
    pub associated_object: AssocObj<'a>,
}
710
711// We store TxnMutation in a set, and for that, we only use object_id and mutation and not the
712// associated object or checksum.
713impl Ord for TxnMutation<'_> {
714    fn cmp(&self, other: &Self) -> Ordering {
715        self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation))
716    }
717}
718
719impl PartialOrd for TxnMutation<'_> {
720    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
721        Some(self.cmp(other))
722    }
723}
724
725impl PartialEq for TxnMutation<'_> {
726    fn eq(&self, other: &Self) -> bool {
727        self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation)
728    }
729}
730
731impl Eq for TxnMutation<'_> {}
732
733impl std::fmt::Debug for TxnMutation<'_> {
734    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
735        f.debug_struct("TxnMutation")
736            .field("object_id", &self.object_id)
737            .field("mutation", &self.mutation)
738            .finish()
739    }
740}
741
/// Tracks how the metadata space for a transaction is accounted for.
pub enum MetadataReservation {
    // The state after a transaction has been dropped.
    None,

    // Metadata space for this transaction is being borrowed from ObjectManager's metadata
    // reservation.
    Borrowed,

    // A metadata reservation was made when the transaction was created.
    Reservation(Reservation),

    // The metadata space is being _held_ within `allocator_reservation`.
    Hold(u64),
}

/// A transaction groups mutation records to be committed as a group.
pub struct Transaction<'a> {
    // Guard tying this transaction to the filesystem (see `TxnGuard`).
    txn_guard: TxnGuard<'a>,

    // The mutations that make up this transaction.
    mutations: BTreeSet<TxnMutation<'a>>,

    // The locks that this transaction currently holds.
    txn_locks: LockKeys,

    /// If set, an allocator reservation that should be used for allocations.
    pub allocator_reservation: Option<&'a Reservation>,

    /// The reservation for the metadata for this transaction.
    pub metadata_reservation: MetadataReservation,

    // Keep track of objects explicitly created by this transaction. No locks are required for them.
    // Addressed by (owner_object_id, object_id).
    new_objects: BTreeSet<(u64, u64)>,

    /// Any data checksums which should be evaluated when replaying this transaction.
    checksums: Vec<(Range<u64>, Vec<Checksum>, bool)>,

    /// Set if this transaction contains data (i.e. includes any extent mutations).
    includes_write: bool,
}
783
784impl<'a> Transaction<'a> {
    /// Creates a new transaction.  `txn_locks` are read locks that can be upgraded to write locks
    /// at commit time.
    pub async fn new(
        txn_guard: TxnGuard<'a>,
        options: Options<'a>,
        txn_locks: LockKeys,
    ) -> Result<Transaction<'a>, Error> {
        txn_guard.fs().add_transaction(options.skip_journal_checks).await;
        let fs = txn_guard.fs().clone();
        // Undo `add_transaction` if reservation or lock acquisition below fails.
        let guard = scopeguard::guard((), |_| fs.sub_transaction());
        let (metadata_reservation, allocator_reservation, hold) =
            txn_guard.fs().reservation_for_transaction(options).await?;

        let txn_locks = {
            let lock_manager = txn_guard.fs().lock_manager();
            let mut write_guard = lock_manager.txn_lock(txn_locks).await;
            // Take the lock keys out of the guard; the transaction records them as held from
            // here on.  NOTE(review): presumably the locks are released when the transaction
            // commits or drops — confirm against the lock manager.
            std::mem::take(&mut write_guard.0.lock_keys)
        };
        let mut transaction = Transaction {
            txn_guard,
            mutations: BTreeSet::new(),
            txn_locks,
            allocator_reservation: None,
            metadata_reservation,
            new_objects: BTreeSet::new(),
            checksums: Vec::new(),
            includes_write: false,
        };

        // Success: defuse the rollback scopeguard and keep any held reservation.
        ScopeGuard::into_inner(guard);
        hold.map(|h| h.forget()); // Transaction takes ownership from here on.
        transaction.allocator_reservation = allocator_reservation;
        Ok(transaction)
    }
819
    /// Returns this transaction's guard.
    pub fn txn_guard(&self) -> &TxnGuard<'_> {
        &self.txn_guard
    }

    /// Returns the mutations accumulated so far.
    pub fn mutations(&self) -> &BTreeSet<TxnMutation<'a>> {
        &self.mutations
    }

    /// Takes ownership of all accumulated mutations, leaving this transaction with none.  Also
    /// clears the record of objects created by this transaction.
    pub fn take_mutations(&mut self) -> BTreeSet<TxnMutation<'a>> {
        self.new_objects.clear();
        mem::take(&mut self.mutations)
    }

    /// Adds a mutation to this transaction.  If the mutation already exists, it is replaced and the
    /// old mutation is returned.
    pub fn add(&mut self, object_id: u64, mutation: Mutation) -> Option<Mutation> {
        self.add_with_object(object_id, mutation, AssocObj::None)
    }
838
    /// Removes a mutation that matches `mutation`.
    pub fn remove(&mut self, object_id: u64, mutation: Mutation) {
        let txn_mutation = TxnMutation { object_id, mutation, associated_object: AssocObj::None };
        if self.mutations.remove(&txn_mutation) {
            // If the removed mutation was the insert of a new object record, the object is no
            // longer "new" to this transaction, so drop it from `new_objects` (membership there
            // exempts an object from lock verification).
            if let Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    ObjectItem {
                        key: ObjectKey { object_id: new_object_id, data: ObjectKeyData::Object },
                        ..
                    },
                op: Operation::Insert,
            }) = txn_mutation.mutation
            {
                self.new_objects.remove(&(object_id, new_object_id));
            }
        }
    }
856
    /// Adds a mutation with an associated object. If the mutation already exists, it is replaced
    /// and the old mutation is returned.
    pub fn add_with_object(
        &mut self,
        object_id: u64,
        mutation: Mutation,
        associated_object: AssocObj<'a>,
    ) -> Option<Mutation> {
        assert!(object_id != INVALID_OBJECT_ID);
        // Any extent mutation means this transaction writes data; see `includes_write()`.
        if let Mutation::ObjectStore(ObjectStoreMutation {
            item:
                Item {
                    key:
                        ObjectKey { data: ObjectKeyData::Attribute(_, AttributeKey::Extent(_)), .. },
                    ..
                },
            ..
        }) = &mutation
        {
            self.includes_write = true;
        }
        let txn_mutation = TxnMutation { object_id, mutation, associated_object };
        // Checks (via debug_assert and error logging) that the locks required for this mutation
        // are held.
        self.verify_locks(&txn_mutation);
        // `replace` uses the key-only comparisons above, so a mutation with the same key
        // supersedes the old one, which is returned to the caller.
        self.mutations.replace(txn_mutation).map(|m| m.mutation)
    }
882
883    pub fn add_checksum(&mut self, range: Range<u64>, checksums: Vec<Checksum>, first_write: bool) {
884        self.checksums.push((range, checksums, first_write));
885    }
886
887    pub fn includes_write(&self) -> bool {
888        self.includes_write
889    }
890
891    pub fn checksums(&self) -> &[(Range<u64>, Vec<Checksum>, bool)] {
892        &self.checksums
893    }
894
895    pub fn take_checksums(&mut self) -> Vec<(Range<u64>, Vec<Checksum>, bool)> {
896        std::mem::replace(&mut self.checksums, Vec::new())
897    }
898
899    fn verify_locks(&mut self, mutation: &TxnMutation<'_>) {
900        // It was considered to change the locks from Vec to BTreeSet since we'll now be searching
901        // through it, but given the small set that these locks usually comprise, it probably isn't
902        // worth it.
903        match mutation {
904            TxnMutation {
905                mutation:
906                    Mutation::ObjectStore {
907                        0: ObjectStoreMutation { item: ObjectItem { key, .. }, op },
908                    },
909                object_id: store_object_id,
910                ..
911            } => {
912                match &key.data {
913                    ObjectKeyData::Attribute(..) => {
914                        // TODO(https://fxbug.dev/42073914): Check lock requirements.
915                    }
916                    ObjectKeyData::Child { .. }
917                    | ObjectKeyData::EncryptedChild(_)
918                    | ObjectKeyData::EncryptedCasefoldChild(_)
919                    | ObjectKeyData::CasefoldChild { .. }
920                    | ObjectKeyData::LegacyCasefoldChild(_) => {
921                        let id = key.object_id;
922                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
923                            && !self.new_objects.contains(&(*store_object_id, id))
924                        {
925                            debug_assert!(
926                                false,
927                                "Not holding required lock for object {id} \
928                                in store {store_object_id}"
929                            );
930                            error!(
931                                "Not holding required lock for object {id} in store \
932                                {store_object_id}"
933                            )
934                        }
935                    }
936                    ObjectKeyData::GraveyardEntry { .. } => {
937                        // TODO(https://fxbug.dev/42073911): Check lock requirements.
938                    }
939                    ObjectKeyData::GraveyardAttributeEntry { .. } => {
940                        // TODO(https://fxbug.dev/122974): Check lock requirements.
941                    }
942                    ObjectKeyData::Keys => {
943                        let id = key.object_id;
944                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
945                            && !self.new_objects.contains(&(*store_object_id, id))
946                        {
947                            debug_assert!(
948                                false,
949                                "Not holding required lock for object {id} \
950                                in store {store_object_id}"
951                            );
952                            error!(
953                                "Not holding required lock for object {id} in store \
954                                {store_object_id}"
955                            )
956                        }
957                    }
958                    ObjectKeyData::Object => match op {
959                        // Insert implies the caller expects no object with which to race
960                        Operation::Insert => {
961                            self.new_objects.insert((*store_object_id, key.object_id));
962                        }
963                        Operation::Merge | Operation::ReplaceOrInsert => {
964                            let id = key.object_id;
965                            if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
966                                && !self.new_objects.contains(&(*store_object_id, id))
967                            {
968                                debug_assert!(
969                                    false,
970                                    "Not holding required lock for object {id} \
971                                    in store {store_object_id}"
972                                );
973                                error!(
974                                    "Not holding required lock for object {id} in store \
975                                    {store_object_id}"
976                                )
977                            }
978                        }
979                    },
980                    ObjectKeyData::Project { project_id, property: ProjectProperty::Limit } => {
981                        if !self.txn_locks.contains(&LockKey::ProjectId {
982                            store_object_id: *store_object_id,
983                            project_id: *project_id,
984                        }) {
985                            debug_assert!(
986                                false,
987                                "Not holding required lock for project limit id {project_id} \
988                                in store {store_object_id}"
989                            );
990                            error!(
991                                "Not holding required lock for project limit id {project_id} in \
992                                store {store_object_id}"
993                            )
994                        }
995                    }
996                    ObjectKeyData::Project { property: ProjectProperty::Usage, .. } => match op {
997                        Operation::Insert | Operation::ReplaceOrInsert => {
998                            panic!(
999                                "Project usage is all handled by merging deltas, no inserts or \
1000                                replacements should be used"
1001                            );
1002                        }
1003                        // Merges are all handled like atomic +/- and serialized by the tree locks.
1004                        Operation::Merge => {}
1005                    },
1006                    ObjectKeyData::ExtendedAttribute { .. } => {
1007                        let id = key.object_id;
1008                        if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
1009                            && !self.new_objects.contains(&(*store_object_id, id))
1010                        {
1011                            debug_assert!(
1012                                false,
1013                                "Not holding required lock for object {id} \
1014                                in store {store_object_id} while mutating extended attribute"
1015                            );
1016                            error!(
1017                                "Not holding required lock for object {id} in store \
1018                                {store_object_id} while mutating extended attribute"
1019                            )
1020                        }
1021                    }
1022                }
1023            }
1024            TxnMutation { mutation: Mutation::DeleteVolume, object_id, .. } => {
1025                if !self.txn_locks.contains(&LockKey::flush(*object_id)) {
1026                    debug_assert!(false, "Not holding required lock for DeleteVolume");
1027                    error!("Not holding required lock for DeleteVolume");
1028                }
1029            }
1030            _ => {}
1031        }
1032    }
1033
    /// Returns true if this transaction has no mutations.  Only mutations are considered; any
    /// recorded checksums or held locks do not affect the result.
    pub fn is_empty(&self) -> bool {
        self.mutations.is_empty()
    }
1038
1039    /// Searches for an existing object mutation within the transaction that has the given key and
1040    /// returns it if found.
1041    pub fn get_object_mutation(
1042        &self,
1043        store_object_id: u64,
1044        key: ObjectKey,
1045    ) -> Option<&ObjectStoreMutation> {
1046        if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) =
1047            self.mutations.get(&TxnMutation {
1048                object_id: store_object_id,
1049                mutation: Mutation::insert_object(key, ObjectValue::None),
1050                associated_object: AssocObj::None,
1051            })
1052        {
1053            Some(mutation)
1054        } else {
1055            None
1056        }
1057    }
1058
    /// Commits a transaction.  If successful, returns the journal offset of the transaction.
    /// Consumes the transaction; its locks are released when it is subsequently dropped (see the
    /// `Drop` impl).
    pub async fn commit(mut self) -> Result<u64, Error> {
        debug!(txn:? = &self; "Commit");
        self.txn_guard.fs().clone().commit_transaction(&mut self, &mut |_| {}).await
    }
1064
1065    /// Commits and then runs the callback whilst locks are held.  The callback accepts a single
1066    /// parameter which is the journal offset of the transaction.
1067    pub async fn commit_with_callback<R: Send>(
1068        mut self,
1069        f: impl FnOnce(u64) -> R + Send,
1070    ) -> Result<R, Error> {
1071        debug!(txn:? = &self; "Commit");
1072        // It's not possible to pass an FnOnce via a trait without boxing it, but we don't want to
1073        // do that (for performance reasons), hence the reason for the following.
1074        let mut f = Some(f);
1075        let mut result = None;
1076        self.txn_guard
1077            .fs()
1078            .clone()
1079            .commit_transaction(&mut self, &mut |offset| {
1080                result = Some(f.take().unwrap()(offset));
1081            })
1082            .await?;
1083        Ok(result.unwrap())
1084    }
1085
    /// Commits the transaction, but allows the transaction to be used again.  The locks are not
    /// dropped (but transaction locks will get downgraded to read locks).
    pub async fn commit_and_continue(&mut self) -> Result<(), Error> {
        debug!(txn:? = self; "Commit");
        self.txn_guard.fs().clone().commit_transaction(self, &mut |_| {}).await?;
        // Committing is expected to drain the mutations, leaving the transaction empty.
        assert!(self.mutations.is_empty());
        // Drop back from the upgraded write state to the upgradeable-read (transaction) state so
        // the transaction can be reused.
        self.txn_guard.fs().lock_manager().downgrade_locks(&self.txn_locks);
        Ok(())
    }
1095}
1096
impl Drop for Transaction<'_> {
    fn drop(&mut self) {
        // Call the filesystem implementation of drop_transaction which should, as a minimum, call
        // LockManager's drop_transaction to ensure the locks are released.
        // NOTE(review): this also runs after a successful commit; drop_transaction is presumably
        // tolerant of an already-committed (emptied) transaction — confirm in FxFilesystem.
        debug!(txn:? = &self; "Drop");
        self.txn_guard.fs().clone().drop_transaction(self);
    }
}
1105
impl std::fmt::Debug for Transaction<'_> {
    // Manual implementation: only the fields most useful for debugging are included (mutations,
    // transaction locks and the allocator reservation).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Transaction")
            .field("mutations", &self.mutations)
            .field("txn_locks", &self.txn_locks)
            .field("reservation", &self.allocator_reservation)
            .finish()
    }
}
1115
/// Holds either a borrowed reference to, or an owned instance of, a `T`, and dereferences to `T`
/// either way (a simplified `Cow` without the copy-on-write machinery).
pub enum BorrowedOrOwned<'a, T> {
    Borrowed(&'a T),
    Owned(T),
}

impl<T> Deref for BorrowedOrOwned<'_, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        match self {
            BorrowedOrOwned::Borrowed(b) => b,
            // Matching through `&self` already binds `o` as `&T`, so return it directly (the
            // previous `&o` produced an `&&T` that relied on deref coercion).
            BorrowedOrOwned::Owned(o) => o,
        }
    }
}

impl<'a, T> From<&'a T> for BorrowedOrOwned<'a, T> {
    fn from(value: &'a T) -> Self {
        BorrowedOrOwned::Borrowed(value)
    }
}

impl<T> From<T> for BorrowedOrOwned<'_, T> {
    fn from(value: T) -> Self {
        BorrowedOrOwned::Owned(value)
    }
}
1143
1144/// LockManager holds the locks that transactions might have taken.  A TransactionManager
1145/// implementation would typically have one of these.
1146///
1147/// Three different kinds of locks are supported.  There are read locks and write locks, which are
1148/// as one would expect.  The third kind of lock is a _transaction_ lock (which is also known as an
1149/// upgradeable read lock).  When first acquired, these block other writes (including other
1150/// transaction locks) but do not block reads.  When it is time to commit a transaction, these locks
1151/// are upgraded to full write locks (without ever dropping the lock) and then dropped after
1152/// committing (unless commit_and_continue is used).  This way, reads are only blocked for the
1153/// shortest possible time.  It follows that write locks should be used sparingly.  Locks are
1154/// granted in order with one exception: when a lock is in the initial _transaction_ lock state
1155/// (LockState::Locked), all read locks are allowed even if there are other tasks waiting for the
1156/// lock.  The reason for this is because we allow read locks to be taken by tasks that have taken a
1157/// _transaction_ lock (i.e. recursion is allowed).  In other cases, such as when a writer is
1158/// waiting and there are only readers, readers will queue up behind the writer.
1159///
1160/// To summarize:
1161///
1162/// +-------------------------+-----------------+----------------+------------------+
1163/// |                         | While read_lock | While txn_lock | While write_lock |
1164/// |                         | is held         | is held        | is held          |
1165/// +-------------------------+-----------------+----------------+------------------+
1166/// | Can acquire read_lock?  | true            | true           | false            |
1167/// +-------------------------+-----------------+----------------+------------------+
1168/// | Can acquire txn_lock?   | true            | false          | false            |
1169/// +-------------------------+-----------------+----------------+------------------+
1170/// | Can acquire write_lock? | false           | false          | false            |
1171/// +-------------------------+-----------------+----------------+------------------+
pub struct LockManager {
    // All lock state, keyed by `LockKey`.  Entries are created on demand and removed again once
    // nothing holds or waits on them.
    locks: Mutex<Locks>,
}
1175
// The table of currently held (or waited-upon) locks; protected by the mutex in
// `LockManager::locks`.
struct Locks {
    keys: HashMap<LockKey, LockEntry>,
}
1179
impl Locks {
    // Releases one hold on `key` that was held in `state` and wakes any waiters that can now
    // proceed.  The entry must exist; it is removed entirely once nothing holds or waits on it.
    fn drop_lock(&mut self, key: LockKey, state: LockState) {
        if let Entry::Occupied(mut occupied) = self.keys.entry(key) {
            let entry = occupied.get_mut();
            let wake = match state {
                LockState::ReadLock => {
                    entry.read_count -= 1;
                    // Only wake waiters when the last reader goes away.
                    entry.read_count == 0
                }
                // drop_write_locks currently depends on us treating Locked and WriteLock the same.
                LockState::Locked | LockState::WriteLock => {
                    entry.state = LockState::ReadLock;
                    true
                }
            };
            if wake {
                // SAFETY: The lock in `LockManager::locks` is held.
                unsafe {
                    entry.wake();
                }
                if entry.can_remove() {
                    occupied.remove_entry();
                }
            }
        } else {
            // Dropping a lock that was never recorded indicates a bookkeeping bug.
            unreachable!();
        }
    }

    // Releases read locks for all of `lock_keys`.
    fn drop_read_locks(&mut self, lock_keys: LockKeys) {
        for lock in lock_keys.iter() {
            self.drop_lock(*lock, LockState::ReadLock);
        }
    }

    // Releases write (or transaction) locks for all of `lock_keys`.
    fn drop_write_locks(&mut self, lock_keys: LockKeys) {
        for lock in lock_keys.iter() {
            // This is a bit hacky, but this works for locks in either the Locked or WriteLock
            // states.
            self.drop_lock(*lock, LockState::WriteLock);
        }
    }

    // Downgrades locks from WriteLock to Locked.
    fn downgrade_locks(&mut self, lock_keys: &LockKeys) {
        for lock in lock_keys.iter() {
            // SAFETY: The lock in `LockManager::locks` is held.
            unsafe {
                self.keys.get_mut(lock).unwrap().downgrade_lock();
            }
        }
    }
}
1233
// Per-key lock state: the current state, the number of active readers, and an intrusive
// doubly-linked list of tasks waiting to be granted the lock.
#[derive(Debug)]
struct LockEntry {
    // In the states that allow readers (ReadLock, Locked), this count can be non-zero
    // to indicate the number of active readers.
    read_count: u64,

    // The state of the lock (see below).
    state: LockState,

    // A doubly-linked list of wakers that should be woken when they have been granted the lock.
    // New wakers are usually chained on to tail, with the exception being the case where a lock in
    // state Locked is to be upgraded to WriteLock, but can't because there are readers.  It might
    // be possible to use intrusive-collections in the future.
    // The nodes live pinned on the waiting tasks' stacks (see `LockManager::lock`) and must only
    // be dereferenced whilst `LockManager::locks` is held.
    head: *const LockWaker,
    tail: *const LockWaker,
}
1250
// SAFETY: `LockEntry` contains raw pointers (which are not `Send` by default); they are only ever
// dereferenced whilst the mutex in `LockManager::locks` is held.
unsafe impl Send for LockEntry {}
1252
// Represents a node in the waker list.  It is only safe to access the members wrapped by UnsafeCell
// when LockManager's `locks` member is locked.
// Nodes are created on the waiting task's stack and pinned (see `LockManager::lock`), which is why
// a dropped future must unlink its node from the list (see `LockWaker::wait`).
struct LockWaker {
    // The next and previous pointers in the doubly-linked list.
    next: UnsafeCell<*const LockWaker>,
    prev: UnsafeCell<*const LockWaker>,

    // Holds the lock key for this waker.  This is required so that we can find the associated
    // `LockEntry`.
    key: LockKey,

    // The underlying waker that should be used to wake the task.
    waker: UnsafeCell<WakerState>,

    // The target state for this waker.
    target_state: LockState,

    // True if this is an upgrade (Locked -> WriteLock) rather than a fresh acquisition.
    is_upgrade: bool,

    // We need to be pinned because these form part of the linked list.
    _pin: PhantomPinned,
}
1276
// Tracks where a `LockWaker` is in its lifecycle.
enum WakerState {
    // This is the initial state before the waker has been first polled.
    Pending,

    // Once polled, this contains the actual waker.
    Registered(Waker),

    // The waker has been woken and has been granted the lock.
    Woken,
}

impl WakerState {
    // Returns true once the lock has been granted (i.e. we have reached the `Woken` state).
    fn is_woken(&self) -> bool {
        match self {
            WakerState::Woken => true,
            WakerState::Pending | WakerState::Registered(_) => false,
        }
    }
}
1293
// SAFETY: The `UnsafeCell` members of `LockWaker` are only accessed whilst the mutex in
// `LockManager::locks` is held, which serializes all cross-thread access.
unsafe impl Send for LockWaker {}
unsafe impl Sync for LockWaker {}
1296
impl LockWaker {
    // Waits for the waker to be woken.
    async fn wait(&self, manager: &LockManager) {
        // We must guard against the future being dropped.  The guard only runs if the future is
        // dropped before `wait` completes; it is defused on the success path below.
        let waker_guard = scopeguard::guard((), |_| {
            let mut locks = manager.locks.lock();
            // SAFETY: We've acquired the lock.
            unsafe {
                if (*self.waker.get()).is_woken() {
                    // We were woken, but didn't actually run, so we must drop the lock.
                    if self.is_upgrade {
                        // An upgrade waker was granted WriteLock; hand it back as Locked.
                        locks.keys.get_mut(&self.key).unwrap().downgrade_lock();
                    } else {
                        locks.drop_lock(self.key, self.target_state);
                    }
                } else {
                    // We haven't been woken but we've been dropped so we must remove ourself from
                    // the waker list.
                    locks.keys.get_mut(&self.key).unwrap().remove_waker(self);
                }
            }
        });

        poll_fn(|cx| {
            let _locks = manager.locks.lock();
            // SAFETY: We've acquired the lock.
            unsafe {
                if (*self.waker.get()).is_woken() {
                    Poll::Ready(())
                } else {
                    // Re-register on every poll so that the most recent task waker is the one
                    // that gets woken.
                    *self.waker.get() = WakerState::Registered(cx.waker().clone());
                    Poll::Pending
                }
            }
        })
        .await;

        // Completed normally: defuse the guard without running it.
        ScopeGuard::into_inner(waker_guard);
    }
}
1337
// The current state of a `LockEntry`; also used as the target state requested by a waiter.
#[derive(Copy, Clone, Debug, PartialEq)]
enum LockState {
    // In this state, there are only readers.
    ReadLock,

    // This state is used for transactions to lock other writers (including other transactions), but
    // it still allows readers.
    Locked,

    // A writer has exclusive access; all other readers and writers are blocked.
    WriteLock,
}
1350
impl LockManager {
    /// Creates an empty LockManager.
    pub fn new() -> Self {
        LockManager { locks: Mutex::new(Locks { keys: HashMap::default() }) }
    }

    /// Acquires the locks.  It is the caller's responsibility to ensure that drop_transaction is
    /// called when a transaction is dropped i.e. the filesystem's drop_transaction method should
    /// call LockManager's drop_transaction method.
    pub async fn txn_lock<'a>(&'a self, lock_keys: LockKeys) -> TransactionLocks<'a> {
        TransactionLocks(
            debug_assert_not_too_long!(self.lock(lock_keys, LockState::Locked)).right().unwrap(),
        )
    }

    // `state` indicates the kind of lock required.  ReadLock means acquire a read lock.  Locked
    // means lock other writers, but still allow readers.  WriteLock means acquire a write lock.
    async fn lock<'a>(
        &'a self,
        mut lock_keys: LockKeys,
        target_state: LockState,
    ) -> Either<ReadGuard<'a>, WriteGuard<'a>> {
        // The guard starts out with no keys; keys are added one at a time as each lock is
        // acquired, so a partially-acquired set is still released correctly if we're dropped.
        let mut guard = match &target_state {
            LockState::ReadLock => Left(ReadGuard {
                manager: self.into(),
                lock_keys: LockKeys::with_capacity(lock_keys.len()),
            }),
            LockState::Locked | LockState::WriteLock => Right(WriteGuard {
                manager: self.into(),
                lock_keys: LockKeys::with_capacity(lock_keys.len()),
            }),
        };
        let guard_keys = match &mut guard {
            Left(g) => &mut g.lock_keys,
            Right(g) => &mut g.lock_keys,
        };
        // Sort and dedup so locks are always acquired in a consistent order (avoiding lock-order
        // deadlocks) and never acquired twice.
        lock_keys.sort_unstable();
        lock_keys.dedup();
        for lock in lock_keys.iter() {
            let lock_waker = None;
            pin_mut!(lock_waker);
            {
                let mut locks = self.locks.lock();
                match locks.keys.entry(*lock) {
                    Entry::Vacant(vacant) => {
                        // No existing entry: the lock is granted immediately in the requested
                        // state.
                        vacant.insert(LockEntry {
                            read_count: if let LockState::ReadLock = target_state {
                                guard_keys.push(*lock);
                                1
                            } else {
                                guard_keys.push(*lock);
                                0
                            },
                            state: target_state,
                            head: std::ptr::null(),
                            tail: std::ptr::null(),
                        });
                    }
                    Entry::Occupied(mut occupied) => {
                        let entry = occupied.get_mut();
                        // SAFETY: We've acquired the lock.
                        if unsafe { entry.is_allowed(target_state, entry.head.is_null()) } {
                            // Compatible with the current holders: grant immediately.
                            if let LockState::ReadLock = target_state {
                                entry.read_count += 1;
                                guard_keys.push(*lock);
                            } else {
                                entry.state = target_state;
                                guard_keys.push(*lock);
                            }
                        } else {
                            // Initialise a waker and push it on the tail of the list.
                            // SAFETY: `lock_waker` isn't used prior to this point.
                            unsafe {
                                *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
                                    next: UnsafeCell::new(std::ptr::null()),
                                    prev: UnsafeCell::new(entry.tail),
                                    key: *lock,
                                    waker: UnsafeCell::new(WakerState::Pending),
                                    target_state: target_state,
                                    is_upgrade: false,
                                    _pin: PhantomPinned,
                                });
                            }
                            let waker = (*lock_waker).as_ref().unwrap();
                            if entry.tail.is_null() {
                                entry.head = waker;
                            } else {
                                // SAFETY: We've acquired the lock.
                                unsafe {
                                    *(*entry.tail).next.get() = waker;
                                }
                            }
                            entry.tail = waker;
                        }
                    }
                }
            }
            // If we had to queue above, wait (outside the mutex) until `wake()` grants us the
            // lock.
            if let Some(waker) = &*lock_waker {
                waker.wait(self).await;
                guard_keys.push(*lock);
            }
        }
        guard
    }

    /// This should be called by the filesystem's drop_transaction implementation.
    pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
        let mut locks = self.locks.lock();
        locks.drop_write_locks(std::mem::take(&mut transaction.txn_locks));
    }

    /// Prepares to commit by waiting for readers to finish.
    pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
        self.commit_prepare_keys(&transaction.txn_locks).await;
    }

    // Upgrades each key from Locked to WriteLock, waiting for any outstanding readers.  The
    // upgrade waker is pushed at the *head* of the queue so it takes priority over other waiters.
    async fn commit_prepare_keys(&self, lock_keys: &LockKeys) {
        for lock in lock_keys.iter() {
            let lock_waker = None;
            pin_mut!(lock_waker);
            {
                let mut locks = self.locks.lock();
                let entry = locks.keys.get_mut(lock).unwrap();
                assert_eq!(entry.state, LockState::Locked);

                if entry.read_count == 0 {
                    // No readers: upgrade immediately.
                    entry.state = LockState::WriteLock;
                } else {
                    // Initialise a waker and push it on the head of the list.
                    // SAFETY: `lock_waker` isn't used prior to this point.
                    unsafe {
                        *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
                            next: UnsafeCell::new(entry.head),
                            prev: UnsafeCell::new(std::ptr::null()),
                            key: *lock,
                            waker: UnsafeCell::new(WakerState::Pending),
                            target_state: LockState::WriteLock,
                            is_upgrade: true,
                            _pin: PhantomPinned,
                        });
                    }
                    let waker = (*lock_waker).as_ref().unwrap();
                    if entry.head.is_null() {
                        entry.tail = (*lock_waker).as_ref().unwrap();
                    } else {
                        // SAFETY: We've acquired the lock.
                        unsafe {
                            *(*entry.head).prev.get() = waker;
                        }
                    }
                    entry.head = waker;
                }
            }

            if let Some(waker) = &*lock_waker {
                waker.wait(self).await;
            }
        }
    }

    /// Acquires a read lock for the given keys.  Read locks are only blocked whilst a transaction
    /// is being committed for the same locks.  They are only necessary where consistency is
    /// required between different mutations within a transaction.  For example, a write might
    /// change the size and extents for an object, in which case a read lock is required so that
    /// observed size and extents are seen together or not at all.
    pub async fn read_lock<'a>(&'a self, lock_keys: LockKeys) -> ReadGuard<'a> {
        debug_assert_not_too_long!(self.lock(lock_keys, LockState::ReadLock)).left().unwrap()
    }

    /// Acquires a write lock for the given keys.  Write locks provide exclusive access to the
    /// requested lock keys.
    pub async fn write_lock<'a>(&'a self, lock_keys: LockKeys) -> WriteGuard<'a> {
        debug_assert_not_too_long!(self.lock(lock_keys, LockState::WriteLock)).right().unwrap()
    }

    /// Downgrades locks from the WriteLock state to Locked state.  This will panic if the locks are
    /// not in the WriteLock state.
    pub fn downgrade_locks(&self, lock_keys: &LockKeys) {
        self.locks.lock().downgrade_locks(lock_keys);
    }
}
1531
// These unsafe functions require that `locks` in LockManager is locked.
impl LockEntry {
    // Wakes queued waiters, starting at the head, for as long as they are compatible with the
    // lock's (updated) state.
    unsafe fn wake(&mut self) {
        // If the lock's state is WriteLock, or there's nothing waiting, return early.
        if self.head.is_null() || self.state == LockState::WriteLock {
            return;
        }

        let waker = unsafe { &*self.head };

        if waker.is_upgrade {
            // An upgrade (Locked -> WriteLock) must wait until all readers have finished.
            if self.read_count > 0 {
                return;
            }
        } else if !unsafe { self.is_allowed(waker.target_state, true) } {
            return;
        }

        unsafe { self.pop_and_wake() };

        // If the waker was a write lock, we can't wake any more up, but otherwise, we can keep
        // waking up readers.
        if waker.target_state == LockState::WriteLock {
            return;
        }

        while !self.head.is_null() && unsafe { (*self.head).target_state } == LockState::ReadLock {
            unsafe { self.pop_and_wake() };
        }
    }

    // Pops the head waker off the queue, grants it the lock (updating `read_count`/`state`) and
    // wakes its task if one is registered.
    unsafe fn pop_and_wake(&mut self) {
        let waker = unsafe { &*self.head };

        // Pop the waker.
        self.head = unsafe { *waker.next.get() };
        if self.head.is_null() {
            self.tail = std::ptr::null()
        } else {
            unsafe { *(*self.head).prev.get() = std::ptr::null() };
        }

        // Adjust our state accordingly.
        if waker.target_state == LockState::ReadLock {
            self.read_count += 1;
        } else {
            self.state = waker.target_state;
        }

        // Now wake the task.  Marking the state Woken transfers ownership of the lock to the
        // woken task (see `LockWaker::wait` for how that's handled if the future was dropped).
        if let WakerState::Registered(waker) =
            std::mem::replace(unsafe { &mut *waker.waker.get() }, WakerState::Woken)
        {
            waker.wake();
        }
    }

    // True when nothing holds the lock (back in ReadLock state with no readers), so the entry can
    // be removed from the map.
    fn can_remove(&self) -> bool {
        self.state == LockState::ReadLock && self.read_count == 0
    }

    // Unlinks `waker` from the doubly-linked waiter list; used when a waiting future is dropped
    // before being woken.
    unsafe fn remove_waker(&mut self, waker: &LockWaker) {
        unsafe {
            let is_first = (*waker.prev.get()).is_null();
            if is_first {
                self.head = *waker.next.get();
            } else {
                *(**waker.prev.get()).next.get() = *waker.next.get();
            }
            if (*waker.next.get()).is_null() {
                self.tail = *waker.prev.get();
            } else {
                *(**waker.next.get()).prev.get() = *waker.prev.get();
            }
            if is_first {
                // We must call wake in case we erased a pending write lock and readers can now
                // proceed.
                self.wake();
            }
        }
    }

    // Returns whether or not a lock with given `target_state` can proceed.  `is_head` should be
    // true if this is something at the head of the waker list (or the waker list is empty) and
    // false if there are other items on the waker list that are prior.
    unsafe fn is_allowed(&self, target_state: LockState, is_head: bool) -> bool {
        match self.state {
            LockState::ReadLock => {
                // Allow ReadLock and Locked so long as nothing else is waiting.
                (self.read_count == 0
                    || target_state == LockState::Locked
                    || target_state == LockState::ReadLock)
                    && is_head
            }
            LockState::Locked => {
                // Always allow reads unless there's an upgrade waiting.  We have to
                // always allow reads in this state because tasks that have locks in
                // the Locked state can later try and acquire ReadLock.
                target_state == LockState::ReadLock
                    && (is_head || unsafe { !(*self.head).is_upgrade })
            }
            LockState::WriteLock => false,
        }
    }

    // Downgrades WriteLock back to Locked (asserting that we were in WriteLock) and wakes any
    // readers that can now proceed.
    unsafe fn downgrade_lock(&mut self) {
        assert_eq!(std::mem::replace(&mut self.state, LockState::Locked), LockState::WriteLock);
        unsafe { self.wake() };
    }
}
1642
/// Guard that holds read locks on a set of keys; the locks are released via the lock manager
/// when the guard is dropped.
#[must_use]
pub struct ReadGuard<'a> {
    // Where the locks were taken from (borrowed, or owned via the filesystem).
    manager: LockManagerRef<'a>,
    // The keys this guard holds; emptied and released on drop.
    lock_keys: LockKeys,
}
1648
1649impl ReadGuard<'_> {
1650    pub fn fs(&self) -> Option<&Arc<FxFilesystem>> {
1651        if let LockManagerRef::Owned(fs) = &self.manager { Some(fs) } else { None }
1652    }
1653
1654    pub fn into_owned(mut self, fs: Arc<FxFilesystem>) -> ReadGuard<'static> {
1655        ReadGuard {
1656            manager: LockManagerRef::Owned(fs),
1657            lock_keys: std::mem::replace(&mut self.lock_keys, LockKeys::None),
1658        }
1659    }
1660}
1661
1662impl Drop for ReadGuard<'_> {
1663    fn drop(&mut self) {
1664        let mut locks = self.manager.locks.lock();
1665        locks.drop_read_locks(std::mem::take(&mut self.lock_keys));
1666    }
1667}
1668
1669impl fmt::Debug for ReadGuard<'_> {
1670    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1671        f.debug_struct("ReadGuard")
1672            .field("manager", &(&self.manager as *const _))
1673            .field("lock_keys", &self.lock_keys)
1674            .finish()
1675    }
1676}
1677
/// Guard that holds write locks on a set of keys; the locks are released via the lock manager
/// when the guard is dropped.
#[must_use]
pub struct WriteGuard<'a> {
    // Where the locks were taken from (borrowed, or owned via the filesystem).
    manager: LockManagerRef<'a>,
    // The keys this guard holds; emptied and released on drop.
    lock_keys: LockKeys,
}
1683
1684impl Drop for WriteGuard<'_> {
1685    fn drop(&mut self) {
1686        let mut locks = self.manager.locks.lock();
1687        locks.drop_write_locks(std::mem::take(&mut self.lock_keys));
1688    }
1689}
1690
1691impl fmt::Debug for WriteGuard<'_> {
1692    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1693        f.debug_struct("WriteGuard")
1694            .field("manager", &(&self.manager as *const _))
1695            .field("lock_keys", &self.lock_keys)
1696            .finish()
1697    }
1698}
1699
/// A reference to a [`LockManager`] that is either directly borrowed or kept alive by holding
/// a strong reference to the owning filesystem.
enum LockManagerRef<'a> {
    /// Borrowed from something that outlives `'a`.
    Borrowed(&'a LockManager),
    /// Keeps the filesystem alive and uses its lock manager.
    Owned(Arc<FxFilesystem>),
}
1704
1705impl Deref for LockManagerRef<'_> {
1706    type Target = LockManager;
1707
1708    fn deref(&self) -> &Self::Target {
1709        match self {
1710            LockManagerRef::Borrowed(m) => m,
1711            LockManagerRef::Owned(f) => f.lock_manager(),
1712        }
1713    }
1714}
1715
1716impl<'a> From<&'a LockManager> for LockManagerRef<'a> {
1717    fn from(value: &'a LockManager) -> Self {
1718        LockManagerRef::Borrowed(value)
1719    }
1720}
1721
1722#[cfg(test)]
1723mod tests {
1724    use super::{LockKey, LockKeys, LockManager, LockState, Mutation, Options};
1725    use crate::filesystem::FxFilesystem;
1726    use fuchsia_async as fasync;
1727    use fuchsia_sync::Mutex;
1728    use futures::channel::oneshot::channel;
1729    use futures::future::FutureExt;
1730    use futures::stream::FuturesUnordered;
1731    use futures::{StreamExt, join, pin_mut};
1732    use std::task::Poll;
1733    use std::time::Duration;
1734    use storage_device::DeviceHolder;
1735    use storage_device::fake_device::FakeDevice;
1736
1737    #[fuchsia::test]
1738    async fn test_simple() {
1739        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1740        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1741        let mut t = fs
1742            .clone()
1743            .new_transaction(lock_keys![], Options::default())
1744            .await
1745            .expect("new_transaction failed");
1746        t.add(1, Mutation::BeginFlush);
1747        assert!(!t.is_empty());
1748    }
1749
    // Verifies transaction lock exclusion: a second transaction on the same key must block
    // until the first completes, while a transaction on a different key proceeds immediately.
    // Channels sequence the three futures; the 100ms sleep is a best-effort check that the
    // blocked future has not run (see inline comment — this cannot be checked exactly).
    #[fuchsia::test]
    async fn test_locks() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let (send3, recv3) = channel();
        let done = Mutex::new(false);
        let mut futures = FuturesUnordered::new();
        futures.push(
            async {
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send1.send(()).unwrap(); // Tell the next future to continue.
                send3.send(()).unwrap(); // Tell the last future to continue.
                recv2.await.unwrap();
                // This is a halting problem so all we can do is sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            }
            .boxed(),
        );
        futures.push(
            async {
                recv1.await.unwrap();
                // This should not block since it is a different key.
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(2, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                // Tell the first future to continue.
                send2.send(()).unwrap();
            }
            .boxed(),
        );
        futures.push(
            async {
                // This should block until the first future has completed.
                recv3.await.unwrap();
                let _t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await;
                *done.lock() = true;
            }
            .boxed(),
        );
        while let Some(()) = futures.next().await {}
    }
1812
    // Verifies that a read lock can be taken while a transaction holds the same key
    // (uncommitted), but that the transaction's commit is then blocked until the read lock is
    // released.  The 100ms sleep is a best-effort check that commit has not completed.
    #[fuchsia::test]
    async fn test_read_lock_after_write_lock() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let done = Mutex::new(false);
        join!(
            async {
                let t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send1.send(()).unwrap(); // Tell the next future to continue.
                recv2.await.unwrap();
                t.commit().await.expect("commit failed");
                *done.lock() = true;
            },
            async {
                recv1.await.unwrap();
                // Reads should not be blocked until the transaction is committed.
                let _guard = fs
                    .lock_manager()
                    .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
                    .await;
                // Tell the first future to continue.
                send2.send(()).unwrap();
                // It shouldn't proceed until we release our read lock, but it's a halting
                // problem, so sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            },
        );
    }
1851
    // Mirror of test_read_lock_after_write_lock: a transaction can be created while a read
    // lock on the same key is held, but its commit must wait for the read lock to be dropped.
    // The 100ms sleep is a best-effort check that commit has not completed.
    #[fuchsia::test]
    async fn test_write_lock_after_read_lock() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let (send1, recv1) = channel();
        let (send2, recv2) = channel();
        let done = Mutex::new(false);
        join!(
            async {
                // Reads should not be blocked until the transaction is committed.
                let _guard = fs
                    .lock_manager()
                    .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
                    .await;
                // Tell the next future to continue and then wait.
                send1.send(()).unwrap();
                recv2.await.unwrap();
                // It shouldn't proceed until we release our read lock, but it's a halting
                // problem, so sleep.
                fasync::Timer::new(Duration::from_millis(100)).await;
                assert!(!*done.lock());
            },
            async {
                recv1.await.unwrap();
                let t = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object_attribute(1, 2, 3)],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                send2.send(()).unwrap(); // Tell the first future to continue;
                t.commit().await.expect("commit failed");
                *done.lock() = true;
            },
        );
    }
1890
    // Verifies that dropping an uncommitted transaction releases its locks — both with and
    // without a concurrent reader on the same key — by re-acquiring the key afterwards.
    #[fuchsia::test]
    async fn test_drop_uncommitted_transaction() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let key = lock_keys![LockKey::object(1, 1)];

        // Dropping while there's a reader.
        {
            let _write_lock = fs
                .clone()
                .new_transaction(key.clone(), Options::default())
                .await
                .expect("new_transaction failed");
            let _read_lock = fs.lock_manager().read_lock(key.clone()).await;
        }
        // Dropping while there's no reader.
        {
            let _write_lock = fs
                .clone()
                .new_transaction(key.clone(), Options::default())
                .await
                .expect("new_transaction failed");
        }
        // Make sure we can take the lock again (i.e. it was actually released).
        fs.clone()
            .new_transaction(key.clone(), Options::default())
            .await
            .expect("new_transaction failed");
    }
1920
1921    #[fuchsia::test]
1922    async fn test_drop_waiting_write_lock() {
1923        let manager = LockManager::new();
1924        let keys = lock_keys![LockKey::object(1, 1)];
1925        {
1926            let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1927            if let Poll::Ready(_) =
1928                futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1929            {
1930                assert!(false);
1931            }
1932        }
1933        let _ = manager.lock(keys, LockState::WriteLock).await;
1934    }
1935
1936    #[fuchsia::test]
1937    async fn test_write_lock_blocks_everything() {
1938        let manager = LockManager::new();
1939        let keys = lock_keys![LockKey::object(1, 1)];
1940        {
1941            let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1942            if let Poll::Ready(_) =
1943                futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1944            {
1945                assert!(false);
1946            }
1947            if let Poll::Ready(_) =
1948                futures::poll!(manager.lock(keys.clone(), LockState::ReadLock).boxed())
1949            {
1950                assert!(false);
1951            }
1952        }
1953        {
1954            let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1955        }
1956        {
1957            let _guard = manager.lock(keys, LockState::ReadLock).await;
1958        }
1959    }
1960
    // Verifies that downgrading an upgraded (committed) transaction lock unblocks waiting
    // readers.
    #[fuchsia::test]
    async fn test_downgrade_locks() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.txn_lock(keys.clone()).await;
        manager.commit_prepare_keys(&keys).await;

        // Use FuturesUnordered so that we can check that the waker is woken.
        let mut read_lock: FuturesUnordered<_> =
            std::iter::once(manager.read_lock(keys.clone())).collect();

        // Trying to acquire a read lock now should be blocked.
        assert!(futures::poll!(read_lock.next()).is_pending());

        manager.downgrade_locks(&keys);

        // After downgrading, it should be possible to take a read lock.
        assert!(futures::poll!(read_lock.next()).is_ready());
    }
1980
    // Verifies that dropping a queued write-lock future wakes a read lock that was queued
    // behind it, so waiters are not stranded.
    #[fuchsia::test]
    async fn test_dropped_write_lock_wakes() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
        let mut read_lock = FuturesUnordered::new();
        read_lock.push(manager.lock(keys.clone(), LockState::ReadLock));

        {
            let write_lock = manager.lock(keys, LockState::WriteLock);
            pin_mut!(write_lock);

            // The write lock should be blocked because of the read lock.
            assert!(futures::poll!(write_lock).is_pending());

            // Another read lock should be blocked because of the write lock.
            assert!(futures::poll!(read_lock.next()).is_pending());
        }

        // Dropping the write lock should allow the read lock to proceed.
        assert!(futures::poll!(read_lock.next()).is_ready());
    }
2003
    // Verifies that dropping a pending commit_prepare (upgrade) future does not corrupt the
    // lock state: a later commit_prepare on the same keys must still succeed.
    #[fuchsia::test]
    async fn test_drop_upgrade() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.lock(keys.clone(), LockState::Locked).await;

        {
            let commit_prepare = manager.commit_prepare_keys(&keys);
            pin_mut!(commit_prepare);
            let _read_guard = manager.lock(keys.clone(), LockState::ReadLock).await;
            assert!(futures::poll!(commit_prepare).is_pending());

            // Now we test dropping read_guard which should wake commit_prepare and
            // then dropping commit_prepare.
        }

        // We should be able to still commit_prepare.
        manager.commit_prepare_keys(&keys).await;
    }
2023
    // Verifies priority of a woken upgrade: once an upgrade (commit_prepare) is queued, new
    // read locks queue behind it, and they stay blocked until the upgraded lock is dropped.
    #[fasync::run_singlethreaded(test)]
    async fn test_woken_upgrade_blocks_reads() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        // Start with a transaction lock.
        let guard = manager.lock(keys.clone(), LockState::Locked).await;

        // Take a read lock.
        let read1 = manager.lock(keys.clone(), LockState::ReadLock).await;

        // Try and upgrade the transaction lock, which should not be possible because of the read.
        let commit_prepare = manager.commit_prepare_keys(&keys);
        pin_mut!(commit_prepare);
        assert!(futures::poll!(commit_prepare.as_mut()).is_pending());

        // Taking another read should also be blocked.
        let read2 = manager.lock(keys.clone(), LockState::ReadLock);
        pin_mut!(read2);
        assert!(futures::poll!(read2.as_mut()).is_pending());

        // Drop the first read and the upgrade should complete.
        std::mem::drop(read1);
        assert!(futures::poll!(commit_prepare).is_ready());

        // But the second read should still be blocked.
        assert!(futures::poll!(read2.as_mut()).is_pending());

        // If we drop the write lock now, the read should be unblocked.
        std::mem::drop(guard);
        assert!(futures::poll!(read2).is_ready());
    }
2055
    // Fixed keys used by the LockKeys container tests below.
    static LOCK_KEY_1: LockKey = LockKey::flush(1);
    static LOCK_KEY_2: LockKey = LockKey::flush(2);
    static LOCK_KEY_3: LockKey = LockKey::flush(3);
2059
2060    // The keys, storage method, and capacity must all match.
2061    fn assert_lock_keys_equal(value: &LockKeys, expected: &LockKeys) {
2062        match (value, expected) {
2063            (LockKeys::None, LockKeys::None) => {}
2064            (LockKeys::Inline(key1), LockKeys::Inline(key2)) => {
2065                if key1 != key2 {
2066                    panic!("{key1:?} != {key2:?}");
2067                }
2068            }
2069            (LockKeys::Vec(vec1), LockKeys::Vec(vec2)) => {
2070                if vec1 != vec2 {
2071                    panic!("{vec1:?} != {vec2:?}");
2072                }
2073                if vec1.capacity() != vec2.capacity() {
2074                    panic!(
2075                        "LockKeys have different capacity: {} != {}",
2076                        vec1.capacity(),
2077                        vec2.capacity()
2078                    );
2079                }
2080            }
2081            (_, _) => panic!("{value:?} != {expected:?}"),
2082        }
2083    }
2084
2085    // Only the keys must match. Storage method and capacity don't matter.
2086    fn assert_lock_keys_equivalent(value: &LockKeys, expected: &LockKeys) {
2087        let value: Vec<_> = value.iter().collect();
2088        let expected: Vec<_> = expected.iter().collect();
2089        assert_eq!(value, expected);
2090    }
2091
2092    #[test]
2093    fn test_lock_keys_macro() {
2094        assert_lock_keys_equal(&lock_keys![], &LockKeys::None);
2095        assert_lock_keys_equal(&lock_keys![LOCK_KEY_1], &LockKeys::Inline(LOCK_KEY_1));
2096        assert_lock_keys_equal(
2097            &lock_keys![LOCK_KEY_1, LOCK_KEY_2],
2098            &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]),
2099        );
2100    }
2101
2102    #[test]
2103    fn test_lock_keys_with_capacity() {
2104        assert_lock_keys_equal(&LockKeys::with_capacity(0), &LockKeys::None);
2105        assert_lock_keys_equal(&LockKeys::with_capacity(1), &LockKeys::None);
2106        assert_lock_keys_equal(&LockKeys::with_capacity(2), &LockKeys::Vec(Vec::with_capacity(2)));
2107    }
2108
2109    #[test]
2110    fn test_lock_keys_len() {
2111        assert_eq!(lock_keys![].len(), 0);
2112        assert_eq!(lock_keys![LOCK_KEY_1].len(), 1);
2113        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].len(), 2);
2114    }
2115
2116    #[test]
2117    fn test_lock_keys_contains() {
2118        assert_eq!(lock_keys![].contains(&LOCK_KEY_1), false);
2119        assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_1), true);
2120        assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_2), false);
2121        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_1), true);
2122        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_2), true);
2123        assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_3), false);
2124    }
2125
2126    #[test]
2127    fn test_lock_keys_push() {
2128        let mut keys = lock_keys![];
2129        keys.push(LOCK_KEY_1);
2130        assert_lock_keys_equal(&keys, &LockKeys::Inline(LOCK_KEY_1));
2131        keys.push(LOCK_KEY_2);
2132        assert_lock_keys_equal(&keys, &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]));
2133        keys.push(LOCK_KEY_3);
2134        assert_lock_keys_equivalent(
2135            &keys,
2136            &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2, LOCK_KEY_3]),
2137        );
2138    }
2139
2140    #[test]
2141    fn test_lock_keys_sort_unstable() {
2142        let mut keys = lock_keys![];
2143        keys.sort_unstable();
2144        assert_lock_keys_equal(&keys, &lock_keys![]);
2145
2146        let mut keys = lock_keys![LOCK_KEY_1];
2147        keys.sort_unstable();
2148        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2149
2150        let mut keys = lock_keys![LOCK_KEY_2, LOCK_KEY_1];
2151        keys.sort_unstable();
2152        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2153    }
2154
2155    #[test]
2156    fn test_lock_keys_dedup() {
2157        let mut keys = lock_keys![];
2158        keys.dedup();
2159        assert_lock_keys_equal(&keys, &lock_keys![]);
2160
2161        let mut keys = lock_keys![LOCK_KEY_1];
2162        keys.dedup();
2163        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2164
2165        let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_1];
2166        keys.dedup();
2167        assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2168    }
2169
2170    #[test]
2171    fn test_lock_keys_truncate() {
2172        let mut keys = lock_keys![];
2173        keys.truncate(5);
2174        assert_lock_keys_equal(&keys, &lock_keys![]);
2175        keys.truncate(0);
2176        assert_lock_keys_equal(&keys, &lock_keys![]);
2177
2178        let mut keys = lock_keys![LOCK_KEY_1];
2179        keys.truncate(5);
2180        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2181        keys.truncate(0);
2182        assert_lock_keys_equal(&keys, &lock_keys![]);
2183
2184        let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_2];
2185        keys.truncate(5);
2186        assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2187        keys.truncate(1);
2188        // Although there's only 1 key after truncate the key is not stored inline.
2189        assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2190    }
2191
2192    #[test]
2193    fn test_lock_keys_iter() {
2194        assert_eq!(lock_keys![].iter().collect::<Vec<_>>(), Vec::<&LockKey>::new());
2195
2196        assert_eq!(lock_keys![LOCK_KEY_1].iter().collect::<Vec<_>>(), vec![&LOCK_KEY_1]);
2197
2198        assert_eq!(
2199            lock_keys![LOCK_KEY_1, LOCK_KEY_2].iter().collect::<Vec<_>>(),
2200            vec![&LOCK_KEY_1, &LOCK_KEY_2]
2201        );
2202    }
2203}