1use crate::checksum::Checksum;
6use crate::filesystem::{FxFilesystem, TxnGuard};
7use crate::log::*;
8use crate::lsm_tree::types::Item;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::AttributeKey;
11use crate::object_store::allocator::{AllocatorItem, Reservation};
12use crate::object_store::object_manager::{ObjectManager, reserved_space_from_journal_usage};
13use crate::object_store::object_record::{
14 FxfsKey, FxfsKeyV40, FxfsKeyV49, ObjectItem, ObjectItemV40, ObjectItemV41, ObjectItemV43,
15 ObjectItemV46, ObjectItemV47, ObjectItemV49, ObjectKey, ObjectKeyData, ObjectValue,
16 ProjectProperty,
17};
18use crate::serialized_types::{Migrate, Versioned, migrate_nodefault, migrate_to_version};
19use anyhow::Error;
20use either::{Either, Left, Right};
21use fprint::TypeFingerprint;
22use fuchsia_sync::Mutex;
23use futures::future::poll_fn;
24use futures::pin_mut;
25use rustc_hash::FxHashMap as HashMap;
26use scopeguard::ScopeGuard;
27use serde::{Deserialize, Serialize};
28use std::cell::UnsafeCell;
29use std::cmp::Ordering;
30use std::collections::BTreeSet;
31use std::collections::hash_map::Entry;
32use std::marker::PhantomPinned;
33use std::ops::{Deref, DerefMut, Range};
34use std::sync::Arc;
35use std::task::{Poll, Waker};
36use std::{fmt, mem};
37
/// Options that control how a [`Transaction`] is created.
#[derive(Clone, Copy, Default)]
pub struct Options<'a> {
    /// Passed to `FxFilesystem::add_transaction` by `Transaction::new`.
    /// NOTE(review): presumably disables the low-journal-space check -- confirm in the filesystem.
    pub skip_journal_checks: bool,

    /// NOTE(review): not read in this file; presumably allows the transaction to borrow metadata
    /// space -- confirm in `reservation_for_transaction`.
    pub borrow_metadata_space: bool,

    /// Optional caller-supplied allocator reservation. Not read directly in this file; the whole
    /// `Options` value is handed to `reservation_for_transaction`.
    pub allocator_reservation: Option<&'a Reservation>,

    /// Optional existing transaction guard. NOTE(review): consumed outside this file.
    pub txn_guard: Option<&'a TxnGuard<'a>>,
}
62
/// Maximum journal usage assumed for a single transaction.
/// NOTE(review): the unit (presumably bytes) and where it is enforced are not visible here.
pub const TRANSACTION_MAX_JOURNAL_USAGE: u64 = 24_576;
/// Metadata amount derived from [`TRANSACTION_MAX_JOURNAL_USAGE`] via
/// `reserved_space_from_journal_usage`.
pub const TRANSACTION_METADATA_MAX_AMOUNT: u64 =
    reserved_space_from_journal_usage(TRANSACTION_MAX_JOURNAL_USAGE);
72
/// Wrapper around the [`WriteGuard`] produced by `LockManager::txn_lock`.
/// `#[must_use]` makes silently discarding the value a compiler warning.
#[must_use]
pub struct TransactionLocks<'a>(pub WriteGuard<'a>);
75
/// The current version of the mutation type used throughout this file.
pub type Mutation = MutationV49;

/// A single change recorded in a transaction.
///
/// This type is serialized (`Serialize`/`Deserialize`/`Versioned`) and its derived `Ord` is used
/// to order mutations inside `Transaction::mutations`, so variant order is significant.
#[derive(
    Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint, Versioned,
)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum MutationV49 {
    /// A change to an object store record.
    ObjectStore(ObjectStoreMutationV49),
    /// Opaque bytes. NOTE(review): presumably an encrypted `ObjectStore` mutation -- confirm.
    EncryptedObjectStore(Box<[u8]>),
    /// A change to allocator state.
    Allocator(AllocatorMutationV32),
    /// NOTE(review): flush markers; exact semantics are defined by the consumer, not this file.
    BeginFlush,
    EndFlush,
    /// Volume deletion; `Transaction::verify_locks` expects the store's `LockKey::Flush` lock.
    DeleteVolume,
    /// NOTE(review): presumably updates a borrowed-space amount -- confirm with the consumer.
    UpdateBorrowed(u64),
    /// Carries a new mutations key (see [`UpdateMutationsKeyV49`]).
    UpdateMutationsKey(UpdateMutationsKeyV49),
    /// NOTE(review): presumably carries the object id of an internal directory -- confirm.
    CreateInternalDir(u64),
}
101
/// Legacy layout of [`Mutation`]; converted to [`MutationV49`] by the derived `Migrate` impl.
/// Kept for reading older serialized data; variant order must not change.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV49)]
pub enum MutationV47 {
    ObjectStore(ObjectStoreMutationV47),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}
115
/// Legacy layout of [`Mutation`]; converted to [`MutationV47`] by the derived `Migrate` impl.
/// Kept for reading older serialized data; variant order must not change.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV47)]
pub enum MutationV46 {
    ObjectStore(ObjectStoreMutationV46),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}
129
/// Legacy layout of [`Mutation`]; converted to [`MutationV46`] by the derived `Migrate` impl.
/// Kept for reading older serialized data; variant order must not change.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV46)]
pub enum MutationV43 {
    ObjectStore(ObjectStoreMutationV43),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}
143
/// Legacy layout of [`Mutation`]; converted to [`MutationV43`] by the derived `Migrate` impl.
/// Kept for reading older serialized data; variant order must not change.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV43)]
pub enum MutationV41 {
    ObjectStore(ObjectStoreMutationV41),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}
157
/// Oldest layout of [`Mutation`] visible in this file; converted to [`MutationV41`] by the derived
/// `Migrate` impl. Kept for reading older serialized data; variant order must not change.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(MutationV41)]
pub enum MutationV40 {
    ObjectStore(ObjectStoreMutationV40),
    EncryptedObjectStore(Box<[u8]>),
    Allocator(AllocatorMutationV32),
    BeginFlush,
    EndFlush,
    DeleteVolume,
    UpdateBorrowed(u64),
    UpdateMutationsKey(UpdateMutationsKeyV40),
    CreateInternalDir(u64),
}
171
172impl Mutation {
173 pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self {
174 Mutation::ObjectStore(ObjectStoreMutation {
175 item: Item::new(key, value),
176 op: Operation::Insert,
177 })
178 }
179
180 pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self {
181 Mutation::ObjectStore(ObjectStoreMutation {
182 item: Item::new(key, value),
183 op: Operation::ReplaceOrInsert,
184 })
185 }
186
187 pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self {
188 Mutation::ObjectStore(ObjectStoreMutation {
189 item: Item::new(key, value),
190 op: Operation::Merge,
191 })
192 }
193
194 pub fn update_mutations_key(key: FxfsKey) -> Self {
195 Mutation::UpdateMutationsKey(key.into())
196 }
197}
198
/// The current version of the object-store mutation type.
pub type ObjectStoreMutation = ObjectStoreMutationV49;

/// A change to one object-store record: the record itself plus how to apply it.
///
/// Note that the comparison traits implemented for this type in this file look at `item.key`
/// only.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct ObjectStoreMutationV49 {
    /// The record (key and value) being written.
    pub item: ObjectItemV49,
    /// How `item` is applied (insert, replace-or-insert, or merge).
    pub op: OperationV32,
}
210
/// Legacy layout; migrated to [`ObjectStoreMutationV49`] by the derived `Migrate` impl.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV49)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV47 {
    pub item: ObjectItemV47,
    pub op: OperationV32,
}
218
/// Legacy layout; migrated to [`ObjectStoreMutationV47`] by the derived `Migrate` impl.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV47)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV46 {
    pub item: ObjectItemV46,
    pub op: OperationV32,
}
226
/// Legacy layout; migrated to [`ObjectStoreMutationV46`] by the derived `Migrate` impl.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV46)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV43 {
    pub item: ObjectItemV43,
    pub op: OperationV32,
}
234
/// Legacy layout; migrated to [`ObjectStoreMutationV43`] by the derived `Migrate` impl.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(ObjectStoreMutationV43)]
#[migrate_nodefault]
pub struct ObjectStoreMutationV41 {
    pub item: ObjectItemV41,
    pub op: OperationV32,
}
242
/// Oldest layout visible in this file; migrated to [`ObjectStoreMutationV41`] by the derived
/// `Migrate` impl.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_nodefault]
#[migrate_to_version(ObjectStoreMutationV41)]
pub struct ObjectStoreMutationV40 {
    pub item: ObjectItemV40,
    pub op: OperationV32,
}
250
/// The current version of the operation type.
pub type Operation = OperationV32;

/// How an [`ObjectStoreMutation`]'s item is applied.
/// NOTE(review): the precise tree semantics of each variant live in the consumer of the mutation;
/// this file only distinguishes them in `Transaction::verify_locks` and `Transaction::remove`.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum OperationV32 {
    Insert,
    ReplaceOrInsert,
    Merge,
}
261
262impl Ord for ObjectStoreMutation {
263 fn cmp(&self, other: &Self) -> Ordering {
264 self.item.key.cmp(&other.item.key)
265 }
266}
267
268impl PartialOrd for ObjectStoreMutation {
269 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
270 Some(self.cmp(other))
271 }
272}
273
274impl PartialEq for ObjectStoreMutation {
275 fn eq(&self, other: &Self) -> bool {
276 self.item.key.eq(&other.item.key)
277 }
278}
279
280impl Eq for ObjectStoreMutation {}
281
282impl Ord for AllocatorItem {
283 fn cmp(&self, other: &Self) -> Ordering {
284 self.key.cmp(&other.key)
285 }
286}
287
288impl PartialOrd for AllocatorItem {
289 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
290 Some(self.cmp(other))
291 }
292}
293
/// The current version of the device range type.
pub type DeviceRange = DeviceRangeV32;

/// Newtype around `Range<u64>`.
/// NOTE(review): presumably a byte range on the device -- the unit is not visible in this file.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct DeviceRangeV32(pub Range<u64>);
301
302impl Deref for DeviceRange {
303 type Target = Range<u64>;
304
305 fn deref(&self) -> &Self::Target {
306 &self.0
307 }
308}
309
310impl DerefMut for DeviceRange {
311 fn deref_mut(&mut self) -> &mut Self::Target {
312 &mut self.0
313 }
314}
315
316impl From<Range<u64>> for DeviceRange {
317 fn from(range: Range<u64>) -> Self {
318 Self(range)
319 }
320}
321
322impl Into<Range<u64>> for DeviceRange {
323 fn into(self) -> Range<u64> {
324 self.0
325 }
326}
327
328impl Ord for DeviceRange {
329 fn cmp(&self, other: &Self) -> Ordering {
330 self.start.cmp(&other.start).then(self.end.cmp(&other.end))
331 }
332}
333
334impl PartialOrd for DeviceRange {
335 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
336 Some(self.cmp(other))
337 }
338}
339
/// The current version of the allocator mutation type.
pub type AllocatorMutation = AllocatorMutationV32;

/// A change to allocator state. Serialized, and ordered by the derived `Ord`, so variant and
/// field order are significant.
/// NOTE(review): the effect of each variant is implemented by the allocator, not in this file;
/// the descriptions below are inferred from the names -- confirm against the allocator.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorMutationV32 {
    /// Presumably marks `device_range` as allocated to `owner_object_id`.
    Allocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    /// Presumably releases `device_range` previously held by `owner_object_id`.
    Deallocate {
        device_range: DeviceRangeV32,
        owner_object_id: u64,
    },
    /// Presumably sets a limit of `bytes` for `owner_object_id`.
    SetLimit {
        owner_object_id: u64,
        bytes: u64,
    },
    /// Presumably marks the given owner object id for deletion.
    MarkForDeletion(u64),
}
364
/// The current version of the key-update payload type.
pub type UpdateMutationsKey = UpdateMutationsKeyV49;

/// Newtype around an [`FxfsKeyV49`], used as the payload of `Mutation::UpdateMutationsKey`.
/// The comparison traits for this type are implemented by hand in this file and compare
/// addresses, not contents.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
pub struct UpdateMutationsKeyV49(pub FxfsKeyV49);
369
/// Legacy layout; migrated to [`UpdateMutationsKeyV49`] by the derived `Migrate` impl.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
#[migrate_to_version(UpdateMutationsKeyV49)]
pub struct UpdateMutationsKeyV40(pub FxfsKeyV40);
373
374impl From<UpdateMutationsKey> for FxfsKey {
375 fn from(outer: UpdateMutationsKey) -> Self {
376 outer.0
377 }
378}
379
380impl From<FxfsKey> for UpdateMutationsKey {
381 fn from(inner: FxfsKey) -> Self {
382 Self(inner)
383 }
384}
385
/// Fuzzing support: generates an arbitrary [`UpdateMutationsKey`] by generating an arbitrary
/// [`FxfsKey`] and wrapping it.
///
/// Failures from the inner generator (for example when the unstructured input runs out) are
/// propagated to the caller instead of panicking, so the fuzzer can reject the input.
#[cfg(fuzz)]
impl<'a> arbitrary::Arbitrary<'a> for UpdateMutationsKey {
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        FxfsKey::arbitrary(u).map(UpdateMutationsKey::from)
    }
}
392
393impl Ord for UpdateMutationsKey {
394 fn cmp(&self, other: &Self) -> Ordering {
395 (self as *const UpdateMutationsKey).cmp(&(other as *const _))
396 }
397}
398
399impl PartialOrd for UpdateMutationsKey {
400 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
401 Some(self.cmp(other))
402 }
403}
404
405impl Eq for UpdateMutationsKey {}
406
407impl PartialEq for UpdateMutationsKey {
408 fn eq(&self, other: &Self) -> bool {
409 std::ptr::eq(self, other)
410 }
411}
412
/// Identifies something that can be locked through the [`LockManager`].
///
/// `LockManager::lock` sorts keys using the derived `Ord` before acquiring them, so the
/// declaration order of the variants (and of their fields) determines lock acquisition order.
/// Do not reorder.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Copy)]
pub enum LockKey {
    /// NOTE(review): not used in this file; presumably a filesystem-wide lock -- confirm.
    Filesystem,

    /// Keyed by an object id; `Transaction::verify_locks` expects it for `Mutation::DeleteVolume`
    /// with the mutation's object id.
    Flush {
        object_id: u64,
    },

    /// Keyed by store, object and attribute id.
    ObjectAttribute {
        store_object_id: u64,
        object_id: u64,
        attribute_id: u64,
    },

    /// Keyed by store and object id; `Transaction::verify_locks` expects it for most mutations of
    /// an existing object.
    Object {
        store_object_id: u64,
        object_id: u64,
    },

    /// Keyed by store and project id; `Transaction::verify_locks` expects it for project-limit
    /// mutations.
    ProjectId {
        store_object_id: u64,
        project_id: u64,
    },

    /// Keyed by store and object id. NOTE(review): not checked in this file; presumably guards
    /// truncation of the object -- confirm.
    Truncate {
        store_object_id: u64,
        object_id: u64,
    },
}
456
457impl LockKey {
458 pub const fn object_attribute(store_object_id: u64, object_id: u64, attribute_id: u64) -> Self {
459 LockKey::ObjectAttribute { store_object_id, object_id, attribute_id }
460 }
461
462 pub const fn object(store_object_id: u64, object_id: u64) -> Self {
463 LockKey::Object { store_object_id, object_id }
464 }
465
466 pub const fn flush(object_id: u64) -> Self {
467 LockKey::Flush { object_id }
468 }
469
470 pub const fn truncate(store_object_id: u64, object_id: u64) -> Self {
471 LockKey::Truncate { store_object_id, object_id }
472 }
473}
474
/// A collection of [`LockKey`]s that avoids a heap allocation when it holds zero or one key.
#[derive(Clone, Debug)]
pub enum LockKeys {
    /// No keys.
    None,
    /// Exactly one key, stored inline.
    Inline(LockKey),
    /// Any number of keys, stored on the heap.
    Vec(Vec<LockKey>),
}
482
483impl LockKeys {
484 pub fn with_capacity(capacity: usize) -> Self {
485 if capacity > 1 { LockKeys::Vec(Vec::with_capacity(capacity)) } else { LockKeys::None }
486 }
487
488 pub fn push(&mut self, key: LockKey) {
489 match self {
490 Self::None => *self = LockKeys::Inline(key),
491 Self::Inline(inline) => {
492 *self = LockKeys::Vec(vec![*inline, key]);
493 }
494 Self::Vec(vec) => vec.push(key),
495 }
496 }
497
498 pub fn truncate(&mut self, len: usize) {
499 match self {
500 Self::None => {}
501 Self::Inline(_) => {
502 if len == 0 {
503 *self = Self::None;
504 }
505 }
506 Self::Vec(vec) => vec.truncate(len),
507 }
508 }
509
510 fn len(&self) -> usize {
511 match self {
512 Self::None => 0,
513 Self::Inline(_) => 1,
514 Self::Vec(vec) => vec.len(),
515 }
516 }
517
518 fn contains(&self, key: &LockKey) -> bool {
519 match self {
520 Self::None => false,
521 Self::Inline(single) => single == key,
522 Self::Vec(vec) => vec.contains(key),
523 }
524 }
525
526 fn sort_unstable(&mut self) {
527 match self {
528 Self::Vec(vec) => vec.sort_unstable(),
529 _ => {}
530 }
531 }
532
533 fn dedup(&mut self) {
534 match self {
535 Self::Vec(vec) => vec.dedup(),
536 _ => {}
537 }
538 }
539
540 fn iter(&self) -> LockKeysIter<'_> {
541 match self {
542 LockKeys::None => LockKeysIter::None,
543 LockKeys::Inline(key) => LockKeysIter::Inline(key),
544 LockKeys::Vec(keys) => LockKeysIter::Vec(keys.into_iter()),
545 }
546 }
547}
548
549enum LockKeysIter<'a> {
550 None,
551 Inline(&'a LockKey),
552 Vec(<&'a Vec<LockKey> as IntoIterator>::IntoIter),
553}
554
555impl<'a> Iterator for LockKeysIter<'a> {
556 type Item = &'a LockKey;
557 fn next(&mut self) -> Option<Self::Item> {
558 match self {
559 Self::None => None,
560 Self::Inline(inline) => {
561 let next = *inline;
562 *self = Self::None;
563 Some(next)
564 }
565 Self::Vec(vec) => vec.next(),
566 }
567 }
568}
569
570impl Default for LockKeys {
571 fn default() -> Self {
572 LockKeys::None
573 }
574}
575
/// Builds a [`LockKeys`] from zero or more `LockKey` expressions, choosing the cheapest
/// representation: `None` for no keys, `Inline` for one key and `Vec` for several.
/// A trailing comma is accepted in the one-key and many-key forms.
#[macro_export]
macro_rules! lock_keys {
    // No keys.
    () => {
        $crate::object_store::transaction::LockKeys::None
    };
    // Exactly one key: no allocation.
    ($lock_key:expr $(,)?) => {
        $crate::object_store::transaction::LockKeys::Inline($lock_key)
    };
    // Two or more keys.
    ($($lock_keys:expr),+ $(,)?) => {
        $crate::object_store::transaction::LockKeys::Vec(vec![$($lock_keys),+])
    };
}
// Re-export so the macro can be imported by path from this module.
pub use lock_keys;
589
/// An object that can be attached to a mutation (see [`AssocObj`] and [`TxnMutation`]).
pub trait AssociatedObject: Send + Sync {
    /// Hook with a default no-op implementation.
    /// NOTE(review): the call site is outside this file; judging by the name it is invoked
    /// before `_mutation` is applied -- confirm in the object manager.
    fn will_apply_mutation(&self, _mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
    }
}
597
/// An optional [`AssociatedObject`], either borrowed or owned.
pub enum AssocObj<'a> {
    /// No associated object.
    None,
    /// An associated object borrowed for `'a`.
    Borrowed(&'a dyn AssociatedObject),
    /// An associated object owned by this value.
    Owned(Box<dyn AssociatedObject>),
}
603
604impl AssocObj<'_> {
605 pub fn map<R, F: FnOnce(&dyn AssociatedObject) -> R>(&self, f: F) -> Option<R> {
606 match self {
607 AssocObj::None => None,
608 AssocObj::Borrowed(b) => Some(f(*b)),
609 AssocObj::Owned(o) => Some(f(o.as_ref())),
610 }
611 }
612}
613
/// One entry of a transaction: a mutation, the object it targets and an optional associated
/// object. Comparison (see the trait impls in this file) ignores `associated_object`.
pub struct TxnMutation<'a> {
    /// Id of the object the mutation applies to. For object-store mutations
    /// `Transaction::verify_locks` treats this as the store's object id.
    pub object_id: u64,

    /// The change itself.
    pub mutation: Mutation,

    /// Optional object associated with the mutation; not part of equality or ordering.
    pub associated_object: AssocObj<'a>,
}
627
628impl Ord for TxnMutation<'_> {
631 fn cmp(&self, other: &Self) -> Ordering {
632 self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation))
633 }
634}
635
636impl PartialOrd for TxnMutation<'_> {
637 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
638 Some(self.cmp(other))
639 }
640}
641
642impl PartialEq for TxnMutation<'_> {
643 fn eq(&self, other: &Self) -> bool {
644 self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation)
645 }
646}
647
648impl Eq for TxnMutation<'_> {}
649
650impl std::fmt::Debug for TxnMutation<'_> {
651 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
652 f.debug_struct("TxnMutation")
653 .field("object_id", &self.object_id)
654 .field("mutation", &self.mutation)
655 .finish()
656 }
657}
658
/// Describes how a transaction's metadata space is accounted for.
/// NOTE(review): the variants are produced and consumed outside this file (the value comes from
/// `reservation_for_transaction`); the descriptions below are inferred from names -- confirm.
pub enum MetadataReservation {
    /// Presumably: no metadata reservation has been made.
    None,

    /// Presumably: the metadata space is borrowed.
    Borrowed,

    /// Holds a [`Reservation`].
    Reservation(Reservation),

    /// Holds an amount. NOTE(review): presumably an amount held from another reservation.
    Hold(u64),
}
673
/// A set of mutations that is committed or dropped as a unit, together with the locks and
/// reservations acquired for it.
pub struct Transaction<'a> {
    /// Guard handed to `Transaction::new`; also the way to reach the filesystem (`fs()`).
    txn_guard: TxnGuard<'a>,

    /// The mutations added so far, ordered by `(object_id, mutation)`.
    mutations: BTreeSet<TxnMutation<'a>>,

    /// The lock keys acquired in `Transaction::new`; released through
    /// `LockManager::drop_transaction`.
    txn_locks: LockKeys,

    /// Allocator reservation returned by `reservation_for_transaction`, if any.
    pub allocator_reservation: Option<&'a Reservation>,

    /// Metadata reservation returned by `reservation_for_transaction`.
    pub metadata_reservation: MetadataReservation,

    /// `(store_object_id, object_id)` pairs of objects inserted by this transaction; such objects
    /// are exempt from the lock checks in `verify_locks`.
    new_objects: BTreeSet<(u64, u64)>,

    /// `(range, checksums, first_write)` tuples recorded through `add_checksum`.
    checksums: Vec<(Range<u64>, Vec<Checksum>, bool)>,

    /// Set once a mutation of an extent attribute record has been added.
    includes_write: bool,
}
700
701impl<'a> Transaction<'a> {
702 pub async fn new(
705 txn_guard: TxnGuard<'a>,
706 options: Options<'a>,
707 txn_locks: LockKeys,
708 ) -> Result<Transaction<'a>, Error> {
709 txn_guard.fs().add_transaction(options.skip_journal_checks).await;
710 let fs = txn_guard.fs().clone();
711 let guard = scopeguard::guard((), |_| fs.sub_transaction());
712 let (metadata_reservation, allocator_reservation, hold) =
713 txn_guard.fs().reservation_for_transaction(options).await?;
714
715 let txn_locks = {
716 let lock_manager = txn_guard.fs().lock_manager();
717 let mut write_guard = lock_manager.txn_lock(txn_locks).await;
718 std::mem::take(&mut write_guard.0.lock_keys)
719 };
720 let mut transaction = Transaction {
721 txn_guard,
722 mutations: BTreeSet::new(),
723 txn_locks,
724 allocator_reservation: None,
725 metadata_reservation,
726 new_objects: BTreeSet::new(),
727 checksums: Vec::new(),
728 includes_write: false,
729 };
730
731 ScopeGuard::into_inner(guard);
732 hold.map(|h| h.forget()); transaction.allocator_reservation = allocator_reservation;
734 Ok(transaction)
735 }
736
737 pub fn txn_guard(&self) -> &TxnGuard<'_> {
738 &self.txn_guard
739 }
740
741 pub fn mutations(&self) -> &BTreeSet<TxnMutation<'a>> {
742 &self.mutations
743 }
744
745 pub fn take_mutations(&mut self) -> BTreeSet<TxnMutation<'a>> {
746 self.new_objects.clear();
747 mem::take(&mut self.mutations)
748 }
749
750 pub fn add(&mut self, object_id: u64, mutation: Mutation) -> Option<Mutation> {
753 self.add_with_object(object_id, mutation, AssocObj::None)
754 }
755
756 pub fn remove(&mut self, object_id: u64, mutation: Mutation) {
758 let txn_mutation = TxnMutation { object_id, mutation, associated_object: AssocObj::None };
759 if self.mutations.remove(&txn_mutation) {
760 if let Mutation::ObjectStore(ObjectStoreMutation {
761 item:
762 ObjectItem {
763 key: ObjectKey { object_id: new_object_id, data: ObjectKeyData::Object },
764 ..
765 },
766 op: Operation::Insert,
767 }) = txn_mutation.mutation
768 {
769 self.new_objects.remove(&(object_id, new_object_id));
770 }
771 }
772 }
773
774 pub fn add_with_object(
777 &mut self,
778 object_id: u64,
779 mutation: Mutation,
780 associated_object: AssocObj<'a>,
781 ) -> Option<Mutation> {
782 assert!(object_id != INVALID_OBJECT_ID);
783 if let Mutation::ObjectStore(ObjectStoreMutation {
784 item:
785 Item {
786 key:
787 ObjectKey { data: ObjectKeyData::Attribute(_, AttributeKey::Extent(_)), .. },
788 ..
789 },
790 ..
791 }) = &mutation
792 {
793 self.includes_write = true;
794 }
795 let txn_mutation = TxnMutation { object_id, mutation, associated_object };
796 self.verify_locks(&txn_mutation);
797 self.mutations.replace(txn_mutation).map(|m| m.mutation)
798 }
799
800 pub fn add_checksum(&mut self, range: Range<u64>, checksums: Vec<Checksum>, first_write: bool) {
801 self.checksums.push((range, checksums, first_write));
802 }
803
804 pub fn includes_write(&self) -> bool {
805 self.includes_write
806 }
807
808 pub fn checksums(&self) -> &[(Range<u64>, Vec<Checksum>, bool)] {
809 &self.checksums
810 }
811
812 pub fn take_checksums(&mut self) -> Vec<(Range<u64>, Vec<Checksum>, bool)> {
813 std::mem::replace(&mut self.checksums, Vec::new())
814 }
815
816 fn verify_locks(&mut self, mutation: &TxnMutation<'_>) {
817 match mutation {
821 TxnMutation {
822 mutation:
823 Mutation::ObjectStore {
824 0: ObjectStoreMutation { item: ObjectItem { key, .. }, op },
825 },
826 object_id: store_object_id,
827 ..
828 } => {
829 match &key.data {
830 ObjectKeyData::Attribute(..) => {
831 }
833 ObjectKeyData::Child { .. }
834 | ObjectKeyData::EncryptedChild { .. }
835 | ObjectKeyData::CasefoldChild { .. } => {
836 let id = key.object_id;
837 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
838 && !self.new_objects.contains(&(*store_object_id, id))
839 {
840 debug_assert!(
841 false,
842 "Not holding required lock for object {id} \
843 in store {store_object_id}"
844 );
845 error!(
846 "Not holding required lock for object {id} in store \
847 {store_object_id}"
848 )
849 }
850 }
851 ObjectKeyData::GraveyardEntry { .. } => {
852 }
854 ObjectKeyData::GraveyardAttributeEntry { .. } => {
855 }
857 ObjectKeyData::Keys => {
858 let id = key.object_id;
859 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
860 && !self.new_objects.contains(&(*store_object_id, id))
861 {
862 debug_assert!(
863 false,
864 "Not holding required lock for object {id} \
865 in store {store_object_id}"
866 );
867 error!(
868 "Not holding required lock for object {id} in store \
869 {store_object_id}"
870 )
871 }
872 }
873 ObjectKeyData::Object => match op {
874 Operation::Insert => {
876 self.new_objects.insert((*store_object_id, key.object_id));
877 }
878 Operation::Merge | Operation::ReplaceOrInsert => {
879 let id = key.object_id;
880 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
881 && !self.new_objects.contains(&(*store_object_id, id))
882 {
883 debug_assert!(
884 false,
885 "Not holding required lock for object {id} \
886 in store {store_object_id}"
887 );
888 error!(
889 "Not holding required lock for object {id} in store \
890 {store_object_id}"
891 )
892 }
893 }
894 },
895 ObjectKeyData::Project { project_id, property: ProjectProperty::Limit } => {
896 if !self.txn_locks.contains(&LockKey::ProjectId {
897 store_object_id: *store_object_id,
898 project_id: *project_id,
899 }) {
900 debug_assert!(
901 false,
902 "Not holding required lock for project limit id {project_id} \
903 in store {store_object_id}"
904 );
905 error!(
906 "Not holding required lock for project limit id {project_id} in \
907 store {store_object_id}"
908 )
909 }
910 }
911 ObjectKeyData::Project { property: ProjectProperty::Usage, .. } => match op {
912 Operation::Insert | Operation::ReplaceOrInsert => {
913 panic!(
914 "Project usage is all handled by merging deltas, no inserts or \
915 replacements should be used"
916 );
917 }
918 Operation::Merge => {}
920 },
921 ObjectKeyData::ExtendedAttribute { .. } => {
922 let id = key.object_id;
923 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
924 && !self.new_objects.contains(&(*store_object_id, id))
925 {
926 debug_assert!(
927 false,
928 "Not holding required lock for object {id} \
929 in store {store_object_id} while mutating extended attribute"
930 );
931 error!(
932 "Not holding required lock for object {id} in store \
933 {store_object_id} while mutating extended attribute"
934 )
935 }
936 }
937 }
938 }
939 TxnMutation { mutation: Mutation::DeleteVolume, object_id, .. } => {
940 if !self.txn_locks.contains(&LockKey::flush(*object_id)) {
941 debug_assert!(false, "Not holding required lock for DeleteVolume");
942 error!("Not holding required lock for DeleteVolume");
943 }
944 }
945 _ => {}
946 }
947 }
948
949 pub fn is_empty(&self) -> bool {
951 self.mutations.is_empty()
952 }
953
954 pub fn get_object_mutation(
957 &self,
958 store_object_id: u64,
959 key: ObjectKey,
960 ) -> Option<&ObjectStoreMutation> {
961 if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) =
962 self.mutations.get(&TxnMutation {
963 object_id: store_object_id,
964 mutation: Mutation::insert_object(key, ObjectValue::None),
965 associated_object: AssocObj::None,
966 })
967 {
968 Some(mutation)
969 } else {
970 None
971 }
972 }
973
974 pub async fn commit(mut self) -> Result<u64, Error> {
976 debug!(txn:? = &self; "Commit");
977 self.txn_guard.fs().clone().commit_transaction(&mut self, &mut |_| {}).await
978 }
979
980 pub async fn commit_with_callback<R: Send>(
983 mut self,
984 f: impl FnOnce(u64) -> R + Send,
985 ) -> Result<R, Error> {
986 debug!(txn:? = &self; "Commit");
987 let mut f = Some(f);
990 let mut result = None;
991 self.txn_guard
992 .fs()
993 .clone()
994 .commit_transaction(&mut self, &mut |offset| {
995 result = Some(f.take().unwrap()(offset));
996 })
997 .await?;
998 Ok(result.unwrap())
999 }
1000
1001 pub async fn commit_and_continue(&mut self) -> Result<(), Error> {
1004 debug!(txn:? = self; "Commit");
1005 self.txn_guard.fs().clone().commit_transaction(self, &mut |_| {}).await?;
1006 assert!(self.mutations.is_empty());
1007 self.txn_guard.fs().lock_manager().downgrade_locks(&self.txn_locks);
1008 Ok(())
1009 }
1010}
1011
1012impl Drop for Transaction<'_> {
1013 fn drop(&mut self) {
1014 debug!(txn:? = &self; "Drop");
1017 self.txn_guard.fs().clone().drop_transaction(self);
1018 }
1019}
1020
1021impl std::fmt::Debug for Transaction<'_> {
1022 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1023 f.debug_struct("Transaction")
1024 .field("mutations", &self.mutations)
1025 .field("txn_locks", &self.txn_locks)
1026 .field("reservation", &self.allocator_reservation)
1027 .finish()
1028 }
1029}
1030
/// Either a reference to a `T` or an owned `T`; dereferences to `T` in both cases.
pub enum BorrowedOrOwned<'a, T> {
    Borrowed(&'a T),
    Owned(T),
}
1035
1036impl<T> Deref for BorrowedOrOwned<'_, T> {
1037 type Target = T;
1038
1039 fn deref(&self) -> &Self::Target {
1040 match self {
1041 BorrowedOrOwned::Borrowed(b) => b,
1042 BorrowedOrOwned::Owned(o) => &o,
1043 }
1044 }
1045}
1046
1047impl<'a, T> From<&'a T> for BorrowedOrOwned<'a, T> {
1048 fn from(value: &'a T) -> Self {
1049 BorrowedOrOwned::Borrowed(value)
1050 }
1051}
1052
1053impl<T> From<T> for BorrowedOrOwned<'_, T> {
1054 fn from(value: T) -> Self {
1055 BorrowedOrOwned::Owned(value)
1056 }
1057}
1058
/// Hands out read, transaction ("locked") and write locks identified by [`LockKey`]s.
///
/// All bookkeeping, including the intrusive lists of waiters, lives behind the single `locks`
/// mutex.
pub struct LockManager {
    locks: Mutex<Locks>,
}
1090
/// The state protected by `LockManager::locks`: one [`LockEntry`] per key that is currently
/// held or waited for. Entries are removed again once `LockEntry::can_remove` reports true.
struct Locks {
    keys: HashMap<LockKey, LockEntry>,
}
1094
impl Locks {
    /// Releases one hold on `key` that was taken in `state`, wakes waiters where that became
    /// possible, and removes the entry once it is unused.
    ///
    /// Reaching this without an entry for `key` is a bug (`unreachable!`).
    fn drop_lock(&mut self, key: LockKey, state: LockState) {
        if let Entry::Occupied(mut occupied) = self.keys.entry(key) {
            let entry = occupied.get_mut();
            let wake = match state {
                // Readers only unblock waiters when the last one leaves.
                LockState::ReadLock => {
                    entry.read_count -= 1;
                    entry.read_count == 0
                }
                // `drop_write_locks` relies on `Locked` and `WriteLock` being handled identically
                // here: both fall back to the `ReadLock` state.
                LockState::Locked | LockState::WriteLock => {
                    entry.state = LockState::ReadLock;
                    true
                }
            };
            if wake {
                // NOTE(review): `wake` dereferences the waiter pointers stored in the entry.
                // Presumably sound because `Locks` is only reachable through the
                // `LockManager::locks` mutex, which the caller holds -- confirm.
                unsafe {
                    entry.wake();
                }
                if entry.can_remove() {
                    occupied.remove_entry();
                }
            }
        } else {
            unreachable!();
        }
    }

    /// Releases a read lock on every key in `lock_keys`.
    fn drop_read_locks(&mut self, lock_keys: LockKeys) {
        for lock in lock_keys.iter() {
            self.drop_lock(*lock, LockState::ReadLock);
        }
    }

    /// Releases the lock on every key in `lock_keys`, whether it is currently in the `Locked`
    /// or the `WriteLock` state (`drop_lock` treats the two the same).
    fn drop_write_locks(&mut self, lock_keys: LockKeys) {
        for lock in lock_keys.iter() {
            self.drop_lock(*lock, LockState::WriteLock);
        }
    }

    /// Calls `LockEntry::downgrade_lock` on the entry of every key in `lock_keys`.
    ///
    /// Panics if a key has no entry.
    fn downgrade_locks(&mut self, lock_keys: &LockKeys) {
        for lock in lock_keys.iter() {
            // NOTE(review): `downgrade_lock` is an unsafe fn defined outside the visible part of
            // this file; presumably its requirement is that the `LockManager::locks` mutex is
            // held, which is how a `&mut Locks` is obtained -- confirm.
            unsafe {
                self.keys.get_mut(lock).unwrap().downgrade_lock();
            }
        }
    }
}
1148
/// Bookkeeping for a single [`LockKey`].
#[derive(Debug)]
struct LockEntry {
    /// Number of read locks currently held.
    read_count: u64,

    /// `ReadLock` while no transaction or writer holds the key, otherwise `Locked` or
    /// `WriteLock`. Readers may still be present in the `Locked` state (see
    /// `LockManager::commit_prepare_keys`).
    state: LockState,

    /// Head and tail of an intrusive doubly linked list of waiters. The `LockWaker`s live in the
    /// futures that are waiting; both pointers are null when nothing is waiting.
    head: *const LockWaker,
    tail: *const LockWaker,
}

// The raw pointers prevent the automatic `Send` impl.
// NOTE(review): presumably sound because entries are only accessed while the
// `LockManager::locks` mutex is held -- confirm.
unsafe impl Send for LockEntry {}
1167
/// A node in a [`LockEntry`]'s waiter list. It is created pinned inside the waiting future
/// (`LockManager::lock` and `LockManager::commit_prepare_keys`) and linked into the list by
/// address, which is why it must not move (`PhantomPinned`).
struct LockWaker {
    /// Links to the neighbouring waiters; null at the ends of the list.
    next: UnsafeCell<*const LockWaker>,
    prev: UnsafeCell<*const LockWaker>,

    /// The key being waited for.
    key: LockKey,

    /// Wake-up state; read and written while the `LockManager::locks` mutex is held.
    waker: UnsafeCell<WakerState>,

    /// The state the waiter wants to acquire.
    target_state: LockState,

    /// True for waiters created by `commit_prepare_keys`, which upgrade an existing `Locked`
    /// hold to `WriteLock` and are queued at the head of the list.
    is_upgrade: bool,

    _pin: PhantomPinned,
}
1191
/// Wake-up state of a [`LockWaker`].
enum WakerState {
    /// The waiter has not been polled yet, so there is no task waker to notify.
    Pending,

    /// The waiter has been polled and left this task waker behind.
    Registered(Waker),

    /// The lock has been granted to the waiter (set by `LockEntry::pop_and_wake`).
    Woken,
}
1202
1203impl WakerState {
1204 fn is_woken(&self) -> bool {
1205 matches!(self, WakerState::Woken)
1206 }
1207}
1208
// `LockWaker` contains raw pointers and `UnsafeCell`s, so `Send` and `Sync` are not derived
// automatically.
// NOTE(review): presumably sound because the interior-mutable fields are only touched while the
// `LockManager::locks` mutex is held -- confirm.
unsafe impl Send for LockWaker {}
unsafe impl Sync for LockWaker {}
1211
impl LockWaker {
    /// Waits until this waiter has been woken, i.e. until the lock has been granted to it.
    ///
    /// The waiter must already be linked into the list of the entry for `self.key`. If the
    /// returned future is dropped before it completes, the guard below undoes whatever has
    /// happened so far.
    async fn wait(&self, manager: &LockManager) {
        // Runs only if the future is dropped before completing.
        let waker_guard = scopeguard::guard((), |_| {
            let mut locks = manager.locks.lock();
            // SAFETY: `manager.locks` is held, which is what guards `self.waker` and the list.
            unsafe {
                if (*self.waker.get()).is_woken() {
                    // The lock was granted but nobody is going to use it, so give it back: an
                    // upgrade goes back to its previous hold, anything else is released.
                    if self.is_upgrade {
                        locks.keys.get_mut(&self.key).unwrap().downgrade_lock();
                    } else {
                        locks.drop_lock(self.key, self.target_state);
                    }
                } else {
                    // Still queued: unlink this waiter so the list does not keep a pointer to
                    // memory that is about to go away.
                    locks.keys.get_mut(&self.key).unwrap().remove_waker(self);
                }
            }
        });

        poll_fn(|cx| {
            let _locks = manager.locks.lock();
            // SAFETY: `manager.locks` is held for the duration of this closure.
            unsafe {
                if (*self.waker.get()).is_woken() {
                    Poll::Ready(())
                } else {
                    // Re-register on every poll so the most recent task waker is the one used.
                    *self.waker.get() = WakerState::Registered(cx.waker().clone());
                    Poll::Pending
                }
            }
        })
        .await;

        // Completed normally: the caller now owns the lock, so defuse the guard.
        ScopeGuard::into_inner(waker_guard);
    }
}
1252
/// The state of a lock entry, and also the kind of lock a caller asks for.
#[derive(Copy, Clone, Debug, PartialEq)]
enum LockState {
    /// Shared read lock; also the state of an entry that no transaction or writer holds.
    ReadLock,

    /// Taken by `LockManager::txn_lock`. Readers can still hold the key in this state
    /// (`commit_prepare_keys` waits for `read_count` to reach zero before upgrading).
    Locked,

    /// Taken by `LockManager::write_lock`, or reached by upgrading `Locked` in
    /// `commit_prepare_keys`.
    WriteLock,
}
1265
impl LockManager {
    /// Creates a lock manager with no locks held.
    pub fn new() -> Self {
        LockManager { locks: Mutex::new(Locks { keys: HashMap::default() }) }
    }

    /// Acquires every key in `lock_keys` in the `Locked` state, waiting as necessary.
    ///
    /// Used by `Transaction::new`. The wait is wrapped in `debug_assert_not_too_long!`.
    pub async fn txn_lock<'a>(&'a self, lock_keys: LockKeys) -> TransactionLocks<'a> {
        TransactionLocks(
            debug_assert_not_too_long!(self.lock(lock_keys, LockState::Locked)).right().unwrap(),
        )
    }

    /// Acquires every key in `lock_keys` in `target_state`, waiting as necessary.
    ///
    /// Returns a `ReadGuard` (`Left`) for `ReadLock` and a `WriteGuard` (`Right`) otherwise. The
    /// guard's key list is filled in as keys are acquired, so that dropping this future part-way
    /// through only hands back the keys recorded so far.
    async fn lock<'a>(
        &'a self,
        mut lock_keys: LockKeys,
        target_state: LockState,
    ) -> Either<ReadGuard<'a>, WriteGuard<'a>> {
        let mut guard = match &target_state {
            LockState::ReadLock => Left(ReadGuard {
                manager: self.into(),
                lock_keys: LockKeys::with_capacity(lock_keys.len()),
            }),
            LockState::Locked | LockState::WriteLock => Right(WriteGuard {
                manager: self.into(),
                lock_keys: LockKeys::with_capacity(lock_keys.len()),
            }),
        };
        let guard_keys = match &mut guard {
            Left(g) => &mut g.lock_keys,
            Right(g) => &mut g.lock_keys,
        };
        // Acquire in sorted order, without duplicates, so that every caller takes keys in the
        // same order.
        lock_keys.sort_unstable();
        lock_keys.dedup();
        for lock in lock_keys.iter() {
            // Storage for a waiter node, pinned on this future's stack because the waiter list
            // links to it by address.
            let lock_waker = None;
            pin_mut!(lock_waker);
            {
                let mut locks = self.locks.lock();
                match locks.keys.entry(*lock) {
                    // Nobody holds or waits for this key: take it immediately.
                    Entry::Vacant(vacant) => {
                        vacant.insert(LockEntry {
                            read_count: if let LockState::ReadLock = target_state {
                                guard_keys.push(*lock);
                                1
                            } else {
                                guard_keys.push(*lock);
                                0
                            },
                            state: target_state,
                            head: std::ptr::null(),
                            tail: std::ptr::null(),
                        });
                    }
                    Entry::Occupied(mut occupied) => {
                        let entry = occupied.get_mut();
                        // SAFETY: `self.locks` is held (`locks` above).
                        // NOTE(review): `is_allowed` is defined outside the visible part of this
                        // file; its second argument here says whether the waiter list is empty.
                        if unsafe { entry.is_allowed(target_state, entry.head.is_null()) } {
                            if let LockState::ReadLock = target_state {
                                entry.read_count += 1;
                                guard_keys.push(*lock);
                            } else {
                                entry.state = target_state;
                                guard_keys.push(*lock);
                            }
                        } else {
                            // Have to wait: create a waiter and append it to the tail of the list.
                            // SAFETY: the pinned slot still holds the initial `None`, so nothing
                            // points at it yet and overwriting it in place is fine.
                            unsafe {
                                *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
                                    next: UnsafeCell::new(std::ptr::null()),
                                    prev: UnsafeCell::new(entry.tail),
                                    key: *lock,
                                    waker: UnsafeCell::new(WakerState::Pending),
                                    target_state: target_state,
                                    is_upgrade: false,
                                    _pin: PhantomPinned,
                                });
                            }
                            let waker = (*lock_waker).as_ref().unwrap();
                            if entry.tail.is_null() {
                                entry.head = waker;
                            } else {
                                // SAFETY: `self.locks` is held, which guards the list nodes.
                                unsafe {
                                    *(*entry.tail).next.get() = waker;
                                }
                            }
                            entry.tail = waker;
                        }
                    }
                }
            }
            // The mutex has been released by now; wait outside of it if a waiter was queued.
            if let Some(waker) = &*lock_waker {
                waker.wait(self).await;
                guard_keys.push(*lock);
            }
        }
        guard
    }

    /// Releases all locks held by `transaction`, leaving it with an empty key list.
    pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
        let mut locks = self.locks.lock();
        locks.drop_write_locks(std::mem::take(&mut transaction.txn_locks));
    }

    /// Upgrades all of `transaction`'s locks from `Locked` to `WriteLock`, waiting for readers
    /// to leave.
    pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
        self.commit_prepare_keys(&transaction.txn_locks).await;
    }

    /// Upgrades every key in `lock_keys` from `Locked` to `WriteLock`.
    ///
    /// Panics if a key has no entry or is not in the `Locked` state.
    async fn commit_prepare_keys(&self, lock_keys: &LockKeys) {
        for lock in lock_keys.iter() {
            // Pinned storage for a waiter node, as in `lock`.
            let lock_waker = None;
            pin_mut!(lock_waker);
            {
                let mut locks = self.locks.lock();
                let entry = locks.keys.get_mut(lock).unwrap();
                assert_eq!(entry.state, LockState::Locked);

                if entry.read_count == 0 {
                    // No readers: upgrade immediately.
                    entry.state = LockState::WriteLock;
                } else {
                    // Readers are present: queue an upgrade waiter at the *head* of the list so
                    // that it is considered before any other waiter.
                    // SAFETY: the pinned slot still holds the initial `None`, so nothing points
                    // at it yet and overwriting it in place is fine.
                    unsafe {
                        *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
                            next: UnsafeCell::new(entry.head),
                            prev: UnsafeCell::new(std::ptr::null()),
                            key: *lock,
                            waker: UnsafeCell::new(WakerState::Pending),
                            target_state: LockState::WriteLock,
                            is_upgrade: true,
                            _pin: PhantomPinned,
                        });
                    }
                    let waker = (*lock_waker).as_ref().unwrap();
                    if entry.head.is_null() {
                        entry.tail = (*lock_waker).as_ref().unwrap();
                    } else {
                        // SAFETY: `self.locks` is held, which guards the list nodes.
                        unsafe {
                            *(*entry.head).prev.get() = waker;
                        }
                    }
                    entry.head = waker;
                }
            }

            if let Some(waker) = &*lock_waker {
                waker.wait(self).await;
            }
        }
    }

    /// Acquires read locks on every key in `lock_keys`, waiting as necessary.
    /// The wait is wrapped in `debug_assert_not_too_long!`.
    pub async fn read_lock<'a>(&'a self, lock_keys: LockKeys) -> ReadGuard<'a> {
        debug_assert_not_too_long!(self.lock(lock_keys, LockState::ReadLock)).left().unwrap()
    }

    /// Acquires write locks on every key in `lock_keys`, waiting as necessary.
    /// The wait is wrapped in `debug_assert_not_too_long!`.
    pub async fn write_lock<'a>(&'a self, lock_keys: LockKeys) -> WriteGuard<'a> {
        debug_assert_not_too_long!(self.lock(lock_keys, LockState::WriteLock)).right().unwrap()
    }

    /// Downgrades the locks on `lock_keys` (see `Locks::downgrade_locks`); used by
    /// `Transaction::commit_and_continue`.
    pub fn downgrade_locks(&self, lock_keys: &LockKeys) {
        self.locks.lock().downgrade_locks(lock_keys);
    }
}
1446
impl LockEntry {
    /// Wakes as many queued waiters as the entry's current state permits, starting at the head
    /// of the wait list.
    ///
    /// # Safety
    ///
    /// `head` and the nodes reachable from it are dereferenced as raw pointers, so every linked
    /// waker must still be valid.
    /// NOTE(review): presumably callers must also hold the lock table's mutex -- confirm.
    unsafe fn wake(&mut self) {
        // Nothing to do when nobody is queued or while a write lock is held.
        if self.head.is_null() || self.state == LockState::WriteLock {
            return;
        }

        let waker = unsafe { &*self.head };

        if waker.is_upgrade {
            // An upgrade at the head stays queued until the read count drops to zero.
            if self.read_count > 0 {
                return;
            }
        } else if !unsafe { self.is_allowed(waker.target_state, true) } {
            return;
        }

        unsafe { self.pop_and_wake() };

        // `waker` still refers to the node that was just popped.  If it took the write lock,
        // nothing further can be woken.
        if waker.target_state == LockState::WriteLock {
            return;
        }

        // Otherwise keep waking the run of read-lock waiters now at the front of the list.
        while !self.head.is_null() && unsafe { (*self.head).target_state } == LockState::ReadLock {
            unsafe { self.pop_and_wake() };
        }
    }

    /// Unlinks the head waiter, applies its requested state to this entry and wakes its task.
    ///
    /// A `ReadLock` target increments `read_count`; any other target replaces `state`.
    ///
    /// # Safety
    ///
    /// `head` must be non-null and point to a valid waker, as must the node that follows it (if
    /// any).
    unsafe fn pop_and_wake(&mut self) {
        let waker = unsafe { &*self.head };

        // Advance `head`, fixing up either `tail` (list now empty) or the new head's `prev`.
        self.head = unsafe { *waker.next.get() };
        if self.head.is_null() {
            self.tail = std::ptr::null()
        } else {
            unsafe { *(*self.head).prev.get() = std::ptr::null() };
        }

        if waker.target_state == LockState::ReadLock {
            self.read_count += 1;
        } else {
            self.state = waker.target_state;
        }

        // Mark the node as woken; if a task waker had been registered, notify it.
        if let WakerState::Registered(waker) =
            std::mem::replace(unsafe { &mut *waker.waker.get() }, WakerState::Woken)
        {
            waker.wake();
        }
    }

    /// Returns true when the entry is in the `ReadLock` state with a read count of zero.
    // NOTE(review): presumably this is the idle state in which the entry can be removed from the
    // lock table -- confirm against the callers.
    fn can_remove(&self) -> bool {
        self.state == LockState::ReadLock && self.read_count == 0
    }

    /// Unlinks `waker` from the wait list, wherever it sits in the list.
    ///
    /// If `waker` was at the head, `wake` is called afterwards so that the waiters behind it get
    /// re-evaluated.
    ///
    /// # Safety
    ///
    /// `waker` must currently be linked into this entry's list, and its neighbours must be valid.
    unsafe fn remove_waker(&mut self, waker: &LockWaker) {
        unsafe {
            let is_first = (*waker.prev.get()).is_null();
            // Detach from the predecessor (or from `head`).
            if is_first {
                self.head = *waker.next.get();
            } else {
                *(**waker.prev.get()).next.get() = *waker.next.get();
            }
            // Detach from the successor (or from `tail`).
            if (*waker.next.get()).is_null() {
                self.tail = *waker.prev.get();
            } else {
                *(**waker.next.get()).prev.get() = *waker.prev.get();
            }
            if is_first {
                // Removing the head may have unblocked whatever was queued behind it.
                self.wake();
            }
        }
    }

    /// Reports whether a request for `target_state` may be granted given the entry's state.
    ///
    /// * In `ReadLock`: only for the head of the queue, and then only if there are no readers or
    ///   the target is `Locked` or `ReadLock`.
    /// * In `Locked`: only `ReadLock` requests, and only if the request is at the head or the
    ///   current head waiter is not an upgrade.
    /// * In `WriteLock`: never.
    ///
    /// `wake` passes `is_head = true` for the waiter at the front of the list; other callers are
    /// outside this part of the file.
    ///
    /// # Safety
    ///
    /// When `is_head` is false and the state is `Locked`, `head` is dereferenced and so must be
    /// non-null and valid.
    unsafe fn is_allowed(&self, target_state: LockState, is_head: bool) -> bool {
        match self.state {
            LockState::ReadLock => {
                (self.read_count == 0
                    || target_state == LockState::Locked
                    || target_state == LockState::ReadLock)
                    && is_head
            }
            LockState::Locked => {
                // `||` short-circuits, so `head` is only read when `is_head` is false.
                target_state == LockState::ReadLock
                    && (is_head || unsafe { !(*self.head).is_upgrade })
            }
            LockState::WriteLock => false,
        }
    }

    /// Moves the entry from `WriteLock` back to `Locked` and wakes any waiters that can now run.
    ///
    /// # Panics
    ///
    /// Panics if the entry was not in the `WriteLock` state.
    ///
    /// # Safety
    ///
    /// Same requirements as `wake`.
    unsafe fn downgrade_lock(&mut self) {
        assert_eq!(std::mem::replace(&mut self.state, LockState::Locked), LockState::WriteLock);
        unsafe { self.wake() };
    }
}
1557
/// Guard over a set of keys held for reading.  Dropping the guard passes its keys to the lock
/// table's `drop_read_locks`.
#[must_use]
pub struct ReadGuard<'a> {
    // The lock manager the keys were taken from, either borrowed or reached via an owned
    // filesystem.
    manager: LockManagerRef<'a>,
    // The keys this guard is responsible for releasing.
    lock_keys: LockKeys,
}
1563
1564impl ReadGuard<'_> {
1565 pub fn fs(&self) -> Option<&Arc<FxFilesystem>> {
1566 if let LockManagerRef::Owned(fs) = &self.manager { Some(fs) } else { None }
1567 }
1568
1569 pub fn into_owned(mut self, fs: Arc<FxFilesystem>) -> ReadGuard<'static> {
1570 ReadGuard {
1571 manager: LockManagerRef::Owned(fs),
1572 lock_keys: std::mem::replace(&mut self.lock_keys, LockKeys::None),
1573 }
1574 }
1575}
1576
1577impl Drop for ReadGuard<'_> {
1578 fn drop(&mut self) {
1579 let mut locks = self.manager.locks.lock();
1580 locks.drop_read_locks(std::mem::take(&mut self.lock_keys));
1581 }
1582}
1583
1584impl fmt::Debug for ReadGuard<'_> {
1585 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1586 f.debug_struct("ReadGuard")
1587 .field("manager", &(&self.manager as *const _))
1588 .field("lock_keys", &self.lock_keys)
1589 .finish()
1590 }
1591}
1592
/// Guard over a set of keys whose release goes through the lock table's `drop_write_locks`,
/// which is what dropping the guard does.
#[must_use]
pub struct WriteGuard<'a> {
    // The lock manager the keys were taken from, either borrowed or reached via an owned
    // filesystem.
    manager: LockManagerRef<'a>,
    // The keys this guard is responsible for releasing.
    lock_keys: LockKeys,
}
1598
1599impl Drop for WriteGuard<'_> {
1600 fn drop(&mut self) {
1601 let mut locks = self.manager.locks.lock();
1602 locks.drop_write_locks(std::mem::take(&mut self.lock_keys));
1603 }
1604}
1605
1606impl fmt::Debug for WriteGuard<'_> {
1607 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1608 f.debug_struct("WriteGuard")
1609 .field("manager", &(&self.manager as *const _))
1610 .field("lock_keys", &self.lock_keys)
1611 .finish()
1612 }
1613}
1614
/// How a guard reaches its `LockManager`; dereferences to `LockManager` in both cases.
enum LockManagerRef<'a> {
    /// A plain borrow of the manager.
    Borrowed(&'a LockManager),
    /// A shared filesystem handle; the manager is obtained through its `lock_manager()`.
    Owned(Arc<FxFilesystem>),
}
1619
1620impl Deref for LockManagerRef<'_> {
1621 type Target = LockManager;
1622
1623 fn deref(&self) -> &Self::Target {
1624 match self {
1625 LockManagerRef::Borrowed(m) => m,
1626 LockManagerRef::Owned(f) => f.lock_manager(),
1627 }
1628 }
1629}
1630
1631impl<'a> From<&'a LockManager> for LockManagerRef<'a> {
1632 fn from(value: &'a LockManager) -> Self {
1633 LockManagerRef::Borrowed(value)
1634 }
1635}
1636
1637#[cfg(test)]
1638mod tests {
1639 use super::{LockKey, LockKeys, LockManager, LockState, Mutation, Options};
1640 use crate::filesystem::FxFilesystem;
1641 use fuchsia_async as fasync;
1642 use fuchsia_sync::Mutex;
1643 use futures::channel::oneshot::channel;
1644 use futures::future::FutureExt;
1645 use futures::stream::FuturesUnordered;
1646 use futures::{StreamExt, join, pin_mut};
1647 use std::task::Poll;
1648 use std::time::Duration;
1649 use storage_device::DeviceHolder;
1650 use storage_device::fake_device::FakeDevice;
1651
1652 #[fuchsia::test]
1653 async fn test_simple() {
1654 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1655 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1656 let mut t = fs
1657 .clone()
1658 .new_transaction(lock_keys![], Options::default())
1659 .await
1660 .expect("new_transaction failed");
1661 t.add(1, Mutation::BeginFlush);
1662 assert!(!t.is_empty());
1663 }
1664
1665 #[fuchsia::test]
1666 async fn test_locks() {
1667 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1668 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1669 let (send1, recv1) = channel();
1670 let (send2, recv2) = channel();
1671 let (send3, recv3) = channel();
1672 let done = Mutex::new(false);
1673 let mut futures = FuturesUnordered::new();
1674 futures.push(
1675 async {
1676 let _t = fs
1677 .clone()
1678 .new_transaction(
1679 lock_keys![LockKey::object_attribute(1, 2, 3)],
1680 Options::default(),
1681 )
1682 .await
1683 .expect("new_transaction failed");
1684 send1.send(()).unwrap(); send3.send(()).unwrap(); recv2.await.unwrap();
1687 fasync::Timer::new(Duration::from_millis(100)).await;
1689 assert!(!*done.lock());
1690 }
1691 .boxed(),
1692 );
1693 futures.push(
1694 async {
1695 recv1.await.unwrap();
1696 let _t = fs
1698 .clone()
1699 .new_transaction(
1700 lock_keys![LockKey::object_attribute(2, 2, 3)],
1701 Options::default(),
1702 )
1703 .await
1704 .expect("new_transaction failed");
1705 send2.send(()).unwrap();
1707 }
1708 .boxed(),
1709 );
1710 futures.push(
1711 async {
1712 recv3.await.unwrap();
1714 let _t = fs
1715 .clone()
1716 .new_transaction(
1717 lock_keys![LockKey::object_attribute(1, 2, 3)],
1718 Options::default(),
1719 )
1720 .await;
1721 *done.lock() = true;
1722 }
1723 .boxed(),
1724 );
1725 while let Some(()) = futures.next().await {}
1726 }
1727
1728 #[fuchsia::test]
1729 async fn test_read_lock_after_write_lock() {
1730 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1731 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1732 let (send1, recv1) = channel();
1733 let (send2, recv2) = channel();
1734 let done = Mutex::new(false);
1735 join!(
1736 async {
1737 let t = fs
1738 .clone()
1739 .new_transaction(
1740 lock_keys![LockKey::object_attribute(1, 2, 3)],
1741 Options::default(),
1742 )
1743 .await
1744 .expect("new_transaction failed");
1745 send1.send(()).unwrap(); recv2.await.unwrap();
1747 t.commit().await.expect("commit failed");
1748 *done.lock() = true;
1749 },
1750 async {
1751 recv1.await.unwrap();
1752 let _guard = fs
1754 .lock_manager()
1755 .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
1756 .await;
1757 send2.send(()).unwrap();
1759 fasync::Timer::new(Duration::from_millis(100)).await;
1762 assert!(!*done.lock());
1763 },
1764 );
1765 }
1766
1767 #[fuchsia::test]
1768 async fn test_write_lock_after_read_lock() {
1769 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1770 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1771 let (send1, recv1) = channel();
1772 let (send2, recv2) = channel();
1773 let done = Mutex::new(false);
1774 join!(
1775 async {
1776 let _guard = fs
1778 .lock_manager()
1779 .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
1780 .await;
1781 send1.send(()).unwrap();
1783 recv2.await.unwrap();
1784 fasync::Timer::new(Duration::from_millis(100)).await;
1787 assert!(!*done.lock());
1788 },
1789 async {
1790 recv1.await.unwrap();
1791 let t = fs
1792 .clone()
1793 .new_transaction(
1794 lock_keys![LockKey::object_attribute(1, 2, 3)],
1795 Options::default(),
1796 )
1797 .await
1798 .expect("new_transaction failed");
1799 send2.send(()).unwrap(); t.commit().await.expect("commit failed");
1801 *done.lock() = true;
1802 },
1803 );
1804 }
1805
    // Dropping a transaction without committing it must release its locks, so that later
    // transactions on the same key can still be created.
    #[fuchsia::test]
    async fn test_drop_uncommitted_transaction() {
        let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let key = lock_keys![LockKey::object(1, 1)];

        {
            // An uncommitted transaction plus a read lock on the same key; both are dropped at
            // the end of this block.
            let _write_lock = fs
                .clone()
                .new_transaction(key.clone(), Options::default())
                .await
                .expect("new_transaction failed");
            let _read_lock = fs.lock_manager().read_lock(key.clone()).await;
        }
        {
            // A second transaction on the key, again dropped uncommitted.
            let _write_lock = fs
                .clone()
                .new_transaction(key.clone(), Options::default())
                .await
                .expect("new_transaction failed");
        }
        // The key must still be obtainable after both drops above.
        fs.clone()
            .new_transaction(key.clone(), Options::default())
            .await
            .expect("new_transaction failed");
    }
1835
1836 #[fuchsia::test]
1837 async fn test_drop_waiting_write_lock() {
1838 let manager = LockManager::new();
1839 let keys = lock_keys![LockKey::object(1, 1)];
1840 {
1841 let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1842 if let Poll::Ready(_) =
1843 futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1844 {
1845 assert!(false);
1846 }
1847 }
1848 let _ = manager.lock(keys, LockState::WriteLock).await;
1849 }
1850
1851 #[fuchsia::test]
1852 async fn test_write_lock_blocks_everything() {
1853 let manager = LockManager::new();
1854 let keys = lock_keys![LockKey::object(1, 1)];
1855 {
1856 let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1857 if let Poll::Ready(_) =
1858 futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1859 {
1860 assert!(false);
1861 }
1862 if let Poll::Ready(_) =
1863 futures::poll!(manager.lock(keys.clone(), LockState::ReadLock).boxed())
1864 {
1865 assert!(false);
1866 }
1867 }
1868 {
1869 let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1870 }
1871 {
1872 let _guard = manager.lock(keys, LockState::ReadLock).await;
1873 }
1874 }
1875
    // A read lock must stay blocked while the key is write-locked and must be granted once the
    // lock has been downgraded.
    #[fuchsia::test]
    async fn test_downgrade_locks() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.txn_lock(keys.clone()).await;
        // There are no readers, so this moves the key to the write-locked state without waiting.
        manager.commit_prepare_keys(&keys).await;

        let mut read_lock: FuturesUnordered<_> =
            std::iter::once(manager.read_lock(keys.clone())).collect();

        // The key is write-locked, so the reader cannot get in yet.
        assert!(futures::poll!(read_lock.next()).is_pending());

        manager.downgrade_locks(&keys);

        // After the downgrade the queued reader must have been let through.
        assert!(futures::poll!(read_lock.next()).is_ready());
    }
1895
    // Dropping a write-lock request that is still waiting must let a read-lock request that was
    // stuck behind it proceed.
    #[fuchsia::test]
    async fn test_dropped_write_lock_wakes() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
        // A second reader; the future is only created here and is first polled further down.
        let mut read_lock = FuturesUnordered::new();
        read_lock.push(manager.lock(keys.clone(), LockState::ReadLock));

        {
            let write_lock = manager.lock(keys, LockState::WriteLock);
            pin_mut!(write_lock);

            // The writer has to wait because `_guard` holds a read lock.
            assert!(futures::poll!(write_lock).is_pending());

            // The second reader is first polled after the writer and is expected to wait too.
            assert!(futures::poll!(read_lock.next()).is_pending());
        }

        // The pending writer was dropped at the end of the block above, so the second reader
        // must now be able to complete.
        assert!(futures::poll!(read_lock.next()).is_ready());
    }
1918
    // Abandoning a pending upgrade (a `commit_prepare_keys` future that is dropped) must leave
    // the key in a state from which a later upgrade can still succeed.
    #[fuchsia::test]
    async fn test_drop_upgrade() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let _guard = manager.lock(keys.clone(), LockState::Locked).await;

        {
            let commit_prepare = manager.commit_prepare_keys(&keys);
            pin_mut!(commit_prepare);
            let _read_guard = manager.lock(keys.clone(), LockState::ReadLock).await;
            // The upgrade has to wait because a reader is active.
            assert!(futures::poll!(commit_prepare).is_pending());

            // At the end of this block `_read_guard` is dropped first (it was declared last),
            // followed by the still-unfinished `commit_prepare` future.
        }

        // `commit_prepare_keys` asserts that the entry is in the `Locked` state, so this only
        // passes if the abandoned upgrade above was undone.
        manager.commit_prepare_keys(&keys).await;
    }
1938
    // Once an upgrade is waiting, new readers must queue behind it, and they must stay blocked
    // after the upgrade is granted until the upgraded lock itself is released.
    #[fasync::run_singlethreaded(test)]
    async fn test_woken_upgrade_blocks_reads() {
        let manager = LockManager::new();
        let keys = lock_keys![LockKey::object(1, 1)];
        let guard = manager.lock(keys.clone(), LockState::Locked).await;

        // A reader is allowed in while the key is only in the `Locked` state.
        let read1 = manager.lock(keys.clone(), LockState::ReadLock).await;

        // The upgrade has to wait for `read1`.
        let commit_prepare = manager.commit_prepare_keys(&keys);
        pin_mut!(commit_prepare);
        assert!(futures::poll!(commit_prepare.as_mut()).is_pending());

        // A new reader must not overtake the waiting upgrade.
        let read2 = manager.lock(keys.clone(), LockState::ReadLock);
        pin_mut!(read2);
        assert!(futures::poll!(read2.as_mut()).is_pending());

        // Releasing the first reader lets the upgrade complete...
        std::mem::drop(read1);
        assert!(futures::poll!(commit_prepare).is_ready());

        // ...but the second reader is still blocked by the upgraded lock.
        assert!(futures::poll!(read2.as_mut()).is_pending());

        // Only once the lock itself is released can the second reader proceed.
        std::mem::drop(guard);
        assert!(futures::poll!(read2).is_ready());
    }
1970
    // Keys shared by the `LockKeys` unit tests in this module, built from the flush ids 1, 2
    // and 3.
    static LOCK_KEY_1: LockKey = LockKey::flush(1);
    static LOCK_KEY_2: LockKey = LockKey::flush(2);
    static LOCK_KEY_3: LockKey = LockKey::flush(3);
1974
1975 fn assert_lock_keys_equal(value: &LockKeys, expected: &LockKeys) {
1977 match (value, expected) {
1978 (LockKeys::None, LockKeys::None) => {}
1979 (LockKeys::Inline(key1), LockKeys::Inline(key2)) => {
1980 if key1 != key2 {
1981 panic!("{key1:?} != {key2:?}");
1982 }
1983 }
1984 (LockKeys::Vec(vec1), LockKeys::Vec(vec2)) => {
1985 if vec1 != vec2 {
1986 panic!("{vec1:?} != {vec2:?}");
1987 }
1988 if vec1.capacity() != vec2.capacity() {
1989 panic!(
1990 "LockKeys have different capacity: {} != {}",
1991 vec1.capacity(),
1992 vec2.capacity()
1993 );
1994 }
1995 }
1996 (_, _) => panic!("{value:?} != {expected:?}"),
1997 }
1998 }
1999
2000 fn assert_lock_keys_equivalent(value: &LockKeys, expected: &LockKeys) {
2002 let value: Vec<_> = value.iter().collect();
2003 let expected: Vec<_> = expected.iter().collect();
2004 assert_eq!(value, expected);
2005 }
2006
2007 #[test]
2008 fn test_lock_keys_macro() {
2009 assert_lock_keys_equal(&lock_keys![], &LockKeys::None);
2010 assert_lock_keys_equal(&lock_keys![LOCK_KEY_1], &LockKeys::Inline(LOCK_KEY_1));
2011 assert_lock_keys_equal(
2012 &lock_keys![LOCK_KEY_1, LOCK_KEY_2],
2013 &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]),
2014 );
2015 }
2016
2017 #[test]
2018 fn test_lock_keys_with_capacity() {
2019 assert_lock_keys_equal(&LockKeys::with_capacity(0), &LockKeys::None);
2020 assert_lock_keys_equal(&LockKeys::with_capacity(1), &LockKeys::None);
2021 assert_lock_keys_equal(&LockKeys::with_capacity(2), &LockKeys::Vec(Vec::with_capacity(2)));
2022 }
2023
2024 #[test]
2025 fn test_lock_keys_len() {
2026 assert_eq!(lock_keys![].len(), 0);
2027 assert_eq!(lock_keys![LOCK_KEY_1].len(), 1);
2028 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].len(), 2);
2029 }
2030
2031 #[test]
2032 fn test_lock_keys_contains() {
2033 assert_eq!(lock_keys![].contains(&LOCK_KEY_1), false);
2034 assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_1), true);
2035 assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_2), false);
2036 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_1), true);
2037 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_2), true);
2038 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_3), false);
2039 }
2040
2041 #[test]
2042 fn test_lock_keys_push() {
2043 let mut keys = lock_keys![];
2044 keys.push(LOCK_KEY_1);
2045 assert_lock_keys_equal(&keys, &LockKeys::Inline(LOCK_KEY_1));
2046 keys.push(LOCK_KEY_2);
2047 assert_lock_keys_equal(&keys, &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]));
2048 keys.push(LOCK_KEY_3);
2049 assert_lock_keys_equivalent(
2050 &keys,
2051 &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2, LOCK_KEY_3]),
2052 );
2053 }
2054
2055 #[test]
2056 fn test_lock_keys_sort_unstable() {
2057 let mut keys = lock_keys![];
2058 keys.sort_unstable();
2059 assert_lock_keys_equal(&keys, &lock_keys![]);
2060
2061 let mut keys = lock_keys![LOCK_KEY_1];
2062 keys.sort_unstable();
2063 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2064
2065 let mut keys = lock_keys![LOCK_KEY_2, LOCK_KEY_1];
2066 keys.sort_unstable();
2067 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2068 }
2069
2070 #[test]
2071 fn test_lock_keys_dedup() {
2072 let mut keys = lock_keys![];
2073 keys.dedup();
2074 assert_lock_keys_equal(&keys, &lock_keys![]);
2075
2076 let mut keys = lock_keys![LOCK_KEY_1];
2077 keys.dedup();
2078 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2079
2080 let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_1];
2081 keys.dedup();
2082 assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2083 }
2084
2085 #[test]
2086 fn test_lock_keys_truncate() {
2087 let mut keys = lock_keys![];
2088 keys.truncate(5);
2089 assert_lock_keys_equal(&keys, &lock_keys![]);
2090 keys.truncate(0);
2091 assert_lock_keys_equal(&keys, &lock_keys![]);
2092
2093 let mut keys = lock_keys![LOCK_KEY_1];
2094 keys.truncate(5);
2095 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2096 keys.truncate(0);
2097 assert_lock_keys_equal(&keys, &lock_keys![]);
2098
2099 let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_2];
2100 keys.truncate(5);
2101 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2102 keys.truncate(1);
2103 assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2105 }
2106
2107 #[test]
2108 fn test_lock_keys_iter() {
2109 assert_eq!(lock_keys![].iter().collect::<Vec<_>>(), Vec::<&LockKey>::new());
2110
2111 assert_eq!(lock_keys![LOCK_KEY_1].iter().collect::<Vec<_>>(), vec![&LOCK_KEY_1]);
2112
2113 assert_eq!(
2114 lock_keys![LOCK_KEY_1, LOCK_KEY_2].iter().collect::<Vec<_>>(),
2115 vec![&LOCK_KEY_1, &LOCK_KEY_2]
2116 );
2117 }
2118}