1use crate::errors::FxfsError;
6use crate::lsm_tree::types::{ItemRef, LayerIterator};
7use crate::lsm_tree::Query;
8use crate::object_store::transaction::{lock_keys, LockKey, Mutation, Options};
9use crate::object_store::{
10 ObjectKey, ObjectKeyData, ObjectKind, ObjectStore, ObjectValue, ProjectProperty,
11};
12use anyhow::{ensure, Error};
13
14impl ObjectStore {
15 pub async fn set_project_limit(
18 &self,
19 project_id: u64,
20 bytes: u64,
21 nodes: u64,
22 ) -> Result<(), Error> {
23 ensure!(project_id != 0, FxfsError::OutOfRange);
24 let root_id = self.root_directory_object_id();
25 let mut transaction = self
26 .filesystem()
27 .new_transaction(
28 lock_keys![LockKey::ProjectId {
29 store_object_id: self.store_object_id,
30 project_id
31 }],
32 Options::default(),
33 )
34 .await?;
35 transaction.add(
36 self.store_object_id,
37 Mutation::replace_or_insert_object(
38 ObjectKey::project_limit(root_id, project_id),
39 ObjectValue::BytesAndNodes {
40 bytes: bytes.try_into().map_err(|_| FxfsError::TooBig)?,
41 nodes: nodes.try_into().map_err(|_| FxfsError::TooBig)?,
42 },
43 ),
44 );
45 transaction.commit().await?;
46 Ok(())
47 }
48
49 pub async fn clear_project_limit(&self, project_id: u64) -> Result<(), Error> {
52 let root_id = self.root_directory_object_id();
53 let mut transaction = self
54 .filesystem()
55 .new_transaction(
56 lock_keys![LockKey::ProjectId {
57 store_object_id: self.store_object_id,
58 project_id
59 }],
60 Options::default(),
61 )
62 .await?;
63 transaction.add(
64 self.store_object_id,
65 Mutation::replace_or_insert_object(
66 ObjectKey::project_limit(root_id, project_id),
67 ObjectValue::None,
68 ),
69 );
70 transaction.commit().await?;
71 Ok(())
72 }
73
74 pub async fn set_project_for_node(&self, node_id: u64, project_id: u64) -> Result<(), Error> {
76 ensure!(project_id != 0, FxfsError::OutOfRange);
77 let root_id = self.root_directory_object_id();
78 let mut transaction = self
79 .filesystem()
80 .new_transaction(
81 lock_keys![LockKey::object(self.store_object_id, node_id)],
82 Options::default(),
83 )
84 .await?;
85
86 let object_key = ObjectKey::object(node_id);
87 let (kind, mut attributes) =
88 match self.tree().find(&object_key).await?.ok_or(FxfsError::NotFound)?.value {
89 ObjectValue::Object { kind, attributes } => (kind, attributes),
90 _ => return Err(FxfsError::Inconsistent.into()),
91 };
92 match kind {
94 ObjectKind::File { .. } | ObjectKind::Directory { .. } => (),
95 ObjectKind::Symlink { .. } => return Err(FxfsError::NotSupported.into()),
98 ObjectKind::Graveyard => return Err(FxfsError::Inconsistent.into()),
99 }
100 let storage_size = attributes.allocated_size.try_into().map_err(|_| FxfsError::TooBig)?;
101 let old_project_id = attributes.project_id;
102 if old_project_id == project_id {
103 return Ok(());
104 }
105 attributes.project_id = project_id;
106
107 transaction.add(
108 self.store_object_id,
109 Mutation::replace_or_insert_object(
110 object_key,
111 ObjectValue::Object { kind, attributes },
112 ),
113 );
114 transaction.add(
115 self.store_object_id,
116 Mutation::merge_object(
117 ObjectKey::project_usage(root_id, project_id),
118 ObjectValue::BytesAndNodes { bytes: storage_size, nodes: 1 },
119 ),
120 );
121 if old_project_id != 0 {
122 transaction.add(
123 self.store_object_id,
124 Mutation::merge_object(
125 ObjectKey::project_usage(root_id, old_project_id),
126 ObjectValue::BytesAndNodes { bytes: -storage_size, nodes: -1 },
127 ),
128 );
129 }
130 transaction.commit().await?;
131 Ok(())
132 }
133
134 pub async fn get_project_for_node(&self, node_id: u64) -> Result<u64, Error> {
136 match self.tree().find(&ObjectKey::object(node_id)).await?.ok_or(FxfsError::NotFound)?.value
137 {
138 ObjectValue::Object { attributes, .. } => match attributes.project_id {
139 id => Ok(id),
140 },
141 _ => return Err(FxfsError::Inconsistent.into()),
142 }
143 }
144
145 pub async fn clear_project_for_node(&self, node_id: u64) -> Result<(), Error> {
148 let root_id = self.root_directory_object_id();
149 let mut transaction = self
150 .filesystem()
151 .new_transaction(
152 lock_keys![LockKey::object(self.store_object_id, node_id)],
153 Options::default(),
154 )
155 .await?;
156
157 let object_key = ObjectKey::object(node_id);
158 let (kind, mut attributes) =
159 match self.tree().find(&object_key).await?.ok_or(FxfsError::NotFound)?.value {
160 ObjectValue::Object { kind, attributes } => (kind, attributes),
161 _ => return Err(FxfsError::Inconsistent.into()),
162 };
163 if attributes.project_id == 0 {
164 return Ok(());
165 }
166 match kind {
168 ObjectKind::File { .. } | ObjectKind::Directory { .. } => (),
169 ObjectKind::Symlink { .. } => return Err(FxfsError::NotSupported.into()),
172 ObjectKind::Graveyard => return Err(FxfsError::Inconsistent.into()),
173 }
174 let old_project_id = attributes.project_id;
175 attributes.project_id = 0;
176 let storage_size = attributes.allocated_size;
177 transaction.add(
178 self.store_object_id,
179 Mutation::replace_or_insert_object(
180 object_key,
181 ObjectValue::Object { kind, attributes },
182 ),
183 );
184 transaction.add(
187 self.store_object_id,
188 Mutation::merge_object(
189 ObjectKey::project_usage(root_id, old_project_id),
190 ObjectValue::BytesAndNodes {
191 bytes: -(storage_size.try_into().map_err(|_| FxfsError::TooBig)?),
192 nodes: -1,
193 },
194 ),
195 );
196 transaction.commit().await?;
197 Ok(())
198 }
199
200 pub async fn list_projects(
205 &self,
206 start_id: u64,
207 max_entries: usize,
208 ) -> Result<(Vec<u64>, Option<u64>), Error> {
209 let root_dir_id = self.root_directory_object_id();
210 let layer_set = self.tree().layer_set();
211 let mut merger = layer_set.merger();
212 let mut iter = merger
213 .query(Query::FullRange(&ObjectKey::project_limit(root_dir_id, start_id)))
214 .await?;
215 let mut entries = Vec::new();
216 let mut prev_entry = 0;
217 let mut next_entry = None;
218 while let Some(ItemRef { key: ObjectKey { object_id, data: key_data }, value, .. }) =
219 iter.get()
220 {
221 if *object_id != root_dir_id {
223 break;
224 }
225 match key_data {
226 ObjectKeyData::Project { project_id, .. } => {
227 if *value != ObjectValue::None && prev_entry < *project_id {
229 if entries.len() == max_entries {
230 next_entry = Some(*project_id);
231 break;
232 }
233 prev_entry = *project_id;
234 entries.push(*project_id);
235 }
236 }
237 _ => {
239 break;
240 }
241 }
242 iter.advance().await?;
243 }
244 Ok((entries, next_entry))
246 }
247
248 pub async fn project_info(
251 &self,
252 project_id: u64,
253 ) -> Result<(Option<(u64, u64)>, Option<(u64, u64)>), Error> {
254 let root_id = self.root_directory_object_id();
255 let layer_set = self.tree().layer_set();
256 let mut merger = layer_set.merger();
257 let mut iter =
258 merger.query(Query::FullRange(&ObjectKey::project_limit(root_id, project_id))).await?;
259 let mut limit = None;
260 let mut usage = None;
261 while let Some(ItemRef { key: ObjectKey { object_id, data: key_data }, value, .. }) =
263 iter.get()
264 {
265 if *object_id != root_id {
267 break;
268 }
269 if let (
270 ObjectKeyData::Project { project_id: found_project_id, property },
271 ObjectValue::BytesAndNodes { bytes, nodes },
272 ) = (key_data, value)
273 {
274 if *found_project_id != project_id {
276 break;
277 }
278 let raw_value: (u64, u64) = (
279 (*bytes).try_into().map_err(|_| FxfsError::Inconsistent)?,
281 (*nodes).try_into().map_err(|_| FxfsError::Inconsistent)?,
282 );
283 match property {
284 ProjectProperty::Limit => limit = Some(raw_value),
285 ProjectProperty::Usage => usage = Some(raw_value),
286 }
287 } else {
288 break;
289 }
290 iter.advance().await?;
291 }
292 Ok((limit, usage))
293 }
294}
295
296