1use crate::checksum::Checksum;
6use crate::filesystem::{FxFilesystem, TxnGuard};
7use crate::log::*;
8use crate::lsm_tree::types::Item;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::AttributeKey;
11use crate::object_store::allocator::{AllocatorItem, Reservation};
12use crate::object_store::object_manager::{ObjectManager, reserved_space_from_journal_usage};
13use crate::object_store::object_record::{
14 FxfsKey, FxfsKeyV40, FxfsKeyV49, ObjectItem, ObjectItemV40, ObjectItemV41, ObjectItemV43,
15 ObjectItemV46, ObjectItemV47, ObjectItemV49, ObjectItemV50, ObjectKey, ObjectKeyData,
16 ObjectValue, ProjectProperty,
17};
18use crate::serialized_types::{Migrate, Versioned, migrate_nodefault, migrate_to_version};
19use anyhow::Error;
20use either::{Either, Left, Right};
21use fprint::TypeFingerprint;
22use fuchsia_sync::Mutex;
23use futures::future::poll_fn;
24use futures::pin_mut;
25use rustc_hash::FxHashMap as HashMap;
26use scopeguard::ScopeGuard;
27use serde::{Deserialize, Serialize};
28use std::cell::UnsafeCell;
29use std::cmp::Ordering;
30use std::collections::BTreeSet;
31use std::collections::hash_map::Entry;
32use std::marker::PhantomPinned;
33use std::ops::{Deref, DerefMut, Range};
34use std::sync::Arc;
35use std::task::{Poll, Waker};
36use std::{fmt, mem};
37
38#[derive(Clone, Copy, Default)]
42pub struct Options<'a> {
43 pub skip_journal_checks: bool,
46
47 pub borrow_metadata_space: bool,
52
53 pub allocator_reservation: Option<&'a Reservation>,
58
59 pub txn_guard: Option<&'a TxnGuard<'a>>,
61}
62
63pub const TRANSACTION_MAX_JOURNAL_USAGE: u64 = 24_576;
70pub const TRANSACTION_METADATA_MAX_AMOUNT: u64 =
71 reserved_space_from_journal_usage(TRANSACTION_MAX_JOURNAL_USAGE);
72
73#[must_use]
74pub struct TransactionLocks<'a>(pub WriteGuard<'a>);
75
76pub type Mutation = MutationV50;
81
82#[derive(
83 Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint, Versioned,
84)]
85#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
86pub enum MutationV50 {
87 ObjectStore(ObjectStoreMutationV50),
88 EncryptedObjectStore(Box<[u8]>),
89 Allocator(AllocatorMutationV32),
90 BeginFlush,
92 EndFlush,
95 DeleteVolume,
97 UpdateBorrowed(u64),
98 UpdateMutationsKey(UpdateMutationsKeyV49),
99 CreateInternalDir(u64),
100}
101
102#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
103#[migrate_to_version(MutationV50)]
104pub enum MutationV49 {
105 ObjectStore(ObjectStoreMutationV49),
106 EncryptedObjectStore(Box<[u8]>),
107 Allocator(AllocatorMutationV32),
108 BeginFlush,
109 EndFlush,
110 DeleteVolume,
111 UpdateBorrowed(u64),
112 UpdateMutationsKey(UpdateMutationsKeyV49),
113 CreateInternalDir(u64),
114}
115
116#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
117#[migrate_to_version(MutationV49)]
118pub enum MutationV47 {
119 ObjectStore(ObjectStoreMutationV47),
120 EncryptedObjectStore(Box<[u8]>),
121 Allocator(AllocatorMutationV32),
122 BeginFlush,
123 EndFlush,
124 DeleteVolume,
125 UpdateBorrowed(u64),
126 UpdateMutationsKey(UpdateMutationsKeyV40),
127 CreateInternalDir(u64),
128}
129
130#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
131#[migrate_to_version(MutationV47)]
132pub enum MutationV46 {
133 ObjectStore(ObjectStoreMutationV46),
134 EncryptedObjectStore(Box<[u8]>),
135 Allocator(AllocatorMutationV32),
136 BeginFlush,
137 EndFlush,
138 DeleteVolume,
139 UpdateBorrowed(u64),
140 UpdateMutationsKey(UpdateMutationsKeyV40),
141 CreateInternalDir(u64),
142}
143
144#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
145#[migrate_to_version(MutationV46)]
146pub enum MutationV43 {
147 ObjectStore(ObjectStoreMutationV43),
148 EncryptedObjectStore(Box<[u8]>),
149 Allocator(AllocatorMutationV32),
150 BeginFlush,
151 EndFlush,
152 DeleteVolume,
153 UpdateBorrowed(u64),
154 UpdateMutationsKey(UpdateMutationsKeyV40),
155 CreateInternalDir(u64),
156}
157
158#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
159#[migrate_to_version(MutationV43)]
160pub enum MutationV41 {
161 ObjectStore(ObjectStoreMutationV41),
162 EncryptedObjectStore(Box<[u8]>),
163 Allocator(AllocatorMutationV32),
164 BeginFlush,
165 EndFlush,
166 DeleteVolume,
167 UpdateBorrowed(u64),
168 UpdateMutationsKey(UpdateMutationsKeyV40),
169 CreateInternalDir(u64),
170}
171
172#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
173#[migrate_to_version(MutationV41)]
174pub enum MutationV40 {
175 ObjectStore(ObjectStoreMutationV40),
176 EncryptedObjectStore(Box<[u8]>),
177 Allocator(AllocatorMutationV32),
178 BeginFlush,
179 EndFlush,
180 DeleteVolume,
181 UpdateBorrowed(u64),
182 UpdateMutationsKey(UpdateMutationsKeyV40),
183 CreateInternalDir(u64),
184}
185
186impl Mutation {
187 pub fn insert_object(key: ObjectKey, value: ObjectValue) -> Self {
188 Mutation::ObjectStore(ObjectStoreMutation {
189 item: Item::new(key, value),
190 op: Operation::Insert,
191 })
192 }
193
194 pub fn replace_or_insert_object(key: ObjectKey, value: ObjectValue) -> Self {
195 Mutation::ObjectStore(ObjectStoreMutation {
196 item: Item::new(key, value),
197 op: Operation::ReplaceOrInsert,
198 })
199 }
200
201 pub fn merge_object(key: ObjectKey, value: ObjectValue) -> Self {
202 Mutation::ObjectStore(ObjectStoreMutation {
203 item: Item::new(key, value),
204 op: Operation::Merge,
205 })
206 }
207
208 pub fn update_mutations_key(key: FxfsKey) -> Self {
209 Mutation::UpdateMutationsKey(key.into())
210 }
211}
212
213pub type ObjectStoreMutation = ObjectStoreMutationV50;
217
218#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
219#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
220pub struct ObjectStoreMutationV50 {
221 pub item: ObjectItemV50,
222 pub op: OperationV32,
223}
224
225#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
226#[migrate_to_version(ObjectStoreMutationV50)]
227#[migrate_nodefault]
228pub struct ObjectStoreMutationV49 {
229 pub item: ObjectItemV49,
230 pub op: OperationV32,
231}
232
233#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
234#[migrate_to_version(ObjectStoreMutationV49)]
235#[migrate_nodefault]
236pub struct ObjectStoreMutationV47 {
237 pub item: ObjectItemV47,
238 pub op: OperationV32,
239}
240
241#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
242#[migrate_to_version(ObjectStoreMutationV47)]
243#[migrate_nodefault]
244pub struct ObjectStoreMutationV46 {
245 pub item: ObjectItemV46,
246 pub op: OperationV32,
247}
248
249#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
250#[migrate_to_version(ObjectStoreMutationV46)]
251#[migrate_nodefault]
252pub struct ObjectStoreMutationV43 {
253 pub item: ObjectItemV43,
254 pub op: OperationV32,
255}
256
257#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
258#[migrate_to_version(ObjectStoreMutationV43)]
259#[migrate_nodefault]
260pub struct ObjectStoreMutationV41 {
261 pub item: ObjectItemV41,
262 pub op: OperationV32,
263}
264
265#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
266#[migrate_nodefault]
267#[migrate_to_version(ObjectStoreMutationV41)]
268pub struct ObjectStoreMutationV40 {
269 pub item: ObjectItemV40,
270 pub op: OperationV32,
271}
272
273pub type Operation = OperationV32;
275
276#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
277#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
278pub enum OperationV32 {
279 Insert,
280 ReplaceOrInsert,
281 Merge,
282}
283
284impl Ord for ObjectStoreMutation {
285 fn cmp(&self, other: &Self) -> Ordering {
286 self.item.key.cmp(&other.item.key)
287 }
288}
289
290impl PartialOrd for ObjectStoreMutation {
291 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
292 Some(self.cmp(other))
293 }
294}
295
296impl PartialEq for ObjectStoreMutation {
297 fn eq(&self, other: &Self) -> bool {
298 self.item.key.eq(&other.item.key)
299 }
300}
301
302impl Eq for ObjectStoreMutation {}
303
304impl Ord for AllocatorItem {
305 fn cmp(&self, other: &Self) -> Ordering {
306 self.key.cmp(&other.key)
307 }
308}
309
310impl PartialOrd for AllocatorItem {
311 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
312 Some(self.cmp(other))
313 }
314}
315
316pub type DeviceRange = DeviceRangeV32;
319
320#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TypeFingerprint)]
321#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
322pub struct DeviceRangeV32(pub Range<u64>);
323
324impl Deref for DeviceRange {
325 type Target = Range<u64>;
326
327 fn deref(&self) -> &Self::Target {
328 &self.0
329 }
330}
331
332impl DerefMut for DeviceRange {
333 fn deref_mut(&mut self) -> &mut Self::Target {
334 &mut self.0
335 }
336}
337
338impl From<Range<u64>> for DeviceRange {
339 fn from(range: Range<u64>) -> Self {
340 Self(range)
341 }
342}
343
344impl Into<Range<u64>> for DeviceRange {
345 fn into(self) -> Range<u64> {
346 self.0
347 }
348}
349
350impl Ord for DeviceRange {
351 fn cmp(&self, other: &Self) -> Ordering {
352 self.start.cmp(&other.start).then(self.end.cmp(&other.end))
353 }
354}
355
356impl PartialOrd for DeviceRange {
357 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
358 Some(self.cmp(other))
359 }
360}
361
362pub type AllocatorMutation = AllocatorMutationV32;
363
364#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize, TypeFingerprint)]
365#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
366pub enum AllocatorMutationV32 {
367 Allocate {
368 device_range: DeviceRangeV32,
369 owner_object_id: u64,
370 },
371 Deallocate {
372 device_range: DeviceRangeV32,
373 owner_object_id: u64,
374 },
375 SetLimit {
376 owner_object_id: u64,
377 bytes: u64,
378 },
379 MarkForDeletion(u64),
385}
386
387pub type UpdateMutationsKey = UpdateMutationsKeyV49;
388
389#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
390pub struct UpdateMutationsKeyV49(pub FxfsKeyV49);
391
392#[derive(Migrate, Serialize, Deserialize, TypeFingerprint)]
393#[migrate_to_version(UpdateMutationsKeyV49)]
394pub struct UpdateMutationsKeyV40(pub FxfsKeyV40);
395
396impl From<UpdateMutationsKey> for FxfsKey {
397 fn from(outer: UpdateMutationsKey) -> Self {
398 outer.0
399 }
400}
401
402impl From<FxfsKey> for UpdateMutationsKey {
403 fn from(inner: FxfsKey) -> Self {
404 Self(inner)
405 }
406}
407
408#[cfg(fuzz)]
409impl<'a> arbitrary::Arbitrary<'a> for UpdateMutationsKey {
410 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
411 Ok(UpdateMutationsKey::from(FxfsKey::arbitrary(u).unwrap()))
412 }
413}
414
415impl Ord for UpdateMutationsKey {
416 fn cmp(&self, other: &Self) -> Ordering {
417 (self as *const UpdateMutationsKey).cmp(&(other as *const _))
418 }
419}
420
421impl PartialOrd for UpdateMutationsKey {
422 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
423 Some(self.cmp(other))
424 }
425}
426
427impl Eq for UpdateMutationsKey {}
428
429impl PartialEq for UpdateMutationsKey {
430 fn eq(&self, other: &Self) -> bool {
431 std::ptr::eq(self, other)
432 }
433}
434
435#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Copy)]
445pub enum LockKey {
446 Filesystem,
448
449 Flush {
451 object_id: u64,
452 },
453
454 ObjectAttribute {
456 store_object_id: u64,
457 object_id: u64,
458 attribute_id: u64,
459 },
460
461 Object {
463 store_object_id: u64,
464 object_id: u64,
465 },
466
467 ProjectId {
468 store_object_id: u64,
469 project_id: u64,
470 },
471
472 Truncate {
474 store_object_id: u64,
475 object_id: u64,
476 },
477}
478
479impl LockKey {
480 pub const fn object_attribute(store_object_id: u64, object_id: u64, attribute_id: u64) -> Self {
481 LockKey::ObjectAttribute { store_object_id, object_id, attribute_id }
482 }
483
484 pub const fn object(store_object_id: u64, object_id: u64) -> Self {
485 LockKey::Object { store_object_id, object_id }
486 }
487
488 pub const fn flush(object_id: u64) -> Self {
489 LockKey::Flush { object_id }
490 }
491
492 pub const fn truncate(store_object_id: u64, object_id: u64) -> Self {
493 LockKey::Truncate { store_object_id, object_id }
494 }
495}
496
497#[derive(Clone, Debug)]
499pub enum LockKeys {
500 None,
501 Inline(LockKey),
502 Vec(Vec<LockKey>),
503}
504
505impl LockKeys {
506 pub fn with_capacity(capacity: usize) -> Self {
507 if capacity > 1 { LockKeys::Vec(Vec::with_capacity(capacity)) } else { LockKeys::None }
508 }
509
510 pub fn push(&mut self, key: LockKey) {
511 match self {
512 Self::None => *self = LockKeys::Inline(key),
513 Self::Inline(inline) => {
514 *self = LockKeys::Vec(vec![*inline, key]);
515 }
516 Self::Vec(vec) => vec.push(key),
517 }
518 }
519
520 pub fn truncate(&mut self, len: usize) {
521 match self {
522 Self::None => {}
523 Self::Inline(_) => {
524 if len == 0 {
525 *self = Self::None;
526 }
527 }
528 Self::Vec(vec) => vec.truncate(len),
529 }
530 }
531
532 fn len(&self) -> usize {
533 match self {
534 Self::None => 0,
535 Self::Inline(_) => 1,
536 Self::Vec(vec) => vec.len(),
537 }
538 }
539
540 fn contains(&self, key: &LockKey) -> bool {
541 match self {
542 Self::None => false,
543 Self::Inline(single) => single == key,
544 Self::Vec(vec) => vec.contains(key),
545 }
546 }
547
548 fn sort_unstable(&mut self) {
549 match self {
550 Self::Vec(vec) => vec.sort_unstable(),
551 _ => {}
552 }
553 }
554
555 fn dedup(&mut self) {
556 match self {
557 Self::Vec(vec) => vec.dedup(),
558 _ => {}
559 }
560 }
561
562 fn iter(&self) -> LockKeysIter<'_> {
563 match self {
564 LockKeys::None => LockKeysIter::None,
565 LockKeys::Inline(key) => LockKeysIter::Inline(key),
566 LockKeys::Vec(keys) => LockKeysIter::Vec(keys.into_iter()),
567 }
568 }
569}
570
571enum LockKeysIter<'a> {
572 None,
573 Inline(&'a LockKey),
574 Vec(<&'a Vec<LockKey> as IntoIterator>::IntoIter),
575}
576
577impl<'a> Iterator for LockKeysIter<'a> {
578 type Item = &'a LockKey;
579 fn next(&mut self) -> Option<Self::Item> {
580 match self {
581 Self::None => None,
582 Self::Inline(inline) => {
583 let next = *inline;
584 *self = Self::None;
585 Some(next)
586 }
587 Self::Vec(vec) => vec.next(),
588 }
589 }
590}
591
592impl Default for LockKeys {
593 fn default() -> Self {
594 LockKeys::None
595 }
596}
597
598#[macro_export]
599macro_rules! lock_keys {
600 () => {
601 $crate::object_store::transaction::LockKeys::None
602 };
603 ($lock_key:expr $(,)?) => {
604 $crate::object_store::transaction::LockKeys::Inline($lock_key)
605 };
606 ($($lock_keys:expr),+ $(,)?) => {
607 $crate::object_store::transaction::LockKeys::Vec(vec![$($lock_keys),+])
608 };
609}
610pub use lock_keys;
611
612pub trait AssociatedObject: Send + Sync {
616 fn will_apply_mutation(&self, _mutation: &Mutation, _object_id: u64, _manager: &ObjectManager) {
617 }
618}
619
620pub enum AssocObj<'a> {
621 None,
622 Borrowed(&'a dyn AssociatedObject),
623 Owned(Box<dyn AssociatedObject>),
624}
625
626impl AssocObj<'_> {
627 pub fn map<R, F: FnOnce(&dyn AssociatedObject) -> R>(&self, f: F) -> Option<R> {
628 match self {
629 AssocObj::None => None,
630 AssocObj::Borrowed(b) => Some(f(*b)),
631 AssocObj::Owned(o) => Some(f(o.as_ref())),
632 }
633 }
634}
635
636pub struct TxnMutation<'a> {
637 pub object_id: u64,
641
642 pub mutation: Mutation,
644
645 pub associated_object: AssocObj<'a>,
648}
649
650impl Ord for TxnMutation<'_> {
653 fn cmp(&self, other: &Self) -> Ordering {
654 self.object_id.cmp(&other.object_id).then_with(|| self.mutation.cmp(&other.mutation))
655 }
656}
657
658impl PartialOrd for TxnMutation<'_> {
659 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
660 Some(self.cmp(other))
661 }
662}
663
664impl PartialEq for TxnMutation<'_> {
665 fn eq(&self, other: &Self) -> bool {
666 self.object_id.eq(&other.object_id) && self.mutation.eq(&other.mutation)
667 }
668}
669
670impl Eq for TxnMutation<'_> {}
671
672impl std::fmt::Debug for TxnMutation<'_> {
673 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
674 f.debug_struct("TxnMutation")
675 .field("object_id", &self.object_id)
676 .field("mutation", &self.mutation)
677 .finish()
678 }
679}
680
681pub enum MetadataReservation {
682 None,
684
685 Borrowed,
688
689 Reservation(Reservation),
691
692 Hold(u64),
694}
695
696pub struct Transaction<'a> {
698 txn_guard: TxnGuard<'a>,
699
700 mutations: BTreeSet<TxnMutation<'a>>,
702
703 txn_locks: LockKeys,
705
706 pub allocator_reservation: Option<&'a Reservation>,
708
709 pub metadata_reservation: MetadataReservation,
711
712 new_objects: BTreeSet<(u64, u64)>,
715
716 checksums: Vec<(Range<u64>, Vec<Checksum>, bool)>,
718
719 includes_write: bool,
721}
722
723impl<'a> Transaction<'a> {
724 pub async fn new(
727 txn_guard: TxnGuard<'a>,
728 options: Options<'a>,
729 txn_locks: LockKeys,
730 ) -> Result<Transaction<'a>, Error> {
731 txn_guard.fs().add_transaction(options.skip_journal_checks).await;
732 let fs = txn_guard.fs().clone();
733 let guard = scopeguard::guard((), |_| fs.sub_transaction());
734 let (metadata_reservation, allocator_reservation, hold) =
735 txn_guard.fs().reservation_for_transaction(options).await?;
736
737 let txn_locks = {
738 let lock_manager = txn_guard.fs().lock_manager();
739 let mut write_guard = lock_manager.txn_lock(txn_locks).await;
740 std::mem::take(&mut write_guard.0.lock_keys)
741 };
742 let mut transaction = Transaction {
743 txn_guard,
744 mutations: BTreeSet::new(),
745 txn_locks,
746 allocator_reservation: None,
747 metadata_reservation,
748 new_objects: BTreeSet::new(),
749 checksums: Vec::new(),
750 includes_write: false,
751 };
752
753 ScopeGuard::into_inner(guard);
754 hold.map(|h| h.forget()); transaction.allocator_reservation = allocator_reservation;
756 Ok(transaction)
757 }
758
759 pub fn txn_guard(&self) -> &TxnGuard<'_> {
760 &self.txn_guard
761 }
762
763 pub fn mutations(&self) -> &BTreeSet<TxnMutation<'a>> {
764 &self.mutations
765 }
766
767 pub fn take_mutations(&mut self) -> BTreeSet<TxnMutation<'a>> {
768 self.new_objects.clear();
769 mem::take(&mut self.mutations)
770 }
771
772 pub fn add(&mut self, object_id: u64, mutation: Mutation) -> Option<Mutation> {
775 self.add_with_object(object_id, mutation, AssocObj::None)
776 }
777
778 pub fn remove(&mut self, object_id: u64, mutation: Mutation) {
780 let txn_mutation = TxnMutation { object_id, mutation, associated_object: AssocObj::None };
781 if self.mutations.remove(&txn_mutation) {
782 if let Mutation::ObjectStore(ObjectStoreMutation {
783 item:
784 ObjectItem {
785 key: ObjectKey { object_id: new_object_id, data: ObjectKeyData::Object },
786 ..
787 },
788 op: Operation::Insert,
789 }) = txn_mutation.mutation
790 {
791 self.new_objects.remove(&(object_id, new_object_id));
792 }
793 }
794 }
795
796 pub fn add_with_object(
799 &mut self,
800 object_id: u64,
801 mutation: Mutation,
802 associated_object: AssocObj<'a>,
803 ) -> Option<Mutation> {
804 assert!(object_id != INVALID_OBJECT_ID);
805 if let Mutation::ObjectStore(ObjectStoreMutation {
806 item:
807 Item {
808 key:
809 ObjectKey { data: ObjectKeyData::Attribute(_, AttributeKey::Extent(_)), .. },
810 ..
811 },
812 ..
813 }) = &mutation
814 {
815 self.includes_write = true;
816 }
817 let txn_mutation = TxnMutation { object_id, mutation, associated_object };
818 self.verify_locks(&txn_mutation);
819 self.mutations.replace(txn_mutation).map(|m| m.mutation)
820 }
821
822 pub fn add_checksum(&mut self, range: Range<u64>, checksums: Vec<Checksum>, first_write: bool) {
823 self.checksums.push((range, checksums, first_write));
824 }
825
826 pub fn includes_write(&self) -> bool {
827 self.includes_write
828 }
829
830 pub fn checksums(&self) -> &[(Range<u64>, Vec<Checksum>, bool)] {
831 &self.checksums
832 }
833
834 pub fn take_checksums(&mut self) -> Vec<(Range<u64>, Vec<Checksum>, bool)> {
835 std::mem::replace(&mut self.checksums, Vec::new())
836 }
837
838 fn verify_locks(&mut self, mutation: &TxnMutation<'_>) {
839 match mutation {
843 TxnMutation {
844 mutation:
845 Mutation::ObjectStore {
846 0: ObjectStoreMutation { item: ObjectItem { key, .. }, op },
847 },
848 object_id: store_object_id,
849 ..
850 } => {
851 match &key.data {
852 ObjectKeyData::Attribute(..) => {
853 }
855 ObjectKeyData::Child { .. }
856 | ObjectKeyData::EncryptedChild(_)
857 | ObjectKeyData::EncryptedCasefoldChild(_)
858 | ObjectKeyData::CasefoldChild { .. } => {
859 let id = key.object_id;
860 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
861 && !self.new_objects.contains(&(*store_object_id, id))
862 {
863 debug_assert!(
864 false,
865 "Not holding required lock for object {id} \
866 in store {store_object_id}"
867 );
868 error!(
869 "Not holding required lock for object {id} in store \
870 {store_object_id}"
871 )
872 }
873 }
874 ObjectKeyData::GraveyardEntry { .. } => {
875 }
877 ObjectKeyData::GraveyardAttributeEntry { .. } => {
878 }
880 ObjectKeyData::Keys => {
881 let id = key.object_id;
882 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
883 && !self.new_objects.contains(&(*store_object_id, id))
884 {
885 debug_assert!(
886 false,
887 "Not holding required lock for object {id} \
888 in store {store_object_id}"
889 );
890 error!(
891 "Not holding required lock for object {id} in store \
892 {store_object_id}"
893 )
894 }
895 }
896 ObjectKeyData::Object => match op {
897 Operation::Insert => {
899 self.new_objects.insert((*store_object_id, key.object_id));
900 }
901 Operation::Merge | Operation::ReplaceOrInsert => {
902 let id = key.object_id;
903 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
904 && !self.new_objects.contains(&(*store_object_id, id))
905 {
906 debug_assert!(
907 false,
908 "Not holding required lock for object {id} \
909 in store {store_object_id}"
910 );
911 error!(
912 "Not holding required lock for object {id} in store \
913 {store_object_id}"
914 )
915 }
916 }
917 },
918 ObjectKeyData::Project { project_id, property: ProjectProperty::Limit } => {
919 if !self.txn_locks.contains(&LockKey::ProjectId {
920 store_object_id: *store_object_id,
921 project_id: *project_id,
922 }) {
923 debug_assert!(
924 false,
925 "Not holding required lock for project limit id {project_id} \
926 in store {store_object_id}"
927 );
928 error!(
929 "Not holding required lock for project limit id {project_id} in \
930 store {store_object_id}"
931 )
932 }
933 }
934 ObjectKeyData::Project { property: ProjectProperty::Usage, .. } => match op {
935 Operation::Insert | Operation::ReplaceOrInsert => {
936 panic!(
937 "Project usage is all handled by merging deltas, no inserts or \
938 replacements should be used"
939 );
940 }
941 Operation::Merge => {}
943 },
944 ObjectKeyData::ExtendedAttribute { .. } => {
945 let id = key.object_id;
946 if !self.txn_locks.contains(&LockKey::object(*store_object_id, id))
947 && !self.new_objects.contains(&(*store_object_id, id))
948 {
949 debug_assert!(
950 false,
951 "Not holding required lock for object {id} \
952 in store {store_object_id} while mutating extended attribute"
953 );
954 error!(
955 "Not holding required lock for object {id} in store \
956 {store_object_id} while mutating extended attribute"
957 )
958 }
959 }
960 }
961 }
962 TxnMutation { mutation: Mutation::DeleteVolume, object_id, .. } => {
963 if !self.txn_locks.contains(&LockKey::flush(*object_id)) {
964 debug_assert!(false, "Not holding required lock for DeleteVolume");
965 error!("Not holding required lock for DeleteVolume");
966 }
967 }
968 _ => {}
969 }
970 }
971
972 pub fn is_empty(&self) -> bool {
974 self.mutations.is_empty()
975 }
976
977 pub fn get_object_mutation(
980 &self,
981 store_object_id: u64,
982 key: ObjectKey,
983 ) -> Option<&ObjectStoreMutation> {
984 if let Some(TxnMutation { mutation: Mutation::ObjectStore(mutation), .. }) =
985 self.mutations.get(&TxnMutation {
986 object_id: store_object_id,
987 mutation: Mutation::insert_object(key, ObjectValue::None),
988 associated_object: AssocObj::None,
989 })
990 {
991 Some(mutation)
992 } else {
993 None
994 }
995 }
996
997 pub async fn commit(mut self) -> Result<u64, Error> {
999 debug!(txn:? = &self; "Commit");
1000 self.txn_guard.fs().clone().commit_transaction(&mut self, &mut |_| {}).await
1001 }
1002
1003 pub async fn commit_with_callback<R: Send>(
1006 mut self,
1007 f: impl FnOnce(u64) -> R + Send,
1008 ) -> Result<R, Error> {
1009 debug!(txn:? = &self; "Commit");
1010 let mut f = Some(f);
1013 let mut result = None;
1014 self.txn_guard
1015 .fs()
1016 .clone()
1017 .commit_transaction(&mut self, &mut |offset| {
1018 result = Some(f.take().unwrap()(offset));
1019 })
1020 .await?;
1021 Ok(result.unwrap())
1022 }
1023
1024 pub async fn commit_and_continue(&mut self) -> Result<(), Error> {
1027 debug!(txn:? = self; "Commit");
1028 self.txn_guard.fs().clone().commit_transaction(self, &mut |_| {}).await?;
1029 assert!(self.mutations.is_empty());
1030 self.txn_guard.fs().lock_manager().downgrade_locks(&self.txn_locks);
1031 Ok(())
1032 }
1033}
1034
1035impl Drop for Transaction<'_> {
1036 fn drop(&mut self) {
1037 debug!(txn:? = &self; "Drop");
1040 self.txn_guard.fs().clone().drop_transaction(self);
1041 }
1042}
1043
1044impl std::fmt::Debug for Transaction<'_> {
1045 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1046 f.debug_struct("Transaction")
1047 .field("mutations", &self.mutations)
1048 .field("txn_locks", &self.txn_locks)
1049 .field("reservation", &self.allocator_reservation)
1050 .finish()
1051 }
1052}
1053
1054pub enum BorrowedOrOwned<'a, T> {
1055 Borrowed(&'a T),
1056 Owned(T),
1057}
1058
1059impl<T> Deref for BorrowedOrOwned<'_, T> {
1060 type Target = T;
1061
1062 fn deref(&self) -> &Self::Target {
1063 match self {
1064 BorrowedOrOwned::Borrowed(b) => b,
1065 BorrowedOrOwned::Owned(o) => &o,
1066 }
1067 }
1068}
1069
1070impl<'a, T> From<&'a T> for BorrowedOrOwned<'a, T> {
1071 fn from(value: &'a T) -> Self {
1072 BorrowedOrOwned::Borrowed(value)
1073 }
1074}
1075
1076impl<T> From<T> for BorrowedOrOwned<'_, T> {
1077 fn from(value: T) -> Self {
1078 BorrowedOrOwned::Owned(value)
1079 }
1080}
1081
1082pub struct LockManager {
1111 locks: Mutex<Locks>,
1112}
1113
1114struct Locks {
1115 keys: HashMap<LockKey, LockEntry>,
1116}
1117
1118impl Locks {
1119 fn drop_lock(&mut self, key: LockKey, state: LockState) {
1120 if let Entry::Occupied(mut occupied) = self.keys.entry(key) {
1121 let entry = occupied.get_mut();
1122 let wake = match state {
1123 LockState::ReadLock => {
1124 entry.read_count -= 1;
1125 entry.read_count == 0
1126 }
1127 LockState::Locked | LockState::WriteLock => {
1129 entry.state = LockState::ReadLock;
1130 true
1131 }
1132 };
1133 if wake {
1134 unsafe {
1136 entry.wake();
1137 }
1138 if entry.can_remove() {
1139 occupied.remove_entry();
1140 }
1141 }
1142 } else {
1143 unreachable!();
1144 }
1145 }
1146
1147 fn drop_read_locks(&mut self, lock_keys: LockKeys) {
1148 for lock in lock_keys.iter() {
1149 self.drop_lock(*lock, LockState::ReadLock);
1150 }
1151 }
1152
1153 fn drop_write_locks(&mut self, lock_keys: LockKeys) {
1154 for lock in lock_keys.iter() {
1155 self.drop_lock(*lock, LockState::WriteLock);
1158 }
1159 }
1160
1161 fn downgrade_locks(&mut self, lock_keys: &LockKeys) {
1163 for lock in lock_keys.iter() {
1164 unsafe {
1166 self.keys.get_mut(lock).unwrap().downgrade_lock();
1167 }
1168 }
1169 }
1170}
1171
1172#[derive(Debug)]
1173struct LockEntry {
1174 read_count: u64,
1177
1178 state: LockState,
1180
1181 head: *const LockWaker,
1186 tail: *const LockWaker,
1187}
1188
1189unsafe impl Send for LockEntry {}
1190
1191struct LockWaker {
1194 next: UnsafeCell<*const LockWaker>,
1196 prev: UnsafeCell<*const LockWaker>,
1197
1198 key: LockKey,
1201
1202 waker: UnsafeCell<WakerState>,
1204
1205 target_state: LockState,
1207
1208 is_upgrade: bool,
1210
1211 _pin: PhantomPinned,
1213}
1214
1215enum WakerState {
1216 Pending,
1218
1219 Registered(Waker),
1221
1222 Woken,
1224}
1225
1226impl WakerState {
1227 fn is_woken(&self) -> bool {
1228 matches!(self, WakerState::Woken)
1229 }
1230}
1231
1232unsafe impl Send for LockWaker {}
1233unsafe impl Sync for LockWaker {}
1234
1235impl LockWaker {
1236 async fn wait(&self, manager: &LockManager) {
1238 let waker_guard = scopeguard::guard((), |_| {
1240 let mut locks = manager.locks.lock();
1241 unsafe {
1243 if (*self.waker.get()).is_woken() {
1244 if self.is_upgrade {
1246 locks.keys.get_mut(&self.key).unwrap().downgrade_lock();
1247 } else {
1248 locks.drop_lock(self.key, self.target_state);
1249 }
1250 } else {
1251 locks.keys.get_mut(&self.key).unwrap().remove_waker(self);
1254 }
1255 }
1256 });
1257
1258 poll_fn(|cx| {
1259 let _locks = manager.locks.lock();
1260 unsafe {
1262 if (*self.waker.get()).is_woken() {
1263 Poll::Ready(())
1264 } else {
1265 *self.waker.get() = WakerState::Registered(cx.waker().clone());
1266 Poll::Pending
1267 }
1268 }
1269 })
1270 .await;
1271
1272 ScopeGuard::into_inner(waker_guard);
1273 }
1274}
1275
1276#[derive(Copy, Clone, Debug, PartialEq)]
1277enum LockState {
1278 ReadLock,
1280
1281 Locked,
1284
1285 WriteLock,
1287}
1288
1289impl LockManager {
1290 pub fn new() -> Self {
1291 LockManager { locks: Mutex::new(Locks { keys: HashMap::default() }) }
1292 }
1293
1294 pub async fn txn_lock<'a>(&'a self, lock_keys: LockKeys) -> TransactionLocks<'a> {
1298 TransactionLocks(
1299 debug_assert_not_too_long!(self.lock(lock_keys, LockState::Locked)).right().unwrap(),
1300 )
1301 }
1302
1303 async fn lock<'a>(
1306 &'a self,
1307 mut lock_keys: LockKeys,
1308 target_state: LockState,
1309 ) -> Either<ReadGuard<'a>, WriteGuard<'a>> {
1310 let mut guard = match &target_state {
1311 LockState::ReadLock => Left(ReadGuard {
1312 manager: self.into(),
1313 lock_keys: LockKeys::with_capacity(lock_keys.len()),
1314 }),
1315 LockState::Locked | LockState::WriteLock => Right(WriteGuard {
1316 manager: self.into(),
1317 lock_keys: LockKeys::with_capacity(lock_keys.len()),
1318 }),
1319 };
1320 let guard_keys = match &mut guard {
1321 Left(g) => &mut g.lock_keys,
1322 Right(g) => &mut g.lock_keys,
1323 };
1324 lock_keys.sort_unstable();
1325 lock_keys.dedup();
1326 for lock in lock_keys.iter() {
1327 let lock_waker = None;
1328 pin_mut!(lock_waker);
1329 {
1330 let mut locks = self.locks.lock();
1331 match locks.keys.entry(*lock) {
1332 Entry::Vacant(vacant) => {
1333 vacant.insert(LockEntry {
1334 read_count: if let LockState::ReadLock = target_state {
1335 guard_keys.push(*lock);
1336 1
1337 } else {
1338 guard_keys.push(*lock);
1339 0
1340 },
1341 state: target_state,
1342 head: std::ptr::null(),
1343 tail: std::ptr::null(),
1344 });
1345 }
1346 Entry::Occupied(mut occupied) => {
1347 let entry = occupied.get_mut();
1348 if unsafe { entry.is_allowed(target_state, entry.head.is_null()) } {
1350 if let LockState::ReadLock = target_state {
1351 entry.read_count += 1;
1352 guard_keys.push(*lock);
1353 } else {
1354 entry.state = target_state;
1355 guard_keys.push(*lock);
1356 }
1357 } else {
1358 unsafe {
1361 *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1362 next: UnsafeCell::new(std::ptr::null()),
1363 prev: UnsafeCell::new(entry.tail),
1364 key: *lock,
1365 waker: UnsafeCell::new(WakerState::Pending),
1366 target_state: target_state,
1367 is_upgrade: false,
1368 _pin: PhantomPinned,
1369 });
1370 }
1371 let waker = (*lock_waker).as_ref().unwrap();
1372 if entry.tail.is_null() {
1373 entry.head = waker;
1374 } else {
1375 unsafe {
1377 *(*entry.tail).next.get() = waker;
1378 }
1379 }
1380 entry.tail = waker;
1381 }
1382 }
1383 }
1384 }
1385 if let Some(waker) = &*lock_waker {
1386 waker.wait(self).await;
1387 guard_keys.push(*lock);
1388 }
1389 }
1390 guard
1391 }
1392
1393 pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
1395 let mut locks = self.locks.lock();
1396 locks.drop_write_locks(std::mem::take(&mut transaction.txn_locks));
1397 }
1398
1399 pub async fn commit_prepare(&self, transaction: &Transaction<'_>) {
1401 self.commit_prepare_keys(&transaction.txn_locks).await;
1402 }
1403
1404 async fn commit_prepare_keys(&self, lock_keys: &LockKeys) {
1405 for lock in lock_keys.iter() {
1406 let lock_waker = None;
1407 pin_mut!(lock_waker);
1408 {
1409 let mut locks = self.locks.lock();
1410 let entry = locks.keys.get_mut(lock).unwrap();
1411 assert_eq!(entry.state, LockState::Locked);
1412
1413 if entry.read_count == 0 {
1414 entry.state = LockState::WriteLock;
1415 } else {
1416 unsafe {
1419 *lock_waker.as_mut().get_unchecked_mut() = Some(LockWaker {
1420 next: UnsafeCell::new(entry.head),
1421 prev: UnsafeCell::new(std::ptr::null()),
1422 key: *lock,
1423 waker: UnsafeCell::new(WakerState::Pending),
1424 target_state: LockState::WriteLock,
1425 is_upgrade: true,
1426 _pin: PhantomPinned,
1427 });
1428 }
1429 let waker = (*lock_waker).as_ref().unwrap();
1430 if entry.head.is_null() {
1431 entry.tail = (*lock_waker).as_ref().unwrap();
1432 } else {
1433 unsafe {
1435 *(*entry.head).prev.get() = waker;
1436 }
1437 }
1438 entry.head = waker;
1439 }
1440 }
1441
1442 if let Some(waker) = &*lock_waker {
1443 waker.wait(self).await;
1444 }
1445 }
1446 }
1447
1448 pub async fn read_lock<'a>(&'a self, lock_keys: LockKeys) -> ReadGuard<'a> {
1454 debug_assert_not_too_long!(self.lock(lock_keys, LockState::ReadLock)).left().unwrap()
1455 }
1456
1457 pub async fn write_lock<'a>(&'a self, lock_keys: LockKeys) -> WriteGuard<'a> {
1460 debug_assert_not_too_long!(self.lock(lock_keys, LockState::WriteLock)).right().unwrap()
1461 }
1462
1463 pub fn downgrade_locks(&self, lock_keys: &LockKeys) {
1466 self.locks.lock().downgrade_locks(lock_keys);
1467 }
1468}
1469
1470impl LockEntry {
1472 unsafe fn wake(&mut self) {
1473 if self.head.is_null() || self.state == LockState::WriteLock {
1475 return;
1476 }
1477
1478 let waker = unsafe { &*self.head };
1479
1480 if waker.is_upgrade {
1481 if self.read_count > 0 {
1482 return;
1483 }
1484 } else if !unsafe { self.is_allowed(waker.target_state, true) } {
1485 return;
1486 }
1487
1488 unsafe { self.pop_and_wake() };
1489
1490 if waker.target_state == LockState::WriteLock {
1493 return;
1494 }
1495
1496 while !self.head.is_null() && unsafe { (*self.head).target_state } == LockState::ReadLock {
1497 unsafe { self.pop_and_wake() };
1498 }
1499 }
1500
1501 unsafe fn pop_and_wake(&mut self) {
1502 let waker = unsafe { &*self.head };
1503
1504 self.head = unsafe { *waker.next.get() };
1506 if self.head.is_null() {
1507 self.tail = std::ptr::null()
1508 } else {
1509 unsafe { *(*self.head).prev.get() = std::ptr::null() };
1510 }
1511
1512 if waker.target_state == LockState::ReadLock {
1514 self.read_count += 1;
1515 } else {
1516 self.state = waker.target_state;
1517 }
1518
1519 if let WakerState::Registered(waker) =
1521 std::mem::replace(unsafe { &mut *waker.waker.get() }, WakerState::Woken)
1522 {
1523 waker.wake();
1524 }
1525 }
1526
1527 fn can_remove(&self) -> bool {
1528 self.state == LockState::ReadLock && self.read_count == 0
1529 }
1530
1531 unsafe fn remove_waker(&mut self, waker: &LockWaker) {
1532 unsafe {
1533 let is_first = (*waker.prev.get()).is_null();
1534 if is_first {
1535 self.head = *waker.next.get();
1536 } else {
1537 *(**waker.prev.get()).next.get() = *waker.next.get();
1538 }
1539 if (*waker.next.get()).is_null() {
1540 self.tail = *waker.prev.get();
1541 } else {
1542 *(**waker.next.get()).prev.get() = *waker.prev.get();
1543 }
1544 if is_first {
1545 self.wake();
1548 }
1549 }
1550 }
1551
1552 unsafe fn is_allowed(&self, target_state: LockState, is_head: bool) -> bool {
1556 match self.state {
1557 LockState::ReadLock => {
1558 (self.read_count == 0
1560 || target_state == LockState::Locked
1561 || target_state == LockState::ReadLock)
1562 && is_head
1563 }
1564 LockState::Locked => {
1565 target_state == LockState::ReadLock
1569 && (is_head || unsafe { !(*self.head).is_upgrade })
1570 }
1571 LockState::WriteLock => false,
1572 }
1573 }
1574
1575 unsafe fn downgrade_lock(&mut self) {
1576 assert_eq!(std::mem::replace(&mut self.state, LockState::Locked), LockState::WriteLock);
1577 unsafe { self.wake() };
1578 }
1579}
1580
1581#[must_use]
1582pub struct ReadGuard<'a> {
1583 manager: LockManagerRef<'a>,
1584 lock_keys: LockKeys,
1585}
1586
1587impl ReadGuard<'_> {
1588 pub fn fs(&self) -> Option<&Arc<FxFilesystem>> {
1589 if let LockManagerRef::Owned(fs) = &self.manager { Some(fs) } else { None }
1590 }
1591
1592 pub fn into_owned(mut self, fs: Arc<FxFilesystem>) -> ReadGuard<'static> {
1593 ReadGuard {
1594 manager: LockManagerRef::Owned(fs),
1595 lock_keys: std::mem::replace(&mut self.lock_keys, LockKeys::None),
1596 }
1597 }
1598}
1599
1600impl Drop for ReadGuard<'_> {
1601 fn drop(&mut self) {
1602 let mut locks = self.manager.locks.lock();
1603 locks.drop_read_locks(std::mem::take(&mut self.lock_keys));
1604 }
1605}
1606
1607impl fmt::Debug for ReadGuard<'_> {
1608 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1609 f.debug_struct("ReadGuard")
1610 .field("manager", &(&self.manager as *const _))
1611 .field("lock_keys", &self.lock_keys)
1612 .finish()
1613 }
1614}
1615
1616#[must_use]
1617pub struct WriteGuard<'a> {
1618 manager: LockManagerRef<'a>,
1619 lock_keys: LockKeys,
1620}
1621
1622impl Drop for WriteGuard<'_> {
1623 fn drop(&mut self) {
1624 let mut locks = self.manager.locks.lock();
1625 locks.drop_write_locks(std::mem::take(&mut self.lock_keys));
1626 }
1627}
1628
1629impl fmt::Debug for WriteGuard<'_> {
1630 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1631 f.debug_struct("WriteGuard")
1632 .field("manager", &(&self.manager as *const _))
1633 .field("lock_keys", &self.lock_keys)
1634 .finish()
1635 }
1636}
1637
1638enum LockManagerRef<'a> {
1639 Borrowed(&'a LockManager),
1640 Owned(Arc<FxFilesystem>),
1641}
1642
1643impl Deref for LockManagerRef<'_> {
1644 type Target = LockManager;
1645
1646 fn deref(&self) -> &Self::Target {
1647 match self {
1648 LockManagerRef::Borrowed(m) => m,
1649 LockManagerRef::Owned(f) => f.lock_manager(),
1650 }
1651 }
1652}
1653
1654impl<'a> From<&'a LockManager> for LockManagerRef<'a> {
1655 fn from(value: &'a LockManager) -> Self {
1656 LockManagerRef::Borrowed(value)
1657 }
1658}
1659
1660#[cfg(test)]
1661mod tests {
1662 use super::{LockKey, LockKeys, LockManager, LockState, Mutation, Options};
1663 use crate::filesystem::FxFilesystem;
1664 use fuchsia_async as fasync;
1665 use fuchsia_sync::Mutex;
1666 use futures::channel::oneshot::channel;
1667 use futures::future::FutureExt;
1668 use futures::stream::FuturesUnordered;
1669 use futures::{StreamExt, join, pin_mut};
1670 use std::task::Poll;
1671 use std::time::Duration;
1672 use storage_device::DeviceHolder;
1673 use storage_device::fake_device::FakeDevice;
1674
1675 #[fuchsia::test]
1676 async fn test_simple() {
1677 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1678 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1679 let mut t = fs
1680 .clone()
1681 .new_transaction(lock_keys![], Options::default())
1682 .await
1683 .expect("new_transaction failed");
1684 t.add(1, Mutation::BeginFlush);
1685 assert!(!t.is_empty());
1686 }
1687
1688 #[fuchsia::test]
1689 async fn test_locks() {
1690 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1691 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1692 let (send1, recv1) = channel();
1693 let (send2, recv2) = channel();
1694 let (send3, recv3) = channel();
1695 let done = Mutex::new(false);
1696 let mut futures = FuturesUnordered::new();
1697 futures.push(
1698 async {
1699 let _t = fs
1700 .clone()
1701 .new_transaction(
1702 lock_keys![LockKey::object_attribute(1, 2, 3)],
1703 Options::default(),
1704 )
1705 .await
1706 .expect("new_transaction failed");
1707 send1.send(()).unwrap(); send3.send(()).unwrap(); recv2.await.unwrap();
1710 fasync::Timer::new(Duration::from_millis(100)).await;
1712 assert!(!*done.lock());
1713 }
1714 .boxed(),
1715 );
1716 futures.push(
1717 async {
1718 recv1.await.unwrap();
1719 let _t = fs
1721 .clone()
1722 .new_transaction(
1723 lock_keys![LockKey::object_attribute(2, 2, 3)],
1724 Options::default(),
1725 )
1726 .await
1727 .expect("new_transaction failed");
1728 send2.send(()).unwrap();
1730 }
1731 .boxed(),
1732 );
1733 futures.push(
1734 async {
1735 recv3.await.unwrap();
1737 let _t = fs
1738 .clone()
1739 .new_transaction(
1740 lock_keys![LockKey::object_attribute(1, 2, 3)],
1741 Options::default(),
1742 )
1743 .await;
1744 *done.lock() = true;
1745 }
1746 .boxed(),
1747 );
1748 while let Some(()) = futures.next().await {}
1749 }
1750
1751 #[fuchsia::test]
1752 async fn test_read_lock_after_write_lock() {
1753 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1754 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1755 let (send1, recv1) = channel();
1756 let (send2, recv2) = channel();
1757 let done = Mutex::new(false);
1758 join!(
1759 async {
1760 let t = fs
1761 .clone()
1762 .new_transaction(
1763 lock_keys![LockKey::object_attribute(1, 2, 3)],
1764 Options::default(),
1765 )
1766 .await
1767 .expect("new_transaction failed");
1768 send1.send(()).unwrap(); recv2.await.unwrap();
1770 t.commit().await.expect("commit failed");
1771 *done.lock() = true;
1772 },
1773 async {
1774 recv1.await.unwrap();
1775 let _guard = fs
1777 .lock_manager()
1778 .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
1779 .await;
1780 send2.send(()).unwrap();
1782 fasync::Timer::new(Duration::from_millis(100)).await;
1785 assert!(!*done.lock());
1786 },
1787 );
1788 }
1789
1790 #[fuchsia::test]
1791 async fn test_write_lock_after_read_lock() {
1792 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1793 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1794 let (send1, recv1) = channel();
1795 let (send2, recv2) = channel();
1796 let done = Mutex::new(false);
1797 join!(
1798 async {
1799 let _guard = fs
1801 .lock_manager()
1802 .read_lock(lock_keys![LockKey::object_attribute(1, 2, 3)])
1803 .await;
1804 send1.send(()).unwrap();
1806 recv2.await.unwrap();
1807 fasync::Timer::new(Duration::from_millis(100)).await;
1810 assert!(!*done.lock());
1811 },
1812 async {
1813 recv1.await.unwrap();
1814 let t = fs
1815 .clone()
1816 .new_transaction(
1817 lock_keys![LockKey::object_attribute(1, 2, 3)],
1818 Options::default(),
1819 )
1820 .await
1821 .expect("new_transaction failed");
1822 send2.send(()).unwrap(); t.commit().await.expect("commit failed");
1824 *done.lock() = true;
1825 },
1826 );
1827 }
1828
1829 #[fuchsia::test]
1830 async fn test_drop_uncommitted_transaction() {
1831 let device = DeviceHolder::new(FakeDevice::new(4096, 1024));
1832 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1833 let key = lock_keys![LockKey::object(1, 1)];
1834
1835 {
1837 let _write_lock = fs
1838 .clone()
1839 .new_transaction(key.clone(), Options::default())
1840 .await
1841 .expect("new_transaction failed");
1842 let _read_lock = fs.lock_manager().read_lock(key.clone()).await;
1843 }
1844 {
1846 let _write_lock = fs
1847 .clone()
1848 .new_transaction(key.clone(), Options::default())
1849 .await
1850 .expect("new_transaction failed");
1851 }
1852 fs.clone()
1854 .new_transaction(key.clone(), Options::default())
1855 .await
1856 .expect("new_transaction failed");
1857 }
1858
1859 #[fuchsia::test]
1860 async fn test_drop_waiting_write_lock() {
1861 let manager = LockManager::new();
1862 let keys = lock_keys![LockKey::object(1, 1)];
1863 {
1864 let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1865 if let Poll::Ready(_) =
1866 futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1867 {
1868 assert!(false);
1869 }
1870 }
1871 let _ = manager.lock(keys, LockState::WriteLock).await;
1872 }
1873
1874 #[fuchsia::test]
1875 async fn test_write_lock_blocks_everything() {
1876 let manager = LockManager::new();
1877 let keys = lock_keys![LockKey::object(1, 1)];
1878 {
1879 let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1880 if let Poll::Ready(_) =
1881 futures::poll!(manager.lock(keys.clone(), LockState::WriteLock).boxed())
1882 {
1883 assert!(false);
1884 }
1885 if let Poll::Ready(_) =
1886 futures::poll!(manager.lock(keys.clone(), LockState::ReadLock).boxed())
1887 {
1888 assert!(false);
1889 }
1890 }
1891 {
1892 let _guard = manager.lock(keys.clone(), LockState::WriteLock).await;
1893 }
1894 {
1895 let _guard = manager.lock(keys, LockState::ReadLock).await;
1896 }
1897 }
1898
1899 #[fuchsia::test]
1900 async fn test_downgrade_locks() {
1901 let manager = LockManager::new();
1902 let keys = lock_keys![LockKey::object(1, 1)];
1903 let _guard = manager.txn_lock(keys.clone()).await;
1904 manager.commit_prepare_keys(&keys).await;
1905
1906 let mut read_lock: FuturesUnordered<_> =
1908 std::iter::once(manager.read_lock(keys.clone())).collect();
1909
1910 assert!(futures::poll!(read_lock.next()).is_pending());
1912
1913 manager.downgrade_locks(&keys);
1914
1915 assert!(futures::poll!(read_lock.next()).is_ready());
1917 }
1918
1919 #[fuchsia::test]
1920 async fn test_dropped_write_lock_wakes() {
1921 let manager = LockManager::new();
1922 let keys = lock_keys![LockKey::object(1, 1)];
1923 let _guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1924 let mut read_lock = FuturesUnordered::new();
1925 read_lock.push(manager.lock(keys.clone(), LockState::ReadLock));
1926
1927 {
1928 let write_lock = manager.lock(keys, LockState::WriteLock);
1929 pin_mut!(write_lock);
1930
1931 assert!(futures::poll!(write_lock).is_pending());
1933
1934 assert!(futures::poll!(read_lock.next()).is_pending());
1936 }
1937
1938 assert!(futures::poll!(read_lock.next()).is_ready());
1940 }
1941
1942 #[fuchsia::test]
1943 async fn test_drop_upgrade() {
1944 let manager = LockManager::new();
1945 let keys = lock_keys![LockKey::object(1, 1)];
1946 let _guard = manager.lock(keys.clone(), LockState::Locked).await;
1947
1948 {
1949 let commit_prepare = manager.commit_prepare_keys(&keys);
1950 pin_mut!(commit_prepare);
1951 let _read_guard = manager.lock(keys.clone(), LockState::ReadLock).await;
1952 assert!(futures::poll!(commit_prepare).is_pending());
1953
1954 }
1957
1958 manager.commit_prepare_keys(&keys).await;
1960 }
1961
1962 #[fasync::run_singlethreaded(test)]
1963 async fn test_woken_upgrade_blocks_reads() {
1964 let manager = LockManager::new();
1965 let keys = lock_keys![LockKey::object(1, 1)];
1966 let guard = manager.lock(keys.clone(), LockState::Locked).await;
1968
1969 let read1 = manager.lock(keys.clone(), LockState::ReadLock).await;
1971
1972 let commit_prepare = manager.commit_prepare_keys(&keys);
1974 pin_mut!(commit_prepare);
1975 assert!(futures::poll!(commit_prepare.as_mut()).is_pending());
1976
1977 let read2 = manager.lock(keys.clone(), LockState::ReadLock);
1979 pin_mut!(read2);
1980 assert!(futures::poll!(read2.as_mut()).is_pending());
1981
1982 std::mem::drop(read1);
1984 assert!(futures::poll!(commit_prepare).is_ready());
1985
1986 assert!(futures::poll!(read2.as_mut()).is_pending());
1988
1989 std::mem::drop(guard);
1991 assert!(futures::poll!(read2).is_ready());
1992 }
1993
1994 static LOCK_KEY_1: LockKey = LockKey::flush(1);
1995 static LOCK_KEY_2: LockKey = LockKey::flush(2);
1996 static LOCK_KEY_3: LockKey = LockKey::flush(3);
1997
1998 fn assert_lock_keys_equal(value: &LockKeys, expected: &LockKeys) {
2000 match (value, expected) {
2001 (LockKeys::None, LockKeys::None) => {}
2002 (LockKeys::Inline(key1), LockKeys::Inline(key2)) => {
2003 if key1 != key2 {
2004 panic!("{key1:?} != {key2:?}");
2005 }
2006 }
2007 (LockKeys::Vec(vec1), LockKeys::Vec(vec2)) => {
2008 if vec1 != vec2 {
2009 panic!("{vec1:?} != {vec2:?}");
2010 }
2011 if vec1.capacity() != vec2.capacity() {
2012 panic!(
2013 "LockKeys have different capacity: {} != {}",
2014 vec1.capacity(),
2015 vec2.capacity()
2016 );
2017 }
2018 }
2019 (_, _) => panic!("{value:?} != {expected:?}"),
2020 }
2021 }
2022
2023 fn assert_lock_keys_equivalent(value: &LockKeys, expected: &LockKeys) {
2025 let value: Vec<_> = value.iter().collect();
2026 let expected: Vec<_> = expected.iter().collect();
2027 assert_eq!(value, expected);
2028 }
2029
2030 #[test]
2031 fn test_lock_keys_macro() {
2032 assert_lock_keys_equal(&lock_keys![], &LockKeys::None);
2033 assert_lock_keys_equal(&lock_keys![LOCK_KEY_1], &LockKeys::Inline(LOCK_KEY_1));
2034 assert_lock_keys_equal(
2035 &lock_keys![LOCK_KEY_1, LOCK_KEY_2],
2036 &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]),
2037 );
2038 }
2039
2040 #[test]
2041 fn test_lock_keys_with_capacity() {
2042 assert_lock_keys_equal(&LockKeys::with_capacity(0), &LockKeys::None);
2043 assert_lock_keys_equal(&LockKeys::with_capacity(1), &LockKeys::None);
2044 assert_lock_keys_equal(&LockKeys::with_capacity(2), &LockKeys::Vec(Vec::with_capacity(2)));
2045 }
2046
2047 #[test]
2048 fn test_lock_keys_len() {
2049 assert_eq!(lock_keys![].len(), 0);
2050 assert_eq!(lock_keys![LOCK_KEY_1].len(), 1);
2051 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].len(), 2);
2052 }
2053
2054 #[test]
2055 fn test_lock_keys_contains() {
2056 assert_eq!(lock_keys![].contains(&LOCK_KEY_1), false);
2057 assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_1), true);
2058 assert_eq!(lock_keys![LOCK_KEY_1].contains(&LOCK_KEY_2), false);
2059 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_1), true);
2060 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_2), true);
2061 assert_eq!(lock_keys![LOCK_KEY_1, LOCK_KEY_2].contains(&LOCK_KEY_3), false);
2062 }
2063
2064 #[test]
2065 fn test_lock_keys_push() {
2066 let mut keys = lock_keys![];
2067 keys.push(LOCK_KEY_1);
2068 assert_lock_keys_equal(&keys, &LockKeys::Inline(LOCK_KEY_1));
2069 keys.push(LOCK_KEY_2);
2070 assert_lock_keys_equal(&keys, &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2]));
2071 keys.push(LOCK_KEY_3);
2072 assert_lock_keys_equivalent(
2073 &keys,
2074 &LockKeys::Vec(vec![LOCK_KEY_1, LOCK_KEY_2, LOCK_KEY_3]),
2075 );
2076 }
2077
2078 #[test]
2079 fn test_lock_keys_sort_unstable() {
2080 let mut keys = lock_keys![];
2081 keys.sort_unstable();
2082 assert_lock_keys_equal(&keys, &lock_keys![]);
2083
2084 let mut keys = lock_keys![LOCK_KEY_1];
2085 keys.sort_unstable();
2086 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2087
2088 let mut keys = lock_keys![LOCK_KEY_2, LOCK_KEY_1];
2089 keys.sort_unstable();
2090 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2091 }
2092
2093 #[test]
2094 fn test_lock_keys_dedup() {
2095 let mut keys = lock_keys![];
2096 keys.dedup();
2097 assert_lock_keys_equal(&keys, &lock_keys![]);
2098
2099 let mut keys = lock_keys![LOCK_KEY_1];
2100 keys.dedup();
2101 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2102
2103 let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_1];
2104 keys.dedup();
2105 assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2106 }
2107
2108 #[test]
2109 fn test_lock_keys_truncate() {
2110 let mut keys = lock_keys![];
2111 keys.truncate(5);
2112 assert_lock_keys_equal(&keys, &lock_keys![]);
2113 keys.truncate(0);
2114 assert_lock_keys_equal(&keys, &lock_keys![]);
2115
2116 let mut keys = lock_keys![LOCK_KEY_1];
2117 keys.truncate(5);
2118 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1]);
2119 keys.truncate(0);
2120 assert_lock_keys_equal(&keys, &lock_keys![]);
2121
2122 let mut keys = lock_keys![LOCK_KEY_1, LOCK_KEY_2];
2123 keys.truncate(5);
2124 assert_lock_keys_equal(&keys, &lock_keys![LOCK_KEY_1, LOCK_KEY_2]);
2125 keys.truncate(1);
2126 assert_lock_keys_equivalent(&keys, &lock_keys![LOCK_KEY_1]);
2128 }
2129
2130 #[test]
2131 fn test_lock_keys_iter() {
2132 assert_eq!(lock_keys![].iter().collect::<Vec<_>>(), Vec::<&LockKey>::new());
2133
2134 assert_eq!(lock_keys![LOCK_KEY_1].iter().collect::<Vec<_>>(), vec![&LOCK_KEY_1]);
2135
2136 assert_eq!(
2137 lock_keys![LOCK_KEY_1, LOCK_KEY_2].iter().collect::<Vec<_>>(),
2138 vec![&LOCK_KEY_1, &LOCK_KEY_2]
2139 );
2140 }
2141}