1use anyhow::{Context as _, Result};
6use bytes::{Bytes, BytesMut};
7use camino::{Utf8Path, Utf8PathBuf};
8use futures::{Stream, TryStreamExt as _, stream};
9use std::cmp::min;
10use std::fs::{copy, create_dir_all};
11use std::io;
12use std::path::Path;
13use std::task::Poll;
14use walkdir::WalkDir;
15
16pub(crate) const CHUNK_SIZE: usize = 8_192;
19
20pub(crate) async fn read_stream_to_end<S>(mut stream: S, buf: &mut Vec<u8>) -> io::Result<()>
22where
23 S: Stream<Item = io::Result<Bytes>> + Unpin,
24{
25 while let Some(chunk) = stream.try_next().await? {
26 buf.extend_from_slice(&chunk);
27 }
28 Ok(())
29}
30
31#[cfg(unix)]
32fn path_nlink(path: &Utf8Path) -> Option<u64> {
33 use std::os::unix::fs::MetadataExt as _;
34 std::fs::metadata(path).ok().map(|metadata| metadata.nlink())
35}
36
37#[cfg(not(unix))]
38fn path_nlink(_path: &Utf8Path) -> Option<usize> {
39 None
40}
41
42pub(super) fn file_stream(
46 expected_len: u64,
47 mut file: impl io::Read,
48 path: Option<Utf8PathBuf>,
49) -> impl Stream<Item = io::Result<Bytes>> {
50 let mut buf = BytesMut::new();
51 let mut remaining_len = expected_len;
52
53 stream::poll_fn(move |_cx| {
54 if remaining_len == 0 {
55 return Poll::Ready(None);
56 }
57
58 buf.resize(min(CHUNK_SIZE, remaining_len as usize), 0);
59
60 let n = match file.read(&mut buf) {
66 Ok(n) => n as u64,
67 Err(err) => {
68 return Poll::Ready(Some(Err(err)));
69 }
70 };
71
72 if n == 0 {
74 let msg = if let Some(path) = &path {
75 if let Some(nlink) = path_nlink(path) {
76 format!(
77 "file {} truncated: only read {} out of {} bytes: nlink: {}",
78 path,
79 expected_len - remaining_len,
80 expected_len,
81 nlink,
82 )
83 } else {
84 format!(
85 "file {} truncated: only read {} out of {} bytes",
86 path,
87 expected_len - remaining_len,
88 expected_len,
89 )
90 }
91 } else {
92 format!(
93 "file truncated: only read {} out of {} bytes",
94 expected_len - remaining_len,
95 expected_len,
96 )
97 };
98 remaining_len = 0;
100 return Poll::Ready(Some(Err(io::Error::other(msg))));
101 }
102
103 let mut chunk = buf.split_to(n as usize).freeze();
107 if n > remaining_len {
108 chunk = chunk.split_to(remaining_len as usize);
109 remaining_len = 0;
110 } else {
111 remaining_len -= n;
112 }
113
114 Poll::Ready(Some(Ok(chunk)))
115 })
116}
117
118pub fn copy_dir(from: &Path, to: &Path) -> Result<()> {
119 let walker = WalkDir::new(from);
120 for entry in walker.into_iter() {
121 let entry = entry?;
122 let to_path = to.join(entry.path().strip_prefix(from)?);
123 if entry.metadata()?.is_dir() {
124 if to_path.exists() {
125 continue;
126 } else {
127 create_dir_all(&to_path).with_context(|| format!("creating {to_path:?}"))?;
128 }
129 } else {
130 copy(entry.path(), &to_path)
131 .with_context(|| format!("copying {:?} to {:?}", entry.path(), to_path))?;
132 }
133 }
134
135 Ok(())
136}
137
138#[cfg(test)]
139mod tests {
140 use super::*;
141 use proptest::prelude::*;
142
143 #[fuchsia_async::run_singlethreaded(test)]
144 async fn test_file_stream() {
145 for size in [0, CHUNK_SIZE - 1, CHUNK_SIZE, CHUNK_SIZE + 1, CHUNK_SIZE * 2 + 1] {
146 let expected = (0..u8::MAX).cycle().take(size).collect::<Vec<_>>();
147 let stream = file_stream(size as u64, &*expected, None);
148
149 let mut actual = vec![];
150 read_stream_to_end(stream, &mut actual).await.unwrap();
151 assert_eq!(actual, expected);
152 }
153 }
154
155 #[fuchsia_async::run_singlethreaded(test)]
156 async fn test_file_stream_chunks() {
157 let size = CHUNK_SIZE * 3 + 10;
158
159 let expected = (0..u8::MAX).cycle().take(size).collect::<Vec<_>>();
160 let mut stream = file_stream(size as u64, &*expected, None);
161
162 let mut expected_chunks = expected.chunks(CHUNK_SIZE).map(Bytes::copy_from_slice);
163
164 assert_eq!(stream.try_next().await.unwrap(), expected_chunks.next());
165 assert_eq!(stream.try_next().await.unwrap(), expected_chunks.next());
166 assert_eq!(stream.try_next().await.unwrap(), expected_chunks.next());
167 assert_eq!(stream.try_next().await.unwrap(), expected_chunks.next());
168 assert_eq!(stream.try_next().await.unwrap(), None);
169 assert_eq!(expected_chunks.next(), None);
170 }
171
172 #[fuchsia_async::run_singlethreaded(test)]
173 async fn test_file_stream_file_truncated() {
174 let len = CHUNK_SIZE * 2;
175 let long_len = CHUNK_SIZE * 3;
176
177 let truncated_buf = vec![0; len];
178 let stream = file_stream(long_len as u64, truncated_buf.as_slice(), None);
179
180 let mut actual = vec![];
181 assert_eq!(
182 read_stream_to_end(stream, &mut actual).await.unwrap_err().to_string(),
183 format!("file truncated: only read {len} out of {long_len} bytes")
184 );
185 }
186
187 #[fuchsia_async::run_singlethreaded(test)]
188 async fn test_file_stream_file_extended() {
189 let len = CHUNK_SIZE * 3;
190 let short_len = CHUNK_SIZE * 2;
191
192 let buf = (0..u8::MAX).cycle().take(len).collect::<Vec<_>>();
193 let stream = file_stream(short_len as u64, buf.as_slice(), None);
194
195 let mut actual = vec![];
196 read_stream_to_end(stream, &mut actual).await.unwrap();
197 assert_eq!(actual, &buf[..short_len]);
198 }
199
200 proptest! {
201 #[test]
202 fn test_file_stream_proptest(len in 0usize..CHUNK_SIZE * 100) {
203 let mut executor = fuchsia_async::TestExecutor::new();
204 let () = executor.run_singlethreaded(async move {
205 let expected = (0..u8::MAX).cycle().take(len).collect::<Vec<_>>();
206 let stream = file_stream(expected.len() as u64, expected.as_slice(), None);
207
208 let mut actual = vec![];
209 read_stream_to_end(stream, &mut actual).await.unwrap();
210
211 assert_eq!(expected, actual);
212 });
213 }
214 }
215}