1#![cfg(target_os = "fuchsia")]
6#![deny(missing_docs)]
7
8use crate::types::{BlobId, BlobInfo};
11use fidl_fuchsia_pkg as fpkg;
12use fuchsia_pkg::PackageDirectory;
13use futures::prelude::*;
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::sync::Arc;
16use zx_status::Status;
17
18mod storage;
19
20#[derive(Debug, Clone)]
22pub struct Client {
23 proxy: fpkg::PackageCacheProxy,
24}
25
26impl Client {
27 pub fn from_proxy(proxy: fpkg::PackageCacheProxy) -> Self {
29 Self { proxy }
30 }
31
32 pub fn proxy(&self) -> &fpkg::PackageCacheProxy {
34 &self.proxy
35 }
36
37 pub fn get(
40 &self,
41 meta_far_blob: BlobInfo,
42 gc_protection: fpkg::GcProtection,
43 ) -> Result<Get, fidl::Error> {
44 let (needed_blobs, needed_blobs_server_end) =
45 fidl::endpoints::create_proxy::<fpkg::NeededBlobsMarker>();
46 let (pkg_dir, pkg_dir_server_end) = PackageDirectory::create_request()?;
47
48 let get_fut = self.proxy.get(
49 &meta_far_blob.into(),
50 gc_protection,
51 needed_blobs_server_end,
52 pkg_dir_server_end,
53 );
54
55 Ok(Get {
56 get_fut,
57 pkg_dir,
58 needed_blobs,
59 pkg_present: SharedBoolEvent::new(),
60 meta_far: meta_far_blob.blob_id,
61 })
62 }
63
64 pub async fn get_already_cached(
74 &self,
75 meta_far_blob: BlobId,
76 ) -> Result<PackageDirectory, GetAlreadyCachedError> {
77 let mut get = self
78 .get(
79 BlobInfo { blob_id: meta_far_blob, length: 0 },
80 fpkg::GcProtection::OpenPackageTracking,
81 )
82 .map_err(GetAlreadyCachedError::Get)?;
83 if let Some(_) = get.open_meta_blob().await.map_err(GetAlreadyCachedError::OpenMetaBlob)? {
84 return Err(GetAlreadyCachedError::MissingMetaFar);
85 }
86
87 if let Some(missing_blobs) = get
88 .get_missing_blobs()
89 .try_next()
90 .await
91 .map_err(GetAlreadyCachedError::GetMissingBlobs)?
92 {
93 return Err(GetAlreadyCachedError::MissingContentBlobs(missing_blobs));
94 }
95
96 get.finish().await.map_err(GetAlreadyCachedError::FinishGet)
97 }
98
99 pub async fn get_subpackage(
102 &self,
103 superpackage: BlobId,
104 subpackage: &fuchsia_url::RelativePackageUrl,
105 ) -> Result<PackageDirectory, GetSubpackageError> {
106 let (dir, dir_server_end) =
107 PackageDirectory::create_request().map_err(GetSubpackageError::CreatingHandles)?;
108 let () = self
109 .proxy
110 .get_subpackage(
111 &superpackage.into(),
112 &fpkg::PackageUrl { url: subpackage.into() },
113 dir_server_end,
114 )
115 .await
116 .map_err(GetSubpackageError::CallingGetSubpackage)??;
117 Ok(dir)
118 }
119}
120
121#[derive(thiserror::Error, Debug)]
122#[allow(missing_docs)]
123pub enum GetAlreadyCachedError {
124 #[error("calling get")]
125 Get(#[source] fidl::Error),
126
127 #[error("opening meta blob")]
128 OpenMetaBlob(#[source] OpenBlobError),
129
130 #[error("meta.far blob not cached")]
131 MissingMetaFar,
132
133 #[error("getting missing blobs")]
134 GetMissingBlobs(#[source] ListMissingBlobsError),
135
136 #[error("content blobs not cached {0:?}")]
137 MissingContentBlobs(Vec<BlobInfo>),
138
139 #[error("finishing get")]
140 FinishGet(#[source] GetError),
141}
142
143impl GetAlreadyCachedError {
144 pub fn was_not_cached(&self) -> bool {
146 use GetAlreadyCachedError::*;
147 match self {
148 Get(..) | OpenMetaBlob(..) | GetMissingBlobs(..) | FinishGet(..) => false,
149 MissingMetaFar | MissingContentBlobs(..) => true,
150 }
151 }
152}
153
154#[derive(thiserror::Error, Debug)]
155#[allow(missing_docs)]
156pub enum GetSubpackageError {
157 #[error("creating handles")]
158 CreatingHandles(#[source] fidl::Error),
159
160 #[error("calling GetCached FIDL")]
161 CallingGetSubpackage(#[source] fidl::Error),
162
163 #[error("the superpackage does not have an open package connection")]
164 SuperpackageClosed,
165
166 #[error("the subpackage does not exist")]
167 DoesNotExist,
168
169 #[error("internal")]
170 Internal,
171}
172
173impl From<fpkg::GetSubpackageError> for GetSubpackageError {
174 fn from(fidl: fpkg::GetSubpackageError) -> Self {
175 use fpkg::GetSubpackageError as fErr;
176 use GetSubpackageError::*;
177 match fidl {
178 fErr::SuperpackageClosed => SuperpackageClosed,
179 fErr::DoesNotExist => DoesNotExist,
180 fErr::Internal => Internal,
181 }
182 }
183}
184
185#[derive(Debug, Clone)]
186struct SharedBoolEvent(Arc<AtomicBool>);
187
188impl SharedBoolEvent {
189 fn new() -> Self {
190 Self(Arc::new(AtomicBool::new(false)))
191 }
192
193 fn get(&self) -> bool {
194 self.0.load(Ordering::SeqCst)
195 }
196
197 fn set(&self) {
198 self.0.store(true, Ordering::SeqCst)
199 }
200}
201
202async fn open_blob(
203 needed_blobs: &fpkg::NeededBlobsProxy,
204 kind: OpenKind,
205 blob_id: BlobId,
206 pkg_present: Option<&SharedBoolEvent>,
207) -> Result<Option<NeededBlob>, OpenBlobError> {
208 let open_fut = match kind {
209 OpenKind::Meta => needed_blobs.open_meta_blob(),
210 OpenKind::Content => needed_blobs.open_blob(&blob_id.into()),
211 };
212 match open_fut.await {
213 Err(fidl::Error::ClientChannelClosed { status: Status::OK, .. }) => {
214 if let Some(pkg_present) = pkg_present {
215 pkg_present.set();
216 }
217 Ok(None)
218 }
219 res => {
220 if let Some(blob) = res?? {
221 let (writer, closer) = self::storage::into_blob_writer_and_closer(*blob)?;
222 Ok(Some(NeededBlob {
223 blob: Blob {
224 writer,
225 needed_blobs: needed_blobs.clone(),
226 blob_id,
227 state: NeedsTruncate,
228 },
229 closer: BlobCloser { closer, closed: false },
230 }))
231 } else {
232 Ok(None)
233 }
234 }
235 }
236}
237
238#[derive(Debug, Clone, Copy, PartialEq, Eq)]
239enum OpenKind {
240 Meta,
241 Content,
242}
243
244#[derive(Debug)]
246pub struct DeferredOpenBlob {
247 needed_blobs: fpkg::NeededBlobsProxy,
248 kind: OpenKind,
249 blob_id: BlobId,
250 pkg_present: Option<SharedBoolEvent>,
251}
252
253impl DeferredOpenBlob {
254 pub async fn open(&self) -> Result<Option<NeededBlob>, OpenBlobError> {
257 open_blob(&self.needed_blobs, self.kind, self.blob_id, self.pkg_present.as_ref()).await
258 }
259
260 fn proxy_cmp_key(&self) -> u32 {
261 use fidl::endpoints::Proxy;
262 use fidl::AsHandleRef;
263 self.needed_blobs.as_channel().raw_handle()
264 }
265}
266
267impl std::cmp::PartialEq for DeferredOpenBlob {
268 fn eq(&self, other: &Self) -> bool {
269 self.proxy_cmp_key() == other.proxy_cmp_key() && self.kind == other.kind
270 }
271}
272
273impl std::cmp::Eq for DeferredOpenBlob {}
274
275#[derive(Debug)]
281pub struct Get {
282 get_fut: fidl::client::QueryResponseFut<Result<(), i32>>,
283 needed_blobs: fpkg::NeededBlobsProxy,
284 pkg_dir: PackageDirectory,
285 pkg_present: SharedBoolEvent,
286 meta_far: BlobId,
287}
288
289impl Get {
290 pub fn make_open_meta_blob(&mut self) -> DeferredOpenBlob {
293 DeferredOpenBlob {
294 needed_blobs: self.needed_blobs.clone(),
295 kind: OpenKind::Meta,
296 blob_id: self.meta_far,
297 pkg_present: Some(self.pkg_present.clone()),
298 }
299 }
300
301 pub async fn open_meta_blob(&mut self) -> Result<Option<NeededBlob>, OpenBlobError> {
304 open_blob(&self.needed_blobs, OpenKind::Meta, self.meta_far, Some(&self.pkg_present)).await
305 }
306
307 fn start_get_missing_blobs(
308 &mut self,
309 ) -> Result<Option<fpkg::BlobInfoIteratorProxy>, fidl::Error> {
310 if self.pkg_present.get() {
311 return Ok(None);
312 }
313
314 let (blob_iterator, blob_iterator_server_end) =
315 fidl::endpoints::create_proxy::<fpkg::BlobInfoIteratorMarker>();
316
317 self.needed_blobs.get_missing_blobs(blob_iterator_server_end)?;
318 Ok(Some(blob_iterator))
319 }
320
321 pub fn get_missing_blobs(
327 &mut self,
328 ) -> impl Stream<Item = Result<Vec<BlobInfo>, ListMissingBlobsError>> + Unpin {
329 match self.start_get_missing_blobs() {
330 Ok(option_iter) => match option_iter {
331 Some(iterator) => crate::fidl_iterator_to_stream(iterator)
332 .map_ok(|v| v.into_iter().map(BlobInfo::from).collect())
333 .map_err(ListMissingBlobsError::CallNextOnBlobIterator)
334 .left_stream(),
335 None => futures::stream::empty().right_stream(),
336 }
337 .left_stream(),
338 Err(e) => {
339 futures::stream::iter(Some(Err(ListMissingBlobsError::CallGetMissingBlobs(e))))
340 .right_stream()
341 }
342 }
343 }
344
345 pub fn make_open_blob(&mut self, content_blob: BlobId) -> DeferredOpenBlob {
348 DeferredOpenBlob {
349 needed_blobs: self.needed_blobs.clone(),
350 kind: OpenKind::Content,
351 blob_id: content_blob,
352 pkg_present: None,
353 }
354 }
355
356 pub async fn open_blob(
359 &mut self,
360 content_blob: BlobId,
361 ) -> Result<Option<NeededBlob>, OpenBlobError> {
362 open_blob(&self.needed_blobs, OpenKind::Content, content_blob, None).await
363 }
364
365 pub async fn finish(self) -> Result<PackageDirectory, GetError> {
368 drop(self.needed_blobs);
369 let () = self.get_fut.await?.map_err(Status::from_raw)?;
370 Ok(self.pkg_dir)
371 }
372
373 pub async fn abort(self) {
375 self.needed_blobs.abort().map(|_: Result<(), fidl::Error>| ()).await;
376 let _ = self.get_fut.await;
381 }
382}
383
384#[derive(Debug)]
386pub struct NeededBlob {
387 pub blob: Blob<NeedsTruncate>,
390
391 pub closer: BlobCloser,
393}
394
395#[derive(Debug)]
398#[must_use = "Subsequent opens of this blob may race with closing this one"]
399pub struct BlobCloser {
400 closer: Box<dyn self::storage::Closer>,
401 closed: bool,
402}
403
404impl BlobCloser {
405 pub async fn close(mut self) {
407 let () = self.closer.close().await;
408 self.closed = true;
409 }
410}
411
412impl Drop for BlobCloser {
413 fn drop(&mut self) {
414 if !self.closed {
415 let () = self.closer.best_effort_close();
416 }
417 }
418}
419
420#[derive(Debug)]
422pub enum TruncateBlobSuccess {
423 NeedsData(Blob<NeedsData>),
425
426 AllWritten(Blob<NeedsBlobWritten>),
429}
430
431#[derive(Debug)]
433pub enum BlobWriteSuccess {
434 NeedsData(Blob<NeedsData>),
436
437 AllWritten(Blob<NeedsBlobWritten>),
440}
441
442#[derive(Debug)]
444pub struct NeedsTruncate;
445
446#[derive(Debug)]
448pub struct NeedsData {
449 size: u64,
450 written: u64,
451}
452
453#[derive(Debug)]
456pub struct NeedsBlobWritten;
457
458#[derive(Debug)]
460#[must_use]
461pub struct Blob<S> {
462 writer: Box<dyn self::storage::Writer>,
463 needed_blobs: fpkg::NeededBlobsProxy,
464 blob_id: BlobId,
465 state: S,
466}
467
468impl Blob<NeedsTruncate> {
469 pub async fn truncate(mut self, size: u64) -> Result<TruncateBlobSuccess, TruncateBlobError> {
471 let () = self.writer.truncate(size).await?;
472
473 let Self { writer, needed_blobs, blob_id, state: _ } = self;
474
475 Ok(if size == 0 {
476 TruncateBlobSuccess::AllWritten(Blob {
477 writer,
478 needed_blobs,
479 blob_id,
480 state: NeedsBlobWritten,
481 })
482 } else {
483 TruncateBlobSuccess::NeedsData(Blob {
484 writer,
485 needed_blobs,
486 blob_id,
487 state: NeedsData { size, written: 0 },
488 })
489 })
490 }
491}
492
493impl Blob<NeedsData> {
494 pub fn write(
500 self,
501 buf: &[u8],
502 ) -> impl Future<Output = Result<BlobWriteSuccess, WriteBlobError>> + '_ {
503 self.write_with_trace_callbacks(buf, &|_| {}, &|| {})
504 }
505
506 pub async fn write_with_trace_callbacks(
518 mut self,
519 buf: &[u8],
520 after_write: &(dyn Fn(u64) + Send + Sync),
521 after_write_ack: &(dyn Fn() + Send + Sync),
522 ) -> Result<BlobWriteSuccess, WriteBlobError> {
523 assert!(self.state.written + buf.len() as u64 <= self.state.size);
524
525 let () = self.writer.write(buf, after_write, after_write_ack).await?;
526 self.state.written += buf.len() as u64;
527
528 if self.state.written == self.state.size {
529 let Self { writer, needed_blobs, blob_id, state: _ } = self;
530 Ok(BlobWriteSuccess::AllWritten(Blob {
531 writer,
532 needed_blobs,
533 blob_id,
534 state: NeedsBlobWritten,
535 }))
536 } else {
537 Ok(BlobWriteSuccess::NeedsData(self))
538 }
539 }
540}
541
542impl Blob<NeedsBlobWritten> {
543 pub async fn blob_written(self) -> Result<(), BlobWrittenError> {
545 Ok(self.needed_blobs.blob_written(&self.blob_id.into()).await??)
546 }
547}
548
549#[derive(Debug, thiserror::Error)]
551#[allow(missing_docs)]
552pub enum OpenError {
553 #[error("the package does not exist")]
554 NotFound,
555
556 #[error("Open() responded with an unexpected status")]
557 UnexpectedResponse(#[source] Status),
558
559 #[error("transport error")]
560 Fidl(#[from] fidl::Error),
561}
562#[derive(Debug, thiserror::Error)]
564#[allow(missing_docs)]
565pub enum GetError {
566 #[error("Get() responded with an unexpected status")]
567 UnexpectedResponse(#[from] Status),
568
569 #[error("transport error")]
570 Fidl(#[from] fidl::Error),
571}
572
573#[derive(Debug, thiserror::Error)]
575#[allow(missing_docs)]
576pub enum OpenBlobError {
577 #[error("there is insufficient storage space available to persist this blob")]
578 OutOfSpace,
579
580 #[error("this blob is already open for write by another cache operation")]
581 ConcurrentWrite,
582
583 #[error("an unspecified error occurred during underlying I/O")]
584 UnspecifiedIo,
585
586 #[error("an unspecified error occurred")]
587 Internal,
588
589 #[error("transport error")]
590 Fidl(#[from] fidl::Error),
591}
592
593impl From<fpkg::OpenBlobError> for OpenBlobError {
594 fn from(e: fpkg::OpenBlobError) -> Self {
595 match e {
596 fpkg::OpenBlobError::OutOfSpace => OpenBlobError::OutOfSpace,
597 fpkg::OpenBlobError::ConcurrentWrite => OpenBlobError::ConcurrentWrite,
598 fpkg::OpenBlobError::UnspecifiedIo => OpenBlobError::UnspecifiedIo,
599 fpkg::OpenBlobError::Internal => OpenBlobError::Internal,
600 }
601 }
602}
603
604#[derive(Debug, thiserror::Error)]
606#[allow(missing_docs)]
607pub enum ListMissingBlobsError {
608 #[error("while obtaining the missing blobs fidl iterator")]
609 CallGetMissingBlobs(#[source] fidl::Error),
610
611 #[error("while obtaining the next chunk of blobs from the fidl iterator")]
612 CallNextOnBlobIterator(#[source] fidl::Error),
613}
614
615#[derive(Debug, thiserror::Error)]
617#[allow(missing_docs)]
618pub enum TruncateBlobError {
619 #[error("insufficient storage space is available")]
620 NoSpace,
621
622 #[error("Truncate() responded with an unexpected status")]
623 UnexpectedResponse(#[source] Status),
624
625 #[error("transport error")]
626 Fidl(#[from] fidl::Error),
627
628 #[error("already truncated, currently in state {0}")]
629 AlreadyTruncated(&'static str),
630
631 #[error("unspecified error")]
633 Other(#[source] anyhow::Error),
634
635 #[error("blob is in an invalid state")]
636 BadState,
637}
638
639#[derive(Debug, thiserror::Error)]
641#[allow(missing_docs)]
642pub enum WriteBlobError {
643 #[error("file endpoint reported it wrote more bytes than were actually provided to the file endpoint")]
644 Overwrite,
645
646 #[error("the written data was corrupt")]
647 Corrupt,
648
649 #[error("insufficient storage space is available")]
650 NoSpace,
651
652 #[error("Write() responded with an unexpected status")]
653 UnexpectedResponse(#[source] Status),
654
655 #[error("transport error")]
656 Fidl(#[from] fidl::Error),
657
658 #[error("bytes were written but not needed in state {0}")]
659 BytesNotNeeded(&'static str),
660
661 #[error("while using the fxblob writer")]
662 FxBlob(#[source] blob_writer::WriteError),
663}
664
665#[derive(Debug, thiserror::Error)]
667#[allow(missing_docs)]
668pub enum BlobWrittenError {
669 #[error("pkg-cache could not find the blob after it was successfully written")]
670 MissingAfterWritten,
671
672 #[error("NeededBlobs.BlobWritten was called before the blob was opened")]
673 UnopenedBlob,
674
675 #[error("transport error")]
676 Fidl(#[from] fidl::Error),
677}
678
679impl From<fpkg::BlobWrittenError> for BlobWrittenError {
680 fn from(e: fpkg::BlobWrittenError) -> Self {
681 match e {
682 fpkg::BlobWrittenError::NotWritten => BlobWrittenError::MissingAfterWritten,
683 fpkg::BlobWrittenError::UnopenedBlob => BlobWrittenError::UnopenedBlob,
684 }
685 }
686}
687
688#[cfg(test)]
689mod tests {
690 use super::*;
691 use assert_matches::assert_matches;
692 use fidl::endpoints::{ClientEnd, ControlHandle as _, RequestStream as _};
693 use fidl_fuchsia_io as fio;
694 use fidl_fuchsia_pkg::{
695 BlobInfoIteratorRequest, NeededBlobsRequest, NeededBlobsRequestStream,
696 PackageCacheGetResponder, PackageCacheMarker, PackageCacheRequest,
697 PackageCacheRequestStream,
698 };
699
700 struct MockPackageCache {
701 stream: PackageCacheRequestStream,
702 }
703
704 impl MockPackageCache {
705 fn new() -> (Client, Self) {
706 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<PackageCacheMarker>();
707 (Client::from_proxy(proxy), Self { stream })
708 }
709
710 async fn expect_get(
711 &mut self,
712 blob_info: BlobInfo,
713 expected_gc_protection: fpkg::GcProtection,
714 ) -> PendingGet {
715 match self.stream.next().await {
716 Some(Ok(PackageCacheRequest::Get {
717 meta_far_blob,
718 gc_protection,
719 needed_blobs,
720 dir,
721 responder,
722 })) => {
723 assert_eq!(BlobInfo::from(meta_far_blob), blob_info);
724 assert_eq!(gc_protection, expected_gc_protection);
725 let needed_blobs = needed_blobs.into_stream();
726 let dir = dir.into_stream();
727
728 PendingGet { stream: needed_blobs, dir, responder }
729 }
730 r => panic!("Unexpected request: {:?}", r),
731 }
732 }
733
734 async fn expect_closed(mut self) {
735 assert_matches!(self.stream.next().await, None);
736 }
737 }
738
739 struct PendingGet {
740 stream: NeededBlobsRequestStream,
741 dir: fio::DirectoryRequestStream,
742 responder: PackageCacheGetResponder,
743 }
744
745 impl PendingGet {
746 async fn new() -> (Get, PendingGet) {
747 let (client, mut server) = MockPackageCache::new();
748
749 let get = client.get(blob_info(42), fpkg::GcProtection::OpenPackageTracking).unwrap();
750 let pending_get =
751 server.expect_get(blob_info(42), fpkg::GcProtection::OpenPackageTracking).await;
752 (get, pending_get)
753 }
754
755 fn finish_hold_stream_open(self) -> (NeededBlobsRequestStream, PackageDirProvider) {
756 self.stream.control_handle().shutdown_with_epitaph(Status::OK);
757 self.responder.send(Ok(())).unwrap();
758 (self.stream, PackageDirProvider { stream: self.dir })
759 }
760
761 fn finish(self) -> PackageDirProvider {
762 self.stream.control_handle().shutdown_with_epitaph(Status::OK);
763 self.responder.send(Ok(())).unwrap();
764 PackageDirProvider { stream: self.dir }
765 }
766
767 #[cfg(target_os = "fuchsia")]
768 fn fail_the_get(self) {
769 self.responder
770 .send(Err(Status::IO_INVALID.into_raw()))
771 .expect("client should be waiting");
772 }
773
774 async fn expect_open_meta_blob(
775 mut self,
776 res: Result<Option<ClientEnd<fio::FileMarker>>, fpkg::OpenBlobError>,
777 ) -> Self {
778 match self.stream.next().await {
779 Some(Ok(NeededBlobsRequest::OpenMetaBlob { responder })) => {
780 responder.send(res.map(|o| o.map(fpkg::BlobWriter::File))).unwrap();
781 }
782 r => panic!("Unexpected request: {:?}", r),
783 }
784 self
785 }
786
787 async fn expect_open_blob(
788 mut self,
789 expected_blob_id: BlobId,
790 res: Result<Option<ClientEnd<fio::FileMarker>>, fpkg::OpenBlobError>,
791 ) -> Self {
792 match self.stream.next().await {
793 Some(Ok(NeededBlobsRequest::OpenBlob { blob_id, responder })) => {
794 assert_eq!(BlobId::from(blob_id), expected_blob_id);
795 responder.send(res.map(|o| o.map(fpkg::BlobWriter::File))).unwrap();
796 }
797 r => panic!("Unexpected request: {:?}", r),
798 }
799 self
800 }
801
802 async fn expect_get_missing_blobs(mut self, response_chunks: Vec<Vec<BlobInfo>>) -> Self {
803 match self.stream.next().await {
804 Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
805 let mut stream = iterator.into_stream();
806
807 for chunk in response_chunks {
809 let chunk = chunk
810 .into_iter()
811 .map(fidl_fuchsia_pkg::BlobInfo::from)
812 .collect::<Vec<_>>();
813
814 let BlobInfoIteratorRequest::Next { responder } =
815 stream.next().await.unwrap().unwrap();
816 responder.send(&chunk).unwrap();
817 }
818
819 let BlobInfoIteratorRequest::Next { responder } =
821 stream.next().await.unwrap().unwrap();
822 responder.send(&[]).unwrap();
823
824 assert_matches!(stream.next().await, None);
826 }
827 r => panic!("Unexpected request: {:?}", r),
828 }
829 self
830 }
831
832 async fn expect_get_missing_blobs_client_closes_channel(
833 mut self,
834 response_chunks: Vec<Vec<BlobInfo>>,
835 ) -> Self {
836 match self.stream.next().await {
837 Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
838 let mut stream = iterator.into_stream();
839
840 for chunk in response_chunks {
842 let chunk = chunk
843 .into_iter()
844 .map(fidl_fuchsia_pkg::BlobInfo::from)
845 .collect::<Vec<_>>();
846
847 let BlobInfoIteratorRequest::Next { responder } =
848 stream.next().await.unwrap().unwrap();
849 responder.send(&chunk).unwrap();
850 }
851
852 assert_matches!(stream.next().await, None);
854 }
855 r => panic!("Unexpected request: {:?}", r),
856 }
857 self
858 }
859
860 async fn expect_get_missing_blobs_inject_iterator_error(mut self) -> Self {
861 match self.stream.next().await {
862 Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
863 iterator
864 .into_stream_and_control_handle()
865 .1
866 .shutdown_with_epitaph(Status::ADDRESS_IN_USE);
867 }
868 r => panic!("Unexpected request: {:?}", r),
869 }
870 self
871 }
872
873 #[cfg(target_os = "fuchsia")]
874 async fn expect_abort(mut self) -> Self {
875 match self.stream.next().await {
876 Some(Ok(NeededBlobsRequest::Abort { responder })) => {
877 responder.send().unwrap();
878 }
879 r => panic!("Unexpected request: {:?}", r),
880 }
881 self
882 }
883 }
884
885 struct PackageDirProvider {
886 stream: fio::DirectoryRequestStream,
887 }
888
889 impl PackageDirProvider {
890 fn close_pkg_dir(self) {
891 self.stream.control_handle().shutdown_with_epitaph(Status::NOT_EMPTY);
892 }
893 }
894
895 fn blob_id(n: u8) -> BlobId {
896 BlobId::from([n; 32])
897 }
898
899 fn blob_info(n: u8) -> BlobInfo {
900 BlobInfo { blob_id: blob_id(n), length: 0 }
901 }
902
903 #[fuchsia_async::run_singlethreaded(test)]
904 async fn constructor() {
905 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<PackageCacheMarker>();
906 let client = Client::from_proxy(proxy);
907
908 drop(stream);
909 assert_matches!(client.proxy().sync().await, Err(_));
910 }
911
912 #[fuchsia_async::run_singlethreaded(test)]
913 async fn get_present_package() {
914 let (client, mut server) = MockPackageCache::new();
915
916 let ((), ()) = future::join(
917 async {
918 server
919 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
920 .await
921 .finish()
922 .close_pkg_dir();
923 server.expect_closed().await;
924 },
925 async move {
926 let mut get =
927 client.get(blob_info(2), fpkg::GcProtection::OpenPackageTracking).unwrap();
928
929 assert_matches!(get.open_meta_blob().await.unwrap(), None);
930 assert_eq!(get.get_missing_blobs().try_concat().await.unwrap(), vec![]);
931 let pkg_dir = get.finish().await.unwrap();
932
933 assert_matches!(
934 pkg_dir.into_proxy().take_event_stream().next().await,
935 Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
936 );
937 },
938 )
939 .await;
940 }
941
942 #[fuchsia_async::run_singlethreaded(test)]
943 async fn get_present_package_handles_slow_stream_close() {
944 let (client, mut server) = MockPackageCache::new();
945
946 let (send, recv) = futures::channel::oneshot::channel::<()>();
947
948 let ((), ()) = future::join(
949 async {
950 let (needed_blobs_stream, pkg_dir) = server
951 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
952 .await
953 .finish_hold_stream_open();
954 pkg_dir.close_pkg_dir();
955
956 let _ = recv.await;
958 drop(needed_blobs_stream);
959 },
960 async move {
961 let mut get =
962 client.get(blob_info(2), fpkg::GcProtection::OpenPackageTracking).unwrap();
963
964 assert_matches!(get.open_meta_blob().await.unwrap(), None);
965
966 let missing_blobs_stream = get.get_missing_blobs();
970 drop(send);
971 assert_eq!(missing_blobs_stream.try_concat().await.unwrap(), vec![]);
972 let pkg_dir = get.finish().await.unwrap();
973
974 assert_matches!(
975 pkg_dir.into_proxy().take_event_stream().next().await,
976 Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
977 );
978 },
979 )
980 .await;
981 }
982
983 #[fuchsia_async::run_singlethreaded(test)]
984 async fn needed_blobs_open_meta_far() {
985 let (mut get, pending_get) = PendingGet::new().await;
986
987 let ((), ()) = future::join(
988 async {
989 pending_get
990 .expect_open_meta_blob(Ok(None))
991 .await
992 .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
993 .await
994 .expect_open_meta_blob(Ok(None))
995 .await
996 .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
997 .await
998 .expect_open_meta_blob(Err(fpkg::OpenBlobError::OutOfSpace))
999 .await
1000 .expect_open_meta_blob(Err(fpkg::OpenBlobError::UnspecifiedIo))
1001 .await;
1002 },
1003 async {
1004 {
1005 let opener = get.make_open_meta_blob();
1006 assert_matches!(opener.open().await.unwrap(), None);
1007 assert_matches!(opener.open().await.unwrap(), Some(_));
1008 }
1009 assert_matches!(get.open_meta_blob().await.unwrap(), None);
1010 assert_matches!(get.open_meta_blob().await.unwrap(), Some(_));
1011 assert_matches!(get.open_meta_blob().await, Err(OpenBlobError::OutOfSpace));
1012 assert_matches!(get.open_meta_blob().await, Err(OpenBlobError::UnspecifiedIo));
1013 },
1014 )
1015 .await;
1016 }
1017
1018 #[fuchsia_async::run_singlethreaded(test)]
1019 async fn needed_blobs_open_content_blob() {
1020 let (mut get, pending_get) = PendingGet::new().await;
1021
1022 let ((), ()) = future::join(
1023 async {
1024 pending_get
1025 .expect_open_blob(blob_id(2), Ok(None))
1026 .await
1027 .expect_open_blob(blob_id(2), Ok(Some(fidl::endpoints::create_endpoints().0)))
1028 .await
1029 .expect_open_blob(blob_id(10), Ok(None))
1030 .await
1031 .expect_open_blob(blob_id(11), Ok(Some(fidl::endpoints::create_endpoints().0)))
1032 .await
1033 .expect_open_blob(blob_id(12), Err(fpkg::OpenBlobError::OutOfSpace))
1034 .await
1035 .expect_open_blob(blob_id(13), Err(fpkg::OpenBlobError::UnspecifiedIo))
1036 .await;
1037 },
1038 async {
1039 {
1040 let opener = get.make_open_blob(blob_id(2));
1041 assert_matches!(opener.open().await.unwrap(), None);
1042 assert_matches!(opener.open().await.unwrap(), Some(_));
1043 }
1044 assert_matches!(get.open_blob(blob_id(10),).await.unwrap(), None);
1045 assert_matches!(get.open_blob(blob_id(11),).await.unwrap(), Some(_));
1046 assert_matches!(get.open_blob(blob_id(12),).await, Err(OpenBlobError::OutOfSpace));
1047 assert_matches!(
1048 get.open_blob(blob_id(13),).await,
1049 Err(OpenBlobError::UnspecifiedIo)
1050 );
1051 },
1052 )
1053 .await;
1054 }
1055
1056 #[fuchsia_async::run_singlethreaded(test)]
1057 async fn needed_blobs_get_missing_blobs_on_closed_ok() {
1058 let (mut get, pending_get) = PendingGet::new().await;
1059 let _ = pending_get.finish();
1060
1061 assert_matches!(get.open_meta_blob().await, Ok(None));
1062 assert_eq!(get.get_missing_blobs().try_concat().await.unwrap(), vec![]);
1063 }
1064
1065 #[fuchsia_async::run_singlethreaded(test)]
1066 async fn needed_blobs_get_missing_blobs() {
1067 let (mut get, pending_get) = PendingGet::new().await;
1068
1069 let ((), ()) = future::join(
1070 async {
1071 pending_get
1072 .expect_get_missing_blobs(vec![
1073 vec![blob_info(1), blob_info(2)],
1074 vec![blob_info(3)],
1075 ])
1076 .await;
1077 },
1078 async {
1079 assert_eq!(
1080 get.get_missing_blobs().try_concat().await.unwrap(),
1081 vec![blob_info(1), blob_info(2), blob_info(3)]
1082 );
1083 },
1084 )
1085 .await;
1086 }
1087
1088 #[fuchsia_async::run_singlethreaded(test)]
1089 async fn needed_blobs_get_missing_blobs_fail_to_obtain_iterator() {
1090 let (mut get, pending_get) = PendingGet::new().await;
1091 drop(pending_get);
1092
1093 assert_matches!(
1094 get.get_missing_blobs().try_concat().await,
1095 Err(ListMissingBlobsError::CallNextOnBlobIterator(
1096 fidl::Error::ClientChannelClosed{status, ..})
1097 )
1098 if status == Status::PEER_CLOSED
1099 );
1100 }
1101
1102 #[fuchsia_async::run_singlethreaded(test)]
1103 async fn needed_blobs_get_missing_blobs_iterator_contains_error() {
1104 let (mut get, pending_get) = PendingGet::new().await;
1105
1106 let (_, ()) =
1107 future::join(pending_get.expect_get_missing_blobs_inject_iterator_error(), async {
1108 assert_matches!(
1109 get.get_missing_blobs().try_concat().await,
1110 Err(ListMissingBlobsError::CallNextOnBlobIterator(
1111 fidl::Error::ClientChannelClosed{status, ..}
1112 ))
1113 if status == Status::ADDRESS_IN_USE
1114 );
1115 })
1116 .await;
1117 }
1118
1119 #[cfg(target_os = "fuchsia")]
1120 #[test]
1121 fn needed_blobs_abort() {
1122 use futures::future::Either;
1123 use futures::pin_mut;
1124 use std::task::Poll;
1125
1126 let mut executor = fuchsia_async::TestExecutor::new_with_fake_time();
1127
1128 let fut = async {
1129 let (get, pending_get) = PendingGet::new().await;
1130
1131 let abort_fut = get.abort().boxed();
1132 let expect_abort_fut = pending_get.expect_abort();
1133 pin_mut!(expect_abort_fut);
1134
1135 match futures::future::select(abort_fut, expect_abort_fut).await {
1136 Either::Left(((), _expect_abort_fut)) => {
1137 panic!("abort should wait for the get future to complete")
1138 }
1139 Either::Right((pending_get, abort_fut)) => (abort_fut, pending_get),
1140 }
1141 };
1142 pin_mut!(fut);
1143
1144 let (mut abort_fut, pending_get) = match executor.run_until_stalled(&mut fut) {
1145 Poll::Pending => panic!("should complete"),
1146 Poll::Ready((abort_fut, pending_get)) => (abort_fut, pending_get),
1147 };
1148
1149 assert_matches!(executor.run_until_stalled(&mut abort_fut), Poll::Pending);
1151 pending_get.fail_the_get();
1152 assert_matches!(executor.run_until_stalled(&mut abort_fut), Poll::Ready(()));
1153 }
1154
1155 struct MockNeededBlob {
1156 blob: fio::FileRequestStream,
1157 needed_blobs: fpkg::NeededBlobsRequestStream,
1158 }
1159
1160 impl MockNeededBlob {
1161 fn mock_hash() -> BlobId {
1162 [7; 32].into()
1163 }
1164
1165 fn new() -> (NeededBlob, Self) {
1166 let (blob_proxy, blob) = fidl::endpoints::create_proxy_and_stream::<fio::FileMarker>();
1167 let (needed_blobs_proxy, needed_blobs) =
1168 fidl::endpoints::create_proxy_and_stream::<fpkg::NeededBlobsMarker>();
1169 (
1170 NeededBlob {
1171 blob: Blob {
1172 writer: Box::new(Clone::clone(&blob_proxy)),
1173 needed_blobs: needed_blobs_proxy,
1174 blob_id: Self::mock_hash(),
1175 state: NeedsTruncate,
1176 },
1177 closer: BlobCloser { closer: Box::new(blob_proxy), closed: false },
1178 },
1179 Self { blob, needed_blobs },
1180 )
1181 }
1182
1183 async fn fail_truncate(mut self) -> Self {
1184 match self.blob.next().await {
1185 Some(Ok(fio::FileRequest::Resize { length: _, responder })) => {
1186 responder.send(Err(Status::NO_SPACE.into_raw())).unwrap();
1187 }
1188 r => panic!("Unexpected request: {:?}", r),
1189 }
1190 self
1191 }
1192
1193 async fn expect_truncate(mut self, expected_length: u64) -> Self {
1194 match self.blob.next().await {
1195 Some(Ok(fio::FileRequest::Resize { length, responder })) => {
1196 assert_eq!(length, expected_length);
1197 responder.send(Ok(())).unwrap();
1198 }
1199 r => panic!("Unexpected request: {:?}", r),
1200 }
1201 self
1202 }
1203
1204 async fn fail_write(mut self) -> Self {
1205 match self.blob.next().await {
1206 Some(Ok(fio::FileRequest::Write { data: _, responder })) => {
1207 responder.send(Err(Status::NO_SPACE.into_raw())).unwrap();
1208 }
1209 r => panic!("Unexpected request: {:?}", r),
1210 }
1211 self
1212 }
1213
1214 async fn expect_write(mut self, expected_payload: &[u8]) -> Self {
1215 match self.blob.next().await {
1216 Some(Ok(fio::FileRequest::Write { data, responder })) => {
1217 assert_eq!(data, expected_payload);
1218 responder.send(Ok(data.len() as u64)).unwrap();
1219 }
1220 r => panic!("Unexpected request: {:?}", r),
1221 }
1222 self
1223 }
1224
1225 async fn expect_write_partial(
1226 mut self,
1227 expected_payload: &[u8],
1228 bytes_to_consume: u64,
1229 ) -> Self {
1230 match self.blob.next().await {
1231 Some(Ok(fio::FileRequest::Write { data, responder })) => {
1232 assert_eq!(data, expected_payload);
1233 responder.send(Ok(bytes_to_consume)).unwrap();
1234 }
1235 r => panic!("Unexpected request: {:?}", r),
1236 }
1237 self
1238 }
1239
1240 async fn expect_close(mut self) {
1241 match self.blob.next().await {
1242 Some(Ok(fio::FileRequest::Close { responder })) => {
1243 responder.send(Ok(())).unwrap();
1244 }
1245 r => panic!("Unexpected request: {:?}", r),
1246 }
1247 }
1248
1249 async fn expect_blob_written(mut self) -> Self {
1250 match self.needed_blobs.next().await {
1251 Some(Ok(fpkg::NeededBlobsRequest::BlobWritten { blob_id, responder })) => {
1252 assert_eq!(blob_id, Self::mock_hash().into());
1253 responder.send(Ok(())).unwrap();
1254 }
1255 r => panic!("Unexpected request: {:?}", r),
1256 }
1257 self
1258 }
1259 }
1260
1261 #[fuchsia_async::run_singlethreaded(test)]
1262 async fn empty_blob_write() {
1263 let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1264
1265 let ((), ()) = future::join(
1266 async {
1267 blob_server
1268 .expect_truncate(0)
1269 .await
1270 .expect_blob_written()
1271 .await
1272 .expect_close()
1273 .await;
1274 },
1275 async {
1276 let blob = match blob.truncate(0).await.unwrap() {
1277 TruncateBlobSuccess::AllWritten(blob) => blob,
1278 other => panic!("empty blob shouldn't need bytes {other:?}"),
1279 };
1280 let () = blob.blob_written().await.unwrap();
1281 closer.close().await;
1282 },
1283 )
1284 .await;
1285 }
1286
1287 impl TruncateBlobSuccess {
1288 fn unwrap_needs_data(self) -> Blob<NeedsData> {
1289 match self {
1290 TruncateBlobSuccess::NeedsData(blob) => blob,
1291 TruncateBlobSuccess::AllWritten(_) => panic!("blob should need data"),
1292 }
1293 }
1294 }
1295
1296 impl BlobWriteSuccess {
1297 fn unwrap_needs_data(self) -> Blob<NeedsData> {
1298 match self {
1299 BlobWriteSuccess::NeedsData(blob) => blob,
1300 BlobWriteSuccess::AllWritten(_) => panic!("blob should need data"),
1301 }
1302 }
1303
1304 fn unwrap_all_written(self) -> Blob<NeedsBlobWritten> {
1305 match self {
1306 BlobWriteSuccess::NeedsData(_) => panic!("blob should be completely written"),
1307 BlobWriteSuccess::AllWritten(blob) => blob,
1308 }
1309 }
1310 }
1311
1312 #[fuchsia_async::run_singlethreaded(test)]
1313 async fn small_blob_write() {
1314 let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1315
1316 let ((), ()) = future::join(
1317 async {
1318 blob_server
1319 .expect_truncate(4)
1320 .await
1321 .expect_write(b"test")
1322 .await
1323 .expect_blob_written()
1324 .await
1325 .expect_close()
1326 .await;
1327 },
1328 async {
1329 let blob = blob.truncate(4).await.unwrap().unwrap_needs_data();
1330 let blob = blob.write(b"test").await.unwrap().unwrap_all_written();
1331 let () = blob.blob_written().await.unwrap();
1332 closer.close().await;
1333 },
1334 )
1335 .await;
1336 }
1337
1338 #[fuchsia_async::run_singlethreaded(test)]
1339 async fn blob_truncate_no_space() {
1340 let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1341
1342 let ((), ()) = future::join(
1343 async {
1344 blob_server.fail_truncate().await;
1345 },
1346 async {
1347 assert_matches!(blob.truncate(4).await, Err(TruncateBlobError::NoSpace));
1348 closer.close().await;
1349 },
1350 )
1351 .await;
1352 }
1353
1354 #[fuchsia_async::run_singlethreaded(test)]
1355 async fn blob_write_no_space() {
1356 let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1357
1358 let ((), ()) = future::join(
1359 async {
1360 blob_server.expect_truncate(4).await.fail_write().await;
1361 },
1362 async {
1363 let blob = blob.truncate(4).await.unwrap().unwrap_needs_data();
1364 assert_matches!(blob.write(b"test").await, Err(WriteBlobError::NoSpace));
1365 closer.close().await;
1366 },
1367 )
1368 .await;
1369 }
1370
1371 #[fuchsia_async::run_singlethreaded(test)]
1372 async fn blob_write_server_partial_write() {
1373 let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1374
1375 let ((), ()) = future::join(
1376 async {
1377 blob_server
1378 .expect_truncate(6)
1379 .await
1380 .expect_write_partial(b"abc123", 3)
1381 .await
1382 .expect_write(b"123")
1383 .await
1384 .expect_blob_written()
1385 .await;
1386 },
1387 async {
1388 let blob = blob.truncate(6).await.unwrap().unwrap_needs_data();
1389 let blob = blob.write(b"abc123").await.unwrap().unwrap_all_written();
1390 let () = blob.blob_written().await.unwrap();
1391 closer.close().await;
1392 },
1393 )
1394 .await;
1395 }
1396
1397 #[fuchsia_async::run_singlethreaded(test)]
1398 async fn blob_write_client_partial_write() {
1399 let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1400
1401 let ((), ()) = future::join(
1402 async {
1403 blob_server
1404 .expect_truncate(6)
1405 .await
1406 .expect_write(b"abc")
1407 .await
1408 .expect_write(b"123")
1409 .await
1410 .expect_blob_written()
1411 .await;
1412 },
1413 async {
1414 let blob = blob.truncate(6).await.unwrap().unwrap_needs_data();
1415 let blob = blob.write(b"abc").await.unwrap().unwrap_needs_data();
1416 let blob = blob.write(b"123").await.unwrap().unwrap_all_written();
1417 let () = blob.blob_written().await.unwrap();
1418 closer.close().await;
1419 },
1420 )
1421 .await;
1422 }
1423
1424 #[fuchsia_async::run_singlethreaded(test)]
1425 async fn blob_write_chunkize_payload() {
1426 const CHUNK_SIZE: usize = fio::MAX_BUF as usize;
1427
1428 let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1429
1430 let ((), ()) = future::join(
1431 async {
1432 blob_server
1433 .expect_truncate(3 * CHUNK_SIZE as u64)
1434 .await
1435 .expect_write(&[0u8; CHUNK_SIZE])
1436 .await
1437 .expect_write(&[1u8; CHUNK_SIZE])
1438 .await
1439 .expect_write(&[2u8; CHUNK_SIZE])
1440 .await
1441 .expect_blob_written()
1442 .await;
1443 },
1444 async {
1445 let payload =
1446 (0..3).flat_map(|n| std::iter::repeat(n).take(CHUNK_SIZE)).collect::<Vec<u8>>();
1447 let blob = blob.truncate(3 * CHUNK_SIZE as u64).await.unwrap().unwrap_needs_data();
1448 let blob = blob.write(&payload).await.unwrap().unwrap_all_written();
1449 let () = blob.blob_written().await.unwrap();
1450 closer.close().await;
1451 },
1452 )
1453 .await;
1454 }
1455
1456 #[fuchsia_async::run_singlethreaded(test)]
1457 async fn get_already_cached_success() {
1458 let (client, mut server) = MockPackageCache::new();
1459
1460 let ((), ()) = future::join(
1461 async {
1462 server
1463 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1464 .await
1465 .finish()
1466 .close_pkg_dir();
1467 server.expect_closed().await;
1468 },
1469 async move {
1470 let pkg_dir = client.get_already_cached(blob_id(2)).await.unwrap();
1471
1472 assert_matches!(
1473 pkg_dir.into_proxy().take_event_stream().next().await,
1474 Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
1475 );
1476 },
1477 )
1478 .await;
1479 }
1480
1481 #[fuchsia_async::run_singlethreaded(test)]
1482 async fn get_already_cached_missing_meta_far() {
1483 let (client, mut server) = MockPackageCache::new();
1484
1485 let ((), ()) = future::join(
1486 async {
1487 server
1488 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1489 .await
1490 .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1491 .await;
1492 },
1493 async move {
1494 assert_matches!(
1495 client.get_already_cached(blob_id(2)).await,
1496 Err(GetAlreadyCachedError::MissingMetaFar)
1497 );
1498 },
1499 )
1500 .await;
1501 }
1502
1503 #[fuchsia_async::run_singlethreaded(test)]
1504 async fn get_already_cached_missing_content_blob() {
1505 let (client, mut server) = MockPackageCache::new();
1506
1507 let ((), ()) = future::join(
1508 async {
1509 server
1510 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1511 .await
1512 .expect_open_meta_blob(Ok(None))
1513 .await
1514 .expect_get_missing_blobs_client_closes_channel(vec![vec![BlobInfo {
1515 blob_id: [0; 32].into(),
1516 length: 0,
1517 }]])
1518 .await;
1519 },
1520 async move {
1521 assert_matches!(
1522 client.get_already_cached(blob_id(2)).await,
1523 Err(GetAlreadyCachedError::MissingContentBlobs(v))
1524 if v == vec![BlobInfo {
1525 blob_id: [0; 32].into(),
1526 length: 0,
1527 }]
1528 );
1529 },
1530 )
1531 .await;
1532 }
1533}