1use crate::{
6 DIR_CHUNK_TYPE, DIR_NAMES_CHUNK_TYPE, DIRECTORY_ENTRY_LEN, DirectoryEntry, Error,
7 INDEX_ENTRY_LEN, INDEX_LEN, Index, IndexEntry, MAGIC_INDEX_VALUE,
8};
9use fuchsia_fs::file::{AsyncGetSize, AsyncGetSizeExt, AsyncReadAt, AsyncReadAtExt};
10use futures::lock::Mutex;
11use std::convert::TryInto as _;
12use std::sync::Arc;
13use zerocopy::IntoBytes as _;
14
15#[derive(Debug)]
17pub struct AsyncReader<T>
18where
19 T: AsyncReadAt + AsyncGetSize + Unpin,
20{
21 source: T,
22 directory_entries: Box<[DirectoryEntry]>,
23 path_data: Box<[u8]>,
24}
25
26impl<T> AsyncReader<T>
27where
28 T: AsyncReadAt + AsyncGetSize + Unpin,
29{
30 pub async fn new(mut source: T) -> Result<Self, Error> {
32 let index = Self::read_index_header(&mut source).await?;
33 let (dir_index, dir_name_index, end_of_last_non_content_chunk) =
34 Self::read_index_entries(&mut source, &index).await?;
35 let stream_len = source.get_size().await.map_err(Error::GetSize)?;
36
37 if !dir_index.length.get().is_multiple_of(DIRECTORY_ENTRY_LEN) {
39 return Err(Error::InvalidDirectoryChunkLen(dir_index.length.get()));
40 }
41 let mut directory_entries =
42 vec![
43 DirectoryEntry::default();
44 (dir_index.length.get() / DIRECTORY_ENTRY_LEN)
45 .try_into()
46 .map_err(|_| Error::InvalidDirectoryChunkLen(dir_index.length.get()))?
47 ];
48 source
49 .read_at_exact(dir_index.offset.get(), directory_entries.as_mut_bytes())
50 .await
51 .map_err(Error::Read)?;
52 let directory_entries = directory_entries.into_boxed_slice();
53
54 if !dir_name_index.length.get().is_multiple_of(8)
56 || dir_name_index.length.get() > stream_len
57 {
58 return Err(Error::InvalidDirectoryNamesChunkLen(dir_name_index.length.get()));
59 }
60 let path_data_length = dir_name_index
61 .length
62 .get()
63 .try_into()
64 .map_err(|_| Error::InvalidDirectoryNamesChunkLen(dir_name_index.length.get()))?;
65 let mut path_data = vec![0; path_data_length];
66 source
67 .read_at_exact(dir_name_index.offset.get(), &mut path_data)
68 .await
69 .map_err(Error::Read)?;
70 let path_data = path_data.into_boxed_slice();
71
72 let () = crate::validate_directory_entries_and_paths(
73 &directory_entries,
74 &path_data,
75 stream_len,
76 end_of_last_non_content_chunk,
77 )?;
78
79 Ok(Self { source, directory_entries, path_data })
80 }
81
82 async fn read_index_header(source: &mut T) -> Result<Index, Error> {
83 let mut index = Index::default();
84 source.read_at_exact(0, index.as_mut_bytes()).await.map_err(Error::Read)?;
85 if index.magic != MAGIC_INDEX_VALUE {
86 Err(Error::InvalidMagic(index.magic))
87 } else if !index.length.get().is_multiple_of(INDEX_ENTRY_LEN)
88 || INDEX_LEN.checked_add(index.length.get()).is_none()
89 {
90 Err(Error::InvalidIndexEntriesLen(index.length.get()))
91 } else {
92 Ok(index)
93 }
94 }
95
96 async fn read_index_entries(
98 source: &mut T,
99 index: &Index,
100 ) -> Result<(IndexEntry, IndexEntry, u64), Error> {
101 let mut dir_index: Option<IndexEntry> = None;
102 let mut dir_name_index: Option<IndexEntry> = None;
103 let mut previous_entry: Option<IndexEntry> = None;
104 for i in 0..index.length.get() / INDEX_ENTRY_LEN {
105 let mut entry = IndexEntry::default();
106 let entry_offset = INDEX_LEN + INDEX_ENTRY_LEN * i;
107 source.read_at_exact(entry_offset, entry.as_mut_bytes()).await.map_err(Error::Read)?;
108
109 let expected_offset = if let Some(previous_entry) = previous_entry {
110 if previous_entry.chunk_type >= entry.chunk_type {
111 return Err(Error::IndexEntriesOutOfOrder {
112 prev: previous_entry.chunk_type,
113 next: entry.chunk_type,
114 });
115 }
116 previous_entry.offset.get() + previous_entry.length.get()
117 } else {
118 INDEX_LEN + index.length.get()
119 };
120 if entry.offset.get() != expected_offset {
121 return Err(Error::InvalidChunkOffset {
122 chunk_type: entry.chunk_type,
123 expected: expected_offset,
124 actual: entry.offset.get(),
125 });
126 }
127 if entry.offset.get().checked_add(entry.length.get()).is_none() {
128 return Err(Error::InvalidChunkLength {
129 chunk_type: entry.chunk_type,
130 offset: entry.offset.get(),
131 length: entry.length.get(),
132 });
133 }
134
135 match entry.chunk_type {
136 DIR_CHUNK_TYPE => {
137 dir_index = Some(entry);
138 }
139 DIR_NAMES_CHUNK_TYPE => {
140 dir_name_index = Some(entry);
141 }
142 _ => {}
144 }
145 previous_entry = Some(entry);
146 }
147 let end_of_last_chunk = if let Some(previous_entry) = previous_entry {
148 previous_entry.offset.get() + previous_entry.length.get()
149 } else {
150 INDEX_LEN
151 };
152 Ok((
153 dir_index.ok_or(Error::MissingDirectoryChunkIndexEntry)?,
154 dir_name_index.ok_or(Error::MissingDirectoryNamesChunkIndexEntry)?,
155 end_of_last_chunk,
156 ))
157 }
158
159 pub fn list(&self) -> impl ExactSizeIterator<Item = crate::Entry<'_>> {
161 crate::list(&self.directory_entries, &self.path_data)
162 }
163
164 pub async fn read_file(&mut self, path: &[u8]) -> Result<Vec<u8>, Error> {
167 let entry = crate::find_directory_entry(&self.directory_entries, &self.path_data, path)?;
168 let mut data = vec![
169 0;
170 usize::try_from(entry.data_length.get()).map_err(|_| {
171 Error::ContentChunkDoesNotFitInMemory {
172 name: path.into(),
173 chunk_size: entry.data_length.get(),
174 }
175 })?
176 ];
177 let () = self
178 .source
179 .read_at_exact(entry.data_offset.get(), &mut data)
180 .await
181 .map_err(Error::Read)?;
182 Ok(data)
183 }
184
185 pub fn get_size(&self, path: &[u8]) -> Result<u64, Error> {
188 Ok(crate::find_directory_entry(&self.directory_entries, &self.path_data, path)?
189 .data_length
190 .get())
191 }
192
193 pub fn into_source(self) -> T {
194 self.source
195 }
196}
197
198impl<T> AsyncReader<Arc<Mutex<T>>>
199where
200 T: AsyncReadAt + AsyncGetSize + Unpin + Send,
201{
202 pub fn read_file_stream(
206 &self,
207 path: &[u8],
208 buffer_size: usize,
209 ) -> Result<
210 (u64, impl futures::stream::Stream<Item = Result<Vec<u8>, std::io::Error>> + Send),
211 Error,
212 > {
213 let entry = crate::find_directory_entry(&self.directory_entries, &self.path_data, path)?;
214 let offset = entry.data_offset.get();
215 let bytes_remaining = entry.data_length.get();
216 let source = Arc::clone(&self.source);
217
218 let stream = futures::stream::unfold(
219 (source, offset, bytes_remaining),
220 move |(source, offset, bytes_remaining)| async move {
221 if bytes_remaining == 0 {
222 return None;
223 }
224 let mut buf = vec![0; buffer_size];
225 let bytes_to_read = std::cmp::min(bytes_remaining, buf.len() as u64);
226 let res =
227 source.lock().await.read_at(offset, &mut buf[..bytes_to_read as usize]).await;
228 Some(match res {
229 Ok(bytes_read) => {
230 buf.truncate(bytes_read);
231 (
232 Ok(buf),
233 (
234 source,
235 offset + bytes_read as u64,
236 bytes_remaining - bytes_read as u64,
237 ),
238 )
239 }
240 Err(e) => (Err(e), (source, offset, bytes_remaining)),
241 })
242 },
243 );
244 Ok((bytes_remaining, stream))
245 }
246}
247
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tests::example_archive;
    use assert_matches::assert_matches;
    use fuchsia_async as fasync;
    use fuchsia_fs::file::Adapter;
    use futures::io::Cursor;
    use futures::stream::TryStreamExt as _;

    /// Paths stored in `example_archive()`. Each file's content is its path followed by `\n`.
    const EXAMPLE_PATHS: [&str; 3] = ["a", "b", "dir/c"];

    #[fasync::run_singlethreaded(test)]
    async fn list() {
        let archive = example_archive();
        let reader = AsyncReader::new(Adapter::new(Cursor::new(&archive))).await.unwrap();
        let expected_entries = [
            crate::Entry { path: b"a", offset: 4096, length: 2 },
            crate::Entry { path: b"b", offset: 8192, length: 2 },
            crate::Entry { path: b"dir/c", offset: 12288, length: 6 },
        ];
        itertools::assert_equal(reader.list(), expected_entries);
    }

    #[fasync::run_singlethreaded(test)]
    async fn read_file() {
        let archive = example_archive();
        let mut reader = AsyncReader::new(Adapter::new(Cursor::new(&archive))).await.unwrap();
        for name in EXAMPLE_PATHS {
            let expected = format!("{name}\n");
            let bytes = reader.read_file(name.as_bytes()).await.unwrap();
            assert_eq!(std::str::from_utf8(&bytes).unwrap(), expected);
        }
    }

    #[fasync::run_singlethreaded(test)]
    async fn read_file_stream() {
        let archive = example_archive();
        let shared_source = Arc::new(Mutex::new(Adapter::new(Cursor::new(&archive))));
        let reader = AsyncReader::new(shared_source).await.unwrap();
        for name in EXAMPLE_PATHS {
            let expected = format!("{name}\n");
            // A 4-byte buffer forces "dir/c\n" to arrive in more than one chunk.
            let (size, stream) = reader.read_file_stream(name.as_bytes(), 4).unwrap();
            assert_eq!(size, u64::try_from(expected.len()).unwrap());
            let chunks: Vec<Vec<u8>> = stream.try_collect().await.unwrap();
            let bytes = chunks.concat();
            assert_eq!(std::str::from_utf8(&bytes).unwrap(), expected);
        }
    }

    #[fasync::run_singlethreaded(test)]
    async fn get_size() {
        let archive = example_archive();
        let reader = AsyncReader::new(Adapter::new(Cursor::new(&archive))).await.unwrap();
        for name in EXAMPLE_PATHS {
            // Content is the path plus a trailing newline.
            let expected_size = u64::try_from(name.len() + 1).unwrap();
            assert_eq!(reader.get_size(name.as_bytes()).unwrap(), expected_size);
        }
    }

    #[fasync::run_singlethreaded(test)]
    async fn accessors_error_on_missing_path() {
        let archive = example_archive();
        let mut reader = AsyncReader::new(Adapter::new(Cursor::new(&archive))).await.unwrap();

        let read_result = reader.read_file(b"missing-path").await;
        assert_matches!(
            read_result,
            Err(Error::PathNotPresent(path)) if path == b"missing-path"
        );

        let size_result = reader.get_size(b"missing-path");
        assert_matches!(
            size_result,
            Err(Error::PathNotPresent(path)) if path == b"missing-path"
        );
    }
}