use crate::file::{AsyncGetSize, AsyncReadAt};
use fidl_fuchsia_io as fio;
use pin_project::pin_project;
use std::cmp::min;
use std::convert::TryInto as _;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Overflow-checked addition for `usize` that reports failure as an I/O error.
trait UsizeExt {
    /// Returns `self + rhs`, or an `std::io::Error` if the sum does not fit in a `usize`.
    fn add(self, rhs: usize) -> Result<usize, std::io::Error>;
}

impl UsizeExt for usize {
    fn add(self, rhs: usize) -> Result<usize, std::io::Error> {
        match self.checked_add(rhs) {
            Some(sum) => Ok(sum),
            // Surface the overflow as an error instead of wrapping or panicking.
            None => Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                "usize addition overflowed",
            )),
        }
    }
}
/// Converts a `u64` into a `usize`.
///
/// The static assertion below fails the build unless `u64` and `usize` have the same size,
/// so on any target where this compiles the `unwrap` cannot fail.
fn u64_to_usize_safe(u: u64) -> usize {
    let converted: Result<usize, _> = u.try_into();
    let ret = converted.unwrap();
    // Compile-time guard: the input and output values must occupy the same number of bytes.
    static_assertions::assert_eq_size_val!(u, ret);
    ret
}
/// Wrapper around a reader `T` that keeps the bytes returned by the most recent forwarded
/// read in an in-memory buffer, so later reads that start inside that range can be answered
/// without calling the wrapped reader again.
#[pin_project]
pub struct BufferedAsyncReadAt<T> {
/// The underlying reader that cache misses are forwarded to.
#[pin]
wrapped: T,
/// Offset (in the wrapped reader's address space) of the first byte held in `cache`.
offset: usize,
/// Number of valid bytes at the start of `cache`; zero means nothing is cached.
len: usize,
/// Backing buffer of `fio::MAX_TRANSFER_SIZE` bytes; `None` until the first read allocates it.
cache: Option<Box<[u8; fio::MAX_TRANSFER_SIZE as usize]>>,
}
impl<T> BufferedAsyncReadAt<T> {
pub fn new(wrapped: T) -> Self {
Self { wrapped, offset: 0, len: 0, cache: None }
}
}
impl<T: AsyncReadAt> AsyncReadAt for BufferedAsyncReadAt<T> {
    /// Reads into `buf` starting at `offset_u64`, serving the request from the cache when
    /// the offset falls inside the currently cached range.
    ///
    /// On a cache hit, at most the bytes remaining in the cache from that offset are copied,
    /// so the read may be short; the wrapped reader is not called. On a miss, the wrapped
    /// reader is asked to fill the whole cache buffer starting at `offset_u64`, and the
    /// prefix of the result that fits in `buf` is copied out.
    ///
    /// # Errors
    ///
    /// Forwards any error from the wrapped reader, and returns an error if the cached
    /// `offset + len` overflows `usize`.
    ///
    /// # Panics
    ///
    /// Panics if `offset_u64` cannot be converted to `usize` (see `u64_to_usize_safe`).
    fn poll_read_at(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        offset_u64: u64,
        buf: &mut [u8],
    ) -> Poll<std::io::Result<usize>> {
        let this = self.project();
        let offset = u64_to_usize_safe(offset_u64);
        // Allocate the backing buffer lazily, on the first read.
        let cache =
            this.cache.get_or_insert_with(|| Box::new([0u8; fio::MAX_TRANSFER_SIZE as usize]));

        // Cache hit: the requested offset lies in [cached offset, cached offset + len).
        if *this.offset <= offset && this.offset.add(*this.len)? > offset {
            let start = offset - *this.offset;
            let n = min(buf.len(), *this.len - start);
            buf[..n].copy_from_slice(&cache[start..start + n]);
            return Poll::Ready(Ok(n));
        }

        // Cache miss. The wrapped reader is handed the cache buffer mutably and may write
        // into it even if it ends up returning `Pending` or an error. Mark the cache empty
        // first so that stale bookkeeping can never describe clobbered bytes.
        *this.len = 0;

        match this.wrapped.poll_read_at(cx, offset_u64, &mut cache[..]) {
            Poll::Ready(Ok(len)) => {
                // Record what the buffer now holds before copying out the caller's share.
                *this.offset = offset;
                *this.len = len;
                let n = min(len, buf.len());
                buf[..n].copy_from_slice(&cache[..n]);
                Poll::Ready(Ok(n))
            }
            // `Pending` and errors pass through unchanged; the cache stays invalidated.
            other => other,
        }
    }
}
impl<T: AsyncGetSize> AsyncGetSize for BufferedAsyncReadAt<T> {
    /// Delegates directly to the wrapped object's `poll_get_size`; the read cache is not
    /// consulted or modified.
    fn poll_get_size(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
        self.project().wrapped.poll_get_size(cx)
    }
}
// Unit tests for the helpers and for `BufferedAsyncReadAt`'s caching and forwarding behavior.
#[cfg(test)]
mod tests {
use super::*;
use crate::file::{AsyncGetSizeExt as _, AsyncReadAtExt as _};
use assert_matches::assert_matches;
use std::cell::RefCell;
use std::convert::TryFrom as _;
use std::rc::Rc;
// `fio::MAX_TRANSFER_SIZE` must survive a round trip through `usize`, since the cache
// buffer's length is written as `fio::MAX_TRANSFER_SIZE as usize`.
#[test]
fn max_transfer_size_fits_in_usize() {
assert_eq!(
fio::MAX_TRANSFER_SIZE,
u64::try_from(usize::try_from(fio::MAX_TRANSFER_SIZE).unwrap()).unwrap()
);
}
// Checked addition succeeds normally and reports an error on overflow.
#[test]
fn usize_ext_add() {
assert_eq!(0usize.add(1).unwrap(), 1);
assert_matches!(usize::MAX.add(1), Err(_));
}
// This test function shadows the helper's name, hence the explicit `super::` path.
#[test]
fn u64_to_usize_safe() {
assert_eq!(super::u64_to_usize_safe(5u64), 5usize);
}
// `get_size` on the wrapper must reach the wrapped object's `poll_get_size`.
#[fuchsia_async::run_singlethreaded(test)]
async fn poll_get_size_forwards() {
// Records whether `poll_get_size` was invoked and reports a fixed size of 3.
struct Mock {
called: bool,
}
impl AsyncGetSize for Mock {
fn poll_get_size(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<std::io::Result<u64>> {
self.called = true;
Poll::Ready(Ok(3))
}
}
let mut reader = BufferedAsyncReadAt::new(Mock { called: false });
assert_matches!(reader.get_size().await, Ok(3));
assert!(reader.wrapped.called);
}
// In-memory reader that serves `content` and records every offset it is asked to read,
// so tests can tell forwarded reads apart from cache hits.
struct Mock {
recorded_offsets: Rc<RefCell<Vec<u64>>>,
content: Vec<u8>,
}
impl Mock {
// Returns the mock together with a shared handle to its log of requested offsets.
fn new(content: Vec<u8>) -> (Self, Rc<RefCell<Vec<u64>>>) {
let recorded_offsets = Rc::new(RefCell::new(vec![]));
(Self { recorded_offsets: recorded_offsets.clone(), content }, recorded_offsets)
}
}
impl AsyncReadAt for Mock {
fn poll_read_at(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
offset: u64,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
self.recorded_offsets.borrow_mut().push(offset);
let offset = super::u64_to_usize_safe(offset);
// The wrapper is expected to always pass its full-size cache buffer.
assert_eq!(buf.len(), usize::try_from(fio::MAX_TRANSFER_SIZE).unwrap());
// Clamp to the content so reads at or past the end yield zero bytes.
let start = std::cmp::min(offset, self.content.len());
let n = std::cmp::min(buf.len(), self.content.len() - start);
let end = start + n;
buf[..n].copy_from_slice(&self.content[start..end]);
Poll::Ready(Ok(n))
}
}
// After one forwarded read, reads that start inside the cached range must be served
// without calling the mock again.
#[fuchsia_async::run_singlethreaded(test)]
async fn poll_read_at_uses_cache() {
let (mock, offsets) = Mock::new(vec![0, 1, 2, 3, 4]);
let mut reader = BufferedAsyncReadAt::new(mock);
// Prime the cache: this read is forwarded, and the mock returns content[1..5].
let mut buf = vec![5; 3];
let bytes_read = reader.read_at(1, buf.as_mut_slice()).await.unwrap();
assert_eq!(bytes_read, 3);
assert_eq!(buf, vec![1, 2, 3]);
assert_eq!(*offsets.borrow(), vec![1]);
offsets.borrow_mut().clear();
// Same offset, shorter buffer: cache hit, nothing forwarded.
let mut buf = vec![5; 2];
let bytes_read = reader.read_at(1, buf.as_mut_slice()).await.unwrap();
assert_eq!(bytes_read, 2);
assert_eq!(buf, vec![1, 2]);
assert_eq!(*offsets.borrow(), Vec::<u64>::new());
// Offset in the middle of the cached range: cache hit.
let mut buf = vec![5; 2];
let bytes_read = reader.read_at(2, buf.as_mut_slice()).await.unwrap();
assert_eq!(bytes_read, 2);
assert_eq!(buf, vec![2, 3]);
assert_eq!(*offsets.borrow(), Vec::<u64>::new());
// Buffer larger than the priming request but still fully covered by the cache.
let mut buf = vec![5; 4];
let bytes_read = reader.read_at(1, buf.as_mut_slice()).await.unwrap();
assert_eq!(bytes_read, 4);
assert_eq!(buf, vec![1, 2, 3, 4]);
assert_eq!(*offsets.borrow(), Vec::<u64>::new());
// Last cached byte.
let mut buf = vec![5; 1];
let bytes_read = reader.read_at(4, buf.as_mut_slice()).await.unwrap();
assert_eq!(bytes_read, 1);
assert_eq!(buf, vec![4]);
assert_eq!(*offsets.borrow(), Vec::<u64>::new());
// Request extends past the cached range: short read from the cache, the remaining
// buffer byte keeps its fill value, and still nothing is forwarded.
let mut buf = vec![5; 3];
let bytes_read = reader.read_at(3, buf.as_mut_slice()).await.unwrap();
assert_eq!(bytes_read, 2);
assert_eq!(buf, vec![3, 4, 5]);
assert_eq!(*offsets.borrow(), Vec::<u64>::new());
}
// Reads that start outside the cached range must be forwarded to the wrapped reader.
#[fuchsia_async::run_singlethreaded(test)]
async fn poll_read_at_forwards() {
// MAX_TRANSFER_SIZE bytes cycling through 0..8, followed by one trailing byte 8.
let content = (0u8..8)
.into_iter()
.cycle()
.take(fio::MAX_TRANSFER_SIZE.try_into().unwrap())
.chain([8])
.collect();
let (mock, offsets) = Mock::new(content);
let mut reader = BufferedAsyncReadAt::new(mock);
// Empty cache: forwarded, and the cache now starts at offset 1.
let mut buf = vec![9; 1];
let bytes_read = reader.read_at(1, buf.as_mut_slice()).await.unwrap();
assert_eq!(bytes_read, 1);
assert_eq!(buf, vec![1]);
assert_eq!(*offsets.borrow(), vec![1]);
offsets.borrow_mut().clear();
// Offset before the start of the cached range: forwarded, cache now starts at 0.
let mut buf = vec![9; 1];
let bytes_read = reader.read_at(0, buf.as_mut_slice()).await.unwrap();
assert_eq!(bytes_read, 1);
assert_eq!(buf, vec![0]);
assert_eq!(*offsets.borrow(), vec![0]);
offsets.borrow_mut().clear();
// Offset just past the end of the cached range: forwarded; yields the trailing 8.
let mut buf = vec![9; 1];
let bytes_read = reader.read_at(fio::MAX_TRANSFER_SIZE, buf.as_mut_slice()).await.unwrap();
assert_eq!(bytes_read, 1);
assert_eq!(buf, vec![8]);
assert_eq!(*offsets.borrow(), vec![fio::MAX_TRANSFER_SIZE]);
}
// A request longer than the remaining content yields a short read; the rest of the
// caller's buffer is left untouched.
#[fuchsia_async::run_singlethreaded(test)]
async fn poll_read_at_requested_range_ends_beyond_content() {
let (mock, offsets) = Mock::new(vec![0, 1, 2]);
let mut reader = BufferedAsyncReadAt::new(mock);
let mut buf = vec![3; 5];
let bytes_read = reader.read_at(0, buf.as_mut_slice()).await.unwrap();
assert_eq!(bytes_read, 3);
assert_eq!(buf, vec![0, 1, 2, 3, 3]);
assert_eq!(*offsets.borrow(), vec![0]);
}
// A request starting at the end of the content reads zero bytes and leaves the buffer
// untouched.
#[fuchsia_async::run_singlethreaded(test)]
async fn poll_read_at_requested_range_starts_beyond_content() {
let (mock, offsets) = Mock::new(vec![0, 1, 2]);
let mut reader = BufferedAsyncReadAt::new(mock);
let mut buf = vec![3; 5];
let bytes_read = reader.read_at(3, buf.as_mut_slice()).await.unwrap();
assert_eq!(bytes_read, 0);
assert_eq!(buf, vec![3, 3, 3, 3, 3]);
assert_eq!(*offsets.borrow(), vec![3]);
}
// An error from the wrapped reader must reach the caller with its message intact.
#[fuchsia_async::run_singlethreaded(test)]
async fn poll_read_at_forwards_error() {
// Reader that always fails with a recognizable message.
struct Mock;
impl AsyncReadAt for Mock {
fn poll_read_at(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
_offset: u64,
_buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"BufferedAsyncReadAt forwarded the error",
)))
}
}
let mut reader = BufferedAsyncReadAt::new(Mock);
let mut buf = vec![0u8; 1];
let res = reader.read_at(0, buf.as_mut_slice()).await;
assert_matches!(res, Err(_));
assert_eq!(res.err().unwrap().to_string(), "BufferedAsyncReadAt forwarded the error");
}
// `Poll::Pending` from the wrapped reader must be returned as-is by the wrapper.
#[fuchsia_async::run_singlethreaded(test)]
async fn poll_read_at_forwards_pending() {
// Reader that is never ready.
struct Mock;
impl AsyncReadAt for Mock {
fn poll_read_at(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
_offset: u64,
_buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
Poll::Pending
}
}
// Helper future that polls the wrapper exactly once by hand, so the `Pending` result
// can be inspected directly, and then completes.
#[pin_project]
struct VerifyPending {
#[pin]
object_under_test: BufferedAsyncReadAt<Mock>,
}
impl futures::future::Future for VerifyPending {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let res = this.object_under_test.poll_read_at(cx, 0, &mut [0]);
assert_matches!(res, Poll::Pending);
Poll::Ready(())
}
}
let reader = BufferedAsyncReadAt::new(Mock);
let verifier = VerifyPending { object_under_test: reader };
let () = verifier.await;
}
}