fuchsia_archive/
async_read.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::{
6    DIR_CHUNK_TYPE, DIR_NAMES_CHUNK_TYPE, DIRECTORY_ENTRY_LEN, DirectoryEntry, Error,
7    INDEX_ENTRY_LEN, INDEX_LEN, Index, IndexEntry, MAGIC_INDEX_VALUE,
8};
9use fuchsia_fs::file::{AsyncGetSize, AsyncGetSizeExt, AsyncReadAt, AsyncReadAtExt};
10use futures::lock::Mutex;
11use std::convert::TryInto as _;
12use std::sync::Arc;
13use zerocopy::IntoBytes as _;
14
15/// A struct to open and read a FAR-formatted archive asynchronously.
16#[derive(Debug)]
17pub struct AsyncReader<T>
18where
19    T: AsyncReadAt + AsyncGetSize + Unpin,
20{
21    source: T,
22    directory_entries: Box<[DirectoryEntry]>,
23    path_data: Box<[u8]>,
24}
25
26impl<T> AsyncReader<T>
27where
28    T: AsyncReadAt + AsyncGetSize + Unpin,
29{
30    /// Create a new AsyncReader for the provided source.
31    pub async fn new(mut source: T) -> Result<Self, Error> {
32        let index = Self::read_index_header(&mut source).await?;
33        let (dir_index, dir_name_index, end_of_last_non_content_chunk) =
34            Self::read_index_entries(&mut source, &index).await?;
35        let stream_len = source.get_size().await.map_err(Error::GetSize)?;
36
37        // Read directory entries
38        if !dir_index.length.get().is_multiple_of(DIRECTORY_ENTRY_LEN) {
39            return Err(Error::InvalidDirectoryChunkLen(dir_index.length.get()));
40        }
41        let mut directory_entries =
42            vec![
43                DirectoryEntry::default();
44                (dir_index.length.get() / DIRECTORY_ENTRY_LEN)
45                    .try_into()
46                    .map_err(|_| Error::InvalidDirectoryChunkLen(dir_index.length.get()))?
47            ];
48        source
49            .read_at_exact(dir_index.offset.get(), directory_entries.as_mut_bytes())
50            .await
51            .map_err(Error::Read)?;
52        let directory_entries = directory_entries.into_boxed_slice();
53
54        // Read path data
55        if !dir_name_index.length.get().is_multiple_of(8)
56            || dir_name_index.length.get() > stream_len
57        {
58            return Err(Error::InvalidDirectoryNamesChunkLen(dir_name_index.length.get()));
59        }
60        let path_data_length = dir_name_index
61            .length
62            .get()
63            .try_into()
64            .map_err(|_| Error::InvalidDirectoryNamesChunkLen(dir_name_index.length.get()))?;
65        let mut path_data = vec![0; path_data_length];
66        source
67            .read_at_exact(dir_name_index.offset.get(), &mut path_data)
68            .await
69            .map_err(Error::Read)?;
70        let path_data = path_data.into_boxed_slice();
71
72        let () = crate::validate_directory_entries_and_paths(
73            &directory_entries,
74            &path_data,
75            stream_len,
76            end_of_last_non_content_chunk,
77        )?;
78
79        Ok(Self { source, directory_entries, path_data })
80    }
81
82    async fn read_index_header(source: &mut T) -> Result<Index, Error> {
83        let mut index = Index::default();
84        source.read_at_exact(0, index.as_mut_bytes()).await.map_err(Error::Read)?;
85        if index.magic != MAGIC_INDEX_VALUE {
86            Err(Error::InvalidMagic(index.magic))
87        } else if !index.length.get().is_multiple_of(INDEX_ENTRY_LEN)
88            || INDEX_LEN.checked_add(index.length.get()).is_none()
89        {
90            Err(Error::InvalidIndexEntriesLen(index.length.get()))
91        } else {
92            Ok(index)
93        }
94    }
95
96    // Returns (directory_index, directory_names_index, end_of_last_chunk).
97    async fn read_index_entries(
98        source: &mut T,
99        index: &Index,
100    ) -> Result<(IndexEntry, IndexEntry, u64), Error> {
101        let mut dir_index: Option<IndexEntry> = None;
102        let mut dir_name_index: Option<IndexEntry> = None;
103        let mut previous_entry: Option<IndexEntry> = None;
104        for i in 0..index.length.get() / INDEX_ENTRY_LEN {
105            let mut entry = IndexEntry::default();
106            let entry_offset = INDEX_LEN + INDEX_ENTRY_LEN * i;
107            source.read_at_exact(entry_offset, entry.as_mut_bytes()).await.map_err(Error::Read)?;
108
109            let expected_offset = if let Some(previous_entry) = previous_entry {
110                if previous_entry.chunk_type >= entry.chunk_type {
111                    return Err(Error::IndexEntriesOutOfOrder {
112                        prev: previous_entry.chunk_type,
113                        next: entry.chunk_type,
114                    });
115                }
116                previous_entry.offset.get() + previous_entry.length.get()
117            } else {
118                INDEX_LEN + index.length.get()
119            };
120            if entry.offset.get() != expected_offset {
121                return Err(Error::InvalidChunkOffset {
122                    chunk_type: entry.chunk_type,
123                    expected: expected_offset,
124                    actual: entry.offset.get(),
125                });
126            }
127            if entry.offset.get().checked_add(entry.length.get()).is_none() {
128                return Err(Error::InvalidChunkLength {
129                    chunk_type: entry.chunk_type,
130                    offset: entry.offset.get(),
131                    length: entry.length.get(),
132                });
133            }
134
135            match entry.chunk_type {
136                DIR_CHUNK_TYPE => {
137                    dir_index = Some(entry);
138                }
139                DIR_NAMES_CHUNK_TYPE => {
140                    dir_name_index = Some(entry);
141                }
142                // FAR spec does not forbid unknown chunk types
143                _ => {}
144            }
145            previous_entry = Some(entry);
146        }
147        let end_of_last_chunk = if let Some(previous_entry) = previous_entry {
148            previous_entry.offset.get() + previous_entry.length.get()
149        } else {
150            INDEX_LEN
151        };
152        Ok((
153            dir_index.ok_or(Error::MissingDirectoryChunkIndexEntry)?,
154            dir_name_index.ok_or(Error::MissingDirectoryNamesChunkIndexEntry)?,
155            end_of_last_chunk,
156        ))
157    }
158
159    /// Return a list of the items in the archive
160    pub fn list(&self) -> impl ExactSizeIterator<Item = crate::Entry<'_>> {
161        crate::list(&self.directory_entries, &self.path_data)
162    }
163
164    /// Read the entire contents of the entry with the specified path.
165    /// O(log(# directory entries))
166    pub async fn read_file(&mut self, path: &[u8]) -> Result<Vec<u8>, Error> {
167        let entry = crate::find_directory_entry(&self.directory_entries, &self.path_data, path)?;
168        let mut data = vec![
169            0;
170            usize::try_from(entry.data_length.get()).map_err(|_| {
171                Error::ContentChunkDoesNotFitInMemory {
172                    name: path.into(),
173                    chunk_size: entry.data_length.get(),
174                }
175            })?
176        ];
177        let () = self
178            .source
179            .read_at_exact(entry.data_offset.get(), &mut data)
180            .await
181            .map_err(Error::Read)?;
182        Ok(data)
183    }
184
185    /// Get the size in bytes of the entry with the specified path.
186    /// O(log(# directory entries))
187    pub fn get_size(&self, path: &[u8]) -> Result<u64, Error> {
188        Ok(crate::find_directory_entry(&self.directory_entries, &self.path_data, path)?
189            .data_length
190            .get())
191    }
192
193    pub fn into_source(self) -> T {
194        self.source
195    }
196}
197
198impl<T> AsyncReader<Arc<Mutex<T>>>
199where
200    T: AsyncReadAt + AsyncGetSize + Unpin + Send,
201{
202    /// Read the contents of the entry with the specified path as a stream.
203    /// Each Vec in the stream will have a maximum size of `buffer_size`.
204    /// O(log(# directory entries))
205    pub fn read_file_stream(
206        &self,
207        path: &[u8],
208        buffer_size: usize,
209    ) -> Result<
210        (u64, impl futures::stream::Stream<Item = Result<Vec<u8>, std::io::Error>> + Send),
211        Error,
212    > {
213        let entry = crate::find_directory_entry(&self.directory_entries, &self.path_data, path)?;
214        let offset = entry.data_offset.get();
215        let bytes_remaining = entry.data_length.get();
216        let source = Arc::clone(&self.source);
217
218        let stream = futures::stream::unfold(
219            (source, offset, bytes_remaining),
220            move |(source, offset, bytes_remaining)| async move {
221                if bytes_remaining == 0 {
222                    return None;
223                }
224                let mut buf = vec![0; buffer_size];
225                let bytes_to_read = std::cmp::min(bytes_remaining, buf.len() as u64);
226                let res =
227                    source.lock().await.read_at(offset, &mut buf[..bytes_to_read as usize]).await;
228                Some(match res {
229                    Ok(bytes_read) => {
230                        buf.truncate(bytes_read);
231                        (
232                            Ok(buf),
233                            (
234                                source,
235                                offset + bytes_read as u64,
236                                bytes_remaining - bytes_read as u64,
237                            ),
238                        )
239                    }
240                    Err(e) => (Err(e), (source, offset, bytes_remaining)),
241                })
242            },
243        );
244        Ok((bytes_remaining, stream))
245    }
246}
247
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tests::example_archive;
    use assert_matches::assert_matches;
    use fuchsia_async as fasync;
    use fuchsia_fs::file::Adapter;
    use futures::io::Cursor;
    use futures::stream::TryStreamExt as _;

    #[fasync::run_singlethreaded(test)]
    async fn list() {
        let example = example_archive();
        let reader = AsyncReader::new(Adapter::new(Cursor::new(&example))).await.unwrap();
        itertools::assert_equal(
            reader.list(),
            [
                crate::Entry { path: b"a", offset: 4096, length: 2 },
                crate::Entry { path: b"b", offset: 8192, length: 2 },
                crate::Entry { path: b"dir/c", offset: 12288, length: 6 },
            ],
        );
    }

    #[fasync::run_singlethreaded(test)]
    async fn read_file() {
        let example = example_archive();
        let mut reader = AsyncReader::new(Adapter::new(Cursor::new(&example))).await.unwrap();
        for one_name in ["a", "b", "dir/c"] {
            let content = reader.read_file(one_name.as_bytes()).await.unwrap();
            let content_str = std::str::from_utf8(&content).unwrap();
            let expected = format!("{one_name}\n");
            assert_eq!(content_str, &expected);
        }
    }

    #[fasync::run_singlethreaded(test)]
    async fn read_file_stream() {
        let example = example_archive();
        let reader = AsyncReader::new(Arc::new(Mutex::new(Adapter::new(Cursor::new(&example)))))
            .await
            .unwrap();
        for one_name in ["a", "b", "dir/c"] {
            let (size, stream) = reader.read_file_stream(one_name.as_bytes(), 4).unwrap();
            let expected = format!("{one_name}\n");
            assert_eq!(size, expected.len() as u64);
            let content: Vec<u8> =
                stream.try_collect::<Vec<Vec<u8>>>().await.unwrap().into_iter().flatten().collect();
            let content_str = std::str::from_utf8(&content).unwrap();
            assert_eq!(content_str, &expected);
        }
    }

    // Each chunk yielded by the stream must be non-empty and at most `buffer_size` bytes,
    // and the chunks together must reproduce the entry exactly.
    #[fasync::run_singlethreaded(test)]
    async fn read_file_stream_chunks_are_bounded_by_buffer_size() {
        let example = example_archive();
        let reader = AsyncReader::new(Arc::new(Mutex::new(Adapter::new(Cursor::new(&example)))))
            .await
            .unwrap();
        for buffer_size in [1, 2, 4, 64] {
            for one_name in ["a", "b", "dir/c"] {
                let (size, stream) =
                    reader.read_file_stream(one_name.as_bytes(), buffer_size).unwrap();
                let chunks = stream.try_collect::<Vec<Vec<u8>>>().await.unwrap();
                assert!(
                    chunks.iter().all(|chunk| !chunk.is_empty() && chunk.len() <= buffer_size),
                    "buffer_size {buffer_size}, chunks {chunks:?}"
                );
                let content: Vec<u8> = chunks.into_iter().flatten().collect();
                assert_eq!(content.len() as u64, size);
                assert_eq!(content, format!("{one_name}\n").into_bytes());
            }
        }
    }

    #[fasync::run_singlethreaded(test)]
    async fn get_size() {
        let example = example_archive();
        let reader = AsyncReader::new(Adapter::new(Cursor::new(&example))).await.unwrap();
        for one_name in ["a", "b", "dir/c"].iter().map(|s| s.as_bytes()) {
            let returned_size = reader.get_size(one_name).unwrap();
            let expected_size = one_name.len() + 1;
            assert_eq!(returned_size, u64::try_from(expected_size).unwrap());
        }
    }

    #[fasync::run_singlethreaded(test)]
    async fn accessors_error_on_missing_path() {
        let example = example_archive();
        let mut reader = AsyncReader::new(Adapter::new(Cursor::new(&example))).await.unwrap();
        assert_matches!(
            reader.read_file(b"missing-path").await,
            Err(Error::PathNotPresent(path)) if path == b"missing-path"
        );
        assert_matches!(
            reader.get_size(b"missing-path"),
            Err(Error::PathNotPresent(path)) if path == b"missing-path"
        );
    }

    #[fasync::run_singlethreaded(test)]
    async fn read_file_stream_errors_on_missing_path() {
        let example = example_archive();
        let reader = AsyncReader::new(Arc::new(Mutex::new(Adapter::new(Cursor::new(&example)))))
            .await
            .unwrap();
        // Map away the stream, which is not `Debug`, so the result can be matched on.
        assert_matches!(
            reader.read_file_stream(b"missing-path", 4).map(|(size, _stream)| size),
            Err(Error::PathNotPresent(path)) if path == b"missing-path"
        );
    }
}