use fidl::client::QueryResponseFut;
use futures::future::Future;
use futures::io::{AsyncRead, AsyncSeek, SeekFrom};
use futures::FutureExt;
use std::cmp::min;
use std::convert::TryInto as _;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use {fidl_fuchsia_io as fio, zx_status};
/// Asynchronous read at an explicit offset.
///
/// Shaped like `futures::io::AsyncRead::poll_read`, with an extra `offset`
/// argument naming the absolute position to read from.
pub trait AsyncReadAt {
/// Attempts to read the bytes located at `offset` into `buf`.
///
/// Yields `Poll::Ready(Ok(n))` with a byte count, `Poll::Ready(Err(_))` on
/// failure, or `Poll::Pending` when no result is available yet. Both
/// implementations in this file report the number of bytes written to the
/// front of `buf`.
fn poll_read_at(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
offset: u64,
buf: &mut [u8],
) -> Poll<io::Result<usize>>;
}
/// Asynchronous query for the size of a byte source.
pub trait AsyncGetSize {
/// Attempts to obtain the size.
///
/// Yields `Poll::Ready(Ok(size))`, `Poll::Ready(Err(_))` on failure, or
/// `Poll::Pending` when the answer is not available yet.
fn poll_get_size(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>;
}
/// Extension methods for [`AsyncGetSize`] implementors.
pub trait AsyncGetSizeExt: AsyncGetSize {
/// Returns a future that exclusively borrows `self` and resolves to the
/// result of [`AsyncGetSize::poll_get_size`].
///
/// `Self: Unpin` is required because the returned [`GetSize`] re-pins the
/// borrowed value with `Pin::new` each time it is polled.
fn get_size<'a>(&'a mut self) -> GetSize<'a, Self>
where
Self: Unpin,
{
GetSize { size_getter: self }
}
}
// Blanket impl: every `AsyncGetSize` type, sized or not, gets `get_size()`.
impl<T: AsyncGetSize + ?Sized> AsyncGetSizeExt for T {}
/// Future returned by [`AsyncGetSizeExt::get_size`].
///
/// Holds the exclusive borrow of the size source for as long as the future
/// is alive.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct GetSize<'a, R: ?Sized> {
// Value whose `poll_get_size` is driven each time this future is polled.
size_getter: &'a mut R,
}
// `GetSize` only stores `&mut R`; it is declared `Unpin` whenever `R` is.
impl<R: ?Sized + Unpin> Unpin for GetSize<'_, R> {}
impl<R: AsyncGetSize + ?Sized + Unpin> Future for GetSize<'_, R> {
    type Output = io::Result<u64>;

    /// Forwards the poll to the borrowed size source.
    ///
    /// The future keeps no state of its own, so every poll is a direct call
    /// to `poll_get_size` on the wrapped value.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `GetSize` is `Unpin` here (because `R: Unpin`), so the pin can be
        // unwrapped and the inner borrow re-pinned for the delegated call.
        let getter: &mut R = &mut *self.get_mut().size_getter;
        Pin::new(getter).poll_get_size(cx)
    }
}
/// Wraps a `fuchsia.io` `File` proxy and implements [`AsyncReadAt`] and
/// [`AsyncGetSize`] on top of it.
#[derive(Debug)]
pub struct AsyncFile {
// Connection used to issue `ReadAt` and `GetAttributes` requests.
file: fio::FileProxy,
// Progress of the current or most recent `ReadAt` request; see `ReadAtState`.
read_at_state: ReadAtState,
// In-flight `GetAttributes` request started by `poll_get_size`, kept across
// `Poll::Pending` returns; `None` when no request is outstanding.
get_attributes_fut: Option<
QueryResponseFut<Result<(fio::MutableNodeAttributes, fio::ImmutableNodeAttributes), i32>>,
>,
}
// State machine behind `AsyncFile::poll_read_at`.
#[derive(Debug)]
enum ReadAtState {
// No request outstanding and no buffered response.
Empty,
// A `File.ReadAt` request has been sent and its response is being awaited.
Forwarding {
// Response future for the outstanding request.
fut: QueryResponseFut<Result<Vec<u8>, i32>>,
// File offset the request was issued for.
file_offset: u64,
// True when the request asked for zero bytes. `poll_read_at` uses this to
// avoid handing the empty response to a caller with a non-empty buffer.
zero_byte_request: bool,
},
// A response has arrived: `bytes` is the data the file returned for a read
// that started at `file_offset`.
Bytes {
bytes: Vec<u8>,
file_offset: u64,
},
}
impl AsyncFile {
pub fn from_proxy(file: fio::FileProxy) -> Self {
Self { file, read_at_state: ReadAtState::Empty, get_attributes_fut: None }
}
}
impl AsyncReadAt for AsyncFile {
    /// Reads from the file at `offset` into `buf`, keeping at most one
    /// `File.ReadAt` request outstanding.
    ///
    /// The work is driven by the state machine in `read_at_state`:
    ///
    /// * `Empty`: send a `ReadAt` for `min(buf.len(), fio::MAX_BUF)` bytes at
    ///   `offset`.
    /// * `Forwarding`: poll the outstanding request. The future is stored, so
    ///   a poll after `Poll::Pending` resumes it instead of sending again.
    /// * `Bytes`: serve the caller from the most recent response when `offset`
    ///   falls inside it; otherwise drop the response and start over.
    ///
    /// Returns the number of bytes copied to the front of `buf`.
    ///
    /// # Errors
    ///
    /// A status returned by the file is converted with
    /// `zx_status::Status::into_io_error`; a FIDL transport error is wrapped
    /// in an `io::Error` of kind `Other`. Either error resets the state to
    /// `Empty`, so the next poll issues a fresh request.
    fn poll_read_at(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        offset: u64,
        buf: &mut [u8],
    ) -> Poll<std::io::Result<usize>> {
        loop {
            match self.read_at_state {
                ReadAtState::Empty => {
                    // Never request more than the protocol limit, even if the
                    // buffer length does not fit in a `u64`.
                    let len = if let Ok(len) = buf.len().try_into() {
                        min(len, fio::MAX_BUF)
                    } else {
                        fio::MAX_BUF
                    };
                    self.read_at_state = ReadAtState::Forwarding {
                        fut: self.file.read_at(len, offset),
                        file_offset: offset,
                        zero_byte_request: len == 0,
                    };
                }
                ReadAtState::Forwarding { ref mut fut, file_offset, zero_byte_request } => {
                    match futures::ready!(Pin::new(fut).poll(cx)) {
                        Ok(result) => match result {
                            Err(s) => {
                                self.read_at_state = ReadAtState::Empty;
                                return Poll::Ready(Err(
                                    zx_status::Status::from_raw(s).into_io_error()
                                ));
                            }
                            Ok(bytes) => {
                                if zero_byte_request {
                                    // The response to a zero-byte request is
                                    // never cached. Served to a later
                                    // non-empty read it would yield `Ok(0)`,
                                    // which callers interpret as EOF.
                                    // Zero-length reads are still forwarded
                                    // (rather than short-circuited up front)
                                    // so the file gets to validate them.
                                    self.read_at_state = ReadAtState::Empty;
                                    if buf.is_empty() && offset == file_offset {
                                        return Poll::Ready(Ok(0));
                                    }
                                    // Otherwise loop around and issue the
                                    // request this poll actually needs.
                                } else {
                                    self.read_at_state =
                                        ReadAtState::Bytes { bytes, file_offset };
                                }
                            }
                        },
                        Err(e) => {
                            self.read_at_state = ReadAtState::Empty;
                            return Poll::Ready(Err(std::io::Error::new(
                                std::io::ErrorKind::Other,
                                e,
                            )));
                        }
                    }
                }
                ReadAtState::Bytes { ref bytes, file_offset } => {
                    // The buffered response starts after the wanted offset.
                    if offset < file_offset {
                        self.read_at_state = ReadAtState::Empty;
                        continue;
                    }
                    // Distance into the buffered response; if it does not fit
                    // in `usize` it cannot index `bytes`.
                    let bytes_offset = match (offset - file_offset).try_into() {
                        Ok(offset) => offset,
                        Err(_) => {
                            self.read_at_state = ReadAtState::Empty;
                            continue;
                        }
                    };
                    // The wanted offset is past the buffered data. An empty
                    // response at exactly `file_offset` is kept: it is the
                    // file's own answer for that offset and is reported as a
                    // zero-byte read below.
                    if bytes_offset != 0 && bytes_offset >= bytes.len() {
                        self.read_at_state = ReadAtState::Empty;
                        continue;
                    }
                    let n = min(buf.len(), bytes.len() - bytes_offset);
                    let () = buf[..n].copy_from_slice(&bytes[bytes_offset..bytes_offset + n]);
                    return Poll::Ready(Ok(n));
                }
            }
        }
    }
}
impl AsyncGetSize for AsyncFile {
    /// Queries the file's content size through `GetAttributes`.
    ///
    /// The request future is stored in `get_attributes_fut` while pending, so
    /// repeated polls drive the same request. It is cleared once a response
    /// arrives, which makes the next call issue a new query.
    ///
    /// # Errors
    ///
    /// * `NOT_SUPPORTED` (as an `io::Error`) when the response carries no
    ///   `content_size`.
    /// * The file's status, converted with `into_io_error`, when the request
    ///   is rejected.
    /// * An `io::Error` of kind `Other` wrapping a FIDL transport error.
    fn poll_get_size(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
        if self.get_attributes_fut.is_none() {
            let request = self.file.get_attributes(fio::NodeAttributesQuery::CONTENT_SIZE);
            self.get_attributes_fut = Some(request);
        }

        let response = futures::ready!(self.get_attributes_fut.as_mut().unwrap().poll_unpin(cx));
        // The request has completed; forget it so the next call asks again.
        self.get_attributes_fut = None;

        let size = match response {
            Ok(Ok((_mutable_attr, immutable_attr))) => match immutable_attr.content_size {
                Some(content_size) => Ok(content_size),
                None => Err(zx_status::Status::NOT_SUPPORTED.into_io_error()),
            },
            Ok(Err(status)) => Err(zx_status::Status::from_raw(status).into_io_error()),
            Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
        };
        Poll::Ready(size)
    }
}
/// Adapts a seekable reader to the positional traits in this file.
///
/// It implements [`AsyncReadAt`] for `AsyncRead + AsyncSeek` types and
/// [`AsyncGetSize`] for `AsyncSeek` types.
#[derive(Debug)]
pub struct Adapter<T> {
// The wrapped reader; both trait impls reposition it with `poll_seek`.
inner: T,
}
impl<T> Adapter<T> {
pub fn new(inner: T) -> Adapter<T> {
Self { inner }
}
}
impl<T: AsyncRead + AsyncSeek + Unpin> AsyncReadAt for Adapter<T> {
fn poll_read_at(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
offset: u64,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
futures::ready!(Pin::new(&mut self.inner).poll_seek(cx, SeekFrom::Start(offset)))?;
Pin::new(&mut self.inner).poll_read(cx, buf)
}
}
impl<T: AsyncSeek + Unpin> AsyncGetSize for Adapter<T> {
    /// Reports the size as the position reached by seeking to the end of the
    /// wrapped reader.
    ///
    /// This moves the reader's cursor to the end.
    fn poll_get_size(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
        let end_of_stream = SeekFrom::End(0);
        let inner = &mut self.get_mut().inner;
        Pin::new(inner).poll_seek(cx, end_of_stream)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::file::{self, AsyncReadAtExt};
use assert_matches::assert_matches;
use fidl::endpoints;
use fuchsia_async as fasync;
use futures::future::{self, poll_fn};
use futures::{StreamExt as _, TryStreamExt as _};
use std::convert::TryFrom as _;
use std::io::Write;
use tempfile::{NamedTempFile, TempDir};
// Helper: polls `AsyncFile::poll_read_at` once at offset 0 with a buffer of
// `poll_read_size` bytes, then asserts that the `ReadAt` request seen on the
// stream asks for `expected_file_read_size` bytes.
async fn poll_read_at_with_specific_buf_size(
poll_read_size: u64,
expected_file_read_size: u64,
) {
let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
let mut reader = AsyncFile::from_proxy(proxy);
// Nothing has answered the request yet, so the single poll must be pending.
let () = poll_fn(|cx| {
let mut buf = vec![0u8; poll_read_size.try_into().unwrap()];
assert_matches!(
Pin::new(&mut reader).poll_read_at(cx, 0, buf.as_mut_slice()),
Poll::Pending
);
Poll::Ready(())
})
.await;
// Inspect the request that the poll sent.
match stream.next().await.unwrap().unwrap() {
fio::FileRequest::ReadAt { count, .. } => {
assert_eq!(count, expected_file_read_size);
}
req => panic!("unhandled request {:?}", req),
}
}
// An empty buffer still produces a `ReadAt` request, asking for zero bytes.
#[fasync::run_singlethreaded(test)]
async fn poll_read_at_empty_buf() {
poll_read_at_with_specific_buf_size(0, 0).await;
}
// A buffer larger than `fio::MAX_BUF` produces a request capped at `MAX_BUF`.
#[fasync::run_singlethreaded(test)]
async fn poll_read_at_caps_buf_size() {
poll_read_at_with_specific_buf_size(fio::MAX_BUF * 2, fio::MAX_BUF).await;
}
// A poll that returns `Pending` keeps its request: a following read at the
// same offset completes from that request, and the mock file sees exactly one
// `ReadAt`.
#[fasync::run_singlethreaded(test)]
async fn poll_read_at_pending_saves_future() {
let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
let mut reader = AsyncFile::from_proxy(proxy);
// First poll sends the request and is left pending.
let () = poll_fn(|cx| {
assert_matches!(
Pin::new(&mut reader).poll_read_at(cx, 2, &mut [0u8; 1]),
Poll::Pending
);
Poll::Ready(())
})
.await;
// `reader` is moved in here; when this block finishes the reader is dropped,
// which lets the request loop below terminate.
let poll_read_at = async move {
let mut buf = [0u8; 1];
assert_eq!(reader.read_at(2, &mut buf).await.unwrap(), buf.len());
assert_eq!(&buf, &[1]);
};
let mut file_read_requests = 0u8;
let handle_file_stream = async {
while let Some(req) = stream.try_next().await.unwrap() {
file_read_requests += 1;
match req {
fio::FileRequest::ReadAt { count, offset, responder } => {
assert_eq!(count, 1);
assert_eq!(offset, 2);
// Reply with the running request count so the reader side can tell
// which request produced its data.
responder.send(Ok(&[file_read_requests])).unwrap();
}
req => panic!("unhandled request {:?}", req),
}
}
};
let ((), ()) = future::join(poll_read_at, handle_file_stream).await;
assert_eq!(file_read_requests, 1);
}
// A 3-byte response obtained by a pending poll is served piecewise to later,
// smaller reads at offsets 0, 1 and 2 without new requests; a read at offset 3
// (past the buffered data) triggers a second `ReadAt`.
#[fasync::run_singlethreaded(test)]
async fn poll_read_at_with_smaller_buf_after_pending() {
let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
let mut reader = AsyncFile::from_proxy(proxy);
// Send a 3-byte request at offset 0 and leave it pending.
let () = poll_fn(|cx| {
assert_matches!(
Pin::new(&mut reader).poll_read_at(cx, 0, &mut [0u8; 3]),
Poll::Pending
);
Poll::Ready(())
})
.await;
// Answer the request with "012".
let () = async {
match stream.next().await.unwrap().unwrap() {
fio::FileRequest::ReadAt { count, offset, responder } => {
assert_eq!(count, 3);
assert_eq!(offset, 0);
responder.send(Ok(b"012")).unwrap();
}
req => panic!("unhandled request {:?}", req),
}
}
.await;
// These reads complete although nothing services the stream, so they must be
// satisfied from the buffered response.
let mut buf = [0u8; 1];
assert_eq!(reader.read_at(0, &mut buf).await.unwrap(), buf.len());
assert_eq!(&buf, b"0");
let mut buf = [0u8; 1];
assert_eq!(reader.read_at(1, &mut buf).await.unwrap(), buf.len());
assert_eq!(&buf, b"1");
// Only one buffered byte remains at offset 2, so a 2-byte buffer gets 1 byte.
let mut buf = [0u8; 2];
assert_eq!(reader.read_at(2, &mut buf).await.unwrap(), 1);
assert_eq!(&buf[..1], b"2");
// Offset 3 is beyond the buffered response: expect a new request.
let mut buf = [0u8; 4];
let poll_read_at = reader.read_at(3, &mut buf);
let handle_second_file_request = async {
match stream.next().await.unwrap().unwrap() {
fio::FileRequest::ReadAt { count, offset, responder } => {
assert_eq!(count, 4);
assert_eq!(offset, 3);
responder.send(Ok(b"3456")).unwrap();
}
req => panic!("unhandled request {:?}", req),
}
};
let (read_res, ()) = future::join(poll_read_at, handle_second_file_request).await;
assert_eq!(read_res.unwrap(), 4);
assert_eq!(&buf, b"3456");
}
// A FIDL-level failure is reported as an error and leaves the reader in
// `ReadAtState::Empty`.
#[fasync::run_singlethreaded(test)]
async fn transition_to_empty_on_fidl_error() {
// The request stream is discarded immediately, so nothing will ever answer
// the read (presumably it fails because the peer end is closed).
let (proxy, _) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
let mut reader = AsyncFile::from_proxy(proxy);
let () = poll_fn(|cx| {
assert_matches!(
Pin::new(&mut reader).poll_read_at(cx, 0, &mut [0u8; 1]),
Poll::Ready(Err(_))
);
Poll::Ready(())
})
.await;
assert_matches!(reader.read_at_state, ReadAtState::Empty);
}
// After the file answers a read with an error status, the next read sends a
// fresh request and can succeed.
#[fasync::run_singlethreaded(test)]
async fn recover_from_file_read_error() {
let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
let mut reader = AsyncFile::from_proxy(proxy);
// First read: the mock file replies with NO_MEMORY.
let mut buf = [0u8; 1];
let poll_read_at = reader.read_at(0, &mut buf);
let failing_file_response = async {
match stream.next().await.unwrap().unwrap() {
fio::FileRequest::ReadAt { count, offset, responder } => {
assert_eq!(count, 1);
assert_eq!(offset, 0);
responder.send(Err(zx_status::Status::NO_MEMORY.into_raw())).unwrap();
}
req => panic!("unhandled request {:?}", req),
}
};
let (read_res, ()) = future::join(poll_read_at, failing_file_response).await;
assert_matches!(read_res, Err(_));
// Second read at the same offset: a new request arrives and succeeds.
let mut buf = [0u8; 1];
let poll_read_at = reader.read_at(0, &mut buf);
let succeeding_file_response = async {
match stream.next().await.unwrap().unwrap() {
fio::FileRequest::ReadAt { count, offset, responder } => {
assert_eq!(count, 1);
assert_eq!(offset, 0);
responder.send(Ok(b"0")).unwrap();
}
req => panic!("unhandled request {:?}", req),
}
};
let (read_res, ()) = future::join(poll_read_at, succeeding_file_response).await;
assert_eq!(read_res.unwrap(), 1);
assert_eq!(&buf, b"0");
}
// A pending zero-length read followed by a non-empty read at the same offset:
// the empty response must not be handed to the second read (that would be a
// zero-byte result); instead a second, 1-byte request is made.
#[fasync::run_singlethreaded(test)]
async fn poll_read_at_zero_then_read_nonzero() {
let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
let mut reader = AsyncFile::from_proxy(proxy);
// Zero-length poll: sends a zero-byte request and is left pending.
let () = poll_fn(|cx| {
assert_matches!(Pin::new(&mut reader).poll_read_at(cx, 0, &mut []), Poll::Pending);
Poll::Ready(())
})
.await;
// Answer the zero-byte request with an empty payload.
match stream.next().await.unwrap().unwrap() {
fio::FileRequest::ReadAt { count, offset, responder } => {
assert_eq!(count, 0);
assert_eq!(offset, 0);
responder.send(Ok(&[])).unwrap();
}
req => panic!("unhandled request {:?}", req),
}
// Non-empty read: expects its own request and the byte from that response.
let mut buf = vec![0u8; 1];
let poll_read_at = reader.read_at(0, &mut buf);
let handle_file_request = async {
match stream.next().await.unwrap().unwrap() {
fio::FileRequest::ReadAt { count, offset, responder } => {
assert_eq!(count, 1);
assert_eq!(offset, 0);
responder.send(Ok(&[1])).unwrap();
}
req => panic!("unhandled request {:?}", req),
}
};
let (poll_read, ()) = future::join(poll_read_at, handle_file_request).await;
assert_eq!(poll_read.unwrap(), 1);
assert_eq!(&buf[..], &[1]);
}
// Exhaustive small-case check. For every combination of first-poll buffer
// length, file size, second-read offset and second-read buffer length (each
// below 5), a pending first poll at offset 0 is followed by a second read, and
// the test verifies both whether a second `ReadAt` is sent and how many bytes
// the second read returns.
#[fasync::run_singlethreaded(test)]
async fn different_poll_read_at_and_file_sizes() {
for first_poll_read_len in 0..5 {
for file_size in 0..5 {
for second_poll_offset in 0..file_size {
for second_poll_read_len in 0..5 {
let (proxy, mut stream) =
endpoints::create_proxy_and_stream::<fio::FileMarker>();
let mut reader = AsyncFile::from_proxy(proxy);
// First poll at offset 0; left pending.
let () = poll_fn(|cx| {
let mut buf = vec![0u8; first_poll_read_len];
assert_matches!(
Pin::new(&mut reader).poll_read_at(cx, 0, &mut buf),
Poll::Pending
);
Poll::Ready(())
})
.await;
// Answer the first request with as many bytes as the simulated file has,
// up to the requested count.
match stream.next().await.unwrap().unwrap() {
fio::FileRequest::ReadAt { count, offset, responder } => {
assert_eq!(count, u64::try_from(first_poll_read_len).unwrap());
assert_eq!(offset, 0);
let resp = vec![7u8; min(file_size, first_poll_read_len)];
responder.send(Ok(&resp)).unwrap();
}
req => panic!("unhandled request {:?}", req),
}
let mut buf = vec![0u8; second_poll_read_len];
let poll_read_at = reader.read_at(second_poll_offset as u64, &mut buf);
// A second request is expected when the first was a zero-byte request but
// this read is not, or when this read starts at a non-zero offset at or
// beyond the length requested by the first poll.
let second_request = first_poll_read_len == 0 && second_poll_read_len != 0
|| second_poll_offset != 0 && second_poll_offset >= first_poll_read_len;
let handle_conditional_file_request = async {
if second_request {
match stream.next().await.unwrap().unwrap() {
fio::FileRequest::ReadAt { count, offset, responder } => {
assert_eq!(
count,
u64::try_from(second_poll_read_len).unwrap()
);
assert_eq!(
offset,
u64::try_from(second_poll_offset).unwrap()
);
let resp = vec![
7u8;
min(
file_size - second_poll_offset,
second_poll_read_len
)
];
responder.send(Ok(&resp)).unwrap();
}
req => panic!("unhandled request {:?}", req),
}
}
};
let (read_res, ()) =
future::join(poll_read_at, handle_conditional_file_request).await;
// With a second request the result is bounded by what the file holds past
// the second offset; without one it is bounded by what the first response
// holds past that offset.
let expected_len = if second_request {
min(file_size - second_poll_offset, second_poll_read_len)
} else {
min(
min(file_size, first_poll_read_len) - second_poll_offset,
second_poll_read_len,
)
};
let expected = vec![7u8; expected_len];
assert_eq!(read_res.unwrap(), expected_len);
assert_eq!(&buf[..expected_len], &expected[..]);
}
}
}
}
}
// Helper: writes `contents` to a file in a fresh temporary directory, opens it
// readable, and asserts that `get_size` reports `contents.len()`.
async fn get_size_file_with_contents(contents: &[u8]) {
let dir = TempDir::new().unwrap();
let path = dir.path().join("get_size_file_with_contents").to_str().unwrap().to_owned();
let () = file::write_in_namespace(&path, contents).await.unwrap();
let file = file::open_in_namespace(&path, fio::PERM_READABLE).unwrap();
let mut reader = AsyncFile::from_proxy(file);
assert_eq!(reader.get_size().await.unwrap(), contents.len() as u64);
}
// An empty file reports size 0.
#[fasync::run_singlethreaded(test)]
async fn get_size_empty() {
get_size_file_with_contents(&[]).await;
}
// A file three times larger than `fio::MAX_BUF` reports its full size.
#[fasync::run_singlethreaded(test)]
async fn get_size_large() {
let expected_contents = vec![7u8; (fio::MAX_BUF * 3).try_into().unwrap()];
get_size_file_with_contents(&expected_contents[..]).await;
}
// `get_size` issues a new query on each call: the same reader observes the
// file growing from 0 to 3 to 8 bytes as data is appended.
#[fasync::run_singlethreaded(test)]
async fn get_size_changing_size() {
let (mut file, path) = NamedTempFile::new().unwrap().into_parts();
let proxy = file::open_in_namespace(path.to_str().unwrap(), fio::PERM_READABLE).unwrap();
let mut reader = AsyncFile::from_proxy(proxy);
assert_eq!(reader.get_size().await.unwrap(), 0);
file.write_all(&[1; 3][..]).unwrap();
assert_eq!(reader.get_size().await.unwrap(), 3);
file.write_all(&[2; 5][..]).unwrap();
assert_eq!(reader.get_size().await.unwrap(), 8);
}
// `Adapter` over an in-memory cursor: size, full read and a positional exact
// read all agree with the backing data.
#[fasync::run_singlethreaded(test)]
async fn adapter_for_cursor() {
// 1000 bytes with the repeating pattern 0, 1, ..., 255.
let data = (0..1000).map(|i| (i % 256) as u8).collect::<Vec<_>>();
let cursor = futures::io::Cursor::new(data.clone());
let mut adapter = Adapter::new(cursor);
assert_eq!(adapter.get_size().await.unwrap(), 1000);
let mut buffer = vec![];
adapter.read_to_end(&mut buffer).await.unwrap();
assert_eq!(buffer, data);
let mut buffer = vec![0; 100];
adapter.read_at_exact(333, &mut buffer).await.unwrap();
assert_eq!(buffer, &data[333..433]);
}
}