fidl_fuchsia_pkg_ext/
cache.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![cfg(target_os = "fuchsia")]
6#![deny(missing_docs)]
7
8//! Wrapper types for [`fidl_fuchsia_pkg::PackageCacheProxy`] and its related protocols.
9
10use crate::types::{BlobId, BlobInfo};
11use fuchsia_pkg::PackageDirectory;
12use futures::prelude::*;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicBool, Ordering};
15use zx_status::Status;
16use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_pkg as fpkg};
17
18/// An open connection to a provider of the `fuchsia.pkg.PackageCache`.
19#[derive(Debug, Clone)]
20pub struct Client {
21    proxy: fpkg::PackageCacheProxy,
22}
23
24impl Client {
25    /// Constructs a client from the given proxy.
26    pub fn from_proxy(proxy: fpkg::PackageCacheProxy) -> Self {
27        Self { proxy }
28    }
29
30    /// Returns a reference to the underlying PackageCacheProxy connection.
31    pub fn proxy(&self) -> &fpkg::PackageCacheProxy {
32        &self.proxy
33    }
34
35    /// Opens the package specified by `meta_far_blob` with the intent to fetch any missing blobs
36    /// using the returned [`Get`] type if needed.
37    pub fn get(
38        &self,
39        meta_far_blob: BlobInfo,
40        gc_protection: fpkg::GcProtection,
41    ) -> Result<Get, fidl::Error> {
42        let (needed_blobs, needed_blobs_server_end) =
43            fidl::endpoints::create_proxy::<fpkg::NeededBlobsMarker>();
44        let (pkg_dir, pkg_dir_server_end) = PackageDirectory::create_request()?;
45
46        let get_fut = self.proxy.get(
47            &meta_far_blob.into(),
48            gc_protection,
49            needed_blobs_server_end,
50            pkg_dir_server_end,
51        );
52
53        Ok(Get {
54            get_fut,
55            pkg_dir,
56            needed_blobs,
57            pkg_present: SharedBoolEvent::new(),
58            meta_far: meta_far_blob.blob_id,
59        })
60    }
61
62    /// Uses PackageCache.Get to obtain the package directory of a package that is already cached
63    /// (all blobs are already in blobfs).
64    /// Errors if the package is not already cached.
65    /// Always uses open package tracking GC protection, because OTA (the only client of Retained
66    /// GC protection), should never need to get an already cached package.
67    ///
68    /// Compared to `get_cached`:
69    ///   * Activates `meta_far_blob` in the dynamic index
70    ///   * Must not be called concurrently with the same `meta_far_blob`
71    pub async fn get_already_cached(
72        &self,
73        meta_far_blob: BlobId,
74    ) -> Result<PackageDirectory, GetAlreadyCachedError> {
75        let mut get = self
76            .get(
77                BlobInfo { blob_id: meta_far_blob, length: 0 },
78                fpkg::GcProtection::OpenPackageTracking,
79            )
80            .map_err(GetAlreadyCachedError::Get)?;
81        if let Some(_) = get.open_meta_blob().await.map_err(GetAlreadyCachedError::OpenMetaBlob)? {
82            return Err(GetAlreadyCachedError::MissingMetaFar);
83        }
84
85        if let Some(missing_blobs) = get
86            .get_missing_blobs()
87            .try_next()
88            .await
89            .map_err(GetAlreadyCachedError::GetMissingBlobs)?
90        {
91            return Err(GetAlreadyCachedError::MissingContentBlobs(missing_blobs));
92        }
93
94        get.finish().await.map_err(GetAlreadyCachedError::FinishGet)
95    }
96
97    /// Uses PackageCache.GetSubpackage to obtain the package directory of a subpackage.
98    /// Errors if there is not an open connection to the superpackage.
99    pub async fn get_subpackage(
100        &self,
101        superpackage: BlobId,
102        subpackage: &fuchsia_url::RelativePackageUrl,
103    ) -> Result<PackageDirectory, GetSubpackageError> {
104        let (dir, dir_server_end) =
105            PackageDirectory::create_request().map_err(GetSubpackageError::CreatingHandles)?;
106        let () = self
107            .proxy
108            .get_subpackage(
109                &superpackage.into(),
110                &fpkg::PackageUrl { url: subpackage.into() },
111                dir_server_end,
112            )
113            .await
114            .map_err(GetSubpackageError::CallingGetSubpackage)??;
115        Ok(dir)
116    }
117
118    /// Write blobs using the returned [`WriteBlobs`] type.
119    pub fn write_blobs(&self) -> Result<WriteBlobs, fidl::Error> {
120        let (needed_blobs, needed_blobs_server_end) =
121            fidl::endpoints::create_proxy::<fpkg::NeededBlobsMarker>();
122
123        let () = self.proxy.write_blobs(needed_blobs_server_end)?;
124
125        Ok(WriteBlobs { needed_blobs })
126    }
127}
128
129#[derive(thiserror::Error, Debug)]
130#[allow(missing_docs)]
131pub enum GetAlreadyCachedError {
132    #[error("calling get")]
133    Get(#[source] fidl::Error),
134
135    #[error("opening meta blob")]
136    OpenMetaBlob(#[source] OpenBlobError),
137
138    #[error("meta.far blob not cached")]
139    MissingMetaFar,
140
141    #[error("getting missing blobs")]
142    GetMissingBlobs(#[source] ListMissingBlobsError),
143
144    #[error("content blobs not cached {0:?}")]
145    MissingContentBlobs(Vec<BlobInfo>),
146
147    #[error("finishing get")]
148    FinishGet(#[source] GetError),
149}
150
151impl GetAlreadyCachedError {
152    /// Returns true if the get failed because the package was not cached.
153    pub fn was_not_cached(&self) -> bool {
154        use GetAlreadyCachedError::*;
155        match self {
156            Get(..) | OpenMetaBlob(..) | GetMissingBlobs(..) | FinishGet(..) => false,
157            MissingMetaFar | MissingContentBlobs(..) => true,
158        }
159    }
160}
161
162#[derive(thiserror::Error, Debug)]
163#[allow(missing_docs)]
164pub enum GetSubpackageError {
165    #[error("creating handles")]
166    CreatingHandles(#[source] fidl::Error),
167
168    #[error("calling GetCached FIDL")]
169    CallingGetSubpackage(#[source] fidl::Error),
170
171    #[error("the superpackage does not have an open package connection")]
172    SuperpackageClosed,
173
174    #[error("the subpackage does not exist")]
175    DoesNotExist,
176
177    #[error("internal")]
178    Internal,
179}
180
181impl From<fpkg::GetSubpackageError> for GetSubpackageError {
182    fn from(fidl: fpkg::GetSubpackageError) -> Self {
183        use GetSubpackageError::*;
184        use fpkg::GetSubpackageError as fErr;
185        match fidl {
186            fErr::SuperpackageClosed => SuperpackageClosed,
187            fErr::DoesNotExist => DoesNotExist,
188            fErr::Internal => Internal,
189        }
190    }
191}
192
193#[derive(Debug, Clone)]
194struct SharedBoolEvent(Arc<AtomicBool>);
195
196impl SharedBoolEvent {
197    fn new() -> Self {
198        Self(Arc::new(AtomicBool::new(false)))
199    }
200
201    fn get(&self) -> bool {
202        self.0.load(Ordering::SeqCst)
203    }
204
205    fn set(&self) {
206        self.0.store(true, Ordering::SeqCst)
207    }
208}
209
210async fn open_blob(
211    needed_blobs: &fpkg::NeededBlobsProxy,
212    kind: OpenKind,
213    blob_id: BlobId,
214    pkg_present: Option<&SharedBoolEvent>,
215    allow_existing: bool,
216) -> Result<Option<NeededBlob>, OpenBlobError> {
217    let open_fut = match kind {
218        OpenKind::Meta => needed_blobs.open_meta_blob(),
219        OpenKind::Content => needed_blobs.open_blob(&blob_id.into(), allow_existing),
220    };
221    match open_fut.await {
222        Err(fidl::Error::ClientChannelClosed { status: Status::OK, .. }) => {
223            if let Some(pkg_present) = pkg_present {
224                pkg_present.set();
225            }
226            Ok(None)
227        }
228        res => {
229            if let Some(blob) = res?? {
230                Ok(Some(NeededBlob {
231                    blob: Blob {
232                        needed_blobs: needed_blobs.clone(),
233                        blob_id,
234                        state: NeedsTruncate(blob),
235                    },
236                }))
237            } else {
238                Ok(None)
239            }
240        }
241    }
242}
243
244#[derive(Debug, Clone, Copy, PartialEq, Eq)]
245enum OpenKind {
246    Meta,
247    Content,
248}
249
250/// A deferred call to [`Get::open_meta_blob`] or [`Get::open_blob`].
251#[derive(Debug)]
252pub struct DeferredOpenBlob {
253    needed_blobs: fpkg::NeededBlobsProxy,
254    kind: OpenKind,
255    blob_id: BlobId,
256    allow_existing: bool,
257    pkg_present: Option<SharedBoolEvent>,
258}
259
260impl DeferredOpenBlob {
261    /// Opens the blob for write, if it is still needed. The blob's data can be provided using the
262    /// returned NeededBlob.
263    pub async fn open(&self) -> Result<Option<NeededBlob>, OpenBlobError> {
264        open_blob(
265            &self.needed_blobs,
266            self.kind,
267            self.blob_id,
268            self.pkg_present.as_ref(),
269            self.allow_existing,
270        )
271        .await
272    }
273
274    fn proxy_cmp_key(&self) -> u32 {
275        use fidl::AsHandleRef;
276        use fidl::endpoints::Proxy;
277        self.needed_blobs.as_channel().raw_handle()
278    }
279}
280
281impl std::cmp::PartialEq for DeferredOpenBlob {
282    fn eq(&self, other: &Self) -> bool {
283        self.proxy_cmp_key() == other.proxy_cmp_key() && self.kind == other.kind
284    }
285}
286
287impl std::cmp::Eq for DeferredOpenBlob {}
288
289/// A pending `fuchsia.pkg/PackageCache.Get()` request. Clients must, in order:
290/// 1. open/write the meta blob, if Some(NeededBlob) is provided by that API
291/// 2. enumerate all missing content blobs
292/// 3. open/write all missing content blobs, if Some(NeededBlob) is provided by that API
293/// 4. finish() to complete the Get() request.
294#[derive(Debug)]
295pub struct Get {
296    get_fut: fidl::client::QueryResponseFut<Result<(), i32>>,
297    needed_blobs: fpkg::NeededBlobsProxy,
298    pkg_dir: PackageDirectory,
299    pkg_present: SharedBoolEvent,
300    meta_far: BlobId,
301}
302
303impl Get {
304    /// Returns an independent object that can be used to open the meta blob for write.  See
305    /// [`Self::open_meta_blob`].
306    pub fn make_open_meta_blob(&mut self) -> DeferredOpenBlob {
307        DeferredOpenBlob {
308            needed_blobs: self.needed_blobs.clone(),
309            kind: OpenKind::Meta,
310            blob_id: self.meta_far,
311            allow_existing: false,
312            pkg_present: Some(self.pkg_present.clone()),
313        }
314    }
315
316    /// Opens the meta blob for write, if it is still needed. The blob's data can be provided using
317    /// the returned NeededBlob.
318    pub async fn open_meta_blob(&mut self) -> Result<Option<NeededBlob>, OpenBlobError> {
319        open_blob(&self.needed_blobs, OpenKind::Meta, self.meta_far, Some(&self.pkg_present), false)
320            .await
321    }
322
323    fn start_get_missing_blobs(
324        &mut self,
325    ) -> Result<Option<fpkg::BlobInfoIteratorProxy>, fidl::Error> {
326        if self.pkg_present.get() {
327            return Ok(None);
328        }
329
330        let (blob_iterator, blob_iterator_server_end) =
331            fidl::endpoints::create_proxy::<fpkg::BlobInfoIteratorMarker>();
332
333        self.needed_blobs.get_missing_blobs(blob_iterator_server_end)?;
334        Ok(Some(blob_iterator))
335    }
336
337    /// Determines the set of blobs that the caller must open/write to complete this `Get()`
338    /// operation.
339    /// The returned stream will never yield an empty `Vec`.
340    /// Callers should process the missing blobs (via `make_open_blob` or `open_blob`) concurrently
341    /// with reading the stream to guarantee stream termination.
342    pub fn get_missing_blobs(
343        &mut self,
344    ) -> impl Stream<Item = Result<Vec<BlobInfo>, ListMissingBlobsError>> + Unpin + use<> {
345        match self.start_get_missing_blobs() {
346            Ok(option_iter) => match option_iter {
347                Some(iterator) => crate::fidl_iterator_to_stream(iterator)
348                    .map_ok(|v| v.into_iter().map(BlobInfo::from).collect())
349                    .map_err(ListMissingBlobsError::CallNextOnBlobIterator)
350                    .left_stream(),
351                None => futures::stream::empty().right_stream(),
352            }
353            .left_stream(),
354            Err(e) => {
355                futures::stream::iter(Some(Err(ListMissingBlobsError::CallGetMissingBlobs(e))))
356                    .right_stream()
357            }
358        }
359    }
360
361    /// Returns an independent object that can be used to open the `content_blob` for write.  See
362    /// [`Self::open_blob`].
363    pub fn make_open_blob(&mut self, content_blob: BlobId) -> DeferredOpenBlob {
364        DeferredOpenBlob {
365            needed_blobs: self.needed_blobs.clone(),
366            kind: OpenKind::Content,
367            blob_id: content_blob,
368            allow_existing: false,
369            pkg_present: None,
370        }
371    }
372
373    /// Opens `content_blob` for write, if it is still needed. The blob's data can be provided
374    /// using the returned NeededBlob.
375    pub async fn open_blob(
376        &mut self,
377        content_blob: BlobId,
378    ) -> Result<Option<NeededBlob>, OpenBlobError> {
379        open_blob(&self.needed_blobs, OpenKind::Content, content_blob, None, false).await
380    }
381
382    /// Notifies the endpoint that all blobs have been written and wait for the response to the
383    /// pending `Get()` request, returning the cached [`PackageDirectory`].
384    pub async fn finish(self) -> Result<PackageDirectory, GetError> {
385        drop(self.needed_blobs);
386        let () = self.get_fut.await?.map_err(Status::from_raw)?;
387        Ok(self.pkg_dir)
388    }
389
390    /// Aborts this caching operation for the package.
391    pub async fn abort(self) {
392        self.needed_blobs.abort().map(|_: Result<(), fidl::Error>| ()).await;
393        // The package is not guaranteed to be removed from the dynamic index after abort
394        // returns, we have to wait until finish returns (to prevent a resolve retry from
395        // racing). The finish call will return an error that just tells us that we called
396        // abort, so we ignore it.
397        let _ = self.get_fut.await;
398    }
399}
400
401/// A pending `fuchsia.pkg/PackageCache.WriteBlob()` request.
402#[derive(Clone, Debug)]
403pub struct WriteBlobs {
404    needed_blobs: fpkg::NeededBlobsProxy,
405}
406
407impl WriteBlobs {
408    /// Returns an independent object that can be used to open the `blob` for write.  See
409    /// [`Self::open_blob`].
410    pub fn make_open_blob(&mut self, blob: BlobId, allow_existing: bool) -> DeferredOpenBlob {
411        DeferredOpenBlob {
412            needed_blobs: self.needed_blobs.clone(),
413            kind: OpenKind::Content,
414            blob_id: blob,
415            allow_existing,
416            pkg_present: None,
417        }
418    }
419
420    /// Opens `blob` for write. The blob's data can be provided using the returned NeededBlob.
421    pub async fn open_blob(
422        &mut self,
423        blob: BlobId,
424        allow_existing: bool,
425    ) -> Result<Option<NeededBlob>, OpenBlobError> {
426        open_blob(&self.needed_blobs, OpenKind::Content, blob, None, allow_existing).await
427    }
428}
429
430/// A blob that needs to be written.
431#[derive(Debug)]
432pub struct NeededBlob {
433    /// Typestate wrapper around the blob. Clients must first call truncate(), then write() until
434    /// all data is provided.
435    pub blob: Blob<NeedsTruncate>,
436}
437
438/// The successful result of truncating a blob.
439#[derive(Debug)]
440pub enum TruncateBlobSuccess {
441    /// The blob contents need to be written.
442    NeedsData(Blob<NeedsData>),
443
444    /// The blob is fully written (it was the empty blob) and now a
445    /// fuchsia.pkg.NeededBlobs.BlobWritten message should be sent.
446    AllWritten(Blob<NeedsBlobWritten>),
447}
448
449/// The successful result of writing some data to a blob.
450#[derive(Debug)]
451pub enum BlobWriteSuccess {
452    /// There is still more data to write.
453    NeedsData(Blob<NeedsData>),
454
455    /// The blob is fully written and now a fuchsia.pkg.NeededBlobs.BlobWritten
456    /// message should be sent.
457    AllWritten(Blob<NeedsBlobWritten>),
458}
459
460/// State for a blob that can be truncated.
461#[derive(Debug)]
462pub struct NeedsTruncate(fidl::endpoints::ClientEnd<ffxfs::BlobWriterMarker>);
463
464/// State for a blob that can be written to.
465#[derive(Debug)]
466pub struct NeedsData {
467    size: u64,
468    written: u64,
469    writer: blob_writer::BlobWriter,
470}
471
472/// State for a blob that has been fully written but that needs a
473/// fuchsia.pkg.NeededBlobs.BlobWritten message sent to pkg-cache.
474#[derive(Debug)]
475pub struct NeedsBlobWritten;
476
477/// A blob in the process of being written.
478#[derive(Debug)]
479#[must_use]
480pub struct Blob<S> {
481    needed_blobs: fpkg::NeededBlobsProxy,
482    blob_id: BlobId,
483    state: S,
484}
485
486impl Blob<NeedsTruncate> {
487    /// Truncates the blob to the given size. On success, the blob enters the writable state.
488    pub async fn truncate(self, size: u64) -> Result<TruncateBlobSuccess, TruncateBlobError> {
489        let Self { needed_blobs, blob_id, state: NeedsTruncate(blob) } = self;
490
491        let writer = blob_writer::BlobWriter::create(blob.into_proxy(), size).await.map_err(
492            |e| match e {
493                blob_writer::CreateError::GetVmo(s) if s == Status::NO_SPACE => {
494                    TruncateBlobError::NoSpace
495                }
496                _ => TruncateBlobError::CreateBlobWriter(e),
497            },
498        )?;
499
500        Ok(if size == 0 {
501            TruncateBlobSuccess::AllWritten(Blob { needed_blobs, blob_id, state: NeedsBlobWritten })
502        } else {
503            TruncateBlobSuccess::NeedsData(Blob {
504                needed_blobs,
505                blob_id,
506                state: NeedsData { size, written: 0, writer },
507            })
508        })
509    }
510
511    /// Return the fields for manual handling of the protocol.
512    ///
513    /// The returned BlobWriter client end is in the initial state of the protocol (GetVmo has not
514    /// been called).
515    ///
516    /// Once the blob has been written through the BlobWriter, BlobWritten should be called on the
517    /// returned NeededBlobs proxy. The returned NeededBlobs proxy should not otherwise be used.
518    pub fn deconstruct(
519        self,
520    ) -> (fpkg::NeededBlobsProxy, BlobId, fidl::endpoints::ClientEnd<ffxfs::BlobWriterMarker>) {
521        let Blob { needed_blobs, blob_id, state: NeedsTruncate(client) } = self;
522        (needed_blobs, blob_id, client)
523    }
524}
525
526impl Blob<NeedsData> {
527    /// Writes all of the given buffer to the blob.
528    ///
529    /// # Panics
530    ///
531    /// Panics if a write is attempted with a buf larger than the remaining blob size.
532    pub fn write(
533        self,
534        buf: &[u8],
535    ) -> impl Future<Output = Result<BlobWriteSuccess, WriteBlobError>> + '_ {
536        self.write_with_trace_callbacks(buf, &|_| {}, &|| {})
537    }
538
539    /// Writes all of the given buffer to the blob.
540    ///
541    /// `after_write` and `after_write_ack` are called before and after, respectively, waiting for
542    /// the server to acknowledge writes.
543    /// They may be called multiple times if the write of `buf` is chunked.
544    /// `after_write` is given the size of each write in bytes.
545    /// Useful for creating trace spans.
546    ///
547    /// # Panics
548    ///
549    /// Panics if a write is attempted with a buf larger than the remaining blob size.
550    pub async fn write_with_trace_callbacks(
551        mut self,
552        buf: &[u8],
553        after_write: &(dyn Fn(u64) + Send + Sync),
554        after_write_ack: &(dyn Fn() + Send + Sync),
555    ) -> Result<BlobWriteSuccess, WriteBlobError> {
556        assert!(self.state.written + buf.len() as u64 <= self.state.size);
557
558        let fut = self.state.writer.write(buf);
559        let () = after_write(buf.len() as u64);
560        let res = fut.await;
561        let () = after_write_ack();
562        let () = res.map_err(|e| match e {
563            e @ blob_writer::WriteError::BytesReady(s) => match s {
564                Status::IO_DATA_INTEGRITY => WriteBlobError::Corrupt,
565                Status::NO_SPACE => WriteBlobError::NoSpace,
566                _ => WriteBlobError::FxBlob(e),
567            },
568            e => WriteBlobError::FxBlob(e),
569        })?;
570
571        self.state.written += buf.len() as u64;
572
573        if self.state.written == self.state.size {
574            let Self { needed_blobs, blob_id, state: _ } = self;
575            Ok(BlobWriteSuccess::AllWritten(Blob {
576                needed_blobs,
577                blob_id,
578                state: NeedsBlobWritten,
579            }))
580        } else {
581            Ok(BlobWriteSuccess::NeedsData(self))
582        }
583    }
584}
585
586impl Blob<NeedsBlobWritten> {
587    /// Tells pkg-cache that the blob has been successfully written and can now be read.
588    pub async fn blob_written(self) -> Result<(), BlobWrittenError> {
589        Ok(self.needed_blobs.blob_written(&self.blob_id.into()).await??)
590    }
591}
592
593/// An error encountered while opening a package.
594#[derive(Debug, thiserror::Error)]
595#[allow(missing_docs)]
596pub enum OpenError {
597    #[error("the package does not exist")]
598    NotFound,
599
600    #[error("Open() responded with an unexpected status")]
601    UnexpectedResponse(#[source] Status),
602
603    #[error("transport error")]
604    Fidl(#[from] fidl::Error),
605}
606/// An error encountered while caching a package.
607#[derive(Debug, thiserror::Error)]
608#[allow(missing_docs)]
609pub enum GetError {
610    #[error("Get() responded with an unexpected status")]
611    UnexpectedResponse(#[from] Status),
612
613    #[error("transport error")]
614    Fidl(#[from] fidl::Error),
615}
616
617/// An error encountered while opening a metadata or content blob for write.
618#[derive(Debug, thiserror::Error)]
619#[allow(missing_docs)]
620pub enum OpenBlobError {
621    #[error("there is insufficient storage space available to persist this blob")]
622    OutOfSpace,
623
624    #[error("this blob is already open for write by another cache operation")]
625    ConcurrentWrite,
626
627    #[error("an unspecified error occurred during underlying I/O")]
628    UnspecifiedIo,
629
630    #[error("an unspecified error occurred")]
631    Internal,
632
633    #[error("transport error")]
634    Fidl(#[from] fidl::Error),
635}
636
637impl From<fpkg::OpenBlobError> for OpenBlobError {
638    fn from(e: fpkg::OpenBlobError) -> Self {
639        match e {
640            fpkg::OpenBlobError::OutOfSpace => OpenBlobError::OutOfSpace,
641            fpkg::OpenBlobError::ConcurrentWrite => OpenBlobError::ConcurrentWrite,
642            fpkg::OpenBlobError::UnspecifiedIo => OpenBlobError::UnspecifiedIo,
643            fpkg::OpenBlobError::Internal => OpenBlobError::Internal,
644        }
645    }
646}
647
648/// An error encountered while enumerating missing content blobs.
649#[derive(Debug, thiserror::Error)]
650#[allow(missing_docs)]
651pub enum ListMissingBlobsError {
652    #[error("while obtaining the missing blobs fidl iterator")]
653    CallGetMissingBlobs(#[source] fidl::Error),
654
655    #[error("while obtaining the next chunk of blobs from the fidl iterator")]
656    CallNextOnBlobIterator(#[source] fidl::Error),
657}
658
659/// An error encountered while truncating a blob
660#[derive(Debug, thiserror::Error)]
661#[allow(missing_docs)]
662pub enum TruncateBlobError {
663    #[error("insufficient storage space is available")]
664    NoSpace,
665
666    #[error("creating blob writer")]
667    CreateBlobWriter(#[source] blob_writer::CreateError),
668
669    #[error("transport error")]
670    Fidl(#[from] fidl::Error),
671
672    #[error("blob is in an invalid state")]
673    BadState,
674}
675
676/// An error encountered while writing a blob.
677#[derive(Debug, thiserror::Error)]
678#[allow(missing_docs)]
679pub enum WriteBlobError {
680    #[error("the written data was corrupt")]
681    Corrupt,
682
683    #[error("insufficient storage space is available")]
684    NoSpace,
685
686    #[error("transport error")]
687    Fidl(#[from] fidl::Error),
688
689    #[error("while using the fxblob writer")]
690    FxBlob(#[source] blob_writer::WriteError),
691}
692
693/// An error encountered while sending the BlobWritten message.
694#[derive(Debug, thiserror::Error)]
695#[allow(missing_docs)]
696pub enum BlobWrittenError {
697    #[error("pkg-cache could not find the blob after it was successfully written")]
698    MissingAfterWritten,
699
700    #[error("NeededBlobs.BlobWritten was called before the blob was opened")]
701    UnopenedBlob,
702
703    #[error("transport error")]
704    Fidl(#[from] fidl::Error),
705}
706
707impl From<fpkg::BlobWrittenError> for BlobWrittenError {
708    fn from(e: fpkg::BlobWrittenError) -> Self {
709        match e {
710            fpkg::BlobWrittenError::NotWritten => BlobWrittenError::MissingAfterWritten,
711            fpkg::BlobWrittenError::UnopenedBlob => BlobWrittenError::UnopenedBlob,
712        }
713    }
714}
715
716#[cfg(test)]
717mod tests {
718    use super::*;
719    use assert_matches::assert_matches;
720    use fidl::endpoints::{ClientEnd, ControlHandle as _, RequestStream as _};
721    use fidl_fuchsia_io as fio;
722    use fidl_fuchsia_pkg::{
723        BlobInfoIteratorRequest, NeededBlobsRequest, NeededBlobsRequestStream,
724        PackageCacheGetResponder, PackageCacheMarker, PackageCacheRequest,
725        PackageCacheRequestStream,
726    };
727    use zx::HandleBased as _;
728
729    struct MockPackageCache {
730        stream: PackageCacheRequestStream,
731    }
732
733    impl MockPackageCache {
734        fn new() -> (Client, Self) {
735            let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<PackageCacheMarker>();
736            (Client::from_proxy(proxy), Self { stream })
737        }
738
739        async fn expect_get(
740            &mut self,
741            blob_info: BlobInfo,
742            expected_gc_protection: fpkg::GcProtection,
743        ) -> PendingGet {
744            match self.stream.next().await {
745                Some(Ok(PackageCacheRequest::Get {
746                    meta_far_blob,
747                    gc_protection,
748                    needed_blobs,
749                    dir,
750                    responder,
751                })) => {
752                    assert_eq!(BlobInfo::from(meta_far_blob), blob_info);
753                    assert_eq!(gc_protection, expected_gc_protection);
754                    let needed_blobs = needed_blobs.into_stream();
755                    let dir = dir.into_stream();
756
757                    PendingGet { stream: needed_blobs, dir, responder }
758                }
759                r => panic!("Unexpected request: {r:?}"),
760            }
761        }
762
763        async fn expect_closed(mut self) {
764            assert_matches!(self.stream.next().await, None);
765        }
766    }
767
768    struct PendingGet {
769        stream: NeededBlobsRequestStream,
770        dir: fio::DirectoryRequestStream,
771        responder: PackageCacheGetResponder,
772    }
773
774    impl PendingGet {
775        async fn new() -> (Get, PendingGet) {
776            let (client, mut server) = MockPackageCache::new();
777
778            let get = client.get(blob_info(42), fpkg::GcProtection::OpenPackageTracking).unwrap();
779            let pending_get =
780                server.expect_get(blob_info(42), fpkg::GcProtection::OpenPackageTracking).await;
781            (get, pending_get)
782        }
783
784        fn finish_hold_stream_open(self) -> (NeededBlobsRequestStream, PackageDirProvider) {
785            self.stream.control_handle().shutdown_with_epitaph(Status::OK);
786            self.responder.send(Ok(())).unwrap();
787            (self.stream, PackageDirProvider { stream: self.dir })
788        }
789
790        fn finish(self) -> PackageDirProvider {
791            self.stream.control_handle().shutdown_with_epitaph(Status::OK);
792            self.responder.send(Ok(())).unwrap();
793            PackageDirProvider { stream: self.dir }
794        }
795
796        #[cfg(target_os = "fuchsia")]
797        fn fail_the_get(self) {
798            self.responder
799                .send(Err(Status::IO_INVALID.into_raw()))
800                .expect("client should be waiting");
801        }
802
803        async fn expect_open_meta_blob(
804            mut self,
805            res: Result<Option<ClientEnd<ffxfs::BlobWriterMarker>>, fpkg::OpenBlobError>,
806        ) -> Self {
807            match self.stream.next().await {
808                Some(Ok(NeededBlobsRequest::OpenMetaBlob { responder })) => {
809                    responder.send(res).unwrap();
810                }
811                r => panic!("Unexpected request: {r:?}"),
812            }
813            self
814        }
815
816        async fn expect_open_blob(
817            mut self,
818            expected_blob_id: BlobId,
819            res: Result<Option<ClientEnd<ffxfs::BlobWriterMarker>>, fpkg::OpenBlobError>,
820        ) -> Self {
821            match self.stream.next().await {
822                Some(Ok(NeededBlobsRequest::OpenBlob {
823                    blob_id,
824                    allow_existing: _,
825                    responder,
826                })) => {
827                    assert_eq!(BlobId::from(blob_id), expected_blob_id);
828                    responder.send(res).unwrap();
829                }
830                r => panic!("Unexpected request: {r:?}"),
831            }
832            self
833        }
834
835        async fn expect_get_missing_blobs(mut self, response_chunks: Vec<Vec<BlobInfo>>) -> Self {
836            match self.stream.next().await {
837                Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
838                    let mut stream = iterator.into_stream();
839
840                    // Respond to each next request with the next chunk.
841                    for chunk in response_chunks {
842                        let chunk = chunk.into_iter().map(fpkg::BlobInfo::from).collect::<Vec<_>>();
843
844                        let BlobInfoIteratorRequest::Next { responder } =
845                            stream.next().await.unwrap().unwrap();
846                        responder.send(&chunk).unwrap();
847                    }
848
849                    // Then respond with an empty chunk.
850                    let BlobInfoIteratorRequest::Next { responder } =
851                        stream.next().await.unwrap().unwrap();
852                    responder.send(&[]).unwrap();
853
854                    // Expect the client to stop asking.
855                    assert_matches!(stream.next().await, None);
856                }
857                r => panic!("Unexpected request: {r:?}"),
858            }
859            self
860        }
861
862        async fn expect_get_missing_blobs_client_closes_channel(
863            mut self,
864            response_chunks: Vec<Vec<BlobInfo>>,
865        ) -> Self {
866            match self.stream.next().await {
867                Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
868                    let mut stream = iterator.into_stream();
869
870                    // Respond to each next request with the next chunk.
871                    for chunk in response_chunks {
872                        let chunk = chunk.into_iter().map(fpkg::BlobInfo::from).collect::<Vec<_>>();
873
874                        let BlobInfoIteratorRequest::Next { responder } =
875                            stream.next().await.unwrap().unwrap();
876                        responder.send(&chunk).unwrap();
877                    }
878
879                    // The client closes the channel before we can respond with an empty chunk.
880                    assert_matches!(stream.next().await, None);
881                }
882                r => panic!("Unexpected request: {r:?}"),
883            }
884            self
885        }
886
887        async fn expect_get_missing_blobs_inject_iterator_error(mut self) -> Self {
888            match self.stream.next().await {
889                Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
890                    iterator
891                        .into_stream_and_control_handle()
892                        .1
893                        .shutdown_with_epitaph(Status::ADDRESS_IN_USE);
894                }
895                r => panic!("Unexpected request: {r:?}"),
896            }
897            self
898        }
899
900        #[cfg(target_os = "fuchsia")]
901        async fn expect_abort(mut self) -> Self {
902            match self.stream.next().await {
903                Some(Ok(NeededBlobsRequest::Abort { responder })) => {
904                    responder.send().unwrap();
905                }
906                r => panic!("Unexpected request: {r:?}"),
907            }
908            self
909        }
910    }
911
912    struct PackageDirProvider {
913        stream: fio::DirectoryRequestStream,
914    }
915
916    impl PackageDirProvider {
917        fn close_pkg_dir(self) {
918            self.stream.control_handle().shutdown_with_epitaph(Status::NOT_EMPTY);
919        }
920    }
921
922    fn blob_id(n: u8) -> BlobId {
923        BlobId::from([n; 32])
924    }
925
926    fn blob_info(n: u8) -> BlobInfo {
927        BlobInfo { blob_id: blob_id(n), length: 0 }
928    }
929
930    #[fuchsia_async::run_singlethreaded(test)]
931    async fn constructor() {
932        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<PackageCacheMarker>();
933        let client = Client::from_proxy(proxy);
934
935        drop(stream);
936        assert_matches!(client.proxy().sync().await, Err(_));
937    }
938
939    #[fuchsia_async::run_singlethreaded(test)]
940    async fn get_present_package() {
941        let (client, mut server) = MockPackageCache::new();
942
943        let ((), ()) = future::join(
944            async {
945                server
946                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
947                    .await
948                    .finish()
949                    .close_pkg_dir();
950                server.expect_closed().await;
951            },
952            async move {
953                let mut get =
954                    client.get(blob_info(2), fpkg::GcProtection::OpenPackageTracking).unwrap();
955
956                assert_matches!(get.open_meta_blob().await.unwrap(), None);
957                assert_eq!(get.get_missing_blobs().try_concat().await.unwrap(), vec![]);
958                let pkg_dir = get.finish().await.unwrap();
959
960                assert_matches!(
961                    pkg_dir.into_proxy().take_event_stream().next().await,
962                    Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
963                );
964            },
965        )
966        .await;
967    }
968
969    #[fuchsia_async::run_singlethreaded(test)]
970    async fn get_present_package_handles_slow_stream_close() {
971        let (client, mut server) = MockPackageCache::new();
972
973        let (send, recv) = futures::channel::oneshot::channel::<()>();
974
975        let ((), ()) = future::join(
976            async {
977                let (needed_blobs_stream, pkg_dir) = server
978                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
979                    .await
980                    .finish_hold_stream_open();
981                pkg_dir.close_pkg_dir();
982
983                // wait until `send` is dropped to drop the request stream.
984                let _ = recv.await;
985                drop(needed_blobs_stream);
986            },
987            async move {
988                let mut get =
989                    client.get(blob_info(2), fpkg::GcProtection::OpenPackageTracking).unwrap();
990
991                assert_matches!(get.open_meta_blob().await.unwrap(), None);
992
993                // ensure sending the request doesn't fail, then unblock closing the channel, then
994                // ensure the get_missing_blobs call detects the closed iterator as success instead
995                // of a PEER_CLOSED error.
996                let missing_blobs_stream = get.get_missing_blobs();
997                drop(send);
998                assert_eq!(missing_blobs_stream.try_concat().await.unwrap(), vec![]);
999                let pkg_dir = get.finish().await.unwrap();
1000
1001                assert_matches!(
1002                    pkg_dir.into_proxy().take_event_stream().next().await,
1003                    Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
1004                );
1005            },
1006        )
1007        .await;
1008    }
1009
1010    #[fuchsia_async::run_singlethreaded(test)]
1011    async fn needed_blobs_open_meta_far() {
1012        let (mut get, pending_get) = PendingGet::new().await;
1013
1014        let ((), ()) = future::join(
1015            async {
1016                pending_get
1017                    .expect_open_meta_blob(Ok(None))
1018                    .await
1019                    .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1020                    .await
1021                    .expect_open_meta_blob(Ok(None))
1022                    .await
1023                    .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1024                    .await
1025                    .expect_open_meta_blob(Err(fpkg::OpenBlobError::OutOfSpace))
1026                    .await
1027                    .expect_open_meta_blob(Err(fpkg::OpenBlobError::UnspecifiedIo))
1028                    .await;
1029            },
1030            async {
1031                {
1032                    let opener = get.make_open_meta_blob();
1033                    assert_matches!(opener.open().await.unwrap(), None);
1034                    assert_matches!(opener.open().await.unwrap(), Some(_));
1035                }
1036                assert_matches!(get.open_meta_blob().await.unwrap(), None);
1037                assert_matches!(get.open_meta_blob().await.unwrap(), Some(_));
1038                assert_matches!(get.open_meta_blob().await, Err(OpenBlobError::OutOfSpace));
1039                assert_matches!(get.open_meta_blob().await, Err(OpenBlobError::UnspecifiedIo));
1040            },
1041        )
1042        .await;
1043    }
1044
1045    #[fuchsia_async::run_singlethreaded(test)]
1046    async fn needed_blobs_open_content_blob() {
1047        let (mut get, pending_get) = PendingGet::new().await;
1048
1049        let ((), ()) = future::join(
1050            async {
1051                pending_get
1052                    .expect_open_blob(blob_id(2), Ok(None))
1053                    .await
1054                    .expect_open_blob(blob_id(2), Ok(Some(fidl::endpoints::create_endpoints().0)))
1055                    .await
1056                    .expect_open_blob(blob_id(10), Ok(None))
1057                    .await
1058                    .expect_open_blob(blob_id(11), Ok(Some(fidl::endpoints::create_endpoints().0)))
1059                    .await
1060                    .expect_open_blob(blob_id(12), Err(fpkg::OpenBlobError::OutOfSpace))
1061                    .await
1062                    .expect_open_blob(blob_id(13), Err(fpkg::OpenBlobError::UnspecifiedIo))
1063                    .await;
1064            },
1065            async {
1066                {
1067                    let opener = get.make_open_blob(blob_id(2));
1068                    assert_matches!(opener.open().await.unwrap(), None);
1069                    assert_matches!(opener.open().await.unwrap(), Some(_));
1070                }
1071                assert_matches!(get.open_blob(blob_id(10),).await.unwrap(), None);
1072                assert_matches!(get.open_blob(blob_id(11),).await.unwrap(), Some(_));
1073                assert_matches!(get.open_blob(blob_id(12),).await, Err(OpenBlobError::OutOfSpace));
1074                assert_matches!(
1075                    get.open_blob(blob_id(13),).await,
1076                    Err(OpenBlobError::UnspecifiedIo)
1077                );
1078            },
1079        )
1080        .await;
1081    }
1082
1083    #[fuchsia_async::run_singlethreaded(test)]
1084    async fn needed_blobs_get_missing_blobs_on_closed_ok() {
1085        let (mut get, pending_get) = PendingGet::new().await;
1086        let _ = pending_get.finish();
1087
1088        assert_matches!(get.open_meta_blob().await, Ok(None));
1089        assert_eq!(get.get_missing_blobs().try_concat().await.unwrap(), vec![]);
1090    }
1091
1092    #[fuchsia_async::run_singlethreaded(test)]
1093    async fn needed_blobs_get_missing_blobs() {
1094        let (mut get, pending_get) = PendingGet::new().await;
1095
1096        let ((), ()) = future::join(
1097            async {
1098                pending_get
1099                    .expect_get_missing_blobs(vec![
1100                        vec![blob_info(1), blob_info(2)],
1101                        vec![blob_info(3)],
1102                    ])
1103                    .await;
1104            },
1105            async {
1106                assert_eq!(
1107                    get.get_missing_blobs().try_concat().await.unwrap(),
1108                    vec![blob_info(1), blob_info(2), blob_info(3)]
1109                );
1110            },
1111        )
1112        .await;
1113    }
1114
1115    #[fuchsia_async::run_singlethreaded(test)]
1116    async fn needed_blobs_get_missing_blobs_fail_to_obtain_iterator() {
1117        let (mut get, pending_get) = PendingGet::new().await;
1118        drop(pending_get);
1119
1120        assert_matches!(
1121            get.get_missing_blobs().try_concat().await,
1122            Err(ListMissingBlobsError::CallNextOnBlobIterator(
1123                fidl::Error::ClientChannelClosed{status, ..})
1124            )
1125                if status == Status::PEER_CLOSED
1126        );
1127    }
1128
1129    #[fuchsia_async::run_singlethreaded(test)]
1130    async fn needed_blobs_get_missing_blobs_iterator_contains_error() {
1131        let (mut get, pending_get) = PendingGet::new().await;
1132
1133        let (_, ()) =
1134            future::join(pending_get.expect_get_missing_blobs_inject_iterator_error(), async {
1135                assert_matches!(
1136                    get.get_missing_blobs().try_concat().await,
1137                    Err(ListMissingBlobsError::CallNextOnBlobIterator(
1138                        fidl::Error::ClientChannelClosed{status, ..}
1139                    ))
1140                        if status == Status::ADDRESS_IN_USE
1141                );
1142            })
1143            .await;
1144    }
1145
1146    #[cfg(target_os = "fuchsia")]
1147    #[test]
1148    fn needed_blobs_abort() {
1149        use futures::future::Either;
1150        use futures::pin_mut;
1151        use std::task::Poll;
1152
1153        let mut executor = fuchsia_async::TestExecutor::new_with_fake_time();
1154
1155        let fut = async {
1156            let (get, pending_get) = PendingGet::new().await;
1157
1158            let abort_fut = get.abort().boxed();
1159            let expect_abort_fut = pending_get.expect_abort();
1160            pin_mut!(expect_abort_fut);
1161
1162            match futures::future::select(abort_fut, expect_abort_fut).await {
1163                Either::Left(((), _expect_abort_fut)) => {
1164                    panic!("abort should wait for the get future to complete")
1165                }
1166                Either::Right((pending_get, abort_fut)) => (abort_fut, pending_get),
1167            }
1168        };
1169        pin_mut!(fut);
1170
1171        let (mut abort_fut, pending_get) = match executor.run_until_stalled(&mut fut) {
1172            Poll::Pending => panic!("should complete"),
1173            Poll::Ready((abort_fut, pending_get)) => (abort_fut, pending_get),
1174        };
1175
1176        // NeededBlobs.Abort should wait until PackageCache.Get returns
1177        assert_matches!(executor.run_until_stalled(&mut abort_fut), Poll::Pending);
1178        pending_get.fail_the_get();
1179        assert_matches!(executor.run_until_stalled(&mut abort_fut), Poll::Ready(()));
1180    }
1181
1182    struct MockNeededBlob {
1183        writer: ffxfs::BlobWriterRequestStream,
1184        needed_blobs: fpkg::NeededBlobsRequestStream,
1185        vmo: Option<zx::Vmo>,
1186    }
1187
1188    impl MockNeededBlob {
1189        fn mock_hash() -> BlobId {
1190            [7; 32].into()
1191        }
1192
1193        fn new() -> (NeededBlob, Self) {
1194            let (writer_client, writer) =
1195                fidl::endpoints::create_request_stream::<ffxfs::BlobWriterMarker>();
1196            let (needed_blobs_proxy, needed_blobs) =
1197                fidl::endpoints::create_proxy_and_stream::<fpkg::NeededBlobsMarker>();
1198            (
1199                NeededBlob {
1200                    blob: Blob {
1201                        needed_blobs: needed_blobs_proxy,
1202                        blob_id: Self::mock_hash(),
1203                        state: NeedsTruncate(writer_client),
1204                    },
1205                },
1206                Self { writer, needed_blobs, vmo: None },
1207            )
1208        }
1209
1210        async fn fail_get_vmo(mut self) -> Self {
1211            match self.writer.next().await {
1212                Some(Ok(ffxfs::BlobWriterRequest::GetVmo { size: _, responder })) => {
1213                    responder.send(Err(Status::NO_SPACE.into_raw())).unwrap();
1214                }
1215                r => panic!("Unexpected request: {r:?}"),
1216            }
1217            self
1218        }
1219
1220        async fn expect_get_vmo(mut self, expected_size: u64) -> Self {
1221            match self.writer.next().await {
1222                Some(Ok(ffxfs::BlobWriterRequest::GetVmo { size, responder })) => {
1223                    assert_eq!(size, expected_size);
1224                    let vmo = zx::Vmo::create(size).unwrap();
1225                    assert!(self.vmo.is_none());
1226                    self.vmo = Some(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap());
1227                    responder.send(Ok(vmo)).unwrap();
1228                }
1229                r => panic!("Unexpected request: {r:?}"),
1230            }
1231            self
1232        }
1233
1234        async fn fail_bytes_ready(mut self) -> Self {
1235            match self.writer.next().await {
1236                Some(Ok(ffxfs::BlobWriterRequest::BytesReady { bytes_written: _, responder })) => {
1237                    responder.send(Err(Status::NO_SPACE.into_raw())).unwrap();
1238                }
1239                r => panic!("Unexpected request: {r:?}"),
1240            }
1241            self
1242        }
1243
1244        async fn expect_bytes_ready(mut self, expected_payload: &[u8], offset: u64) -> Self {
1245            match self.writer.next().await {
1246                Some(Ok(ffxfs::BlobWriterRequest::BytesReady { bytes_written, responder })) => {
1247                    assert_eq!(bytes_written, u64::try_from(expected_payload.len()).unwrap());
1248
1249                    let vmo = self.vmo.as_ref().unwrap();
1250                    let mut buf = vec![0; expected_payload.len()];
1251                    let () = vmo.read(&mut buf, offset).unwrap();
1252                    assert_eq!(buf, expected_payload);
1253
1254                    responder.send(Ok(())).unwrap();
1255                }
1256                r => panic!("Unexpected request: {r:?}"),
1257            }
1258            self
1259        }
1260
1261        async fn expect_blob_written(mut self) -> Self {
1262            match self.needed_blobs.next().await {
1263                Some(Ok(fpkg::NeededBlobsRequest::BlobWritten { blob_id, responder })) => {
1264                    assert_eq!(blob_id, Self::mock_hash().into());
1265                    responder.send(Ok(())).unwrap();
1266                }
1267                r => panic!("Unexpected request: {r:?}"),
1268            }
1269            self
1270        }
1271    }
1272
1273    #[fuchsia_async::run_singlethreaded(test)]
1274    async fn empty_blob_write() {
1275        let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1276
1277        let ((), ()) = future::join(
1278            async {
1279                blob_server.expect_get_vmo(0).await.expect_blob_written().await;
1280            },
1281            async {
1282                let blob = match blob.truncate(0).await.unwrap() {
1283                    TruncateBlobSuccess::AllWritten(blob) => blob,
1284                    other => panic!("empty blob shouldn't need bytes {other:?}"),
1285                };
1286                let () = blob.blob_written().await.unwrap();
1287            },
1288        )
1289        .await;
1290    }
1291
1292    impl TruncateBlobSuccess {
1293        fn unwrap_needs_data(self) -> Blob<NeedsData> {
1294            match self {
1295                TruncateBlobSuccess::NeedsData(blob) => blob,
1296                TruncateBlobSuccess::AllWritten(_) => panic!("blob should need data"),
1297            }
1298        }
1299    }
1300
1301    impl BlobWriteSuccess {
1302        fn unwrap_needs_data(self) -> Blob<NeedsData> {
1303            match self {
1304                BlobWriteSuccess::NeedsData(blob) => blob,
1305                BlobWriteSuccess::AllWritten(_) => panic!("blob should need data"),
1306            }
1307        }
1308
1309        fn unwrap_all_written(self) -> Blob<NeedsBlobWritten> {
1310            match self {
1311                BlobWriteSuccess::NeedsData(_) => panic!("blob should be completely written"),
1312                BlobWriteSuccess::AllWritten(blob) => blob,
1313            }
1314        }
1315    }
1316
1317    #[fuchsia_async::run_singlethreaded(test)]
1318    async fn small_blob_write() {
1319        let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1320
1321        let ((), ()) = future::join(
1322            async {
1323                blob_server
1324                    .expect_get_vmo(4)
1325                    .await
1326                    .expect_bytes_ready(b"test", 0)
1327                    .await
1328                    .expect_blob_written()
1329                    .await;
1330            },
1331            async {
1332                let blob = blob.truncate(4).await.unwrap().unwrap_needs_data();
1333                let blob = blob.write(b"test").await.unwrap().unwrap_all_written();
1334                let () = blob.blob_written().await.unwrap();
1335            },
1336        )
1337        .await;
1338    }
1339
1340    #[fuchsia_async::run_singlethreaded(test)]
1341    async fn blob_truncate_no_space() {
1342        let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1343
1344        let ((), ()) = future::join(
1345            async {
1346                blob_server.fail_get_vmo().await;
1347            },
1348            async {
1349                assert_matches!(blob.truncate(4).await, Err(TruncateBlobError::NoSpace));
1350            },
1351        )
1352        .await;
1353    }
1354
1355    #[fuchsia_async::run_singlethreaded(test)]
1356    async fn blob_write_no_space() {
1357        let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1358
1359        let ((), ()) = future::join(
1360            async {
1361                blob_server.expect_get_vmo(4).await.fail_bytes_ready().await;
1362            },
1363            async {
1364                let blob = blob.truncate(4).await.unwrap().unwrap_needs_data();
1365                assert_matches!(blob.write(b"test").await, Err(WriteBlobError::NoSpace));
1366            },
1367        )
1368        .await;
1369    }
1370
1371    #[fuchsia_async::run_singlethreaded(test)]
1372    async fn blob_write_multiple_write() {
1373        let (NeededBlob { blob }, blob_server) = MockNeededBlob::new();
1374
1375        let ((), ()) = future::join(
1376            async {
1377                blob_server
1378                    .expect_get_vmo(6)
1379                    .await
1380                    .expect_bytes_ready(b"abc", 0)
1381                    .await
1382                    .expect_bytes_ready(b"123", 3)
1383                    .await
1384                    .expect_blob_written()
1385                    .await;
1386            },
1387            async {
1388                let blob = blob.truncate(6).await.unwrap().unwrap_needs_data();
1389                let blob = blob.write(b"abc").await.unwrap().unwrap_needs_data();
1390                let blob = blob.write(b"123").await.unwrap().unwrap_all_written();
1391                let () = blob.blob_written().await.unwrap();
1392            },
1393        )
1394        .await;
1395    }
1396
1397    #[fuchsia_async::run_singlethreaded(test)]
1398    async fn get_already_cached_success() {
1399        let (client, mut server) = MockPackageCache::new();
1400
1401        let ((), ()) = future::join(
1402            async {
1403                server
1404                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1405                    .await
1406                    .finish()
1407                    .close_pkg_dir();
1408                server.expect_closed().await;
1409            },
1410            async move {
1411                let pkg_dir = client.get_already_cached(blob_id(2)).await.unwrap();
1412
1413                assert_matches!(
1414                    pkg_dir.into_proxy().take_event_stream().next().await,
1415                    Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
1416                );
1417            },
1418        )
1419        .await;
1420    }
1421
1422    #[fuchsia_async::run_singlethreaded(test)]
1423    async fn get_already_cached_missing_meta_far() {
1424        let (client, mut server) = MockPackageCache::new();
1425
1426        let ((), ()) = future::join(
1427            async {
1428                server
1429                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1430                    .await
1431                    .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1432                    .await;
1433            },
1434            async move {
1435                assert_matches!(
1436                    client.get_already_cached(blob_id(2)).await,
1437                    Err(GetAlreadyCachedError::MissingMetaFar)
1438                );
1439            },
1440        )
1441        .await;
1442    }
1443
1444    #[fuchsia_async::run_singlethreaded(test)]
1445    async fn get_already_cached_missing_content_blob() {
1446        let (client, mut server) = MockPackageCache::new();
1447
1448        let ((), ()) = future::join(
1449            async {
1450                server
1451                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1452                    .await
1453                    .expect_open_meta_blob(Ok(None))
1454                    .await
1455                    .expect_get_missing_blobs_client_closes_channel(vec![vec![BlobInfo {
1456                        blob_id: [0; 32].into(),
1457                        length: 0,
1458                    }]])
1459                    .await;
1460            },
1461            async move {
1462                assert_matches!(
1463                    client.get_already_cached(blob_id(2)).await,
1464                    Err(GetAlreadyCachedError::MissingContentBlobs(v))
1465                        if v == vec![BlobInfo {
1466                            blob_id: [0; 32].into(),
1467                            length: 0,
1468                        }]
1469                );
1470            },
1471        )
1472        .await;
1473    }
1474}