1#![cfg(target_os = "fuchsia")]
6#![deny(missing_docs)]
7
8use crate::types::{BlobId, BlobInfo};
11use fuchsia_pkg::PackageDirectory;
12use futures::prelude::*;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicBool, Ordering};
15use zx_status::Status;
16use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_pkg as fpkg};
17
18#[derive(Debug, Clone)]
20pub struct Client {
21 proxy: fpkg::PackageCacheProxy,
22}
23
24impl Client {
25 pub fn from_proxy(proxy: fpkg::PackageCacheProxy) -> Self {
27 Self { proxy }
28 }
29
30 pub fn proxy(&self) -> &fpkg::PackageCacheProxy {
32 &self.proxy
33 }
34
35 pub fn get(
38 &self,
39 meta_far_blob: BlobInfo,
40 gc_protection: fpkg::GcProtection,
41 ) -> Result<Get, fidl::Error> {
42 let (needed_blobs, needed_blobs_server_end) =
43 fidl::endpoints::create_proxy::<fpkg::NeededBlobsMarker>();
44 let (pkg_dir, pkg_dir_server_end) = PackageDirectory::create_request()?;
45
46 let get_fut = self.proxy.get(
47 &meta_far_blob.into(),
48 gc_protection,
49 needed_blobs_server_end,
50 pkg_dir_server_end,
51 );
52
53 Ok(Get {
54 get_fut,
55 pkg_dir,
56 needed_blobs,
57 pkg_present: SharedBoolEvent::new(),
58 meta_far: meta_far_blob.blob_id,
59 })
60 }
61
62 pub async fn get_already_cached(
72 &self,
73 meta_far_blob: BlobId,
74 ) -> Result<PackageDirectory, GetAlreadyCachedError> {
75 let mut get = self
76 .get(
77 BlobInfo { blob_id: meta_far_blob, length: 0 },
78 fpkg::GcProtection::OpenPackageTracking,
79 )
80 .map_err(GetAlreadyCachedError::Get)?;
81 if let Some(_) = get.open_meta_blob().await.map_err(GetAlreadyCachedError::OpenMetaBlob)? {
82 return Err(GetAlreadyCachedError::MissingMetaFar);
83 }
84
85 if let Some(missing_blobs) = get
86 .get_missing_blobs()
87 .try_next()
88 .await
89 .map_err(GetAlreadyCachedError::GetMissingBlobs)?
90 {
91 return Err(GetAlreadyCachedError::MissingContentBlobs(missing_blobs));
92 }
93
94 get.finish().await.map_err(GetAlreadyCachedError::FinishGet)
95 }
96
97 pub async fn get_subpackage(
100 &self,
101 superpackage: BlobId,
102 subpackage: &fuchsia_url::RelativePackageUrl,
103 ) -> Result<PackageDirectory, GetSubpackageError> {
104 let (dir, dir_server_end) =
105 PackageDirectory::create_request().map_err(GetSubpackageError::CreatingHandles)?;
106 let () = self
107 .proxy
108 .get_subpackage(
109 &superpackage.into(),
110 &fpkg::PackageUrl { url: subpackage.into() },
111 dir_server_end,
112 )
113 .await
114 .map_err(GetSubpackageError::CallingGetSubpackage)??;
115 Ok(dir)
116 }
117
118 pub fn write_blobs(&self) -> Result<WriteBlobs, fidl::Error> {
120 let (needed_blobs, needed_blobs_server_end) =
121 fidl::endpoints::create_proxy::<fpkg::NeededBlobsMarker>();
122
123 let () = self.proxy.write_blobs(needed_blobs_server_end)?;
124
125 Ok(WriteBlobs { needed_blobs })
126 }
127}
128
129#[derive(thiserror::Error, Debug)]
130#[allow(missing_docs)]
131pub enum GetAlreadyCachedError {
132 #[error("calling get")]
133 Get(#[source] fidl::Error),
134
135 #[error("opening meta blob")]
136 OpenMetaBlob(#[source] OpenBlobError),
137
138 #[error("meta.far blob not cached")]
139 MissingMetaFar,
140
141 #[error("getting missing blobs")]
142 GetMissingBlobs(#[source] ListMissingBlobsError),
143
144 #[error("content blobs not cached {0:?}")]
145 MissingContentBlobs(Vec<BlobInfo>),
146
147 #[error("finishing get")]
148 FinishGet(#[source] GetError),
149}
150
151impl GetAlreadyCachedError {
152 pub fn was_not_cached(&self) -> bool {
154 use GetAlreadyCachedError::*;
155 match self {
156 Get(..) | OpenMetaBlob(..) | GetMissingBlobs(..) | FinishGet(..) => false,
157 MissingMetaFar | MissingContentBlobs(..) => true,
158 }
159 }
160}
161
162#[derive(thiserror::Error, Debug)]
163#[allow(missing_docs)]
164pub enum GetSubpackageError {
165 #[error("creating handles")]
166 CreatingHandles(#[source] fidl::Error),
167
168 #[error("calling GetCached FIDL")]
169 CallingGetSubpackage(#[source] fidl::Error),
170
171 #[error("the superpackage does not have an open package connection")]
172 SuperpackageClosed,
173
174 #[error("the subpackage does not exist")]
175 DoesNotExist,
176
177 #[error("internal")]
178 Internal,
179}
180
181impl From<fpkg::GetSubpackageError> for GetSubpackageError {
182 fn from(fidl: fpkg::GetSubpackageError) -> Self {
183 use GetSubpackageError::*;
184 use fpkg::GetSubpackageError as fErr;
185 match fidl {
186 fErr::SuperpackageClosed => SuperpackageClosed,
187 fErr::DoesNotExist => DoesNotExist,
188 fErr::Internal => Internal,
189 }
190 }
191}
192
/// A one-way boolean flag shared between clones.
///
/// The flag starts out `false`; once `set` is called on any clone, `get` returns `true` on
/// all of them. There is no way to clear it.
#[derive(Clone, Debug)]
struct SharedBoolEvent(Arc<AtomicBool>);

impl SharedBoolEvent {
    /// Creates a new, unset flag.
    fn new() -> Self {
        let flag = AtomicBool::new(false);
        Self(Arc::new(flag))
    }

    /// Reports whether the flag has been set.
    fn get(&self) -> bool {
        let Self(flag) = self;
        flag.load(Ordering::SeqCst)
    }

    /// Sets the flag for this handle and every clone of it.
    fn set(&self) {
        let Self(flag) = self;
        flag.store(true, Ordering::SeqCst)
    }
}
209
210async fn open_blob(
211 needed_blobs: &fpkg::NeededBlobsProxy,
212 kind: OpenKind,
213 blob_id: BlobId,
214 pkg_present: Option<&SharedBoolEvent>,
215 allow_existing: bool,
216) -> Result<Option<NeededBlob>, OpenBlobError> {
217 let open_fut = match kind {
218 OpenKind::Meta => needed_blobs.open_meta_blob(),
219 OpenKind::Content => needed_blobs.open_blob(&blob_id.into(), allow_existing),
220 };
221 match open_fut.await {
222 Err(fidl::Error::ClientChannelClosed { status: Status::OK, .. }) => {
223 if let Some(pkg_present) = pkg_present {
224 pkg_present.set();
225 }
226 Ok(None)
227 }
228 res => {
229 if let Some(blob) = res?? {
230 Ok(Some(NeededBlob {
231 blob: Blob {
232 needed_blobs: needed_blobs.clone(),
233 blob_id,
234 state: NeedsTruncate(blob),
235 },
236 }))
237 } else {
238 Ok(None)
239 }
240 }
241 }
242}
243
/// Which `NeededBlobs` open method a request should use.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum OpenKind {
    /// Use `NeededBlobs.OpenMetaBlob`.
    Meta,
    /// Use `NeededBlobs.OpenBlob`.
    Content,
}
249
250#[derive(Debug)]
252pub struct DeferredOpenBlob {
253 needed_blobs: fpkg::NeededBlobsProxy,
254 kind: OpenKind,
255 blob_id: BlobId,
256 allow_existing: bool,
257 pkg_present: Option<SharedBoolEvent>,
258}
259
260impl DeferredOpenBlob {
261 pub async fn open(&self) -> Result<Option<NeededBlob>, OpenBlobError> {
264 open_blob(
265 &self.needed_blobs,
266 self.kind,
267 self.blob_id,
268 self.pkg_present.as_ref(),
269 self.allow_existing,
270 )
271 .await
272 }
273
274 fn proxy_cmp_key(&self) -> u32 {
275 use fidl::AsHandleRef;
276 use fidl::endpoints::Proxy;
277 self.needed_blobs.as_channel().raw_handle()
278 }
279}
280
281impl std::cmp::PartialEq for DeferredOpenBlob {
282 fn eq(&self, other: &Self) -> bool {
283 self.proxy_cmp_key() == other.proxy_cmp_key() && self.kind == other.kind
284 }
285}
286
287impl std::cmp::Eq for DeferredOpenBlob {}
288
289#[derive(Debug)]
295pub struct Get {
296 get_fut: fidl::client::QueryResponseFut<Result<(), i32>>,
297 needed_blobs: fpkg::NeededBlobsProxy,
298 pkg_dir: PackageDirectory,
299 pkg_present: SharedBoolEvent,
300 meta_far: BlobId,
301}
302
303impl Get {
304 pub fn make_open_meta_blob(&mut self) -> DeferredOpenBlob {
307 DeferredOpenBlob {
308 needed_blobs: self.needed_blobs.clone(),
309 kind: OpenKind::Meta,
310 blob_id: self.meta_far,
311 allow_existing: false,
312 pkg_present: Some(self.pkg_present.clone()),
313 }
314 }
315
316 pub async fn open_meta_blob(&mut self) -> Result<Option<NeededBlob>, OpenBlobError> {
319 open_blob(&self.needed_blobs, OpenKind::Meta, self.meta_far, Some(&self.pkg_present), false)
320 .await
321 }
322
323 fn start_get_missing_blobs(
324 &mut self,
325 ) -> Result<Option<fpkg::BlobInfoIteratorProxy>, fidl::Error> {
326 if self.pkg_present.get() {
327 return Ok(None);
328 }
329
330 let (blob_iterator, blob_iterator_server_end) =
331 fidl::endpoints::create_proxy::<fpkg::BlobInfoIteratorMarker>();
332
333 self.needed_blobs.get_missing_blobs(blob_iterator_server_end)?;
334 Ok(Some(blob_iterator))
335 }
336
337 pub fn get_missing_blobs(
343 &mut self,
344 ) -> impl Stream<Item = Result<Vec<BlobInfo>, ListMissingBlobsError>> + Unpin + use<> {
345 match self.start_get_missing_blobs() {
346 Ok(option_iter) => match option_iter {
347 Some(iterator) => crate::fidl_iterator_to_stream(iterator)
348 .map_ok(|v| v.into_iter().map(BlobInfo::from).collect())
349 .map_err(ListMissingBlobsError::CallNextOnBlobIterator)
350 .left_stream(),
351 None => futures::stream::empty().right_stream(),
352 }
353 .left_stream(),
354 Err(e) => {
355 futures::stream::iter(Some(Err(ListMissingBlobsError::CallGetMissingBlobs(e))))
356 .right_stream()
357 }
358 }
359 }
360
361 pub fn make_open_blob(&mut self, content_blob: BlobId) -> DeferredOpenBlob {
364 DeferredOpenBlob {
365 needed_blobs: self.needed_blobs.clone(),
366 kind: OpenKind::Content,
367 blob_id: content_blob,
368 allow_existing: false,
369 pkg_present: None,
370 }
371 }
372
373 pub async fn open_blob(
376 &mut self,
377 content_blob: BlobId,
378 ) -> Result<Option<NeededBlob>, OpenBlobError> {
379 open_blob(&self.needed_blobs, OpenKind::Content, content_blob, None, false).await
380 }
381
382 pub async fn finish(self) -> Result<PackageDirectory, GetError> {
385 drop(self.needed_blobs);
386 let () = self.get_fut.await?.map_err(Status::from_raw)?;
387 Ok(self.pkg_dir)
388 }
389
390 pub async fn abort(self) {
392 self.needed_blobs.abort().map(|_: Result<(), fidl::Error>| ()).await;
393 let _ = self.get_fut.await;
398 }
399}
400
401#[derive(Clone, Debug)]
403pub struct WriteBlobs {
404 needed_blobs: fpkg::NeededBlobsProxy,
405}
406
407impl WriteBlobs {
408 pub fn make_open_blob(&mut self, blob: BlobId, allow_existing: bool) -> DeferredOpenBlob {
411 DeferredOpenBlob {
412 needed_blobs: self.needed_blobs.clone(),
413 kind: OpenKind::Content,
414 blob_id: blob,
415 allow_existing,
416 pkg_present: None,
417 }
418 }
419
420 pub async fn open_blob(
422 &mut self,
423 blob: BlobId,
424 allow_existing: bool,
425 ) -> Result<Option<NeededBlob>, OpenBlobError> {
426 open_blob(&self.needed_blobs, OpenKind::Content, blob, None, allow_existing).await
427 }
428}
429
430#[derive(Debug)]
432pub struct NeededBlob {
433 pub blob: Blob<NeedsTruncate>,
436}
437
438#[derive(Debug)]
440pub enum TruncateBlobSuccess {
441 NeedsData(Blob<NeedsData>),
443
444 AllWritten(Blob<NeedsBlobWritten>),
447}
448
449#[derive(Debug)]
451pub enum BlobWriteSuccess {
452 NeedsData(Blob<NeedsData>),
454
455 AllWritten(Blob<NeedsBlobWritten>),
458}
459
460#[derive(Debug)]
462pub struct NeedsTruncate(fidl::endpoints::ClientEnd<ffxfs::BlobWriterMarker>);
463
464#[derive(Debug)]
466pub struct NeedsData {
467 size: u64,
468 written: u64,
469 writer: blob_writer::BlobWriter,
470}
471
/// [`Blob`] state: all data has been written and `NeededBlobs.BlobWritten` must still be
/// called (see [`Blob::blob_written`]).
#[derive(Debug)]
pub struct NeedsBlobWritten;
476
477#[derive(Debug)]
479#[must_use]
480pub struct Blob<S> {
481 needed_blobs: fpkg::NeededBlobsProxy,
482 blob_id: BlobId,
483 state: S,
484}
485
486impl Blob<NeedsTruncate> {
487 pub async fn truncate(self, size: u64) -> Result<TruncateBlobSuccess, TruncateBlobError> {
489 let Self { needed_blobs, blob_id, state: NeedsTruncate(blob) } = self;
490
491 let writer = blob_writer::BlobWriter::create(blob.into_proxy(), size).await.map_err(
492 |e| match e {
493 blob_writer::CreateError::GetVmo(s) if s == Status::NO_SPACE => {
494 TruncateBlobError::NoSpace
495 }
496 _ => TruncateBlobError::CreateBlobWriter(e),
497 },
498 )?;
499
500 Ok(if size == 0 {
501 TruncateBlobSuccess::AllWritten(Blob { needed_blobs, blob_id, state: NeedsBlobWritten })
502 } else {
503 TruncateBlobSuccess::NeedsData(Blob {
504 needed_blobs,
505 blob_id,
506 state: NeedsData { size, written: 0, writer },
507 })
508 })
509 }
510
511 pub fn deconstruct(
519 self,
520 ) -> (fpkg::NeededBlobsProxy, BlobId, fidl::endpoints::ClientEnd<ffxfs::BlobWriterMarker>) {
521 let Blob { needed_blobs, blob_id, state: NeedsTruncate(client) } = self;
522 (needed_blobs, blob_id, client)
523 }
524}
525
526impl Blob<NeedsData> {
527 pub fn write(
533 self,
534 buf: &[u8],
535 ) -> impl Future<Output = Result<BlobWriteSuccess, WriteBlobError>> + '_ {
536 self.write_with_trace_callbacks(buf, &|_| {}, &|| {})
537 }
538
539 pub async fn write_with_trace_callbacks(
551 mut self,
552 buf: &[u8],
553 after_write: &(dyn Fn(u64) + Send + Sync),
554 after_write_ack: &(dyn Fn() + Send + Sync),
555 ) -> Result<BlobWriteSuccess, WriteBlobError> {
556 assert!(self.state.written + buf.len() as u64 <= self.state.size);
557
558 let fut = self.state.writer.write(buf);
559 let () = after_write(buf.len() as u64);
560 let res = fut.await;
561 let () = after_write_ack();
562 let () = res.map_err(|e| match e {
563 e @ blob_writer::WriteError::BytesReady(s) => match s {
564 Status::IO_DATA_INTEGRITY => WriteBlobError::Corrupt,
565 Status::NO_SPACE => WriteBlobError::NoSpace,
566 _ => WriteBlobError::FxBlob(e),
567 },
568 e => WriteBlobError::FxBlob(e),
569 })?;
570
571 self.state.written += buf.len() as u64;
572
573 if self.state.written == self.state.size {
574 let Self { needed_blobs, blob_id, state: _ } = self;
575 Ok(BlobWriteSuccess::AllWritten(Blob {
576 needed_blobs,
577 blob_id,
578 state: NeedsBlobWritten,
579 }))
580 } else {
581 Ok(BlobWriteSuccess::NeedsData(self))
582 }
583 }
584}
585
586impl Blob<NeedsBlobWritten> {
587 pub async fn blob_written(self) -> Result<(), BlobWrittenError> {
589 Ok(self.needed_blobs.blob_written(&self.blob_id.into()).await??)
590 }
591}
592
593#[derive(Debug, thiserror::Error)]
595#[allow(missing_docs)]
596pub enum OpenError {
597 #[error("the package does not exist")]
598 NotFound,
599
600 #[error("Open() responded with an unexpected status")]
601 UnexpectedResponse(#[source] Status),
602
603 #[error("transport error")]
604 Fidl(#[from] fidl::Error),
605}
606#[derive(Debug, thiserror::Error)]
608#[allow(missing_docs)]
609pub enum GetError {
610 #[error("Get() responded with an unexpected status")]
611 UnexpectedResponse(#[from] Status),
612
613 #[error("transport error")]
614 Fidl(#[from] fidl::Error),
615}
616
617#[derive(Debug, thiserror::Error)]
619#[allow(missing_docs)]
620pub enum OpenBlobError {
621 #[error("there is insufficient storage space available to persist this blob")]
622 OutOfSpace,
623
624 #[error("this blob is already open for write by another cache operation")]
625 ConcurrentWrite,
626
627 #[error("an unspecified error occurred during underlying I/O")]
628 UnspecifiedIo,
629
630 #[error("an unspecified error occurred")]
631 Internal,
632
633 #[error("transport error")]
634 Fidl(#[from] fidl::Error),
635}
636
637impl From<fpkg::OpenBlobError> for OpenBlobError {
638 fn from(e: fpkg::OpenBlobError) -> Self {
639 match e {
640 fpkg::OpenBlobError::OutOfSpace => OpenBlobError::OutOfSpace,
641 fpkg::OpenBlobError::ConcurrentWrite => OpenBlobError::ConcurrentWrite,
642 fpkg::OpenBlobError::UnspecifiedIo => OpenBlobError::UnspecifiedIo,
643 fpkg::OpenBlobError::Internal => OpenBlobError::Internal,
644 }
645 }
646}
647
648#[derive(Debug, thiserror::Error)]
650#[allow(missing_docs)]
651pub enum ListMissingBlobsError {
652 #[error("while obtaining the missing blobs fidl iterator")]
653 CallGetMissingBlobs(#[source] fidl::Error),
654
655 #[error("while obtaining the next chunk of blobs from the fidl iterator")]
656 CallNextOnBlobIterator(#[source] fidl::Error),
657}
658
659#[derive(Debug, thiserror::Error)]
661#[allow(missing_docs)]
662pub enum TruncateBlobError {
663 #[error("insufficient storage space is available")]
664 NoSpace,
665
666 #[error("creating blob writer")]
667 CreateBlobWriter(#[source] blob_writer::CreateError),
668
669 #[error("transport error")]
670 Fidl(#[from] fidl::Error),
671
672 #[error("blob is in an invalid state")]
673 BadState,
674}
675
676#[derive(Debug, thiserror::Error)]
678#[allow(missing_docs)]
679pub enum WriteBlobError {
680 #[error("the written data was corrupt")]
681 Corrupt,
682
683 #[error("insufficient storage space is available")]
684 NoSpace,
685
686 #[error("transport error")]
687 Fidl(#[from] fidl::Error),
688
689 #[error("while using the fxblob writer")]
690 FxBlob(#[source] blob_writer::WriteError),
691}
692
693#[derive(Debug, thiserror::Error)]
695#[allow(missing_docs)]
696pub enum BlobWrittenError {
697 #[error("pkg-cache could not find the blob after it was successfully written")]
698 MissingAfterWritten,
699
700 #[error("NeededBlobs.BlobWritten was called before the blob was opened")]
701 UnopenedBlob,
702
703 #[error("transport error")]
704 Fidl(#[from] fidl::Error),
705}
706
707impl From<fpkg::BlobWrittenError> for BlobWrittenError {
708 fn from(e: fpkg::BlobWrittenError) -> Self {
709 match e {
710 fpkg::BlobWrittenError::NotWritten => BlobWrittenError::MissingAfterWritten,
711 fpkg::BlobWrittenError::UnopenedBlob => BlobWrittenError::UnopenedBlob,
712 }
713 }
714}
715
716#[cfg(test)]
717mod tests {
718 use super::*;
719 use assert_matches::assert_matches;
720 use fidl::endpoints::{ClientEnd, ControlHandle as _, RequestStream as _};
721 use fidl_fuchsia_io as fio;
722 use fidl_fuchsia_pkg::{
723 BlobInfoIteratorRequest, NeededBlobsRequest, NeededBlobsRequestStream,
724 PackageCacheGetResponder, PackageCacheMarker, PackageCacheRequest,
725 PackageCacheRequestStream,
726 };
727 use zx::HandleBased as _;
728
729 struct MockPackageCache {
730 stream: PackageCacheRequestStream,
731 }
732
733 impl MockPackageCache {
734 fn new() -> (Client, Self) {
735 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<PackageCacheMarker>();
736 (Client::from_proxy(proxy), Self { stream })
737 }
738
739 async fn expect_get(
740 &mut self,
741 blob_info: BlobInfo,
742 expected_gc_protection: fpkg::GcProtection,
743 ) -> PendingGet {
744 match self.stream.next().await {
745 Some(Ok(PackageCacheRequest::Get {
746 meta_far_blob,
747 gc_protection,
748 needed_blobs,
749 dir,
750 responder,
751 })) => {
752 assert_eq!(BlobInfo::from(meta_far_blob), blob_info);
753 assert_eq!(gc_protection, expected_gc_protection);
754 let needed_blobs = needed_blobs.into_stream();
755 let dir = dir.into_stream();
756
757 PendingGet { stream: needed_blobs, dir, responder }
758 }
759 r => panic!("Unexpected request: {r:?}"),
760 }
761 }
762
763 async fn expect_closed(mut self) {
764 assert_matches!(self.stream.next().await, None);
765 }
766 }
767
768 struct PendingGet {
769 stream: NeededBlobsRequestStream,
770 dir: fio::DirectoryRequestStream,
771 responder: PackageCacheGetResponder,
772 }
773
774 impl PendingGet {
775 async fn new() -> (Get, PendingGet) {
776 let (client, mut server) = MockPackageCache::new();
777
778 let get = client.get(blob_info(42), fpkg::GcProtection::OpenPackageTracking).unwrap();
779 let pending_get =
780 server.expect_get(blob_info(42), fpkg::GcProtection::OpenPackageTracking).await;
781 (get, pending_get)
782 }
783
784 fn finish_hold_stream_open(self) -> (NeededBlobsRequestStream, PackageDirProvider) {
785 self.stream.control_handle().shutdown_with_epitaph(Status::OK);
786 self.responder.send(Ok(())).unwrap();
787 (self.stream, PackageDirProvider { stream: self.dir })
788 }
789
790 fn finish(self) -> PackageDirProvider {
791 self.stream.control_handle().shutdown_with_epitaph(Status::OK);
792 self.responder.send(Ok(())).unwrap();
793 PackageDirProvider { stream: self.dir }
794 }
795
796 #[cfg(target_os = "fuchsia")]
797 fn fail_the_get(self) {
798 self.responder
799 .send(Err(Status::IO_INVALID.into_raw()))
800 .expect("client should be waiting");
801 }
802
803 async fn expect_open_meta_blob(
804 mut self,
805 res: Result<Option<ClientEnd<ffxfs::BlobWriterMarker>>, fpkg::OpenBlobError>,
806 ) -> Self {
807 match self.stream.next().await {
808 Some(Ok(NeededBlobsRequest::OpenMetaBlob { responder })) => {
809 responder.send(res).unwrap();
810 }
811 r => panic!("Unexpected request: {r:?}"),
812 }
813 self
814 }
815
816 async fn expect_open_blob(
817 mut self,
818 expected_blob_id: BlobId,
819 res: Result<Option<ClientEnd<ffxfs::BlobWriterMarker>>, fpkg::OpenBlobError>,
820 ) -> Self {
821 match self.stream.next().await {
822 Some(Ok(NeededBlobsRequest::OpenBlob {
823 blob_id,
824 allow_existing: _,
825 responder,
826 })) => {
827 assert_eq!(BlobId::from(blob_id), expected_blob_id);
828 responder.send(res).unwrap();
829 }
830 r => panic!("Unexpected request: {r:?}"),
831 }
832 self
833 }
834
835 async fn expect_get_missing_blobs(mut self, response_chunks: Vec<Vec<BlobInfo>>) -> Self {
836 match self.stream.next().await {
837 Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
838 let mut stream = iterator.into_stream();
839
840 for chunk in response_chunks {
842 let chunk = chunk.into_iter().map(fpkg::BlobInfo::from).collect::<Vec<_>>();
843
844 let BlobInfoIteratorRequest::Next { responder } =
845 stream.next().await.unwrap().unwrap();
846 responder.send(&chunk).unwrap();
847 }
848
849 let BlobInfoIteratorRequest::Next { responder } =
851 stream.next().await.unwrap().unwrap();
852 responder.send(&[]).unwrap();
853
854 assert_matches!(stream.next().await, None);
856 }
857 r => panic!("Unexpected request: {r:?}"),
858 }
859 self
860 }
861
862 async fn expect_get_missing_blobs_client_closes_channel(
863 mut self,
864 response_chunks: Vec<Vec<BlobInfo>>,
865 ) -> Self {
866 match self.stream.next().await {
867 Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
868 let mut stream = iterator.into_stream();
869
870 for chunk in response_chunks {
872 let chunk = chunk.into_iter().map(fpkg::BlobInfo::from).collect::<Vec<_>>();
873
874 let BlobInfoIteratorRequest::Next { responder } =
875 stream.next().await.unwrap().unwrap();
876 responder.send(&chunk).unwrap();
877 }
878
879 assert_matches!(stream.next().await, None);
881 }
882 r => panic!("Unexpected request: {r:?}"),
883 }
884 self
885 }
886
887 async fn expect_get_missing_blobs_inject_iterator_error(mut self) -> Self {
888 match self.stream.next().await {
889 Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
890 iterator
891 .into_stream_and_control_handle()
892 .1
893 .shutdown_with_epitaph(Status::ADDRESS_IN_USE);
894 }
895 r => panic!("Unexpected request: {r:?}"),
896 }
897 self
898 }
899
900 #[cfg(target_os = "fuchsia")]
901 async fn expect_abort(mut self) -> Self {
902 match self.stream.next().await {
903 Some(Ok(NeededBlobsRequest::Abort { responder })) => {
904 responder.send().unwrap();
905 }
906 r => panic!("Unexpected request: {r:?}"),
907 }
908 self
909 }
910 }
911
912 struct PackageDirProvider {
913 stream: fio::DirectoryRequestStream,
914 }
915
916 impl PackageDirProvider {
917 fn close_pkg_dir(self) {
918 self.stream.control_handle().shutdown_with_epitaph(Status::NOT_EMPTY);
919 }
920 }
921
922 fn blob_id(n: u8) -> BlobId {
923 BlobId::from([n; 32])
924 }
925
926 fn blob_info(n: u8) -> BlobInfo {
927 BlobInfo { blob_id: blob_id(n), length: 0 }
928 }
929
930 #[fuchsia_async::run_singlethreaded(test)]
931 async fn constructor() {
932 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<PackageCacheMarker>();
933 let client = Client::from_proxy(proxy);
934
935 drop(stream);
936 assert_matches!(client.proxy().sync().await, Err(_));
937 }
938
939 #[fuchsia_async::run_singlethreaded(test)]
940 async fn get_present_package() {
941 let (client, mut server) = MockPackageCache::new();
942
943 let ((), ()) = future::join(
944 async {
945 server
946 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
947 .await
948 .finish()
949 .close_pkg_dir();
950 server.expect_closed().await;
951 },
952 async move {
953 let mut get =
954 client.get(blob_info(2), fpkg::GcProtection::OpenPackageTracking).unwrap();
955
956 assert_matches!(get.open_meta_blob().await.unwrap(), None);
957 assert_eq!(get.get_missing_blobs().try_concat().await.unwrap(), vec![]);
958 let pkg_dir = get.finish().await.unwrap();
959
960 assert_matches!(
961 pkg_dir.into_proxy().take_event_stream().next().await,
962 Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
963 );
964 },
965 )
966 .await;
967 }
968
969 #[fuchsia_async::run_singlethreaded(test)]
970 async fn get_present_package_handles_slow_stream_close() {
971 let (client, mut server) = MockPackageCache::new();
972
973 let (send, recv) = futures::channel::oneshot::channel::<()>();
974
975 let ((), ()) = future::join(
976 async {
977 let (needed_blobs_stream, pkg_dir) = server
978 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
979 .await
980 .finish_hold_stream_open();
981 pkg_dir.close_pkg_dir();
982
983 let _ = recv.await;
985 drop(needed_blobs_stream);
986 },
987 async move {
988 let mut get =
989 client.get(blob_info(2), fpkg::GcProtection::OpenPackageTracking).unwrap();
990
991 assert_matches!(get.open_meta_blob().await.unwrap(), None);
992
993 let missing_blobs_stream = get.get_missing_blobs();
997 drop(send);
998 assert_eq!(missing_blobs_stream.try_concat().await.unwrap(), vec![]);
999 let pkg_dir = get.finish().await.unwrap();
1000
1001 assert_matches!(
1002 pkg_dir.into_proxy().take_event_stream().next().await,
1003 Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
1004 );
1005 },
1006 )
1007 .await;
1008 }
1009
1010 #[fuchsia_async::run_singlethreaded(test)]
1011 async fn needed_blobs_open_meta_far() {
1012 let (mut get, pending_get) = PendingGet::new().await;
1013
1014 let ((), ()) = future::join(
1015 async {
1016 pending_get
1017 .expect_open_meta_blob(Ok(None))
1018 .await
1019 .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1020 .await
1021 .expect_open_meta_blob(Ok(None))
1022 .await
1023 .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1024 .await
1025 .expect_open_meta_blob(Err(fpkg::OpenBlobError::OutOfSpace))
1026 .await
1027 .expect_open_meta_blob(Err(fpkg::OpenBlobError::UnspecifiedIo))
1028 .await;
1029 },
1030 async {
1031 {
1032 let opener = get.make_open_meta_blob();
1033 assert_matches!(opener.open().await.unwrap(), None);
1034 assert_matches!(opener.open().await.unwrap(), Some(_));
1035 }
1036 assert_matches!(get.open_meta_blob().await.unwrap(), None);
1037 assert_matches!(get.open_meta_blob().await.unwrap(), Some(_));
1038 assert_matches!(get.open_meta_blob().await, Err(OpenBlobError::OutOfSpace));
1039 assert_matches!(get.open_meta_blob().await, Err(OpenBlobError::UnspecifiedIo));
1040 },
1041 )
1042 .await;
1043 }
1044
1045 #[fuchsia_async::run_singlethreaded(test)]
1046 async fn needed_blobs_open_content_blob() {
1047 let (mut get, pending_get) = PendingGet::new().await;
1048
1049 let ((), ()) = future::join(
1050 async {
1051 pending_get
1052 .expect_open_blob(blob_id(2), Ok(None))
1053 .await
1054 .expect_open_blob(blob_id(2), Ok(Some(fidl::endpoints::create_endpoints().0)))
1055 .await
1056 .expect_open_blob(blob_id(10), Ok(None))
1057 .await
1058 .expect_open_blob(blob_id(11), Ok(Some(fidl::endpoints::create_endpoints().0)))
1059 .await
1060 .expect_open_blob(blob_id(12), Err(fpkg::OpenBlobError::OutOfSpace))
1061 .await
1062 .expect_open_blob(blob_id(13), Err(fpkg::OpenBlobError::UnspecifiedIo))
1063 .await;
1064 },
1065 async {
1066 {
1067 let opener = get.make_open_blob(blob_id(2));
1068 assert_matches!(opener.open().await.unwrap(), None);
1069 assert_matches!(opener.open().await.unwrap(), Some(_));
1070 }
1071 assert_matches!(get.open_blob(blob_id(10),).await.unwrap(), None);
1072 assert_matches!(get.open_blob(blob_id(11),).await.unwrap(), Some(_));
1073 assert_matches!(get.open_blob(blob_id(12),).await, Err(OpenBlobError::OutOfSpace));
1074 assert_matches!(
1075 get.open_blob(blob_id(13),).await,
1076 Err(OpenBlobError::UnspecifiedIo)
1077 );
1078 },
1079 )
1080 .await;
1081 }
1082
1083 #[fuchsia_async::run_singlethreaded(test)]
1084 async fn needed_blobs_get_missing_blobs_on_closed_ok() {
1085 let (mut get, pending_get) = PendingGet::new().await;
1086 let _ = pending_get.finish();
1087
1088 assert_matches!(get.open_meta_blob().await, Ok(None));
1089 assert_eq!(get.get_missing_blobs().try_concat().await.unwrap(), vec![]);
1090 }
1091
1092 #[fuchsia_async::run_singlethreaded(test)]
1093 async fn needed_blobs_get_missing_blobs() {
1094 let (mut get, pending_get) = PendingGet::new().await;
1095
1096 let ((), ()) = future::join(
1097 async {
1098 pending_get
1099 .expect_get_missing_blobs(vec![
1100 vec![blob_info(1), blob_info(2)],
1101 vec![blob_info(3)],
1102 ])
1103 .await;
1104 },
1105 async {
1106 assert_eq!(
1107 get.get_missing_blobs().try_concat().await.unwrap(),
1108 vec![blob_info(1), blob_info(2), blob_info(3)]
1109 );
1110 },
1111 )
1112 .await;
1113 }
1114
1115 #[fuchsia_async::run_singlethreaded(test)]
1116 async fn needed_blobs_get_missing_blobs_fail_to_obtain_iterator() {
1117 let (mut get, pending_get) = PendingGet::new().await;
1118 drop(pending_get);
1119
1120 assert_matches!(
1121 get.get_missing_blobs().try_concat().await,
1122 Err(ListMissingBlobsError::CallNextOnBlobIterator(
1123 fidl::Error::ClientChannelClosed{status, ..})
1124 )
1125 if status == Status::PEER_CLOSED
1126 );
1127 }
1128
1129 #[fuchsia_async::run_singlethreaded(test)]
1130 async fn needed_blobs_get_missing_blobs_iterator_contains_error() {
1131 let (mut get, pending_get) = PendingGet::new().await;
1132
1133 let (_, ()) =
1134 future::join(pending_get.expect_get_missing_blobs_inject_iterator_error(), async {
1135 assert_matches!(
1136 get.get_missing_blobs().try_concat().await,
1137 Err(ListMissingBlobsError::CallNextOnBlobIterator(
1138 fidl::Error::ClientChannelClosed{status, ..}
1139 ))
1140 if status == Status::ADDRESS_IN_USE
1141 );
1142 })
1143 .await;
1144 }
1145
/// `Get::abort` sends `Abort` and then keeps waiting until the `Get` call itself is
/// answered.
#[cfg(target_os = "fuchsia")]
#[test]
fn needed_blobs_abort() {
    use futures::future::Either;
    use futures::pin_mut;
    use std::task::Poll;

    let mut executor = fuchsia_async::TestExecutor::new_with_fake_time();

    // Phase 1: drive both sides until the server has acknowledged the abort request.
    let setup = async {
        let (get, pending_get) = PendingGet::new().await;

        let abort_fut = get.abort().boxed();
        let expect_abort_fut = pending_get.expect_abort();
        pin_mut!(expect_abort_fut);

        match futures::future::select(abort_fut, expect_abort_fut).await {
            Either::Left(((), _expect_abort_fut)) => {
                panic!("abort should wait for the get future to complete")
            }
            Either::Right((pending_get, abort_fut)) => (abort_fut, pending_get),
        }
    };
    pin_mut!(setup);

    let (mut abort_fut, pending_get) = match executor.run_until_stalled(&mut setup) {
        Poll::Pending => panic!("should complete"),
        Poll::Ready((abort_fut, pending_get)) => (abort_fut, pending_get),
    };

    // Phase 2: the abort stays pending until the `Get` is answered.
    assert_matches!(executor.run_until_stalled(&mut abort_fut), Poll::Pending);
    pending_get.fail_the_get();
    assert_matches!(executor.run_until_stalled(&mut abort_fut), Poll::Ready(()));
}
1181
1182 struct MockNeededBlob {
1183 writer: ffxfs::BlobWriterRequestStream,
1184 needed_blobs: fpkg::NeededBlobsRequestStream,
1185 vmo: Option<zx::Vmo>,
1186 }
1187
1188 impl MockNeededBlob {
1189 fn mock_hash() -> BlobId {
1190 [7; 32].into()
1191 }
1192
1193 fn new() -> (NeededBlob, Self) {
1194 let (writer_client, writer) =
1195 fidl::endpoints::create_request_stream::<ffxfs::BlobWriterMarker>();
1196 let (needed_blobs_proxy, needed_blobs) =
1197 fidl::endpoints::create_proxy_and_stream::<fpkg::NeededBlobsMarker>();
1198 (
1199 NeededBlob {
1200 blob: Blob {
1201 needed_blobs: needed_blobs_proxy,
1202 blob_id: Self::mock_hash(),
1203 state: NeedsTruncate(writer_client),
1204 },
1205 },
1206 Self { writer, needed_blobs, vmo: None },
1207 )
1208 }
1209
1210 async fn fail_get_vmo(mut self) -> Self {
1211 match self.writer.next().await {
1212 Some(Ok(ffxfs::BlobWriterRequest::GetVmo { size: _, responder })) => {
1213 responder.send(Err(Status::NO_SPACE.into_raw())).unwrap();
1214 }
1215 r => panic!("Unexpected request: {r:?}"),
1216 }
1217 self
1218 }
1219
1220 async fn expect_get_vmo(mut self, expected_size: u64) -> Self {
1221 match self.writer.next().await {
1222 Some(Ok(ffxfs::BlobWriterRequest::GetVmo { size, responder })) => {
1223 assert_eq!(size, expected_size);
1224 let vmo = zx::Vmo::create(size).unwrap();
1225 assert!(self.vmo.is_none());
1226 self.vmo = Some(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap());
1227 responder.send(Ok(vmo)).unwrap();
1228 }
1229 r => panic!("Unexpected request: {r:?}"),
1230 }
1231 self
1232 }
1233
1234 async fn fail_bytes_ready(mut self) -> Self {
1235 match self.writer.next().await {
1236 Some(Ok(ffxfs::BlobWriterRequest::BytesReady { bytes_written: _, responder })) => {
1237 responder.send(Err(Status::NO_SPACE.into_raw())).unwrap();
1238 }
1239 r => panic!("Unexpected request: {r:?}"),
1240 }
1241 self
1242 }
1243
1244 async fn expect_bytes_ready(mut self, expected_payload: &[u8], offset: u64) -> Self {
1245 match self.writer.next().await {
1246 Some(Ok(ffxfs::BlobWriterRequest::BytesReady { bytes_written, responder })) => {
1247 assert_eq!(bytes_written, u64::try_from(expected_payload.len()).unwrap());
1248
1249 let vmo = self.vmo.as_ref().unwrap();
1250 let mut buf = vec![0; expected_payload.len()];
1251 let () = vmo.read(&mut buf, offset).unwrap();
1252 assert_eq!(buf, expected_payload);
1253
1254 responder.send(Ok(())).unwrap();
1255 }
1256 r => panic!("Unexpected request: {r:?}"),
1257 }
1258 self
1259 }
1260
1261 async fn expect_blob_written(mut self) -> Self {
1262 match self.needed_blobs.next().await {
1263 Some(Ok(fpkg::NeededBlobsRequest::BlobWritten { blob_id, responder })) => {
1264 assert_eq!(blob_id, Self::mock_hash().into());
1265 responder.send(Ok(())).unwrap();
1266 }
1267 r => panic!("Unexpected request: {r:?}"),
1268 }
1269 self
1270 }
1271 }
1272
1273 #[fuchsia_async::run_singlethreaded(test)]
1274 async fn empty_blob_write() {
1275 let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1276
1277 let ((), ()) = future::join(
1278 async {
1279 blob_server.expect_get_vmo(0).await.expect_blob_written().await;
1280 },
1281 async {
1282 let blob = match blob.truncate(0).await.unwrap() {
1283 TruncateBlobSuccess::AllWritten(blob) => blob,
1284 other => panic!("empty blob shouldn't need bytes {other:?}"),
1285 };
1286 let () = blob.blob_written().await.unwrap();
1287 },
1288 )
1289 .await;
1290 }
1291
1292 impl TruncateBlobSuccess {
1293 fn unwrap_needs_data(self) -> Blob<NeedsData> {
1294 match self {
1295 TruncateBlobSuccess::NeedsData(blob) => blob,
1296 TruncateBlobSuccess::AllWritten(_) => panic!("blob should need data"),
1297 }
1298 }
1299 }
1300
1301 impl BlobWriteSuccess {
1302 fn unwrap_needs_data(self) -> Blob<NeedsData> {
1303 match self {
1304 BlobWriteSuccess::NeedsData(blob) => blob,
1305 BlobWriteSuccess::AllWritten(_) => panic!("blob should need data"),
1306 }
1307 }
1308
1309 fn unwrap_all_written(self) -> Blob<NeedsBlobWritten> {
1310 match self {
1311 BlobWriteSuccess::NeedsData(_) => panic!("blob should be completely written"),
1312 BlobWriteSuccess::AllWritten(blob) => blob,
1313 }
1314 }
1315 }
1316
1317 #[fuchsia_async::run_singlethreaded(test)]
1318 async fn small_blob_write() {
1319 let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1320
1321 let ((), ()) = future::join(
1322 async {
1323 blob_server
1324 .expect_get_vmo(4)
1325 .await
1326 .expect_bytes_ready(b"test", 0)
1327 .await
1328 .expect_blob_written()
1329 .await;
1330 },
1331 async {
1332 let blob = blob.truncate(4).await.unwrap().unwrap_needs_data();
1333 let blob = blob.write(b"test").await.unwrap().unwrap_all_written();
1334 let () = blob.blob_written().await.unwrap();
1335 },
1336 )
1337 .await;
1338 }
1339
1340 #[fuchsia_async::run_singlethreaded(test)]
1341 async fn blob_truncate_no_space() {
1342 let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1343
1344 let ((), ()) = future::join(
1345 async {
1346 blob_server.fail_get_vmo().await;
1347 },
1348 async {
1349 assert_matches!(blob.truncate(4).await, Err(TruncateBlobError::NoSpace));
1350 },
1351 )
1352 .await;
1353 }
1354
1355 #[fuchsia_async::run_singlethreaded(test)]
1356 async fn blob_write_no_space() {
1357 let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1358
1359 let ((), ()) = future::join(
1360 async {
1361 blob_server.expect_get_vmo(4).await.fail_bytes_ready().await;
1362 },
1363 async {
1364 let blob = blob.truncate(4).await.unwrap().unwrap_needs_data();
1365 assert_matches!(blob.write(b"test").await, Err(WriteBlobError::NoSpace));
1366 },
1367 )
1368 .await;
1369 }
1370
1371 #[fuchsia_async::run_singlethreaded(test)]
1372 async fn blob_write_multiple_write() {
1373 let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1374
1375 let ((), ()) = future::join(
1376 async {
1377 blob_server
1378 .expect_get_vmo(6)
1379 .await
1380 .expect_bytes_ready(b"abc", 0)
1381 .await
1382 .expect_bytes_ready(b"123", 3)
1383 .await
1384 .expect_blob_written()
1385 .await;
1386 },
1387 async {
1388 let blob = blob.truncate(6).await.unwrap().unwrap_needs_data();
1389 let blob = blob.write(b"abc").await.unwrap().unwrap_needs_data();
1390 let blob = blob.write(b"123").await.unwrap().unwrap_all_written();
1391 let () = blob.blob_written().await.unwrap();
1392 },
1393 )
1394 .await;
1395 }
1396
1397 #[fuchsia_async::run_singlethreaded(test)]
1398 async fn get_already_cached_success() {
1399 let (client, mut server) = MockPackageCache::new();
1400
1401 let ((), ()) = future::join(
1402 async {
1403 server
1404 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1405 .await
1406 .finish()
1407 .close_pkg_dir();
1408 server.expect_closed().await;
1409 },
1410 async move {
1411 let pkg_dir = client.get_already_cached(blob_id(2)).await.unwrap();
1412
1413 assert_matches!(
1414 pkg_dir.into_proxy().take_event_stream().next().await,
1415 Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
1416 );
1417 },
1418 )
1419 .await;
1420 }
1421
1422 #[fuchsia_async::run_singlethreaded(test)]
1423 async fn get_already_cached_missing_meta_far() {
1424 let (client, mut server) = MockPackageCache::new();
1425
1426 let ((), ()) = future::join(
1427 async {
1428 server
1429 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1430 .await
1431 .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1432 .await;
1433 },
1434 async move {
1435 assert_matches!(
1436 client.get_already_cached(blob_id(2)).await,
1437 Err(GetAlreadyCachedError::MissingMetaFar)
1438 );
1439 },
1440 )
1441 .await;
1442 }
1443
1444 #[fuchsia_async::run_singlethreaded(test)]
1445 async fn get_already_cached_missing_content_blob() {
1446 let (client, mut server) = MockPackageCache::new();
1447
1448 let ((), ()) = future::join(
1449 async {
1450 server
1451 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1452 .await
1453 .expect_open_meta_blob(Ok(None))
1454 .await
1455 .expect_get_missing_blobs_client_closes_channel(vec![vec![BlobInfo {
1456 blob_id: [0; 32].into(),
1457 length: 0,
1458 }]])
1459 .await;
1460 },
1461 async move {
1462 assert_matches!(
1463 client.get_already_cached(blob_id(2)).await,
1464 Err(GetAlreadyCachedError::MissingContentBlobs(v))
1465 if v == vec![BlobInfo {
1466 blob_id: [0; 32].into(),
1467 length: 0,
1468 }]
1469 );
1470 },
1471 )
1472 .await;
1473 }
1474}