Skip to main content

fuchsia_repo/repository/
file_system.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::range::{ContentRange, Range};
6use crate::repository::{Error, RepoProvider, RepoStorage, Resource};
7use crate::util::file_stream;
8use anyhow::{Context as _, Result, anyhow};
9use camino::{Utf8Component, Utf8Path, Utf8PathBuf};
10use delivery_blob::DeliveryBlobType;
11use fuchsia_async as fasync;
12use fuchsia_merkle::Hash;
13use futures::future::BoxFuture;
14use futures::{AsyncRead, FutureExt as _};
15use log::warn;
16use std::collections::BTreeSet;
17use std::fs::{self, DirBuilder};
18use std::io::{Seek as _, SeekFrom};
19use std::os::unix::fs::MetadataExt;
20use std::time::SystemTime;
21use tempfile::{NamedTempFile, TempPath};
22use tuf::metadata::{MetadataPath, MetadataVersion, TargetPath};
23use tuf::pouf::Pouf1;
24use tuf::repository::{
25    FileSystemRepository as TufFileSystemRepository,
26    FileSystemRepositoryBuilder as TufFileSystemRepositoryBuilder,
27    RepositoryProvider as TufRepositoryProvider, RepositoryStorage as TufRepositoryStorage,
28};
29
30#[cfg(not(target_os = "fuchsia"))]
31use {
32    crate::repository::RepositorySpec,
33    futures::{Stream, StreamExt as _, stream::BoxStream},
34    notify::{RecursiveMode, Watcher as _, recommended_watcher},
35    std::{
36        ffi::OsStr,
37        pin::Pin,
38        task::{Context, Poll},
39    },
40};
41
/// Describes how package blobs should be copied into the repository.
///
/// The default mode is [CopyMode::Copy].
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub enum CopyMode {
    /// Copy package blobs into the repository. This will skip copying the blob if it already exists
    /// in the repository.
    ///
    /// This will create a Copy-on-Write (reflink) on file systems that support it.
    #[default]
    Copy,

    /// Copy package blobs into the repository. This will overwrite a blob if it already exists in
    /// the repository.
    ///
    /// This will create a Copy-on-Write (reflink) on file systems that support it.
    CopyOverwrite,

    /// Create hard links from the package blobs into the repository.
    HardLink,
}
61
/// A builder to create a repository contained on the local file system.
pub struct FileSystemRepositoryBuilder {
    /// Directory in which the TUF metadata is stored.
    metadata_repo_path: Utf8PathBuf,
    /// Directory in which the blobs are stored.
    blob_repo_path: Utf8PathBuf,
    /// How files are copied into the repository.
    copy_mode: CopyMode,
    /// Names this repository is aliased to when it is registered on a target.
    aliases: BTreeSet<String>,
    /// Type of delivery blob to generate when copying blobs into the repository.
    delivery_blob_type: DeliveryBlobType,
}
70
71impl FileSystemRepositoryBuilder {
72    /// Creates a [FileSystemRepositoryBuilder] where the TUF metadata is stored in
73    /// `metadata_repo_path`, and the blobs are stored in `blob_repo_path`.
74    pub fn new(metadata_repo_path: Utf8PathBuf, blob_repo_path: Utf8PathBuf) -> Self {
75        FileSystemRepositoryBuilder {
76            metadata_repo_path,
77            blob_repo_path,
78            copy_mode: CopyMode::Copy,
79            aliases: BTreeSet::new(),
80            delivery_blob_type: DeliveryBlobType::Type1,
81        }
82    }
83
84    /// Select which [CopyMode] to use when copying files into the repository.
85    pub fn copy_mode(mut self, copy_mode: CopyMode) -> Self {
86        self.copy_mode = copy_mode;
87        self
88    }
89
90    /// alias this repository to this name when this repository is registered on a target.
91    pub fn alias(mut self, alias: String) -> Self {
92        self.aliases.insert(alias);
93        self
94    }
95
96    /// alias this repository to these names when this repository is registered on a target.
97    pub fn aliases(mut self, aliases: impl IntoIterator<Item = String>) -> Self {
98        for alias in aliases {
99            self = self.alias(alias);
100        }
101        self
102    }
103
104    /// Set the type of delivery blob to generate when copying blobs into the repository.
105    pub fn delivery_blob_type(mut self, delivery_blob_type: DeliveryBlobType) -> Self {
106        self.delivery_blob_type = delivery_blob_type;
107        self
108    }
109
110    /// Set the path to the blob repo.
111    pub fn blob_repo_path(mut self, blob_repo_path: Utf8PathBuf) -> Self {
112        self.blob_repo_path = blob_repo_path;
113        self
114    }
115
116    /// Build a [FileSystemRepository].
117    pub fn build(self) -> FileSystemRepository {
118        FileSystemRepository {
119            metadata_repo_path: self.metadata_repo_path.clone(),
120            blob_repo_path: self.blob_repo_path,
121            copy_mode: self.copy_mode,
122            aliases: self.aliases,
123            delivery_blob_type: self.delivery_blob_type,
124            tuf_repo: TufFileSystemRepositoryBuilder::new(self.metadata_repo_path)
125                .targets_prefix("targets")
126                .build(),
127        }
128    }
129}
130
/// Serve a repository from the file system.
#[derive(Debug)]
pub struct FileSystemRepository {
    /// Directory in which the TUF metadata is stored.
    metadata_repo_path: Utf8PathBuf,
    /// Directory in which the blobs are stored.
    blob_repo_path: Utf8PathBuf,
    /// How delivery blobs are copied into the repository.
    copy_mode: CopyMode,
    /// Names this repository is aliased to when it is registered on a target.
    aliases: BTreeSet<String>,
    /// Type of delivery blob generated when blobs are stored in the repository.
    delivery_blob_type: DeliveryBlobType,
    /// TUF repository rooted at `metadata_repo_path`; the TUF provider/storage trait
    /// implementations delegate to it.
    tuf_repo: TufFileSystemRepository<Pouf1>,
}
141
142impl FileSystemRepository {
143    /// Construct a [FileSystemRepositoryBuilder].
144    pub fn builder(
145        metadata_repo_path: Utf8PathBuf,
146        blob_repo_path: Utf8PathBuf,
147    ) -> FileSystemRepositoryBuilder {
148        FileSystemRepositoryBuilder::new(metadata_repo_path, blob_repo_path)
149    }
150
151    /// Construct a [FileSystemRepository].
152    pub fn new(metadata_repo_path: Utf8PathBuf, blob_repo_path: Utf8PathBuf) -> Self {
153        Self::builder(metadata_repo_path, blob_repo_path).build()
154    }
155
156    pub fn blob_repo_path(&self) -> &Utf8PathBuf {
157        &self.blob_repo_path
158    }
159
160    fn fetch<'a>(
161        &'a self,
162        repo_path: &Utf8Path,
163        resource_path: &str,
164        range: Range,
165    ) -> BoxFuture<'a, Result<Resource, Error>> {
166        let file_path = sanitize_path(repo_path, resource_path);
167        async move {
168            let file_path = file_path?;
169            let mut file = std::fs::File::open(&file_path).map_err(|err| {
170                if err.kind() == std::io::ErrorKind::NotFound {
171                    Error::NotFound
172                } else {
173                    Error::Io(err)
174                }
175            })?;
176
177            let total_len = file.metadata().map_err(Error::Io)?.len();
178
179            let content_range = match range {
180                Range::Full => ContentRange::Full { complete_len: total_len },
181                Range::Inclusive { first_byte_pos, last_byte_pos } => {
182                    if first_byte_pos > last_byte_pos
183                        || first_byte_pos >= total_len
184                        || last_byte_pos >= total_len
185                    {
186                        return Err(Error::RangeNotSatisfiable);
187                    }
188
189                    file.seek(SeekFrom::Start(first_byte_pos)).map_err(Error::Io)?;
190
191                    ContentRange::Inclusive {
192                        first_byte_pos,
193                        last_byte_pos,
194                        complete_len: total_len,
195                    }
196                }
197                Range::From { first_byte_pos } => {
198                    if first_byte_pos >= total_len {
199                        return Err(Error::RangeNotSatisfiable);
200                    }
201
202                    file.seek(SeekFrom::Start(first_byte_pos)).map_err(Error::Io)?;
203
204                    ContentRange::Inclusive {
205                        first_byte_pos,
206                        last_byte_pos: total_len - 1,
207                        complete_len: total_len,
208                    }
209                }
210                Range::Suffix { len } => {
211                    if len > total_len {
212                        return Err(Error::RangeNotSatisfiable);
213                    }
214                    let start = total_len - len;
215                    file.seek(SeekFrom::Start(start)).map_err(Error::Io)?;
216
217                    ContentRange::Inclusive {
218                        first_byte_pos: start,
219                        last_byte_pos: total_len - 1,
220                        complete_len: total_len,
221                    }
222                }
223            };
224
225            let content_len = content_range.content_len();
226
227            Ok(Resource {
228                content_range,
229                stream: Box::pin(file_stream(content_len, file, Some(file_path))),
230            })
231        }
232        .boxed()
233    }
234}
235
236impl RepoProvider for FileSystemRepository {
237    #[cfg(not(target_os = "fuchsia"))]
238    fn spec(&self) -> RepositorySpec {
239        RepositorySpec::FileSystem {
240            metadata_repo_path: self.metadata_repo_path.clone(),
241            blob_repo_path: self.blob_repo_path.clone(),
242            aliases: self.aliases.clone(),
243        }
244    }
245
246    fn aliases(&self) -> &BTreeSet<String> {
247        &self.aliases
248    }
249
250    fn fetch_metadata_range<'a>(
251        &'a self,
252        resource_path: &str,
253        range: Range,
254    ) -> BoxFuture<'a, Result<Resource, Error>> {
255        self.fetch(&self.metadata_repo_path, resource_path, range)
256    }
257
258    fn fetch_blob_range<'a>(
259        &'a self,
260        resource_path: &str,
261        range: Range,
262    ) -> BoxFuture<'a, Result<Resource, Error>> {
263        self.fetch(&self.blob_repo_path, resource_path, range)
264    }
265
266    #[cfg(not(target_os = "fuchsia"))]
267    fn supports_watch(&self) -> bool {
268        true
269    }
270
271    #[cfg(not(target_os = "fuchsia"))]
272    fn watch(&self) -> Result<BoxStream<'static, ()>> {
273        // Since all we are doing is signaling that the timestamp file is changed, it's it's fine
274        // if the channel is full, since that just means we haven't consumed our notice yet.
275        let (mut sender, receiver) = futures::channel::mpsc::channel(1);
276
277        let mut watcher = recommended_watcher(move |event: notify::Result<notify::Event>| {
278            let event = match event {
279                Ok(event) => event,
280                Err(err) => {
281                    warn!("error receving notify event: {}", err);
282                    return;
283                }
284            };
285
286            // Send an event if any applied to timestamp.json.
287            let timestamp_name = OsStr::new("timestamp.json");
288            if event.paths.iter().any(|p| p.file_name() == Some(timestamp_name))
289                && let Err(e) = sender.try_send(())
290            {
291                if e.is_full() {
292                    // It's okay to ignore a full channel, since that just means that the other
293                    // side of the channel still has an outstanding notice, which should be the
294                    // same effect if we re-sent the event.
295                } else if !e.is_disconnected() {
296                    warn!("Error sending event: {:?}", e);
297                }
298            }
299        })?;
300
301        // Watch the repo path instead of directly watching timestamp.json to avoid
302        // https://github.com/notify-rs/notify/issues/165.
303        watcher.watch(self.metadata_repo_path.as_std_path(), RecursiveMode::NonRecursive)?;
304
305        Ok(WatchStream { _watcher: watcher, receiver }.boxed())
306    }
307
308    fn blob_modification_time<'a>(
309        &'a self,
310        path: &str,
311    ) -> BoxFuture<'a, Result<Option<SystemTime>>> {
312        let file_path = sanitize_path(&self.blob_repo_path, path);
313        async move {
314            let file_path = file_path?;
315            Ok(Some(fs::metadata(&file_path)?.modified()?))
316        }
317        .boxed()
318    }
319
320    fn blob_type(&self) -> DeliveryBlobType {
321        self.delivery_blob_type
322    }
323}
324
// TUF read access: both methods delegate unchanged to the wrapped `tuf_repo`.
impl TufRepositoryProvider<Pouf1> for FileSystemRepository {
    /// Fetches the metadata `meta_path` at `version` from the wrapped TUF repository.
    fn fetch_metadata<'a>(
        &'a self,
        meta_path: &MetadataPath,
        version: MetadataVersion,
    ) -> BoxFuture<'a, tuf::Result<Box<dyn AsyncRead + Send + Unpin + 'a>>> {
        self.tuf_repo.fetch_metadata(meta_path, version)
    }

    /// Fetches the target `target_path` from the wrapped TUF repository.
    fn fetch_target<'a>(
        &'a self,
        target_path: &TargetPath,
    ) -> BoxFuture<'a, tuf::Result<Box<dyn AsyncRead + Send + Unpin + 'a>>> {
        self.tuf_repo.fetch_target(target_path)
    }
}
341
// TUF write access: both methods delegate unchanged to the wrapped `tuf_repo`.
impl TufRepositoryStorage<Pouf1> for FileSystemRepository {
    /// Stores `metadata` as `meta_path` at `version` in the wrapped TUF repository.
    fn store_metadata<'a>(
        &'a self,
        meta_path: &MetadataPath,
        version: MetadataVersion,
        metadata: &'a mut (dyn AsyncRead + Send + Unpin + 'a),
    ) -> BoxFuture<'a, tuf::Result<()>> {
        self.tuf_repo.store_metadata(meta_path, version, metadata)
    }

    /// Stores `target` as `target_path` in the wrapped TUF repository.
    fn store_target<'a>(
        &'a self,
        target_path: &TargetPath,
        target: &'a mut (dyn AsyncRead + Send + Unpin + 'a),
    ) -> BoxFuture<'a, tuf::Result<()>> {
        self.tuf_repo.store_target(target_path, target)
    }
}
360
361impl RepoStorage for FileSystemRepository {
362    fn store_blob<'a>(
363        &'a self,
364        hash: &Hash,
365        len: u64,
366        src: &Utf8Path,
367    ) -> BoxFuture<'a, Result<()>> {
368        let src = src.to_path_buf();
369        let hash_str = hash.to_string();
370        let hash = *hash;
371
372        async move {
373            let src_metadata = fs::metadata(&src)?;
374            if src_metadata.len() != len {
375                return Err(anyhow!(BlobSizeMismatchError {
376                    hash,
377                    path: src.clone(),
378                    manifest_size: len,
379                    file_size: src_metadata.len(),
380                }));
381            }
382
383            let dst = sanitize_path(
384                &self.blob_repo_path,
385                &format!("{}/{hash_str}", u32::from(self.delivery_blob_type)),
386            )?;
387            let existing_len = match fs::File::open(&dst) {
388                Ok(file) => {
389                    if let Ok(len) = delivery_blob::decompressed_size_from_reader(file) {
390                        Some(len)
391                    } else {
392                        // In the event that the delivery blob is corrupt, log a warning and
393                        // return None to signify that it needs to be written.
394                        warn!("corrupt delivery blob found at {dst}, overwriting");
395                        None
396                    }
397                }
398                Err(e) if e.kind() == std::io::ErrorKind::NotFound => None,
399                Err(e) => return Err(anyhow!(e)),
400            };
401
402            if self.copy_mode == CopyMode::CopyOverwrite || existing_len != Some(len) {
403                generate_delivery_blob(&src, &dst, self.delivery_blob_type).await?;
404            }
405
406            Ok(())
407        }
408        .boxed()
409    }
410
411    fn store_delivery_blob<'a>(
412        &'a self,
413        hash: &Hash,
414        src: &Utf8Path,
415        delivery_blob_type: DeliveryBlobType,
416    ) -> BoxFuture<'a, Result<()>> {
417        let src = src.to_path_buf();
418        let hash = *hash;
419
420        async move {
421            if delivery_blob_type != self.delivery_blob_type {
422                warn!(
423                    "storing delivery blob type {:?} in repository with delivery blob type {:?}",
424                    delivery_blob_type, self.delivery_blob_type,
425                );
426                // TODO: convert the delivery blob to the expected type?
427            }
428            let dst = sanitize_path(
429                &self.blob_repo_path,
430                &format!("{}/{hash}", u32::from(delivery_blob_type)),
431            )?;
432
433            let src_metadata = fs::metadata(&src)?;
434            let dst_metadata = match fs::metadata(&dst) {
435                Ok(metadata) => Some(metadata),
436                Err(e) if e.kind() == std::io::ErrorKind::NotFound => None,
437                Err(e) => return Err(anyhow!(e)),
438            };
439
440            let dst_is_hardlink = if let Some(dst_metadata) = &dst_metadata {
441                dst_metadata.nlink() > 1
442            } else {
443                false
444            };
445
446            let dst_exists = dst_metadata.is_some();
447            let dst_dirty = !dst_exists;
448
449            match self.copy_mode {
450                CopyMode::Copy => {
451                    if dst_dirty || dst_is_hardlink {
452                        copy_blob(&src, &dst).await?
453                    }
454                }
455                CopyMode::CopyOverwrite => copy_blob(&src, &dst).await?,
456                CopyMode::HardLink => {
457                    let is_hardlink = if let Some(dst_metadata) = &dst_metadata {
458                        src_metadata.dev() == dst_metadata.dev()
459                            && src_metadata.ino() == dst_metadata.ino()
460                    } else {
461                        false
462                    };
463
464                    if is_hardlink {
465                        // No work to do if src and dest are already hardlinks.
466                    } else {
467                        // Create the parent directory if it doesn't yet exist.
468                        if let Some(parent) = dst.parent() {
469                            std::fs::create_dir_all(parent)?;
470                        }
471                        match fs::hard_link(&src, &dst) {
472                            Ok(()) => {
473                                // FIXME(b/271694204): Workaround an unknown issue where hardlinks
474                                // aren't readable immediately after creation in some environments.
475                                if fs::metadata(&dst).is_err() {
476                                    fuchsia_async::Timer::new(std::time::Duration::from_secs(1))
477                                        .await;
478                                    if fs::metadata(&dst).is_err() {
479                                        copy_blob(&src, &dst).await?
480                                    }
481                                }
482                            }
483                            Err(_) if dst_dirty => copy_blob(&src, &dst).await?,
484                            Err(_) => {
485                                // The dest file exists and has the right size,
486                                // but we failed to make it a hardlink.
487                            }
488                        }
489                    }
490                }
491            }
492            Ok(())
493        }
494        .boxed()
495    }
496}
497
498async fn create_temp_file(path: &Utf8Path) -> Result<TempPath> {
499    let temp_file = if let Some(parent) = path.parent() {
500        DirBuilder::new().recursive(true).create(parent)?;
501
502        NamedTempFile::new_in(parent)?
503    } else {
504        NamedTempFile::new_in(".")?
505    };
506
507    Ok(temp_file.into_temp_path())
508}
509
510// Set the blob at `path` to be read-only.
511async fn set_blob_read_only(path: &Utf8Path) -> Result<()> {
512    let file = fs::File::open(path)?;
513    let mut permissions = file.metadata()?.permissions();
514    permissions.set_readonly(true);
515    file.set_permissions(permissions)?;
516
517    Ok(())
518}
519
520// Performs a Copy-on-Write (reflink) of the file at `src_path` to `dst_path`.
521#[cfg(target_os = "linux")]
522async fn reflink(src_path: &Utf8Path, dst_path: &Utf8Path) -> Result<(), std::io::Error> {
523    use std::os::fd::AsRawFd;
524
525    let src = fs::File::open(src_path)?;
526    let dst = fs::File::create(dst_path)?;
527
528    // Safe because this is a synchronous syscall and the raw fds don't outlive the call.
529    let res = unsafe { libc::ioctl(dst.as_raw_fd(), libc::FICLONE, src.as_raw_fd()) };
530
531    match res {
532        -1 => {
533            let err = std::io::Error::last_os_error();
534
535            drop(dst);
536            let _ = fs::remove_file(dst_path);
537
538            match err.raw_os_error().unwrap() {
539                // The filesystem does not support reflinks.
540                libc::EOPNOTSUPP |
541                // src_path and dst_path are different filesystems.
542                libc::EXDEV |
543                // An invalid ioctl number was specified in an ioctl system call.
544                libc::ENOTTY => {
545                    Err(std::io::Error::new(std::io::ErrorKind::Unsupported, err))
546                }
547                _ => Err(err),
548            }
549        }
550        _ => Ok(()),
551    }
552}
553
// Reflinks are only attempted on Linux; on every other platform this always reports
// `ErrorKind::Unsupported`.
#[cfg(not(target_os = "linux"))]
async fn reflink(_src_path: &Utf8Path, _dst_path: &Utf8Path) -> Result<(), std::io::Error> {
    // Reference the crate so it is not reported as unused on this platform.
    use libc as _;
    Err(std::io::ErrorKind::Unsupported.into())
}
559
560async fn copy_blob(src: &Utf8Path, dst: &Utf8Path) -> Result<()> {
561    let temp_path = create_temp_file(dst).await?;
562    match reflink(src, (*temp_path).try_into()?).await {
563        Ok(()) => {}
564        Err(e) if e.kind() == std::io::ErrorKind::Unsupported => {
565            let src = src.to_owned();
566            let temp_path = temp_path.to_path_buf();
567            fasync::unblock(move || fs::copy(src, &temp_path)).await?;
568        }
569        Err(e) => return Err(anyhow!(e)),
570    }
571    temp_path.persist(dst)?;
572
573    set_blob_read_only(dst).await
574}
575
576pub(crate) async fn generate_delivery_blob(
577    src: &Utf8Path,
578    dst: &Utf8Path,
579    blob_type: DeliveryBlobType,
580) -> Result<()> {
581    let src_blob = fs::read(src).with_context(|| format!("reading {src}"))?;
582
583    let temp_path = create_temp_file(dst).await?;
584    let file = std::fs::File::create(&temp_path)?;
585    fasync::unblock(move || {
586        delivery_blob::generate_to(blob_type, &src_blob, std::io::BufWriter::new(file))
587    })
588    .await
589    .context("generate delivery blob")?;
590
591    temp_path.persist(dst)?;
592
593    set_blob_read_only(dst).await
594}
595
/// Stream returned by `FileSystemRepository::watch`: bundles the file system watcher with the
/// receiving end of the channel its callback sends on.
#[cfg(not(target_os = "fuchsia"))]
#[pin_project::pin_project]
struct WatchStream {
    /// Never read; stored so the watcher is owned by (and dropped together with) the stream.
    _watcher: notify::RecommendedWatcher,
    /// Receives a `()` for each notice sent by the watcher callback.
    #[pin]
    receiver: futures::channel::mpsc::Receiver<()>,
}
603
#[cfg(not(target_os = "fuchsia"))]
impl Stream for WatchStream {
    type Item = ();

    /// Forwards polling to the pinned channel receiver.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.project().receiver.poll_next(cx)
    }
}
611
612/// Make sure the resource is inside the repo_path.
613fn sanitize_path(repo_path: &Utf8Path, resource_path: &str) -> Result<Utf8PathBuf, Error> {
614    let resource_path = Utf8Path::new(resource_path);
615
616    let mut parts = vec![];
617    for component in resource_path.components() {
618        match component {
619            Utf8Component::Normal(part) => {
620                parts.push(part);
621            }
622            _ => {
623                warn!("invalid resource_path: {}", resource_path);
624                return Err(Error::InvalidPath(resource_path.into()));
625            }
626        }
627    }
628
629    let path = parts.into_iter().collect::<Utf8PathBuf>();
630    Ok(repo_path.join(path))
631}
632
/// Error returned by `store_blob` when the size of the source file differs from the size the
/// caller passed in.
#[derive(Debug, thiserror::Error)]
#[error(
    "blob {hash} at {path:?} is {file_size} bytes in size, \
     but the package manifest indicates it should be {manifest_size} bytes in size"
)]
struct BlobSizeMismatchError {
    /// Hash of the blob being stored.
    hash: Hash,
    /// Path of the source file whose size was checked.
    path: Utf8PathBuf,
    /// Size the caller expected the blob to have.
    manifest_size: u64,
    /// Actual size of the source file on disk.
    file_size: u64,
}
644
645#[cfg(test)]
646mod tests {
647    use super::*;
648    use crate::repository::repo_tests::{self, TestEnv as _};
649    use crate::util::CHUNK_SIZE;
650    use assert_matches::assert_matches;
651    use fuchsia_async as fasync;
652    use futures::{FutureExt, StreamExt};
653    use std::fs::File;
654    use std::io::Write as _;
655    use std::time::Duration;
    /// Test fixture: a repository backed by directories inside a temporary directory.
    struct TestEnv {
        /// Owns the temporary directory; stored so it is kept for as long as the fixture lives.
        _tmp: tempfile::TempDir,
        /// The "metadata" directory inside the temporary directory.
        metadata_path: Utf8PathBuf,
        /// The "blobs" directory inside the temporary directory.
        blob_path: Utf8PathBuf,
        /// Repository constructed over `metadata_path` and `blob_path`.
        repo: FileSystemRepository,
    }
662
    impl TestEnv {
        /// Creates a temporary directory containing empty "metadata" and "blobs" directories and
        /// a [FileSystemRepository] over them.
        fn new() -> Self {
            let tmp = tempfile::tempdir().unwrap();
            let dir = Utf8Path::from_path(tmp.path()).unwrap();
            let metadata_path = dir.join("metadata");
            let blob_path = dir.join("blobs");
            std::fs::create_dir(&metadata_path).unwrap();
            std::fs::create_dir(&blob_path).unwrap();

            Self {
                _tmp: tmp,
                metadata_path: metadata_path.clone(),
                blob_path: blob_path.clone(),
                repo: FileSystemRepository::new(metadata_path, blob_path),
            }
        }
    }
680
    // Hooks used by the shared repository test suite to populate and access this fixture.
    #[async_trait::async_trait]
    impl repo_tests::TestEnv for TestEnv {
        /// This fixture reports that range requests are supported.
        fn supports_range(&self) -> bool {
            true
        }

        /// Writes `bytes` to `path` inside the metadata directory.
        fn write_metadata(&self, path: &str, bytes: &[u8]) {
            let file_path = self.metadata_path.join(path);
            let mut f = File::create(file_path).unwrap();
            f.write_all(bytes).unwrap();
        }

        /// Writes `bytes` to `path` inside the blob directory.
        fn write_blob(&self, path: &str, bytes: &[u8]) {
            let file_path = self.blob_path.join(path);
            let mut f = File::create(file_path).unwrap();
            f.write_all(bytes).unwrap();
        }

        /// Returns the repository under test.
        fn repo(&self) -> &dyn RepoProvider {
            &self.repo
        }
    }
703
    // Instantiate the shared repository test suite against the file system fixture.
    repo_tests::repo_test_suite! {
        env = TestEnv::new();
        chunk_size = CHUNK_SIZE;
    }
708
    /// `blob_modification_time` reports the same mtime the file system reports for the blob.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_blob_modification_time() {
        let env = TestEnv::new();

        // Create an empty blob and record the modification time reported for it.
        let f = File::create(env.blob_path.join("empty-blob")).unwrap();
        let blob_mtime = f.metadata().unwrap().modified().unwrap();
        drop(f);

        assert_matches!(
            env.repo.blob_modification_time("empty-blob").await,
            Ok(Some(t)) if t == blob_mtime
        );
    }
722
    /// Paths containing `..` are rejected, even when they would resolve to an existing file.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_reject_invalid_paths() {
        let env = TestEnv::new();
        env.write_metadata("empty", b"");

        // The plain path is readable...
        assert_matches!(repo_tests::read_metadata(&env, "empty", Range::Full).await, Ok(body) if body == b"");
        // ...but the same file reached through a `..` component is an invalid path.
        assert_matches!(repo_tests::read_metadata(&env, "subdir/../empty", Range::Full).await,
            Err(Error::InvalidPath(path)) if path == Utf8Path::new("subdir/../empty")
        );
    }
733
    /// The watch stream stays silent until `timestamp.json` is written, and then yields one
    /// event per write.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_watch() {
        let env = TestEnv::new();

        // We support watch.
        assert!(env.repo.supports_watch());

        let mut watch_stream = env.repo.watch().unwrap().fuse();

        // Try to read from the stream. This should not return anything since we haven't created a
        // file yet.
        futures::select! {
            _ = watch_stream.next() => panic!("should not have received an event"),
            _ = fasync::Timer::new(Duration::from_millis(10)).fuse() => (),
        };

        // Next, write to the file and make sure we observe an event.
        env.write_metadata("timestamp.json", br#"{"version":1}"#);

        // The 10 second timer only bounds how long the test waits before failing.
        futures::select! {
            result = watch_stream.next() => {
                assert_eq!(result, Some(()));
            },
            _ = fasync::Timer::new(Duration::from_secs(10)).fuse() => {
                panic!("wrote to timestamp.json, but did not get an event");
            },
        };

        // Write to the file again and make sure we receive another event.
        env.write_metadata("timestamp.json", br#"{"version":2}"#);

        futures::select! {
            result = watch_stream.next() => {
                assert_eq!(result, Some(()));
            },
            _ = fasync::Timer::new(Duration::from_secs(10)).fuse() => {
                panic!("wrote to timestamp.json, but did not get an event");
            },
        };

        // FIXME(https://github.com/notify-rs/notify/pull/337): On OSX, notify uses a
        // crossbeam-channel in `Drop` to shut down the interior thread. Unfortunately this can
        // trip over an issue where OSX will tear down the thread local storage before shutting
        // down the thread, which can trigger a panic. To avoid this issue, sleep a little bit
        // after shutting down our stream.
        drop(watch_stream);
        fasync::Timer::new(Duration::from_millis(100)).await;
    }
782
    /// `store_blob` fails with `BlobSizeMismatchError` when the expected length does not match
    /// the size of the source file.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_store_blob_verifies_src_length() {
        let tmp = tempfile::tempdir().unwrap();
        let dir = Utf8Path::from_path(tmp.path()).unwrap();

        let metadata_repo_path = dir.join("metadata");
        let blob_repo_path = dir.join("blobs");
        std::fs::create_dir(&metadata_repo_path).unwrap();
        std::fs::create_dir(&blob_repo_path).unwrap();

        let repo = FileSystemRepository::builder(metadata_repo_path, blob_repo_path.clone())
            .copy_mode(CopyMode::Copy)
            .build();

        // Store the blob.
        let contents = b"hello world";
        let path = dir.join("my-blob");
        std::fs::write(&path, contents).unwrap();

        // Claim the blob is one byte longer than the file actually is.
        let hash = fuchsia_merkle::root_from_slice(contents);
        let err = repo.store_blob(&hash, contents.len() as u64 + 1, &path).await.unwrap_err();
        assert_matches!(err.downcast_ref::<BlobSizeMismatchError>(), Some(_));
    }
806
    /// In `CopyMode::Copy`, storing a blob under a hash that already exists rewrites the stored
    /// blob when the new source has a different length.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_store_blob_copy_detects_length_mismatch() {
        let tmp = tempfile::tempdir().unwrap();
        let dir = Utf8Path::from_path(tmp.path()).unwrap();

        let metadata_repo_path = dir.join("metadata");
        let blob_repo_path = dir.join("blobs");
        std::fs::create_dir(&metadata_repo_path).unwrap();
        std::fs::create_dir(&blob_repo_path).unwrap();

        let repo = FileSystemRepository::builder(metadata_repo_path, blob_repo_path.clone())
            .copy_mode(CopyMode::Copy)
            .build();

        // The blob contents and its hash.
        let contents = b"hello world";
        let hash = fuchsia_merkle::root_from_slice(contents);

        let path = dir.join("my-blob");
        std::fs::write(&path, contents).unwrap();

        assert_matches!(repo.store_blob(&hash, contents.len() as u64, &path).await, Ok(()));

        // Make sure we can read it back.
        let blob_path = blob_repo_path.join(format!("1/{hash}"));
        let delivery_blob = std::fs::read(&blob_path).unwrap();
        let actual = delivery_blob::decompress(&delivery_blob).unwrap();
        assert_eq!(&actual, &contents[..]);

        // Stored blobs are marked read-only.
        assert!(std::fs::metadata(&blob_path).unwrap().permissions().readonly());

        // Next, overwrite a blob that already exists.
        // The same hash is reused on purpose, with contents of a different length.
        let contents2 = b"another hello world";
        let path2 = dir.join("my-blob2");
        std::fs::write(&path2, contents2).unwrap();
        assert_matches!(repo.store_blob(&hash, contents2.len() as u64, &path2).await, Ok(()));

        // Make sure we get the new contents back.
        let delivery_blob = std::fs::read(&blob_path).unwrap();
        let actual = delivery_blob::decompress(&delivery_blob).unwrap();
        assert_eq!(&actual, &contents2[..]);
    }
849
850    #[fuchsia_async::run_singlethreaded(test)]
851    async fn test_store_blob_copy_skips_present_blobs_of_correct_length() {
852        let tmp = tempfile::tempdir().unwrap();
853        let dir = Utf8Path::from_path(tmp.path()).unwrap();
854
855        let metadata_repo_path = dir.join("metadata");
856        let blob_repo_path = dir.join("blobs");
857        std::fs::create_dir(&metadata_repo_path).unwrap();
858        std::fs::create_dir(&blob_repo_path).unwrap();
859
860        let repo = FileSystemRepository::builder(metadata_repo_path, blob_repo_path.clone())
861            .copy_mode(CopyMode::Copy)
862            .build();
863
864        // Store the blob.
865        let contents = b"hello world.";
866        let path = dir.join("my-blob");
867        std::fs::write(&path, contents).unwrap();
868
869        let hash = fuchsia_merkle::root_from_slice(contents);
870        assert_matches!(repo.store_blob(&hash, contents.len() as u64, &path).await, Ok(()));
871
872        // Make sure we can read it back.
873        let blob_path = blob_repo_path.join(format!("1/{hash}"));
874        let delivery_blob = std::fs::read(&blob_path).unwrap();
875        let actual = delivery_blob::decompress(&delivery_blob).unwrap();
876        assert_eq!(&actual, &contents[..]);
877
878        assert!(std::fs::metadata(&blob_path).unwrap().permissions().readonly());
879
880        // Next, we won't overwrite a blob that already exists.
881        let contents2 = b"Hello World!";
882        let path2 = dir.join("my-blob2");
883        std::fs::write(&path2, contents2).unwrap();
884        assert_matches!(repo.store_blob(&hash, contents2.len() as u64, &path2).await, Ok(()));
885
886        // Make sure we get the original contents back.
887        let delivery_blob = std::fs::read(&blob_path).unwrap();
888        let actual = delivery_blob::decompress(&delivery_blob).unwrap();
889        assert_eq!(&actual, &contents[..]);
890    }
891
892    #[fuchsia_async::run_singlethreaded(test)]
893    async fn test_store_blob_copy_overwrite() {
894        let tmp = tempfile::tempdir().unwrap();
895        let dir = Utf8Path::from_path(tmp.path()).unwrap();
896
897        let metadata_repo_path = dir.join("metadata");
898        let blob_repo_path = dir.join("blobs");
899        std::fs::create_dir(&metadata_repo_path).unwrap();
900        std::fs::create_dir(&blob_repo_path).unwrap();
901
902        let repo = FileSystemRepository::builder(metadata_repo_path, blob_repo_path.clone())
903            .copy_mode(CopyMode::CopyOverwrite)
904            .build();
905
906        // Store the blob.
907        let contents = b"hello world";
908        let path = dir.join("my-blob");
909        std::fs::write(&path, contents).unwrap();
910
911        let hash = fuchsia_merkle::root_from_slice(contents);
912        assert_matches!(repo.store_blob(&hash, contents.len() as u64, &path).await, Ok(()));
913
914        // Make sure we can read it back.
915        let blob_path = blob_repo_path.join(format!("1/{hash}"));
916        let delivery_blob = std::fs::read(&blob_path).unwrap();
917        let actual = delivery_blob::decompress(&delivery_blob).unwrap();
918        assert_eq!(&actual, &contents[..]);
919
920        assert!(std::fs::metadata(&blob_path).unwrap().permissions().readonly());
921
922        // Next, overwrite a blob that already exists.
923        let contents2 = b"another blob";
924        let path2 = dir.join("my-blob2");
925        std::fs::write(&path2, contents2).unwrap();
926        assert_matches!(repo.store_blob(&hash, contents2.len() as u64, &path2).await, Ok(()));
927
928        // Make sure we get the new contents back.
929        let delivery_blob = std::fs::read(&blob_path).unwrap();
930        let actual = delivery_blob::decompress(&delivery_blob).unwrap();
931        assert_eq!(&actual, &contents2[..]);
932    }
933
934    #[fuchsia_async::run_singlethreaded(test)]
935    async fn test_store_delivery_blob_hard_link() {
936        let tmp = tempfile::tempdir().unwrap();
937        let dir = Utf8Path::from_path(tmp.path()).unwrap();
938
939        let metadata_repo_path = dir.join("metadata");
940        let blob_repo_path = dir.join("blobs");
941        std::fs::create_dir(&metadata_repo_path).unwrap();
942        std::fs::create_dir(&blob_repo_path).unwrap();
943
944        let repo = FileSystemRepository::builder(metadata_repo_path, blob_repo_path.clone())
945            .copy_mode(CopyMode::HardLink)
946            .build();
947
948        // Store the blob.
949        let contents = b"hello world";
950        let hash = fuchsia_merkle::root_from_slice(contents);
951
952        let uncompressed_path = dir.join("my-blob");
953        std::fs::write(&uncompressed_path, contents).unwrap();
954        let path = dir.join("my-delivery-blob");
955        generate_delivery_blob(&uncompressed_path, &path, DeliveryBlobType::Type1).await.unwrap();
956
957        assert_matches!(
958            repo.store_delivery_blob(&hash, &path, DeliveryBlobType::Type1).await,
959            Ok(())
960        );
961
962        // Make sure we can read it back.
963        let blob_path = blob_repo_path.join(format!("1/{hash}"));
964        let delivery_blob = std::fs::read(&blob_path).unwrap();
965        let actual: Vec<u8> = delivery_blob::decompress(&delivery_blob).unwrap();
966        assert_eq!(&actual, &contents[..]);
967
968        #[cfg(target_family = "unix")]
969        async fn check_links(blob_path: &Utf8Path) {
970            use std::os::unix::fs::MetadataExt as _;
971
972            assert_eq!(std::fs::metadata(blob_path).unwrap().nlink(), 2);
973        }
974
975        #[cfg(not(target_family = "unix"))]
976        async fn check_links(_blob_path: &Utf8Path) {}
977
978        // Make sure the hard link count was incremented.
979        check_links(&blob_path).await;
980    }
981
982    #[fuchsia_async::run_singlethreaded(test)]
983    async fn test_store_blob_generates_delivery_blob() {
984        let tmp = tempfile::tempdir().unwrap();
985        let dir = Utf8Path::from_path(tmp.path()).unwrap();
986
987        let metadata_repo_path = dir.join("metadata");
988        let blob_repo_path = dir.join("blobs");
989        std::fs::create_dir(&metadata_repo_path).unwrap();
990        std::fs::create_dir(&blob_repo_path).unwrap();
991
992        let repo = FileSystemRepository::builder(metadata_repo_path, blob_repo_path.clone())
993            .delivery_blob_type(DeliveryBlobType::Type1)
994            .build();
995
996        // Store the blob.
997        let contents = b"hello world";
998        let hash = fuchsia_merkle::root_from_slice(contents);
999
1000        let path = dir.join("my-blob");
1001        std::fs::write(&path, contents).unwrap();
1002
1003        assert_matches!(repo.store_blob(&hash, contents.len() as u64, &path).await, Ok(()));
1004
1005        // Make sure we can read the delivery blob.
1006        let blob_path = blob_repo_path.join("1").join(hash.to_string());
1007        let delivery_blob = std::fs::read(&blob_path).unwrap();
1008        let actual = delivery_blob::decompress(&delivery_blob).unwrap();
1009        assert_eq!(&actual, &contents[..]);
1010
1011        assert!(std::fs::metadata(&blob_path).unwrap().permissions().readonly());
1012    }
1013
1014    #[fuchsia_async::run_singlethreaded(test)]
1015    async fn test_store_delivery_blob() {
1016        let tmp = tempfile::tempdir().unwrap();
1017        let dir = Utf8Path::from_path(tmp.path()).unwrap();
1018
1019        let metadata_repo_path = dir.join("metadata");
1020        let blob_repo_path = dir.join("blobs");
1021        std::fs::create_dir(&metadata_repo_path).unwrap();
1022        std::fs::create_dir(&blob_repo_path).unwrap();
1023
1024        let repo = FileSystemRepository::builder(metadata_repo_path, blob_repo_path.clone())
1025            .delivery_blob_type(DeliveryBlobType::Type1)
1026            .build();
1027
1028        // Store the blob.
1029        let contents = b"hello world";
1030        let uncompressed_path = dir.join("my-blob");
1031        std::fs::write(&uncompressed_path, contents).unwrap();
1032        let path = dir.join("my-delivery-blob");
1033        generate_delivery_blob(&uncompressed_path, &path, DeliveryBlobType::Type1).await.unwrap();
1034        let delivery_blob = std::fs::read(&path).unwrap();
1035
1036        let hash = fuchsia_merkle::root_from_slice(contents);
1037        assert_matches!(
1038            repo.store_delivery_blob(&hash, &path, DeliveryBlobType::Type1).await,
1039            Ok(())
1040        );
1041
1042        // Make sure we can read the delivery blob.
1043        let blob_path = blob_repo_path.join("1").join(hash.to_string());
1044        let stored_delivery_blob = std::fs::read(&blob_path).unwrap();
1045        assert_eq!(stored_delivery_blob, delivery_blob);
1046
1047        assert!(std::fs::metadata(&blob_path).unwrap().permissions().readonly());
1048    }
1049}