Skip to main content

fuchsia_repo/
util.rs

// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

5use anyhow::{Context as _, Result};
6use bytes::{Bytes, BytesMut};
7use camino::{Utf8Path, Utf8PathBuf};
8use futures::{Stream, TryStreamExt as _, stream};
9use std::cmp::min;
10use std::fs::{copy, create_dir_all};
11use std::io;
12use std::path::Path;
13use std::task::Poll;
14use walkdir::WalkDir;
15
/// Number of bytes requested per read when streaming a file from local storage.
// Crate-visible so repository tests can exercise content that spans several chunks.
pub(crate) const CHUNK_SIZE: usize = 8 * 1024;

20/// Helper to read to the end of a [Bytes] stream.
21pub(crate) async fn read_stream_to_end<S>(mut stream: S, buf: &mut Vec<u8>) -> io::Result<()>
22where
23    S: Stream<Item = io::Result<Bytes>> + Unpin,
24{
25    while let Some(chunk) = stream.try_next().await? {
26        buf.extend_from_slice(&chunk);
27    }
28    Ok(())
29}
30
31#[cfg(unix)]
32fn path_nlink(path: &Utf8Path) -> Option<u64> {
33    use std::os::unix::fs::MetadataExt as _;
34    std::fs::metadata(path).ok().map(|metadata| metadata.nlink())
35}
36
37#[cfg(not(unix))]
38fn path_nlink(_path: &Utf8Path) -> Option<usize> {
39    None
40}
41
42/// Read a file up to `len` bytes in batches of [CHUNK_SIZE], and return a stream of [Bytes].
43///
44/// This will return an error if the file changed size during streaming.
45pub(super) fn file_stream(
46    expected_len: u64,
47    mut file: impl io::Read,
48    path: Option<Utf8PathBuf>,
49) -> impl Stream<Item = io::Result<Bytes>> {
50    let mut buf = BytesMut::new();
51    let mut remaining_len = expected_len;
52
53    stream::poll_fn(move |_cx| {
54        if remaining_len == 0 {
55            return Poll::Ready(None);
56        }
57
58        buf.resize(min(CHUNK_SIZE, remaining_len as usize), 0);
59
60        // Read a chunk from the file.
61        // FIXME(https://fxbug.dev/42079310): We should figure out why we were occasionally getting
62        // zero-sized reads from async IO, even though we knew there were more bytes available in
63        // the file. Once that bug is fixed, we should switch back to async IO to avoid stalling
64        // the executor.
65        let n = match file.read(&mut buf) {
66            Ok(n) => n as u64,
67            Err(err) => {
68                return Poll::Ready(Some(Err(err)));
69            }
70        };
71
72        // If we read zero bytes, then the file changed size while we were streaming it.
73        if n == 0 {
74            let msg = if let Some(path) = &path {
75                if let Some(nlink) = path_nlink(path) {
76                    format!(
77                        "file {} truncated: only read {} out of {} bytes: nlink: {}",
78                        path,
79                        expected_len - remaining_len,
80                        expected_len,
81                        nlink,
82                    )
83                } else {
84                    format!(
85                        "file {} truncated: only read {} out of {} bytes",
86                        path,
87                        expected_len - remaining_len,
88                        expected_len,
89                    )
90                }
91            } else {
92                format!(
93                    "file truncated: only read {} out of {} bytes",
94                    expected_len - remaining_len,
95                    expected_len,
96                )
97            };
98            // Clear out the remaining_len so we'll return None next time we're polled.
99            remaining_len = 0;
100            return Poll::Ready(Some(Err(io::Error::other(msg))));
101        }
102
103        // Return the chunk read from the file. The file may have changed size during streaming, so
104        // it's possible we could have read more than expected. If so, truncate the result to the
105        // limited size.
106        let mut chunk = buf.split_to(n as usize).freeze();
107        if n > remaining_len {
108            chunk = chunk.split_to(remaining_len as usize);
109            remaining_len = 0;
110        } else {
111            remaining_len -= n;
112        }
113
114        Poll::Ready(Some(Ok(chunk)))
115    })
116}
117
118pub fn copy_dir(from: &Path, to: &Path) -> Result<()> {
119    let walker = WalkDir::new(from);
120    for entry in walker.into_iter() {
121        let entry = entry?;
122        let to_path = to.join(entry.path().strip_prefix(from)?);
123        if entry.metadata()?.is_dir() {
124            if to_path.exists() {
125                continue;
126            } else {
127                create_dir_all(&to_path).with_context(|| format!("creating {to_path:?}"))?;
128            }
129        } else {
130            copy(entry.path(), &to_path)
131                .with_context(|| format!("copying {:?} to {:?}", entry.path(), to_path))?;
132        }
133    }
134
135    Ok(())
136}
137
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;

    /// Deterministic, non-constant test payload of `len` bytes.
    fn pattern(len: usize) -> Vec<u8> {
        (0..u8::MAX).cycle().take(len).collect()
    }

    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_file_stream() {
        // Empty input plus sizes straddling one and two chunk boundaries.
        let sizes = [0, CHUNK_SIZE - 1, CHUNK_SIZE, CHUNK_SIZE + 1, CHUNK_SIZE * 2 + 1];
        for size in sizes {
            let expected = pattern(size);
            let mut actual = Vec::new();
            read_stream_to_end(file_stream(size as u64, expected.as_slice(), None), &mut actual)
                .await
                .unwrap();
            assert_eq!(actual, expected);
        }
    }

    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_file_stream_chunks() {
        let size = CHUNK_SIZE * 3 + 10;
        let expected = pattern(size);
        let mut stream = file_stream(size as u64, expected.as_slice(), None);

        // Three full chunks, then a 10-byte tail, then end of stream.
        let mut expected_chunks = expected.chunks(CHUNK_SIZE).map(Bytes::copy_from_slice);
        for _ in 0..4 {
            assert_eq!(stream.try_next().await.unwrap(), expected_chunks.next());
        }
        assert_eq!(stream.try_next().await.unwrap(), None);
        assert_eq!(expected_chunks.next(), None);
    }

    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_file_stream_file_truncated() {
        let len = CHUNK_SIZE * 2;
        let long_len = CHUNK_SIZE * 3;

        // The reader holds fewer bytes than the stream was told to expect.
        let short_file = vec![0u8; len];
        let stream = file_stream(long_len as u64, &short_file[..], None);

        let mut actual = Vec::new();
        let err = read_stream_to_end(stream, &mut actual).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            format!("file truncated: only read {len} out of {long_len} bytes")
        );
    }

    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_file_stream_file_extended() {
        let len = CHUNK_SIZE * 3;
        let short_len = CHUNK_SIZE * 2;

        // The reader holds more bytes than expected; only the expected prefix is streamed.
        let long_file = pattern(len);
        let stream = file_stream(short_len as u64, &long_file[..], None);

        let mut actual = Vec::new();
        read_stream_to_end(stream, &mut actual).await.unwrap();
        assert_eq!(actual, &long_file[..short_len]);
    }

    proptest! {
        #[test]
        fn test_file_stream_proptest(len in 0usize..CHUNK_SIZE * 100) {
            let mut executor = fuchsia_async::TestExecutor::new();
            let () = executor.run_singlethreaded(async move {
                let expected = pattern(len);
                let mut actual = Vec::new();
                read_stream_to_end(
                    file_stream(expected.len() as u64, expected.as_slice(), None),
                    &mut actual,
                )
                .await
                .unwrap();

                assert_eq!(expected, actual);
            });
        }
    }
}