1use fidl::client::QueryResponseFut;
6use flex_client::Dialect;
7use flex_fuchsia_io as fio;
8use futures::FutureExt as _;
9use futures::future::Future;
10use futures::io::{AsyncRead, AsyncSeek, SeekFrom};
11use futures::lock::Mutex;
12use std::cmp::min;
13use std::convert::TryInto as _;
14use std::io;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18
/// Asynchronous positional read: similar in shape to `futures::io::AsyncRead::poll_read`, but
/// each call names the absolute `offset` to read from instead of using an implicit cursor.
pub trait AsyncReadAt {
    /// Attempts to read bytes located at `offset` into `buf`.
    ///
    /// On success resolves to the number of bytes placed at the front of `buf`; the
    /// implementations in this file never report more than `buf.len()` bytes and may report
    /// fewer. Errors are surfaced as `io::Error`.
    // NOTE(review): presumably `Poll::Pending` implies the waker in `cx` will be notified, as
    // with `AsyncRead::poll_read` -- confirm against each implementation.
    fn poll_read_at(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        offset: u64,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>>;
}
34
/// Asynchronous size query.
///
/// The implementations in this file report a file's content size or the end position of a
/// seekable stream.
pub trait AsyncGetSize {
    /// Attempts to determine the size, resolving to it as a `u64` or to an `io::Error`.
    fn poll_get_size(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>;
}
40
41impl<T: AsyncReadAt + Unpin> AsyncReadAt for Arc<Mutex<T>> {
42 fn poll_read_at(
43 self: Pin<&mut Self>,
44 cx: &mut Context<'_>,
45 offset: u64,
46 buf: &mut [u8],
47 ) -> Poll<Result<usize, std::io::Error>> {
48 let mut guard = futures::ready!(self.lock().poll_unpin(cx));
49 Pin::new(&mut *guard).poll_read_at(cx, offset, buf)
50 }
51}
52
53impl<T: AsyncGetSize + Unpin> AsyncGetSize for Arc<Mutex<T>> {
54 fn poll_get_size(
55 self: Pin<&mut Self>,
56 cx: &mut Context<'_>,
57 ) -> Poll<Result<u64, std::io::Error>> {
58 let mut guard = futures::ready!(self.lock().poll_unpin(cx));
59 Pin::new(&mut *guard).poll_get_size(cx)
60 }
61}
62
63pub trait AsyncGetSizeExt: AsyncGetSize {
65 fn get_size<'a>(&'a mut self) -> GetSize<'a, Self>
67 where
68 Self: Unpin,
69 {
70 GetSize { size_getter: self }
71 }
72}
73
74impl<T: AsyncGetSize + ?Sized> AsyncGetSizeExt for T {}
75
/// Future created by `AsyncGetSizeExt::get_size`; it borrows the size getter mutably for as
/// long as the future exists.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct GetSize<'a, R: ?Sized> {
    /// The object whose `poll_get_size` is invoked each time this future is polled.
    size_getter: &'a mut R,
}

// The future can be moved freely whenever the borrowed getter type is itself `Unpin`.
impl<R: ?Sized + Unpin> Unpin for GetSize<'_, R> {}
84
85impl<R: AsyncGetSize + ?Sized + Unpin> Future for GetSize<'_, R> {
86 type Output = io::Result<u64>;
87
88 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
89 let this = &mut *self;
90 Pin::new(&mut *this.size_getter).poll_get_size(cx)
91 }
92}
93
/// Wraps a `fio::FileProxy` so that it can be used through `AsyncReadAt` and `AsyncGetSize`.
#[derive(Debug)]
pub struct AsyncFile {
    /// Channel to the remote file; all requests are sent through it.
    file: fio::FileProxy,
    /// Progress of the current positional read (idle, request in flight, or response held).
    read_at_state: ReadAtState,
    /// In-flight `get_attributes` request issued by `poll_get_size`, if any. It is set when a
    /// size query starts and reset to `None` once the response has been received.
    get_attributes_fut: Option<
        QueryResponseFut<
            Result<(fio::MutableNodeAttributes, fio::ImmutableNodeAttributes), i32>,
            Dialect,
        >,
    >,
}
111
/// State machine behind `AsyncFile::poll_read_at`.
#[derive(Debug)]
enum ReadAtState {
    /// No request is outstanding and no response is held.
    Empty,
    /// A `ReadAt` request has been sent and its response has not been consumed yet.
    Forwarding {
        /// Future resolving to the response: the bytes read, or a raw status code on failure.
        fut: QueryResponseFut<Result<Vec<u8>, i32>, Dialect>,
        /// File offset the request was issued for.
        file_offset: u64,
        /// True when the request asked for zero bytes.
        zero_byte_request: bool,
    },
    /// A response has been received and is kept so reads inside its range can be answered
    /// without another request.
    Bytes {
        /// The bytes returned by the file.
        bytes: Vec<u8>,
        /// File offset of `bytes[0]`.
        file_offset: u64,
    },
}
125
126impl AsyncFile {
127 pub fn from_proxy(file: fio::FileProxy) -> Self {
128 Self { file, read_at_state: ReadAtState::Empty, get_attributes_fut: None }
129 }
130}
131
132impl AsyncReadAt for AsyncFile {
133 fn poll_read_at(
134 mut self: Pin<&mut Self>,
135 cx: &mut Context<'_>,
136 offset: u64,
137 buf: &mut [u8],
138 ) -> Poll<std::io::Result<usize>> {
139 loop {
140 match self.read_at_state {
141 ReadAtState::Empty => {
142 let len = if let Ok(len) = buf.len().try_into() {
143 min(len, fio::MAX_BUF)
144 } else {
145 fio::MAX_BUF
146 };
147 self.read_at_state = ReadAtState::Forwarding {
148 fut: self.file.read_at(len, offset),
149 file_offset: offset,
150 zero_byte_request: len == 0,
151 };
152 }
153 ReadAtState::Forwarding { ref mut fut, file_offset, zero_byte_request } => {
154 match futures::ready!(Pin::new(fut).poll(cx)) {
155 Ok(result) => {
156 match result {
157 Err(s) => {
158 self.read_at_state = ReadAtState::Empty;
159 return Poll::Ready(Err(
160 zx_status::Status::from_raw(s).into_io_error()
161 ));
162 }
163 Ok(bytes) => {
164 if zero_byte_request && buf.len() != 0 {
173 self.read_at_state = ReadAtState::Empty;
174 } else {
175 self.read_at_state =
176 ReadAtState::Bytes { bytes, file_offset };
177 }
178 }
179 }
180 }
181 Err(e) => {
182 self.read_at_state = ReadAtState::Empty;
183 return Poll::Ready(Err(std::io::Error::other(e)));
184 }
185 }
186 }
187 ReadAtState::Bytes { ref bytes, file_offset } => {
188 if offset < file_offset {
189 self.read_at_state = ReadAtState::Empty;
190 continue;
191 }
192 let bytes_offset = match (offset - file_offset).try_into() {
193 Ok(offset) => offset,
194 Err(_) => {
195 self.read_at_state = ReadAtState::Empty;
196 continue;
197 }
198 };
199 if bytes_offset != 0 && bytes_offset >= bytes.len() {
200 self.read_at_state = ReadAtState::Empty;
201 continue;
202 }
203 let n = min(buf.len(), bytes.len() - bytes_offset);
204 let () = buf[..n].copy_from_slice(&bytes[bytes_offset..bytes_offset + n]);
205 return Poll::Ready(Ok(n));
206 }
207 }
208 }
209 }
210}
211
212impl AsyncGetSize for AsyncFile {
213 fn poll_get_size(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
214 if self.get_attributes_fut.is_none() {
215 self.get_attributes_fut =
216 Some(self.file.get_attributes(fio::NodeAttributesQuery::CONTENT_SIZE));
217 }
218 let fut = self.get_attributes_fut.as_mut().unwrap();
219 let get_attributes_fut_result = futures::ready!(fut.poll_unpin(cx));
220 self.get_attributes_fut = None;
221 match get_attributes_fut_result {
222 Ok(get_attributes_response) => match get_attributes_response {
223 Ok((_mutable_attr, immutable_attr)) => {
224 if let Some(content_size) = immutable_attr.content_size {
225 return Poll::Ready(Ok(content_size));
226 }
227 return Poll::Ready(Err(zx_status::Status::NOT_SUPPORTED.into_io_error()));
228 }
229 Err(status) => {
230 return Poll::Ready(Err(zx_status::Status::from_raw(status).into_io_error()));
231 }
232 },
233 Err(e) => {
234 return Poll::Ready(Err(std::io::Error::other(e)));
235 }
236 }
237 }
238}
239
/// Wrapper that gives a seekable reader the `AsyncReadAt` and `AsyncGetSize` interfaces.
#[derive(Debug)]
pub struct Adapter<T> {
    /// The wrapped reader; its seek position is moved by every operation on the adapter.
    inner: T,
}
245
246impl<T> Adapter<T> {
247 pub fn new(inner: T) -> Adapter<T> {
248 Self { inner }
249 }
250}
251
252impl<T: AsyncRead + AsyncSeek + Unpin> AsyncReadAt for Adapter<T> {
253 fn poll_read_at(
254 mut self: Pin<&mut Self>,
255 cx: &mut Context<'_>,
256 offset: u64,
257 buf: &mut [u8],
258 ) -> Poll<std::io::Result<usize>> {
259 futures::ready!(Pin::new(&mut self.inner).poll_seek(cx, SeekFrom::Start(offset)))?;
260 Pin::new(&mut self.inner).poll_read(cx, buf)
261 }
262}
263
264impl<T: AsyncSeek + Unpin> AsyncGetSize for Adapter<T> {
265 fn poll_get_size(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
266 Pin::new(&mut self.inner).poll_seek(cx, SeekFrom::End(0))
267 }
268}
269
270#[cfg(test)]
271mod tests {
272 use super::*;
273 use crate::file::{self, AsyncReadAtExt};
274 use assert_matches::assert_matches;
275 use fidl::endpoints;
276 use fuchsia_async as fasync;
277 use futures::future::{self, poll_fn};
278 use futures::{StreamExt as _, TryStreamExt as _};
279 use std::convert::TryFrom as _;
280 use std::io::Write;
281 use tempfile::{NamedTempFile, TempDir};
282
    /// Polls a fresh `AsyncFile` once with a `poll_read_size`-byte buffer and asserts that the
    /// `ReadAt` request it sends asks for `expected_file_read_size` bytes.
    async fn poll_read_at_with_specific_buf_size(
        poll_read_size: u64,
        expected_file_read_size: u64,
    ) {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();

        let mut reader = AsyncFile::from_proxy(proxy);

        // Nobody has answered on `stream` yet, so the first poll is expected to be pending.
        let () = poll_fn(|cx| {
            let mut buf = vec![0u8; poll_read_size.try_into().unwrap()];
            assert_matches!(
                Pin::new(&mut reader).poll_read_at(cx, 0, buf.as_mut_slice()),
                Poll::Pending
            );
            Poll::Ready(())
        })
        .await;

        // Inspect the request produced by that poll.
        match stream.next().await.unwrap().unwrap() {
            fio::FileRequest::ReadAt { count, .. } => {
                assert_eq!(count, expected_file_read_size);
            }
            req => panic!("unhandled request {:?}", req),
        }
    }
308
    // An empty buffer still results in a request, asking for zero bytes.
    #[fasync::run_singlethreaded(test)]
    async fn poll_read_at_empty_buf() {
        poll_read_at_with_specific_buf_size(0, 0).await;
    }
313
    // A buffer larger than `fio::MAX_BUF` results in a request capped at `fio::MAX_BUF` bytes.
    #[fasync::run_singlethreaded(test)]
    async fn poll_read_at_caps_buf_size() {
        poll_read_at_with_specific_buf_size(fio::MAX_BUF * 2, fio::MAX_BUF).await;
    }
318
    // A read that was left pending must be completed by the same request when it is polled
    // again with the same arguments: exactly one `ReadAt` request may reach the file.
    #[fasync::run_singlethreaded(test)]
    async fn poll_read_at_pending_saves_future() {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();

        let mut reader = AsyncFile::from_proxy(proxy);

        // First poll: the request goes out, but no response is available yet.
        let () = poll_fn(|cx| {
            assert_matches!(
                Pin::new(&mut reader).poll_read_at(cx, 2, &mut [0u8; 1]),
                Poll::Pending
            );
            Poll::Ready(())
        })
        .await;

        // Second read with identical offset and length. `reader` is moved into the future so
        // the proxy is dropped when it finishes, which ends the request stream below.
        let poll_read_at = async move {
            let mut buf = [0u8; 1];
            assert_eq!(reader.read_at(2, &mut buf).await.unwrap(), buf.len());
            assert_eq!(&buf, &[1]);
        };

        // Answer each request with the running request count, so the byte read above
        // (`1`) proves it was served by the first request.
        let mut file_read_requests = 0u8;
        let handle_file_stream = async {
            while let Some(req) = stream.try_next().await.unwrap() {
                file_read_requests += 1;
                match req {
                    fio::FileRequest::ReadAt { count, offset, responder } => {
                        assert_eq!(count, 1);
                        assert_eq!(offset, 2);
                        responder.send(Ok(&[file_read_requests])).unwrap();
                    }
                    req => panic!("unhandled request {:?}", req),
                }
            }
        };

        let ((), ()) = future::join(poll_read_at, handle_file_stream).await;
        assert_eq!(file_read_requests, 1);
    }
364
    // After a 3-byte request was left pending, follow-up reads with smaller buffers are
    // answered from that response; a read past its end triggers a second request.
    #[fasync::run_singlethreaded(test)]
    async fn poll_read_at_with_smaller_buf_after_pending() {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();

        let mut reader = AsyncFile::from_proxy(proxy);

        // Start a 3-byte read at offset 0 and leave it pending.
        let () = poll_fn(|cx| {
            assert_matches!(
                Pin::new(&mut reader).poll_read_at(cx, 0, &mut [0u8; 3]),
                Poll::Pending
            );
            Poll::Ready(())
        })
        .await;

        // Answer that request with the bytes "012".
        let () = async {
            match stream.next().await.unwrap().unwrap() {
                fio::FileRequest::ReadAt { count, offset, responder } => {
                    assert_eq!(count, 3);
                    assert_eq!(offset, 0);
                    responder.send(Ok(b"012")).unwrap();
                }
                req => panic!("unhandled request {:?}", req),
            }
        }
        .await;

        // One byte at offset 0. No request handler is running for this or the next two
        // reads, so they must be satisfied by the response already received.
        let mut buf = [0u8; 1];
        assert_eq!(reader.read_at(0, &mut buf).await.unwrap(), buf.len());
        assert_eq!(&buf, b"0");

        // One byte at offset 1.
        let mut buf = [0u8; 1];
        assert_eq!(reader.read_at(1, &mut buf).await.unwrap(), buf.len());
        assert_eq!(&buf, b"1");

        // Two bytes requested at offset 2, but only one byte of the response is left.
        let mut buf = [0u8; 2];
        assert_eq!(reader.read_at(2, &mut buf).await.unwrap(), 1);
        assert_eq!(&buf[..1], b"2");

        // Offset 3 is past the end of the held response, so a new request is expected.
        let mut buf = [0u8; 4];
        let poll_read_at = reader.read_at(3, &mut buf);

        let handle_second_file_request = async {
            match stream.next().await.unwrap().unwrap() {
                fio::FileRequest::ReadAt { count, offset, responder } => {
                    assert_eq!(count, 4);
                    assert_eq!(offset, 3);
                    responder.send(Ok(b"3456")).unwrap();
                }
                req => panic!("unhandled request {:?}", req),
            }
        };

        let (read_res, ()) = future::join(poll_read_at, handle_second_file_request).await;
        assert_eq!(read_res.unwrap(), 4);
        assert_eq!(&buf, b"3456");
    }
435
    // A transport-level failure must leave the reader in the `Empty` state.
    #[fasync::run_singlethreaded(test)]
    async fn transition_to_empty_on_fidl_error() {
        // The request stream is dropped right away, so the read below cannot succeed.
        let (proxy, _) = endpoints::create_proxy_and_stream::<fio::FileMarker>();

        let mut reader = AsyncFile::from_proxy(proxy);

        // The very first poll is expected to fail rather than stay pending.
        let () = poll_fn(|cx| {
            assert_matches!(
                Pin::new(&mut reader).poll_read_at(cx, 0, &mut [0u8; 1]),
                Poll::Ready(Err(_))
            );
            Poll::Ready(())
        })
        .await;

        assert_matches!(reader.read_at_state, ReadAtState::Empty);
    }
458
    // An error status returned by the file fails that read only; the next read sends a new
    // request and succeeds.
    #[fasync::run_singlethreaded(test)]
    async fn recover_from_file_read_error() {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();

        let mut reader = AsyncFile::from_proxy(proxy);

        // First read: the file answers with an error status.
        let mut buf = [0u8; 1];
        let poll_read_at = reader.read_at(0, &mut buf);

        let failing_file_response = async {
            match stream.next().await.unwrap().unwrap() {
                fio::FileRequest::ReadAt { count, offset, responder } => {
                    assert_eq!(count, 1);
                    assert_eq!(offset, 0);
                    responder.send(Err(zx_status::Status::NO_MEMORY.into_raw())).unwrap();
                }
                req => panic!("unhandled request {:?}", req),
            }
        };

        let (read_res, ()) = future::join(poll_read_at, failing_file_response).await;
        assert_matches!(read_res, Err(_));

        // Second read with the same arguments: a fresh request is expected and answered
        // successfully.
        let mut buf = [0u8; 1];
        let poll_read_at = reader.read_at(0, &mut buf);

        let succeeding_file_response = async {
            match stream.next().await.unwrap().unwrap() {
                fio::FileRequest::ReadAt { count, offset, responder } => {
                    assert_eq!(count, 1);
                    assert_eq!(offset, 0);
                    responder.send(Ok(b"0")).unwrap();
                }
                req => panic!("unhandled request {:?}", req),
            }
        };

        let (read_res, ()) = future::join(poll_read_at, succeeding_file_response).await;
        assert_eq!(read_res.unwrap(), 1);
        assert_eq!(&buf, b"0");
    }
503
    // The empty response to a pending zero-byte read must not be handed to a later non-empty
    // read: that read has to send its own request and return real data.
    #[fasync::run_singlethreaded(test)]
    async fn poll_read_at_zero_then_read_nonzero() {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();

        let mut reader = AsyncFile::from_proxy(proxy);

        // Start a zero-byte read and leave it pending.
        let () = poll_fn(|cx| {
            assert_matches!(Pin::new(&mut reader).poll_read_at(cx, 0, &mut []), Poll::Pending);
            Poll::Ready(())
        })
        .await;

        // Answer the zero-byte request with an empty payload.
        match stream.next().await.unwrap().unwrap() {
            fio::FileRequest::ReadAt { count, offset, responder } => {
                assert_eq!(count, 0);
                assert_eq!(offset, 0);
                responder.send(Ok(&[])).unwrap();
            }
            req => panic!("unhandled request {:?}", req),
        }

        // Now read one byte at the same offset.
        let mut buf = vec![0u8; 1];
        let poll_read_at = reader.read_at(0, &mut buf);

        // A second request, this time for one byte, is expected.
        let handle_file_request = async {
            match stream.next().await.unwrap().unwrap() {
                fio::FileRequest::ReadAt { count, offset, responder } => {
                    assert_eq!(count, 1);
                    assert_eq!(offset, 0);
                    responder.send(Ok(&[1])).unwrap();
                }
                req => panic!("unhandled request {:?}", req),
            }
        };

        let (poll_read, ()) = future::join(poll_read_at, handle_file_request).await;

        // The read reports the byte from the second response, not a zero-length result.
        assert_eq!(poll_read.unwrap(), 1);
        assert_eq!(&buf[..], &[1]);
    }
553
    // Exhaustively combines: the length of a first read that is left pending at offset 0, the
    // size of the simulated file, and the offset and length of a second read. For every
    // combination it checks whether a second request is sent and how many bytes are returned.
    #[fasync::run_singlethreaded(test)]
    async fn different_poll_read_at_and_file_sizes() {
        for first_poll_read_len in 0..5 {
            for file_size in 0..5 {
                for second_poll_offset in 0..file_size {
                    for second_poll_read_len in 0..5 {
                        let (proxy, mut stream) =
                            endpoints::create_proxy_and_stream::<fio::FileMarker>();

                        let mut reader = AsyncFile::from_proxy(proxy);

                        // First read at offset 0; it only sends the request.
                        let () = poll_fn(|cx| {
                            let mut buf = vec![0u8; first_poll_read_len];
                            assert_matches!(
                                Pin::new(&mut reader).poll_read_at(cx, 0, &mut buf),
                                Poll::Pending
                            );
                            Poll::Ready(())
                        })
                        .await;

                        // Answer it with as many bytes as the simulated file can supply.
                        match stream.next().await.unwrap().unwrap() {
                            fio::FileRequest::ReadAt { count, offset, responder } => {
                                assert_eq!(count, u64::try_from(first_poll_read_len).unwrap());
                                assert_eq!(offset, 0);
                                let resp = vec![7u8; min(file_size, first_poll_read_len)];
                                responder.send(Ok(&resp)).unwrap();
                            }
                            req => panic!("unhandled request {:?}", req),
                        }

                        // Second read, possibly at a different offset and with a different
                        // length; it is the one that observes the first response.
                        let mut buf = vec![0u8; second_poll_read_len];
                        let poll_read_at = reader.read_at(second_poll_offset as u64, &mut buf);

                        // A second request is expected when the first request was for zero
                        // bytes but the second read is not, or when the second offset is
                        // non-zero and at or beyond the length asked for by the first request.
                        let second_request = first_poll_read_len == 0 && second_poll_read_len != 0
                            || second_poll_offset != 0 && second_poll_offset >= first_poll_read_len;
                        let handle_conditional_file_request = async {
                            if second_request {
                                match stream.next().await.unwrap().unwrap() {
                                    fio::FileRequest::ReadAt { count, offset, responder } => {
                                        assert_eq!(
                                            count,
                                            u64::try_from(second_poll_read_len).unwrap()
                                        );
                                        assert_eq!(
                                            offset,
                                            u64::try_from(second_poll_offset).unwrap()
                                        );
                                        let resp = vec![
                                            7u8;
                                            min(
                                                file_size - second_poll_offset,
                                                second_poll_read_len
                                            )
                                        ];
                                        responder.send(Ok(&resp)).unwrap();
                                    }
                                    req => panic!("unhandled request {:?}", req),
                                }
                            }
                        };

                        let (read_res, ()) =
                            future::join(poll_read_at, handle_conditional_file_request).await;

                        // With a second request the result is limited by what the file holds
                        // from the second offset; without one it is limited by what is left
                        // of the first response after the second offset.
                        let expected_len = if second_request {
                            min(file_size - second_poll_offset, second_poll_read_len)
                        } else {
                            min(
                                min(file_size, first_poll_read_len) - second_poll_offset,
                                second_poll_read_len,
                            )
                        };
                        let expected = vec![7u8; expected_len];
                        assert_eq!(read_res.unwrap(), expected_len);
                        assert_eq!(&buf[..expected_len], &expected[..]);
                    }
                }
            }
        }
    }
642
    /// Writes `contents` to a file in a temporary directory, opens it read-only, and asserts
    /// that `get_size` reports `contents.len()`.
    async fn get_size_file_with_contents(contents: &[u8]) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("get_size_file_with_contents").to_str().unwrap().to_owned();
        let () = file::write_in_namespace(&path, contents).await.unwrap();
        let file = file::open_in_namespace(&path, fio::PERM_READABLE).unwrap();

        let mut reader = AsyncFile::from_proxy(file);

        assert_eq!(reader.get_size().await.unwrap(), contents.len() as u64);
    }
653
    // An empty file reports a size of zero.
    #[fasync::run_singlethreaded(test)]
    async fn get_size_empty() {
        get_size_file_with_contents(&[]).await;
    }
658
    // A file three times larger than `fio::MAX_BUF` reports its full size.
    #[fasync::run_singlethreaded(test)]
    async fn get_size_large() {
        let expected_contents = vec![7u8; (fio::MAX_BUF * 3).try_into().unwrap()];
        get_size_file_with_contents(&expected_contents[..]).await;
    }
664
    // Each `get_size` call must reflect the file's current size, not a previously seen one.
    #[fasync::run_singlethreaded(test)]
    async fn get_size_changing_size() {
        let (mut file, path) = NamedTempFile::new().unwrap().into_parts();
        let proxy = file::open_in_namespace(path.to_str().unwrap(), fio::PERM_READABLE).unwrap();

        let mut reader = AsyncFile::from_proxy(proxy);

        // Grow the file through the local handle between queries: 0, then 3, then 3 + 5 bytes.
        assert_eq!(reader.get_size().await.unwrap(), 0);
        file.write_all(&[1; 3][..]).unwrap();
        assert_eq!(reader.get_size().await.unwrap(), 3);
        file.write_all(&[2; 5][..]).unwrap();
        assert_eq!(reader.get_size().await.unwrap(), 8);
    }
678
    // Exercises `Adapter` over an in-memory cursor: size query, reading everything, and
    // reading an exact range at an offset.
    #[fasync::run_singlethreaded(test)]
    async fn adapter_for_cursor() {
        // 1000 bytes following the repeating pattern 0, 1, ..., 255.
        let data = (0..1000).map(|i| (i % 256) as u8).collect::<Vec<_>>();
        let cursor = futures::io::Cursor::new(data.clone());
        let mut adapter = Adapter::new(cursor);

        assert_eq!(adapter.get_size().await.unwrap(), 1000);

        // NOTE(review): `read_to_end` and `read_at_exact` presumably come from the imported
        // `AsyncReadAtExt` -- confirm.
        let mut buffer = vec![];
        adapter.read_to_end(&mut buffer).await.unwrap();
        assert_eq!(buffer, data);

        let mut buffer = vec![0; 100];
        adapter.read_at_exact(333, &mut buffer).await.unwrap();
        assert_eq!(buffer, &data[333..433]);
    }
695}