fuchsia_fs/file/
async_read_at.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl::client::QueryResponseFut;
6use flex_client::Dialect;
7use flex_fuchsia_io as fio;
8use futures::future::Future;
9use futures::io::{AsyncRead, AsyncSeek, SeekFrom};
10use futures::FutureExt;
11use std::cmp::min;
12use std::convert::TryInto as _;
13use std::io;
14use std::pin::Pin;
15use std::task::{Context, Poll};
16
/// Trait for reading at a given offset asynchronously.
///
/// This is basically `futures::io::AsyncRead` with an extra offset: every read names the
/// absolute position it starts at instead of relying on an implicit cursor.
pub trait AsyncReadAt {
    /// Attempt to read at most `buf.len()` bytes starting at `offset` into `buf`.
    ///
    /// On success returns `Poll::Ready(Ok(n))`, where `n` is the number of bytes read.
    ///
    /// * Contents of `buf` are only altered on success.
    /// * Reads of more than zero but fewer than `buf.len()` bytes do NOT indicate EOF.
    /// * Reads of zero bytes only occur if `buf.len() == 0` or EOF.
    fn poll_read_at(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        offset: u64,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>>;
}
32
/// Trait for getting the size of the file asynchronously.
pub trait AsyncGetSize {
    /// Attempt to get the size of the file.
    ///
    /// On success returns `Poll::Ready(Ok(size))`, where `size` is the file size.
    fn poll_get_size(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>;
}
38
39/// An extension trait which adds utility methods to AsyncGetSize.
40pub trait AsyncGetSizeExt: AsyncGetSize {
41    /// Returns a future that will return the file size on success.
42    fn get_size<'a>(&'a mut self) -> GetSize<'a, Self>
43    where
44        Self: Unpin,
45    {
46        GetSize { size_getter: self }
47    }
48}
49
50impl<T: AsyncGetSize + ?Sized> AsyncGetSizeExt for T {}
51
/// Future for the [`get_size`](AsyncGetSizeExt::get_size) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct GetSize<'a, R: ?Sized> {
    /// Borrowed size getter that each poll of this future forwards to.
    size_getter: &'a mut R,
}
58
59impl<R: ?Sized + Unpin> Unpin for GetSize<'_, R> {}
60
61impl<R: AsyncGetSize + ?Sized + Unpin> Future for GetSize<'_, R> {
62    type Output = io::Result<u64>;
63
64    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
65        let this = &mut *self;
66        Pin::new(&mut *this.size_getter).poll_get_size(cx)
67    }
68}
69
/// Wraps a `fidl_fuchsia_io::FileProxy` and implements `AsyncReadAt` and `AsyncGetSize`, which
/// allows one to perform asynchronous file reads that don't block the current thread while waiting
/// for data.
/// Unlike `AsyncReader`, this struct does not require exclusive ownership, because `read_at` does
/// not rely on the file offset state in the connection. This is useful if one wants to efficiently
/// read different parts of the file at the same time.
#[derive(Debug)]
pub struct AsyncFile {
    /// Proxy that the `File.ReadAt` and attribute requests are sent through.
    file: fio::FileProxy,
    /// State machine driven by `poll_read_at`.
    read_at_state: ReadAtState,
    /// In-flight attribute query started by `poll_get_size`; `None` when no query is
    /// outstanding.
    get_attributes_fut: Option<
        QueryResponseFut<
            Result<(fio::MutableNodeAttributes, fio::ImmutableNodeAttributes), i32>,
            Dialect,
        >,
    >,
}
87
/// State of the `poll_read_at` state machine of an `AsyncFile`.
#[derive(Debug)]
enum ReadAtState {
    /// No request is in flight and no response is held.
    Empty,
    /// A `File.ReadAt` request has been sent and its response has not been received yet.
    Forwarding {
        /// The pending response future.
        fut: QueryResponseFut<Result<Vec<u8>, i32>, Dialect>,
        /// File offset the request was made at.
        file_offset: u64,
        /// Whether the request asked for zero bytes.
        zero_byte_request: bool,
    },
    /// The response of a successful `File.ReadAt` request.
    Bytes {
        /// The bytes the request returned.
        bytes: Vec<u8>,
        /// File offset of `bytes[0]`.
        file_offset: u64,
    },
}
101
102impl AsyncFile {
103    pub fn from_proxy(file: fio::FileProxy) -> Self {
104        Self { file, read_at_state: ReadAtState::Empty, get_attributes_fut: None }
105    }
106}
107
108impl AsyncReadAt for AsyncFile {
109    fn poll_read_at(
110        mut self: Pin<&mut Self>,
111        cx: &mut Context<'_>,
112        offset: u64,
113        buf: &mut [u8],
114    ) -> Poll<std::io::Result<usize>> {
115        loop {
116            match self.read_at_state {
117                ReadAtState::Empty => {
118                    let len = if let Ok(len) = buf.len().try_into() {
119                        min(len, fio::MAX_BUF)
120                    } else {
121                        fio::MAX_BUF
122                    };
123                    self.read_at_state = ReadAtState::Forwarding {
124                        fut: self.file.read_at(len, offset),
125                        file_offset: offset,
126                        zero_byte_request: len == 0,
127                    };
128                }
129                ReadAtState::Forwarding { ref mut fut, file_offset, zero_byte_request } => {
130                    match futures::ready!(Pin::new(fut).poll(cx)) {
131                        Ok(result) => {
132                            match result {
133                                Err(s) => {
134                                    self.read_at_state = ReadAtState::Empty;
135                                    return Poll::Ready(Err(
136                                        zx_status::Status::from_raw(s).into_io_error()
137                                    ));
138                                }
139                                Ok(bytes) => {
140                                    // If the File.ReadAt request was for zero bytes, but the current
141                                    // poll_read_at is not (because the File.ReadAt request was made by an
142                                    // earlier call to poll_read_at with a zero length buffer) then we
143                                    // should not advance to ReadAtState::Bytes because that would return
144                                    // Ready(Ok(0)), which would indicate EOF to the client.
145                                    // This handling is done here instead of short-circuiting at the
146                                    // beginning of the function so that zero-length poll_read_ats still
147                                    // trigger the validation performed by File.ReadAt.
148                                    if zero_byte_request && buf.len() != 0 {
149                                        self.read_at_state = ReadAtState::Empty;
150                                    } else {
151                                        self.read_at_state =
152                                            ReadAtState::Bytes { bytes, file_offset };
153                                    }
154                                }
155                            }
156                        }
157                        Err(e) => {
158                            self.read_at_state = ReadAtState::Empty;
159                            return Poll::Ready(Err(std::io::Error::other(e)));
160                        }
161                    }
162                }
163                ReadAtState::Bytes { ref bytes, file_offset } => {
164                    if offset < file_offset {
165                        self.read_at_state = ReadAtState::Empty;
166                        continue;
167                    }
168                    let bytes_offset = match (offset - file_offset).try_into() {
169                        Ok(offset) => offset,
170                        Err(_) => {
171                            self.read_at_state = ReadAtState::Empty;
172                            continue;
173                        }
174                    };
175                    if bytes_offset != 0 && bytes_offset >= bytes.len() {
176                        self.read_at_state = ReadAtState::Empty;
177                        continue;
178                    }
179                    let n = min(buf.len(), bytes.len() - bytes_offset);
180                    let () = buf[..n].copy_from_slice(&bytes[bytes_offset..bytes_offset + n]);
181                    return Poll::Ready(Ok(n));
182                }
183            }
184        }
185    }
186}
187
188impl AsyncGetSize for AsyncFile {
189    fn poll_get_size(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
190        if self.get_attributes_fut.is_none() {
191            self.get_attributes_fut =
192                Some(self.file.get_attributes(fio::NodeAttributesQuery::CONTENT_SIZE));
193        }
194        let fut = self.get_attributes_fut.as_mut().unwrap();
195        let get_attributes_fut_result = futures::ready!(fut.poll_unpin(cx));
196        self.get_attributes_fut = None;
197        match get_attributes_fut_result {
198            Ok(get_attributes_response) => match get_attributes_response {
199                Ok((_mutable_attr, immutable_attr)) => {
200                    if let Some(content_size) = immutable_attr.content_size {
201                        return Poll::Ready(Ok(content_size));
202                    }
203                    return Poll::Ready(Err(zx_status::Status::NOT_SUPPORTED.into_io_error()));
204                }
205                Err(status) => {
206                    return Poll::Ready(Err(zx_status::Status::from_raw(status).into_io_error()));
207                }
208            },
209            Err(e) => {
210                return Poll::Ready(Err(std::io::Error::other(e)));
211            }
212        }
213    }
214}
215
/// Adapter to implement AsyncReadAt + AsyncGetSize for AsyncRead + AsyncSeek.
#[derive(Debug)]
pub struct Adapter<T> {
    /// The wrapped reader/seeker that all operations are forwarded to.
    inner: T,
}
221
222impl<T> Adapter<T> {
223    pub fn new(inner: T) -> Adapter<T> {
224        Self { inner }
225    }
226}
227
228impl<T: AsyncRead + AsyncSeek + Unpin> AsyncReadAt for Adapter<T> {
229    fn poll_read_at(
230        mut self: Pin<&mut Self>,
231        cx: &mut Context<'_>,
232        offset: u64,
233        buf: &mut [u8],
234    ) -> Poll<std::io::Result<usize>> {
235        futures::ready!(Pin::new(&mut self.inner).poll_seek(cx, SeekFrom::Start(offset)))?;
236        Pin::new(&mut self.inner).poll_read(cx, buf)
237    }
238}
239
240impl<T: AsyncSeek + Unpin> AsyncGetSize for Adapter<T> {
241    fn poll_get_size(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
242        Pin::new(&mut self.inner).poll_seek(cx, SeekFrom::End(0))
243    }
244}
245
246#[cfg(test)]
247mod tests {
248    use super::*;
249    use crate::file::{self, AsyncReadAtExt};
250    use assert_matches::assert_matches;
251    use fidl::endpoints;
252    use fuchsia_async as fasync;
253    use futures::future::{self, poll_fn};
254    use futures::{StreamExt as _, TryStreamExt as _};
255    use std::convert::TryFrom as _;
256    use std::io::Write;
257    use tempfile::{NamedTempFile, TempDir};
258
259    async fn poll_read_at_with_specific_buf_size(
260        poll_read_size: u64,
261        expected_file_read_size: u64,
262    ) {
263        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
264
265        let mut reader = AsyncFile::from_proxy(proxy);
266
267        let () = poll_fn(|cx| {
268            let mut buf = vec![0u8; poll_read_size.try_into().unwrap()];
269            assert_matches!(
270                Pin::new(&mut reader).poll_read_at(cx, 0, buf.as_mut_slice()),
271                Poll::Pending
272            );
273            Poll::Ready(())
274        })
275        .await;
276
277        match stream.next().await.unwrap().unwrap() {
278            fio::FileRequest::ReadAt { count, .. } => {
279                assert_eq!(count, expected_file_read_size);
280            }
281            req => panic!("unhandled request {:?}", req),
282        }
283    }
284
285    #[fasync::run_singlethreaded(test)]
286    async fn poll_read_at_empty_buf() {
287        poll_read_at_with_specific_buf_size(0, 0).await;
288    }
289
290    #[fasync::run_singlethreaded(test)]
291    async fn poll_read_at_caps_buf_size() {
292        poll_read_at_with_specific_buf_size(fio::MAX_BUF * 2, fio::MAX_BUF).await;
293    }
294
295    #[fasync::run_singlethreaded(test)]
296    async fn poll_read_at_pending_saves_future() {
297        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
298
299        let mut reader = AsyncFile::from_proxy(proxy);
300
301        // This poll_read_at call will create a File.ReadAt future and poll it. The poll of the
302        // File.ReadAt future will return Pending because nothing is handling the FileRequestStream
303        // yet. The reader should save this File.ReadAt future for handling subsequent poll_read_at
304        // calls.
305        let () = poll_fn(|cx| {
306            assert_matches!(
307                Pin::new(&mut reader).poll_read_at(cx, 2, &mut [0u8; 1]),
308                Poll::Pending
309            );
310            Poll::Ready(())
311        })
312        .await;
313
314        // Call poll_read_at until we get a byte out. This byte should be from the first and only
315        // File.ReadAt request.
316        let poll_read_at = async move {
317            let mut buf = [0u8; 1];
318            assert_eq!(reader.read_at(2, &mut buf).await.unwrap(), buf.len());
319            assert_eq!(&buf, &[1]);
320        };
321
322        let mut file_read_requests = 0u8;
323        let handle_file_stream = async {
324            while let Some(req) = stream.try_next().await.unwrap() {
325                file_read_requests += 1;
326                match req {
327                    fio::FileRequest::ReadAt { count, offset, responder } => {
328                        assert_eq!(count, 1);
329                        assert_eq!(offset, 2);
330                        responder.send(Ok(&[file_read_requests])).unwrap();
331                    }
332                    req => panic!("unhandled request {:?}", req),
333                }
334            }
335        };
336
337        let ((), ()) = future::join(poll_read_at, handle_file_stream).await;
338        assert_eq!(file_read_requests, 1);
339    }
340
341    #[fasync::run_singlethreaded(test)]
342    async fn poll_read_at_with_smaller_buf_after_pending() {
343        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
344
345        let mut reader = AsyncFile::from_proxy(proxy);
346
347        // Call poll_read_at with a buf of length 3. This is the first poll_read_at call, so the
348        // reader will create a File.ReadAt future for 3 bytes. poll_read_at will return Pending
349        // because nothing is handling the FileRequestStream yet.
350        let () = poll_fn(|cx| {
351            assert_matches!(
352                Pin::new(&mut reader).poll_read_at(cx, 0, &mut [0u8; 3]),
353                Poll::Pending
354            );
355            Poll::Ready(())
356        })
357        .await;
358
359        // Respond to the three byte File.ReadAt request.
360        let () = async {
361            match stream.next().await.unwrap().unwrap() {
362                fio::FileRequest::ReadAt { count, offset, responder } => {
363                    assert_eq!(count, 3);
364                    assert_eq!(offset, 0);
365                    responder.send(Ok(b"012")).unwrap();
366                }
367                req => panic!("unhandled request {:?}", req),
368            }
369        }
370        .await;
371
372        // Call poll_read_at with a buf of length 1. This should resolve the previously created 3
373        // byte File.ReadAt future and return the first byte from it while saving the remaining two
374        // bytes.
375        let mut buf = [0u8; 1];
376        assert_eq!(reader.read_at(0, &mut buf).await.unwrap(), buf.len());
377        assert_eq!(&buf, b"0");
378
379        // Call poll_read_at with a buf of len 1. This should return the first saved byte, which
380        // should be the second byte from the original File.ReadAt request.
381        let mut buf = [0u8; 1];
382        assert_eq!(reader.read_at(1, &mut buf).await.unwrap(), buf.len());
383        assert_eq!(&buf, b"1");
384
385        // Call poll_read_at with a buf of len 2. There should only be one remaining saved byte
386        // from the original File.ReadAt request, so poll_read_at should only return one byte.
387        let mut buf = [0u8; 2];
388        assert_eq!(reader.read_at(2, &mut buf).await.unwrap(), 1);
389        assert_eq!(&buf[..1], b"2");
390
391        // There should be no saved bytes remaining, so a poll_read_at of four bytes should cause a
392        // new File.ReadAt request.
393        let mut buf = [0u8; 4];
394        let poll_read_at = reader.read_at(3, &mut buf);
395
396        let handle_second_file_request = async {
397            match stream.next().await.unwrap().unwrap() {
398                fio::FileRequest::ReadAt { count, offset, responder } => {
399                    assert_eq!(count, 4);
400                    assert_eq!(offset, 3);
401                    responder.send(Ok(b"3456")).unwrap();
402                }
403                req => panic!("unhandled request {:?}", req),
404            }
405        };
406
407        let (read_res, ()) = future::join(poll_read_at, handle_second_file_request).await;
408        assert_eq!(read_res.unwrap(), 4);
409        assert_eq!(&buf, b"3456");
410    }
411
412    #[fasync::run_singlethreaded(test)]
413    async fn transition_to_empty_on_fidl_error() {
414        let (proxy, _) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
415
416        let mut reader = AsyncFile::from_proxy(proxy);
417
418        // poll_read_at will fail because the channel is closed because the server end was dropped.
419        let () = poll_fn(|cx| {
420            assert_matches!(
421                Pin::new(&mut reader).poll_read_at(cx, 0, &mut [0u8; 1]),
422                Poll::Ready(Err(_))
423            );
424            Poll::Ready(())
425        })
426        .await;
427
428        // This test is accessing internal state because the only fidl error that is easy to inject
429        // is ZX_ERR_PEER_CLOSED (by closing the channel). Once the channel is closed, all new
430        // futures created by the AsyncFile will fail, but, if poll'ed, the old future would also
431        // continue to fail (not panic) because it is Fused.
432        assert_matches!(reader.read_at_state, ReadAtState::Empty);
433    }
434
435    #[fasync::run_singlethreaded(test)]
436    async fn recover_from_file_read_error() {
437        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
438
439        let mut reader = AsyncFile::from_proxy(proxy);
440
441        // Call poll_read_at until failure.
442        let mut buf = [0u8; 1];
443        let poll_read_at = reader.read_at(0, &mut buf);
444
445        let failing_file_response = async {
446            match stream.next().await.unwrap().unwrap() {
447                fio::FileRequest::ReadAt { count, offset, responder } => {
448                    assert_eq!(count, 1);
449                    assert_eq!(offset, 0);
450                    responder.send(Err(zx_status::Status::NO_MEMORY.into_raw())).unwrap();
451                }
452                req => panic!("unhandled request {:?}", req),
453            }
454        };
455
456        let (read_res, ()) = future::join(poll_read_at, failing_file_response).await;
457        assert_matches!(read_res, Err(_));
458
459        // Calling poll_read_at again should create a new File.ReadAt request instead of reusing
460        // the old future.
461        let mut buf = [0u8; 1];
462        let poll_read_at = reader.read_at(0, &mut buf);
463
464        let succeeding_file_response = async {
465            match stream.next().await.unwrap().unwrap() {
466                fio::FileRequest::ReadAt { count, offset, responder } => {
467                    assert_eq!(count, 1);
468                    assert_eq!(offset, 0);
469                    responder.send(Ok(b"0")).unwrap();
470                }
471                req => panic!("unhandled request {:?}", req),
472            }
473        };
474
475        let (read_res, ()) = future::join(poll_read_at, succeeding_file_response).await;
476        assert_eq!(read_res.unwrap(), 1);
477        assert_eq!(&buf, b"0");
478    }
479
480    #[fasync::run_singlethreaded(test)]
481    async fn poll_read_at_zero_then_read_nonzero() {
482        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
483
484        let mut reader = AsyncFile::from_proxy(proxy);
485
486        // Call poll_read_at with a zero-length buffer.
487        let () = poll_fn(|cx| {
488            assert_matches!(Pin::new(&mut reader).poll_read_at(cx, 0, &mut []), Poll::Pending);
489            Poll::Ready(())
490        })
491        .await;
492
493        // Handle the zero-length File.ReadAt request.
494        match stream.next().await.unwrap().unwrap() {
495            fio::FileRequest::ReadAt { count, offset, responder } => {
496                assert_eq!(count, 0);
497                assert_eq!(offset, 0);
498                responder.send(Ok(&[])).unwrap();
499            }
500            req => panic!("unhandled request {:?}", req),
501        }
502
503        // Call poll_read_at with a length 1 buffer until Ready is returned;
504        let mut buf = vec![0u8; 1];
505        let poll_read_at = reader.read_at(0, &mut buf);
506
507        // The AsyncFile will discard the File.ReadAt response from the first poll_read, and create
508        // another request, this handles that second request. The AsyncFile discards the first
509        // response because the first poll_read_at was for zero bytes, but the current poll_read_at
510        // is not.
511        let handle_file_request = async {
512            match stream.next().await.unwrap().unwrap() {
513                fio::FileRequest::ReadAt { count, offset, responder } => {
514                    assert_eq!(count, 1);
515                    assert_eq!(offset, 0);
516                    responder.send(Ok(&[1])).unwrap();
517                }
518                req => panic!("unhandled request {:?}", req),
519            }
520        };
521
522        let (poll_read, ()) = future::join(poll_read_at, handle_file_request).await;
523
524        // poll_read_at should read 1 byte, even though the first poll_read_at request was for zero
525        // bytes and returned Pending.
526        assert_eq!(poll_read.unwrap(), 1);
527        assert_eq!(&buf[..], &[1]);
528    }
529
530    #[fasync::run_singlethreaded(test)]
531    async fn different_poll_read_at_and_file_sizes() {
532        for first_poll_read_len in 0..5 {
533            for file_size in 0..5 {
534                for second_poll_offset in 0..file_size {
535                    for second_poll_read_len in 0..5 {
536                        let (proxy, mut stream) =
537                            endpoints::create_proxy_and_stream::<fio::FileMarker>();
538
539                        let mut reader = AsyncFile::from_proxy(proxy);
540
541                        // poll_read_at causes the AsyncFile to create a File.ReadAt request.
542                        let () = poll_fn(|cx| {
543                            let mut buf = vec![0u8; first_poll_read_len];
544                            assert_matches!(
545                                Pin::new(&mut reader).poll_read_at(cx, 0, &mut buf),
546                                Poll::Pending
547                            );
548                            Poll::Ready(())
549                        })
550                        .await;
551
552                        // Respond to the File.ReadAt request with at most as many bytes as the
553                        // poll_read_at requested.
554                        match stream.next().await.unwrap().unwrap() {
555                            fio::FileRequest::ReadAt { count, offset, responder } => {
556                                assert_eq!(count, u64::try_from(first_poll_read_len).unwrap());
557                                assert_eq!(offset, 0);
558                                let resp = vec![7u8; min(file_size, first_poll_read_len)];
559                                responder.send(Ok(&resp)).unwrap();
560                            }
561                            req => panic!("unhandled request {:?}", req),
562                        }
563
564                        // Call poll_read_at until it returns Ready. If the first poll_read_at was
565                        // for zero bytes and this poll_read_at is not or this poll_read_at offset
566                        // is outside the buffer, the AsyncFile will make another File.ReadAt
567                        // request.
568                        let mut buf = vec![0u8; second_poll_read_len];
569                        let poll_read_at = reader.read_at(second_poll_offset as u64, &mut buf);
570
571                        let second_request = first_poll_read_len == 0 && second_poll_read_len != 0
572                            || second_poll_offset != 0 && second_poll_offset >= first_poll_read_len;
573                        let handle_conditional_file_request = async {
574                            if second_request {
575                                match stream.next().await.unwrap().unwrap() {
576                                    fio::FileRequest::ReadAt { count, offset, responder } => {
577                                        assert_eq!(
578                                            count,
579                                            u64::try_from(second_poll_read_len).unwrap()
580                                        );
581                                        assert_eq!(
582                                            offset,
583                                            u64::try_from(second_poll_offset).unwrap()
584                                        );
585                                        let resp = vec![
586                                            7u8;
587                                            min(
588                                                file_size - second_poll_offset,
589                                                second_poll_read_len
590                                            )
591                                        ];
592                                        responder.send(Ok(&resp)).unwrap();
593                                    }
594                                    req => panic!("unhandled request {:?}", req),
595                                }
596                            }
597                        };
598
599                        let (read_res, ()) =
600                            future::join(poll_read_at, handle_conditional_file_request).await;
601
602                        let expected_len = if second_request {
603                            min(file_size - second_poll_offset, second_poll_read_len)
604                        } else {
605                            min(
606                                min(file_size, first_poll_read_len) - second_poll_offset,
607                                second_poll_read_len,
608                            )
609                        };
610                        let expected = vec![7u8; expected_len];
611                        assert_eq!(read_res.unwrap(), expected_len);
612                        assert_eq!(&buf[..expected_len], &expected[..]);
613                    }
614                }
615            }
616        }
617    }
618
619    async fn get_size_file_with_contents(contents: &[u8]) {
620        let dir = TempDir::new().unwrap();
621        let path = dir.path().join("get_size_file_with_contents").to_str().unwrap().to_owned();
622        let () = file::write_in_namespace(&path, contents).await.unwrap();
623        let file = file::open_in_namespace(&path, fio::PERM_READABLE).unwrap();
624
625        let mut reader = AsyncFile::from_proxy(file);
626
627        assert_eq!(reader.get_size().await.unwrap(), contents.len() as u64);
628    }
629
630    #[fasync::run_singlethreaded(test)]
631    async fn get_size_empty() {
632        get_size_file_with_contents(&[]).await;
633    }
634
635    #[fasync::run_singlethreaded(test)]
636    async fn get_size_large() {
637        let expected_contents = vec![7u8; (fio::MAX_BUF * 3).try_into().unwrap()];
638        get_size_file_with_contents(&expected_contents[..]).await;
639    }
640
641    #[fasync::run_singlethreaded(test)]
642    async fn get_size_changing_size() {
643        let (mut file, path) = NamedTempFile::new().unwrap().into_parts();
644        let proxy = file::open_in_namespace(path.to_str().unwrap(), fio::PERM_READABLE).unwrap();
645
646        let mut reader = AsyncFile::from_proxy(proxy);
647
648        assert_eq!(reader.get_size().await.unwrap(), 0);
649        file.write_all(&[1; 3][..]).unwrap();
650        assert_eq!(reader.get_size().await.unwrap(), 3);
651        file.write_all(&[2; 5][..]).unwrap();
652        assert_eq!(reader.get_size().await.unwrap(), 8);
653    }
654
655    #[fasync::run_singlethreaded(test)]
656    async fn adapter_for_cursor() {
657        let data = (0..1000).map(|i| (i % 256) as u8).collect::<Vec<_>>();
658        let cursor = futures::io::Cursor::new(data.clone());
659        let mut adapter = Adapter::new(cursor);
660
661        assert_eq!(adapter.get_size().await.unwrap(), 1000);
662
663        let mut buffer = vec![];
664        adapter.read_to_end(&mut buffer).await.unwrap();
665        assert_eq!(buffer, data);
666
667        let mut buffer = vec![0; 100];
668        adapter.read_at_exact(333, &mut buffer).await.unwrap();
669        assert_eq!(buffer, &data[333..433]);
670    }
671}