Skip to main content

tuf/repository/
ephemeral.rs

1//! Repository implementation backed by memory
2
3use {
4    crate::{
5        error::Error,
6        metadata::{MetadataPath, MetadataVersion, TargetPath},
7        pouf::Pouf,
8        repository::{RepositoryProvider, RepositoryStorage},
9        Result,
10    },
11    futures_io::AsyncRead,
12    futures_util::{
13        future::{BoxFuture, FutureExt},
14        io::{AsyncReadExt, Cursor},
15    },
16    std::{
17        collections::HashMap,
18        marker::PhantomData,
19        sync::{Arc, RwLock},
20    },
21};
22
23/// An ephemeral repository contained solely in memory.
24#[derive(Debug, Default)]
25pub struct EphemeralRepository<D> {
26    inner: RwLock<Inner>,
27    _pouf: PhantomData<D>,
28}
29
30type MetadataMap = HashMap<(MetadataPath, MetadataVersion), Arc<[u8]>>;
31type TargetsMap = HashMap<TargetPath, Arc<[u8]>>;
32
33#[derive(Debug, Default)]
34struct Inner {
35    version: u64,
36    metadata: MetadataMap,
37    targets: TargetsMap,
38}
39
40impl<D> EphemeralRepository<D>
41where
42    D: Pouf,
43{
44    /// Create a new ephemeral repository.
45    pub fn new() -> Self {
46        Self {
47            inner: RwLock::new(Inner {
48                version: 0,
49                metadata: MetadataMap::new(),
50                targets: TargetsMap::new(),
51            }),
52            _pouf: PhantomData,
53        }
54    }
55
56    /// Returns a [EphemeralBatchUpdate] for manipulating this repository. This allows callers to
57    /// stage a number of mutations, and optionally atomically write them all at once.
58    pub fn batch_update(&self) -> EphemeralBatchUpdate<'_, D> {
59        EphemeralBatchUpdate {
60            initial_parent_version: self.inner.read().unwrap().version,
61            parent_repo: &self.inner,
62            staging_repo: RwLock::new(Inner {
63                version: 0,
64                metadata: MetadataMap::new(),
65                targets: TargetsMap::new(),
66            }),
67            _pouf: self._pouf,
68        }
69    }
70
71    #[cfg(test)]
72    pub(crate) fn metadata(&self) -> MetadataMap {
73        self.inner.read().unwrap().metadata.clone()
74    }
75}
76
77impl<D> RepositoryProvider<D> for EphemeralRepository<D>
78where
79    D: Pouf,
80{
81    fn fetch_metadata<'a>(
82        &'a self,
83        meta_path: &MetadataPath,
84        version: MetadataVersion,
85    ) -> BoxFuture<'a, Result<Box<dyn AsyncRead + Send + Unpin + 'a>>> {
86        let bytes = match self
87            .inner
88            .read()
89            .unwrap()
90            .metadata
91            .get(&(meta_path.clone(), version))
92        {
93            Some(bytes) => Ok(Arc::clone(bytes)),
94            None => Err(Error::MetadataNotFound {
95                path: meta_path.clone(),
96                version,
97            }),
98        };
99        bytes_to_reader(bytes).boxed()
100    }
101
102    fn fetch_target<'a>(
103        &'a self,
104        target_path: &TargetPath,
105    ) -> BoxFuture<'a, Result<Box<dyn AsyncRead + Send + Unpin + 'a>>> {
106        let bytes = match self.inner.read().unwrap().targets.get(target_path) {
107            Some(bytes) => Ok(Arc::clone(bytes)),
108            None => Err(Error::TargetNotFound(target_path.clone())),
109        };
110        bytes_to_reader(bytes).boxed()
111    }
112}
113
114impl<D> RepositoryStorage<D> for EphemeralRepository<D>
115where
116    D: Pouf,
117{
118    fn store_metadata<'a>(
119        &'a self,
120        meta_path: &MetadataPath,
121        version: MetadataVersion,
122        metadata: &'a mut (dyn AsyncRead + Send + Unpin + 'a),
123    ) -> BoxFuture<'a, Result<()>> {
124        store_metadata(&self.inner, meta_path, version, metadata)
125    }
126
127    fn store_target<'a>(
128        &'a self,
129        target_path: &TargetPath,
130        read: &'a mut (dyn AsyncRead + Send + Unpin + 'a),
131    ) -> BoxFuture<'a, Result<()>> {
132        store_target(&self.inner, target_path, read)
133    }
134}
135
136/// [EphemeralBatchUpdate] is a special repository that is designed to write the metadata and
137/// targets to an [EphemeralRepository] in a single batch.
138///
139/// Note: `EphemeralBatchUpdate::commit()` must be called in order to write the metadata and
140/// targets to the [EphemeralRepository]. Otherwise any queued changes will be lost on drop.
141#[derive(Debug)]
142pub struct EphemeralBatchUpdate<'a, D> {
143    initial_parent_version: u64,
144    parent_repo: &'a RwLock<Inner>,
145    staging_repo: RwLock<Inner>,
146    _pouf: PhantomData<D>,
147}
148
149/// Conflict occurred during commit.
150#[derive(Debug, thiserror::Error)]
151pub enum CommitError {
152    // Conflicting change occurred during commit.
153    #[error("conflicting change occurred during commit")]
154    Conflict,
155}
156
157impl<D> EphemeralBatchUpdate<'_, D>
158where
159    D: Pouf,
160{
161    /// Write all the metadata and targets in the [EphemeralBatchUpdate] to the source
162    /// [EphemeralRepository] in a single batch operation.
163    pub async fn commit(self) -> std::result::Result<(), CommitError> {
164        let mut parent_repo = self.parent_repo.write().unwrap();
165
166        // Check if the parent changed while this batch was being processed.
167        if self.initial_parent_version != parent_repo.version {
168            return Err(CommitError::Conflict);
169        }
170
171        // Since parent hasn't changed, merged everything we wrote into its tables.
172        let staging_repo = self.staging_repo.into_inner().unwrap();
173        parent_repo.metadata.extend(staging_repo.metadata);
174        parent_repo.targets.extend(staging_repo.targets);
175
176        // Increment the version number because we modified the repository.
177        parent_repo.version += 1;
178
179        Ok(())
180    }
181}
182
183impl<D> RepositoryProvider<D> for EphemeralBatchUpdate<'_, D>
184where
185    D: Pouf,
186{
187    fn fetch_metadata<'a>(
188        &'a self,
189        meta_path: &MetadataPath,
190        version: MetadataVersion,
191    ) -> BoxFuture<'a, Result<Box<dyn AsyncRead + Send + Unpin + 'a>>> {
192        let key = (meta_path.clone(), version);
193        let bytes = if let Some(bytes) = self.staging_repo.read().unwrap().metadata.get(&key) {
194            Ok(Arc::clone(bytes))
195        } else {
196            self.parent_repo
197                .read()
198                .unwrap()
199                .metadata
200                .get(&key)
201                .cloned()
202                .ok_or_else(|| Error::MetadataNotFound {
203                    path: meta_path.clone(),
204                    version,
205                })
206        };
207        bytes_to_reader(bytes).boxed()
208    }
209
210    fn fetch_target<'a>(
211        &'a self,
212        target_path: &TargetPath,
213    ) -> BoxFuture<'a, Result<Box<dyn AsyncRead + Send + Unpin + 'a>>> {
214        let bytes = if let Some(bytes) = self.staging_repo.read().unwrap().targets.get(target_path)
215        {
216            Ok(Arc::clone(bytes))
217        } else {
218            self.parent_repo
219                .read()
220                .unwrap()
221                .targets
222                .get(target_path)
223                .cloned()
224                .ok_or_else(|| Error::TargetNotFound(target_path.clone()))
225        };
226        bytes_to_reader(bytes).boxed()
227    }
228}
229
230impl<D> RepositoryStorage<D> for EphemeralBatchUpdate<'_, D>
231where
232    D: Pouf,
233{
234    fn store_metadata<'a>(
235        &'a self,
236        meta_path: &MetadataPath,
237        version: MetadataVersion,
238        metadata: &'a mut (dyn AsyncRead + Send + Unpin),
239    ) -> BoxFuture<'a, Result<()>> {
240        store_metadata(&self.staging_repo, meta_path, version, metadata)
241    }
242
243    fn store_target<'a>(
244        &'a self,
245        target_path: &TargetPath,
246        read: &'a mut (dyn AsyncRead + Send + Unpin),
247    ) -> BoxFuture<'a, Result<()>> {
248        store_target(&self.staging_repo, target_path, read)
249    }
250}
251
252fn store_metadata<'a>(
253    inner: &'a RwLock<Inner>,
254    meta_path: &MetadataPath,
255    version: MetadataVersion,
256    metadata: &'a mut (dyn AsyncRead + Send + Unpin),
257) -> BoxFuture<'a, Result<()>> {
258    let meta_path = meta_path.clone();
259    async move {
260        let mut buf = Vec::new();
261        metadata.read_to_end(&mut buf).await?;
262        buf.shrink_to_fit();
263
264        let mut inner = inner.write().unwrap();
265
266        inner.metadata.insert((meta_path, version), buf.into());
267
268        // Increment the version since we changed.
269        inner.version += 1;
270
271        Ok(())
272    }
273    .boxed()
274}
275
276fn store_target<'a>(
277    inner: &'a RwLock<Inner>,
278    target_path: &TargetPath,
279    read: &'a mut (dyn AsyncRead + Send + Unpin + 'a),
280) -> BoxFuture<'a, Result<()>> {
281    let target_path = target_path.clone();
282    async move {
283        let mut buf = Vec::new();
284        read.read_to_end(&mut buf).await?;
285        buf.shrink_to_fit();
286
287        let mut inner = inner.write().unwrap();
288
289        inner.targets.insert(target_path, buf.into());
290
291        // Increment the version since we changed.
292        inner.version += 1;
293
294        Ok(())
295    }
296    .boxed()
297}
298
299#[allow(clippy::borrowed_box)]
300async fn bytes_to_reader<'a>(
301    bytes: Result<Arc<[u8]>>,
302) -> Result<Box<dyn AsyncRead + Send + Unpin + 'a>> {
303    let bytes = bytes?;
304    let reader: Box<dyn AsyncRead + Send + Unpin> = Box::new(Cursor::new(bytes));
305    Ok(reader)
306}
307
#[cfg(test)]
mod test {
    use super::*;
    use crate::pouf::Pouf1;
    use crate::repository::{fetch_metadata_to_string, fetch_target_to_string};
    use assert_matches::assert_matches;
    use futures_executor::block_on;

    /// Targets can be stored, fetched back, and overwritten in place.
    #[test]
    fn ephemeral_repo_targets() {
        block_on(async {
            let repo = EphemeralRepository::<Pouf1>::new();
            let path = TargetPath::new("batty").unwrap();

            // Nothing is stored yet, so the lookup has to report the missing target.
            match repo.fetch_target(&path).await {
                Err(err) => assert_matches!(err, Error::TargetNotFound(p) if p == path),
                Ok(_) => panic!("expected fetch_target to fail"),
            }

            // Store a payload and read it straight back.
            let payload: &[u8] = b"like tears in the rain";
            repo.store_target(&path, &mut &*payload).await.unwrap();

            let mut contents = Vec::new();
            let mut reader = repo.fetch_target(&path).await.unwrap();
            reader.read_to_end(&mut contents).await.unwrap();
            assert_eq!(contents.as_slice(), payload);
            drop(reader);

            // RepositoryProvider implementations do not guarantee data is not corrupt: storing
            // different bytes under the same path simply replaces the old ones.
            let replacement: &[u8] = b"you're in a desert";
            repo.store_target(&path, &mut &*replacement).await.unwrap();

            contents.clear();
            let mut reader = repo.fetch_target(&path).await.unwrap();
            reader.read_to_end(&mut contents).await.unwrap();
            assert_eq!(contents.as_slice(), replacement);
        })
    }

    /// A batch sees committed data, overlays staged data, discards it on drop, and publishes it
    /// on commit.
    #[test]
    fn ephemeral_repo_batch_update() {
        block_on(async {
            let repo = EphemeralRepository::<Pouf1>::new();

            let target_path = TargetPath::new("target").unwrap();
            let meta_path = MetadataPath::new("meta").unwrap();
            let meta_version = MetadataVersion::None;

            let committed_meta = "committed meta";
            let committed_target = "committed target";
            let staged_meta = "staged meta";
            let staged_target = "staged target";

            // Seed the repository itself with the committed contents.
            repo.store_metadata(&meta_path, meta_version, &mut committed_meta.as_bytes())
                .await
                .unwrap();

            repo.store_target(&target_path, &mut committed_target.as_bytes())
                .await
                .unwrap();

            let batch = repo.batch_update();

            // A fresh batch reads through to what the repository already holds.
            assert_eq!(
                fetch_metadata_to_string(&batch, &meta_path, meta_version)
                    .await
                    .unwrap(),
                committed_meta,
            );
            assert_eq!(
                fetch_target_to_string(&batch, &target_path).await.unwrap(),
                committed_target,
            );

            // Stage replacements for both entries in the batch.
            batch
                .store_metadata(&meta_path, meta_version, &mut staged_meta.as_bytes())
                .await
                .unwrap();
            batch
                .store_target(&target_path, &mut staged_target.as_bytes())
                .await
                .unwrap();

            // The batch now answers with the staged contents.
            assert_eq!(
                fetch_metadata_to_string(&batch, &meta_path, meta_version)
                    .await
                    .unwrap(),
                staged_meta,
            );
            assert_eq!(
                fetch_target_to_string(&batch, &target_path).await.unwrap(),
                staged_target,
            );

            // Dropping the batch without committing must leave the repository as it was.
            drop(batch);

            assert_eq!(
                fetch_metadata_to_string(&repo, &meta_path, meta_version)
                    .await
                    .unwrap(),
                committed_meta,
            );
            assert_eq!(
                fetch_target_to_string(&repo, &target_path).await.unwrap(),
                committed_target,
            );

            // Stage the same replacements again, and this time commit them.
            let batch = repo.batch_update();
            batch
                .store_metadata(&meta_path, meta_version, &mut staged_meta.as_bytes())
                .await
                .unwrap();
            batch
                .store_target(&target_path, &mut staged_target.as_bytes())
                .await
                .unwrap();

            batch.commit().await.unwrap();

            // The repository itself now serves the staged contents.
            assert_eq!(
                fetch_metadata_to_string(&repo, &meta_path, meta_version)
                    .await
                    .unwrap(),
                staged_meta,
            );
            assert_eq!(
                fetch_target_to_string(&repo, &target_path).await.unwrap(),
                staged_target,
            );
        })
    }

    /// Metadata written to the repository after a batch was opened makes that batch's commit
    /// fail.
    #[test]
    fn ephemeral_repo_batch_commit_fails_with_metadata_conflicts() {
        block_on(async {
            let repo = EphemeralRepository::<Pouf1>::new();

            // An empty batch still conflicts if the repository changed underneath it.
            let empty_batch = repo.batch_update();

            repo.store_metadata(
                &MetadataPath::new("meta1").unwrap(),
                MetadataVersion::None,
                &mut "meta1".as_bytes(),
            )
            .await
            .unwrap();

            assert_matches!(empty_batch.commit().await, Err(CommitError::Conflict));

            // Writing to the repository and to the batch conflicts as well.
            let busy_batch = repo.batch_update();

            repo.store_metadata(
                &MetadataPath::new("meta2").unwrap(),
                MetadataVersion::None,
                &mut "meta2".as_bytes(),
            )
            .await
            .unwrap();

            busy_batch
                .store_metadata(
                    &MetadataPath::new("meta3").unwrap(),
                    MetadataVersion::None,
                    &mut "meta3".as_bytes(),
                )
                .await
                .unwrap();

            assert_matches!(busy_batch.commit().await, Err(CommitError::Conflict));
        })
    }

    /// Targets written to the repository after a batch was opened make that batch's commit
    /// fail, and only the first of two competing batches can commit.
    #[test]
    fn ephemeral_repo_batch_commit_fails_with_target_conflicts() {
        block_on(async {
            let repo = EphemeralRepository::<Pouf1>::new();

            // An empty batch still conflicts if the repository changed underneath it.
            let empty_batch = repo.batch_update();

            repo.store_target(
                &TargetPath::new("target1").unwrap(),
                &mut "target1".as_bytes(),
            )
            .await
            .unwrap();

            assert_matches!(empty_batch.commit().await, Err(CommitError::Conflict));

            // Writing to the repository and to the batch conflicts as well.
            let busy_batch = repo.batch_update();

            repo.store_target(
                &TargetPath::new("target2").unwrap(),
                &mut "target2".as_bytes(),
            )
            .await
            .unwrap();

            busy_batch
                .store_target(
                    &TargetPath::new("target3").unwrap(),
                    &mut "target3".as_bytes(),
                )
                .await
                .unwrap();

            assert_matches!(busy_batch.commit().await, Err(CommitError::Conflict));

            // Two batches opened against the same state: the first commit wins, the second
            // conflicts.
            let first = repo.batch_update();
            let second = repo.batch_update();

            first
                .store_target(
                    &TargetPath::new("target4").unwrap(),
                    &mut "target4".as_bytes(),
                )
                .await
                .unwrap();

            second
                .store_target(
                    &TargetPath::new("target5").unwrap(),
                    &mut "target5".as_bytes(),
                )
                .await
                .unwrap();

            assert_matches!(first.commit().await, Ok(()));
            assert_matches!(second.commit().await, Err(CommitError::Conflict));
        })
    }
}
552}