1use {
4 crate::{
5 error::Error,
6 metadata::{MetadataPath, MetadataVersion, TargetPath},
7 pouf::Pouf,
8 repository::{RepositoryProvider, RepositoryStorage},
9 Result,
10 },
11 futures_io::AsyncRead,
12 futures_util::{
13 future::{BoxFuture, FutureExt},
14 io::{AsyncReadExt, Cursor},
15 },
16 std::{
17 collections::HashMap,
18 marker::PhantomData,
19 sync::{Arc, RwLock},
20 },
21};
22
23#[derive(Debug, Default)]
25pub struct EphemeralRepository<D> {
26 inner: RwLock<Inner>,
27 _pouf: PhantomData<D>,
28}
29
/// Metadata bytes keyed by `(path, version)`.
type MetadataMap = HashMap<(MetadataPath, MetadataVersion), Arc<[u8]>>;
/// Target bytes keyed by target path.
type TargetsMap = HashMap<TargetPath, Arc<[u8]>>;
32
/// Lock-protected state used both by a repository and by a batch update's staging area.
#[derive(Debug, Default)]
struct Inner {
    /// Change counter: incremented by every store and by every successful batch commit.
    /// A batch update compares it against the value it saw at creation to detect conflicts.
    version: u64,
    /// Stored metadata blobs.
    metadata: MetadataMap,
    /// Stored target blobs.
    targets: TargetsMap,
}
39
40impl<D> EphemeralRepository<D>
41where
42 D: Pouf,
43{
44 pub fn new() -> Self {
46 Self {
47 inner: RwLock::new(Inner {
48 version: 0,
49 metadata: MetadataMap::new(),
50 targets: TargetsMap::new(),
51 }),
52 _pouf: PhantomData,
53 }
54 }
55
56 pub fn batch_update(&self) -> EphemeralBatchUpdate<'_, D> {
59 EphemeralBatchUpdate {
60 initial_parent_version: self.inner.read().unwrap().version,
61 parent_repo: &self.inner,
62 staging_repo: RwLock::new(Inner {
63 version: 0,
64 metadata: MetadataMap::new(),
65 targets: TargetsMap::new(),
66 }),
67 _pouf: self._pouf,
68 }
69 }
70
71 #[cfg(test)]
72 pub(crate) fn metadata(&self) -> MetadataMap {
73 self.inner.read().unwrap().metadata.clone()
74 }
75}
76
77impl<D> RepositoryProvider<D> for EphemeralRepository<D>
78where
79 D: Pouf,
80{
81 fn fetch_metadata<'a>(
82 &'a self,
83 meta_path: &MetadataPath,
84 version: MetadataVersion,
85 ) -> BoxFuture<'a, Result<Box<dyn AsyncRead + Send + Unpin + 'a>>> {
86 let bytes = match self
87 .inner
88 .read()
89 .unwrap()
90 .metadata
91 .get(&(meta_path.clone(), version))
92 {
93 Some(bytes) => Ok(Arc::clone(bytes)),
94 None => Err(Error::MetadataNotFound {
95 path: meta_path.clone(),
96 version,
97 }),
98 };
99 bytes_to_reader(bytes).boxed()
100 }
101
102 fn fetch_target<'a>(
103 &'a self,
104 target_path: &TargetPath,
105 ) -> BoxFuture<'a, Result<Box<dyn AsyncRead + Send + Unpin + 'a>>> {
106 let bytes = match self.inner.read().unwrap().targets.get(target_path) {
107 Some(bytes) => Ok(Arc::clone(bytes)),
108 None => Err(Error::TargetNotFound(target_path.clone())),
109 };
110 bytes_to_reader(bytes).boxed()
111 }
112}
113
114impl<D> RepositoryStorage<D> for EphemeralRepository<D>
115where
116 D: Pouf,
117{
118 fn store_metadata<'a>(
119 &'a self,
120 meta_path: &MetadataPath,
121 version: MetadataVersion,
122 metadata: &'a mut (dyn AsyncRead + Send + Unpin + 'a),
123 ) -> BoxFuture<'a, Result<()>> {
124 store_metadata(&self.inner, meta_path, version, metadata)
125 }
126
127 fn store_target<'a>(
128 &'a self,
129 target_path: &TargetPath,
130 read: &'a mut (dyn AsyncRead + Send + Unpin + 'a),
131 ) -> BoxFuture<'a, Result<()>> {
132 store_target(&self.inner, target_path, read)
133 }
134}
135
/// A set of pending writes against an `EphemeralRepository`.
///
/// Writes go to a private staging area and only reach the parent repository
/// when `commit` succeeds. Reads consult the staging area first and fall back
/// to the parent.
#[derive(Debug)]
pub struct EphemeralBatchUpdate<'a, D> {
    /// The parent's change counter as observed when this batch was created.
    initial_parent_version: u64,
    /// State of the repository this batch was created from.
    parent_repo: &'a RwLock<Inner>,
    /// Writes made through this batch that have not been committed.
    staging_repo: RwLock<Inner>,
    /// Ties the batch to `D` without holding a `D`.
    _pouf: PhantomData<D>,
}
148
/// Error returned when committing an `EphemeralBatchUpdate` fails.
#[derive(Debug, thiserror::Error)]
pub enum CommitError {
    /// The parent repository's change counter no longer matches the value
    /// recorded when the batch update was created.
    #[error("conflicting change occurred during commit")]
    Conflict,
}
156
157impl<D> EphemeralBatchUpdate<'_, D>
158where
159 D: Pouf,
160{
161 pub async fn commit(self) -> std::result::Result<(), CommitError> {
164 let mut parent_repo = self.parent_repo.write().unwrap();
165
166 if self.initial_parent_version != parent_repo.version {
168 return Err(CommitError::Conflict);
169 }
170
171 let staging_repo = self.staging_repo.into_inner().unwrap();
173 parent_repo.metadata.extend(staging_repo.metadata);
174 parent_repo.targets.extend(staging_repo.targets);
175
176 parent_repo.version += 1;
178
179 Ok(())
180 }
181}
182
183impl<D> RepositoryProvider<D> for EphemeralBatchUpdate<'_, D>
184where
185 D: Pouf,
186{
187 fn fetch_metadata<'a>(
188 &'a self,
189 meta_path: &MetadataPath,
190 version: MetadataVersion,
191 ) -> BoxFuture<'a, Result<Box<dyn AsyncRead + Send + Unpin + 'a>>> {
192 let key = (meta_path.clone(), version);
193 let bytes = if let Some(bytes) = self.staging_repo.read().unwrap().metadata.get(&key) {
194 Ok(Arc::clone(bytes))
195 } else {
196 self.parent_repo
197 .read()
198 .unwrap()
199 .metadata
200 .get(&key)
201 .cloned()
202 .ok_or_else(|| Error::MetadataNotFound {
203 path: meta_path.clone(),
204 version,
205 })
206 };
207 bytes_to_reader(bytes).boxed()
208 }
209
210 fn fetch_target<'a>(
211 &'a self,
212 target_path: &TargetPath,
213 ) -> BoxFuture<'a, Result<Box<dyn AsyncRead + Send + Unpin + 'a>>> {
214 let bytes = if let Some(bytes) = self.staging_repo.read().unwrap().targets.get(target_path)
215 {
216 Ok(Arc::clone(bytes))
217 } else {
218 self.parent_repo
219 .read()
220 .unwrap()
221 .targets
222 .get(target_path)
223 .cloned()
224 .ok_or_else(|| Error::TargetNotFound(target_path.clone()))
225 };
226 bytes_to_reader(bytes).boxed()
227 }
228}
229
230impl<D> RepositoryStorage<D> for EphemeralBatchUpdate<'_, D>
231where
232 D: Pouf,
233{
234 fn store_metadata<'a>(
235 &'a self,
236 meta_path: &MetadataPath,
237 version: MetadataVersion,
238 metadata: &'a mut (dyn AsyncRead + Send + Unpin),
239 ) -> BoxFuture<'a, Result<()>> {
240 store_metadata(&self.staging_repo, meta_path, version, metadata)
241 }
242
243 fn store_target<'a>(
244 &'a self,
245 target_path: &TargetPath,
246 read: &'a mut (dyn AsyncRead + Send + Unpin),
247 ) -> BoxFuture<'a, Result<()>> {
248 store_target(&self.staging_repo, target_path, read)
249 }
250}
251
252fn store_metadata<'a>(
253 inner: &'a RwLock<Inner>,
254 meta_path: &MetadataPath,
255 version: MetadataVersion,
256 metadata: &'a mut (dyn AsyncRead + Send + Unpin),
257) -> BoxFuture<'a, Result<()>> {
258 let meta_path = meta_path.clone();
259 async move {
260 let mut buf = Vec::new();
261 metadata.read_to_end(&mut buf).await?;
262 buf.shrink_to_fit();
263
264 let mut inner = inner.write().unwrap();
265
266 inner.metadata.insert((meta_path, version), buf.into());
267
268 inner.version += 1;
270
271 Ok(())
272 }
273 .boxed()
274}
275
276fn store_target<'a>(
277 inner: &'a RwLock<Inner>,
278 target_path: &TargetPath,
279 read: &'a mut (dyn AsyncRead + Send + Unpin + 'a),
280) -> BoxFuture<'a, Result<()>> {
281 let target_path = target_path.clone();
282 async move {
283 let mut buf = Vec::new();
284 read.read_to_end(&mut buf).await?;
285 buf.shrink_to_fit();
286
287 let mut inner = inner.write().unwrap();
288
289 inner.targets.insert(target_path, buf.into());
290
291 inner.version += 1;
293
294 Ok(())
295 }
296 .boxed()
297}
298
299#[allow(clippy::borrowed_box)]
300async fn bytes_to_reader<'a>(
301 bytes: Result<Arc<[u8]>>,
302) -> Result<Box<dyn AsyncRead + Send + Unpin + 'a>> {
303 let bytes = bytes?;
304 let reader: Box<dyn AsyncRead + Send + Unpin> = Box::new(Cursor::new(bytes));
305 Ok(reader)
306}
307
#[cfg(test)]
mod test {
    use super::*;
    use crate::pouf::Pouf1;
    use crate::repository::{fetch_metadata_to_string, fetch_target_to_string};
    use assert_matches::assert_matches;
    use futures_executor::block_on;

    /// Storing, fetching and overwriting a target directly on the repository.
    #[test]
    fn ephemeral_repo_targets() {
        block_on(async {
            let repo = EphemeralRepository::<Pouf1>::new();

            // Nothing has been stored yet, so the fetch must report the target as missing.
            let path = TargetPath::new("batty").unwrap();
            if let Err(err) = repo.fetch_target(&path).await {
                assert_matches!(err, Error::TargetNotFound(p) if p == path);
            } else {
                panic!("expected fetch_target to fail");
            }

            let data: &[u8] = b"like tears in the rain";
            let path = TargetPath::new("batty").unwrap();
            repo.store_target(&path, &mut &*data).await.unwrap();

            // The stored bytes come back unchanged.
            let mut read = repo.fetch_target(&path).await.unwrap();
            let mut buf = Vec::new();
            read.read_to_end(&mut buf).await.unwrap();
            assert_eq!(buf.as_slice(), data);
            drop(read);

            // Storing again under the same path replaces the previous bytes.
            let bad_data: &[u8] = b"you're in a desert";
            repo.store_target(&path, &mut &*bad_data).await.unwrap();
            let mut read = repo.fetch_target(&path).await.unwrap();
            buf.clear();
            read.read_to_end(&mut buf).await.unwrap();
            assert_eq!(buf.as_slice(), bad_data);
        })
    }

    /// Reads through a batch see staged data first; dropping a batch discards
    /// its writes and committing applies them.
    #[test]
    fn ephemeral_repo_batch_update() {
        block_on(async {
            let repo = EphemeralRepository::<Pouf1>::new();

            let meta_path = MetadataPath::new("meta").unwrap();
            let meta_version = MetadataVersion::None;
            let target_path = TargetPath::new("target").unwrap();

            // Seed the repository directly.
            let committed_meta = "committed meta";
            let committed_target = "committed target";

            repo.store_metadata(&meta_path, meta_version, &mut committed_meta.as_bytes())
                .await
                .unwrap();

            repo.store_target(&target_path, &mut committed_target.as_bytes())
                .await
                .unwrap();

            let batch = repo.batch_update();

            // With nothing staged, the batch falls back to the parent's data.
            assert_eq!(
                fetch_metadata_to_string(&batch, &meta_path, meta_version)
                    .await
                    .unwrap(),
                committed_meta,
            );
            assert_eq!(
                fetch_target_to_string(&batch, &target_path).await.unwrap(),
                committed_target,
            );

            // Stage new data under the same keys.
            let staged_meta = "staged meta";
            let staged_target = "staged target";
            batch
                .store_metadata(&meta_path, meta_version, &mut staged_meta.as_bytes())
                .await
                .unwrap();
            batch
                .store_target(&target_path, &mut staged_target.as_bytes())
                .await
                .unwrap();

            // The batch now returns the staged data.
            assert_eq!(
                fetch_metadata_to_string(&batch, &meta_path, meta_version)
                    .await
                    .unwrap(),
                staged_meta,
            );
            assert_eq!(
                fetch_target_to_string(&batch, &target_path).await.unwrap(),
                staged_target,
            );

            // Dropping the batch without committing discards the staged writes.
            drop(batch);

            assert_eq!(
                fetch_metadata_to_string(&repo, &meta_path, meta_version)
                    .await
                    .unwrap(),
                committed_meta,
            );
            assert_eq!(
                fetch_target_to_string(&repo, &target_path).await.unwrap(),
                committed_target,
            );

            // Stage the same data in a fresh batch and commit it this time.
            let batch = repo.batch_update();
            batch
                .store_metadata(&meta_path, meta_version, &mut staged_meta.as_bytes())
                .await
                .unwrap();
            batch
                .store_target(&target_path, &mut staged_target.as_bytes())
                .await
                .unwrap();

            batch.commit().await.unwrap();

            // After the commit the repository itself returns the staged data.
            assert_eq!(
                fetch_metadata_to_string(&repo, &meta_path, meta_version)
                    .await
                    .unwrap(),
                staged_meta,
            );
            assert_eq!(
                fetch_target_to_string(&repo, &target_path).await.unwrap(),
                staged_target,
            );
        })
    }

    /// A metadata write to the parent after a batch was created makes the
    /// batch's commit fail.
    #[test]
    fn ephemeral_repo_batch_commit_fails_with_metadata_conflicts() {
        block_on(async {
            let repo = EphemeralRepository::<Pouf1>::new();

            // Case 1: the batch is empty, but the parent changes before the commit.
            let batch = repo.batch_update();

            repo.store_metadata(
                &MetadataPath::new("meta1").unwrap(),
                MetadataVersion::None,
                &mut "meta1".as_bytes(),
            )
            .await
            .unwrap();

            assert_matches!(batch.commit().await, Err(CommitError::Conflict));

            // Case 2: parent and batch write different paths; the commit still conflicts.
            let batch = repo.batch_update();

            repo.store_metadata(
                &MetadataPath::new("meta2").unwrap(),
                MetadataVersion::None,
                &mut "meta2".as_bytes(),
            )
            .await
            .unwrap();

            batch
                .store_metadata(
                    &MetadataPath::new("meta3").unwrap(),
                    MetadataVersion::None,
                    &mut "meta3".as_bytes(),
                )
                .await
                .unwrap();

            assert_matches!(batch.commit().await, Err(CommitError::Conflict));
        })
    }

    /// A target write to the parent after a batch was created makes the
    /// batch's commit fail, and so does another batch committing first.
    #[test]
    fn ephemeral_repo_batch_commit_fails_with_target_conflicts() {
        block_on(async {
            let repo = EphemeralRepository::<Pouf1>::new();

            // Case 1: the batch is empty, but the parent changes before the commit.
            let batch = repo.batch_update();

            repo.store_target(
                &TargetPath::new("target1").unwrap(),
                &mut "target1".as_bytes(),
            )
            .await
            .unwrap();

            assert_matches!(batch.commit().await, Err(CommitError::Conflict));

            // Case 2: parent and batch write different paths; the commit still conflicts.
            let batch = repo.batch_update();

            repo.store_target(
                &TargetPath::new("target2").unwrap(),
                &mut "target2".as_bytes(),
            )
            .await
            .unwrap();

            batch
                .store_target(
                    &TargetPath::new("target3").unwrap(),
                    &mut "target3".as_bytes(),
                )
                .await
                .unwrap();

            assert_matches!(batch.commit().await, Err(CommitError::Conflict));

            // Case 3: two batches created from the same parent state; the first
            // commit succeeds and the second one conflicts.
            let batch1 = repo.batch_update();
            let batch2 = repo.batch_update();

            batch1
                .store_target(
                    &TargetPath::new("target4").unwrap(),
                    &mut "target4".as_bytes(),
                )
                .await
                .unwrap();

            batch2
                .store_target(
                    &TargetPath::new("target5").unwrap(),
                    &mut "target5".as_bytes(),
                )
                .await
                .unwrap();

            assert_matches!(batch1.commit().await, Ok(()));
            assert_matches!(batch2.commit().await, Err(CommitError::Conflict));
        })
    }
}