1use crate::checksum::Checksum;
6use crate::filesystem::{FxFilesystem, TxnGuard};
7use crate::log::*;
8use crate::lsm_tree::types::Item;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::AttributeKey;
11use crate::object_store::allocator::{AllocatorItem, Reservation};
12use crate::object_store::object_manager::{ObjectManager, reserved_space_from_journal_usage};
13use crate::object_store::object_record::{
14 FxfsKey, FxfsKeyV40, FxfsKeyV49, ObjectItem, ObjectItemV40, ObjectItemV41, ObjectItemV43,
15 ObjectItemV46, ObjectItemV47, ObjectItemV49, ObjectItemV50, ObjectKey, ObjectKeyData,
16 ObjectValue, ProjectProperty,
17};
18use crate::serialized_types::{Migrate, Versioned, migrate_nodefault, migrate_to_version};
19use anyhow::Error;
20use either::{Either, Left, Right};
21use fprint::TypeFingerprint;
22use fuchsia_sync::Mutex;
23use futures::future::poll_fn;
24use futures::pin_mut;
25use rustc_hash::FxHashMap as HashMap;
26use scopeguard::ScopeGuard;
27use serde::{Deserialize, Serialize};
28use std::cell::UnsafeCell;
29use std::cmp::Ordering;
30use std::collections::BTreeSet;
31use std::collections::hash_map::Entry;
32use std::marker::PhantomPinned;
33use std::ops::{Deref, DerefMut, Range};
34use std::sync::Arc;
35use std::task::{Poll, Waker};
36use std::{fmt, mem};
37
38#[derive(Clone, Copy, Default)]
42pub struct Options<'a> {
43 pub skip_journal_checks: bool,
46
47 pub borrow_metadata_space: bool,
52
53 pub allocator_reservation: Option<&'a Reservation>,
58
59 pub txn_guard: Option<&'a TxnGuard<'a>>,
61}
62
63pub const TRANSACTION_MAX_JOURNAL_USAGE: u64 = 24_576;
70pub const TRANSACTION_METADATA_MAX_AMOUNT: u64 =
71 reserved_space_from_journal_usage(TRANSACTION_MAX_JOURNAL_USAGE);
72
73#[must_use]
74pub struct TransactionLocks<'a>(pub WriteGuard<'a>);
75
76pub type Mutation = MutationV50;
81
82#[derive(
83 Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint, Versioned,
84)]
85#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
86pub enum MutationV50 {
87 ObjectStore(ObjectStoreMutationV50),
88 EncryptedObjectStore(Box<[u8]>),
89 Allocator(AllocatorMutationV32),
90 BeginFlush,
92 EndFlush,
95 DeleteVolume,
97 UpdateBorrowed(u64),
98 UpdateMutationsKey(UpdateMutationsKeyV49),
99 CreateInternalDir(u64),
100}
101
102#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
103#[migrate_to_version(MutationV50)]
104pub enum MutationV49 {
105 ObjectStore(ObjectStoreMutationV49),
106 EncryptedObjectStore(Box<[u8]>),
107 Allocator(AllocatorMutationV32),
108 BeginFlush,
109 EndFlush,
110 DeleteVolume,
111 UpdateBorrowed(u64),
112 UpdateMutationsKey(UpdateMutationsKeyV49),
113 CreateInternalDir(u64),
114}
115
116#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
117#[migrate_to_version(MutationV49)]
118pub enum MutationV47 {
119 ObjectStore(ObjectStoreMutationV47),
120 EncryptedObjectStore(Box<[u8]>),
121 Allocator(AllocatorMutationV32),
122 BeginFlush,
123 EndFlush,
124 DeleteVolume,
125 UpdateBorrowed(u64),
126 UpdateMutationsKey(UpdateMutationsKeyV40),
127 CreateInternalDir(u64),
128}
129
130#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
131#[migrate_to_version(MutationV47)]
132pub enum MutationV46 {
133 ObjectStore(ObjectStoreMutationV46),
134 EncryptedObjectStore(Box<[u8]>),
135 Allocator(AllocatorMutationV32),
136 BeginFlush,
137 EndFlush,
138 DeleteVolume,
139 UpdateBorrowed(u64),
140 UpdateMutationsKey(UpdateMutationsKeyV40),
141 CreateInternalDir(u64),
142}
143
144#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
145#[migrate_to_version(MutationV46)]
146pub enum MutationV43 {
147 ObjectStore(ObjectStoreMutationV43),
148 EncryptedObjectStore(Box<[u8]>),
149 Allocator(AllocatorMutationV32),
150 BeginFlush,
151 EndFlush,
152 DeleteVolume,
153 UpdateBorrowed(u64),
154 UpdateMutationsKey(UpdateMutationsKeyV40),
155 CreateInternalDir(u64),
156}
157
158#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
159#[migrate_to_version(MutationV43)]
160pub enum MutationV41 {
161 ObjectStore(ObjectStoreMutationV41),
162 EncryptedObjectStore(Box<[u8]>),
163 Allocator(AllocatorMutationV32),
164 BeginFlush,
165 EndFlush,
166 DeleteVolume,
167 UpdateBorrowed(u64),
168 UpdateMutationsKey(UpdateMutationsKeyV40),
169 CreateInternalDir(u64),
170}
171
172#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
173#[migrate_to_version(MutationV41)]
174pub enum MutationV40 {
175 ObjectStore(ObjectStoreMutationV40),
176 EncryptedObjectStore(Box<[u8]>),
177 Allocator(AllocatorMutationV32),
178 BeginFlush,
179 EndFlush,
180 DeleteVolume,
181 UpdateBorrowed(u64),
182 UpdateMutationsKey(UpdateMutationsKeyV40),
183 CreateInternalDir(u64),
184}
185
186impl Mutation {
187 pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self {
188 Mutation::ObjectStore(ObjectStoreMutation {
189 item: Item::new(key, value),
190 op: Operation::Insert,
191 })
192 }
193
194 pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self {
195 Mutation::ObjectStore(ObjectStoreMutation {
196 item: Item::new(key, value),
197 op: Operation::ReplaceOrInsert,
198 })
199 }
200
201 pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self {
202 Mutation::ObjectStore(ObjectStoreMutation {
203 item: Item::new(key, value),
204 op: Operation::Merge,
205 })
206 }
207
208 pub fn update_mutations_key(key: FxfsKey) -> Self {
209 Mutation::UpdateMutationsKey(key.into())
210 }
211}
212
213pub type ObjectStoreMutation = ObjectStoreMutationV50;
217
218#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
219#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
220pub struct ObjectStoreMutationV50 {
221 pub item: ObjectItemV50,
222 pub op: OperationV32,
223}
224
225#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
226#[migrate_to_version(ObjectStoreMutationV50)]
227#[migrate_nodefault]
228pub struct ObjectStoreMutationV49 {
229 pub item: ObjectItemV49,
230 pub op: OperationV32,
231}
232
233#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
234#[migrate_to_version(ObjectStoreMutationV49)]
235#[migrate_nodefault]
236pub struct ObjectStoreMutationV47 {
237 pub item: ObjectItemV47,
238 pub op: OperationV32,
239}
240
241#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
242#[migrate_to_version(ObjectStoreMutationV47)]
243#[migrate_nodefault]
244pub struct ObjectStoreMutationV46 {
245 pub item: ObjectItemV46,
246 pub op: OperationV32,
247}
248
249#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
250#[migrate_to_version(ObjectStoreMutationV46)]
251#[migrate_nodefault]
252pub struct ObjectStoreMutationV43 {
253 pub item: ObjectItemV43,
254 pub op: OperationV32,
255}
256
257#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
258#[migrate_to_version(ObjectStoreMutationV43)]
259#[migrate_nodefault]
260pub struct ObjectStoreMutationV41 {
261 pub item: ObjectItemV41,
262 pub op: OperationV32,
263}
264
265#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
266#[migrate_nodefault]
267#[migrate_to_version(ObjectStoreMutationV41)]
268pub struct ObjectStoreMutationV40 {
269 pub item: ObjectItemV40,
270 pub op: OperationV32,
271}
272
273pub type Operation = OperationV32;
275
276#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
277#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
278pub enum OperationV32 {
279 Insert,
280 ReplaceOrInsert,
281 Merge,
282}
283
284impl Ord for ObjectStoreMutation {
285 fn cmp(&self, other: &Self) -> Ordering {
286 self.item.key.cmp(&other.item.key)
287 }
288}
289
290impl PartialOrd for ObjectStoreMutation {
291 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
292 Some(self.cmp(other))
293 }
294}
295
296impl PartialEq for ObjectStoreMutation {
297 fn eq(&self, other: &Self) -> bool {
298 self.item.key.eq(&other.item.key)
299 }
300}
301
302impl Eq for ObjectStoreMutation {}
303
304impl Ord for AllocatorItem {
305 fn cmp(&self, other: &Self) -> Ordering {
306 self.key.cmp(&other.key)
307 }
308}
309
310impl PartialOrd for AllocatorItem {
311 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
312 Some(self.cmp(other))
313 }
314}
315
316pub type DeviceRange = DeviceRangeV32;
319
320#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TypeFingerprint)]
321#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
322pub struct DeviceRangeV32(pub Range<u64>);
323
324impl Deref for DeviceRange {
325 type Target = Range<u64>;
326
327 fn deref(&self) -> &Self::Target {
328 &self.0
329 }
330}
331
332impl DerefMut for DeviceRange {
333 fn deref_mut(&mut self) -> &mut Self::Target {
334 &mut self.0
335 }
336}
337
338impl From<Range<u64>> for DeviceRange {
339 fn from(range: Range<u64>) -> Self {
340 Self(range)
341 }
342}
343
344impl Into<Range<u64>> for DeviceRange {
345 fn into(self) -> Range<u64> {
346 self.0
347 }
348}
349
350impl Ord for DeviceRange {
351 fn cmp(&self, other: &Self) -> Ordering {
352 self.start.cmp(&other.start).then(self.end.cmp(&other.end))
353 }
354}
355
356impl PartialOrd for DeviceRange {
357 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
358 Some(self.cmp(other))
359 }
360}
361
362pub type AllocatorMutation = AllocatorMutationV32;
363
364#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint)]
365#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
366pub enum AllocatorMutationV32 {
367 Allocate {
368 device_range: DeviceRangeV32,
369 owner_object_id: u64,
370 },
371 Deallocate {
372 device_range: DeviceRangeV32,
373 owner_object_id: u64,
374 },
375 SetLimit {
376 owner_object_id: u64,
377 bytes: u64,
378 },
379 MarkForDeletion(u64),
385}
386
387pub type UpdateMutationsKey = UpdateMutationsKeyV49;
388
389#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
390pub struct UpdateMutationsKeyV49(pub FxfsKeyV49);
391
392#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
393#[migrate_to_version(UpdateMutationsKeyV49)]
394pub struct UpdateMutationsKeyV40(pub FxfsKeyV40);
395
396impl From<UpdateMutationsKey> for FxfsKey {
397 fn from(outer: UpdateMutationsKey) -> Self {
398 outer.0
399 }
400}
401
402impl From<FxfsKey> for UpdateMutationsKey {
403 fn from(inner: FxfsKey) -> Self {
404 Self(inner)
405 }
406}
407
408#[cfg(fuzz)]
409impl<'a> arbitrary::Arbitrary<'a> for UpdateMutationsKey {
410 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
411 Ok(UpdateMutationsKey::from(FxfsKey::arbitrary(u).unwrap()))
412 }
413}
414
415impl Ord for UpdateMutationsKey {
416 fn cmp(&self, other: &Self) -> Ordering {
417 (self as *const UpdateMutationsKey).cmp(&(other as *const _))
418 }
419}
420
421impl PartialOrd for UpdateMutationsKey {
422 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
423 Some(self.cmp(other))
424 }
425}
426
427impl Eq for UpdateMutationsKey {}
428
429impl PartialEq for UpdateMutationsKey {
430 fn eq(&self, other: &Self) -> bool {
431 std::ptr::eq(self, other)
432 }
433}
434
435#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Copy)]
445pub enum LockKey {
446 Filesystem,
448
449 Flush {
451 object_id: u64,
452 },
453
454 ObjectAttribute {
456 store_object_id: u64,
457 object_id: u64,
458 attribute_id: u64,
459 },
460
461 Object {
463 store_object_id: u64,
464 object_id: u64,
465 },
466
467 ProjectId {
468 store_object_id: u64,
469 project_id: u64,
470 },
471
472 Truncate {
474 store_object_id: u64,
475 object_id: u64,
476 },
477
478 InternalDirectory {
480 store_object_id: u64,
481 },
482}
483
484impl LockKey {
485 pub const fn object_attribute(store_object_id: u64, object_id: u64, attribute_id: u64) -> Self {
486 LockKey::ObjectAttribute { store_object_id, object_id, attribute_id }
487 }
488
489 pub const fn object(store_object_id: u64, object_id: u64) -> Self {
490 LockKey::Object { store_object_id, object_id }
491 }
492
493 pub const fn flush(object_id: u64) -> Self {
494 LockKey::Flush { object_id }
495 }
496
497 pub const fn truncate(store_object_id: u64, object_id: u64) -> Self {
498 LockKey::Truncate { store_object_id, object_id }
499 }
500}
501
502#[derive(Clone, Debug)]
504pub enum LockKeys {
505 None,
506 Inline(LockKey),
507 Vec(Vec<LockKey>),
508}
509
510impl LockKeys {
511 pub fn with_capacity(capacity: usize) -> Self {
512 if capacity > 1 { LockKeys::Vec(Vec::with_capacity(capacity)) } else { LockKeys::None }
513 }
514
515 pub fn push(&mut self, key: LockKey) {
516 match self {
517 Self::None => *self = LockKeys::Inline(key),
518 Self::Inline(inline) => {
519 *self = LockKeys::Vec(vec![*inline, key]);
520 }
521 Self::Vec(vec) => vec.push(key),
522 }
523 }
524
525 pub fn truncate(&mut self, len: usize) {
526 match self {
527 Self::None => {}
528 Self::Inline(_) => {
529 if len == 0 {
530 *self = Self::None;
531 }
532 }
533 Self::Vec(vec) => vec.truncate(len),
534 }
535 }
536
537 fn len(&self) -> usize {
538 match self {
539 Self::None => 0,
540 Self::Inline(_) => 1,
541 Self::Vec(vec) => vec.len(),
542 }
543 }
544
545 fn contains(&self, key: &LockKey) -> bool {
546 match self {
547 Self::None => false,
548 Self::Inline(single) => single == key,
549 Self::Vec(vec) => vec.contains(key),
550 }
551 }
552
553 fn sort_unstable(&mut self) {
554 match self {
555 Self::Vec(vec) => vec.sort_unstable(),
556 _ => {}
557 }
558 }
559
560 fn dedup(&mut self) {
561 match self {
562 Self::Vec(vec) => vec.dedup(),
563 _ => {}
564 }
565 }
566
567 fn iter(&self) -> LockKeysIter<'_> {
568 match self {
569 LockKeys::None => LockKeysIter::None,
570 LockKeys::Inline(key) => LockKeysIter::Inline(key),
571 LockKeys::Vec(keys) => LockKeysIter::Vec(keys.into_iter()),
572 }
573 }
574}
575
576enum LockKeysIter<'a> {
577 None,
578 Inline(&'a LockKey),
579 Vec(<&'a Vec<LockKey> as IntoIterator>::IntoIter),
580}
581
582impl<'a> Iterator for LockKeysIter<'a> {
583 type Item = &'a LockKey;
584 fn next(&mut self) -> Option<Self::Item> {
585 match self {
586 Self::None => None,
587 Self::Inline(inline) => {
588 let next = *inline;
589 *self = Self::None;
590 Some(next)
591 }
592 Self::Vec(vec) => vec.next(),
593 }
594 }
595}
596
597impl Default for LockKeys {
598 fn default() -> Self {
599 LockKeys::None
600 }
601}
602
603#[macro_export]
604macro_rules! lock_keys {
605 () => {
606 $crate::object_store::transaction::LockKeys::None
607 };
608 ($lock_key:expr $(,)?) => {
609 $crate::object_store::transaction::LockKeys::Inline($lock_key)
610 };
611 ($($lock_keys:expr),+ $(,)?) => {
612 $crate::object_store::transaction::LockKeys::Vec(vec![$($lock_keys),+])
613 };
614}
615pub use lock_keys;
616
617pub trait AssociatedObject: Send + Sync {
621 fn will_apply_mutation(&self, _mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
622 }
623}
624
625pub enum AssocObj<'a> {
626 None,
627 Borrowed(&'a dyn AssociatedObject),
628 Owned(Box<dyn AssociatedObject>),
629}
630
631impl AssocObj<'_> {
632 pub fn map<R, F: FnOnce(&dyn AssociatedObject) -> R>(&self, f: F) -> Option<R> {
633 match self {
634 AssocObj::None => None,
635 AssocObj::Borrowed(b) => Some(f(*b)),
636 AssocObj::Owned(o) => Some(f(o.as_ref())),
637 }
638 }
639}
640
641pub struct TxnMutation<'a> {
642 pub object_id: u64,
646
647 pub mutation: Mutation,
649
650 pub associated_object: AssocObj<'a>,
653}
654
655impl Ord for TxnMutation<'_> {
658 fn cmp(&self, other: &Self) -> Ordering {
659 self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation))
660 }
661}
662
663impl PartialOrd for TxnMutation<'_> {
664 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
665 Some(self.cmp(other))
666 }
667}
668
669impl PartialEq for TxnMutation<'_> {
670 fn eq(&self, other: &Self) -> bool {
671 self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation)
672 }
673}
674
675impl Eq for TxnMutation<'_> {}
676
677impl std::fmt::Debug for TxnMutation<'_> {
678 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
679 f.debug_struct("TxnMutation")
680 .field("object_id", &self.object_id)
681 .field("mutation", &self.mutation)
682 .finish()
683 }
684}
685
686pub enum MetadataReservation {
687 None,
689
690 Borrowed,
693
694 Reservation(Reservation),
696
697 Hold(u64),
699}
700
701pub struct Transaction<'a> {
703 txn_guard: TxnGuard<'a>,
704
705 mutations: BTreeSet<TxnMutation<'a>>,
707
708 txn_locks: LockKeys,
710
711 pub allocator_reservation: Option<&'a Reservation>,
713
714 pub metadata_reservation: MetadataReservation,
716
717 new_objects: BTreeSet<(u64, u64)>,
720
721 checksums: Vec<(Range<u64>, Vec<Checksum>, bool)>,
723
724 includes_write: bool,
726}
727
728impl<'a> Transaction<'a> {
729 pub async fn new(
732 txn_guard: TxnGuard<'a>,
733 options: Options<'a>,
734 txn_locks: LockKeys,
735 ) -> Result<Transaction<'a>, Error> {
736 txn_guard.fs().add_transaction(options.skip_journal_checks).await;
737 let fs = txn_guard.fs().clone();
738 let guard = scopeguard::guard((), |_| fs.sub_transaction());
739 let (metadata_reservation, allocator_reservation, hold) =
740 txn_guard.fs().reservation_for_transaction(options).await?;
741
742 let txn_locks = {
743 let lock_manager = txn_guard.fs().lock_manager();
744 let mut write_guard = lock_manager.txn_lock(txn_locks).await;
745 std::mem::take(&mut write_guard.0.lock_keys)
746 };
747 let mut transaction = Transaction {
748 txn_guard,
749 mutations: BTreeSet::new(),
750 txn_locks,
751 allocator_reservation: None,
752 metadata_reservation,
753 new_objects: BTreeSet::new(),
754 checksums: Vec::new(),
755 includes_write: false,
756 };
757
758 ScopeGuard::into_inner(guard);
759 hold.map(|h| h.forget()); transaction.allocator_reservation = allocator_reservation;
761 Ok(transaction)
762 }
763
764 pub fn txn_guard(&self) -> &TxnGuard<'_> {
765 &self.txn_guard
766 }
767
768 pub fn mutations(&self) -> &BTreeSet<TxnMutation<'a>> {
769 &self.mutations
770 }
771
772 pub fn take_mutations(&mut self) -> BTreeSet<TxnMutation<'a>> {
773 self.new_objects.clear();
774 mem::take(&mut self.mutations)
775 }
776
777 pub fn add(&mut self, object_id: u64, mutation: Mutation) -> Option<Mutation> {
780 self.add_with_object(object_id, mutation, AssocObj::None)
781 }
782
783 pub fn remove(&mut self, object_id: u64, mutation: Mutation) {
785 let txn_mutation = TxnMutation { object_id, mutation, associated_object: AssocObj::None };
786 if self.mutations.remove(&txn_mutation) {
787 if let Mutation::ObjectStore(ObjectStoreMutation {
788 item:
789 ObjectItem {
790 key: ObjectKey { object_id: new_object_id, data: ObjectKeyData::Object },
791 ..
792 },
793 op: Operation::Insert,
794 }) = txn_mutation.mutation
795 {
796 self.new_objects.remove(&(object_id, new_object_id));
797 }
798 }
799 }
800
801 pub fn add_with_object(
804 &mut self,
805 object_id: u64,
806 mutation: Mutation,
807 associated_object: AssocObj<'a>,
808 ) -> Option<Mutation> {
809 assert!(object_id != INVALID_OBJECT_ID);
810 if let Mutation::ObjectStore(ObjectStoreMutation {
811 item:
812 Item {
813 key:
814 ObjectKey { data: ObjectKeyData::Attribute(_, AttributeKey::Extent(_)), .. },
815 ..
816 },
817 ..
818 }) = &mutation
819 {
820 self.includes_write = true;
821 }
822 let txn_mutation = TxnMutation { object_id, mutation, associated_object };
823 self.verify_locks(&txn_mutation);
824 self.mutations.replace(txn_mutation).map(|m| m.mutation)
825 }
826
827 pub fn add_checksum(&mut self, range: Range<u64>, checksums: Vec<Checksum>, first_write: bool) {
828 self.checksums.push((range, checksums, first_write));
829 }
830
831 pub fn includes_write(&self) -> bool {
832 self.includes_write
833 }
834
835 pub fn checksums(&self) -> &[(Range<u64>, Vec<Checksum>, bool)] {
836 &self.checksums
837 }
838
839 pub fn take_checksums(&mut self) -> Vec<(Range<u64>, Vec<Checksum>, bool)> {
840 std::mem::replace(&mut self.checksums, Vec::new())
841 }
842
843 fn verify_locks(&mut self, mutation: &TxnMutation<'_>) {
844 match mutation {
848 TxnMutation {
849 mutation:
850 Mutation::ObjectStore {
851 0: ObjectStoreMutation { item: ObjectItem { key, .. }, op },
852 },
853 object_id: store_object_id,
854 ..
855 } => {
856 match &key.data {
857 ObjectKeyData::Attribute(..) => {
858 }
860 ObjectKeyData::Child { .. }
861 | ObjectKeyData::EncryptedChild(_)
862 | ObjectKeyData::EncryptedCasefoldChild(_)
863 | ObjectKeyData::CasefoldChild { .. } => {
864 let id = key.object_id;
865 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
866 && !self.new_objects.contains(&(*store_object_id, id))
867 {
868 debug_assert!(
869 false,
870 "Not holding required lock for object {id} \
871 in store {store_object_id}"
872 );
873 error!(
874 "Not holding required lock for object {id} in store \
875 {store_object_id}"
876 )
877 }
878 }
879 ObjectKeyData::GraveyardEntry { .. } => {
880 }
882 ObjectKeyData::GraveyardAttributeEntry { .. } => {
883 }
885 ObjectKeyData::Keys => {
886 let id = key.object_id;
887 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
888 && !self.new_objects.contains(&(*store_object_id, id))
889 {
890 debug_assert!(
891 false,
892 "Not holding required lock for object {id} \
893 in store {store_object_id}"
894 );
895 error!(
896 "Not holding required lock for object {id} in store \
897 {store_object_id}"
898 )
899 }
900 }
901 ObjectKeyData::Object => match op {
902 Operation::Insert => {
904 self.new_objects.insert((*store_object_id, key.object_id));
905 }
906 Operation::Merge | Operation::ReplaceOrInsert => {
907 let id = key.object_id;
908 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
909 && !self.new_objects.contains(&(*store_object_id, id))
910 {
911 debug_assert!(
912 false,
913 "Not holding required lock for object {id} \
914 in store {store_object_id}"
915 );
916 error!(
917 "Not holding required lock for object {id} in store \
918 {store_object_id}"
919 )
920 }
921 }
922 },
923 ObjectKeyData::Project { project_id, property: ProjectProperty::Limit } => {
924 if !self.txn_locks.contains(&LockKey::ProjectId {
925 store_object_id: *store_object_id,
926 project_id: *project_id,
927 }) {
928 debug_assert!(
929 false,
930 "Not holding required lock for project limit id {project_id} \
931 in store {store_object_id}"
932 );
933 error!(
934 "Not holding required lock for project limit id {project_id} in \
935 store {store_object_id}"
936 )
937 }
938 }
939 ObjectKeyData::Project { property: ProjectProperty::Usage, .. } => match op {
940 Operation::Insert | Operation::ReplaceOrInsert => {
941 panic!(
942 "Project usage is all handled by merging deltas, no inserts or \
943 replacements should be used"
944 );
945 }
946 Operation::Merge => {}
948 },
949 ObjectKeyData::ExtendedAttribute { .. } => {
950 let id = key.object_id;
951 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
952 && !self.new_objects.contains(&(*store_object_id, id))
953 {
954 debug_assert!(
955 false,
956 "Not holding required lock for object {id} \
957 in store {store_object_id} while mutating extended attribute"
958 );
959 error!(
960 "Not holding required lock for object {id} in store \
961 {store_object_id} while mutating extended attribute"
962 )
963 }
964 }
965 }
966 }
967 TxnMutation { mutation: Mutation::DeleteVolume, object_id, .. } => {
968 if !self.txn_locks.contains(&LockKey::flush(*object_id)) {
969 debug_assert!(false, "Not holding required lock for DeleteVolume");
970 error!("Not holding required lock for DeleteVolume");
971 }
972 }
973 _ => {}
974 }
975 }
976
977 pub fn is_empty(&self) -> bool {
979 self.mutations.is_empty()
980 }
981
982 pub fn get_object_mutation(
985 &self,
986 store_object_id: u64,
987 key: ObjectKey,
988 ) -> Option<&ObjectStoreMutation> {
989 if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) =
990 self.mutations.get(&TxnMutation {
991 object_id: store_object_id,
992 mutation: Mutation::insert_object(key, ObjectValue::None),
993 associated_object: AssocObj::None,
994 })
995 {
996 Some(mutation)
997 } else {
998 None
999 }
1000 }
1001
1002 pub async fn commit(mut self) -> Result<u64, Error> {
1004 debug!(txn:? = &self; "Commit");
1005 self.txn_guard.fs().clone().commit_transaction(&mut self, &mut |_| {}).await
1006 }
1007
1008 pub async fn commit_with_callback<R: Send>(
1011 mut self,
1012 f: impl FnOnce(u64) -> R + Send,
1013 ) -> Result<R, Error> {
1014 debug!(txn:? = &self; "Commit");
1015 let mut f = Some(f);
1018 let mut result = None;
1019 self.txn_guard
1020 .fs()
1021 .clone()
1022 .commit_transaction(&mut self, &mut |offset| {
1023 result = Some(f.take().unwrap()(offset));
1024 })
1025 .await?;
1026 Ok(result.unwrap())
1027 }
1028
1029 pub async fn commit_and_continue(&mut self) -> Result<(), Error> {
1032 debug!(txn:? = self; "Commit");
1033 self.txn_guard.fs().clone().commit_transaction(self, &mut |_| {}).await?;
1034 assert!(self.mutations.is_empty());
1035 self.txn_guard.fs().lock_manager().downgrade_locks(&self.txn_locks);
1036 Ok(())
1037 }
1038}
1039
1040impl Drop for Transaction<'_> {
1041 fn drop(&mut self) {
1042 debug!(txn:? = &self; "Drop");
1045 self.txn_guard.fs().clone().drop_transaction(self);
1046 }
1047}
1048
1049impl std::fmt::Debug for Transaction<'_> {
1050 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1051 f.debug_struct("Transaction")
1052 .field("mutations", &self.mutations)
1053 .field("txn_locks", &self.txn_locks)
1054 .field("reservation", &self.allocator_reservation)
1055 .finish()
1056 }
1057}
1058
1059pub enum BorrowedOrOwned<'a, T> {
1060 Borrowed(&'a T),
1061 Owned(T),
1062}
1063
1064impl<T> Deref for BorrowedOrOwned<'_, T> {
1065 type Target = T;
1066
1067 fn deref(&self) -> &Self::Target {
1068 match self {
1069 BorrowedOrOwned::Borrowed(b) => b,
1070 BorrowedOrOwned::Owned(o) => &o,
1071 }
1072 }
1073}
1074
1075impl<'a, T> From<&'a T> for BorrowedOrOwned<'a, T> {
1076 fn from(value: &'a T) -> Self {
1077 BorrowedOrOwned::Borrowed(value)
1078 }
1079}
1080
1081impl<T> From<T> for BorrowedOrOwned<'_, T> {
1082 fn from(value: T) -> Self {
1083 BorrowedOrOwned::Owned(value)
1084 }
1085}
1086
1087pub struct LockManager {
1116 locks: Mutex<Locks>,
1117}
1118
1119struct Locks {
1120 keys: HashMap<LockKey, LockEntry>,
1121}
1122
1123impl Locks {
1124 fn drop_lock(&mut self, key: LockKey, state: LockState) {
1125 if let Entry::Occupied(mut occupied) = self.keys.entry(key) {
1126 let entry = occupied.get_mut();
1127 let wake = match state {
1128 LockState::ReadLock => {
1129 entry.read_count -= 1;
1130 entry.read_count == 0
1131 }
1132 LockState::Locked | LockState::WriteLock => {
1134 entry.state = LockState::ReadLock;
1135 true
1136 }
1137 };
1138 if wake {
1139 unsafe {
1141 entry.wake();
1142 }
1143 if entry.can_remove() {
1144 occupied.remove_entry();
1145 }
1146 }
1147 } else {
1148 unreachable!();
1149 }
1150 }
1151
1152 fn drop_read_locks(&mut self, lock_keys: LockKeys) {
1153 for lock in lock_keys.iter() {
1154 self.drop_lock(*lock, LockState::ReadLock);
1155 }
1156 }
1157
1158 fn drop_write_locks(&mut self, lock_keys: LockKeys) {
1159 for lock in lock_keys.iter() {
1160 self.drop_lock(*lock, LockState::WriteLock);
1163 }
1164 }
1165
1166 fn downgrade_locks(&mut self, lock_keys: &LockKeys) {
1168 for lock in lock_keys.iter() {
1169 unsafe {
1171 self.keys.get_mut(lock).unwrap().downgrade_lock();
1172 }
1173 }
1174 }
1175}
1176
1177#[derive(Debug)]
1178struct LockEntry {
1179 read_count: u64,
1182
1183 state: LockState,
1185
1186 head: *const LockWaker,
1191 tail: *const LockWaker,
1192}
1193
1194unsafe impl Send for LockEntry {}
1195
1196struct LockWaker {
1199 next: UnsafeCell<*const LockWaker>,
1201 prev: UnsafeCell<*const LockWaker>,
1202
1203 key: LockKey,
1206
1207 waker: UnsafeCell<WakerState>,
1209
1210 target_state: LockState,
1212
1213 is_upgrade: bool,
1215
1216 _pin: PhantomPinned,
1218}
1219
1220enum WakerState {
1221 Pending,
1223
1224 Registered(Waker),
1226
1227 Woken,
1229}
1230
1231impl WakerState {
1232 fn is_woken(&self) -> bool {
1233 matches!(self, WakerState::Woken)
1234 }
1235}
1236
1237unsafe impl Send for LockWaker {}
1238unsafe impl Sync for LockWaker {}
1239
1240impl LockWaker {
1241 async fn wait(&self, manager: &LockManager) {
1243 let waker_guard = scopeguard::guard((), |_| {
1245 let mut locks = manager.locks.lock();
1246 unsafe {
1248 if (*self.waker.get()).is_woken() {
1249 if self.is_upgrade {
1251 locks.keys.get_mut(&self.key).unwrap().downgrade_lock();
1252 } else {
1253 locks.drop_lock(self.key, self.target_state);
1254 }
1255 } else {
1256 locks.keys.get_mut(&self.key).unwrap().remove_waker(self);
1259 }
1260 }
1261 });
1262
1263 poll_fn(|cx| {
1264 let _locks = manager.locks.lock();
1265 unsafe {
1267 if (*self.waker.get()).is_woken() {
1268 Poll::Ready(())
1269 } else {
1270 *self.waker.get() = WakerState::Registered(cx.waker().clone());
1271 Poll::Pending
1272 }
1273 }
1274 })
1275 .await;
1276
1277 ScopeGuard::into_inner(waker_guard);
1278 }
1279}
1280
1281#[derive(Copy, Clone, Debug, PartialEq)]
1282enum LockState {
1283 ReadLock,
1285
1286 Locked,
1289
1290 WriteLock,
1292}
1293
1294impl LockManager {
1295 pub fn new() -> Self {
1296 LockManager { locks: Mutex::new(Locks { keys: HashMap::default() }) }
1297 }
1298
1299 pub async fn txn_lock<'a>(&'a self, lock_keys: LockKeys) -> TransactionLocks<'a> {
1303 TransactionLocks(
1304 debug_assert_not_too_long!(self.lock(lock_keys, LockState::Locked)).right().unwrap(),
1305 )
1306 }
1307
1308 async fn lock<'a>(
1311 &'a self,
1312 mut lock_keys: LockKeys,
1313 target_state: LockState,
1314 ) -> Either<ReadGuard<'a>, WriteGuard<'a>> {
1315 let mut guard = match &target_state {
1316 LockState::ReadLock => Left(ReadGuard {
1317 manager: self.into(),
1318 lock_keys: LockKeys::with_capacity(lock_keys.len()),
1319 }),
1320 LockState::Locked | LockState::WriteLock => Right(WriteGuard {
1321 manager: self.into(),
1322 lock_keys: LockKeys::with_capacity(lock_keys.len()),
1323 }),
1324 };
1325 let guard_keys = match &mut guard {
1326 Left(g) => &mut g.lock_keys,
1327 Right(g) => &mut g.lock_keys,
1328 };
1329 lock_keys.sort_unstable();
1330 lock_keys.dedup();
1331 for lock in lock_keys.iter() {
1332 let lock_waker = None;
1333 pin_mut!(lock_waker);
1334 {
1335 let mut locks = self.locks.lock();
1336 match locks.keys.entry(*lock) {
1337 Entry::Vacant(vacant) => {
1338 vacant.insert(LockEntry {
1339 read_count: if let LockState::ReadLock = target_state {
1340 guard_keys.push(*lock);
1341 1
1342 } else {
1343 guard_keys.push(*lock);
1344 0
1345 },
1346 state: target_state,
1347 head: std::ptr::null(),
1348 tail: std::ptr::null(),
1349 });
1350 }
1351 Entry::Occupied(mut occupied) => {
1352 let entry = occupied.get_mut();
1353 if unsafe { entry.is_allowed(target_state, entry.head.is_null()) } {
1355 if let LockState::ReadLock = target_state {
1356 entry.read_count += 1;
1357 guard_keys.push(*lock);
1358 } else {
1359 entry.state = target_state;
1360 guard_keys.push(*lock);
1361 }
1362 } else {
1363 unsafe {
1366 *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1367 next: UnsafeCell::new(std::ptr::null()),
1368 prev: UnsafeCell::new(entry.tail),
1369 key: *lock,
1370 waker: UnsafeCell::new(WakerState::Pending),
1371 target_state: target_state,
1372 is_upgrade: false,
1373 _pin: PhantomPinned,
1374 });
1375 }
1376 let waker = (*lock_waker).as_ref().unwrap();
1377 if entry.tail.is_null() {
1378 entry.head = waker;
1379 } else {
1380 unsafe {
1382 *(*entry.tail).next.get() = waker;
1383 }
1384 }
1385 entry.tail = waker;
1386 }
1387 }
1388 }
1389 }
1390 if let Some(waker) = &*lock_waker {
1391 waker.wait(self).await;
1392 guard_keys.push(*lock);
1393 }
1394 }
1395 guard
1396 }
1397
1398 pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
1400 let mut locks = self.locks.lock();
1401 locks.drop_write_locks(std::mem::take(&mut transaction.txn_locks));
1402 }
1403
1404 pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
1406 self.commit_prepare_keys(&transaction.txn_locks).await;
1407 }
1408
1409 async fn commit_prepare_keys(&self, lock_keys: &LockKeys) {
1410 for lock in lock_keys.iter() {
1411 let lock_waker = None;
1412 pin_mut!(lock_waker);
1413 {
1414 let mut locks = self.locks.lock();
1415 let entry = locks.keys.get_mut(lock).unwrap();
1416 assert_eq!(entry.state, LockState::Locked);
1417
1418 if entry.read_count == 0 {
1419 entry.state = LockState::WriteLock;
1420 } else {
1421 unsafe {
1424 *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1425 next: UnsafeCell::new(entry.head),
1426 prev: UnsafeCell::new(std::ptr::null()),
1427 key: *lock,
1428 waker: UnsafeCell::new(WakerState::Pending),
1429 target_state: LockState::WriteLock,
1430 is_upgrade: true,
1431 _pin: PhantomPinned,
1432 });
1433 }
1434 let waker = (*lock_waker).as_ref().unwrap();
1435 if entry.head.is_null() {
1436 entry.tail = (*lock_waker).as_ref().unwrap();
1437 } else {
1438 unsafe {
1440 *(*entry.head).prev.get() = waker;
1441 }
1442 }
1443 entry.head = waker;
1444 }
1445 }
1446
1447 if let Some(waker) = &*lock_waker {
1448 waker.wait(self).await;
1449 }
1450 }
1451 }
1452
1453 pub async fn read_lock<'a>(&'a self, lock_keys: LockKeys) -> ReadGuard<'a> {
1459 debug_assert_not_too_long!(self.lock(lock_keys, LockState::ReadLock)).left().unwrap()
1460 }
1461
1462 pub async fn write_lock<'a>(&'a self, lock_keys: LockKeys) -> WriteGuard<'a> {
1465 debug_assert_not_too_long!(self.lock(lock_keys, LockState::WriteLock)).right().unwrap()
1466 }
1467
1468 pub fn downgrade_locks(&self, lock_keys: &LockKeys) {
1471 self.locks.lock().downgrade_locks(lock_keys);
1472 }
1473}
1474
1475impl LockEntry {
1477 unsafe fn wake(&mut self) {
1478 if self.head.is_null() || self.state == LockState::WriteLock {
1480 return;
1481 }
1482
1483 let waker = unsafe { &*self.head };
1484
1485 if waker.is_upgrade {
1486 if self.read_count > 0 {
1487 return;
1488 }
1489 } else if !unsafe { self.is_allowed(waker.target_state, true) } {
1490 return;
1491 }
1492
1493 unsafe { self.pop_and_wake() };
1494
1495 if waker.target_state == LockState::WriteLock {
1498 return;
1499 }
1500
1501 while !self.head.is_null() && unsafe { (*self.head).target_state } == LockState::ReadLock {
1502 unsafe { self.pop_and_wake() };
1503 }
1504 }
1505
1506 unsafe fn pop_and_wake(&mut self) {
1507 let waker = unsafe { &*self.head };
1508
1509 self.head = unsafe { *waker.next.get() };
1511 if self.head.is_null() {
1512 self.tail = std::ptr::null()
1513 } else {
1514 unsafe { *(*self.head).prev.get() = std::ptr::null() };
1515 }
1516
1517 if waker.target_state == LockState::ReadLock {
1519 self.read_count += 1;
1520 } else {
1521 self.state = waker.target_state;
1522 }
1523
1524 if let WakerState::Registered(waker) =
1526 std::mem::replace(unsafe { &mut *waker.waker.get() }, WakerState::Woken)
1527 {
1528 waker.wake();
1529 }
1530 }
1531
1532 fn can_remove(&self) -> bool {
1533 self.state == LockState::ReadLock && self.read_count == 0
1534 }
1535
1536 unsafe fn remove_waker(&mut self, waker: &LockWaker) {
1537 unsafe {
1538 let is_first = (*waker.prev.get()).is_null();
1539 if is_first {
1540 self.head = *waker.next.get();
1541 } else {
1542 *(**waker.prev.get()).next.get() = *waker.next.get();
1543 }
1544 if (*waker.next.get()).is_null() {
1545 self.tail = *waker.prev.get();
1546 } else {
1547 *(**waker.next.get()).prev.get() = *waker.prev.get();
1548 }
1549 if is_first {
1550 self.wake();
1553 }
1554 }
1555 }
1556
1557 unsafe fn is_allowed(&self, target_state: LockState, is_head: bool) -> bool {
1561 match self.state {
1562 LockState::ReadLock => {
1563 (self.read_count == 0
1565 || target_state == LockState::Locked
1566 || target_state == LockState::ReadLock)
1567 && is_head
1568 }
1569 LockState::Locked => {
1570 target_state == LockState::ReadLock
1574 && (is_head || unsafe { !(*self.head).is_upgrade })
1575 }
1576 LockState::WriteLock => false,
1577 }
1578 }
1579
1580 unsafe fn downgrade_lock(&mut self) {
1581 assert_eq!(std::mem::replace(&mut self.state, LockState::Locked), LockState::WriteLock);
1582 unsafe { self.wake() };
1583 }
1584}
1585
1586#[must_use]
1587pub struct ReadGuard<'a> {
1588 manager: LockManagerRef<'a>,
1589 lock_keys: LockKeys,
1590}
1591
1592impl ReadGuard<'_> {
1593 pub fn fs(&self) -> Option<&Arc<FxFilesystem>> {
1594 if let LockManagerRef::Owned(fs) = &self.manager { Some(fs) } else { None }
1595 }
1596
1597 pub fn into_owned(mut self, fs: Arc<FxFilesystem>) -> ReadGuard<'static> {
1598 ReadGuard {
1599 manager: LockManagerRef::Owned(fs),
1600 lock_keys: std::mem::replace(&mut self.lock_keys, LockKeys::None),
1601 }
1602 }
1603}
1604
1605impl Drop for ReadGuard<'_> {
1606 fn drop(&mut self) {
1607 let mut locks = self.manager.locks.lock();
1608 locks.drop_read_locks(std::mem::take(&mut self.lock_keys));
1609 }
1610}
1611
1612impl fmt::Debug for ReadGuard<'_> {
1613 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1614 f.debug_struct("ReadGuard")
1615 .field("manager", &(&self.manager as *const _))
1616 .field("lock_keys", &self.lock_keys)
1617 .finish()
1618 }
1619}
1620
1621#[must_use]
1622pub struct WriteGuard<'a> {
1623 manager: LockManagerRef<'a>,
1624 lock_keys: LockKeys,
1625}
1626
1627impl Drop for WriteGuard<'_> {
1628 fn drop(&mut self) {
1629 let mut locks = self.manager.locks.lock();
1630 locks.drop_write_locks(std::mem::take(&mut self.lock_keys));
1631 }
1632}
1633
1634impl fmt::Debug for WriteGuard<'_> {
1635 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1636 f.debug_struct("WriteGuard")
1637 .field("manager", &(&self.manager as *const _))
1638 .field("lock_keys", &self.lock_keys)
1639 .finish()
1640 }
1641}
1642
1643enum LockManagerRef<'a> {
1644 Borrowed(&'a LockManager),
1645 Owned(Arc<FxFilesystem>),
1646}
1647
1648impl Deref for LockManagerRef<'_> {
1649 type Target = LockManager;
1650
1651 fn deref(&self) -> &Self::Target {
1652 match self {
1653 LockManagerRef::Borrowed(m) => m,
1654 LockManagerRef::Owned(f) => f.lock_manager(),
1655 }
1656 }
1657}
1658
1659impl<'a> From<&'a LockManager> for LockManagerRef<'a> {
1660 fn from(value: &'a LockManager) -> Self {
1661 LockManagerRef::Borrowed(value)
1662 }
1663}
1664
1665#[cfg(test)]
1666mod tests {
1667 use super::{LockKey, LockKeys, LockManager, LockState, Mutation, Options};
1668 use crate::filesystem::FxFilesystem;
1669 use fuchsia_async as fasync;
1670 use fuchsia_sync::Mutex;
1671 use futures::channel::oneshot::channel;
1672 use futures::future::FutureExt;
1673 use futures::stream::FuturesUnordered;
1674 use futures::{StreamExt, join, pin_mut};
1675 use std::task::Poll;
1676 use std::time::Duration;
1677 use storage_device::DeviceHolder;
1678 use storage_device::fake_device::FakeDevice;
1679
1680 #[fuchsia::test]
1681 async fn test_simple() {
1682 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1683 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1684 let mut t = fs
1685 .clone()
1686 .new_transaction(lock_keys![], Options::default())
1687 .await
1688 .expect("new_transaction failed");
1689 t.add(1, Mutation::BeginFlush);
1690 assert!(!t.is_empty());
1691 }
1692
1693 #[fuchsia::test]
1694 async fn test_locks() {
1695 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1696 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1697 let (send1, recv1) = channel();
1698 let (send2, recv2) = channel();
1699 let (send3, recv3) = channel();
1700 let done = Mutex::new(false);
1701 let mut futures = FuturesUnordered::new();
1702 futures.push(
1703 async {
1704 let _t = fs
1705 .clone()
1706 .new_transaction(
1707 lock_keys![LockKey::object_attribute(1, 2, 3)],
1708 Options::default(),
1709 )
1710 .await
1711 .expect("new_transaction failed");
1712 send1.send(()).unwrap(); send3.send(()).unwrap(); recv2.await.unwrap();
1715 fasync::Timer::new(Duration::from_millis(100)).await;
1717 assert!(!*done.lock());
1718 }
1719 .boxed(),
1720 );
1721 futures.push(
1722 async {
1723 recv1.await.unwrap();
1724 let _t = fs
1726 .clone()
1727 .new_transaction(
1728 lock_keys![LockKey::object_attribute(2, 2, 3)],
1729 Options::default(),
1730 )
1731 .await
1732 .expect("new_transaction failed");
1733 send2.send(()).unwrap();
1735 }
1736 .boxed(),
1737 );
1738 futures.push(
1739 async {
1740 recv3.await.unwrap();
1742 let _t = fs
1743 .clone()
1744 .new_transaction(
1745 lock_keys![LockKey::object_attribute(1, 2, 3)],
1746 Options::default(),
1747 )
1748 .await;
1749 *done.lock() = true;
1750 }
1751 .boxed(),
1752 );
1753 while let Some(()) = futures.next().await {}
1754 }
1755
1756 #[fuchsia::test]
1757 async fn test_read_lock_after_write_lock() {
1758 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1759 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1760 let (send1, recv1) = channel();
1761 let (send2, recv2) = channel();
1762 let done = Mutex::new(false);
1763 join!(
1764 async {
1765 let t = fs
1766 .clone()
1767 .new_transaction(
1768 lock_keys![LockKey::object_attribute(1, 2, 3)],
1769 Options::default(),
1770 )
1771 .await
1772 .expect("new_transaction failed");
1773 send1.send(()).unwrap(); recv2.await.unwrap();
1775 t.commit().await.expect("commit failed");
1776 *done.lock() = true;
1777 },
1778 async {
1779 recv1.await.unwrap();
1780 let _guard = fs
1782 .lock_manager()
1783 .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
1784 .await;
1785 send2.send(()).unwrap();
1787 fasync::Timer::new(Duration::from_millis(100)).await;
1790 assert!(!*done.lock());
1791 },
1792 );
1793 }
1794
1795 #[fuchsia::test]
1796 async fn test_write_lock_after_read_lock() {
1797 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1798 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1799 let (send1, recv1) = channel();
1800 let (send2, recv2) = channel();
1801 let done = Mutex::new(false);
1802 join!(
1803 async {
1804 let _guard = fs
1806 .lock_manager()
1807 .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
1808 .await;
1809 send1.send(()).unwrap();
1811 recv2.await.unwrap();
1812 fasync::Timer::new(Duration::from_millis(100)).await;
1815 assert!(!*done.lock());
1816 },
1817 async {
1818 recv1.await.unwrap();
1819 let t = fs
1820 .clone()
1821 .new_transaction(
1822 lock_keys![LockKey::object_attribute(1, 2, 3)],
1823 Options::default(),
1824 )
1825 .await
1826 .expect("new_transaction failed");
1827 send2.send(()).unwrap(); t.commit().await.expect("commit failed");
1829 *done.lock() = true;
1830 },
1831 );
1832 }
1833
1834 #[fuchsia::test]
1835 async fn test_drop_uncommitted_transaction() {
1836 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1837 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1838 let key = lock_keys![LockKey::object(1, 1)];
1839
1840 {
1842 let _write_lock = fs
1843 .clone()
1844 .new_transaction(key.clone(), Options::default())
1845 .await
1846 .expect("new_transaction failed");
1847 let _read_lock = fs.lock_manager().read_lock(key.clone()).await;
1848 }
1849 {
1851 let _write_lock = fs
1852 .clone()
1853 .new_transaction(key.clone(), Options::default())
1854 .await
1855 .expect("new_transaction failed");
1856 }
1857 fs.clone()
1859 .new_transaction(key.clone(), Options::default())
1860 .await
1861 .expect("new_transaction failed");
1862 }
1863
1864 #[fuchsia::test]
1865 async fn test_drop_waiting_write_lock() {
1866 let manager = LockManager::new();
1867 let keys = lock_keys![LockKey::object(1, 1)];
1868 {
1869 let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1870 if let Poll::Ready(_) =
1871 futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1872 {
1873 assert!(false);
1874 }
1875 }
1876 let _ = manager.lock(keys, LockState::WriteLock).await;
1877 }
1878
1879 #[fuchsia::test]
1880 async fn test_write_lock_blocks_everything() {
1881 let manager = LockManager::new();
1882 let keys = lock_keys![LockKey::object(1, 1)];
1883 {
1884 let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1885 if let Poll::Ready(_) =
1886 futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1887 {
1888 assert!(false);
1889 }
1890 if let Poll::Ready(_) =
1891 futures::poll!(manager.lock(keys.clone(), LockState::ReadLock).boxed())
1892 {
1893 assert!(false);
1894 }
1895 }
1896 {
1897 let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1898 }
1899 {
1900 let _guard = manager.lock(keys, LockState::ReadLock).await;
1901 }
1902 }
1903
1904 #[fuchsia::test]
1905 async fn test_downgrade_locks() {
1906 let manager = LockManager::new();
1907 let keys = lock_keys![LockKey::object(1, 1)];
1908 let _guard = manager.txn_lock(keys.clone()).await;
1909 manager.commit_prepare_keys(&keys).await;
1910
1911 let mut read_lock: FuturesUnordered<_> =
1913 std::iter::once(manager.read_lock(keys.clone())).collect();
1914
1915 assert!(futures::poll!(read_lock.next()).is_pending());
1917
1918 manager.downgrade_locks(&keys);
1919
1920 assert!(futures::poll!(read_lock.next()).is_ready());
1922 }
1923
1924 #[fuchsia::test]
1925 async fn test_dropped_write_lock_wakes() {
1926 let manager = LockManager::new();
1927 let keys = lock_keys![LockKey::object(1, 1)];
1928 let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1929 let mut read_lock = FuturesUnordered::new();
1930 read_lock.push(manager.lock(keys.clone(), LockState::ReadLock));
1931
1932 {
1933 let write_lock = manager.lock(keys, LockState::WriteLock);
1934 pin_mut!(write_lock);
1935
1936 assert!(futures::poll!(write_lock).is_pending());
1938
1939 assert!(futures::poll!(read_lock.next()).is_pending());
1941 }
1942
1943 assert!(futures::poll!(read_lock.next()).is_ready());
1945 }
1946
1947 #[fuchsia::test]
1948 async fn test_drop_upgrade() {
1949 let manager = LockManager::new();
1950 let keys = lock_keys![LockKey::object(1, 1)];
1951 let _guard = manager.lock(keys.clone(), LockState::Locked).await;
1952
1953 {
1954 let commit_prepare = manager.commit_prepare_keys(&keys);
1955 pin_mut!(commit_prepare);
1956 let _read_guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1957 assert!(futures::poll!(commit_prepare).is_pending());
1958
1959 }
1962
1963 manager.commit_prepare_keys(&keys).await;
1965 }
1966
1967 #[fasync::run_singlethreaded(test)]
1968 async fn test_woken_upgrade_blocks_reads() {
1969 let manager = LockManager::new();
1970 let keys = lock_keys![LockKey::object(1, 1)];
1971 let guard = manager.lock(keys.clone(), LockState::Locked).await;
1973
1974 let read1 = manager.lock(keys.clone(), LockState::ReadLock).await;
1976
1977 let commit_prepare = manager.commit_prepare_keys(&keys);
1979 pin_mut!(commit_prepare);
1980 assert!(futures::poll!(commit_prepare.as_mut()).is_pending());
1981
1982 let read2 = manager.lock(keys.clone(), LockState::ReadLock);
1984 pin_mut!(read2);
1985 assert!(futures::poll!(read2.as_mut()).is_pending());
1986
1987 std::mem::drop(read1);
1989 assert!(futures::poll!(commit_prepare).is_ready());
1990
1991 assert!(futures::poll!(read2.as_mut()).is_pending());
1993
1994 std::mem::drop(guard);
1996 assert!(futures::poll!(read2).is_ready());
1997 }
1998
1999 static LOCK_KEY_1: LockKey = LockKey::flush(1);
2000 static LOCK_KEY_2: LockKey = LockKey::flush(2);
2001 static LOCK_KEY_3: LockKey = LockKey::flush(3);
2002
2003 fn assert_lock_keys_equal(value: &LockKeys, expected: &LockKeys) {
2005 match (value, expected) {
2006 (LockKeys::None, LockKeys::None) => {}
2007 (LockKeys::Inline(key1), LockKeys::Inline(key2)) => {
2008 if key1 != key2 {
2009 panic!("{key1:?} != {key2:?}");
2010 }
2011 }
2012 (LockKeys::Vec(vec1), LockKeys::Vec(vec2)) => {
2013 if vec1 != vec2 {
2014 panic!("{vec1:?} != {vec2:?}");
2015 }
2016 if vec1.capacity() != vec2.capacity() {
2017 panic!(
2018 "LockKeys have different capacity: {} != {}",
2019 vec1.capacity(),
2020 vec2.capacity()
2021 );
2022 }
2023 }
2024 (_, _) => panic!("{value:?} != {expected:?}"),
2025 }
2026 }
2027
2028 fn assert_lock_keys_equivalent(value: &LockKeys, expected: &LockKeys) {
2030 let value: Vec<_> = value.iter().collect();
2031 let expected: Vec<_> = expected.iter().collect();
2032 assert_eq!(value, expected);
2033 }
2034
2035 #[test]
2036 fn test_lock_keys_macro() {
2037 assert_lock_keys_equal(&lock_keys![], &LockKeys::None);
2038 assert_lock_keys_equal(&lock_keys![LOCK_KEY_1], &LockKeys::Inline(LOCK_KEY_1));
2039 assert_lock_keys_equal(
2040 &lock_keys![LOCK_KEY_1, LOCK_KEY_2],
2041 &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]),
2042 );
2043 }
2044
2045 #[test]
2046 fn test_lock_keys_with_capacity() {
2047 assert_lock_keys_equal(&LockKeys::with_capacity(0), &LockKeys::None);
2048 assert_lock_keys_equal(&LockKeys::with_capacity(1), &LockKeys::None);
2049 assert_lock_keys_equal(&LockKeys::with_capacity(2), &LockKeys::Vec(Vec::with_capacity(2)));
2050 }
2051
2052 #[test]
2053 fn test_lock_keys_len() {
2054 assert_eq!(lock_keys![].len(), 0);
2055 assert_eq!(lock_keys![LOCK_KEY_1].len(), 1);
2056 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].len(), 2);
2057 }
2058
2059 #[test]
2060 fn test_lock_keys_contains() {
2061 assert_eq!(lock_keys![].contains(&LOCK_KEY_1), false);
2062 assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_1), true);
2063 assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_2), false);
2064 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_1), true);
2065 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_2), true);
2066 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_3), false);
2067 }
2068
2069 #[test]
2070 fn test_lock_keys_push() {
2071 let mut keys = lock_keys![];
2072 keys.push(LOCK_KEY_1);
2073 assert_lock_keys_equal(&keys, &LockKeys::Inline(LOCK_KEY_1));
2074 keys.push(LOCK_KEY_2);
2075 assert_lock_keys_equal(&keys, &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]));
2076 keys.push(LOCK_KEY_3);
2077 assert_lock_keys_equivalent(
2078 &keys,
2079 &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2, LOCK_KEY_3]),
2080 );
2081 }
2082
2083 #[test]
2084 fn test_lock_keys_sort_unstable() {
2085 let mut keys = lock_keys![];
2086 keys.sort_unstable();
2087 assert_lock_keys_equal(&keys, &lock_keys![]);
2088
2089 let mut keys = lock_keys![LOCK_KEY_1];
2090 keys.sort_unstable();
2091 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2092
2093 let mut keys = lock_keys![LOCK_KEY_2, LOCK_KEY_1];
2094 keys.sort_unstable();
2095 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2096 }
2097
2098 #[test]
2099 fn test_lock_keys_dedup() {
2100 let mut keys = lock_keys![];
2101 keys.dedup();
2102 assert_lock_keys_equal(&keys, &lock_keys![]);
2103
2104 let mut keys = lock_keys![LOCK_KEY_1];
2105 keys.dedup();
2106 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2107
2108 let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_1];
2109 keys.dedup();
2110 assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2111 }
2112
2113 #[test]
2114 fn test_lock_keys_truncate() {
2115 let mut keys = lock_keys![];
2116 keys.truncate(5);
2117 assert_lock_keys_equal(&keys, &lock_keys![]);
2118 keys.truncate(0);
2119 assert_lock_keys_equal(&keys, &lock_keys![]);
2120
2121 let mut keys = lock_keys![LOCK_KEY_1];
2122 keys.truncate(5);
2123 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2124 keys.truncate(0);
2125 assert_lock_keys_equal(&keys, &lock_keys![]);
2126
2127 let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_2];
2128 keys.truncate(5);
2129 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2130 keys.truncate(1);
2131 assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2133 }
2134
2135 #[test]
2136 fn test_lock_keys_iter() {
2137 assert_eq!(lock_keys![].iter().collect::<Vec<_>>(), Vec::<&LockKey>::new());
2138
2139 assert_eq!(lock_keys![LOCK_KEY_1].iter().collect::<Vec<_>>(), vec![&LOCK_KEY_1]);
2140
2141 assert_eq!(
2142 lock_keys![LOCK_KEY_1, LOCK_KEY_2].iter().collect::<Vec<_>>(),
2143 vec![&LOCK_KEY_1, &LOCK_KEY_2]
2144 );
2145 }
2146}