Skip to main content

fidl_fuchsia_pkg_ext/
cache.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![cfg(target_os = "fuchsia")]
6#![deny(missing_docs)]
7
8//! Wrapper types for [`fidl_fuchsia_pkg::PackageCacheProxy`] and its related protocols.
9
10use crate::types::{BlobId, BlobInfo};
11use fidl_fuchsia_fxfs as ffxfs;
12use fidl_fuchsia_pkg as fpkg;
13use fuchsia_pkg::PackageDirectory;
14use futures::prelude::*;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use zx_status::Status;
18
/// An open connection to a provider of the `fuchsia.pkg.PackageCache`.
///
/// Cloning a `Client` clones the wrapped proxy.
#[derive(Debug, Clone)]
pub struct Client {
    /// Proxy over which every `PackageCache` request made by this client is sent.
    proxy: fpkg::PackageCacheProxy,
}
24
25impl Client {
26    /// Constructs a client from the given proxy.
27    pub fn from_proxy(proxy: fpkg::PackageCacheProxy) -> Self {
28        Self { proxy }
29    }
30
31    /// Returns a reference to the underlying PackageCacheProxy connection.
32    pub fn proxy(&self) -> &fpkg::PackageCacheProxy {
33        &self.proxy
34    }
35
36    /// Opens the package specified by `meta_far_blob` with the intent to fetch any missing blobs
37    /// using the returned [`Get`] type if needed.
38    pub fn get(
39        &self,
40        meta_far_blob: BlobInfo,
41        gc_protection: fpkg::GcProtection,
42    ) -> Result<Get, fidl::Error> {
43        let (needed_blobs, needed_blobs_server_end) =
44            fidl::endpoints::create_proxy::<fpkg::NeededBlobsMarker>();
45        let (pkg_dir, pkg_dir_server_end) = PackageDirectory::create_request()?;
46
47        let get_fut = self.proxy.get(
48            &meta_far_blob.into(),
49            gc_protection,
50            needed_blobs_server_end,
51            pkg_dir_server_end,
52        );
53
54        Ok(Get {
55            get_fut,
56            pkg_dir,
57            needed_blobs,
58            pkg_present: SharedBoolEvent::new(),
59            meta_far: meta_far_blob.blob_id,
60        })
61    }
62
63    /// Uses PackageCache.Get to obtain the package directory of a package that is already cached
64    /// (all blobs are already in blobfs).
65    /// Errors if the package is not already cached.
66    /// Always uses open package tracking GC protection, because OTA (the only client of Retained
67    /// GC protection), should never need to get an already cached package.
68    ///
69    /// Compared to `get_cached`:
70    ///   * Activates `meta_far_blob` in the dynamic index
71    ///   * Must not be called concurrently with the same `meta_far_blob`
72    pub async fn get_already_cached(
73        &self,
74        meta_far_blob: BlobId,
75    ) -> Result<PackageDirectory, GetAlreadyCachedError> {
76        let mut get = self
77            .get(
78                BlobInfo { blob_id: meta_far_blob, length: 0 },
79                fpkg::GcProtection::OpenPackageTracking,
80            )
81            .map_err(GetAlreadyCachedError::Get)?;
82        if let Some(_) = get.open_meta_blob().await.map_err(GetAlreadyCachedError::OpenMetaBlob)? {
83            return Err(GetAlreadyCachedError::MissingMetaFar);
84        }
85
86        if let Some(missing_blobs) = get
87            .get_missing_blobs()
88            .try_next()
89            .await
90            .map_err(GetAlreadyCachedError::GetMissingBlobs)?
91        {
92            return Err(GetAlreadyCachedError::MissingContentBlobs(missing_blobs));
93        }
94
95        get.finish().await.map_err(GetAlreadyCachedError::FinishGet)
96    }
97
98    /// Uses PackageCache.GetSubpackage to obtain the package directory of a subpackage.
99    /// Errors if there is not an open connection to the superpackage.
100    pub async fn get_subpackage(
101        &self,
102        superpackage: BlobId,
103        subpackage: &fuchsia_url::RelativePackageUrl,
104    ) -> Result<PackageDirectory, GetSubpackageError> {
105        let (dir, dir_server_end) =
106            PackageDirectory::create_request().map_err(GetSubpackageError::CreatingHandles)?;
107        let () = self
108            .proxy
109            .get_subpackage(
110                &superpackage.into(),
111                &fpkg::PackageUrl { url: subpackage.into() },
112                dir_server_end,
113            )
114            .await
115            .map_err(GetSubpackageError::CallingGetSubpackage)??;
116        Ok(dir)
117    }
118
119    /// Write blobs using the returned [`WriteBlobs`] type.
120    pub fn write_blobs(&self) -> Result<WriteBlobs, fidl::Error> {
121        let (needed_blobs, needed_blobs_server_end) =
122            fidl::endpoints::create_proxy::<fpkg::NeededBlobsMarker>();
123
124        let () = self.proxy.write_blobs(needed_blobs_server_end)?;
125
126        Ok(WriteBlobs { needed_blobs })
127    }
128}
129
/// An error encountered by [`Client::get_already_cached`].
#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
pub enum GetAlreadyCachedError {
    /// Starting the `Get` request failed.
    #[error("calling get")]
    Get(#[source] fidl::Error),

    /// Opening the meta blob failed.
    #[error("opening meta blob")]
    OpenMetaBlob(#[source] OpenBlobError),

    /// The meta blob was offered for write, so the meta.far is not cached.
    #[error("meta.far blob not cached")]
    MissingMetaFar,

    /// Listing the missing content blobs failed.
    #[error("getting missing blobs")]
    GetMissingBlobs(#[source] ListMissingBlobsError),

    /// The missing-blobs stream yielded blobs; carries the first chunk it reported.
    #[error("content blobs not cached {0:?}")]
    MissingContentBlobs(Vec<BlobInfo>),

    /// Finishing the `Get` request failed.
    #[error("finishing get")]
    FinishGet(#[source] GetError),
}
151
152impl GetAlreadyCachedError {
153    /// Returns true if the get failed because the package was not cached.
154    pub fn was_not_cached(&self) -> bool {
155        use GetAlreadyCachedError::*;
156        match self {
157            Get(..) | OpenMetaBlob(..) | GetMissingBlobs(..) | FinishGet(..) => false,
158            MissingMetaFar | MissingContentBlobs(..) => true,
159        }
160    }
161}
162
163#[derive(thiserror::Error, Debug)]
164#[allow(missing_docs)]
165pub enum GetSubpackageError {
166    #[error("creating handles")]
167    CreatingHandles(#[source] fidl::Error),
168
169    #[error("calling GetCached FIDL")]
170    CallingGetSubpackage(#[source] fidl::Error),
171
172    #[error("the superpackage does not have an open package connection")]
173    SuperpackageClosed,
174
175    #[error("the subpackage does not exist")]
176    DoesNotExist,
177
178    #[error("internal")]
179    Internal,
180}
181
182impl From<fpkg::GetSubpackageError> for GetSubpackageError {
183    fn from(fidl: fpkg::GetSubpackageError) -> Self {
184        use GetSubpackageError::*;
185        use fpkg::GetSubpackageError as fErr;
186        match fidl {
187            fErr::SuperpackageClosed => SuperpackageClosed,
188            fErr::DoesNotExist => DoesNotExist,
189            fErr::Internal => Internal,
190        }
191    }
192}
193
/// A shareable boolean latch: it starts unset and can only ever be set.
///
/// Clones share the same underlying flag.
#[derive(Clone, Debug)]
struct SharedBoolEvent(Arc<AtomicBool>);

impl SharedBoolEvent {
    /// Creates a new, unset event.
    fn new() -> Self {
        let flag = AtomicBool::new(false);
        Self(Arc::new(flag))
    }

    /// Reports whether the event has been set through this handle or any clone of it.
    fn get(&self) -> bool {
        let Self(flag) = self;
        flag.load(Ordering::SeqCst)
    }

    /// Sets the event. There is no way to unset it.
    fn set(&self) {
        let Self(flag) = self;
        flag.store(true, Ordering::SeqCst);
    }
}
210
211async fn open_blob(
212    needed_blobs: &fpkg::NeededBlobsProxy,
213    kind: OpenKind,
214    blob_id: BlobId,
215    pkg_present: Option<&SharedBoolEvent>,
216    allow_existing: bool,
217) -> Result<Option<NeededBlob>, OpenBlobError> {
218    let open_fut = match kind {
219        OpenKind::Meta => needed_blobs.open_meta_blob(),
220        OpenKind::Content => needed_blobs.open_blob(&blob_id.into(), allow_existing),
221    };
222    match open_fut.await {
223        Err(fidl::Error::ClientChannelClosed { status: Status::OK, .. }) => {
224            if let Some(pkg_present) = pkg_present {
225                pkg_present.set();
226            }
227            Ok(None)
228        }
229        res => {
230            if let Some(blob) = res?? {
231                Ok(Some(NeededBlob {
232                    blob: Blob {
233                        needed_blobs: needed_blobs.clone(),
234                        blob_id,
235                        state: NeedsTruncate(blob),
236                    },
237                }))
238            } else {
239                Ok(None)
240            }
241        }
242    }
243}
244
/// Which `NeededBlobs` open method a blob open request should use.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OpenKind {
    /// Open via `NeededBlobs.OpenMetaBlob`.
    Meta,
    /// Open via `NeededBlobs.OpenBlob`.
    Content,
}
250
/// A deferred call to [`Get::open_meta_blob`] or [`Get::open_blob`].
#[derive(Debug)]
pub struct DeferredOpenBlob {
    /// Clone of the `NeededBlobs` proxy the open request will be sent on.
    needed_blobs: fpkg::NeededBlobsProxy,
    /// Whether this opens the meta blob or a content blob.
    kind: OpenKind,
    /// Id of the blob to open.
    blob_id: BlobId,
    /// Forwarded as the `allow_existing` argument when opening a content blob.
    allow_existing: bool,
    /// Event to set if the open finds the channel closed with an `OK` epitaph; only populated
    /// for meta blob opens created from a [`Get`].
    pkg_present: Option<SharedBoolEvent>,
}
260
261impl DeferredOpenBlob {
262    /// Opens the blob for write, if it is still needed. The blob's data can be provided using the
263    /// returned NeededBlob.
264    pub async fn open(&self) -> Result<Option<NeededBlob>, OpenBlobError> {
265        open_blob(
266            &self.needed_blobs,
267            self.kind,
268            self.blob_id,
269            self.pkg_present.as_ref(),
270            self.allow_existing,
271        )
272        .await
273    }
274
275    fn proxy_cmp_key(&self) -> u32 {
276        use fidl::AsHandleRef;
277        use fidl::endpoints::Proxy;
278        self.needed_blobs.as_channel().as_handle_ref().raw_handle()
279    }
280}
281
282impl std::cmp::PartialEq for DeferredOpenBlob {
283    fn eq(&self, other: &Self) -> bool {
284        self.proxy_cmp_key() == other.proxy_cmp_key() && self.kind == other.kind
285    }
286}
287
288impl std::cmp::Eq for DeferredOpenBlob {}
289
/// A pending `fuchsia.pkg/PackageCache.Get()` request. Clients must, in order:
/// 1. open/write the meta blob, if Some(NeededBlob) is provided by that API
/// 2. enumerate all missing content blobs
/// 3. open/write all missing content blobs, if Some(NeededBlob) is provided by that API
/// 4. finish() to complete the Get() request.
#[derive(Debug)]
pub struct Get {
    /// Pending response to the `Get()` call; awaited by `finish` and `abort`.
    get_fut: fidl::client::QueryResponseFut<Result<(), i32>>,
    /// Client side of the `NeededBlobs` connection whose server end was sent with `Get()`.
    needed_blobs: fpkg::NeededBlobsProxy,
    /// Client side of the package directory whose server end was sent with `Get()`; handed to
    /// the caller by `finish`.
    pkg_dir: PackageDirectory,
    /// Set when opening the meta blob finds the `NeededBlobs` channel closed with an `OK`
    /// epitaph; once set, no missing-blobs request is sent.
    pkg_present: SharedBoolEvent,
    /// Id of the package's meta.far blob.
    meta_far: BlobId,
}
303
304impl Get {
305    /// Returns an independent object that can be used to open the meta blob for write.  See
306    /// [`Self::open_meta_blob`].
307    pub fn make_open_meta_blob(&mut self) -> DeferredOpenBlob {
308        DeferredOpenBlob {
309            needed_blobs: self.needed_blobs.clone(),
310            kind: OpenKind::Meta,
311            blob_id: self.meta_far,
312            allow_existing: false,
313            pkg_present: Some(self.pkg_present.clone()),
314        }
315    }
316
317    /// Opens the meta blob for write, if it is still needed. The blob's data can be provided using
318    /// the returned NeededBlob.
319    pub async fn open_meta_blob(&mut self) -> Result<Option<NeededBlob>, OpenBlobError> {
320        open_blob(&self.needed_blobs, OpenKind::Meta, self.meta_far, Some(&self.pkg_present), false)
321            .await
322    }
323
324    fn start_get_missing_blobs(
325        &mut self,
326    ) -> Result<Option<fpkg::BlobInfoIteratorProxy>, fidl::Error> {
327        if self.pkg_present.get() {
328            return Ok(None);
329        }
330
331        let (blob_iterator, blob_iterator_server_end) =
332            fidl::endpoints::create_proxy::<fpkg::BlobInfoIteratorMarker>();
333
334        self.needed_blobs.get_missing_blobs(blob_iterator_server_end)?;
335        Ok(Some(blob_iterator))
336    }
337
338    /// Determines the set of blobs that the caller must open/write to complete this `Get()`
339    /// operation.
340    /// The returned stream will never yield an empty `Vec`.
341    /// Callers should process the missing blobs (via `make_open_blob` or `open_blob`) concurrently
342    /// with reading the stream to guarantee stream termination.
343    pub fn get_missing_blobs(
344        &mut self,
345    ) -> impl Stream<Item = Result<Vec<BlobInfo>, ListMissingBlobsError>> + Unpin + use<> {
346        match self.start_get_missing_blobs() {
347            Ok(option_iter) => match option_iter {
348                Some(iterator) => crate::fidl_iterator_to_stream(iterator)
349                    .map_ok(|v| v.into_iter().map(BlobInfo::from).collect())
350                    .map_err(ListMissingBlobsError::CallNextOnBlobIterator)
351                    .left_stream(),
352                None => futures::stream::empty().right_stream(),
353            }
354            .left_stream(),
355            Err(e) => {
356                futures::stream::iter(Some(Err(ListMissingBlobsError::CallGetMissingBlobs(e))))
357                    .right_stream()
358            }
359        }
360    }
361
362    /// Returns an independent object that can be used to open the `content_blob` for write.  See
363    /// [`Self::open_blob`].
364    pub fn make_open_blob(&mut self, content_blob: BlobId) -> DeferredOpenBlob {
365        DeferredOpenBlob {
366            needed_blobs: self.needed_blobs.clone(),
367            kind: OpenKind::Content,
368            blob_id: content_blob,
369            allow_existing: false,
370            pkg_present: None,
371        }
372    }
373
374    /// Opens `content_blob` for write, if it is still needed. The blob's data can be provided
375    /// using the returned NeededBlob.
376    pub async fn open_blob(
377        &mut self,
378        content_blob: BlobId,
379    ) -> Result<Option<NeededBlob>, OpenBlobError> {
380        open_blob(&self.needed_blobs, OpenKind::Content, content_blob, None, false).await
381    }
382
383    /// Notifies the endpoint that all blobs have been written and wait for the response to the
384    /// pending `Get()` request, returning the cached [`PackageDirectory`].
385    pub async fn finish(self) -> Result<PackageDirectory, GetError> {
386        drop(self.needed_blobs);
387        let () = self.get_fut.await?.map_err(Status::from_raw)?;
388        Ok(self.pkg_dir)
389    }
390
391    /// Aborts this caching operation for the package.
392    pub async fn abort(self) {
393        self.needed_blobs.abort().map(|_: Result<(), fidl::Error>| ()).await;
394        // The package is not guaranteed to be removed from the dynamic index after abort
395        // returns, we have to wait until finish returns (to prevent a resolve retry from
396        // racing). The finish call will return an error that just tells us that we called
397        // abort, so we ignore it.
398        let _ = self.get_fut.await;
399    }
400}
401
/// A pending `fuchsia.pkg/PackageCache.WriteBlobs()` request.
#[derive(Clone, Debug)]
pub struct WriteBlobs {
    /// Client side of the `NeededBlobs` connection whose server end was sent with `WriteBlobs()`.
    needed_blobs: fpkg::NeededBlobsProxy,
}
407
408impl WriteBlobs {
409    /// Returns an independent object that can be used to open the `blob` for write.  See
410    /// [`Self::open_blob`].
411    pub fn make_open_blob(&mut self, blob: BlobId, allow_existing: bool) -> DeferredOpenBlob {
412        DeferredOpenBlob {
413            needed_blobs: self.needed_blobs.clone(),
414            kind: OpenKind::Content,
415            blob_id: blob,
416            allow_existing,
417            pkg_present: None,
418        }
419    }
420
421    /// Opens `blob` for write. The blob's data can be provided using the returned NeededBlob.
422    pub async fn open_blob(
423        &mut self,
424        blob: BlobId,
425        allow_existing: bool,
426    ) -> Result<Option<NeededBlob>, OpenBlobError> {
427        open_blob(&self.needed_blobs, OpenKind::Content, blob, None, allow_existing).await
428    }
429}
430
/// A blob that needs to be written.
///
/// Produced by the open methods when the server hands back a blob writer.
#[derive(Debug)]
pub struct NeededBlob {
    /// Typestate wrapper around the blob. Clients must first call truncate(), then write() until
    /// all data is provided.
    pub blob: Blob<NeedsTruncate>,
}
438
/// The successful result of truncating a blob.
///
/// Returned by `Blob<NeedsTruncate>::truncate`; which variant is returned depends on whether the
/// requested size was zero.
#[derive(Debug)]
pub enum TruncateBlobSuccess {
    /// The blob contents need to be written.
    NeedsData(Blob<NeedsData>),

    /// The blob is fully written (it was the empty blob) and now a
    /// fuchsia.pkg.NeededBlobs.BlobWritten message should be sent.
    AllWritten(Blob<NeedsBlobWritten>),
}
449
/// The successful result of writing some data to a blob.
///
/// Returned by the `Blob<NeedsData>` write methods; `AllWritten` is returned once the number of
/// bytes written equals the size the blob was truncated to.
#[derive(Debug)]
pub enum BlobWriteSuccess {
    /// There is still more data to write.
    NeedsData(Blob<NeedsData>),

    /// The blob is fully written and now a fuchsia.pkg.NeededBlobs.BlobWritten
    /// message should be sent.
    AllWritten(Blob<NeedsBlobWritten>),
}
460
/// State for a blob that can be truncated.
///
/// Wraps the `BlobWriter` client end received when the blob was opened.
#[derive(Debug)]
pub struct NeedsTruncate(fidl::endpoints::ClientEnd<ffxfs::BlobWriterMarker>);
464
/// State for a blob that can be written to.
#[derive(Debug)]
pub struct NeedsData {
    /// Total size of the blob in bytes, as passed to truncate.
    size: u64,
    /// Number of bytes successfully written so far.
    written: u64,
    /// Writer through which the blob's bytes are sent.
    writer: blob_writer::BlobWriter,
}
472
/// State for a blob that has been fully written but that needs a
/// fuchsia.pkg.NeededBlobs.BlobWritten message sent to pkg-cache.
///
/// Carries no data of its own.
#[derive(Debug)]
pub struct NeedsBlobWritten;
477
/// A blob in the process of being written.
///
/// The type parameter `S` is the typestate ([`NeedsTruncate`], [`NeedsData`] or
/// [`NeedsBlobWritten`]) and determines which methods are available.
#[derive(Debug)]
#[must_use]
pub struct Blob<S> {
    /// `NeedsBlobs` proxy clone used to send `BlobWritten` once the data has been written.
    needed_blobs: fpkg::NeededBlobsProxy,
    /// Id of the blob being written.
    blob_id: BlobId,
    /// Data specific to the current typestate.
    state: S,
}
486
487impl Blob<NeedsTruncate> {
488    /// Truncates the blob to the given size. On success, the blob enters the writable state.
489    pub async fn truncate(self, size: u64) -> Result<TruncateBlobSuccess, TruncateBlobError> {
490        let Self { needed_blobs, blob_id, state: NeedsTruncate(blob) } = self;
491
492        let writer = blob_writer::BlobWriter::create(blob.into_proxy(), size).await.map_err(
493            |e| match e {
494                blob_writer::CreateError::GetVmo(s) if s == Status::NO_SPACE => {
495                    TruncateBlobError::NoSpace
496                }
497                _ => TruncateBlobError::CreateBlobWriter(e),
498            },
499        )?;
500
501        Ok(if size == 0 {
502            TruncateBlobSuccess::AllWritten(Blob { needed_blobs, blob_id, state: NeedsBlobWritten })
503        } else {
504            TruncateBlobSuccess::NeedsData(Blob {
505                needed_blobs,
506                blob_id,
507                state: NeedsData { size, written: 0, writer },
508            })
509        })
510    }
511
512    /// Return the fields for manual handling of the protocol.
513    ///
514    /// The returned BlobWriter client end is in the initial state of the protocol (GetVmo has not
515    /// been called).
516    ///
517    /// Once the blob has been written through the BlobWriter, BlobWritten should be called on the
518    /// returned NeededBlobs proxy. The returned NeededBlobs proxy should not otherwise be used.
519    pub fn deconstruct(
520        self,
521    ) -> (fpkg::NeededBlobsProxy, BlobId, fidl::endpoints::ClientEnd<ffxfs::BlobWriterMarker>) {
522        let Blob { needed_blobs, blob_id, state: NeedsTruncate(client) } = self;
523        (needed_blobs, blob_id, client)
524    }
525}
526
527impl Blob<NeedsData> {
528    /// Writes all of the given buffer to the blob.
529    ///
530    /// # Panics
531    ///
532    /// Panics if a write is attempted with a buf larger than the remaining blob size.
533    pub fn write(
534        self,
535        buf: &[u8],
536    ) -> impl Future<Output = Result<BlobWriteSuccess, WriteBlobError>> + '_ {
537        self.write_with_trace_callbacks(buf, &|_| {}, &|| {})
538    }
539
540    /// Writes all of the given buffer to the blob.
541    ///
542    /// `after_write` and `after_write_ack` are called before and after, respectively, waiting for
543    /// the server to acknowledge writes.
544    /// They may be called multiple times if the write of `buf` is chunked.
545    /// `after_write` is given the size of each write in bytes.
546    /// Useful for creating trace spans.
547    ///
548    /// # Panics
549    ///
550    /// Panics if a write is attempted with a buf larger than the remaining blob size.
551    pub async fn write_with_trace_callbacks(
552        mut self,
553        buf: &[u8],
554        after_write: &(dyn Fn(u64) + Send + Sync),
555        after_write_ack: &(dyn Fn() + Send + Sync),
556    ) -> Result<BlobWriteSuccess, WriteBlobError> {
557        assert!(self.state.written + buf.len() as u64 <= self.state.size);
558
559        let fut = self.state.writer.write(buf);
560        let () = after_write(buf.len() as u64);
561        let res = fut.await;
562        let () = after_write_ack();
563        let () = res.map_err(|e| match e {
564            e @ blob_writer::WriteError::BytesReady(s) => match s {
565                Status::IO_DATA_INTEGRITY => WriteBlobError::Corrupt,
566                Status::NO_SPACE => WriteBlobError::NoSpace,
567                _ => WriteBlobError::FxBlob(e),
568            },
569            e => WriteBlobError::FxBlob(e),
570        })?;
571
572        self.state.written += buf.len() as u64;
573
574        if self.state.written == self.state.size {
575            let Self { needed_blobs, blob_id, state: _ } = self;
576            Ok(BlobWriteSuccess::AllWritten(Blob {
577                needed_blobs,
578                blob_id,
579                state: NeedsBlobWritten,
580            }))
581        } else {
582            Ok(BlobWriteSuccess::NeedsData(self))
583        }
584    }
585}
586
587impl Blob<NeedsBlobWritten> {
588    /// Tells pkg-cache that the blob has been successfully written and can now be read.
589    pub async fn blob_written(self) -> Result<(), BlobWrittenError> {
590        Ok(self.needed_blobs.blob_written(&self.blob_id.into()).await??)
591    }
592}
593
/// An error encountered while opening a package.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum OpenError {
    /// The package does not exist.
    #[error("the package does not exist")]
    NotFound,

    /// The response carried a status that was not expected; the status is the error source.
    #[error("Open() responded with an unexpected status")]
    UnexpectedResponse(#[source] Status),

    /// The FIDL call itself failed.
    #[error("transport error")]
    Fidl(#[from] fidl::Error),
}
/// An error encountered while caching a package.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum GetError {
    /// The `Get()` response carried an error status.
    #[error("Get() responded with an unexpected status")]
    UnexpectedResponse(#[from] Status),

    /// The FIDL call itself failed.
    #[error("transport error")]
    Fidl(#[from] fidl::Error),
}
617
/// An error encountered while opening a metadata or content blob for write.
///
/// All variants except `Fidl` are converted one-to-one from `fuchsia.pkg/OpenBlobError`.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum OpenBlobError {
    /// Not enough storage space to persist the blob.
    #[error("there is insufficient storage space available to persist this blob")]
    OutOfSpace,

    /// Another cache operation already has the blob open for write.
    #[error("this blob is already open for write by another cache operation")]
    ConcurrentWrite,

    /// Unspecified failure during underlying I/O.
    #[error("an unspecified error occurred during underlying I/O")]
    UnspecifiedIo,

    /// Unspecified failure.
    #[error("an unspecified error occurred")]
    Internal,

    /// The FIDL call itself failed.
    #[error("transport error")]
    Fidl(#[from] fidl::Error),
}
637
638impl From<fpkg::OpenBlobError> for OpenBlobError {
639    fn from(e: fpkg::OpenBlobError) -> Self {
640        match e {
641            fpkg::OpenBlobError::OutOfSpace => OpenBlobError::OutOfSpace,
642            fpkg::OpenBlobError::ConcurrentWrite => OpenBlobError::ConcurrentWrite,
643            fpkg::OpenBlobError::UnspecifiedIo => OpenBlobError::UnspecifiedIo,
644            fpkg::OpenBlobError::Internal => OpenBlobError::Internal,
645        }
646    }
647}
648
/// An error encountered while enumerating missing content blobs.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum ListMissingBlobsError {
    /// Sending the `NeededBlobs.GetMissingBlobs` request failed.
    #[error("while obtaining the missing blobs fidl iterator")]
    CallGetMissingBlobs(#[source] fidl::Error),

    /// Fetching a chunk from the blob iterator failed.
    #[error("while obtaining the next chunk of blobs from the fidl iterator")]
    CallNextOnBlobIterator(#[source] fidl::Error),
}
659
/// An error encountered while truncating a blob.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum TruncateBlobError {
    /// Creating the blob writer failed to get its VMO with `NO_SPACE`.
    #[error("insufficient storage space is available")]
    NoSpace,

    /// Creating the blob writer failed for any other reason.
    #[error("creating blob writer")]
    CreateBlobWriter(#[source] blob_writer::CreateError),

    /// A FIDL call failed.
    #[error("transport error")]
    Fidl(#[from] fidl::Error),

    /// The blob was not in a state that allows truncation.
    #[error("blob is in an invalid state")]
    BadState,
}
676
/// An error encountered while writing a blob.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum WriteBlobError {
    /// The writer reported `IO_DATA_INTEGRITY` for the written bytes.
    #[error("the written data was corrupt")]
    Corrupt,

    /// The writer reported `NO_SPACE` for the written bytes.
    #[error("insufficient storage space is available")]
    NoSpace,

    /// A FIDL call failed.
    #[error("transport error")]
    Fidl(#[from] fidl::Error),

    /// Any other error reported by the blob writer.
    #[error("while using the fxblob writer")]
    FxBlob(#[source] blob_writer::WriteError),
}
693
/// An error encountered while sending the BlobWritten message.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum BlobWrittenError {
    /// Converted from `fuchsia.pkg/BlobWrittenError.NOT_WRITTEN`.
    #[error("pkg-cache could not find the blob after it was successfully written")]
    MissingAfterWritten,

    /// Converted from `fuchsia.pkg/BlobWrittenError.UNOPENED_BLOB`.
    #[error("NeededBlobs.BlobWritten was called before the blob was opened")]
    UnopenedBlob,

    /// The FIDL call itself failed.
    #[error("transport error")]
    Fidl(#[from] fidl::Error),
}
707
708impl From<fpkg::BlobWrittenError> for BlobWrittenError {
709    fn from(e: fpkg::BlobWrittenError) -> Self {
710        match e {
711            fpkg::BlobWrittenError::NotWritten => BlobWrittenError::MissingAfterWritten,
712            fpkg::BlobWrittenError::UnopenedBlob => BlobWrittenError::UnopenedBlob,
713        }
714    }
715}
716
717#[cfg(test)]
718mod tests {
719    use super::*;
720    use assert_matches::assert_matches;
721    use fidl::endpoints::{ClientEnd, ControlHandle as _, RequestStream as _};
722    use fidl_fuchsia_io as fio;
723    use fidl_fuchsia_pkg::{
724        BlobInfoIteratorRequest, NeededBlobsRequest, NeededBlobsRequestStream,
725        PackageCacheGetResponder, PackageCacheMarker, PackageCacheRequest,
726        PackageCacheRequestStream,
727    };
728
    /// Test double for the server side of a `PackageCache` connection.
    struct MockPackageCache {
        /// Incoming `PackageCache` requests from the client under test.
        stream: PackageCacheRequestStream,
    }
732
733    impl MockPackageCache {
734        fn new() -> (Client, Self) {
735            let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<PackageCacheMarker>();
736            (Client::from_proxy(proxy), Self { stream })
737        }
738
739        async fn expect_get(
740            &mut self,
741            blob_info: BlobInfo,
742            expected_gc_protection: fpkg::GcProtection,
743        ) -> PendingGet {
744            match self.stream.next().await {
745                Some(Ok(PackageCacheRequest::Get {
746                    meta_far_blob,
747                    gc_protection,
748                    needed_blobs,
749                    dir,
750                    responder,
751                })) => {
752                    assert_eq!(BlobInfo::from(meta_far_blob), blob_info);
753                    assert_eq!(gc_protection, expected_gc_protection);
754                    let needed_blobs = needed_blobs.into_stream();
755                    let dir = dir.into_stream();
756
757                    PendingGet { stream: needed_blobs, dir, responder }
758                }
759                r => panic!("Unexpected request: {r:?}"),
760            }
761        }
762
763        async fn expect_closed(mut self) {
764            assert_matches!(self.stream.next().await, None);
765        }
766    }
767
    /// Server side of an in-flight `PackageCache.Get` request.
    struct PendingGet {
        /// Incoming `NeededBlobs` requests for this `Get`.
        stream: NeededBlobsRequestStream,
        /// Incoming requests on the package directory passed to `Get`.
        dir: fio::DirectoryRequestStream,
        /// Responder used to complete the `Get` call.
        responder: PackageCacheGetResponder,
    }
773
774    impl PendingGet {
775        async fn new() -> (Get, PendingGet) {
776            let (client, mut server) = MockPackageCache::new();
777
778            let get = client.get(blob_info(42), fpkg::GcProtection::OpenPackageTracking).unwrap();
779            let pending_get =
780                server.expect_get(blob_info(42), fpkg::GcProtection::OpenPackageTracking).await;
781            (get, pending_get)
782        }
783
784        fn finish_hold_stream_open(self) -> (NeededBlobsRequestStream, PackageDirProvider) {
785            self.stream.control_handle().shutdown_with_epitaph(Status::OK);
786            self.responder.send(Ok(())).unwrap();
787            (self.stream, PackageDirProvider { stream: self.dir })
788        }
789
790        fn finish(self) -> PackageDirProvider {
791            self.stream.control_handle().shutdown_with_epitaph(Status::OK);
792            self.responder.send(Ok(())).unwrap();
793            PackageDirProvider { stream: self.dir }
794        }
795
796        #[cfg(target_os = "fuchsia")]
797        fn fail_the_get(self) {
798            self.responder
799                .send(Err(Status::IO_INVALID.into_raw()))
800                .expect("client should be waiting");
801        }
802
803        async fn expect_open_meta_blob(
804            mut self,
805            res: Result<Option<ClientEnd<ffxfs::BlobWriterMarker>>, fpkg::OpenBlobError>,
806        ) -> Self {
807            match self.stream.next().await {
808                Some(Ok(NeededBlobsRequest::OpenMetaBlob { responder })) => {
809                    responder.send(res).unwrap();
810                }
811                r => panic!("Unexpected request: {r:?}"),
812            }
813            self
814        }
815
816        async fn expect_open_blob(
817            mut self,
818            expected_blob_id: BlobId,
819            res: Result<Option<ClientEnd<ffxfs::BlobWriterMarker>>, fpkg::OpenBlobError>,
820        ) -> Self {
821            match self.stream.next().await {
822                Some(Ok(NeededBlobsRequest::OpenBlob {
823                    blob_id,
824                    allow_existing: _,
825                    responder,
826                })) => {
827                    assert_eq!(BlobId::from(blob_id), expected_blob_id);
828                    responder.send(res).unwrap();
829                }
830                r => panic!("Unexpected request: {r:?}"),
831            }
832            self
833        }
834
835        async fn expect_get_missing_blobs(mut self, response_chunks: Vec<Vec<BlobInfo>>) -> Self {
836            match self.stream.next().await {
837                Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
838                    let mut stream = iterator.into_stream();
839
840                    // Respond to each next request with the next chunk.
841                    for chunk in response_chunks {
842                        let chunk = chunk.into_iter().map(fpkg::BlobInfo::from).collect::<Vec<_>>();
843
844                        let BlobInfoIteratorRequest::Next { responder } =
845                            stream.next().await.unwrap().unwrap();
846                        responder.send(&chunk).unwrap();
847                    }
848
849                    // Then respond with an empty chunk.
850                    let BlobInfoIteratorRequest::Next { responder } =
851                        stream.next().await.unwrap().unwrap();
852                    responder.send(&[]).unwrap();
853
854                    // Expect the client to stop asking.
855                    assert_matches!(stream.next().await, None);
856                }
857                r => panic!("Unexpected request: {r:?}"),
858            }
859            self
860        }
861
862        async fn expect_get_missing_blobs_client_closes_channel(
863            mut self,
864            response_chunks: Vec<Vec<BlobInfo>>,
865        ) -> Self {
866            match self.stream.next().await {
867                Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
868                    let mut stream = iterator.into_stream();
869
870                    // Respond to each next request with the next chunk.
871                    for chunk in response_chunks {
872                        let chunk = chunk.into_iter().map(fpkg::BlobInfo::from).collect::<Vec<_>>();
873
874                        let BlobInfoIteratorRequest::Next { responder } =
875                            stream.next().await.unwrap().unwrap();
876                        responder.send(&chunk).unwrap();
877                    }
878
879                    // The client closes the channel before we can respond with an empty chunk.
880                    assert_matches!(stream.next().await, None);
881                }
882                r => panic!("Unexpected request: {r:?}"),
883            }
884            self
885        }
886
887        async fn expect_get_missing_blobs_inject_iterator_error(mut self) -> Self {
888            match self.stream.next().await {
889                Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
890                    iterator
891                        .into_stream_and_control_handle()
892                        .1
893                        .shutdown_with_epitaph(Status::ADDRESS_IN_USE);
894                }
895                r => panic!("Unexpected request: {r:?}"),
896            }
897            self
898        }
899
900        #[cfg(target_os = "fuchsia")]
901        async fn expect_abort(mut self) -> Self {
902            match self.stream.next().await {
903                Some(Ok(NeededBlobsRequest::Abort { responder })) => {
904                    responder.send().unwrap();
905                }
906                r => panic!("Unexpected request: {r:?}"),
907            }
908            self
909        }
910    }
911
912    struct PackageDirProvider {
913        stream: fio::DirectoryRequestStream,
914    }
915
916    impl PackageDirProvider {
917        fn close_pkg_dir(self) {
918            self.stream.control_handle().shutdown_with_epitaph(Status::NOT_EMPTY);
919        }
920    }
921
922    fn blob_id(n: u8) -> BlobId {
923        BlobId::from([n; 32])
924    }
925
926    fn blob_info(n: u8) -> BlobInfo {
927        BlobInfo { blob_id: blob_id(n), length: 0 }
928    }
929
930    #[fuchsia_async::run_singlethreaded(test)]
931    async fn constructor() {
932        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<PackageCacheMarker>();
933        let client = Client::from_proxy(proxy);
934
935        drop(stream);
936        assert_matches!(client.proxy().sync().await, Err(_));
937    }
938
939    #[fuchsia_async::run_singlethreaded(test)]
940    async fn get_present_package() {
941        let (client, mut server) = MockPackageCache::new();
942
943        let ((), ()) = future::join(
944            async {
945                server
946                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
947                    .await
948                    .finish()
949                    .close_pkg_dir();
950                server.expect_closed().await;
951            },
952            async move {
953                let mut get =
954                    client.get(blob_info(2), fpkg::GcProtection::OpenPackageTracking).unwrap();
955
956                assert_matches!(get.open_meta_blob().await.unwrap(), None);
957                assert_eq!(get.get_missing_blobs().try_concat().await.unwrap(), vec![]);
958                let pkg_dir = get.finish().await.unwrap();
959
960                assert_matches!(
961                    pkg_dir.into_proxy().take_event_stream().next().await,
962                    Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
963                );
964            },
965        )
966        .await;
967    }
968
969    #[fuchsia_async::run_singlethreaded(test)]
970    async fn get_present_package_handles_slow_stream_close() {
971        let (client, mut server) = MockPackageCache::new();
972
973        let (send, recv) = futures::channel::oneshot::channel::<()>();
974
975        let ((), ()) = future::join(
976            async {
977                let (needed_blobs_stream, pkg_dir) = server
978                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
979                    .await
980                    .finish_hold_stream_open();
981                pkg_dir.close_pkg_dir();
982
983                // wait until `send` is dropped to drop the request stream.
984                let _ = recv.await;
985                drop(needed_blobs_stream);
986            },
987            async move {
988                let mut get =
989                    client.get(blob_info(2), fpkg::GcProtection::OpenPackageTracking).unwrap();
990
991                assert_matches!(get.open_meta_blob().await.unwrap(), None);
992
993                // ensure sending the request doesn't fail, then unblock closing the channel, then
994                // ensure the get_missing_blobs call detects the closed iterator as success instead
995                // of a PEER_CLOSED error.
996                let missing_blobs_stream = get.get_missing_blobs();
997                drop(send);
998                assert_eq!(missing_blobs_stream.try_concat().await.unwrap(), vec![]);
999                let pkg_dir = get.finish().await.unwrap();
1000
1001                assert_matches!(
1002                    pkg_dir.into_proxy().take_event_stream().next().await,
1003                    Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
1004                );
1005            },
1006        )
1007        .await;
1008    }
1009
1010    #[fuchsia_async::run_singlethreaded(test)]
1011    async fn needed_blobs_open_meta_far() {
1012        let (mut get, pending_get) = PendingGet::new().await;
1013
1014        let ((), ()) = future::join(
1015            async {
1016                pending_get
1017                    .expect_open_meta_blob(Ok(None))
1018                    .await
1019                    .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1020                    .await
1021                    .expect_open_meta_blob(Ok(None))
1022                    .await
1023                    .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1024                    .await
1025                    .expect_open_meta_blob(Err(fpkg::OpenBlobError::OutOfSpace))
1026                    .await
1027                    .expect_open_meta_blob(Err(fpkg::OpenBlobError::UnspecifiedIo))
1028                    .await;
1029            },
1030            async {
1031                {
1032                    let opener = get.make_open_meta_blob();
1033                    assert_matches!(opener.open().await.unwrap(), None);
1034                    assert_matches!(opener.open().await.unwrap(), Some(_));
1035                }
1036                assert_matches!(get.open_meta_blob().await.unwrap(), None);
1037                assert_matches!(get.open_meta_blob().await.unwrap(), Some(_));
1038                assert_matches!(get.open_meta_blob().await, Err(OpenBlobError::OutOfSpace));
1039                assert_matches!(get.open_meta_blob().await, Err(OpenBlobError::UnspecifiedIo));
1040            },
1041        )
1042        .await;
1043    }
1044
1045    #[fuchsia_async::run_singlethreaded(test)]
1046    async fn needed_blobs_open_content_blob() {
1047        let (mut get, pending_get) = PendingGet::new().await;
1048
1049        let ((), ()) = future::join(
1050            async {
1051                pending_get
1052                    .expect_open_blob(blob_id(2), Ok(None))
1053                    .await
1054                    .expect_open_blob(blob_id(2), Ok(Some(fidl::endpoints::create_endpoints().0)))
1055                    .await
1056                    .expect_open_blob(blob_id(10), Ok(None))
1057                    .await
1058                    .expect_open_blob(blob_id(11), Ok(Some(fidl::endpoints::create_endpoints().0)))
1059                    .await
1060                    .expect_open_blob(blob_id(12), Err(fpkg::OpenBlobError::OutOfSpace))
1061                    .await
1062                    .expect_open_blob(blob_id(13), Err(fpkg::OpenBlobError::UnspecifiedIo))
1063                    .await;
1064            },
1065            async {
1066                {
1067                    let opener = get.make_open_blob(blob_id(2));
1068                    assert_matches!(opener.open().await.unwrap(), None);
1069                    assert_matches!(opener.open().await.unwrap(), Some(_));
1070                }
1071                assert_matches!(get.open_blob(blob_id(10),).await.unwrap(), None);
1072                assert_matches!(get.open_blob(blob_id(11),).await.unwrap(), Some(_));
1073                assert_matches!(get.open_blob(blob_id(12),).await, Err(OpenBlobError::OutOfSpace));
1074                assert_matches!(
1075                    get.open_blob(blob_id(13),).await,
1076                    Err(OpenBlobError::UnspecifiedIo)
1077                );
1078            },
1079        )
1080        .await;
1081    }
1082
1083    #[fuchsia_async::run_singlethreaded(test)]
1084    async fn needed_blobs_get_missing_blobs_on_closed_ok() {
1085        let (mut get, pending_get) = PendingGet::new().await;
1086        let _ = pending_get.finish();
1087
1088        assert_matches!(get.open_meta_blob().await, Ok(None));
1089        assert_eq!(get.get_missing_blobs().try_concat().await.unwrap(), vec![]);
1090    }
1091
1092    #[fuchsia_async::run_singlethreaded(test)]
1093    async fn needed_blobs_get_missing_blobs() {
1094        let (mut get, pending_get) = PendingGet::new().await;
1095
1096        let ((), ()) = future::join(
1097            async {
1098                pending_get
1099                    .expect_get_missing_blobs(vec![
1100                        vec![blob_info(1), blob_info(2)],
1101                        vec![blob_info(3)],
1102                    ])
1103                    .await;
1104            },
1105            async {
1106                assert_eq!(
1107                    get.get_missing_blobs().try_concat().await.unwrap(),
1108                    vec![blob_info(1), blob_info(2), blob_info(3)]
1109                );
1110            },
1111        )
1112        .await;
1113    }
1114
1115    #[fuchsia_async::run_singlethreaded(test)]
1116    async fn needed_blobs_get_missing_blobs_fail_to_obtain_iterator() {
1117        let (mut get, pending_get) = PendingGet::new().await;
1118        drop(pending_get);
1119
1120        assert_matches!(
1121            get.get_missing_blobs().try_concat().await,
1122            Err(ListMissingBlobsError::CallNextOnBlobIterator(
1123                fidl::Error::ClientChannelClosed{status, ..})
1124            )
1125                if status == Status::PEER_CLOSED
1126        );
1127    }
1128
1129    #[fuchsia_async::run_singlethreaded(test)]
1130    async fn needed_blobs_get_missing_blobs_iterator_contains_error() {
1131        let (mut get, pending_get) = PendingGet::new().await;
1132
1133        let (_, ()) =
1134            future::join(pending_get.expect_get_missing_blobs_inject_iterator_error(), async {
1135                assert_matches!(
1136                    get.get_missing_blobs().try_concat().await,
1137                    Err(ListMissingBlobsError::CallNextOnBlobIterator(
1138                        fidl::Error::ClientChannelClosed{status, ..}
1139                    ))
1140                        if status == Status::ADDRESS_IN_USE
1141                );
1142            })
1143            .await;
1144    }
1145
1146    #[cfg(target_os = "fuchsia")]
1147    #[test]
1148    fn needed_blobs_abort() {
1149        use futures::future::Either;
1150        use futures::pin_mut;
1151        use std::task::Poll;
1152
1153        let mut executor = fuchsia_async::TestExecutor::new_with_fake_time();
1154
1155        let fut = async {
1156            let (get, pending_get) = PendingGet::new().await;
1157
1158            let abort_fut = get.abort().boxed();
1159            let expect_abort_fut = pending_get.expect_abort();
1160            pin_mut!(expect_abort_fut);
1161
1162            match futures::future::select(abort_fut, expect_abort_fut).await {
1163                Either::Left(((), _expect_abort_fut)) => {
1164                    panic!("abort should wait for the get future to complete")
1165                }
1166                Either::Right((pending_get, abort_fut)) => (abort_fut, pending_get),
1167            }
1168        };
1169        pin_mut!(fut);
1170
1171        let (mut abort_fut, pending_get) = match executor.run_until_stalled(&mut fut) {
1172            Poll::Pending => panic!("should complete"),
1173            Poll::Ready((abort_fut, pending_get)) => (abort_fut, pending_get),
1174        };
1175
1176        // NeededBlobs.Abort should wait until PackageCache.Get returns
1177        assert_matches!(executor.run_until_stalled(&mut abort_fut), Poll::Pending);
1178        pending_get.fail_the_get();
1179        assert_matches!(executor.run_until_stalled(&mut abort_fut), Poll::Ready(()));
1180    }
1181
1182    struct MockNeededBlob {
1183        writer: ffxfs::BlobWriterRequestStream,
1184        needed_blobs: fpkg::NeededBlobsRequestStream,
1185        vmo: Option<zx::Vmo>,
1186    }
1187
1188    impl MockNeededBlob {
1189        fn mock_hash() -> BlobId {
1190            [7; 32].into()
1191        }
1192
1193        fn new() -> (NeededBlob, Self) {
1194            let (writer_client, writer) =
1195                fidl::endpoints::create_request_stream::<ffxfs::BlobWriterMarker>();
1196            let (needed_blobs_proxy, needed_blobs) =
1197                fidl::endpoints::create_proxy_and_stream::<fpkg::NeededBlobsMarker>();
1198            (
1199                NeededBlob {
1200                    blob: Blob {
1201                        needed_blobs: needed_blobs_proxy,
1202                        blob_id: Self::mock_hash(),
1203                        state: NeedsTruncate(writer_client),
1204                    },
1205                },
1206                Self { writer, needed_blobs, vmo: None },
1207            )
1208        }
1209
1210        async fn fail_get_vmo(mut self) -> Self {
1211            match self.writer.next().await {
1212                Some(Ok(ffxfs::BlobWriterRequest::GetVmo { size: _, responder })) => {
1213                    responder.send(Err(Status::NO_SPACE.into_raw())).unwrap();
1214                }
1215                r => panic!("Unexpected request: {r:?}"),
1216            }
1217            self
1218        }
1219
1220        async fn expect_get_vmo(mut self, expected_size: u64) -> Self {
1221            match self.writer.next().await {
1222                Some(Ok(ffxfs::BlobWriterRequest::GetVmo { size, responder })) => {
1223                    assert_eq!(size, expected_size);
1224                    let vmo = zx::Vmo::create(size).unwrap();
1225                    assert!(self.vmo.is_none());
1226                    self.vmo = Some(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap());
1227                    responder.send(Ok(vmo)).unwrap();
1228                }
1229                r => panic!("Unexpected request: {r:?}"),
1230            }
1231            self
1232        }
1233
1234        async fn fail_bytes_ready(mut self) -> Self {
1235            match self.writer.next().await {
1236                Some(Ok(ffxfs::BlobWriterRequest::BytesReady { bytes_written: _, responder })) => {
1237                    responder.send(Err(Status::NO_SPACE.into_raw())).unwrap();
1238                }
1239                r => panic!("Unexpected request: {r:?}"),
1240            }
1241            self
1242        }
1243
1244        async fn expect_bytes_ready(mut self, expected_payload: &[u8], offset: u64) -> Self {
1245            match self.writer.next().await {
1246                Some(Ok(ffxfs::BlobWriterRequest::BytesReady { bytes_written, responder })) => {
1247                    assert_eq!(bytes_written, u64::try_from(expected_payload.len()).unwrap());
1248
1249                    let vmo = self.vmo.as_ref().unwrap();
1250                    let mut buf = vec![0; expected_payload.len()];
1251                    let () = vmo.read(&mut buf, offset).unwrap();
1252                    assert_eq!(buf, expected_payload);
1253
1254                    responder.send(Ok(())).unwrap();
1255                }
1256                r => panic!("Unexpected request: {r:?}"),
1257            }
1258            self
1259        }
1260
1261        async fn expect_blob_written(mut self) -> Self {
1262            match self.needed_blobs.next().await {
1263                Some(Ok(fpkg::NeededBlobsRequest::BlobWritten { blob_id, responder })) => {
1264                    assert_eq!(blob_id, Self::mock_hash().into());
1265                    responder.send(Ok(())).unwrap();
1266                }
1267                r => panic!("Unexpected request: {r:?}"),
1268            }
1269            self
1270        }
1271    }
1272
1273    #[fuchsia_async::run_singlethreaded(test)]
1274    async fn empty_blob_write() {
1275        let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1276
1277        let ((), ()) = future::join(
1278            async {
1279                blob_server.expect_get_vmo(0).await.expect_blob_written().await;
1280            },
1281            async {
1282                let blob = match blob.truncate(0).await.unwrap() {
1283                    TruncateBlobSuccess::AllWritten(blob) => blob,
1284                    other => panic!("empty blob shouldn't need bytes {other:?}"),
1285                };
1286                let () = blob.blob_written().await.unwrap();
1287            },
1288        )
1289        .await;
1290    }
1291
1292    impl TruncateBlobSuccess {
1293        fn unwrap_needs_data(self) -> Blob<NeedsData> {
1294            match self {
1295                TruncateBlobSuccess::NeedsData(blob) => blob,
1296                TruncateBlobSuccess::AllWritten(_) => panic!("blob should need data"),
1297            }
1298        }
1299    }
1300
1301    impl BlobWriteSuccess {
1302        fn unwrap_needs_data(self) -> Blob<NeedsData> {
1303            match self {
1304                BlobWriteSuccess::NeedsData(blob) => blob,
1305                BlobWriteSuccess::AllWritten(_) => panic!("blob should need data"),
1306            }
1307        }
1308
1309        fn unwrap_all_written(self) -> Blob<NeedsBlobWritten> {
1310            match self {
1311                BlobWriteSuccess::NeedsData(_) => panic!("blob should be completely written"),
1312                BlobWriteSuccess::AllWritten(blob) => blob,
1313            }
1314        }
1315    }
1316
1317    #[fuchsia_async::run_singlethreaded(test)]
1318    async fn small_blob_write() {
1319        let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1320
1321        let ((), ()) = future::join(
1322            async {
1323                blob_server
1324                    .expect_get_vmo(4)
1325                    .await
1326                    .expect_bytes_ready(b"test", 0)
1327                    .await
1328                    .expect_blob_written()
1329                    .await;
1330            },
1331            async {
1332                let blob = blob.truncate(4).await.unwrap().unwrap_needs_data();
1333                let blob = blob.write(b"test").await.unwrap().unwrap_all_written();
1334                let () = blob.blob_written().await.unwrap();
1335            },
1336        )
1337        .await;
1338    }
1339
1340    #[fuchsia_async::run_singlethreaded(test)]
1341    async fn blob_truncate_no_space() {
1342        let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1343
1344        let ((), ()) = future::join(
1345            async {
1346                blob_server.fail_get_vmo().await;
1347            },
1348            async {
1349                assert_matches!(blob.truncate(4).await, Err(TruncateBlobError::NoSpace));
1350            },
1351        )
1352        .await;
1353    }
1354
1355    #[fuchsia_async::run_singlethreaded(test)]
1356    async fn blob_write_no_space() {
1357        let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1358
1359        let ((), ()) = future::join(
1360            async {
1361                blob_server.expect_get_vmo(4).await.fail_bytes_ready().await;
1362            },
1363            async {
1364                let blob = blob.truncate(4).await.unwrap().unwrap_needs_data();
1365                assert_matches!(blob.write(b"test").await, Err(WriteBlobError::NoSpace));
1366            },
1367        )
1368        .await;
1369    }
1370
1371    #[fuchsia_async::run_singlethreaded(test)]
1372    async fn blob_write_multiple_write() {
1373        let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1374
1375        let ((), ()) = future::join(
1376            async {
1377                blob_server
1378                    .expect_get_vmo(6)
1379                    .await
1380                    .expect_bytes_ready(b"abc", 0)
1381                    .await
1382                    .expect_bytes_ready(b"123", 3)
1383                    .await
1384                    .expect_blob_written()
1385                    .await;
1386            },
1387            async {
1388                let blob = blob.truncate(6).await.unwrap().unwrap_needs_data();
1389                let blob = blob.write(b"abc").await.unwrap().unwrap_needs_data();
1390                let blob = blob.write(b"123").await.unwrap().unwrap_all_written();
1391                let () = blob.blob_written().await.unwrap();
1392            },
1393        )
1394        .await;
1395    }
1396
1397    #[fuchsia_async::run_singlethreaded(test)]
1398    async fn get_already_cached_success() {
1399        let (client, mut server) = MockPackageCache::new();
1400
1401        let ((), ()) = future::join(
1402            async {
1403                server
1404                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1405                    .await
1406                    .finish()
1407                    .close_pkg_dir();
1408                server.expect_closed().await;
1409            },
1410            async move {
1411                let pkg_dir = client.get_already_cached(blob_id(2)).await.unwrap();
1412
1413                assert_matches!(
1414                    pkg_dir.into_proxy().take_event_stream().next().await,
1415                    Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
1416                );
1417            },
1418        )
1419        .await;
1420    }
1421
1422    #[fuchsia_async::run_singlethreaded(test)]
1423    async fn get_already_cached_missing_meta_far() {
1424        let (client, mut server) = MockPackageCache::new();
1425
1426        let ((), ()) = future::join(
1427            async {
1428                server
1429                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1430                    .await
1431                    .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1432                    .await;
1433            },
1434            async move {
1435                assert_matches!(
1436                    client.get_already_cached(blob_id(2)).await,
1437                    Err(GetAlreadyCachedError::MissingMetaFar)
1438                );
1439            },
1440        )
1441        .await;
1442    }
1443
1444    #[fuchsia_async::run_singlethreaded(test)]
1445    async fn get_already_cached_missing_content_blob() {
1446        let (client, mut server) = MockPackageCache::new();
1447
1448        let ((), ()) = future::join(
1449            async {
1450                server
1451                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1452                    .await
1453                    .expect_open_meta_blob(Ok(None))
1454                    .await
1455                    .expect_get_missing_blobs_client_closes_channel(vec![vec![BlobInfo {
1456                        blob_id: [0; 32].into(),
1457                        length: 0,
1458                    }]])
1459                    .await;
1460            },
1461            async move {
1462                assert_matches!(
1463                    client.get_already_cached(blob_id(2)).await,
1464                    Err(GetAlreadyCachedError::MissingContentBlobs(v))
1465                        if v == vec![BlobInfo {
1466                            blob_id: [0; 32].into(),
1467                            length: 0,
1468                        }]
1469                );
1470            },
1471        )
1472        .await;
1473    }
1474}