1use crate::errors::FxfsError;
6use crate::lsm_tree::Query;
7use crate::lsm_tree::types::{ItemRef, LayerIterator};
8use crate::object_store::transaction::{LockKey, Mutation, Options, lock_keys};
9use crate::object_store::{
10 ObjectKey, ObjectKeyData, ObjectKind, ObjectStore, ObjectValue, ProjectProperty,
11};
12use anyhow::Error;
13use fprint::TypeFingerprint;
14use fxfs_macros::SerializeKey;
15use serde::{Deserialize, Serialize};
16use std::num::NonZeroU64;
17
18impl ObjectStore {
19 pub async fn set_project_limit(
22 &self,
23 project_id: ProjectId,
24 bytes: u64,
25 nodes: u64,
26 ) -> Result<(), Error> {
27 let root_id = self.root_directory_object_id();
28 let mut transaction = self
29 .new_transaction(
30 lock_keys![LockKey::ProjectId {
31 store_object_id: self.store_object_id,
32 project_id
33 }],
34 Options::default(),
35 )
36 .await?;
37 transaction.add(
38 self.store_object_id,
39 Mutation::replace_or_insert_object(
40 ObjectKey::project_limit(root_id, project_id),
41 ObjectValue::BytesAndNodes {
42 bytes: bytes.try_into().map_err(|_| FxfsError::TooBig)?,
43 nodes: nodes.try_into().map_err(|_| FxfsError::TooBig)?,
44 },
45 ),
46 );
47 transaction.commit().await?;
48 Ok(())
49 }
50
51 pub async fn clear_project_limit(&self, project_id: ProjectId) -> Result<(), Error> {
54 let root_id = self.root_directory_object_id();
55 let mut transaction = self
56 .new_transaction(
57 lock_keys![LockKey::ProjectId {
58 store_object_id: self.store_object_id,
59 project_id
60 }],
61 Options::default(),
62 )
63 .await?;
64 transaction.add(
65 self.store_object_id,
66 Mutation::replace_or_insert_object(
67 ObjectKey::project_limit(root_id, project_id),
68 ObjectValue::None,
69 ),
70 );
71 transaction.commit().await?;
72 Ok(())
73 }
74
75 pub async fn set_project_for_node(
77 &self,
78 node_id: u64,
79 project_id: ProjectId,
80 ) -> Result<(), Error> {
81 let root_id = self.root_directory_object_id();
82 let mut transaction = self
83 .new_transaction(
84 lock_keys![LockKey::object(self.store_object_id, node_id)],
85 Options::default(),
86 )
87 .await?;
88
89 let object_key = ObjectKey::object(node_id);
90 let (kind, mut attributes) =
91 match self.tree().find(&object_key).await?.ok_or(FxfsError::NotFound)?.value {
92 ObjectValue::Object { kind, attributes } => (kind, attributes),
93 _ => return Err(FxfsError::Inconsistent.into()),
94 };
95 match kind {
97 ObjectKind::File { .. } | ObjectKind::Directory { .. } => (),
98 ObjectKind::Symlink { .. } | ObjectKind::EncryptedSymlink { .. } => {
101 return Err(FxfsError::NotSupported.into());
102 }
103 ObjectKind::Graveyard => return Err(FxfsError::Inconsistent.into()),
104 }
105 let storage_size = attributes.allocated_size.try_into().map_err(|_| FxfsError::TooBig)?;
106 let old_project_id = attributes.project_id;
107 if old_project_id == Some(project_id) {
108 return Ok(());
109 }
110 attributes.project_id = Some(project_id);
111
112 transaction.add(
113 self.store_object_id,
114 Mutation::replace_or_insert_object(
115 object_key,
116 ObjectValue::Object { kind, attributes },
117 ),
118 );
119 transaction.add(
120 self.store_object_id,
121 Mutation::merge_object(
122 ObjectKey::project_usage(root_id, project_id),
123 ObjectValue::BytesAndNodes { bytes: storage_size, nodes: 1 },
124 ),
125 );
126 if let Some(old_project_id) = old_project_id {
127 transaction.add(
128 self.store_object_id,
129 Mutation::merge_object(
130 ObjectKey::project_usage(root_id, old_project_id),
131 ObjectValue::BytesAndNodes { bytes: -storage_size, nodes: -1 },
132 ),
133 );
134 }
135 transaction.commit().await?;
136 Ok(())
137 }
138
139 pub async fn get_project_for_node(&self, node_id: u64) -> Result<Option<ProjectId>, Error> {
141 match self.tree().find(&ObjectKey::object(node_id)).await?.ok_or(FxfsError::NotFound)?.value
142 {
143 ObjectValue::Object { attributes, .. } => match attributes.project_id {
144 id => Ok(id),
145 },
146 _ => return Err(FxfsError::Inconsistent.into()),
147 }
148 }
149
150 pub async fn clear_project_for_node(&self, node_id: u64) -> Result<(), Error> {
153 let root_id = self.root_directory_object_id();
154 let mut transaction = self
155 .new_transaction(
156 lock_keys![LockKey::object(self.store_object_id, node_id)],
157 Options::default(),
158 )
159 .await?;
160
161 let object_key = ObjectKey::object(node_id);
162 let (kind, mut attributes) =
163 match self.tree().find(&object_key).await?.ok_or(FxfsError::NotFound)?.value {
164 ObjectValue::Object { kind, attributes } => (kind, attributes),
165 _ => return Err(FxfsError::Inconsistent.into()),
166 };
167 let Some(old_project_id) = attributes.project_id else {
168 return Ok(());
169 };
170 match kind {
172 ObjectKind::File { .. } | ObjectKind::Directory { .. } => (),
173 ObjectKind::Symlink { .. } | ObjectKind::EncryptedSymlink { .. } => {
176 return Err(FxfsError::NotSupported.into());
177 }
178 ObjectKind::Graveyard => return Err(FxfsError::Inconsistent.into()),
179 }
180 attributes.project_id = None;
181 let storage_size = attributes.allocated_size;
182 transaction.add(
183 self.store_object_id,
184 Mutation::replace_or_insert_object(
185 object_key,
186 ObjectValue::Object { kind, attributes },
187 ),
188 );
189 transaction.add(
192 self.store_object_id,
193 Mutation::merge_object(
194 ObjectKey::project_usage(root_id, old_project_id),
195 ObjectValue::BytesAndNodes {
196 bytes: -(storage_size.try_into().map_err(|_| FxfsError::TooBig)?),
197 nodes: -1,
198 },
199 ),
200 );
201 transaction.commit().await?;
202 Ok(())
203 }
204
205 pub async fn list_projects(
210 &self,
211 start_id: Option<ProjectId>,
212 max_entries: usize,
213 ) -> Result<(Vec<ProjectId>, Option<ProjectId>), Error> {
214 let start_id = start_id.unwrap_or(ProjectId::SORTED_START);
215 let root_dir_id = self.root_directory_object_id();
216 let layer_set = self.tree().layer_set();
217 let mut merger = layer_set.merger();
218 let mut iter = merger
219 .query(Query::FullRange(&ObjectKey::project_limit(root_dir_id, start_id)))
220 .await?;
221 let mut entries = Vec::new();
222 let mut prev_entry: Option<ProjectId> = None;
223 let mut next_entry = None;
224 while let Some(ItemRef { key: ObjectKey { object_id, data: key_data }, value, .. }) =
225 iter.get()
226 {
227 if *object_id != root_dir_id {
229 break;
230 }
231 match key_data {
232 ObjectKeyData::Project { project_id, .. } => {
233 if *value != ObjectValue::None && prev_entry < Some(*project_id) {
235 if entries.len() == max_entries {
236 next_entry = Some(*project_id);
237 break;
238 }
239 prev_entry = Some(*project_id);
240 entries.push(*project_id);
241 }
242 }
243 _ => {
245 break;
246 }
247 }
248 iter.advance().await?;
249 }
250 Ok((entries, next_entry))
252 }
253
254 pub async fn project_info(
257 &self,
258 project_id: ProjectId,
259 ) -> Result<(Option<(u64, u64)>, Option<(u64, u64)>), Error> {
260 let root_id = self.root_directory_object_id();
261 let layer_set = self.tree().layer_set();
262 let mut merger = layer_set.merger();
263 let mut iter =
264 merger.query(Query::FullRange(&ObjectKey::project_limit(root_id, project_id))).await?;
265 let mut limit = None;
266 let mut usage = None;
267 while let Some(ItemRef { key: ObjectKey { object_id, data: key_data }, value, .. }) =
269 iter.get()
270 {
271 if *object_id != root_id {
273 break;
274 }
275 if let (
276 ObjectKeyData::Project { project_id: found_project_id, property },
277 ObjectValue::BytesAndNodes { bytes, nodes },
278 ) = (key_data, value)
279 {
280 if *found_project_id != project_id {
282 break;
283 }
284 let raw_value: (u64, u64) = (
285 (*bytes).try_into().map_err(|_| FxfsError::Inconsistent)?,
287 (*nodes).try_into().map_err(|_| FxfsError::Inconsistent)?,
288 );
289 match property {
290 ProjectProperty::Limit => limit = Some(raw_value),
291 ProjectProperty::Usage => usage = Some(raw_value),
292 }
293 } else {
294 break;
295 }
296 iter.advance().await?;
297 }
298 Ok((limit, usage))
299 }
300}
301
302#[derive(
303 Clone,
304 Copy,
305 PartialEq,
306 Eq,
307 PartialOrd,
308 Ord,
309 Debug,
310 Serialize,
311 Deserialize,
312 Hash,
313 TypeFingerprint,
314 SerializeKey,
315)]
316#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
317#[repr(transparent)]
318pub struct ProjectId(NonZeroU64);
319
320impl ProjectId {
321 pub const SORTED_START: Self = Self::new(1).unwrap();
322
323 pub const fn new(project_id: u64) -> Option<Self> {
324 match NonZeroU64::new(project_id) {
325 None => None,
326 Some(non_zero) => Some(Self(non_zero)),
327 }
328 }
329
330 pub const fn raw(self) -> u64 {
332 self.0.get()
333 }
334}
335
336impl log::kv::ToValue for ProjectId {
337 fn to_value(&self) -> log::kv::Value<'_> {
338 log::kv::Value::from(self.0)
339 }
340}
341
342impl std::fmt::Display for ProjectId {
343 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
344 std::fmt::Display::fmt(&self.0, f)
345 }
346}
347
/// Extension trait for obtaining the raw `u64` form of a value that stands for a project id.
pub trait ProjectIdExt {
    /// Consumes `self` and returns its raw `u64` representation.
    fn raw(self) -> u64;
}
353
354impl ProjectIdExt for Option<ProjectId> {
355 fn raw(self) -> u64 {
356 match self {
357 None => 0,
358 Some(project_id) => project_id.raw(),
359 }
360 }
361}
362
363pub mod optional_project_id {
364 use super::{ProjectId, ProjectIdExt};
365 use serde::{Deserializer, Serializer};
366
367 pub fn serialize<S>(value: &Option<ProjectId>, serializer: S) -> Result<S::Ok, S::Error>
369 where
370 S: Serializer,
371 {
372 serializer.serialize_u64(value.raw())
373 }
374
375 pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<ProjectId>, D::Error>
377 where
378 D: Deserializer<'de>,
379 {
380 deserializer.deserialize_u64(Visitor)
381 }
382
383 struct Visitor;
384 impl<'de> serde::de::Visitor<'de> for Visitor {
385 type Value = Option<ProjectId>;
386
387 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
388 write!(formatter, "a u64",)
389 }
390
391 fn visit_u64<E>(self, raw: u64) -> Result<Self::Value, E>
392 where
393 E: serde::de::Error,
394 {
395 Ok(ProjectId::new(raw))
396 }
397 }
398
399 pub fn fingerprint<T>() -> String {
400 "u64".to_string()
401 }
402}
403
#[cfg(test)]
mod tests {
    use super::{ProjectId, ProjectIdExt};
    use crate::serialized_types::{LATEST_VERSION, Versioned};
    use serde::{Deserialize, Serialize};

    // Test-only `Versioned` impl; the tests below call `serialize_into` and `deserialize_from`
    // directly on `ProjectId`.
    impl Versioned for ProjectId {}

    /// Non-zero raw ids around the `u16`, `u32` and `u64` boundaries.
    const NON_ZERO_SAMPLES: [u64; 10] = [
        1,
        2,
        u16::MAX as u64 - 1,
        u16::MAX as u64,
        u16::MAX as u64 + 1,
        u32::MAX as u64 - 1,
        u32::MAX as u64,
        u32::MAX as u64 + 1,
        u64::MAX - 1,
        u64::MAX,
    ];

    /// A `ProjectId` and the `u64` it wraps must produce the same bytes and must each be
    /// readable from the other's bytes.
    #[test]
    fn test_project_id_serialization_matches_u64() {
        for raw in NON_ZERO_SAMPLES {
            let project_id = ProjectId::new(raw).unwrap();

            let mut u64_bytes = Vec::new();
            raw.serialize_into(&mut u64_bytes).unwrap();
            let mut project_id_bytes = Vec::new();
            project_id.serialize_into(&mut project_id_bytes).unwrap();
            assert_eq!(u64_bytes, project_id_bytes);

            let decoded_project_id =
                ProjectId::deserialize_from(&mut u64_bytes.as_slice(), LATEST_VERSION).unwrap();
            assert_eq!(decoded_project_id, project_id);

            let decoded_raw =
                u64::deserialize_from(&mut project_id_bytes.as_slice(), LATEST_VERSION).unwrap();
            assert_eq!(decoded_raw, raw);
        }
    }

    #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Versioned)]
    struct OptionWrapper {
        #[serde(with = "super::optional_project_id")]
        project_id: Option<ProjectId>,
    }

    #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Versioned)]
    struct U64Wrapper {
        project_id: u64,
    }

    /// A field encoded through `optional_project_id` must be byte-compatible with a plain
    /// `u64` field, including 0 for `None`.
    #[test]
    fn test_optional_project_id_serialization_matches_u64() {
        for raw in std::iter::once(0).chain(NON_ZERO_SAMPLES) {
            let plain = U64Wrapper { project_id: raw };
            let optional = OptionWrapper { project_id: ProjectId::new(raw) };

            let mut plain_bytes = Vec::new();
            plain.serialize_into(&mut plain_bytes).unwrap();
            let mut optional_bytes = Vec::new();
            optional.serialize_into(&mut optional_bytes).unwrap();
            assert_eq!(plain_bytes, optional_bytes);

            let decoded_optional =
                OptionWrapper::deserialize_from(&mut plain_bytes.as_slice(), LATEST_VERSION)
                    .unwrap();
            assert_eq!(decoded_optional, optional);

            let decoded_plain =
                U64Wrapper::deserialize_from(&mut optional_bytes.as_slice(), LATEST_VERSION)
                    .unwrap();
            assert_eq!(decoded_plain, plain);
        }
    }

    /// `None` must order before every real id, and ids must order by raw value.
    #[test]
    fn test_option_project_id_sorting() {
        let none = ProjectId::new(0);
        let one = ProjectId::new(1);
        let max = ProjectId::new(u64::MAX);

        assert!(none.is_none());
        assert!(none < one);
        assert!(none < max);
        assert!(one < max);
    }

    /// `Option<ProjectId>::raw` must return the value the id was created from, with 0 for
    /// `None`.
    #[test]
    fn test_project_id_raw() {
        for raw in [0, 1, u64::MAX] {
            assert_eq!(ProjectId::new(raw).raw(), raw);
        }
    }
}