Skip to main content

fuchsia_fs/file/
async_read_at.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl::client::QueryResponseFut;
6use flex_client::Dialect;
7use flex_fuchsia_io as fio;
8use futures::FutureExt as _;
9use futures::future::Future;
10use futures::io::{AsyncRead, AsyncSeek, SeekFrom};
11use futures::lock::Mutex;
12use std::cmp::min;
13use std::convert::TryInto as _;
14use std::io;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18use zx_status_ext::StatusExt;
19
20/// Trait for reading at a given offset asynchronously.
21/// This is basically `futures::io::AsyncRead` with an extra offset.
22pub trait AsyncReadAt {
23    /// Attempt to read at most `buf.len()` bytes starting at `offset` into `buf`. On success
24    /// returns the number of bytes read.
25    /// Contents of `buf` are only altered on success.
26    /// Reads of more than zero but fewer than `buf.len()` bytes do NOT indicate EOF.
27    /// Reads of zero bytes only occur if `buf.len() == 0` or EOF.
28    fn poll_read_at(
29        self: Pin<&mut Self>,
30        cx: &mut Context<'_>,
31        offset: u64,
32        buf: &mut [u8],
33    ) -> Poll<io::Result<usize>>;
34}
35
36/// Trait for getting the size of the file asynchronously.
37pub trait AsyncGetSize {
38    /// Attempt to get the size of the file, on success returns the file size.
39    fn poll_get_size(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>;
40}
41
42impl<T: AsyncReadAt + Unpin> AsyncReadAt for Arc<Mutex<T>> {
43    fn poll_read_at(
44        self: Pin<&mut Self>,
45        cx: &mut Context<'_>,
46        offset: u64,
47        buf: &mut [u8],
48    ) -> Poll<Result<usize, std::io::Error>> {
49        let mut guard = futures::ready!(self.lock().poll_unpin(cx));
50        Pin::new(&mut *guard).poll_read_at(cx, offset, buf)
51    }
52}
53
54impl<T: AsyncGetSize + Unpin> AsyncGetSize for Arc<Mutex<T>> {
55    fn poll_get_size(
56        self: Pin<&mut Self>,
57        cx: &mut Context<'_>,
58    ) -> Poll<Result<u64, std::io::Error>> {
59        let mut guard = futures::ready!(self.lock().poll_unpin(cx));
60        Pin::new(&mut *guard).poll_get_size(cx)
61    }
62}
63
64/// An extension trait which adds utility methods to AsyncGetSize.
65pub trait AsyncGetSizeExt: AsyncGetSize {
66    /// Returns a future that will return the file size on success.
67    fn get_size<'a>(&'a mut self) -> GetSize<'a, Self>
68    where
69        Self: Unpin,
70    {
71        GetSize { size_getter: self }
72    }
73}
74
75impl<T: AsyncGetSize + ?Sized> AsyncGetSizeExt for T {}
76
77/// Future for the [`get_size`](AsyncGetSizeExt::get_size) method.
78#[derive(Debug)]
79#[must_use = "futures do nothing unless you `.await` or poll them"]
80pub struct GetSize<'a, R: ?Sized> {
81    size_getter: &'a mut R,
82}
83
84impl<R: ?Sized + Unpin> Unpin for GetSize<'_, R> {}
85
86impl<R: AsyncGetSize + ?Sized + Unpin> Future for GetSize<'_, R> {
87    type Output = io::Result<u64>;
88
89    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
90        let this = &mut *self;
91        Pin::new(&mut *this.size_getter).poll_get_size(cx)
92    }
93}
94
95/// Wraps a `fidl_fuchsia_io::FileProxy` and implements `AsyncReadAt` and `AsyncGetSize`, which
96/// allows one to perform asynchronous file reads that don't block the current thread while waiting
97/// for data.
98/// Unlike `AsyncReader`, this struct does not require exclusive ownership, because `read_at` does
99/// not rely on the file offset state in the connection. This is useful if one wants to efficiently
100/// read different parts of the file at the same time.
101#[derive(Debug)]
102pub struct AsyncFile {
103    file: fio::FileProxy,
104    read_at_state: ReadAtState,
105    get_attributes_fut: Option<
106        QueryResponseFut<
107            Result<(fio::MutableNodeAttributes, fio::ImmutableNodeAttributes), i32>,
108            Dialect,
109        >,
110    >,
111}
112
113#[derive(Debug)]
114enum ReadAtState {
115    Empty,
116    Forwarding {
117        fut: QueryResponseFut<Result<Vec<u8>, i32>, Dialect>,
118        file_offset: u64,
119        zero_byte_request: bool,
120    },
121    Bytes {
122        bytes: Vec<u8>,
123        file_offset: u64,
124    },
125}
126
127impl AsyncFile {
128    pub fn from_proxy(file: fio::FileProxy) -> Self {
129        Self { file, read_at_state: ReadAtState::Empty, get_attributes_fut: None }
130    }
131}
132
133impl AsyncReadAt for AsyncFile {
134    fn poll_read_at(
135        mut self: Pin<&mut Self>,
136        cx: &mut Context<'_>,
137        offset: u64,
138        buf: &mut [u8],
139    ) -> Poll<std::io::Result<usize>> {
140        loop {
141            match self.read_at_state {
142                ReadAtState::Empty => {
143                    let len = if let Ok(len) = buf.len().try_into() {
144                        min(len, fio::MAX_BUF)
145                    } else {
146                        fio::MAX_BUF
147                    };
148                    self.read_at_state = ReadAtState::Forwarding {
149                        fut: self.file.read_at(len, offset),
150                        file_offset: offset,
151                        zero_byte_request: len == 0,
152                    };
153                }
154                ReadAtState::Forwarding { ref mut fut, file_offset, zero_byte_request } => {
155                    match futures::ready!(Pin::new(fut).poll(cx)) {
156                        Ok(result) => {
157                            match result {
158                                Err(s) => {
159                                    self.read_at_state = ReadAtState::Empty;
160                                    return Poll::Ready(Err(
161                                        zx_status::Status::from_raw(s).into_io_error()
162                                    ));
163                                }
164                                Ok(bytes) => {
165                                    // If the File.ReadAt request was for zero bytes, but the current
166                                    // poll_read_at is not (because the File.ReadAt request was made by an
167                                    // earlier call to poll_read_at with a zero length buffer) then we
168                                    // should not advance to ReadAtState::Bytes because that would return
169                                    // Ready(Ok(0)), which would indicate EOF to the client.
170                                    // This handling is done here instead of short-circuiting at the
171                                    // beginning of the function so that zero-length poll_read_ats still
172                                    // trigger the validation performed by File.ReadAt.
173                                    if zero_byte_request && buf.len() != 0 {
174                                        self.read_at_state = ReadAtState::Empty;
175                                    } else {
176                                        self.read_at_state =
177                                            ReadAtState::Bytes { bytes, file_offset };
178                                    }
179                                }
180                            }
181                        }
182                        Err(e) => {
183                            self.read_at_state = ReadAtState::Empty;
184                            return Poll::Ready(Err(std::io::Error::other(e)));
185                        }
186                    }
187                }
188                ReadAtState::Bytes { ref bytes, file_offset } => {
189                    if offset < file_offset {
190                        self.read_at_state = ReadAtState::Empty;
191                        continue;
192                    }
193                    let bytes_offset = match (offset - file_offset).try_into() {
194                        Ok(offset) => offset,
195                        Err(_) => {
196                            self.read_at_state = ReadAtState::Empty;
197                            continue;
198                        }
199                    };
200                    if bytes_offset != 0 && bytes_offset >= bytes.len() {
201                        self.read_at_state = ReadAtState::Empty;
202                        continue;
203                    }
204                    let n = min(buf.len(), bytes.len() - bytes_offset);
205                    let () = buf[..n].copy_from_slice(&bytes[bytes_offset..bytes_offset + n]);
206                    return Poll::Ready(Ok(n));
207                }
208            }
209        }
210    }
211}
212
213impl AsyncGetSize for AsyncFile {
214    fn poll_get_size(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
215        if self.get_attributes_fut.is_none() {
216            self.get_attributes_fut =
217                Some(self.file.get_attributes(fio::NodeAttributesQuery::CONTENT_SIZE));
218        }
219        let fut = self.get_attributes_fut.as_mut().unwrap();
220        let get_attributes_fut_result = futures::ready!(fut.poll_unpin(cx));
221        self.get_attributes_fut = None;
222        match get_attributes_fut_result {
223            Ok(get_attributes_response) => match get_attributes_response {
224                Ok((_mutable_attr, immutable_attr)) => {
225                    if let Some(content_size) = immutable_attr.content_size {
226                        return Poll::Ready(Ok(content_size));
227                    }
228                    return Poll::Ready(Err(zx_status::Status::NOT_SUPPORTED.into_io_error()));
229                }
230                Err(status) => {
231                    return Poll::Ready(Err(zx_status::Status::from_raw(status).into_io_error()));
232                }
233            },
234            Err(e) => {
235                return Poll::Ready(Err(std::io::Error::other(e)));
236            }
237        }
238    }
239}
240
241/// Adapter to implement AsyncReadAt + AsyncGetSize for AsyncRead + AsyncSeek.
242#[derive(Debug)]
243pub struct Adapter<T> {
244    inner: T,
245}
246
247impl<T> Adapter<T> {
248    pub fn new(inner: T) -> Adapter<T> {
249        Self { inner }
250    }
251}
252
253impl<T: AsyncRead + AsyncSeek + Unpin> AsyncReadAt for Adapter<T> {
254    fn poll_read_at(
255        mut self: Pin<&mut Self>,
256        cx: &mut Context<'_>,
257        offset: u64,
258        buf: &mut [u8],
259    ) -> Poll<std::io::Result<usize>> {
260        futures::ready!(Pin::new(&mut self.inner).poll_seek(cx, SeekFrom::Start(offset)))?;
261        Pin::new(&mut self.inner).poll_read(cx, buf)
262    }
263}
264
265impl<T: AsyncSeek + Unpin> AsyncGetSize for Adapter<T> {
266    fn poll_get_size(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
267        Pin::new(&mut self.inner).poll_seek(cx, SeekFrom::End(0))
268    }
269}
270
271#[cfg(test)]
272mod tests {
273    use super::*;
274    use crate::file::{self, AsyncReadAtExt};
275    use assert_matches::assert_matches;
276    use fidl::endpoints;
277    use fuchsia_async as fasync;
278    use futures::future::{self, poll_fn};
279    use futures::{StreamExt as _, TryStreamExt as _};
280    use std::convert::TryFrom as _;
281    use std::io::Write;
282    use tempfile::{NamedTempFile, TempDir};
283
284    async fn poll_read_at_with_specific_buf_size(
285        poll_read_size: u64,
286        expected_file_read_size: u64,
287    ) {
288        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
289
290        let mut reader = AsyncFile::from_proxy(proxy);
291
292        let () = poll_fn(|cx| {
293            let mut buf = vec![0u8; poll_read_size.try_into().unwrap()];
294            assert_matches!(
295                Pin::new(&mut reader).poll_read_at(cx, 0, buf.as_mut_slice()),
296                Poll::Pending
297            );
298            Poll::Ready(())
299        })
300        .await;
301
302        match stream.next().await.unwrap().unwrap() {
303            fio::FileRequest::ReadAt { count, .. } => {
304                assert_eq!(count, expected_file_read_size);
305            }
306            req => panic!("unhandled request {:?}", req),
307        }
308    }
309
310    #[fasync::run_singlethreaded(test)]
311    async fn poll_read_at_empty_buf() {
312        poll_read_at_with_specific_buf_size(0, 0).await;
313    }
314
315    #[fasync::run_singlethreaded(test)]
316    async fn poll_read_at_caps_buf_size() {
317        poll_read_at_with_specific_buf_size(fio::MAX_BUF * 2, fio::MAX_BUF).await;
318    }
319
320    #[fasync::run_singlethreaded(test)]
321    async fn poll_read_at_pending_saves_future() {
322        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
323
324        let mut reader = AsyncFile::from_proxy(proxy);
325
326        // This poll_read_at call will create a File.ReadAt future and poll it. The poll of the
327        // File.ReadAt future will return Pending because nothing is handling the FileRequestStream
328        // yet. The reader should save this File.ReadAt future for handling subsequent poll_read_at
329        // calls.
330        let () = poll_fn(|cx| {
331            assert_matches!(
332                Pin::new(&mut reader).poll_read_at(cx, 2, &mut [0u8; 1]),
333                Poll::Pending
334            );
335            Poll::Ready(())
336        })
337        .await;
338
339        // Call poll_read_at until we get a byte out. This byte should be from the first and only
340        // File.ReadAt request.
341        let poll_read_at = async move {
342            let mut buf = [0u8; 1];
343            assert_eq!(reader.read_at(2, &mut buf).await.unwrap(), buf.len());
344            assert_eq!(&buf, &[1]);
345        };
346
347        let mut file_read_requests = 0u8;
348        let handle_file_stream = async {
349            while let Some(req) = stream.try_next().await.unwrap() {
350                file_read_requests += 1;
351                match req {
352                    fio::FileRequest::ReadAt { count, offset, responder } => {
353                        assert_eq!(count, 1);
354                        assert_eq!(offset, 2);
355                        responder.send(Ok(&[file_read_requests])).unwrap();
356                    }
357                    req => panic!("unhandled request {:?}", req),
358                }
359            }
360        };
361
362        let ((), ()) = future::join(poll_read_at, handle_file_stream).await;
363        assert_eq!(file_read_requests, 1);
364    }
365
366    #[fasync::run_singlethreaded(test)]
367    async fn poll_read_at_with_smaller_buf_after_pending() {
368        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
369
370        let mut reader = AsyncFile::from_proxy(proxy);
371
372        // Call poll_read_at with a buf of length 3. This is the first poll_read_at call, so the
373        // reader will create a File.ReadAt future for 3 bytes. poll_read_at will return Pending
374        // because nothing is handling the FileRequestStream yet.
375        let () = poll_fn(|cx| {
376            assert_matches!(
377                Pin::new(&mut reader).poll_read_at(cx, 0, &mut [0u8; 3]),
378                Poll::Pending
379            );
380            Poll::Ready(())
381        })
382        .await;
383
384        // Respond to the three byte File.ReadAt request.
385        let () = async {
386            match stream.next().await.unwrap().unwrap() {
387                fio::FileRequest::ReadAt { count, offset, responder } => {
388                    assert_eq!(count, 3);
389                    assert_eq!(offset, 0);
390                    responder.send(Ok(b"012")).unwrap();
391                }
392                req => panic!("unhandled request {:?}", req),
393            }
394        }
395        .await;
396
397        // Call poll_read_at with a buf of length 1. This should resolve the previously created 3
398        // byte File.ReadAt future and return the first byte from it while saving the remaining two
399        // bytes.
400        let mut buf = [0u8; 1];
401        assert_eq!(reader.read_at(0, &mut buf).await.unwrap(), buf.len());
402        assert_eq!(&buf, b"0");
403
404        // Call poll_read_at with a buf of len 1. This should return the first saved byte, which
405        // should be the second byte from the original File.ReadAt request.
406        let mut buf = [0u8; 1];
407        assert_eq!(reader.read_at(1, &mut buf).await.unwrap(), buf.len());
408        assert_eq!(&buf, b"1");
409
410        // Call poll_read_at with a buf of len 2. There should only be one remaining saved byte
411        // from the original File.ReadAt request, so poll_read_at should only return one byte.
412        let mut buf = [0u8; 2];
413        assert_eq!(reader.read_at(2, &mut buf).await.unwrap(), 1);
414        assert_eq!(&buf[..1], b"2");
415
416        // There should be no saved bytes remaining, so a poll_read_at of four bytes should cause a
417        // new File.ReadAt request.
418        let mut buf = [0u8; 4];
419        let poll_read_at = reader.read_at(3, &mut buf);
420
421        let handle_second_file_request = async {
422            match stream.next().await.unwrap().unwrap() {
423                fio::FileRequest::ReadAt { count, offset, responder } => {
424                    assert_eq!(count, 4);
425                    assert_eq!(offset, 3);
426                    responder.send(Ok(b"3456")).unwrap();
427                }
428                req => panic!("unhandled request {:?}", req),
429            }
430        };
431
432        let (read_res, ()) = future::join(poll_read_at, handle_second_file_request).await;
433        assert_eq!(read_res.unwrap(), 4);
434        assert_eq!(&buf, b"3456");
435    }
436
437    #[fasync::run_singlethreaded(test)]
438    async fn transition_to_empty_on_fidl_error() {
439        let (proxy, _) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
440
441        let mut reader = AsyncFile::from_proxy(proxy);
442
443        // poll_read_at will fail because the channel is closed because the server end was dropped.
444        let () = poll_fn(|cx| {
445            assert_matches!(
446                Pin::new(&mut reader).poll_read_at(cx, 0, &mut [0u8; 1]),
447                Poll::Ready(Err(_))
448            );
449            Poll::Ready(())
450        })
451        .await;
452
453        // This test is accessing internal state because the only fidl error that is easy to inject
454        // is ZX_ERR_PEER_CLOSED (by closing the channel). Once the channel is closed, all new
455        // futures created by the AsyncFile will fail, but, if poll'ed, the old future would also
456        // continue to fail (not panic) because it is Fused.
457        assert_matches!(reader.read_at_state, ReadAtState::Empty);
458    }
459
460    #[fasync::run_singlethreaded(test)]
461    async fn recover_from_file_read_error() {
462        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
463
464        let mut reader = AsyncFile::from_proxy(proxy);
465
466        // Call poll_read_at until failure.
467        let mut buf = [0u8; 1];
468        let poll_read_at = reader.read_at(0, &mut buf);
469
470        let failing_file_response = async {
471            match stream.next().await.unwrap().unwrap() {
472                fio::FileRequest::ReadAt { count, offset, responder } => {
473                    assert_eq!(count, 1);
474                    assert_eq!(offset, 0);
475                    responder.send(Err(zx_status::Status::NO_MEMORY.into_raw())).unwrap();
476                }
477                req => panic!("unhandled request {:?}", req),
478            }
479        };
480
481        let (read_res, ()) = future::join(poll_read_at, failing_file_response).await;
482        assert_matches!(read_res, Err(_));
483
484        // Calling poll_read_at again should create a new File.ReadAt request instead of reusing
485        // the old future.
486        let mut buf = [0u8; 1];
487        let poll_read_at = reader.read_at(0, &mut buf);
488
489        let succeeding_file_response = async {
490            match stream.next().await.unwrap().unwrap() {
491                fio::FileRequest::ReadAt { count, offset, responder } => {
492                    assert_eq!(count, 1);
493                    assert_eq!(offset, 0);
494                    responder.send(Ok(b"0")).unwrap();
495                }
496                req => panic!("unhandled request {:?}", req),
497            }
498        };
499
500        let (read_res, ()) = future::join(poll_read_at, succeeding_file_response).await;
501        assert_eq!(read_res.unwrap(), 1);
502        assert_eq!(&buf, b"0");
503    }
504
505    #[fasync::run_singlethreaded(test)]
506    async fn poll_read_at_zero_then_read_nonzero() {
507        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
508
509        let mut reader = AsyncFile::from_proxy(proxy);
510
511        // Call poll_read_at with a zero-length buffer.
512        let () = poll_fn(|cx| {
513            assert_matches!(Pin::new(&mut reader).poll_read_at(cx, 0, &mut []), Poll::Pending);
514            Poll::Ready(())
515        })
516        .await;
517
518        // Handle the zero-length File.ReadAt request.
519        match stream.next().await.unwrap().unwrap() {
520            fio::FileRequest::ReadAt { count, offset, responder } => {
521                assert_eq!(count, 0);
522                assert_eq!(offset, 0);
523                responder.send(Ok(&[])).unwrap();
524            }
525            req => panic!("unhandled request {:?}", req),
526        }
527
528        // Call poll_read_at with a length 1 buffer until Ready is returned;
529        let mut buf = vec![0u8; 1];
530        let poll_read_at = reader.read_at(0, &mut buf);
531
532        // The AsyncFile will discard the File.ReadAt response from the first poll_read, and create
533        // another request, this handles that second request. The AsyncFile discards the first
534        // response because the first poll_read_at was for zero bytes, but the current poll_read_at
535        // is not.
536        let handle_file_request = async {
537            match stream.next().await.unwrap().unwrap() {
538                fio::FileRequest::ReadAt { count, offset, responder } => {
539                    assert_eq!(count, 1);
540                    assert_eq!(offset, 0);
541                    responder.send(Ok(&[1])).unwrap();
542                }
543                req => panic!("unhandled request {:?}", req),
544            }
545        };
546
547        let (poll_read, ()) = future::join(poll_read_at, handle_file_request).await;
548
549        // poll_read_at should read 1 byte, even though the first poll_read_at request was for zero
550        // bytes and returned Pending.
551        assert_eq!(poll_read.unwrap(), 1);
552        assert_eq!(&buf[..], &[1]);
553    }
554
555    #[fasync::run_singlethreaded(test)]
556    async fn different_poll_read_at_and_file_sizes() {
557        for first_poll_read_len in 0..5 {
558            for file_size in 0..5 {
559                for second_poll_offset in 0..file_size {
560                    for second_poll_read_len in 0..5 {
561                        let (proxy, mut stream) =
562                            endpoints::create_proxy_and_stream::<fio::FileMarker>();
563
564                        let mut reader = AsyncFile::from_proxy(proxy);
565
566                        // poll_read_at causes the AsyncFile to create a File.ReadAt request.
567                        let () = poll_fn(|cx| {
568                            let mut buf = vec![0u8; first_poll_read_len];
569                            assert_matches!(
570                                Pin::new(&mut reader).poll_read_at(cx, 0, &mut buf),
571                                Poll::Pending
572                            );
573                            Poll::Ready(())
574                        })
575                        .await;
576
577                        // Respond to the File.ReadAt request with at most as many bytes as the
578                        // poll_read_at requested.
579                        match stream.next().await.unwrap().unwrap() {
580                            fio::FileRequest::ReadAt { count, offset, responder } => {
581                                assert_eq!(count, u64::try_from(first_poll_read_len).unwrap());
582                                assert_eq!(offset, 0);
583                                let resp = vec![7u8; min(file_size, first_poll_read_len)];
584                                responder.send(Ok(&resp)).unwrap();
585                            }
586                            req => panic!("unhandled request {:?}", req),
587                        }
588
589                        // Call poll_read_at until it returns Ready. If the first poll_read_at was
590                        // for zero bytes and this poll_read_at is not or this poll_read_at offset
591                        // is outside the buffer, the AsyncFile will make another File.ReadAt
592                        // request.
593                        let mut buf = vec![0u8; second_poll_read_len];
594                        let poll_read_at = reader.read_at(second_poll_offset as u64, &mut buf);
595
596                        let second_request = first_poll_read_len == 0 && second_poll_read_len != 0
597                            || second_poll_offset != 0 && second_poll_offset >= first_poll_read_len;
598                        let handle_conditional_file_request = async {
599                            if second_request {
600                                match stream.next().await.unwrap().unwrap() {
601                                    fio::FileRequest::ReadAt { count, offset, responder } => {
602                                        assert_eq!(
603                                            count,
604                                            u64::try_from(second_poll_read_len).unwrap()
605                                        );
606                                        assert_eq!(
607                                            offset,
608                                            u64::try_from(second_poll_offset).unwrap()
609                                        );
610                                        let resp = vec![
611                                            7u8;
612                                            min(
613                                                file_size - second_poll_offset,
614                                                second_poll_read_len
615                                            )
616                                        ];
617                                        responder.send(Ok(&resp)).unwrap();
618                                    }
619                                    req => panic!("unhandled request {:?}", req),
620                                }
621                            }
622                        };
623
624                        let (read_res, ()) =
625                            future::join(poll_read_at, handle_conditional_file_request).await;
626
627                        let expected_len = if second_request {
628                            min(file_size - second_poll_offset, second_poll_read_len)
629                        } else {
630                            min(
631                                min(file_size, first_poll_read_len) - second_poll_offset,
632                                second_poll_read_len,
633                            )
634                        };
635                        let expected = vec![7u8; expected_len];
636                        assert_eq!(read_res.unwrap(), expected_len);
637                        assert_eq!(&buf[..expected_len], &expected[..]);
638                    }
639                }
640            }
641        }
642    }
643
644    async fn get_size_file_with_contents(contents: &[u8]) {
645        let dir = TempDir::new().unwrap();
646        let path = dir.path().join("get_size_file_with_contents").to_str().unwrap().to_owned();
647        let () = file::write_in_namespace(&path, contents).await.unwrap();
648        let file = file::open_in_namespace(&path, fio::PERM_READABLE).unwrap();
649
650        let mut reader = AsyncFile::from_proxy(file);
651
652        assert_eq!(reader.get_size().await.unwrap(), contents.len() as u64);
653    }
654
655    #[fasync::run_singlethreaded(test)]
656    async fn get_size_empty() {
657        get_size_file_with_contents(&[]).await;
658    }
659
660    #[fasync::run_singlethreaded(test)]
661    async fn get_size_large() {
662        let expected_contents = vec![7u8; (fio::MAX_BUF * 3).try_into().unwrap()];
663        get_size_file_with_contents(&expected_contents[..]).await;
664    }
665
666    #[fasync::run_singlethreaded(test)]
667    async fn get_size_changing_size() {
668        let (mut file, path) = NamedTempFile::new().unwrap().into_parts();
669        let proxy = file::open_in_namespace(path.to_str().unwrap(), fio::PERM_READABLE).unwrap();
670
671        let mut reader = AsyncFile::from_proxy(proxy);
672
673        assert_eq!(reader.get_size().await.unwrap(), 0);
674        file.write_all(&[1; 3][..]).unwrap();
675        assert_eq!(reader.get_size().await.unwrap(), 3);
676        file.write_all(&[2; 5][..]).unwrap();
677        assert_eq!(reader.get_size().await.unwrap(), 8);
678    }
679
680    #[fasync::run_singlethreaded(test)]
681    async fn adapter_for_cursor() {
682        let data = (0..1000).map(|i| (i % 256) as u8).collect::<Vec<_>>();
683        let cursor = futures::io::Cursor::new(data.clone());
684        let mut adapter = Adapter::new(cursor);
685
686        assert_eq!(adapter.get_size().await.unwrap(), 1000);
687
688        let mut buffer = vec![];
689        adapter.read_to_end(&mut buffer).await.unwrap();
690        assert_eq!(buffer, data);
691
692        let mut buffer = vec![0; 100];
693        adapter.read_at_exact(333, &mut buffer).await.unwrap();
694        assert_eq!(buffer, &data[333..433]);
695    }
696}