1use fidl::client::QueryResponseFut;
6use flex_client::Dialect;
7use flex_fuchsia_io as fio;
8use futures::future::Future;
9use futures::io::{AsyncRead, AsyncSeek, SeekFrom};
10use futures::FutureExt;
11use std::cmp::min;
12use std::convert::TryInto as _;
13use std::io;
14use std::pin::Pin;
15use std::task::{Context, Poll};
16
17pub trait AsyncReadAt {
20 fn poll_read_at(
26 self: Pin<&mut Self>,
27 cx: &mut Context<'_>,
28 offset: u64,
29 buf: &mut [u8],
30 ) -> Poll<io::Result<usize>>;
31}
32
33pub trait AsyncGetSize {
35 fn poll_get_size(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>;
37}
38
39pub trait AsyncGetSizeExt: AsyncGetSize {
41 fn get_size<'a>(&'a mut self) -> GetSize<'a, Self>
43 where
44 Self: Unpin,
45 {
46 GetSize { size_getter: self }
47 }
48}
49
50impl<T: AsyncGetSize + ?Sized> AsyncGetSizeExt for T {}
51
52#[derive(Debug)]
54#[must_use = "futures do nothing unless you `.await` or poll them"]
55pub struct GetSize<'a, R: ?Sized> {
56 size_getter: &'a mut R,
57}
58
59impl<R: ?Sized + Unpin> Unpin for GetSize<'_, R> {}
60
61impl<R: AsyncGetSize + ?Sized + Unpin> Future for GetSize<'_, R> {
62 type Output = io::Result<u64>;
63
64 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
65 let this = &mut *self;
66 Pin::new(&mut *this.size_getter).poll_get_size(cx)
67 }
68}
69
70#[derive(Debug)]
77pub struct AsyncFile {
78 file: fio::FileProxy,
79 read_at_state: ReadAtState,
80 get_attributes_fut: Option<
81 QueryResponseFut<
82 Result<(fio::MutableNodeAttributes, fio::ImmutableNodeAttributes), i32>,
83 Dialect,
84 >,
85 >,
86}
87
88#[derive(Debug)]
89enum ReadAtState {
90 Empty,
91 Forwarding {
92 fut: QueryResponseFut<Result<Vec<u8>, i32>, Dialect>,
93 file_offset: u64,
94 zero_byte_request: bool,
95 },
96 Bytes {
97 bytes: Vec<u8>,
98 file_offset: u64,
99 },
100}
101
102impl AsyncFile {
103 pub fn from_proxy(file: fio::FileProxy) -> Self {
104 Self { file, read_at_state: ReadAtState::Empty, get_attributes_fut: None }
105 }
106}
107
108impl AsyncReadAt for AsyncFile {
109 fn poll_read_at(
110 mut self: Pin<&mut Self>,
111 cx: &mut Context<'_>,
112 offset: u64,
113 buf: &mut [u8],
114 ) -> Poll<std::io::Result<usize>> {
115 loop {
116 match self.read_at_state {
117 ReadAtState::Empty => {
118 let len = if let Ok(len) = buf.len().try_into() {
119 min(len, fio::MAX_BUF)
120 } else {
121 fio::MAX_BUF
122 };
123 self.read_at_state = ReadAtState::Forwarding {
124 fut: self.file.read_at(len, offset),
125 file_offset: offset,
126 zero_byte_request: len == 0,
127 };
128 }
129 ReadAtState::Forwarding { ref mut fut, file_offset, zero_byte_request } => {
130 match futures::ready!(Pin::new(fut).poll(cx)) {
131 Ok(result) => {
132 match result {
133 Err(s) => {
134 self.read_at_state = ReadAtState::Empty;
135 return Poll::Ready(Err(
136 zx_status::Status::from_raw(s).into_io_error()
137 ));
138 }
139 Ok(bytes) => {
140 if zero_byte_request && buf.len() != 0 {
149 self.read_at_state = ReadAtState::Empty;
150 } else {
151 self.read_at_state =
152 ReadAtState::Bytes { bytes, file_offset };
153 }
154 }
155 }
156 }
157 Err(e) => {
158 self.read_at_state = ReadAtState::Empty;
159 return Poll::Ready(Err(std::io::Error::other(e)));
160 }
161 }
162 }
163 ReadAtState::Bytes { ref bytes, file_offset } => {
164 if offset < file_offset {
165 self.read_at_state = ReadAtState::Empty;
166 continue;
167 }
168 let bytes_offset = match (offset - file_offset).try_into() {
169 Ok(offset) => offset,
170 Err(_) => {
171 self.read_at_state = ReadAtState::Empty;
172 continue;
173 }
174 };
175 if bytes_offset != 0 && bytes_offset >= bytes.len() {
176 self.read_at_state = ReadAtState::Empty;
177 continue;
178 }
179 let n = min(buf.len(), bytes.len() - bytes_offset);
180 let () = buf[..n].copy_from_slice(&bytes[bytes_offset..bytes_offset + n]);
181 return Poll::Ready(Ok(n));
182 }
183 }
184 }
185 }
186}
187
188impl AsyncGetSize for AsyncFile {
189 fn poll_get_size(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
190 if self.get_attributes_fut.is_none() {
191 self.get_attributes_fut =
192 Some(self.file.get_attributes(fio::NodeAttributesQuery::CONTENT_SIZE));
193 }
194 let fut = self.get_attributes_fut.as_mut().unwrap();
195 let get_attributes_fut_result = futures::ready!(fut.poll_unpin(cx));
196 self.get_attributes_fut = None;
197 match get_attributes_fut_result {
198 Ok(get_attributes_response) => match get_attributes_response {
199 Ok((_mutable_attr, immutable_attr)) => {
200 if let Some(content_size) = immutable_attr.content_size {
201 return Poll::Ready(Ok(content_size));
202 }
203 return Poll::Ready(Err(zx_status::Status::NOT_SUPPORTED.into_io_error()));
204 }
205 Err(status) => {
206 return Poll::Ready(Err(zx_status::Status::from_raw(status).into_io_error()));
207 }
208 },
209 Err(e) => {
210 return Poll::Ready(Err(std::io::Error::other(e)));
211 }
212 }
213 }
214}
215
216#[derive(Debug)]
218pub struct Adapter<T> {
219 inner: T,
220}
221
222impl<T> Adapter<T> {
223 pub fn new(inner: T) -> Adapter<T> {
224 Self { inner }
225 }
226}
227
228impl<T: AsyncRead + AsyncSeek + Unpin> AsyncReadAt for Adapter<T> {
229 fn poll_read_at(
230 mut self: Pin<&mut Self>,
231 cx: &mut Context<'_>,
232 offset: u64,
233 buf: &mut [u8],
234 ) -> Poll<std::io::Result<usize>> {
235 futures::ready!(Pin::new(&mut self.inner).poll_seek(cx, SeekFrom::Start(offset)))?;
236 Pin::new(&mut self.inner).poll_read(cx, buf)
237 }
238}
239
240impl<T: AsyncSeek + Unpin> AsyncGetSize for Adapter<T> {
241 fn poll_get_size(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
242 Pin::new(&mut self.inner).poll_seek(cx, SeekFrom::End(0))
243 }
244}
245
246#[cfg(test)]
247mod tests {
248 use super::*;
249 use crate::file::{self, AsyncReadAtExt};
250 use assert_matches::assert_matches;
251 use fidl::endpoints;
252 use fuchsia_async as fasync;
253 use futures::future::{self, poll_fn};
254 use futures::{StreamExt as _, TryStreamExt as _};
255 use std::convert::TryFrom as _;
256 use std::io::Write;
257 use tempfile::{NamedTempFile, TempDir};
258
259 async fn poll_read_at_with_specific_buf_size(
260 poll_read_size: u64,
261 expected_file_read_size: u64,
262 ) {
263 let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
264
265 let mut reader = AsyncFile::from_proxy(proxy);
266
267 let () = poll_fn(|cx| {
268 let mut buf = vec![0u8; poll_read_size.try_into().unwrap()];
269 assert_matches!(
270 Pin::new(&mut reader).poll_read_at(cx, 0, buf.as_mut_slice()),
271 Poll::Pending
272 );
273 Poll::Ready(())
274 })
275 .await;
276
277 match stream.next().await.unwrap().unwrap() {
278 fio::FileRequest::ReadAt { count, .. } => {
279 assert_eq!(count, expected_file_read_size);
280 }
281 req => panic!("unhandled request {:?}", req),
282 }
283 }
284
285 #[fasync::run_singlethreaded(test)]
286 async fn poll_read_at_empty_buf() {
287 poll_read_at_with_specific_buf_size(0, 0).await;
288 }
289
290 #[fasync::run_singlethreaded(test)]
291 async fn poll_read_at_caps_buf_size() {
292 poll_read_at_with_specific_buf_size(fio::MAX_BUF * 2, fio::MAX_BUF).await;
293 }
294
295 #[fasync::run_singlethreaded(test)]
296 async fn poll_read_at_pending_saves_future() {
297 let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
298
299 let mut reader = AsyncFile::from_proxy(proxy);
300
301 let () = poll_fn(|cx| {
306 assert_matches!(
307 Pin::new(&mut reader).poll_read_at(cx, 2, &mut [0u8; 1]),
308 Poll::Pending
309 );
310 Poll::Ready(())
311 })
312 .await;
313
314 let poll_read_at = async move {
317 let mut buf = [0u8; 1];
318 assert_eq!(reader.read_at(2, &mut buf).await.unwrap(), buf.len());
319 assert_eq!(&buf, &[1]);
320 };
321
322 let mut file_read_requests = 0u8;
323 let handle_file_stream = async {
324 while let Some(req) = stream.try_next().await.unwrap() {
325 file_read_requests += 1;
326 match req {
327 fio::FileRequest::ReadAt { count, offset, responder } => {
328 assert_eq!(count, 1);
329 assert_eq!(offset, 2);
330 responder.send(Ok(&[file_read_requests])).unwrap();
331 }
332 req => panic!("unhandled request {:?}", req),
333 }
334 }
335 };
336
337 let ((), ()) = future::join(poll_read_at, handle_file_stream).await;
338 assert_eq!(file_read_requests, 1);
339 }
340
341 #[fasync::run_singlethreaded(test)]
342 async fn poll_read_at_with_smaller_buf_after_pending() {
343 let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
344
345 let mut reader = AsyncFile::from_proxy(proxy);
346
347 let () = poll_fn(|cx| {
351 assert_matches!(
352 Pin::new(&mut reader).poll_read_at(cx, 0, &mut [0u8; 3]),
353 Poll::Pending
354 );
355 Poll::Ready(())
356 })
357 .await;
358
359 let () = async {
361 match stream.next().await.unwrap().unwrap() {
362 fio::FileRequest::ReadAt { count, offset, responder } => {
363 assert_eq!(count, 3);
364 assert_eq!(offset, 0);
365 responder.send(Ok(b"012")).unwrap();
366 }
367 req => panic!("unhandled request {:?}", req),
368 }
369 }
370 .await;
371
372 let mut buf = [0u8; 1];
376 assert_eq!(reader.read_at(0, &mut buf).await.unwrap(), buf.len());
377 assert_eq!(&buf, b"0");
378
379 let mut buf = [0u8; 1];
382 assert_eq!(reader.read_at(1, &mut buf).await.unwrap(), buf.len());
383 assert_eq!(&buf, b"1");
384
385 let mut buf = [0u8; 2];
388 assert_eq!(reader.read_at(2, &mut buf).await.unwrap(), 1);
389 assert_eq!(&buf[..1], b"2");
390
391 let mut buf = [0u8; 4];
394 let poll_read_at = reader.read_at(3, &mut buf);
395
396 let handle_second_file_request = async {
397 match stream.next().await.unwrap().unwrap() {
398 fio::FileRequest::ReadAt { count, offset, responder } => {
399 assert_eq!(count, 4);
400 assert_eq!(offset, 3);
401 responder.send(Ok(b"3456")).unwrap();
402 }
403 req => panic!("unhandled request {:?}", req),
404 }
405 };
406
407 let (read_res, ()) = future::join(poll_read_at, handle_second_file_request).await;
408 assert_eq!(read_res.unwrap(), 4);
409 assert_eq!(&buf, b"3456");
410 }
411
412 #[fasync::run_singlethreaded(test)]
413 async fn transition_to_empty_on_fidl_error() {
414 let (proxy, _) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
415
416 let mut reader = AsyncFile::from_proxy(proxy);
417
418 let () = poll_fn(|cx| {
420 assert_matches!(
421 Pin::new(&mut reader).poll_read_at(cx, 0, &mut [0u8; 1]),
422 Poll::Ready(Err(_))
423 );
424 Poll::Ready(())
425 })
426 .await;
427
428 assert_matches!(reader.read_at_state, ReadAtState::Empty);
433 }
434
435 #[fasync::run_singlethreaded(test)]
436 async fn recover_from_file_read_error() {
437 let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
438
439 let mut reader = AsyncFile::from_proxy(proxy);
440
441 let mut buf = [0u8; 1];
443 let poll_read_at = reader.read_at(0, &mut buf);
444
445 let failing_file_response = async {
446 match stream.next().await.unwrap().unwrap() {
447 fio::FileRequest::ReadAt { count, offset, responder } => {
448 assert_eq!(count, 1);
449 assert_eq!(offset, 0);
450 responder.send(Err(zx_status::Status::NO_MEMORY.into_raw())).unwrap();
451 }
452 req => panic!("unhandled request {:?}", req),
453 }
454 };
455
456 let (read_res, ()) = future::join(poll_read_at, failing_file_response).await;
457 assert_matches!(read_res, Err(_));
458
459 let mut buf = [0u8; 1];
462 let poll_read_at = reader.read_at(0, &mut buf);
463
464 let succeeding_file_response = async {
465 match stream.next().await.unwrap().unwrap() {
466 fio::FileRequest::ReadAt { count, offset, responder } => {
467 assert_eq!(count, 1);
468 assert_eq!(offset, 0);
469 responder.send(Ok(b"0")).unwrap();
470 }
471 req => panic!("unhandled request {:?}", req),
472 }
473 };
474
475 let (read_res, ()) = future::join(poll_read_at, succeeding_file_response).await;
476 assert_eq!(read_res.unwrap(), 1);
477 assert_eq!(&buf, b"0");
478 }
479
480 #[fasync::run_singlethreaded(test)]
481 async fn poll_read_at_zero_then_read_nonzero() {
482 let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
483
484 let mut reader = AsyncFile::from_proxy(proxy);
485
486 let () = poll_fn(|cx| {
488 assert_matches!(Pin::new(&mut reader).poll_read_at(cx, 0, &mut []), Poll::Pending);
489 Poll::Ready(())
490 })
491 .await;
492
493 match stream.next().await.unwrap().unwrap() {
495 fio::FileRequest::ReadAt { count, offset, responder } => {
496 assert_eq!(count, 0);
497 assert_eq!(offset, 0);
498 responder.send(Ok(&[])).unwrap();
499 }
500 req => panic!("unhandled request {:?}", req),
501 }
502
503 let mut buf = vec![0u8; 1];
505 let poll_read_at = reader.read_at(0, &mut buf);
506
507 let handle_file_request = async {
512 match stream.next().await.unwrap().unwrap() {
513 fio::FileRequest::ReadAt { count, offset, responder } => {
514 assert_eq!(count, 1);
515 assert_eq!(offset, 0);
516 responder.send(Ok(&[1])).unwrap();
517 }
518 req => panic!("unhandled request {:?}", req),
519 }
520 };
521
522 let (poll_read, ()) = future::join(poll_read_at, handle_file_request).await;
523
524 assert_eq!(poll_read.unwrap(), 1);
527 assert_eq!(&buf[..], &[1]);
528 }
529
530 #[fasync::run_singlethreaded(test)]
531 async fn different_poll_read_at_and_file_sizes() {
532 for first_poll_read_len in 0..5 {
533 for file_size in 0..5 {
534 for second_poll_offset in 0..file_size {
535 for second_poll_read_len in 0..5 {
536 let (proxy, mut stream) =
537 endpoints::create_proxy_and_stream::<fio::FileMarker>();
538
539 let mut reader = AsyncFile::from_proxy(proxy);
540
541 let () = poll_fn(|cx| {
543 let mut buf = vec![0u8; first_poll_read_len];
544 assert_matches!(
545 Pin::new(&mut reader).poll_read_at(cx, 0, &mut buf),
546 Poll::Pending
547 );
548 Poll::Ready(())
549 })
550 .await;
551
552 match stream.next().await.unwrap().unwrap() {
555 fio::FileRequest::ReadAt { count, offset, responder } => {
556 assert_eq!(count, u64::try_from(first_poll_read_len).unwrap());
557 assert_eq!(offset, 0);
558 let resp = vec![7u8; min(file_size, first_poll_read_len)];
559 responder.send(Ok(&resp)).unwrap();
560 }
561 req => panic!("unhandled request {:?}", req),
562 }
563
564 let mut buf = vec![0u8; second_poll_read_len];
569 let poll_read_at = reader.read_at(second_poll_offset as u64, &mut buf);
570
571 let second_request = first_poll_read_len == 0 && second_poll_read_len != 0
572 || second_poll_offset != 0 && second_poll_offset >= first_poll_read_len;
573 let handle_conditional_file_request = async {
574 if second_request {
575 match stream.next().await.unwrap().unwrap() {
576 fio::FileRequest::ReadAt { count, offset, responder } => {
577 assert_eq!(
578 count,
579 u64::try_from(second_poll_read_len).unwrap()
580 );
581 assert_eq!(
582 offset,
583 u64::try_from(second_poll_offset).unwrap()
584 );
585 let resp = vec![
586 7u8;
587 min(
588 file_size - second_poll_offset,
589 second_poll_read_len
590 )
591 ];
592 responder.send(Ok(&resp)).unwrap();
593 }
594 req => panic!("unhandled request {:?}", req),
595 }
596 }
597 };
598
599 let (read_res, ()) =
600 future::join(poll_read_at, handle_conditional_file_request).await;
601
602 let expected_len = if second_request {
603 min(file_size - second_poll_offset, second_poll_read_len)
604 } else {
605 min(
606 min(file_size, first_poll_read_len) - second_poll_offset,
607 second_poll_read_len,
608 )
609 };
610 let expected = vec![7u8; expected_len];
611 assert_eq!(read_res.unwrap(), expected_len);
612 assert_eq!(&buf[..expected_len], &expected[..]);
613 }
614 }
615 }
616 }
617 }
618
619 async fn get_size_file_with_contents(contents: &[u8]) {
620 let dir = TempDir::new().unwrap();
621 let path = dir.path().join("get_size_file_with_contents").to_str().unwrap().to_owned();
622 let () = file::write_in_namespace(&path, contents).await.unwrap();
623 let file = file::open_in_namespace(&path, fio::PERM_READABLE).unwrap();
624
625 let mut reader = AsyncFile::from_proxy(file);
626
627 assert_eq!(reader.get_size().await.unwrap(), contents.len() as u64);
628 }
629
630 #[fasync::run_singlethreaded(test)]
631 async fn get_size_empty() {
632 get_size_file_with_contents(&[]).await;
633 }
634
635 #[fasync::run_singlethreaded(test)]
636 async fn get_size_large() {
637 let expected_contents = vec![7u8; (fio::MAX_BUF * 3).try_into().unwrap()];
638 get_size_file_with_contents(&expected_contents[..]).await;
639 }
640
641 #[fasync::run_singlethreaded(test)]
642 async fn get_size_changing_size() {
643 let (mut file, path) = NamedTempFile::new().unwrap().into_parts();
644 let proxy = file::open_in_namespace(path.to_str().unwrap(), fio::PERM_READABLE).unwrap();
645
646 let mut reader = AsyncFile::from_proxy(proxy);
647
648 assert_eq!(reader.get_size().await.unwrap(), 0);
649 file.write_all(&[1; 3][..]).unwrap();
650 assert_eq!(reader.get_size().await.unwrap(), 3);
651 file.write_all(&[2; 5][..]).unwrap();
652 assert_eq!(reader.get_size().await.unwrap(), 8);
653 }
654
655 #[fasync::run_singlethreaded(test)]
656 async fn adapter_for_cursor() {
657 let data = (0..1000).map(|i| (i % 256) as u8).collect::<Vec<_>>();
658 let cursor = futures::io::Cursor::new(data.clone());
659 let mut adapter = Adapter::new(cursor);
660
661 assert_eq!(adapter.get_size().await.unwrap(), 1000);
662
663 let mut buffer = vec![];
664 adapter.read_to_end(&mut buffer).await.unwrap();
665 assert_eq!(buffer, data);
666
667 let mut buffer = vec![0; 100];
668 adapter.read_at_exact(333, &mut buffer).await.unwrap();
669 assert_eq!(buffer, &data[333..433]);
670 }
671}