1use crate::errors::FxfsError;
6use crate::lsm_tree::Query;
7use crate::lsm_tree::types::{ItemRef, LayerIterator};
8use crate::object_store::transaction::{LockKey, Mutation, Options, lock_keys};
9use crate::object_store::{
10 ObjectKey, ObjectKeyData, ObjectKind, ObjectStore, ObjectValue, ProjectProperty,
11};
12use anyhow::Error;
13use fprint::TypeFingerprint;
14use fxfs_macros::SerializeKey;
15use serde::{Deserialize, Serialize};
16use std::num::NonZeroU64;
17
18impl ObjectStore {
19 pub async fn set_project_limit(
22 &self,
23 project_id: ProjectId,
24 bytes: u64,
25 nodes: u64,
26 ) -> Result<(), Error> {
27 let root_id = self.root_directory_object_id();
28 let mut transaction = self
29 .filesystem()
30 .new_transaction(
31 lock_keys![LockKey::ProjectId {
32 store_object_id: self.store_object_id,
33 project_id
34 }],
35 Options::default(),
36 )
37 .await?;
38 transaction.add(
39 self.store_object_id,
40 Mutation::replace_or_insert_object(
41 ObjectKey::project_limit(root_id, project_id),
42 ObjectValue::BytesAndNodes {
43 bytes: bytes.try_into().map_err(|_| FxfsError::TooBig)?,
44 nodes: nodes.try_into().map_err(|_| FxfsError::TooBig)?,
45 },
46 ),
47 );
48 transaction.commit().await?;
49 Ok(())
50 }
51
52 pub async fn clear_project_limit(&self, project_id: ProjectId) -> Result<(), Error> {
55 let root_id = self.root_directory_object_id();
56 let mut transaction = self
57 .filesystem()
58 .new_transaction(
59 lock_keys![LockKey::ProjectId {
60 store_object_id: self.store_object_id,
61 project_id
62 }],
63 Options::default(),
64 )
65 .await?;
66 transaction.add(
67 self.store_object_id,
68 Mutation::replace_or_insert_object(
69 ObjectKey::project_limit(root_id, project_id),
70 ObjectValue::None,
71 ),
72 );
73 transaction.commit().await?;
74 Ok(())
75 }
76
77 pub async fn set_project_for_node(
79 &self,
80 node_id: u64,
81 project_id: ProjectId,
82 ) -> Result<(), Error> {
83 let root_id = self.root_directory_object_id();
84 let mut transaction = self
85 .filesystem()
86 .new_transaction(
87 lock_keys![LockKey::object(self.store_object_id, node_id)],
88 Options::default(),
89 )
90 .await?;
91
92 let object_key = ObjectKey::object(node_id);
93 let (kind, mut attributes) =
94 match self.tree().find(&object_key).await?.ok_or(FxfsError::NotFound)?.value {
95 ObjectValue::Object { kind, attributes } => (kind, attributes),
96 _ => return Err(FxfsError::Inconsistent.into()),
97 };
98 match kind {
100 ObjectKind::File { .. } | ObjectKind::Directory { .. } => (),
101 ObjectKind::Symlink { .. } | ObjectKind::EncryptedSymlink { .. } => {
104 return Err(FxfsError::NotSupported.into());
105 }
106 ObjectKind::Graveyard => return Err(FxfsError::Inconsistent.into()),
107 }
108 let storage_size = attributes.allocated_size.try_into().map_err(|_| FxfsError::TooBig)?;
109 let old_project_id = attributes.project_id;
110 if old_project_id == Some(project_id) {
111 return Ok(());
112 }
113 attributes.project_id = Some(project_id);
114
115 transaction.add(
116 self.store_object_id,
117 Mutation::replace_or_insert_object(
118 object_key,
119 ObjectValue::Object { kind, attributes },
120 ),
121 );
122 transaction.add(
123 self.store_object_id,
124 Mutation::merge_object(
125 ObjectKey::project_usage(root_id, project_id),
126 ObjectValue::BytesAndNodes { bytes: storage_size, nodes: 1 },
127 ),
128 );
129 if let Some(old_project_id) = old_project_id {
130 transaction.add(
131 self.store_object_id,
132 Mutation::merge_object(
133 ObjectKey::project_usage(root_id, old_project_id),
134 ObjectValue::BytesAndNodes { bytes: -storage_size, nodes: -1 },
135 ),
136 );
137 }
138 transaction.commit().await?;
139 Ok(())
140 }
141
142 pub async fn get_project_for_node(&self, node_id: u64) -> Result<Option<ProjectId>, Error> {
144 match self.tree().find(&ObjectKey::object(node_id)).await?.ok_or(FxfsError::NotFound)?.value
145 {
146 ObjectValue::Object { attributes, .. } => match attributes.project_id {
147 id => Ok(id),
148 },
149 _ => return Err(FxfsError::Inconsistent.into()),
150 }
151 }
152
153 pub async fn clear_project_for_node(&self, node_id: u64) -> Result<(), Error> {
156 let root_id = self.root_directory_object_id();
157 let mut transaction = self
158 .filesystem()
159 .new_transaction(
160 lock_keys![LockKey::object(self.store_object_id, node_id)],
161 Options::default(),
162 )
163 .await?;
164
165 let object_key = ObjectKey::object(node_id);
166 let (kind, mut attributes) =
167 match self.tree().find(&object_key).await?.ok_or(FxfsError::NotFound)?.value {
168 ObjectValue::Object { kind, attributes } => (kind, attributes),
169 _ => return Err(FxfsError::Inconsistent.into()),
170 };
171 let Some(old_project_id) = attributes.project_id else {
172 return Ok(());
173 };
174 match kind {
176 ObjectKind::File { .. } | ObjectKind::Directory { .. } => (),
177 ObjectKind::Symlink { .. } | ObjectKind::EncryptedSymlink { .. } => {
180 return Err(FxfsError::NotSupported.into());
181 }
182 ObjectKind::Graveyard => return Err(FxfsError::Inconsistent.into()),
183 }
184 attributes.project_id = None;
185 let storage_size = attributes.allocated_size;
186 transaction.add(
187 self.store_object_id,
188 Mutation::replace_or_insert_object(
189 object_key,
190 ObjectValue::Object { kind, attributes },
191 ),
192 );
193 transaction.add(
196 self.store_object_id,
197 Mutation::merge_object(
198 ObjectKey::project_usage(root_id, old_project_id),
199 ObjectValue::BytesAndNodes {
200 bytes: -(storage_size.try_into().map_err(|_| FxfsError::TooBig)?),
201 nodes: -1,
202 },
203 ),
204 );
205 transaction.commit().await?;
206 Ok(())
207 }
208
209 pub async fn list_projects(
214 &self,
215 start_id: Option<ProjectId>,
216 max_entries: usize,
217 ) -> Result<(Vec<ProjectId>, Option<ProjectId>), Error> {
218 let start_id = start_id.unwrap_or(ProjectId::SORTED_START);
219 let root_dir_id = self.root_directory_object_id();
220 let layer_set = self.tree().layer_set();
221 let mut merger = layer_set.merger();
222 let mut iter = merger
223 .query(Query::FullRange(&ObjectKey::project_limit(root_dir_id, start_id)))
224 .await?;
225 let mut entries = Vec::new();
226 let mut prev_entry: Option<ProjectId> = None;
227 let mut next_entry = None;
228 while let Some(ItemRef { key: ObjectKey { object_id, data: key_data }, value, .. }) =
229 iter.get()
230 {
231 if *object_id != root_dir_id {
233 break;
234 }
235 match key_data {
236 ObjectKeyData::Project { project_id, .. } => {
237 if *value != ObjectValue::None && prev_entry < Some(*project_id) {
239 if entries.len() == max_entries {
240 next_entry = Some(*project_id);
241 break;
242 }
243 prev_entry = Some(*project_id);
244 entries.push(*project_id);
245 }
246 }
247 _ => {
249 break;
250 }
251 }
252 iter.advance().await?;
253 }
254 Ok((entries, next_entry))
256 }
257
258 pub async fn project_info(
261 &self,
262 project_id: ProjectId,
263 ) -> Result<(Option<(u64, u64)>, Option<(u64, u64)>), Error> {
264 let root_id = self.root_directory_object_id();
265 let layer_set = self.tree().layer_set();
266 let mut merger = layer_set.merger();
267 let mut iter =
268 merger.query(Query::FullRange(&ObjectKey::project_limit(root_id, project_id))).await?;
269 let mut limit = None;
270 let mut usage = None;
271 while let Some(ItemRef { key: ObjectKey { object_id, data: key_data }, value, .. }) =
273 iter.get()
274 {
275 if *object_id != root_id {
277 break;
278 }
279 if let (
280 ObjectKeyData::Project { project_id: found_project_id, property },
281 ObjectValue::BytesAndNodes { bytes, nodes },
282 ) = (key_data, value)
283 {
284 if *found_project_id != project_id {
286 break;
287 }
288 let raw_value: (u64, u64) = (
289 (*bytes).try_into().map_err(|_| FxfsError::Inconsistent)?,
291 (*nodes).try_into().map_err(|_| FxfsError::Inconsistent)?,
292 );
293 match property {
294 ProjectProperty::Limit => limit = Some(raw_value),
295 ProjectProperty::Usage => usage = Some(raw_value),
296 }
297 } else {
298 break;
299 }
300 iter.advance().await?;
301 }
302 Ok((limit, usage))
303 }
304}
305
306#[derive(
307 Clone,
308 Copy,
309 PartialEq,
310 Eq,
311 PartialOrd,
312 Ord,
313 Debug,
314 Serialize,
315 Deserialize,
316 Hash,
317 TypeFingerprint,
318 SerializeKey,
319)]
320#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
321#[repr(transparent)]
322pub struct ProjectId(NonZeroU64);
323
324impl ProjectId {
325 pub const SORTED_START: Self = Self::new(1).unwrap();
326
327 pub const fn new(project_id: u64) -> Option<Self> {
328 match NonZeroU64::new(project_id) {
329 None => None,
330 Some(non_zero) => Some(Self(non_zero)),
331 }
332 }
333
334 pub const fn raw(self) -> u64 {
336 self.0.get()
337 }
338}
339
340impl log::kv::ToValue for ProjectId {
341 fn to_value(&self) -> log::kv::Value<'_> {
342 log::kv::Value::from(self.0)
343 }
344}
345
346impl std::fmt::Display for ProjectId {
347 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
348 std::fmt::Display::fmt(&self.0, f)
349 }
350}
351
/// Extension trait exposing a raw `u64` view of a value.
///
/// In this file it is implemented for `Option<ProjectId>`, where `None` maps to 0.
pub trait ProjectIdExt {
    /// Consumes `self` and returns its raw `u64` representation.
    fn raw(self) -> u64;
}
357
358impl ProjectIdExt for Option<ProjectId> {
359 fn raw(self) -> u64 {
360 match self {
361 None => 0,
362 Some(project_id) => project_id.raw(),
363 }
364 }
365}
366
367pub mod optional_project_id {
368 use super::{ProjectId, ProjectIdExt};
369 use serde::{Deserializer, Serializer};
370
371 pub fn serialize<S>(value: &Option<ProjectId>, serializer: S) -> Result<S::Ok, S::Error>
373 where
374 S: Serializer,
375 {
376 serializer.serialize_u64(value.raw())
377 }
378
379 pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<ProjectId>, D::Error>
381 where
382 D: Deserializer<'de>,
383 {
384 deserializer.deserialize_u64(Visitor)
385 }
386
387 struct Visitor;
388 impl<'de> serde::de::Visitor<'de> for Visitor {
389 type Value = Option<ProjectId>;
390
391 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
392 write!(formatter, "a u64",)
393 }
394
395 fn visit_u64<E>(self, raw: u64) -> Result<Self::Value, E>
396 where
397 E: serde::de::Error,
398 {
399 Ok(ProjectId::new(raw))
400 }
401 }
402
403 pub fn fingerprint<T>() -> String {
404 "u64".to_string()
405 }
406}
407
#[cfg(test)]
mod tests {
    use super::{ProjectId, ProjectIdExt};
    use crate::serialized_types::{LATEST_VERSION, Versioned};
    use serde::{Deserialize, Serialize};

    // NOTE(review): presumably what provides `serialize_into` / `deserialize_from` on
    // `ProjectId` for the tests below — confirm against `Versioned`.
    impl Versioned for ProjectId {}

    /// Non-zero raw values around the `u16`, `u32` and `u64` boundaries.
    const NON_ZERO_SAMPLES: [u64; 10] = [
        1,
        2,
        u16::MAX as u64 - 1,
        u16::MAX as u64,
        u16::MAX as u64 + 1,
        u32::MAX as u64 - 1,
        u32::MAX as u64,
        u32::MAX as u64 + 1,
        u64::MAX - 1,
        u64::MAX,
    ];

    /// A `ProjectId` must serialize to the same bytes as its raw `u64`, and each encoding
    /// must be readable as the other type.
    #[test]
    fn test_project_id_serialization_matches_u64() {
        for raw in NON_ZERO_SAMPLES {
            let mut raw_bytes = Vec::new();
            raw.serialize_into(&mut raw_bytes).unwrap();

            let project_id = ProjectId::new(raw).unwrap();
            let mut id_bytes = Vec::new();
            project_id.serialize_into(&mut id_bytes).unwrap();

            assert_eq!(raw_bytes, id_bytes);

            let id_from_raw =
                ProjectId::deserialize_from(&mut raw_bytes.as_slice(), LATEST_VERSION).unwrap();
            assert_eq!(id_from_raw, project_id);

            let raw_from_id =
                u64::deserialize_from(&mut id_bytes.as_slice(), LATEST_VERSION).unwrap();
            assert_eq!(raw_from_id, raw);
        }
    }

    #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Versioned)]
    struct OptionWrapper {
        #[serde(with = "super::optional_project_id")]
        project_id: Option<ProjectId>,
    }

    #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Versioned)]
    struct U64Wrapper {
        project_id: u64,
    }

    /// A field using the `optional_project_id` adapter must serialize to the same bytes as
    /// a plain `u64` field, including 0 for `None`, and each encoding must be readable as
    /// the other wrapper.
    #[test]
    fn test_optional_project_id_serialization_matches_u64() {
        for raw in std::iter::once(0).chain(NON_ZERO_SAMPLES) {
            let plain = U64Wrapper { project_id: raw };
            let mut plain_bytes = Vec::new();
            plain.serialize_into(&mut plain_bytes).unwrap();

            let optional = OptionWrapper { project_id: ProjectId::new(raw) };
            let mut optional_bytes = Vec::new();
            optional.serialize_into(&mut optional_bytes).unwrap();

            assert_eq!(plain_bytes, optional_bytes);

            let optional_from_plain =
                OptionWrapper::deserialize_from(&mut plain_bytes.as_slice(), LATEST_VERSION)
                    .unwrap();
            assert_eq!(optional_from_plain, optional);

            let plain_from_optional =
                U64Wrapper::deserialize_from(&mut optional_bytes.as_slice(), LATEST_VERSION)
                    .unwrap();
            assert_eq!(plain_from_optional, plain);
        }
    }

    /// `None` must sort before every real project id.
    #[test]
    fn test_option_project_id_sorting() {
        let [none, one, max] = [0, 1, u64::MAX].map(ProjectId::new);
        assert!(none.is_none());

        assert!(none < one);
        assert!(none < max);
        assert!(one < max);
    }

    /// `Option<ProjectId>::raw` must return the value the id was built from, with 0 for
    /// `None`.
    #[test]
    fn test_project_id_raw() {
        for raw in [0, 1, u64::MAX] {
            assert_eq!(ProjectId::new(raw).raw(), raw);
        }
    }
}