1use fidl::client::QueryResponseFut;
6use flex_client::Dialect;
7use flex_fuchsia_io as fio;
8use futures::FutureExt as _;
9use futures::future::Future;
10use futures::io::{AsyncRead, AsyncSeek, SeekFrom};
11use futures::lock::Mutex;
12use std::cmp::min;
13use std::convert::TryInto as _;
14use std::io;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18use zx_status_ext::StatusExt;
19
/// Positional, poll-based reads: the caller names the offset on every call
/// instead of relying on a cursor held by the reader.
pub trait AsyncReadAt {
    /// Attempts to read bytes located at `offset` into `buf`.
    ///
    /// Yields `Poll::Ready(Ok(n))` with the number of bytes placed in `buf`,
    /// `Poll::Ready(Err(_))` on failure, or `Poll::Pending` when the result is
    /// not available yet.
    fn poll_read_at(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        offset: u64,
        buf: &mut [u8],
    ) -> Poll<Result<usize, io::Error>>;
}
35
/// Poll-based query for the size, in bytes, of an underlying object.
pub trait AsyncGetSize {
    /// Attempts to obtain the size.
    ///
    /// Yields `Poll::Ready(Ok(size))` on success, `Poll::Ready(Err(_))` on
    /// failure, or `Poll::Pending` when the answer is not available yet.
    fn poll_get_size(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<u64, io::Error>>;
}
41
42impl<T: AsyncReadAt + Unpin> AsyncReadAt for Arc<Mutex<T>> {
43 fn poll_read_at(
44 self: Pin<&mut Self>,
45 cx: &mut Context<'_>,
46 offset: u64,
47 buf: &mut [u8],
48 ) -> Poll<Result<usize, std::io::Error>> {
49 let mut guard = futures::ready!(self.lock().poll_unpin(cx));
50 Pin::new(&mut *guard).poll_read_at(cx, offset, buf)
51 }
52}
53
54impl<T: AsyncGetSize + Unpin> AsyncGetSize for Arc<Mutex<T>> {
55 fn poll_get_size(
56 self: Pin<&mut Self>,
57 cx: &mut Context<'_>,
58 ) -> Poll<Result<u64, std::io::Error>> {
59 let mut guard = futures::ready!(self.lock().poll_unpin(cx));
60 Pin::new(&mut *guard).poll_get_size(cx)
61 }
62}
63
64pub trait AsyncGetSizeExt: AsyncGetSize {
66 fn get_size<'a>(&'a mut self) -> GetSize<'a, Self>
68 where
69 Self: Unpin,
70 {
71 GetSize { size_getter: self }
72 }
73}
74
75impl<T: AsyncGetSize + ?Sized> AsyncGetSizeExt for T {}
76
77#[derive(Debug)]
79#[must_use = "futures do nothing unless you `.await` or poll them"]
80pub struct GetSize<'a, R: ?Sized> {
81 size_getter: &'a mut R,
82}
83
84impl<R: ?Sized + Unpin> Unpin for GetSize<'_, R> {}
85
86impl<R: AsyncGetSize + ?Sized + Unpin> Future for GetSize<'_, R> {
87 type Output = io::Result<u64>;
88
89 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
90 let this = &mut *self;
91 Pin::new(&mut *this.size_getter).poll_get_size(cx)
92 }
93}
94
95#[derive(Debug)]
102pub struct AsyncFile {
103 file: fio::FileProxy,
104 read_at_state: ReadAtState,
105 get_attributes_fut: Option<
106 QueryResponseFut<
107 Result<(fio::MutableNodeAttributes, fio::ImmutableNodeAttributes), i32>,
108 Dialect,
109 >,
110 >,
111}
112
113#[derive(Debug)]
114enum ReadAtState {
115 Empty,
116 Forwarding {
117 fut: QueryResponseFut<Result<Vec<u8>, i32>, Dialect>,
118 file_offset: u64,
119 zero_byte_request: bool,
120 },
121 Bytes {
122 bytes: Vec<u8>,
123 file_offset: u64,
124 },
125}
126
127impl AsyncFile {
128 pub fn from_proxy(file: fio::FileProxy) -> Self {
129 Self { file, read_at_state: ReadAtState::Empty, get_attributes_fut: None }
130 }
131}
132
133impl AsyncReadAt for AsyncFile {
134 fn poll_read_at(
135 mut self: Pin<&mut Self>,
136 cx: &mut Context<'_>,
137 offset: u64,
138 buf: &mut [u8],
139 ) -> Poll<std::io::Result<usize>> {
140 loop {
141 match self.read_at_state {
142 ReadAtState::Empty => {
143 let len = if let Ok(len) = buf.len().try_into() {
144 min(len, fio::MAX_BUF)
145 } else {
146 fio::MAX_BUF
147 };
148 self.read_at_state = ReadAtState::Forwarding {
149 fut: self.file.read_at(len, offset),
150 file_offset: offset,
151 zero_byte_request: len == 0,
152 };
153 }
154 ReadAtState::Forwarding { ref mut fut, file_offset, zero_byte_request } => {
155 match futures::ready!(Pin::new(fut).poll(cx)) {
156 Ok(result) => {
157 match result {
158 Err(s) => {
159 self.read_at_state = ReadAtState::Empty;
160 return Poll::Ready(Err(
161 zx_status::Status::from_raw(s).into_io_error()
162 ));
163 }
164 Ok(bytes) => {
165 if zero_byte_request && buf.len() != 0 {
174 self.read_at_state = ReadAtState::Empty;
175 } else {
176 self.read_at_state =
177 ReadAtState::Bytes { bytes, file_offset };
178 }
179 }
180 }
181 }
182 Err(e) => {
183 self.read_at_state = ReadAtState::Empty;
184 return Poll::Ready(Err(std::io::Error::other(e)));
185 }
186 }
187 }
188 ReadAtState::Bytes { ref bytes, file_offset } => {
189 if offset < file_offset {
190 self.read_at_state = ReadAtState::Empty;
191 continue;
192 }
193 let bytes_offset = match (offset - file_offset).try_into() {
194 Ok(offset) => offset,
195 Err(_) => {
196 self.read_at_state = ReadAtState::Empty;
197 continue;
198 }
199 };
200 if bytes_offset != 0 && bytes_offset >= bytes.len() {
201 self.read_at_state = ReadAtState::Empty;
202 continue;
203 }
204 let n = min(buf.len(), bytes.len() - bytes_offset);
205 let () = buf[..n].copy_from_slice(&bytes[bytes_offset..bytes_offset + n]);
206 return Poll::Ready(Ok(n));
207 }
208 }
209 }
210 }
211}
212
213impl AsyncGetSize for AsyncFile {
214 fn poll_get_size(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
215 if self.get_attributes_fut.is_none() {
216 self.get_attributes_fut =
217 Some(self.file.get_attributes(fio::NodeAttributesQuery::CONTENT_SIZE));
218 }
219 let fut = self.get_attributes_fut.as_mut().unwrap();
220 let get_attributes_fut_result = futures::ready!(fut.poll_unpin(cx));
221 self.get_attributes_fut = None;
222 match get_attributes_fut_result {
223 Ok(get_attributes_response) => match get_attributes_response {
224 Ok((_mutable_attr, immutable_attr)) => {
225 if let Some(content_size) = immutable_attr.content_size {
226 return Poll::Ready(Ok(content_size));
227 }
228 return Poll::Ready(Err(zx_status::Status::NOT_SUPPORTED.into_io_error()));
229 }
230 Err(status) => {
231 return Poll::Ready(Err(zx_status::Status::from_raw(status).into_io_error()));
232 }
233 },
234 Err(e) => {
235 return Poll::Ready(Err(std::io::Error::other(e)));
236 }
237 }
238 }
239}
240
/// Wrapper that gives a seekable reader the [`AsyncReadAt`] and
/// [`AsyncGetSize`] interfaces.
#[derive(Debug)]
pub struct Adapter<T> {
    /// The wrapped value every call is forwarded to.
    inner: T,
}

impl<T> Adapter<T> {
    /// Takes ownership of `inner` and wraps it.
    pub fn new(inner: T) -> Adapter<T> {
        Adapter { inner }
    }
}
252
253impl<T: AsyncRead + AsyncSeek + Unpin> AsyncReadAt for Adapter<T> {
254 fn poll_read_at(
255 mut self: Pin<&mut Self>,
256 cx: &mut Context<'_>,
257 offset: u64,
258 buf: &mut [u8],
259 ) -> Poll<std::io::Result<usize>> {
260 futures::ready!(Pin::new(&mut self.inner).poll_seek(cx, SeekFrom::Start(offset)))?;
261 Pin::new(&mut self.inner).poll_read(cx, buf)
262 }
263}
264
265impl<T: AsyncSeek + Unpin> AsyncGetSize for Adapter<T> {
266 fn poll_get_size(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
267 Pin::new(&mut self.inner).poll_seek(cx, SeekFrom::End(0))
268 }
269}
270
271#[cfg(test)]
272mod tests {
273 use super::*;
274 use crate::file::{self, AsyncReadAtExt};
275 use assert_matches::assert_matches;
276 use fidl::endpoints;
277 use fuchsia_async as fasync;
278 use futures::future::{self, poll_fn};
279 use futures::{StreamExt as _, TryStreamExt as _};
280 use std::convert::TryFrom as _;
281 use std::io::Write;
282 use tempfile::{NamedTempFile, TempDir};
283
284 async fn poll_read_at_with_specific_buf_size(
285 poll_read_size: u64,
286 expected_file_read_size: u64,
287 ) {
288 let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
289
290 let mut reader = AsyncFile::from_proxy(proxy);
291
292 let () = poll_fn(|cx| {
293 let mut buf = vec![0u8; poll_read_size.try_into().unwrap()];
294 assert_matches!(
295 Pin::new(&mut reader).poll_read_at(cx, 0, buf.as_mut_slice()),
296 Poll::Pending
297 );
298 Poll::Ready(())
299 })
300 .await;
301
302 match stream.next().await.unwrap().unwrap() {
303 fio::FileRequest::ReadAt { count, .. } => {
304 assert_eq!(count, expected_file_read_size);
305 }
306 req => panic!("unhandled request {:?}", req),
307 }
308 }
309
310 #[fasync::run_singlethreaded(test)]
311 async fn poll_read_at_empty_buf() {
312 poll_read_at_with_specific_buf_size(0, 0).await;
313 }
314
315 #[fasync::run_singlethreaded(test)]
316 async fn poll_read_at_caps_buf_size() {
317 poll_read_at_with_specific_buf_size(fio::MAX_BUF * 2, fio::MAX_BUF).await;
318 }
319
320 #[fasync::run_singlethreaded(test)]
321 async fn poll_read_at_pending_saves_future() {
322 let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
323
324 let mut reader = AsyncFile::from_proxy(proxy);
325
326 let () = poll_fn(|cx| {
331 assert_matches!(
332 Pin::new(&mut reader).poll_read_at(cx, 2, &mut [0u8; 1]),
333 Poll::Pending
334 );
335 Poll::Ready(())
336 })
337 .await;
338
339 let poll_read_at = async move {
342 let mut buf = [0u8; 1];
343 assert_eq!(reader.read_at(2, &mut buf).await.unwrap(), buf.len());
344 assert_eq!(&buf, &[1]);
345 };
346
347 let mut file_read_requests = 0u8;
348 let handle_file_stream = async {
349 while let Some(req) = stream.try_next().await.unwrap() {
350 file_read_requests += 1;
351 match req {
352 fio::FileRequest::ReadAt { count, offset, responder } => {
353 assert_eq!(count, 1);
354 assert_eq!(offset, 2);
355 responder.send(Ok(&[file_read_requests])).unwrap();
356 }
357 req => panic!("unhandled request {:?}", req),
358 }
359 }
360 };
361
362 let ((), ()) = future::join(poll_read_at, handle_file_stream).await;
363 assert_eq!(file_read_requests, 1);
364 }
365
366 #[fasync::run_singlethreaded(test)]
367 async fn poll_read_at_with_smaller_buf_after_pending() {
368 let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
369
370 let mut reader = AsyncFile::from_proxy(proxy);
371
372 let () = poll_fn(|cx| {
376 assert_matches!(
377 Pin::new(&mut reader).poll_read_at(cx, 0, &mut [0u8; 3]),
378 Poll::Pending
379 );
380 Poll::Ready(())
381 })
382 .await;
383
384 let () = async {
386 match stream.next().await.unwrap().unwrap() {
387 fio::FileRequest::ReadAt { count, offset, responder } => {
388 assert_eq!(count, 3);
389 assert_eq!(offset, 0);
390 responder.send(Ok(b"012")).unwrap();
391 }
392 req => panic!("unhandled request {:?}", req),
393 }
394 }
395 .await;
396
397 let mut buf = [0u8; 1];
401 assert_eq!(reader.read_at(0, &mut buf).await.unwrap(), buf.len());
402 assert_eq!(&buf, b"0");
403
404 let mut buf = [0u8; 1];
407 assert_eq!(reader.read_at(1, &mut buf).await.unwrap(), buf.len());
408 assert_eq!(&buf, b"1");
409
410 let mut buf = [0u8; 2];
413 assert_eq!(reader.read_at(2, &mut buf).await.unwrap(), 1);
414 assert_eq!(&buf[..1], b"2");
415
416 let mut buf = [0u8; 4];
419 let poll_read_at = reader.read_at(3, &mut buf);
420
421 let handle_second_file_request = async {
422 match stream.next().await.unwrap().unwrap() {
423 fio::FileRequest::ReadAt { count, offset, responder } => {
424 assert_eq!(count, 4);
425 assert_eq!(offset, 3);
426 responder.send(Ok(b"3456")).unwrap();
427 }
428 req => panic!("unhandled request {:?}", req),
429 }
430 };
431
432 let (read_res, ()) = future::join(poll_read_at, handle_second_file_request).await;
433 assert_eq!(read_res.unwrap(), 4);
434 assert_eq!(&buf, b"3456");
435 }
436
437 #[fasync::run_singlethreaded(test)]
438 async fn transition_to_empty_on_fidl_error() {
439 let (proxy, _) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
440
441 let mut reader = AsyncFile::from_proxy(proxy);
442
443 let () = poll_fn(|cx| {
445 assert_matches!(
446 Pin::new(&mut reader).poll_read_at(cx, 0, &mut [0u8; 1]),
447 Poll::Ready(Err(_))
448 );
449 Poll::Ready(())
450 })
451 .await;
452
453 assert_matches!(reader.read_at_state, ReadAtState::Empty);
458 }
459
460 #[fasync::run_singlethreaded(test)]
461 async fn recover_from_file_read_error() {
462 let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
463
464 let mut reader = AsyncFile::from_proxy(proxy);
465
466 let mut buf = [0u8; 1];
468 let poll_read_at = reader.read_at(0, &mut buf);
469
470 let failing_file_response = async {
471 match stream.next().await.unwrap().unwrap() {
472 fio::FileRequest::ReadAt { count, offset, responder } => {
473 assert_eq!(count, 1);
474 assert_eq!(offset, 0);
475 responder.send(Err(zx_status::Status::NO_MEMORY.into_raw())).unwrap();
476 }
477 req => panic!("unhandled request {:?}", req),
478 }
479 };
480
481 let (read_res, ()) = future::join(poll_read_at, failing_file_response).await;
482 assert_matches!(read_res, Err(_));
483
484 let mut buf = [0u8; 1];
487 let poll_read_at = reader.read_at(0, &mut buf);
488
489 let succeeding_file_response = async {
490 match stream.next().await.unwrap().unwrap() {
491 fio::FileRequest::ReadAt { count, offset, responder } => {
492 assert_eq!(count, 1);
493 assert_eq!(offset, 0);
494 responder.send(Ok(b"0")).unwrap();
495 }
496 req => panic!("unhandled request {:?}", req),
497 }
498 };
499
500 let (read_res, ()) = future::join(poll_read_at, succeeding_file_response).await;
501 assert_eq!(read_res.unwrap(), 1);
502 assert_eq!(&buf, b"0");
503 }
504
505 #[fasync::run_singlethreaded(test)]
506 async fn poll_read_at_zero_then_read_nonzero() {
507 let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
508
509 let mut reader = AsyncFile::from_proxy(proxy);
510
511 let () = poll_fn(|cx| {
513 assert_matches!(Pin::new(&mut reader).poll_read_at(cx, 0, &mut []), Poll::Pending);
514 Poll::Ready(())
515 })
516 .await;
517
518 match stream.next().await.unwrap().unwrap() {
520 fio::FileRequest::ReadAt { count, offset, responder } => {
521 assert_eq!(count, 0);
522 assert_eq!(offset, 0);
523 responder.send(Ok(&[])).unwrap();
524 }
525 req => panic!("unhandled request {:?}", req),
526 }
527
528 let mut buf = vec![0u8; 1];
530 let poll_read_at = reader.read_at(0, &mut buf);
531
532 let handle_file_request = async {
537 match stream.next().await.unwrap().unwrap() {
538 fio::FileRequest::ReadAt { count, offset, responder } => {
539 assert_eq!(count, 1);
540 assert_eq!(offset, 0);
541 responder.send(Ok(&[1])).unwrap();
542 }
543 req => panic!("unhandled request {:?}", req),
544 }
545 };
546
547 let (poll_read, ()) = future::join(poll_read_at, handle_file_request).await;
548
549 assert_eq!(poll_read.unwrap(), 1);
552 assert_eq!(&buf[..], &[1]);
553 }
554
555 #[fasync::run_singlethreaded(test)]
556 async fn different_poll_read_at_and_file_sizes() {
557 for first_poll_read_len in 0..5 {
558 for file_size in 0..5 {
559 for second_poll_offset in 0..file_size {
560 for second_poll_read_len in 0..5 {
561 let (proxy, mut stream) =
562 endpoints::create_proxy_and_stream::<fio::FileMarker>();
563
564 let mut reader = AsyncFile::from_proxy(proxy);
565
566 let () = poll_fn(|cx| {
568 let mut buf = vec![0u8; first_poll_read_len];
569 assert_matches!(
570 Pin::new(&mut reader).poll_read_at(cx, 0, &mut buf),
571 Poll::Pending
572 );
573 Poll::Ready(())
574 })
575 .await;
576
577 match stream.next().await.unwrap().unwrap() {
580 fio::FileRequest::ReadAt { count, offset, responder } => {
581 assert_eq!(count, u64::try_from(first_poll_read_len).unwrap());
582 assert_eq!(offset, 0);
583 let resp = vec![7u8; min(file_size, first_poll_read_len)];
584 responder.send(Ok(&resp)).unwrap();
585 }
586 req => panic!("unhandled request {:?}", req),
587 }
588
589 let mut buf = vec![0u8; second_poll_read_len];
594 let poll_read_at = reader.read_at(second_poll_offset as u64, &mut buf);
595
596 let second_request = first_poll_read_len == 0 && second_poll_read_len != 0
597 || second_poll_offset != 0 && second_poll_offset >= first_poll_read_len;
598 let handle_conditional_file_request = async {
599 if second_request {
600 match stream.next().await.unwrap().unwrap() {
601 fio::FileRequest::ReadAt { count, offset, responder } => {
602 assert_eq!(
603 count,
604 u64::try_from(second_poll_read_len).unwrap()
605 );
606 assert_eq!(
607 offset,
608 u64::try_from(second_poll_offset).unwrap()
609 );
610 let resp = vec![
611 7u8;
612 min(
613 file_size - second_poll_offset,
614 second_poll_read_len
615 )
616 ];
617 responder.send(Ok(&resp)).unwrap();
618 }
619 req => panic!("unhandled request {:?}", req),
620 }
621 }
622 };
623
624 let (read_res, ()) =
625 future::join(poll_read_at, handle_conditional_file_request).await;
626
627 let expected_len = if second_request {
628 min(file_size - second_poll_offset, second_poll_read_len)
629 } else {
630 min(
631 min(file_size, first_poll_read_len) - second_poll_offset,
632 second_poll_read_len,
633 )
634 };
635 let expected = vec![7u8; expected_len];
636 assert_eq!(read_res.unwrap(), expected_len);
637 assert_eq!(&buf[..expected_len], &expected[..]);
638 }
639 }
640 }
641 }
642 }
643
644 async fn get_size_file_with_contents(contents: &[u8]) {
645 let dir = TempDir::new().unwrap();
646 let path = dir.path().join("get_size_file_with_contents").to_str().unwrap().to_owned();
647 let () = file::write_in_namespace(&path, contents).await.unwrap();
648 let file = file::open_in_namespace(&path, fio::PERM_READABLE).unwrap();
649
650 let mut reader = AsyncFile::from_proxy(file);
651
652 assert_eq!(reader.get_size().await.unwrap(), contents.len() as u64);
653 }
654
655 #[fasync::run_singlethreaded(test)]
656 async fn get_size_empty() {
657 get_size_file_with_contents(&[]).await;
658 }
659
660 #[fasync::run_singlethreaded(test)]
661 async fn get_size_large() {
662 let expected_contents = vec![7u8; (fio::MAX_BUF * 3).try_into().unwrap()];
663 get_size_file_with_contents(&expected_contents[..]).await;
664 }
665
666 #[fasync::run_singlethreaded(test)]
667 async fn get_size_changing_size() {
668 let (mut file, path) = NamedTempFile::new().unwrap().into_parts();
669 let proxy = file::open_in_namespace(path.to_str().unwrap(), fio::PERM_READABLE).unwrap();
670
671 let mut reader = AsyncFile::from_proxy(proxy);
672
673 assert_eq!(reader.get_size().await.unwrap(), 0);
674 file.write_all(&[1; 3][..]).unwrap();
675 assert_eq!(reader.get_size().await.unwrap(), 3);
676 file.write_all(&[2; 5][..]).unwrap();
677 assert_eq!(reader.get_size().await.unwrap(), 8);
678 }
679
680 #[fasync::run_singlethreaded(test)]
681 async fn adapter_for_cursor() {
682 let data = (0..1000).map(|i| (i % 256) as u8).collect::<Vec<_>>();
683 let cursor = futures::io::Cursor::new(data.clone());
684 let mut adapter = Adapter::new(cursor);
685
686 assert_eq!(adapter.get_size().await.unwrap(), 1000);
687
688 let mut buffer = vec![];
689 adapter.read_to_end(&mut buffer).await.unwrap();
690 assert_eq!(buffer, data);
691
692 let mut buffer = vec![0; 100];
693 adapter.read_at_exact(333, &mut buffer).await.unwrap();
694 assert_eq!(buffer, &data[333..433]);
695 }
696}