fuchsia_fs/file/
async_read_at.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl::client::QueryResponseFut;
6use flex_client::Dialect;
7use flex_fuchsia_io as fio;
8use futures::FutureExt as _;
9use futures::future::Future;
10use futures::io::{AsyncRead, AsyncSeek, SeekFrom};
11use futures::lock::Mutex;
12use std::cmp::min;
13use std::convert::TryInto as _;
14use std::io;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
/// Trait for reading at a given offset asynchronously.
/// This is basically `futures::io::AsyncRead` with an extra offset.
pub trait AsyncReadAt {
    /// Attempt to read at most `buf.len()` bytes starting at `offset` into `buf`. On success
    /// returns the number of bytes read.
    /// Contents of `buf` are only altered on success.
    /// Reads of more than zero but fewer than `buf.len()` bytes do NOT indicate EOF.
    /// Reads of zero bytes only occur if `buf.len() == 0` or EOF.
    ///
    /// `offset` is an absolute position: the implementations in this file pass it either as
    /// the offset of a `File.ReadAt` request or as a `SeekFrom::Start` target.
    ///
    /// Returns `Poll::Pending` while the read has not completed, `Poll::Ready(Ok(n))` once `n`
    /// bytes have been copied into the front of `buf`, and `Poll::Ready(Err(_))` on failure.
    fn poll_read_at(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        offset: u64,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>>;
}
34
/// Trait for getting the size of the file asynchronously.
pub trait AsyncGetSize {
    /// Attempt to get the size of the file, on success returns the file size.
    ///
    /// Returns `Poll::Pending` while the size is not yet known, `Poll::Ready(Ok(size))` with the
    /// size as a `u64` on success, and `Poll::Ready(Err(_))` on failure.
    fn poll_get_size(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>;
}
40
41impl<T: AsyncReadAt + Unpin> AsyncReadAt for Arc<Mutex<T>> {
42    fn poll_read_at(
43        self: Pin<&mut Self>,
44        cx: &mut Context<'_>,
45        offset: u64,
46        buf: &mut [u8],
47    ) -> Poll<Result<usize, std::io::Error>> {
48        let mut guard = futures::ready!(self.lock().poll_unpin(cx));
49        Pin::new(&mut *guard).poll_read_at(cx, offset, buf)
50    }
51}
52
53impl<T: AsyncGetSize + Unpin> AsyncGetSize for Arc<Mutex<T>> {
54    fn poll_get_size(
55        self: Pin<&mut Self>,
56        cx: &mut Context<'_>,
57    ) -> Poll<Result<u64, std::io::Error>> {
58        let mut guard = futures::ready!(self.lock().poll_unpin(cx));
59        Pin::new(&mut *guard).poll_get_size(cx)
60    }
61}
62
63/// An extension trait which adds utility methods to AsyncGetSize.
64pub trait AsyncGetSizeExt: AsyncGetSize {
65    /// Returns a future that will return the file size on success.
66    fn get_size<'a>(&'a mut self) -> GetSize<'a, Self>
67    where
68        Self: Unpin,
69    {
70        GetSize { size_getter: self }
71    }
72}
73
// Blanket impl: every `AsyncGetSize` implementor, sized or not, gets the `AsyncGetSizeExt`
// methods without writing its own impl.
impl<T: AsyncGetSize + ?Sized> AsyncGetSizeExt for T {}
75
/// Future for the [`get_size`](AsyncGetSizeExt::get_size) method.
///
/// Holds a mutable borrow of the size getter for the lifetime `'a` and polls it each time the
/// future itself is polled.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct GetSize<'a, R: ?Sized> {
    // The borrowed `AsyncGetSize` implementor that answers the size query.
    size_getter: &'a mut R,
}
82
// `GetSize` only stores a `&mut R`; it is declared `Unpin` whenever `R` itself is `Unpin`.
impl<R: ?Sized + Unpin> Unpin for GetSize<'_, R> {}
84
85impl<R: AsyncGetSize + ?Sized + Unpin> Future for GetSize<'_, R> {
86    type Output = io::Result<u64>;
87
88    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
89        let this = &mut *self;
90        Pin::new(&mut *this.size_getter).poll_get_size(cx)
91    }
92}
93
/// Wraps a `fidl_fuchsia_io::FileProxy` and implements `AsyncReadAt` and `AsyncGetSize`, which
/// allows one to perform asynchronous file reads that don't block the current thread while waiting
/// for data.
/// Unlike `AsyncReader`, this struct does not require exclusive ownership, because `read_at` does
/// not rely on the file offset state in the connection. This is useful if one wants to efficiently
/// read different parts of the file at the same time.
#[derive(Debug)]
pub struct AsyncFile {
    // Connection used to issue `ReadAt` and `GetAttributes` requests.
    file: fio::FileProxy,
    // Progress of the current `poll_read_at` request, including the bytes of the most recent
    // response once it has arrived.
    read_at_state: ReadAtState,
    // In-flight `get_attributes` call started by `poll_get_size`; `None` when no call is
    // pending.
    get_attributes_fut: Option<
        QueryResponseFut<
            Result<(fio::MutableNodeAttributes, fio::ImmutableNodeAttributes), i32>,
            Dialect,
        >,
    >,
}
111
/// State machine driven by `AsyncFile::poll_read_at`.
#[derive(Debug)]
enum ReadAtState {
    /// No request is in flight and no response bytes are held.
    Empty,
    /// A `File.ReadAt` request has been sent and its response is being awaited.
    Forwarding {
        /// Future resolving to the `File.ReadAt` response.
        fut: QueryResponseFut<Result<Vec<u8>, i32>, Dialect>,
        /// File offset the request was issued for.
        file_offset: u64,
        /// Whether the request asked for zero bytes.
        zero_byte_request: bool,
    },
    /// Bytes returned by a completed `File.ReadAt` request.
    Bytes {
        /// The response payload.
        bytes: Vec<u8>,
        /// File offset corresponding to `bytes[0]`.
        file_offset: u64,
    },
}
125
126impl AsyncFile {
127    pub fn from_proxy(file: fio::FileProxy) -> Self {
128        Self { file, read_at_state: ReadAtState::Empty, get_attributes_fut: None }
129    }
130}
131
132impl AsyncReadAt for AsyncFile {
133    fn poll_read_at(
134        mut self: Pin<&mut Self>,
135        cx: &mut Context<'_>,
136        offset: u64,
137        buf: &mut [u8],
138    ) -> Poll<std::io::Result<usize>> {
139        loop {
140            match self.read_at_state {
141                ReadAtState::Empty => {
142                    let len = if let Ok(len) = buf.len().try_into() {
143                        min(len, fio::MAX_BUF)
144                    } else {
145                        fio::MAX_BUF
146                    };
147                    self.read_at_state = ReadAtState::Forwarding {
148                        fut: self.file.read_at(len, offset),
149                        file_offset: offset,
150                        zero_byte_request: len == 0,
151                    };
152                }
153                ReadAtState::Forwarding { ref mut fut, file_offset, zero_byte_request } => {
154                    match futures::ready!(Pin::new(fut).poll(cx)) {
155                        Ok(result) => {
156                            match result {
157                                Err(s) => {
158                                    self.read_at_state = ReadAtState::Empty;
159                                    return Poll::Ready(Err(
160                                        zx_status::Status::from_raw(s).into_io_error()
161                                    ));
162                                }
163                                Ok(bytes) => {
164                                    // If the File.ReadAt request was for zero bytes, but the current
165                                    // poll_read_at is not (because the File.ReadAt request was made by an
166                                    // earlier call to poll_read_at with a zero length buffer) then we
167                                    // should not advance to ReadAtState::Bytes because that would return
168                                    // Ready(Ok(0)), which would indicate EOF to the client.
169                                    // This handling is done here instead of short-circuiting at the
170                                    // beginning of the function so that zero-length poll_read_ats still
171                                    // trigger the validation performed by File.ReadAt.
172                                    if zero_byte_request && buf.len() != 0 {
173                                        self.read_at_state = ReadAtState::Empty;
174                                    } else {
175                                        self.read_at_state =
176                                            ReadAtState::Bytes { bytes, file_offset };
177                                    }
178                                }
179                            }
180                        }
181                        Err(e) => {
182                            self.read_at_state = ReadAtState::Empty;
183                            return Poll::Ready(Err(std::io::Error::other(e)));
184                        }
185                    }
186                }
187                ReadAtState::Bytes { ref bytes, file_offset } => {
188                    if offset < file_offset {
189                        self.read_at_state = ReadAtState::Empty;
190                        continue;
191                    }
192                    let bytes_offset = match (offset - file_offset).try_into() {
193                        Ok(offset) => offset,
194                        Err(_) => {
195                            self.read_at_state = ReadAtState::Empty;
196                            continue;
197                        }
198                    };
199                    if bytes_offset != 0 && bytes_offset >= bytes.len() {
200                        self.read_at_state = ReadAtState::Empty;
201                        continue;
202                    }
203                    let n = min(buf.len(), bytes.len() - bytes_offset);
204                    let () = buf[..n].copy_from_slice(&bytes[bytes_offset..bytes_offset + n]);
205                    return Poll::Ready(Ok(n));
206                }
207            }
208        }
209    }
210}
211
212impl AsyncGetSize for AsyncFile {
213    fn poll_get_size(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
214        if self.get_attributes_fut.is_none() {
215            self.get_attributes_fut =
216                Some(self.file.get_attributes(fio::NodeAttributesQuery::CONTENT_SIZE));
217        }
218        let fut = self.get_attributes_fut.as_mut().unwrap();
219        let get_attributes_fut_result = futures::ready!(fut.poll_unpin(cx));
220        self.get_attributes_fut = None;
221        match get_attributes_fut_result {
222            Ok(get_attributes_response) => match get_attributes_response {
223                Ok((_mutable_attr, immutable_attr)) => {
224                    if let Some(content_size) = immutable_attr.content_size {
225                        return Poll::Ready(Ok(content_size));
226                    }
227                    return Poll::Ready(Err(zx_status::Status::NOT_SUPPORTED.into_io_error()));
228                }
229                Err(status) => {
230                    return Poll::Ready(Err(zx_status::Status::from_raw(status).into_io_error()));
231                }
232            },
233            Err(e) => {
234                return Poll::Ready(Err(std::io::Error::other(e)));
235            }
236        }
237    }
238}
239
/// Adapter that provides `AsyncReadAt` + `AsyncGetSize` on top of a type implementing
/// `AsyncRead` + `AsyncSeek`.
#[derive(Debug)]
pub struct Adapter<T> {
    // The wrapped reader/seeker that every operation is forwarded to.
    inner: T,
}

impl<T> Adapter<T> {
    /// Wraps `inner` without touching it.
    pub fn new(inner: T) -> Self {
        Adapter { inner }
    }
}
251
252impl<T: AsyncRead + AsyncSeek + Unpin> AsyncReadAt for Adapter<T> {
253    fn poll_read_at(
254        mut self: Pin<&mut Self>,
255        cx: &mut Context<'_>,
256        offset: u64,
257        buf: &mut [u8],
258    ) -> Poll<std::io::Result<usize>> {
259        futures::ready!(Pin::new(&mut self.inner).poll_seek(cx, SeekFrom::Start(offset)))?;
260        Pin::new(&mut self.inner).poll_read(cx, buf)
261    }
262}
263
impl<T: AsyncSeek + Unpin> AsyncGetSize for Adapter<T> {
    /// Reports the size as the position returned by seeking to the end of the inner stream.
    ///
    /// NOTE(review): this moves the inner stream's cursor. `Adapter::poll_read_at` seeks to an
    /// absolute offset before each read, so that appears harmless for this adapter — confirm no
    /// other user of `inner` depends on the cursor position.
    fn poll_get_size(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
        Pin::new(&mut self.inner).poll_seek(cx, SeekFrom::End(0))
    }
}
269
270#[cfg(test)]
271mod tests {
272    use super::*;
273    use crate::file::{self, AsyncReadAtExt};
274    use assert_matches::assert_matches;
275    use fidl::endpoints;
276    use fuchsia_async as fasync;
277    use futures::future::{self, poll_fn};
278    use futures::{StreamExt as _, TryStreamExt as _};
279    use std::convert::TryFrom as _;
280    use std::io::Write;
281    use tempfile::{NamedTempFile, TempDir};
282
    /// Polls a fresh `AsyncFile` once with a `poll_read_size`-byte buffer at offset 0 and
    /// asserts that the resulting `File.ReadAt` request asks for `expected_file_read_size`
    /// bytes.
    async fn poll_read_at_with_specific_buf_size(
        poll_read_size: u64,
        expected_file_read_size: u64,
    ) {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();

        let mut reader = AsyncFile::from_proxy(proxy);

        // The first poll issues the request. Nothing has answered it yet, so it must be Pending.
        let () = poll_fn(|cx| {
            let mut buf = vec![0u8; poll_read_size.try_into().unwrap()];
            assert_matches!(
                Pin::new(&mut reader).poll_read_at(cx, 0, buf.as_mut_slice()),
                Poll::Pending
            );
            Poll::Ready(())
        })
        .await;

        // Inspect the request that reached the server end.
        match stream.next().await.unwrap().unwrap() {
            fio::FileRequest::ReadAt { count, .. } => {
                assert_eq!(count, expected_file_read_size);
            }
            req => panic!("unhandled request {:?}", req),
        }
    }
308
    // A zero-length buffer still produces a File.ReadAt request, asking for zero bytes.
    #[fasync::run_singlethreaded(test)]
    async fn poll_read_at_empty_buf() {
        poll_read_at_with_specific_buf_size(0, 0).await;
    }
313
    // A buffer larger than fio::MAX_BUF produces a request capped at fio::MAX_BUF bytes.
    #[fasync::run_singlethreaded(test)]
    async fn poll_read_at_caps_buf_size() {
        poll_read_at_with_specific_buf_size(fio::MAX_BUF * 2, fio::MAX_BUF).await;
    }
318
    // A poll that returns Pending must keep its File.ReadAt future so that later polls reuse it
    // instead of sending a second request.
    #[fasync::run_singlethreaded(test)]
    async fn poll_read_at_pending_saves_future() {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();

        let mut reader = AsyncFile::from_proxy(proxy);

        // This poll_read_at call will create a File.ReadAt future and poll it. The poll of the
        // File.ReadAt future will return Pending because nothing is handling the FileRequestStream
        // yet. The reader should save this File.ReadAt future for handling subsequent poll_read_at
        // calls.
        let () = poll_fn(|cx| {
            assert_matches!(
                Pin::new(&mut reader).poll_read_at(cx, 2, &mut [0u8; 1]),
                Poll::Pending
            );
            Poll::Ready(())
        })
        .await;

        // Call poll_read_at until we get a byte out. This byte should be from the first and only
        // File.ReadAt request.
        let poll_read_at = async move {
            let mut buf = [0u8; 1];
            assert_eq!(reader.read_at(2, &mut buf).await.unwrap(), buf.len());
            assert_eq!(&buf, &[1]);
        };

        // Answer every request with the running request count, so the byte read above reveals
        // which request produced it. The loop ends when the reader (and its proxy) is dropped.
        let mut file_read_requests = 0u8;
        let handle_file_stream = async {
            while let Some(req) = stream.try_next().await.unwrap() {
                file_read_requests += 1;
                match req {
                    fio::FileRequest::ReadAt { count, offset, responder } => {
                        assert_eq!(count, 1);
                        assert_eq!(offset, 2);
                        responder.send(Ok(&[file_read_requests])).unwrap();
                    }
                    req => panic!("unhandled request {:?}", req),
                }
            }
        };

        let ((), ()) = future::join(poll_read_at, handle_file_stream).await;
        assert_eq!(file_read_requests, 1);
    }
364
    // Bytes from one File.ReadAt response are served across several smaller reads, and a read
    // past the held bytes triggers a new request.
    #[fasync::run_singlethreaded(test)]
    async fn poll_read_at_with_smaller_buf_after_pending() {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();

        let mut reader = AsyncFile::from_proxy(proxy);

        // Call poll_read_at with a buf of length 3. This is the first poll_read_at call, so the
        // reader will create a File.ReadAt future for 3 bytes. poll_read_at will return Pending
        // because nothing is handling the FileRequestStream yet.
        let () = poll_fn(|cx| {
            assert_matches!(
                Pin::new(&mut reader).poll_read_at(cx, 0, &mut [0u8; 3]),
                Poll::Pending
            );
            Poll::Ready(())
        })
        .await;

        // Respond to the three byte File.ReadAt request.
        let () = async {
            match stream.next().await.unwrap().unwrap() {
                fio::FileRequest::ReadAt { count, offset, responder } => {
                    assert_eq!(count, 3);
                    assert_eq!(offset, 0);
                    responder.send(Ok(b"012")).unwrap();
                }
                req => panic!("unhandled request {:?}", req),
            }
        }
        .await;

        // Call poll_read_at with a buf of length 1. This should resolve the previously created 3
        // byte File.ReadAt future and return the first byte from it while saving the remaining two
        // bytes.
        let mut buf = [0u8; 1];
        assert_eq!(reader.read_at(0, &mut buf).await.unwrap(), buf.len());
        assert_eq!(&buf, b"0");

        // Call poll_read_at with a buf of len 1. This should return the first saved byte, which
        // should be the second byte from the original File.ReadAt request.
        let mut buf = [0u8; 1];
        assert_eq!(reader.read_at(1, &mut buf).await.unwrap(), buf.len());
        assert_eq!(&buf, b"1");

        // Call poll_read_at with a buf of len 2. There should only be one remaining saved byte
        // from the original File.ReadAt request, so poll_read_at should only return one byte.
        let mut buf = [0u8; 2];
        assert_eq!(reader.read_at(2, &mut buf).await.unwrap(), 1);
        assert_eq!(&buf[..1], b"2");

        // There should be no saved bytes remaining, so a poll_read_at of four bytes should cause a
        // new File.ReadAt request.
        let mut buf = [0u8; 4];
        let poll_read_at = reader.read_at(3, &mut buf);

        let handle_second_file_request = async {
            match stream.next().await.unwrap().unwrap() {
                fio::FileRequest::ReadAt { count, offset, responder } => {
                    assert_eq!(count, 4);
                    assert_eq!(offset, 3);
                    responder.send(Ok(b"3456")).unwrap();
                }
                req => panic!("unhandled request {:?}", req),
            }
        };

        let (read_res, ()) = future::join(poll_read_at, handle_second_file_request).await;
        assert_eq!(read_res.unwrap(), 4);
        assert_eq!(&buf, b"3456");
    }
435
    // A FIDL transport error must reset the reader to ReadAtState::Empty.
    #[fasync::run_singlethreaded(test)]
    async fn transition_to_empty_on_fidl_error() {
        // The server end is discarded immediately, which closes the channel.
        let (proxy, _) = endpoints::create_proxy_and_stream::<fio::FileMarker>();

        let mut reader = AsyncFile::from_proxy(proxy);

        // poll_read_at will fail because the channel is closed because the server end was dropped.
        let () = poll_fn(|cx| {
            assert_matches!(
                Pin::new(&mut reader).poll_read_at(cx, 0, &mut [0u8; 1]),
                Poll::Ready(Err(_))
            );
            Poll::Ready(())
        })
        .await;

        // This test is accessing internal state because the only fidl error that is easy to inject
        // is ZX_ERR_PEER_CLOSED (by closing the channel). Once the channel is closed, all new
        // futures created by the AsyncFile will fail, but, if poll'ed, the old future would also
        // continue to fail (not panic) because it is Fused.
        assert_matches!(reader.read_at_state, ReadAtState::Empty);
    }
458
    // After File.ReadAt reports an error status, the next read must issue a fresh request and
    // can succeed.
    #[fasync::run_singlethreaded(test)]
    async fn recover_from_file_read_error() {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();

        let mut reader = AsyncFile::from_proxy(proxy);

        // Call poll_read_at until failure.
        let mut buf = [0u8; 1];
        let poll_read_at = reader.read_at(0, &mut buf);

        // Answer the first request with an error status.
        let failing_file_response = async {
            match stream.next().await.unwrap().unwrap() {
                fio::FileRequest::ReadAt { count, offset, responder } => {
                    assert_eq!(count, 1);
                    assert_eq!(offset, 0);
                    responder.send(Err(zx_status::Status::NO_MEMORY.into_raw())).unwrap();
                }
                req => panic!("unhandled request {:?}", req),
            }
        };

        let (read_res, ()) = future::join(poll_read_at, failing_file_response).await;
        assert_matches!(read_res, Err(_));

        // Calling poll_read_at again should create a new File.ReadAt request instead of reusing
        // the old future.
        let mut buf = [0u8; 1];
        let poll_read_at = reader.read_at(0, &mut buf);

        let succeeding_file_response = async {
            match stream.next().await.unwrap().unwrap() {
                fio::FileRequest::ReadAt { count, offset, responder } => {
                    assert_eq!(count, 1);
                    assert_eq!(offset, 0);
                    responder.send(Ok(b"0")).unwrap();
                }
                req => panic!("unhandled request {:?}", req),
            }
        };

        let (read_res, ()) = future::join(poll_read_at, succeeding_file_response).await;
        assert_eq!(read_res.unwrap(), 1);
        assert_eq!(&buf, b"0");
    }
503
    // A pending zero-length request must not cause a later non-empty read to report zero bytes.
    #[fasync::run_singlethreaded(test)]
    async fn poll_read_at_zero_then_read_nonzero() {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();

        let mut reader = AsyncFile::from_proxy(proxy);

        // Call poll_read_at with a zero-length buffer.
        let () = poll_fn(|cx| {
            assert_matches!(Pin::new(&mut reader).poll_read_at(cx, 0, &mut []), Poll::Pending);
            Poll::Ready(())
        })
        .await;

        // Handle the zero-length File.ReadAt request.
        match stream.next().await.unwrap().unwrap() {
            fio::FileRequest::ReadAt { count, offset, responder } => {
                assert_eq!(count, 0);
                assert_eq!(offset, 0);
                responder.send(Ok(&[])).unwrap();
            }
            req => panic!("unhandled request {:?}", req),
        }

        // Call poll_read_at with a length 1 buffer until Ready is returned.
        let mut buf = vec![0u8; 1];
        let poll_read_at = reader.read_at(0, &mut buf);

        // The AsyncFile will discard the File.ReadAt response from the first poll_read_at, and
        // create another request, this handles that second request. The AsyncFile discards the
        // first response because the first poll_read_at was for zero bytes, but the current
        // poll_read_at is not.
        let handle_file_request = async {
            match stream.next().await.unwrap().unwrap() {
                fio::FileRequest::ReadAt { count, offset, responder } => {
                    assert_eq!(count, 1);
                    assert_eq!(offset, 0);
                    responder.send(Ok(&[1])).unwrap();
                }
                req => panic!("unhandled request {:?}", req),
            }
        };

        let (poll_read, ()) = future::join(poll_read_at, handle_file_request).await;

        // poll_read_at should read 1 byte, even though the first poll_read_at request was for zero
        // bytes and returned Pending.
        assert_eq!(poll_read.unwrap(), 1);
        assert_eq!(&buf[..], &[1]);
    }
553
    // Exhaustively checks small combinations of: first read length, file size, second read
    // offset and second read length. Each combination starts a read at offset 0, answers it,
    // then performs a second read and verifies both whether a second File.ReadAt request is
    // made and how many bytes are returned.
    #[fasync::run_singlethreaded(test)]
    async fn different_poll_read_at_and_file_sizes() {
        for first_poll_read_len in 0..5 {
            for file_size in 0..5 {
                for second_poll_offset in 0..file_size {
                    for second_poll_read_len in 0..5 {
                        let (proxy, mut stream) =
                            endpoints::create_proxy_and_stream::<fio::FileMarker>();

                        let mut reader = AsyncFile::from_proxy(proxy);

                        // poll_read_at causes the AsyncFile to create a File.ReadAt request.
                        let () = poll_fn(|cx| {
                            let mut buf = vec![0u8; first_poll_read_len];
                            assert_matches!(
                                Pin::new(&mut reader).poll_read_at(cx, 0, &mut buf),
                                Poll::Pending
                            );
                            Poll::Ready(())
                        })
                        .await;

                        // Respond to the File.ReadAt request with at most as many bytes as the
                        // poll_read_at requested.
                        match stream.next().await.unwrap().unwrap() {
                            fio::FileRequest::ReadAt { count, offset, responder } => {
                                assert_eq!(count, u64::try_from(first_poll_read_len).unwrap());
                                assert_eq!(offset, 0);
                                let resp = vec![7u8; min(file_size, first_poll_read_len)];
                                responder.send(Ok(&resp)).unwrap();
                            }
                            req => panic!("unhandled request {:?}", req),
                        }

                        // Call poll_read_at until it returns Ready. If the first poll_read_at was
                        // for zero bytes and this poll_read_at is not or this poll_read_at offset
                        // is outside the buffer, the AsyncFile will make another File.ReadAt
                        // request.
                        let mut buf = vec![0u8; second_poll_read_len];
                        let poll_read_at = reader.read_at(second_poll_offset as u64, &mut buf);

                        // Mirrors the conditions under which the reader discards the first
                        // response and issues a second request.
                        let second_request = first_poll_read_len == 0 && second_poll_read_len != 0
                            || second_poll_offset != 0 && second_poll_offset >= first_poll_read_len;
                        let handle_conditional_file_request = async {
                            if second_request {
                                match stream.next().await.unwrap().unwrap() {
                                    fio::FileRequest::ReadAt { count, offset, responder } => {
                                        assert_eq!(
                                            count,
                                            u64::try_from(second_poll_read_len).unwrap()
                                        );
                                        assert_eq!(
                                            offset,
                                            u64::try_from(second_poll_offset).unwrap()
                                        );
                                        let resp = vec![
                                            7u8;
                                            min(
                                                file_size - second_poll_offset,
                                                second_poll_read_len
                                            )
                                        ];
                                        responder.send(Ok(&resp)).unwrap();
                                    }
                                    req => panic!("unhandled request {:?}", req),
                                }
                            }
                        };

                        let (read_res, ()) =
                            future::join(poll_read_at, handle_conditional_file_request).await;

                        // With a second request the result comes from the file at the second
                        // offset; without one it comes from what is left of the first response.
                        let expected_len = if second_request {
                            min(file_size - second_poll_offset, second_poll_read_len)
                        } else {
                            min(
                                min(file_size, first_poll_read_len) - second_poll_offset,
                                second_poll_read_len,
                            )
                        };
                        let expected = vec![7u8; expected_len];
                        assert_eq!(read_res.unwrap(), expected_len);
                        assert_eq!(&buf[..expected_len], &expected[..]);
                    }
                }
            }
        }
    }
642
    /// Writes `contents` to a file in a fresh temporary directory, opens it through the
    /// namespace and asserts that `get_size` reports `contents.len()`.
    async fn get_size_file_with_contents(contents: &[u8]) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("get_size_file_with_contents").to_str().unwrap().to_owned();
        let () = file::write_in_namespace(&path, contents).await.unwrap();
        let file = file::open_in_namespace(&path, fio::PERM_READABLE).unwrap();

        let mut reader = AsyncFile::from_proxy(file);

        assert_eq!(reader.get_size().await.unwrap(), contents.len() as u64);
    }
653
    // An empty file reports a size of zero.
    #[fasync::run_singlethreaded(test)]
    async fn get_size_empty() {
        get_size_file_with_contents(&[]).await;
    }
658
    // A file three times larger than fio::MAX_BUF reports its full size.
    #[fasync::run_singlethreaded(test)]
    async fn get_size_large() {
        let expected_contents = vec![7u8; (fio::MAX_BUF * 3).try_into().unwrap()];
        get_size_file_with_contents(&expected_contents[..]).await;
    }
664
    // Each get_size call reflects the file's size at the time of the call: the reported size
    // grows as more bytes are written through a separate handle.
    #[fasync::run_singlethreaded(test)]
    async fn get_size_changing_size() {
        let (mut file, path) = NamedTempFile::new().unwrap().into_parts();
        let proxy = file::open_in_namespace(path.to_str().unwrap(), fio::PERM_READABLE).unwrap();

        let mut reader = AsyncFile::from_proxy(proxy);

        assert_eq!(reader.get_size().await.unwrap(), 0);
        file.write_all(&[1; 3][..]).unwrap();
        assert_eq!(reader.get_size().await.unwrap(), 3);
        file.write_all(&[2; 5][..]).unwrap();
        assert_eq!(reader.get_size().await.unwrap(), 8);
    }
678
    // Exercises `Adapter` over an in-memory cursor: size query, reading everything, and an
    // exact read from the middle of the data.
    #[fasync::run_singlethreaded(test)]
    async fn adapter_for_cursor() {
        // 1000 bytes with a repeating 0..=255 pattern.
        let data = (0..1000).map(|i| (i % 256) as u8).collect::<Vec<_>>();
        let cursor = futures::io::Cursor::new(data.clone());
        let mut adapter = Adapter::new(cursor);

        assert_eq!(adapter.get_size().await.unwrap(), 1000);

        let mut buffer = vec![];
        adapter.read_to_end(&mut buffer).await.unwrap();
        assert_eq!(buffer, data);

        let mut buffer = vec![0; 100];
        adapter.read_at_exact(333, &mut buffer).await.unwrap();
        assert_eq!(buffer, &data[333..433]);
    }
695}