1#![cfg(target_os = "fuchsia")]
6#![deny(missing_docs)]
7
8use crate::types::{BlobId, BlobInfo};
11use fidl_fuchsia_fxfs as ffxfs;
12use fidl_fuchsia_pkg as fpkg;
13use fuchsia_pkg::PackageDirectory;
14use futures::prelude::*;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use zx_status::Status;
18
/// A client for the `fuchsia.pkg.PackageCache` protocol, wrapping a `PackageCacheProxy`.
#[derive(Debug, Clone)]
pub struct Client {
    /// The connection over which every request made by this client is sent.
    proxy: fpkg::PackageCacheProxy,
}
24
25impl Client {
26 pub fn from_proxy(proxy: fpkg::PackageCacheProxy) -> Self {
28 Self { proxy }
29 }
30
31 pub fn proxy(&self) -> &fpkg::PackageCacheProxy {
33 &self.proxy
34 }
35
36 pub fn get(
39 &self,
40 meta_far_blob: BlobInfo,
41 gc_protection: fpkg::GcProtection,
42 ) -> Result<Get, fidl::Error> {
43 let (needed_blobs, needed_blobs_server_end) =
44 fidl::endpoints::create_proxy::<fpkg::NeededBlobsMarker>();
45 let (pkg_dir, pkg_dir_server_end) = PackageDirectory::create_request()?;
46
47 let get_fut = self.proxy.get(
48 &meta_far_blob.into(),
49 gc_protection,
50 needed_blobs_server_end,
51 pkg_dir_server_end,
52 );
53
54 Ok(Get {
55 get_fut,
56 pkg_dir,
57 needed_blobs,
58 pkg_present: SharedBoolEvent::new(),
59 meta_far: meta_far_blob.blob_id,
60 })
61 }
62
63 pub async fn get_already_cached(
73 &self,
74 meta_far_blob: BlobId,
75 ) -> Result<PackageDirectory, GetAlreadyCachedError> {
76 let mut get = self
77 .get(
78 BlobInfo { blob_id: meta_far_blob, length: 0 },
79 fpkg::GcProtection::OpenPackageTracking,
80 )
81 .map_err(GetAlreadyCachedError::Get)?;
82 if let Some(_) = get.open_meta_blob().await.map_err(GetAlreadyCachedError::OpenMetaBlob)? {
83 return Err(GetAlreadyCachedError::MissingMetaFar);
84 }
85
86 if let Some(missing_blobs) = get
87 .get_missing_blobs()
88 .try_next()
89 .await
90 .map_err(GetAlreadyCachedError::GetMissingBlobs)?
91 {
92 return Err(GetAlreadyCachedError::MissingContentBlobs(missing_blobs));
93 }
94
95 get.finish().await.map_err(GetAlreadyCachedError::FinishGet)
96 }
97
98 pub async fn get_subpackage(
101 &self,
102 superpackage: BlobId,
103 subpackage: &fuchsia_url::RelativePackageUrl,
104 ) -> Result<PackageDirectory, GetSubpackageError> {
105 let (dir, dir_server_end) =
106 PackageDirectory::create_request().map_err(GetSubpackageError::CreatingHandles)?;
107 let () = self
108 .proxy
109 .get_subpackage(
110 &superpackage.into(),
111 &fpkg::PackageUrl { url: subpackage.into() },
112 dir_server_end,
113 )
114 .await
115 .map_err(GetSubpackageError::CallingGetSubpackage)??;
116 Ok(dir)
117 }
118
119 pub fn write_blobs(&self) -> Result<WriteBlobs, fidl::Error> {
121 let (needed_blobs, needed_blobs_server_end) =
122 fidl::endpoints::create_proxy::<fpkg::NeededBlobsMarker>();
123
124 let () = self.proxy.write_blobs(needed_blobs_server_end)?;
125
126 Ok(WriteBlobs { needed_blobs })
127 }
128}
129
/// Error returned by [`Client::get_already_cached`].
///
/// Each variant corresponds to one step of that function; the `Missing*` variants are the
/// ones for which [`GetAlreadyCachedError::was_not_cached`] returns `true`.
#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
pub enum GetAlreadyCachedError {
    // Creating the `Get` request failed.
    #[error("calling get")]
    Get(#[source] fidl::Error),

    // Opening the meta.far failed.
    #[error("opening meta blob")]
    OpenMetaBlob(#[source] OpenBlobError),

    // The cache returned a writer for the meta.far instead of reporting it present.
    #[error("meta.far blob not cached")]
    MissingMetaFar,

    // Reading from the missing-blobs stream failed.
    #[error("getting missing blobs")]
    GetMissingBlobs(#[source] ListMissingBlobsError),

    // The missing-blobs stream produced a chunk; the chunk is carried here.
    #[error("content blobs not cached {0:?}")]
    MissingContentBlobs(Vec<BlobInfo>),

    // Waiting for the `Get` response failed.
    #[error("finishing get")]
    FinishGet(#[source] GetError),
}
151
152impl GetAlreadyCachedError {
153 pub fn was_not_cached(&self) -> bool {
155 use GetAlreadyCachedError::*;
156 match self {
157 Get(..) | OpenMetaBlob(..) | GetMissingBlobs(..) | FinishGet(..) => false,
158 MissingMetaFar | MissingContentBlobs(..) => true,
159 }
160 }
161}
162
163#[derive(thiserror::Error, Debug)]
164#[allow(missing_docs)]
165pub enum GetSubpackageError {
166 #[error("creating handles")]
167 CreatingHandles(#[source] fidl::Error),
168
169 #[error("calling GetCached FIDL")]
170 CallingGetSubpackage(#[source] fidl::Error),
171
172 #[error("the superpackage does not have an open package connection")]
173 SuperpackageClosed,
174
175 #[error("the subpackage does not exist")]
176 DoesNotExist,
177
178 #[error("internal")]
179 Internal,
180}
181
182impl From<fpkg::GetSubpackageError> for GetSubpackageError {
183 fn from(fidl: fpkg::GetSubpackageError) -> Self {
184 use GetSubpackageError::*;
185 use fpkg::GetSubpackageError as fErr;
186 match fidl {
187 fErr::SuperpackageClosed => SuperpackageClosed,
188 fErr::DoesNotExist => DoesNotExist,
189 fErr::Internal => Internal,
190 }
191 }
192}
193
/// A boolean flag shared by all of its clones.
///
/// The flag starts out unset and can only be set; there is no way to clear it.
#[derive(Debug, Clone)]
struct SharedBoolEvent(Arc<AtomicBool>);

impl SharedBoolEvent {
    /// Creates a new, unset event.
    fn new() -> Self {
        let flag = AtomicBool::new(false);
        Self(Arc::new(flag))
    }

    /// Returns whether the event has been set through this handle or any of its clones.
    fn get(&self) -> bool {
        let Self(flag) = self;
        flag.load(Ordering::SeqCst)
    }

    /// Sets the event. Setting an event that is already set has no further effect.
    fn set(&self) {
        let Self(flag) = self;
        flag.store(true, Ordering::SeqCst);
    }
}
210
211async fn open_blob(
212 needed_blobs: &fpkg::NeededBlobsProxy,
213 kind: OpenKind,
214 blob_id: BlobId,
215 pkg_present: Option<&SharedBoolEvent>,
216 allow_existing: bool,
217) -> Result<Option<NeededBlob>, OpenBlobError> {
218 let open_fut = match kind {
219 OpenKind::Meta => needed_blobs.open_meta_blob(),
220 OpenKind::Content => needed_blobs.open_blob(&blob_id.into(), allow_existing),
221 };
222 match open_fut.await {
223 Err(fidl::Error::ClientChannelClosed { status: Status::OK, .. }) => {
224 if let Some(pkg_present) = pkg_present {
225 pkg_present.set();
226 }
227 Ok(None)
228 }
229 res => {
230 if let Some(blob) = res?? {
231 Ok(Some(NeededBlob {
232 blob: Blob {
233 needed_blobs: needed_blobs.clone(),
234 blob_id,
235 state: NeedsTruncate(blob),
236 },
237 }))
238 } else {
239 Ok(None)
240 }
241 }
242 }
243}
244
/// Selects which `NeededBlobs` open method `open_blob` calls.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OpenKind {
    /// Open the package's meta.far via `NeededBlobs.OpenMetaBlob`.
    Meta,
    /// Open a blob by id via `NeededBlobs.OpenBlob`.
    Content,
}
250
/// Everything needed to open a blob for write at a later time.
///
/// Created by [`Get::make_open_meta_blob`], [`Get::make_open_blob`] and
/// [`WriteBlobs::make_open_blob`]; the request is only sent when [`DeferredOpenBlob::open`] is
/// called.
#[derive(Debug)]
pub struct DeferredOpenBlob {
    /// Clone of the `NeededBlobs` proxy the open request will be sent over.
    needed_blobs: fpkg::NeededBlobsProxy,
    /// Which open method to call.
    kind: OpenKind,
    /// Id of the blob to open.
    blob_id: BlobId,
    /// Forwarded to `NeededBlobs.OpenBlob` for content blobs.
    allow_existing: bool,
    /// Event to set if the open observes the channel closed with an `OK` epitaph, if any.
    pkg_present: Option<SharedBoolEvent>,
}
260
261impl DeferredOpenBlob {
262 pub async fn open(&self) -> Result<Option<NeededBlob>, OpenBlobError> {
265 open_blob(
266 &self.needed_blobs,
267 self.kind,
268 self.blob_id,
269 self.pkg_present.as_ref(),
270 self.allow_existing,
271 )
272 .await
273 }
274
275 fn proxy_cmp_key(&self) -> u32 {
276 use fidl::AsHandleRef;
277 use fidl::endpoints::Proxy;
278 self.needed_blobs.as_channel().as_handle_ref().raw_handle()
279 }
280}
281
282impl std::cmp::PartialEq for DeferredOpenBlob {
283 fn eq(&self, other: &Self) -> bool {
284 self.proxy_cmp_key() == other.proxy_cmp_key() && self.kind == other.kind
285 }
286}
287
288impl std::cmp::Eq for DeferredOpenBlob {}
289
/// An in-progress `PackageCache.Get` operation, created by [`Client::get`].
///
/// Consumed by [`Get::finish`] to obtain the package directory, or by [`Get::abort`].
#[derive(Debug)]
pub struct Get {
    /// Pending response of the `PackageCache.Get` call.
    get_fut: fidl::client::QueryResponseFut<Result<(), i32>>,
    /// Client end of the `NeededBlobs` channel handed to the `Get` call.
    needed_blobs: fpkg::NeededBlobsProxy,
    /// Client end of the package directory handed to the `Get` call; returned by `finish`.
    pkg_dir: PackageDirectory,
    /// Set when opening the meta.far observes the `NeededBlobs` channel closed with `OK`.
    pkg_present: SharedBoolEvent,
    /// Blob id of the package's meta.far.
    meta_far: BlobId,
}
303
304impl Get {
305 pub fn make_open_meta_blob(&mut self) -> DeferredOpenBlob {
308 DeferredOpenBlob {
309 needed_blobs: self.needed_blobs.clone(),
310 kind: OpenKind::Meta,
311 blob_id: self.meta_far,
312 allow_existing: false,
313 pkg_present: Some(self.pkg_present.clone()),
314 }
315 }
316
317 pub async fn open_meta_blob(&mut self) -> Result<Option<NeededBlob>, OpenBlobError> {
320 open_blob(&self.needed_blobs, OpenKind::Meta, self.meta_far, Some(&self.pkg_present), false)
321 .await
322 }
323
324 fn start_get_missing_blobs(
325 &mut self,
326 ) -> Result<Option<fpkg::BlobInfoIteratorProxy>, fidl::Error> {
327 if self.pkg_present.get() {
328 return Ok(None);
329 }
330
331 let (blob_iterator, blob_iterator_server_end) =
332 fidl::endpoints::create_proxy::<fpkg::BlobInfoIteratorMarker>();
333
334 self.needed_blobs.get_missing_blobs(blob_iterator_server_end)?;
335 Ok(Some(blob_iterator))
336 }
337
338 pub fn get_missing_blobs(
344 &mut self,
345 ) -> impl Stream<Item = Result<Vec<BlobInfo>, ListMissingBlobsError>> + Unpin + use<> {
346 match self.start_get_missing_blobs() {
347 Ok(option_iter) => match option_iter {
348 Some(iterator) => crate::fidl_iterator_to_stream(iterator)
349 .map_ok(|v| v.into_iter().map(BlobInfo::from).collect())
350 .map_err(ListMissingBlobsError::CallNextOnBlobIterator)
351 .left_stream(),
352 None => futures::stream::empty().right_stream(),
353 }
354 .left_stream(),
355 Err(e) => {
356 futures::stream::iter(Some(Err(ListMissingBlobsError::CallGetMissingBlobs(e))))
357 .right_stream()
358 }
359 }
360 }
361
362 pub fn make_open_blob(&mut self, content_blob: BlobId) -> DeferredOpenBlob {
365 DeferredOpenBlob {
366 needed_blobs: self.needed_blobs.clone(),
367 kind: OpenKind::Content,
368 blob_id: content_blob,
369 allow_existing: false,
370 pkg_present: None,
371 }
372 }
373
374 pub async fn open_blob(
377 &mut self,
378 content_blob: BlobId,
379 ) -> Result<Option<NeededBlob>, OpenBlobError> {
380 open_blob(&self.needed_blobs, OpenKind::Content, content_blob, None, false).await
381 }
382
383 pub async fn finish(self) -> Result<PackageDirectory, GetError> {
386 drop(self.needed_blobs);
387 let () = self.get_fut.await?.map_err(Status::from_raw)?;
388 Ok(self.pkg_dir)
389 }
390
391 pub async fn abort(self) {
393 self.needed_blobs.abort().map(|_: Result<(), fidl::Error>| ()).await;
394 let _ = self.get_fut.await;
399 }
400}
401
/// A `NeededBlobs` connection obtained through `PackageCache.WriteBlobs`, created by
/// [`Client::write_blobs`] and used to open individual blobs for write.
#[derive(Clone, Debug)]
pub struct WriteBlobs {
    /// Client end of the `NeededBlobs` channel handed to the `WriteBlobs` call.
    needed_blobs: fpkg::NeededBlobsProxy,
}
407
408impl WriteBlobs {
409 pub fn make_open_blob(&mut self, blob: BlobId, allow_existing: bool) -> DeferredOpenBlob {
412 DeferredOpenBlob {
413 needed_blobs: self.needed_blobs.clone(),
414 kind: OpenKind::Content,
415 blob_id: blob,
416 allow_existing,
417 pkg_present: None,
418 }
419 }
420
421 pub async fn open_blob(
423 &mut self,
424 blob: BlobId,
425 allow_existing: bool,
426 ) -> Result<Option<NeededBlob>, OpenBlobError> {
427 open_blob(&self.needed_blobs, OpenKind::Content, blob, None, allow_existing).await
428 }
429}
430
/// A blob for which the package cache returned a writer in response to an open request.
#[derive(Debug)]
pub struct NeededBlob {
    /// Handle for writing the blob; it must be truncated before data can be written.
    pub blob: Blob<NeedsTruncate>,
}
438
/// The successful outcomes of [`Blob::truncate`].
#[derive(Debug)]
pub enum TruncateBlobSuccess {
    /// The blob has a non-zero size and needs its data written.
    NeedsData(Blob<NeedsData>),

    /// The blob has size zero, so there is no data to write; `blob_written` is the next step.
    AllWritten(Blob<NeedsBlobWritten>),
}
449
/// The successful outcomes of [`Blob::write`] and [`Blob::write_with_trace_callbacks`].
#[derive(Debug)]
pub enum BlobWriteSuccess {
    /// Fewer bytes than the blob's size have been written so far; more data is needed.
    NeedsData(Blob<NeedsData>),

    /// The number of bytes written equals the blob's size; `blob_written` is the next step.
    AllWritten(Blob<NeedsBlobWritten>),
}
460
/// State of a [`Blob`] that has been opened but not yet truncated. Holds the client end of the
/// `fuchsia.fxfs.BlobWriter` channel returned by the package cache.
#[derive(Debug)]
pub struct NeedsTruncate(fidl::endpoints::ClientEnd<ffxfs::BlobWriterMarker>);
464
/// State of a [`Blob`] that has been truncated to a non-zero size and still needs data.
#[derive(Debug)]
pub struct NeedsData {
    /// Total size of the blob, as passed to `truncate`.
    size: u64,
    /// Number of bytes successfully written so far.
    written: u64,
    /// Writer that the data is written through.
    writer: blob_writer::BlobWriter,
}
472
/// State of a [`Blob`] that needs no further data (it is empty or all of its bytes have been
/// written) and for which `blob_written` has yet to be called.
#[derive(Debug)]
pub struct NeedsBlobWritten;
477
/// A blob in the process of being written to the package cache.
///
/// The type parameter `S` is one of [`NeedsTruncate`], [`NeedsData`] or [`NeedsBlobWritten`]
/// and determines which operations are available; each operation consumes the blob and
/// returns it in its next state.
#[derive(Debug)]
#[must_use]
pub struct Blob<S> {
    /// `NeedsBlobs` proxy used to report the blob as written.
    needed_blobs: fpkg::NeededBlobsProxy,
    /// Id of the blob being written.
    blob_id: BlobId,
    /// State-specific data.
    state: S,
}
486
487impl Blob<NeedsTruncate> {
488 pub async fn truncate(self, size: u64) -> Result<TruncateBlobSuccess, TruncateBlobError> {
490 let Self { needed_blobs, blob_id, state: NeedsTruncate(blob) } = self;
491
492 let writer = blob_writer::BlobWriter::create(blob.into_proxy(), size).await.map_err(
493 |e| match e {
494 blob_writer::CreateError::GetVmo(s) if s == Status::NO_SPACE => {
495 TruncateBlobError::NoSpace
496 }
497 _ => TruncateBlobError::CreateBlobWriter(e),
498 },
499 )?;
500
501 Ok(if size == 0 {
502 TruncateBlobSuccess::AllWritten(Blob { needed_blobs, blob_id, state: NeedsBlobWritten })
503 } else {
504 TruncateBlobSuccess::NeedsData(Blob {
505 needed_blobs,
506 blob_id,
507 state: NeedsData { size, written: 0, writer },
508 })
509 })
510 }
511
512 pub fn deconstruct(
520 self,
521 ) -> (fpkg::NeededBlobsProxy, BlobId, fidl::endpoints::ClientEnd<ffxfs::BlobWriterMarker>) {
522 let Blob { needed_blobs, blob_id, state: NeedsTruncate(client) } = self;
523 (needed_blobs, blob_id, client)
524 }
525}
526
527impl Blob<NeedsData> {
528 pub fn write(
534 self,
535 buf: &[u8],
536 ) -> impl Future<Output = Result<BlobWriteSuccess, WriteBlobError>> + '_ {
537 self.write_with_trace_callbacks(buf, &|_| {}, &|| {})
538 }
539
540 pub async fn write_with_trace_callbacks(
552 mut self,
553 buf: &[u8],
554 after_write: &(dyn Fn(u64) + Send + Sync),
555 after_write_ack: &(dyn Fn() + Send + Sync),
556 ) -> Result<BlobWriteSuccess, WriteBlobError> {
557 assert!(self.state.written + buf.len() as u64 <= self.state.size);
558
559 let fut = self.state.writer.write(buf);
560 let () = after_write(buf.len() as u64);
561 let res = fut.await;
562 let () = after_write_ack();
563 let () = res.map_err(|e| match e {
564 e @ blob_writer::WriteError::BytesReady(s) => match s {
565 Status::IO_DATA_INTEGRITY => WriteBlobError::Corrupt,
566 Status::NO_SPACE => WriteBlobError::NoSpace,
567 _ => WriteBlobError::FxBlob(e),
568 },
569 e => WriteBlobError::FxBlob(e),
570 })?;
571
572 self.state.written += buf.len() as u64;
573
574 if self.state.written == self.state.size {
575 let Self { needed_blobs, blob_id, state: _ } = self;
576 Ok(BlobWriteSuccess::AllWritten(Blob {
577 needed_blobs,
578 blob_id,
579 state: NeedsBlobWritten,
580 }))
581 } else {
582 Ok(BlobWriteSuccess::NeedsData(self))
583 }
584 }
585}
586
587impl Blob<NeedsBlobWritten> {
588 pub async fn blob_written(self) -> Result<(), BlobWrittenError> {
590 Ok(self.needed_blobs.blob_written(&self.blob_id.into()).await??)
591 }
592}
593
/// Error type describing the failure of an `Open()` request for a package.
///
/// NOTE(review): nothing in the visible portion of this file constructs this type; confirm
/// its users before changing it.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum OpenError {
    #[error("the package does not exist")]
    NotFound,

    #[error("Open() responded with an unexpected status")]
    UnexpectedResponse(#[source] Status),

    #[error("transport error")]
    Fidl(#[from] fidl::Error),
}
/// Error returned by [`Get::finish`].
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum GetError {
    // The server answered the `Get` call with an error status.
    #[error("Get() responded with an unexpected status")]
    UnexpectedResponse(#[from] Status),

    // Waiting for the `Get` response failed.
    #[error("transport error")]
    Fidl(#[from] fidl::Error),
}
617
/// Error returned when opening a blob for write fails.
///
/// The first four variants mirror the variants of `fpkg::OpenBlobError`; `Fidl` wraps a
/// failure of the FIDL call itself.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum OpenBlobError {
    #[error("there is insufficient storage space available to persist this blob")]
    OutOfSpace,

    #[error("this blob is already open for write by another cache operation")]
    ConcurrentWrite,

    #[error("an unspecified error occurred during underlying I/O")]
    UnspecifiedIo,

    #[error("an unspecified error occurred")]
    Internal,

    #[error("transport error")]
    Fidl(#[from] fidl::Error),
}
637
638impl From<fpkg::OpenBlobError> for OpenBlobError {
639 fn from(e: fpkg::OpenBlobError) -> Self {
640 match e {
641 fpkg::OpenBlobError::OutOfSpace => OpenBlobError::OutOfSpace,
642 fpkg::OpenBlobError::ConcurrentWrite => OpenBlobError::ConcurrentWrite,
643 fpkg::OpenBlobError::UnspecifiedIo => OpenBlobError::UnspecifiedIo,
644 fpkg::OpenBlobError::Internal => OpenBlobError::Internal,
645 }
646 }
647}
648
/// Error yielded by the stream returned from [`Get::get_missing_blobs`].
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum ListMissingBlobsError {
    // Sending the `NeededBlobs.GetMissingBlobs` request failed.
    #[error("while obtaining the missing blobs fidl iterator")]
    CallGetMissingBlobs(#[source] fidl::Error),

    // Reading a chunk from the blob info iterator failed.
    #[error("while obtaining the next chunk of blobs from the fidl iterator")]
    CallNextOnBlobIterator(#[source] fidl::Error),
}
659
/// Error returned by [`Blob::truncate`].
///
/// NOTE(review): the visible `truncate` implementation only produces `NoSpace` and
/// `CreateBlobWriter`; confirm whether `Fidl` and `BadState` are still produced elsewhere.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum TruncateBlobError {
    #[error("insufficient storage space is available")]
    NoSpace,

    #[error("creating blob writer")]
    CreateBlobWriter(#[source] blob_writer::CreateError),

    #[error("transport error")]
    Fidl(#[from] fidl::Error),

    #[error("blob is in an invalid state")]
    BadState,
}
676
/// Error returned by [`Blob::write`] and [`Blob::write_with_trace_callbacks`].
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum WriteBlobError {
    // The writer reported `IO_DATA_INTEGRITY` from `BytesReady`.
    #[error("the written data was corrupt")]
    Corrupt,

    // The writer reported `NO_SPACE` from `BytesReady`.
    #[error("insufficient storage space is available")]
    NoSpace,

    #[error("transport error")]
    Fidl(#[from] fidl::Error),

    // Any other error from the blob writer.
    #[error("while using the fxblob writer")]
    FxBlob(#[source] blob_writer::WriteError),
}
693
/// Error returned by [`Blob::blob_written`].
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum BlobWrittenError {
    // Converted from `fpkg::BlobWrittenError::NotWritten`.
    #[error("pkg-cache could not find the blob after it was successfully written")]
    MissingAfterWritten,

    // Converted from `fpkg::BlobWrittenError::UnopenedBlob`.
    #[error("NeededBlobs.BlobWritten was called before the blob was opened")]
    UnopenedBlob,

    // The FIDL call itself failed.
    #[error("transport error")]
    Fidl(#[from] fidl::Error),
}
707
708impl From<fpkg::BlobWrittenError> for BlobWrittenError {
709 fn from(e: fpkg::BlobWrittenError) -> Self {
710 match e {
711 fpkg::BlobWrittenError::NotWritten => BlobWrittenError::MissingAfterWritten,
712 fpkg::BlobWrittenError::UnopenedBlob => BlobWrittenError::UnopenedBlob,
713 }
714 }
715}
716
717#[cfg(test)]
718mod tests {
719 use super::*;
720 use assert_matches::assert_matches;
721 use fidl::endpoints::{ClientEnd, ControlHandle as _, RequestStream as _};
722 use fidl_fuchsia_io as fio;
723 use fidl_fuchsia_pkg::{
724 BlobInfoIteratorRequest, NeededBlobsRequest, NeededBlobsRequestStream,
725 PackageCacheGetResponder, PackageCacheMarker, PackageCacheRequest,
726 PackageCacheRequestStream,
727 };
728
729 struct MockPackageCache {
730 stream: PackageCacheRequestStream,
731 }
732
733 impl MockPackageCache {
734 fn new() -> (Client, Self) {
735 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<PackageCacheMarker>();
736 (Client::from_proxy(proxy), Self { stream })
737 }
738
739 async fn expect_get(
740 &mut self,
741 blob_info: BlobInfo,
742 expected_gc_protection: fpkg::GcProtection,
743 ) -> PendingGet {
744 match self.stream.next().await {
745 Some(Ok(PackageCacheRequest::Get {
746 meta_far_blob,
747 gc_protection,
748 needed_blobs,
749 dir,
750 responder,
751 })) => {
752 assert_eq!(BlobInfo::from(meta_far_blob), blob_info);
753 assert_eq!(gc_protection, expected_gc_protection);
754 let needed_blobs = needed_blobs.into_stream();
755 let dir = dir.into_stream();
756
757 PendingGet { stream: needed_blobs, dir, responder }
758 }
759 r => panic!("Unexpected request: {r:?}"),
760 }
761 }
762
763 async fn expect_closed(mut self) {
764 assert_matches!(self.stream.next().await, None);
765 }
766 }
767
768 struct PendingGet {
769 stream: NeededBlobsRequestStream,
770 dir: fio::DirectoryRequestStream,
771 responder: PackageCacheGetResponder,
772 }
773
774 impl PendingGet {
775 async fn new() -> (Get, PendingGet) {
776 let (client, mut server) = MockPackageCache::new();
777
778 let get = client.get(blob_info(42), fpkg::GcProtection::OpenPackageTracking).unwrap();
779 let pending_get =
780 server.expect_get(blob_info(42), fpkg::GcProtection::OpenPackageTracking).await;
781 (get, pending_get)
782 }
783
784 fn finish_hold_stream_open(self) -> (NeededBlobsRequestStream, PackageDirProvider) {
785 self.stream.control_handle().shutdown_with_epitaph(Status::OK);
786 self.responder.send(Ok(())).unwrap();
787 (self.stream, PackageDirProvider { stream: self.dir })
788 }
789
790 fn finish(self) -> PackageDirProvider {
791 self.stream.control_handle().shutdown_with_epitaph(Status::OK);
792 self.responder.send(Ok(())).unwrap();
793 PackageDirProvider { stream: self.dir }
794 }
795
796 #[cfg(target_os = "fuchsia")]
797 fn fail_the_get(self) {
798 self.responder
799 .send(Err(Status::IO_INVALID.into_raw()))
800 .expect("client should be waiting");
801 }
802
803 async fn expect_open_meta_blob(
804 mut self,
805 res: Result<Option<ClientEnd<ffxfs::BlobWriterMarker>>, fpkg::OpenBlobError>,
806 ) -> Self {
807 match self.stream.next().await {
808 Some(Ok(NeededBlobsRequest::OpenMetaBlob { responder })) => {
809 responder.send(res).unwrap();
810 }
811 r => panic!("Unexpected request: {r:?}"),
812 }
813 self
814 }
815
816 async fn expect_open_blob(
817 mut self,
818 expected_blob_id: BlobId,
819 res: Result<Option<ClientEnd<ffxfs::BlobWriterMarker>>, fpkg::OpenBlobError>,
820 ) -> Self {
821 match self.stream.next().await {
822 Some(Ok(NeededBlobsRequest::OpenBlob {
823 blob_id,
824 allow_existing: _,
825 responder,
826 })) => {
827 assert_eq!(BlobId::from(blob_id), expected_blob_id);
828 responder.send(res).unwrap();
829 }
830 r => panic!("Unexpected request: {r:?}"),
831 }
832 self
833 }
834
835 async fn expect_get_missing_blobs(mut self, response_chunks: Vec<Vec<BlobInfo>>) -> Self {
836 match self.stream.next().await {
837 Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
838 let mut stream = iterator.into_stream();
839
840 for chunk in response_chunks {
842 let chunk = chunk.into_iter().map(fpkg::BlobInfo::from).collect::<Vec<_>>();
843
844 let BlobInfoIteratorRequest::Next { responder } =
845 stream.next().await.unwrap().unwrap();
846 responder.send(&chunk).unwrap();
847 }
848
849 let BlobInfoIteratorRequest::Next { responder } =
851 stream.next().await.unwrap().unwrap();
852 responder.send(&[]).unwrap();
853
854 assert_matches!(stream.next().await, None);
856 }
857 r => panic!("Unexpected request: {r:?}"),
858 }
859 self
860 }
861
862 async fn expect_get_missing_blobs_client_closes_channel(
863 mut self,
864 response_chunks: Vec<Vec<BlobInfo>>,
865 ) -> Self {
866 match self.stream.next().await {
867 Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
868 let mut stream = iterator.into_stream();
869
870 for chunk in response_chunks {
872 let chunk = chunk.into_iter().map(fpkg::BlobInfo::from).collect::<Vec<_>>();
873
874 let BlobInfoIteratorRequest::Next { responder } =
875 stream.next().await.unwrap().unwrap();
876 responder.send(&chunk).unwrap();
877 }
878
879 assert_matches!(stream.next().await, None);
881 }
882 r => panic!("Unexpected request: {r:?}"),
883 }
884 self
885 }
886
887 async fn expect_get_missing_blobs_inject_iterator_error(mut self) -> Self {
888 match self.stream.next().await {
889 Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
890 iterator
891 .into_stream_and_control_handle()
892 .1
893 .shutdown_with_epitaph(Status::ADDRESS_IN_USE);
894 }
895 r => panic!("Unexpected request: {r:?}"),
896 }
897 self
898 }
899
900 #[cfg(target_os = "fuchsia")]
901 async fn expect_abort(mut self) -> Self {
902 match self.stream.next().await {
903 Some(Ok(NeededBlobsRequest::Abort { responder })) => {
904 responder.send().unwrap();
905 }
906 r => panic!("Unexpected request: {r:?}"),
907 }
908 self
909 }
910 }
911
912 struct PackageDirProvider {
913 stream: fio::DirectoryRequestStream,
914 }
915
916 impl PackageDirProvider {
917 fn close_pkg_dir(self) {
918 self.stream.control_handle().shutdown_with_epitaph(Status::NOT_EMPTY);
919 }
920 }
921
922 fn blob_id(n: u8) -> BlobId {
923 BlobId::from([n; 32])
924 }
925
926 fn blob_info(n: u8) -> BlobInfo {
927 BlobInfo { blob_id: blob_id(n), length: 0 }
928 }
929
930 #[fuchsia_async::run_singlethreaded(test)]
931 async fn constructor() {
932 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<PackageCacheMarker>();
933 let client = Client::from_proxy(proxy);
934
935 drop(stream);
936 assert_matches!(client.proxy().sync().await, Err(_));
937 }
938
939 #[fuchsia_async::run_singlethreaded(test)]
940 async fn get_present_package() {
941 let (client, mut server) = MockPackageCache::new();
942
943 let ((), ()) = future::join(
944 async {
945 server
946 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
947 .await
948 .finish()
949 .close_pkg_dir();
950 server.expect_closed().await;
951 },
952 async move {
953 let mut get =
954 client.get(blob_info(2), fpkg::GcProtection::OpenPackageTracking).unwrap();
955
956 assert_matches!(get.open_meta_blob().await.unwrap(), None);
957 assert_eq!(get.get_missing_blobs().try_concat().await.unwrap(), vec![]);
958 let pkg_dir = get.finish().await.unwrap();
959
960 assert_matches!(
961 pkg_dir.into_proxy().take_event_stream().next().await,
962 Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
963 );
964 },
965 )
966 .await;
967 }
968
969 #[fuchsia_async::run_singlethreaded(test)]
970 async fn get_present_package_handles_slow_stream_close() {
971 let (client, mut server) = MockPackageCache::new();
972
973 let (send, recv) = futures::channel::oneshot::channel::<()>();
974
975 let ((), ()) = future::join(
976 async {
977 let (needed_blobs_stream, pkg_dir) = server
978 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
979 .await
980 .finish_hold_stream_open();
981 pkg_dir.close_pkg_dir();
982
983 let _ = recv.await;
985 drop(needed_blobs_stream);
986 },
987 async move {
988 let mut get =
989 client.get(blob_info(2), fpkg::GcProtection::OpenPackageTracking).unwrap();
990
991 assert_matches!(get.open_meta_blob().await.unwrap(), None);
992
993 let missing_blobs_stream = get.get_missing_blobs();
997 drop(send);
998 assert_eq!(missing_blobs_stream.try_concat().await.unwrap(), vec![]);
999 let pkg_dir = get.finish().await.unwrap();
1000
1001 assert_matches!(
1002 pkg_dir.into_proxy().take_event_stream().next().await,
1003 Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
1004 );
1005 },
1006 )
1007 .await;
1008 }
1009
1010 #[fuchsia_async::run_singlethreaded(test)]
1011 async fn needed_blobs_open_meta_far() {
1012 let (mut get, pending_get) = PendingGet::new().await;
1013
1014 let ((), ()) = future::join(
1015 async {
1016 pending_get
1017 .expect_open_meta_blob(Ok(None))
1018 .await
1019 .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1020 .await
1021 .expect_open_meta_blob(Ok(None))
1022 .await
1023 .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1024 .await
1025 .expect_open_meta_blob(Err(fpkg::OpenBlobError::OutOfSpace))
1026 .await
1027 .expect_open_meta_blob(Err(fpkg::OpenBlobError::UnspecifiedIo))
1028 .await;
1029 },
1030 async {
1031 {
1032 let opener = get.make_open_meta_blob();
1033 assert_matches!(opener.open().await.unwrap(), None);
1034 assert_matches!(opener.open().await.unwrap(), Some(_));
1035 }
1036 assert_matches!(get.open_meta_blob().await.unwrap(), None);
1037 assert_matches!(get.open_meta_blob().await.unwrap(), Some(_));
1038 assert_matches!(get.open_meta_blob().await, Err(OpenBlobError::OutOfSpace));
1039 assert_matches!(get.open_meta_blob().await, Err(OpenBlobError::UnspecifiedIo));
1040 },
1041 )
1042 .await;
1043 }
1044
1045 #[fuchsia_async::run_singlethreaded(test)]
1046 async fn needed_blobs_open_content_blob() {
1047 let (mut get, pending_get) = PendingGet::new().await;
1048
1049 let ((), ()) = future::join(
1050 async {
1051 pending_get
1052 .expect_open_blob(blob_id(2), Ok(None))
1053 .await
1054 .expect_open_blob(blob_id(2), Ok(Some(fidl::endpoints::create_endpoints().0)))
1055 .await
1056 .expect_open_blob(blob_id(10), Ok(None))
1057 .await
1058 .expect_open_blob(blob_id(11), Ok(Some(fidl::endpoints::create_endpoints().0)))
1059 .await
1060 .expect_open_blob(blob_id(12), Err(fpkg::OpenBlobError::OutOfSpace))
1061 .await
1062 .expect_open_blob(blob_id(13), Err(fpkg::OpenBlobError::UnspecifiedIo))
1063 .await;
1064 },
1065 async {
1066 {
1067 let opener = get.make_open_blob(blob_id(2));
1068 assert_matches!(opener.open().await.unwrap(), None);
1069 assert_matches!(opener.open().await.unwrap(), Some(_));
1070 }
1071 assert_matches!(get.open_blob(blob_id(10),).await.unwrap(), None);
1072 assert_matches!(get.open_blob(blob_id(11),).await.unwrap(), Some(_));
1073 assert_matches!(get.open_blob(blob_id(12),).await, Err(OpenBlobError::OutOfSpace));
1074 assert_matches!(
1075 get.open_blob(blob_id(13),).await,
1076 Err(OpenBlobError::UnspecifiedIo)
1077 );
1078 },
1079 )
1080 .await;
1081 }
1082
1083 #[fuchsia_async::run_singlethreaded(test)]
1084 async fn needed_blobs_get_missing_blobs_on_closed_ok() {
1085 let (mut get, pending_get) = PendingGet::new().await;
1086 let _ = pending_get.finish();
1087
1088 assert_matches!(get.open_meta_blob().await, Ok(None));
1089 assert_eq!(get.get_missing_blobs().try_concat().await.unwrap(), vec![]);
1090 }
1091
1092 #[fuchsia_async::run_singlethreaded(test)]
1093 async fn needed_blobs_get_missing_blobs() {
1094 let (mut get, pending_get) = PendingGet::new().await;
1095
1096 let ((), ()) = future::join(
1097 async {
1098 pending_get
1099 .expect_get_missing_blobs(vec![
1100 vec![blob_info(1), blob_info(2)],
1101 vec![blob_info(3)],
1102 ])
1103 .await;
1104 },
1105 async {
1106 assert_eq!(
1107 get.get_missing_blobs().try_concat().await.unwrap(),
1108 vec![blob_info(1), blob_info(2), blob_info(3)]
1109 );
1110 },
1111 )
1112 .await;
1113 }
1114
1115 #[fuchsia_async::run_singlethreaded(test)]
1116 async fn needed_blobs_get_missing_blobs_fail_to_obtain_iterator() {
1117 let (mut get, pending_get) = PendingGet::new().await;
1118 drop(pending_get);
1119
1120 assert_matches!(
1121 get.get_missing_blobs().try_concat().await,
1122 Err(ListMissingBlobsError::CallNextOnBlobIterator(
1123 fidl::Error::ClientChannelClosed{status, ..})
1124 )
1125 if status == Status::PEER_CLOSED
1126 );
1127 }
1128
1129 #[fuchsia_async::run_singlethreaded(test)]
1130 async fn needed_blobs_get_missing_blobs_iterator_contains_error() {
1131 let (mut get, pending_get) = PendingGet::new().await;
1132
1133 let (_, ()) =
1134 future::join(pending_get.expect_get_missing_blobs_inject_iterator_error(), async {
1135 assert_matches!(
1136 get.get_missing_blobs().try_concat().await,
1137 Err(ListMissingBlobsError::CallNextOnBlobIterator(
1138 fidl::Error::ClientChannelClosed{status, ..}
1139 ))
1140 if status == Status::ADDRESS_IN_USE
1141 );
1142 })
1143 .await;
1144 }
1145
1146 #[cfg(target_os = "fuchsia")]
1147 #[test]
1148 fn needed_blobs_abort() {
1149 use futures::future::Either;
1150 use futures::pin_mut;
1151 use std::task::Poll;
1152
1153 let mut executor = fuchsia_async::TestExecutor::new_with_fake_time();
1154
1155 let fut = async {
1156 let (get, pending_get) = PendingGet::new().await;
1157
1158 let abort_fut = get.abort().boxed();
1159 let expect_abort_fut = pending_get.expect_abort();
1160 pin_mut!(expect_abort_fut);
1161
1162 match futures::future::select(abort_fut, expect_abort_fut).await {
1163 Either::Left(((), _expect_abort_fut)) => {
1164 panic!("abort should wait for the get future to complete")
1165 }
1166 Either::Right((pending_get, abort_fut)) => (abort_fut, pending_get),
1167 }
1168 };
1169 pin_mut!(fut);
1170
1171 let (mut abort_fut, pending_get) = match executor.run_until_stalled(&mut fut) {
1172 Poll::Pending => panic!("should complete"),
1173 Poll::Ready((abort_fut, pending_get)) => (abort_fut, pending_get),
1174 };
1175
1176 assert_matches!(executor.run_until_stalled(&mut abort_fut), Poll::Pending);
1178 pending_get.fail_the_get();
1179 assert_matches!(executor.run_until_stalled(&mut abort_fut), Poll::Ready(()));
1180 }
1181
1182 struct MockNeededBlob {
1183 writer: ffxfs::BlobWriterRequestStream,
1184 needed_blobs: fpkg::NeededBlobsRequestStream,
1185 vmo: Option<zx::Vmo>,
1186 }
1187
1188 impl MockNeededBlob {
1189 fn mock_hash() -> BlobId {
1190 [7; 32].into()
1191 }
1192
1193 fn new() -> (NeededBlob, Self) {
1194 let (writer_client, writer) =
1195 fidl::endpoints::create_request_stream::<ffxfs::BlobWriterMarker>();
1196 let (needed_blobs_proxy, needed_blobs) =
1197 fidl::endpoints::create_proxy_and_stream::<fpkg::NeededBlobsMarker>();
1198 (
1199 NeededBlob {
1200 blob: Blob {
1201 needed_blobs: needed_blobs_proxy,
1202 blob_id: Self::mock_hash(),
1203 state: NeedsTruncate(writer_client),
1204 },
1205 },
1206 Self { writer, needed_blobs, vmo: None },
1207 )
1208 }
1209
1210 async fn fail_get_vmo(mut self) -> Self {
1211 match self.writer.next().await {
1212 Some(Ok(ffxfs::BlobWriterRequest::GetVmo { size: _, responder })) => {
1213 responder.send(Err(Status::NO_SPACE.into_raw())).unwrap();
1214 }
1215 r => panic!("Unexpected request: {r:?}"),
1216 }
1217 self
1218 }
1219
1220 async fn expect_get_vmo(mut self, expected_size: u64) -> Self {
1221 match self.writer.next().await {
1222 Some(Ok(ffxfs::BlobWriterRequest::GetVmo { size, responder })) => {
1223 assert_eq!(size, expected_size);
1224 let vmo = zx::Vmo::create(size).unwrap();
1225 assert!(self.vmo.is_none());
1226 self.vmo = Some(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap());
1227 responder.send(Ok(vmo)).unwrap();
1228 }
1229 r => panic!("Unexpected request: {r:?}"),
1230 }
1231 self
1232 }
1233
1234 async fn fail_bytes_ready(mut self) -> Self {
1235 match self.writer.next().await {
1236 Some(Ok(ffxfs::BlobWriterRequest::BytesReady { bytes_written: _, responder })) => {
1237 responder.send(Err(Status::NO_SPACE.into_raw())).unwrap();
1238 }
1239 r => panic!("Unexpected request: {r:?}"),
1240 }
1241 self
1242 }
1243
1244 async fn expect_bytes_ready(mut self, expected_payload: &[u8], offset: u64) -> Self {
1245 match self.writer.next().await {
1246 Some(Ok(ffxfs::BlobWriterRequest::BytesReady { bytes_written, responder })) => {
1247 assert_eq!(bytes_written, u64::try_from(expected_payload.len()).unwrap());
1248
1249 let vmo = self.vmo.as_ref().unwrap();
1250 let mut buf = vec![0; expected_payload.len()];
1251 let () = vmo.read(&mut buf, offset).unwrap();
1252 assert_eq!(buf, expected_payload);
1253
1254 responder.send(Ok(())).unwrap();
1255 }
1256 r => panic!("Unexpected request: {r:?}"),
1257 }
1258 self
1259 }
1260
1261 async fn expect_blob_written(mut self) -> Self {
1262 match self.needed_blobs.next().await {
1263 Some(Ok(fpkg::NeededBlobsRequest::BlobWritten { blob_id, responder })) => {
1264 assert_eq!(blob_id, Self::mock_hash().into());
1265 responder.send(Ok(())).unwrap();
1266 }
1267 r => panic!("Unexpected request: {r:?}"),
1268 }
1269 self
1270 }
1271 }
1272
1273 #[fuchsia_async::run_singlethreaded(test)]
1274 async fn empty_blob_write() {
1275 let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1276
1277 let ((), ()) = future::join(
1278 async {
1279 blob_server.expect_get_vmo(0).await.expect_blob_written().await;
1280 },
1281 async {
1282 let blob = match blob.truncate(0).await.unwrap() {
1283 TruncateBlobSuccess::AllWritten(blob) => blob,
1284 other => panic!("empty blob shouldn't need bytes {other:?}"),
1285 };
1286 let () = blob.blob_written().await.unwrap();
1287 },
1288 )
1289 .await;
1290 }
1291
1292 impl TruncateBlobSuccess {
1293 fn unwrap_needs_data(self) -> Blob<NeedsData> {
1294 match self {
1295 TruncateBlobSuccess::NeedsData(blob) => blob,
1296 TruncateBlobSuccess::AllWritten(_) => panic!("blob should need data"),
1297 }
1298 }
1299 }
1300
1301 impl BlobWriteSuccess {
1302 fn unwrap_needs_data(self) -> Blob<NeedsData> {
1303 match self {
1304 BlobWriteSuccess::NeedsData(blob) => blob,
1305 BlobWriteSuccess::AllWritten(_) => panic!("blob should need data"),
1306 }
1307 }
1308
1309 fn unwrap_all_written(self) -> Blob<NeedsBlobWritten> {
1310 match self {
1311 BlobWriteSuccess::NeedsData(_) => panic!("blob should be completely written"),
1312 BlobWriteSuccess::AllWritten(blob) => blob,
1313 }
1314 }
1315 }
1316
1317 #[fuchsia_async::run_singlethreaded(test)]
1318 async fn small_blob_write() {
1319 let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1320
1321 let ((), ()) = future::join(
1322 async {
1323 blob_server
1324 .expect_get_vmo(4)
1325 .await
1326 .expect_bytes_ready(b"test", 0)
1327 .await
1328 .expect_blob_written()
1329 .await;
1330 },
1331 async {
1332 let blob = blob.truncate(4).await.unwrap().unwrap_needs_data();
1333 let blob = blob.write(b"test").await.unwrap().unwrap_all_written();
1334 let () = blob.blob_written().await.unwrap();
1335 },
1336 )
1337 .await;
1338 }
1339
1340 #[fuchsia_async::run_singlethreaded(test)]
1341 async fn blob_truncate_no_space() {
1342 let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1343
1344 let ((), ()) = future::join(
1345 async {
1346 blob_server.fail_get_vmo().await;
1347 },
1348 async {
1349 assert_matches!(blob.truncate(4).await, Err(TruncateBlobError::NoSpace));
1350 },
1351 )
1352 .await;
1353 }
1354
1355 #[fuchsia_async::run_singlethreaded(test)]
1356 async fn blob_write_no_space() {
1357 let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1358
1359 let ((), ()) = future::join(
1360 async {
1361 blob_server.expect_get_vmo(4).await.fail_bytes_ready().await;
1362 },
1363 async {
1364 let blob = blob.truncate(4).await.unwrap().unwrap_needs_data();
1365 assert_matches!(blob.write(b"test").await, Err(WriteBlobError::NoSpace));
1366 },
1367 )
1368 .await;
1369 }
1370
1371 #[fuchsia_async::run_singlethreaded(test)]
1372 async fn blob_write_multiple_write() {
1373 let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1374
1375 let ((), ()) = future::join(
1376 async {
1377 blob_server
1378 .expect_get_vmo(6)
1379 .await
1380 .expect_bytes_ready(b"abc", 0)
1381 .await
1382 .expect_bytes_ready(b"123", 3)
1383 .await
1384 .expect_blob_written()
1385 .await;
1386 },
1387 async {
1388 let blob = blob.truncate(6).await.unwrap().unwrap_needs_data();
1389 let blob = blob.write(b"abc").await.unwrap().unwrap_needs_data();
1390 let blob = blob.write(b"123").await.unwrap().unwrap_all_written();
1391 let () = blob.blob_written().await.unwrap();
1392 },
1393 )
1394 .await;
1395 }
1396
1397 #[fuchsia_async::run_singlethreaded(test)]
1398 async fn get_already_cached_success() {
1399 let (client, mut server) = MockPackageCache::new();
1400
1401 let ((), ()) = future::join(
1402 async {
1403 server
1404 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1405 .await
1406 .finish()
1407 .close_pkg_dir();
1408 server.expect_closed().await;
1409 },
1410 async move {
1411 let pkg_dir = client.get_already_cached(blob_id(2)).await.unwrap();
1412
1413 assert_matches!(
1414 pkg_dir.into_proxy().take_event_stream().next().await,
1415 Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
1416 );
1417 },
1418 )
1419 .await;
1420 }
1421
1422 #[fuchsia_async::run_singlethreaded(test)]
1423 async fn get_already_cached_missing_meta_far() {
1424 let (client, mut server) = MockPackageCache::new();
1425
1426 let ((), ()) = future::join(
1427 async {
1428 server
1429 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1430 .await
1431 .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1432 .await;
1433 },
1434 async move {
1435 assert_matches!(
1436 client.get_already_cached(blob_id(2)).await,
1437 Err(GetAlreadyCachedError::MissingMetaFar)
1438 );
1439 },
1440 )
1441 .await;
1442 }
1443
1444 #[fuchsia_async::run_singlethreaded(test)]
1445 async fn get_already_cached_missing_content_blob() {
1446 let (client, mut server) = MockPackageCache::new();
1447
1448 let ((), ()) = future::join(
1449 async {
1450 server
1451 .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1452 .await
1453 .expect_open_meta_blob(Ok(None))
1454 .await
1455 .expect_get_missing_blobs_client_closes_channel(vec![vec![BlobInfo {
1456 blob_id: [0; 32].into(),
1457 length: 0,
1458 }]])
1459 .await;
1460 },
1461 async move {
1462 assert_matches!(
1463 client.get_already_cached(blob_id(2)).await,
1464 Err(GetAlreadyCachedError::MissingContentBlobs(v))
1465 if v == vec![BlobInfo {
1466 blob_id: [0; 32].into(),
1467 length: 0,
1468 }]
1469 );
1470 },
1471 )
1472 .await;
1473 }
1474}