fidl_fuchsia_pkg_ext/
cache.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![cfg(target_os = "fuchsia")]
6#![deny(missing_docs)]
7
8//! Wrapper types for [`fidl_fuchsia_pkg::PackageCacheProxy`] and its related protocols.
9
10use crate::types::{BlobId, BlobInfo};
11use fidl_fuchsia_pkg as fpkg;
12use fuchsia_pkg::PackageDirectory;
13use futures::prelude::*;
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::sync::Arc;
16use zx_status::Status;
17
18mod storage;
19
20/// An open connection to a provider of the `fuchsia.pkg.PackageCache`.
21#[derive(Debug, Clone)]
22pub struct Client {
23    proxy: fpkg::PackageCacheProxy,
24}
25
26impl Client {
27    /// Constructs a client from the given proxy.
28    pub fn from_proxy(proxy: fpkg::PackageCacheProxy) -> Self {
29        Self { proxy }
30    }
31
32    /// Returns a reference to the underlying PackageCacheProxy connection.
33    pub fn proxy(&self) -> &fpkg::PackageCacheProxy {
34        &self.proxy
35    }
36
37    /// Opens the package specified by `meta_far_blob` with the intent to fetch any missing blobs
38    /// using the returned [`Get`] type if needed.
39    pub fn get(
40        &self,
41        meta_far_blob: BlobInfo,
42        gc_protection: fpkg::GcProtection,
43    ) -> Result<Get, fidl::Error> {
44        let (needed_blobs, needed_blobs_server_end) =
45            fidl::endpoints::create_proxy::<fpkg::NeededBlobsMarker>();
46        let (pkg_dir, pkg_dir_server_end) = PackageDirectory::create_request()?;
47
48        let get_fut = self.proxy.get(
49            &meta_far_blob.into(),
50            gc_protection,
51            needed_blobs_server_end,
52            pkg_dir_server_end,
53        );
54
55        Ok(Get {
56            get_fut,
57            pkg_dir,
58            needed_blobs,
59            pkg_present: SharedBoolEvent::new(),
60            meta_far: meta_far_blob.blob_id,
61        })
62    }
63
64    /// Uses PackageCache.Get to obtain the package directory of a package that is already cached
65    /// (all blobs are already in blobfs).
66    /// Errors if the package is not already cached.
67    /// Always uses open package tracking GC protection, because OTA (the only client of Retained
68    /// GC protection), should never need to get an already cached package.
69    ///
70    /// Compared to `get_cached`:
71    ///   * Activates `meta_far_blob` in the dynamic index
72    ///   * Must not be called concurrently with the same `meta_far_blob`
73    pub async fn get_already_cached(
74        &self,
75        meta_far_blob: BlobId,
76    ) -> Result<PackageDirectory, GetAlreadyCachedError> {
77        let mut get = self
78            .get(
79                BlobInfo { blob_id: meta_far_blob, length: 0 },
80                fpkg::GcProtection::OpenPackageTracking,
81            )
82            .map_err(GetAlreadyCachedError::Get)?;
83        if let Some(_) = get.open_meta_blob().await.map_err(GetAlreadyCachedError::OpenMetaBlob)? {
84            return Err(GetAlreadyCachedError::MissingMetaFar);
85        }
86
87        if let Some(missing_blobs) = get
88            .get_missing_blobs()
89            .try_next()
90            .await
91            .map_err(GetAlreadyCachedError::GetMissingBlobs)?
92        {
93            return Err(GetAlreadyCachedError::MissingContentBlobs(missing_blobs));
94        }
95
96        get.finish().await.map_err(GetAlreadyCachedError::FinishGet)
97    }
98
99    /// Uses PackageCache.GetSubpackage to obtain the package directory of a subpackage.
100    /// Errors if there is not an open connection to the superpackage.
101    pub async fn get_subpackage(
102        &self,
103        superpackage: BlobId,
104        subpackage: &fuchsia_url::RelativePackageUrl,
105    ) -> Result<PackageDirectory, GetSubpackageError> {
106        let (dir, dir_server_end) =
107            PackageDirectory::create_request().map_err(GetSubpackageError::CreatingHandles)?;
108        let () = self
109            .proxy
110            .get_subpackage(
111                &superpackage.into(),
112                &fpkg::PackageUrl { url: subpackage.into() },
113                dir_server_end,
114            )
115            .await
116            .map_err(GetSubpackageError::CallingGetSubpackage)??;
117        Ok(dir)
118    }
119}
120
121#[derive(thiserror::Error, Debug)]
122#[allow(missing_docs)]
123pub enum GetAlreadyCachedError {
124    #[error("calling get")]
125    Get(#[source] fidl::Error),
126
127    #[error("opening meta blob")]
128    OpenMetaBlob(#[source] OpenBlobError),
129
130    #[error("meta.far blob not cached")]
131    MissingMetaFar,
132
133    #[error("getting missing blobs")]
134    GetMissingBlobs(#[source] ListMissingBlobsError),
135
136    #[error("content blobs not cached {0:?}")]
137    MissingContentBlobs(Vec<BlobInfo>),
138
139    #[error("finishing get")]
140    FinishGet(#[source] GetError),
141}
142
143impl GetAlreadyCachedError {
144    /// Returns true if the get failed because the package was not cached.
145    pub fn was_not_cached(&self) -> bool {
146        use GetAlreadyCachedError::*;
147        match self {
148            Get(..) | OpenMetaBlob(..) | GetMissingBlobs(..) | FinishGet(..) => false,
149            MissingMetaFar | MissingContentBlobs(..) => true,
150        }
151    }
152}
153
154#[derive(thiserror::Error, Debug)]
155#[allow(missing_docs)]
156pub enum GetSubpackageError {
157    #[error("creating handles")]
158    CreatingHandles(#[source] fidl::Error),
159
160    #[error("calling GetCached FIDL")]
161    CallingGetSubpackage(#[source] fidl::Error),
162
163    #[error("the superpackage does not have an open package connection")]
164    SuperpackageClosed,
165
166    #[error("the subpackage does not exist")]
167    DoesNotExist,
168
169    #[error("internal")]
170    Internal,
171}
172
173impl From<fpkg::GetSubpackageError> for GetSubpackageError {
174    fn from(fidl: fpkg::GetSubpackageError) -> Self {
175        use fpkg::GetSubpackageError as fErr;
176        use GetSubpackageError::*;
177        match fidl {
178            fErr::SuperpackageClosed => SuperpackageClosed,
179            fErr::DoesNotExist => DoesNotExist,
180            fErr::Internal => Internal,
181        }
182    }
183}
184
185#[derive(Debug, Clone)]
186struct SharedBoolEvent(Arc<AtomicBool>);
187
188impl SharedBoolEvent {
189    fn new() -> Self {
190        Self(Arc::new(AtomicBool::new(false)))
191    }
192
193    fn get(&self) -> bool {
194        self.0.load(Ordering::SeqCst)
195    }
196
197    fn set(&self) {
198        self.0.store(true, Ordering::SeqCst)
199    }
200}
201
202async fn open_blob(
203    needed_blobs: &fpkg::NeededBlobsProxy,
204    kind: OpenKind,
205    blob_id: BlobId,
206    pkg_present: Option<&SharedBoolEvent>,
207) -> Result<Option<NeededBlob>, OpenBlobError> {
208    let open_fut = match kind {
209        OpenKind::Meta => needed_blobs.open_meta_blob(),
210        OpenKind::Content => needed_blobs.open_blob(&blob_id.into()),
211    };
212    match open_fut.await {
213        Err(fidl::Error::ClientChannelClosed { status: Status::OK, .. }) => {
214            if let Some(pkg_present) = pkg_present {
215                pkg_present.set();
216            }
217            Ok(None)
218        }
219        res => {
220            if let Some(blob) = res?? {
221                let (writer, closer) = self::storage::into_blob_writer_and_closer(*blob)?;
222                Ok(Some(NeededBlob {
223                    blob: Blob {
224                        writer,
225                        needed_blobs: needed_blobs.clone(),
226                        blob_id,
227                        state: NeedsTruncate,
228                    },
229                    closer: BlobCloser { closer, closed: false },
230                }))
231            } else {
232                Ok(None)
233            }
234        }
235    }
236}
237
238#[derive(Debug, Clone, Copy, PartialEq, Eq)]
239enum OpenKind {
240    Meta,
241    Content,
242}
243
244/// A deferred call to [`Get::open_meta_blob`] or [`Get::open_blob`].
245#[derive(Debug)]
246pub struct DeferredOpenBlob {
247    needed_blobs: fpkg::NeededBlobsProxy,
248    kind: OpenKind,
249    blob_id: BlobId,
250    pkg_present: Option<SharedBoolEvent>,
251}
252
253impl DeferredOpenBlob {
254    /// Opens the blob for write, if it is still needed. The blob's data can be provided using the
255    /// returned NeededBlob.
256    pub async fn open(&self) -> Result<Option<NeededBlob>, OpenBlobError> {
257        open_blob(&self.needed_blobs, self.kind, self.blob_id, self.pkg_present.as_ref()).await
258    }
259
260    fn proxy_cmp_key(&self) -> u32 {
261        use fidl::endpoints::Proxy;
262        use fidl::AsHandleRef;
263        self.needed_blobs.as_channel().raw_handle()
264    }
265}
266
267impl std::cmp::PartialEq for DeferredOpenBlob {
268    fn eq(&self, other: &Self) -> bool {
269        self.proxy_cmp_key() == other.proxy_cmp_key() && self.kind == other.kind
270    }
271}
272
273impl std::cmp::Eq for DeferredOpenBlob {}
274
275/// A pending `fuchsia.pkg/PackageCache.Get()` request. Clients must, in order:
276/// 1. open/write the meta blob, if Some(NeededBlob) is provided by that API
277/// 2. enumerate all missing content blobs
278/// 3. open/write all missing content blobs, if Some(NeededBlob) is provided by that API
279/// 4. finish() to complete the Get() request.
280#[derive(Debug)]
281pub struct Get {
282    get_fut: fidl::client::QueryResponseFut<Result<(), i32>>,
283    needed_blobs: fpkg::NeededBlobsProxy,
284    pkg_dir: PackageDirectory,
285    pkg_present: SharedBoolEvent,
286    meta_far: BlobId,
287}
288
289impl Get {
290    /// Returns an independent object that can be used to open the meta blob for write.  See
291    /// [`Self::open_meta_blob`].
292    pub fn make_open_meta_blob(&mut self) -> DeferredOpenBlob {
293        DeferredOpenBlob {
294            needed_blobs: self.needed_blobs.clone(),
295            kind: OpenKind::Meta,
296            blob_id: self.meta_far,
297            pkg_present: Some(self.pkg_present.clone()),
298        }
299    }
300
301    /// Opens the meta blob for write, if it is still needed. The blob's data can be provided using
302    /// the returned NeededBlob.
303    pub async fn open_meta_blob(&mut self) -> Result<Option<NeededBlob>, OpenBlobError> {
304        open_blob(&self.needed_blobs, OpenKind::Meta, self.meta_far, Some(&self.pkg_present)).await
305    }
306
307    fn start_get_missing_blobs(
308        &mut self,
309    ) -> Result<Option<fpkg::BlobInfoIteratorProxy>, fidl::Error> {
310        if self.pkg_present.get() {
311            return Ok(None);
312        }
313
314        let (blob_iterator, blob_iterator_server_end) =
315            fidl::endpoints::create_proxy::<fpkg::BlobInfoIteratorMarker>();
316
317        self.needed_blobs.get_missing_blobs(blob_iterator_server_end)?;
318        Ok(Some(blob_iterator))
319    }
320
321    /// Determines the set of blobs that the caller must open/write to complete this `Get()`
322    /// operation.
323    /// The returned stream will never yield an empty `Vec`.
324    /// Callers should process the missing blobs (via `make_open_blob` or `open_blob`) concurrently
325    /// with reading the stream to guarantee stream termination.
326    pub fn get_missing_blobs(
327        &mut self,
328    ) -> impl Stream<Item = Result<Vec<BlobInfo>, ListMissingBlobsError>> + Unpin {
329        match self.start_get_missing_blobs() {
330            Ok(option_iter) => match option_iter {
331                Some(iterator) => crate::fidl_iterator_to_stream(iterator)
332                    .map_ok(|v| v.into_iter().map(BlobInfo::from).collect())
333                    .map_err(ListMissingBlobsError::CallNextOnBlobIterator)
334                    .left_stream(),
335                None => futures::stream::empty().right_stream(),
336            }
337            .left_stream(),
338            Err(e) => {
339                futures::stream::iter(Some(Err(ListMissingBlobsError::CallGetMissingBlobs(e))))
340                    .right_stream()
341            }
342        }
343    }
344
345    /// Returns an independent object that can be used to open the `content_blob` for write.  See
346    /// [`Self::open_blob`].
347    pub fn make_open_blob(&mut self, content_blob: BlobId) -> DeferredOpenBlob {
348        DeferredOpenBlob {
349            needed_blobs: self.needed_blobs.clone(),
350            kind: OpenKind::Content,
351            blob_id: content_blob,
352            pkg_present: None,
353        }
354    }
355
356    /// Opens `content_blob` for write, if it is still needed. The blob's data can be provided
357    /// using the returned NeededBlob.
358    pub async fn open_blob(
359        &mut self,
360        content_blob: BlobId,
361    ) -> Result<Option<NeededBlob>, OpenBlobError> {
362        open_blob(&self.needed_blobs, OpenKind::Content, content_blob, None).await
363    }
364
365    /// Notifies the endpoint that all blobs have been written and wait for the response to the
366    /// pending `Get()` request, returning the cached [`PackageDirectory`].
367    pub async fn finish(self) -> Result<PackageDirectory, GetError> {
368        drop(self.needed_blobs);
369        let () = self.get_fut.await?.map_err(Status::from_raw)?;
370        Ok(self.pkg_dir)
371    }
372
373    /// Aborts this caching operation for the package.
374    pub async fn abort(self) {
375        self.needed_blobs.abort().map(|_: Result<(), fidl::Error>| ()).await;
376        // The package is not guaranteed to be removed from the dynamic index after abort
377        // returns, we have to wait until finish returns (to prevent a resolve retry from
378        // racing). The finish call will return an error that just tells us that we called
379        // abort, so we ignore it.
380        let _ = self.get_fut.await;
381    }
382}
383
384/// A blob that needs to be written.
385#[derive(Debug)]
386pub struct NeededBlob {
387    /// Typestate wrapper around the blob. Clients must first call truncate(), then write() until
388    /// all data is provided.
389    pub blob: Blob<NeedsTruncate>,
390
391    /// Helper object that can close the blob independent of what state `blob` is in.
392    pub closer: BlobCloser,
393}
394
395/// A handle to a blob that must be explicitly closed to prevent future opens of the same blob from
396/// racing with this blob closing.
397#[derive(Debug)]
398#[must_use = "Subsequent opens of this blob may race with closing this one"]
399pub struct BlobCloser {
400    closer: Box<dyn self::storage::Closer>,
401    closed: bool,
402}
403
404impl BlobCloser {
405    /// Close the blob, silently ignoring errors.
406    pub async fn close(mut self) {
407        let () = self.closer.close().await;
408        self.closed = true;
409    }
410}
411
412impl Drop for BlobCloser {
413    fn drop(&mut self) {
414        if !self.closed {
415            let () = self.closer.best_effort_close();
416        }
417    }
418}
419
420/// The successful result of truncating a blob.
421#[derive(Debug)]
422pub enum TruncateBlobSuccess {
423    /// The blob contents need to be written.
424    NeedsData(Blob<NeedsData>),
425
426    /// The blob is fully written (it was the empty blob) and now a
427    /// fuchsia.pkg.NeededBlobs.BlobWritten message should be sent.
428    AllWritten(Blob<NeedsBlobWritten>),
429}
430
431/// The successful result of writing some data to a blob.
432#[derive(Debug)]
433pub enum BlobWriteSuccess {
434    /// There is still more data to write.
435    NeedsData(Blob<NeedsData>),
436
437    /// The blob is fully written and now a fuchsia.pkg.NeededBlobs.BlobWritten
438    /// message should be sent.
439    AllWritten(Blob<NeedsBlobWritten>),
440}
441
442/// State for a blob that can be truncated.
443#[derive(Debug)]
444pub struct NeedsTruncate;
445
446/// State for a blob that can be written to.
447#[derive(Debug)]
448pub struct NeedsData {
449    size: u64,
450    written: u64,
451}
452
453/// State for a blob that has been fully written but that needs a
454/// fuchsia.pkg.NeededBlobs.BlobWritten message sent to pkg-cache.
455#[derive(Debug)]
456pub struct NeedsBlobWritten;
457
458/// A blob in the process of being written.
459#[derive(Debug)]
460#[must_use]
461pub struct Blob<S> {
462    writer: Box<dyn self::storage::Writer>,
463    needed_blobs: fpkg::NeededBlobsProxy,
464    blob_id: BlobId,
465    state: S,
466}
467
468impl Blob<NeedsTruncate> {
469    /// Truncates the blob to the given size. On success, the blob enters the writable state.
470    pub async fn truncate(mut self, size: u64) -> Result<TruncateBlobSuccess, TruncateBlobError> {
471        let () = self.writer.truncate(size).await?;
472
473        let Self { writer, needed_blobs, blob_id, state: _ } = self;
474
475        Ok(if size == 0 {
476            TruncateBlobSuccess::AllWritten(Blob {
477                writer,
478                needed_blobs,
479                blob_id,
480                state: NeedsBlobWritten,
481            })
482        } else {
483            TruncateBlobSuccess::NeedsData(Blob {
484                writer,
485                needed_blobs,
486                blob_id,
487                state: NeedsData { size, written: 0 },
488            })
489        })
490    }
491}
492
493impl Blob<NeedsData> {
494    /// Writes all of the given buffer to the blob.
495    ///
496    /// # Panics
497    ///
498    /// Panics if a write is attempted with a buf larger than the remaining blob size.
499    pub fn write(
500        self,
501        buf: &[u8],
502    ) -> impl Future<Output = Result<BlobWriteSuccess, WriteBlobError>> + '_ {
503        self.write_with_trace_callbacks(buf, &|_| {}, &|| {})
504    }
505
506    /// Writes all of the given buffer to the blob.
507    ///
508    /// `after_write` and `after_write_ack` are called before and after, respectively, waiting for
509    /// the server to acknowledge writes.
510    /// They may be called multiple times if the write of `buf` is chunked.
511    /// `after_write` is given the size of each write in bytes.
512    /// Useful for creating trace spans.
513    ///
514    /// # Panics
515    ///
516    /// Panics if a write is attempted with a buf larger than the remaining blob size.
517    pub async fn write_with_trace_callbacks(
518        mut self,
519        buf: &[u8],
520        after_write: &(dyn Fn(u64) + Send + Sync),
521        after_write_ack: &(dyn Fn() + Send + Sync),
522    ) -> Result<BlobWriteSuccess, WriteBlobError> {
523        assert!(self.state.written + buf.len() as u64 <= self.state.size);
524
525        let () = self.writer.write(buf, after_write, after_write_ack).await?;
526        self.state.written += buf.len() as u64;
527
528        if self.state.written == self.state.size {
529            let Self { writer, needed_blobs, blob_id, state: _ } = self;
530            Ok(BlobWriteSuccess::AllWritten(Blob {
531                writer,
532                needed_blobs,
533                blob_id,
534                state: NeedsBlobWritten,
535            }))
536        } else {
537            Ok(BlobWriteSuccess::NeedsData(self))
538        }
539    }
540}
541
542impl Blob<NeedsBlobWritten> {
543    /// Tells pkg-cache that the blob has been successfully written and can now be read.
544    pub async fn blob_written(self) -> Result<(), BlobWrittenError> {
545        Ok(self.needed_blobs.blob_written(&self.blob_id.into()).await??)
546    }
547}
548
549/// An error encountered while opening a package.
550#[derive(Debug, thiserror::Error)]
551#[allow(missing_docs)]
552pub enum OpenError {
553    #[error("the package does not exist")]
554    NotFound,
555
556    #[error("Open() responded with an unexpected status")]
557    UnexpectedResponse(#[source] Status),
558
559    #[error("transport error")]
560    Fidl(#[from] fidl::Error),
561}
562/// An error encountered while caching a package.
563#[derive(Debug, thiserror::Error)]
564#[allow(missing_docs)]
565pub enum GetError {
566    #[error("Get() responded with an unexpected status")]
567    UnexpectedResponse(#[from] Status),
568
569    #[error("transport error")]
570    Fidl(#[from] fidl::Error),
571}
572
573/// An error encountered while opening a metadata or content blob for write.
574#[derive(Debug, thiserror::Error)]
575#[allow(missing_docs)]
576pub enum OpenBlobError {
577    #[error("there is insufficient storage space available to persist this blob")]
578    OutOfSpace,
579
580    #[error("this blob is already open for write by another cache operation")]
581    ConcurrentWrite,
582
583    #[error("an unspecified error occurred during underlying I/O")]
584    UnspecifiedIo,
585
586    #[error("an unspecified error occurred")]
587    Internal,
588
589    #[error("transport error")]
590    Fidl(#[from] fidl::Error),
591}
592
593impl From<fpkg::OpenBlobError> for OpenBlobError {
594    fn from(e: fpkg::OpenBlobError) -> Self {
595        match e {
596            fpkg::OpenBlobError::OutOfSpace => OpenBlobError::OutOfSpace,
597            fpkg::OpenBlobError::ConcurrentWrite => OpenBlobError::ConcurrentWrite,
598            fpkg::OpenBlobError::UnspecifiedIo => OpenBlobError::UnspecifiedIo,
599            fpkg::OpenBlobError::Internal => OpenBlobError::Internal,
600        }
601    }
602}
603
604/// An error encountered while enumerating missing content blobs.
605#[derive(Debug, thiserror::Error)]
606#[allow(missing_docs)]
607pub enum ListMissingBlobsError {
608    #[error("while obtaining the missing blobs fidl iterator")]
609    CallGetMissingBlobs(#[source] fidl::Error),
610
611    #[error("while obtaining the next chunk of blobs from the fidl iterator")]
612    CallNextOnBlobIterator(#[source] fidl::Error),
613}
614
615/// An error encountered while truncating a blob
616#[derive(Debug, thiserror::Error)]
617#[allow(missing_docs)]
618pub enum TruncateBlobError {
619    #[error("insufficient storage space is available")]
620    NoSpace,
621
622    #[error("Truncate() responded with an unexpected status")]
623    UnexpectedResponse(#[source] Status),
624
625    #[error("transport error")]
626    Fidl(#[from] fidl::Error),
627
628    #[error("already truncated, currently in state {0}")]
629    AlreadyTruncated(&'static str),
630
631    // TODO(https://fxbug.dev/42080352) Add error variants to BlobWriter.
632    #[error("unspecified error")]
633    Other(#[source] anyhow::Error),
634
635    #[error("blob is in an invalid state")]
636    BadState,
637}
638
639/// An error encountered while writing a blob.
640#[derive(Debug, thiserror::Error)]
641#[allow(missing_docs)]
642pub enum WriteBlobError {
643    #[error("file endpoint reported it wrote more bytes than were actually provided to the file endpoint")]
644    Overwrite,
645
646    #[error("the written data was corrupt")]
647    Corrupt,
648
649    #[error("insufficient storage space is available")]
650    NoSpace,
651
652    #[error("Write() responded with an unexpected status")]
653    UnexpectedResponse(#[source] Status),
654
655    #[error("transport error")]
656    Fidl(#[from] fidl::Error),
657
658    #[error("bytes were written but not needed in state {0}")]
659    BytesNotNeeded(&'static str),
660
661    #[error("while using the fxblob writer")]
662    FxBlob(#[source] blob_writer::WriteError),
663}
664
665/// An error encountered while sending the BlobWritten message.
666#[derive(Debug, thiserror::Error)]
667#[allow(missing_docs)]
668pub enum BlobWrittenError {
669    #[error("pkg-cache could not find the blob after it was successfully written")]
670    MissingAfterWritten,
671
672    #[error("NeededBlobs.BlobWritten was called before the blob was opened")]
673    UnopenedBlob,
674
675    #[error("transport error")]
676    Fidl(#[from] fidl::Error),
677}
678
679impl From<fpkg::BlobWrittenError> for BlobWrittenError {
680    fn from(e: fpkg::BlobWrittenError) -> Self {
681        match e {
682            fpkg::BlobWrittenError::NotWritten => BlobWrittenError::MissingAfterWritten,
683            fpkg::BlobWrittenError::UnopenedBlob => BlobWrittenError::UnopenedBlob,
684        }
685    }
686}
687
688#[cfg(test)]
689mod tests {
690    use super::*;
691    use assert_matches::assert_matches;
692    use fidl::endpoints::{ClientEnd, ControlHandle as _, RequestStream as _};
693    use fidl_fuchsia_io as fio;
694    use fidl_fuchsia_pkg::{
695        BlobInfoIteratorRequest, NeededBlobsRequest, NeededBlobsRequestStream,
696        PackageCacheGetResponder, PackageCacheMarker, PackageCacheRequest,
697        PackageCacheRequestStream,
698    };
699
700    struct MockPackageCache {
701        stream: PackageCacheRequestStream,
702    }
703
704    impl MockPackageCache {
705        fn new() -> (Client, Self) {
706            let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<PackageCacheMarker>();
707            (Client::from_proxy(proxy), Self { stream })
708        }
709
710        async fn expect_get(
711            &mut self,
712            blob_info: BlobInfo,
713            expected_gc_protection: fpkg::GcProtection,
714        ) -> PendingGet {
715            match self.stream.next().await {
716                Some(Ok(PackageCacheRequest::Get {
717                    meta_far_blob,
718                    gc_protection,
719                    needed_blobs,
720                    dir,
721                    responder,
722                })) => {
723                    assert_eq!(BlobInfo::from(meta_far_blob), blob_info);
724                    assert_eq!(gc_protection, expected_gc_protection);
725                    let needed_blobs = needed_blobs.into_stream();
726                    let dir = dir.into_stream();
727
728                    PendingGet { stream: needed_blobs, dir, responder }
729                }
730                r => panic!("Unexpected request: {:?}", r),
731            }
732        }
733
734        async fn expect_closed(mut self) {
735            assert_matches!(self.stream.next().await, None);
736        }
737    }
738
739    struct PendingGet {
740        stream: NeededBlobsRequestStream,
741        dir: fio::DirectoryRequestStream,
742        responder: PackageCacheGetResponder,
743    }
744
745    impl PendingGet {
746        async fn new() -> (Get, PendingGet) {
747            let (client, mut server) = MockPackageCache::new();
748
749            let get = client.get(blob_info(42), fpkg::GcProtection::OpenPackageTracking).unwrap();
750            let pending_get =
751                server.expect_get(blob_info(42), fpkg::GcProtection::OpenPackageTracking).await;
752            (get, pending_get)
753        }
754
755        fn finish_hold_stream_open(self) -> (NeededBlobsRequestStream, PackageDirProvider) {
756            self.stream.control_handle().shutdown_with_epitaph(Status::OK);
757            self.responder.send(Ok(())).unwrap();
758            (self.stream, PackageDirProvider { stream: self.dir })
759        }
760
761        fn finish(self) -> PackageDirProvider {
762            self.stream.control_handle().shutdown_with_epitaph(Status::OK);
763            self.responder.send(Ok(())).unwrap();
764            PackageDirProvider { stream: self.dir }
765        }
766
767        #[cfg(target_os = "fuchsia")]
768        fn fail_the_get(self) {
769            self.responder
770                .send(Err(Status::IO_INVALID.into_raw()))
771                .expect("client should be waiting");
772        }
773
774        async fn expect_open_meta_blob(
775            mut self,
776            res: Result<Option<ClientEnd<fio::FileMarker>>, fpkg::OpenBlobError>,
777        ) -> Self {
778            match self.stream.next().await {
779                Some(Ok(NeededBlobsRequest::OpenMetaBlob { responder })) => {
780                    responder.send(res.map(|o| o.map(fpkg::BlobWriter::File))).unwrap();
781                }
782                r => panic!("Unexpected request: {:?}", r),
783            }
784            self
785        }
786
787        async fn expect_open_blob(
788            mut self,
789            expected_blob_id: BlobId,
790            res: Result<Option<ClientEnd<fio::FileMarker>>, fpkg::OpenBlobError>,
791        ) -> Self {
792            match self.stream.next().await {
793                Some(Ok(NeededBlobsRequest::OpenBlob { blob_id, responder })) => {
794                    assert_eq!(BlobId::from(blob_id), expected_blob_id);
795                    responder.send(res.map(|o| o.map(fpkg::BlobWriter::File))).unwrap();
796                }
797                r => panic!("Unexpected request: {:?}", r),
798            }
799            self
800        }
801
802        async fn expect_get_missing_blobs(mut self, response_chunks: Vec<Vec<BlobInfo>>) -> Self {
803            match self.stream.next().await {
804                Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
805                    let mut stream = iterator.into_stream();
806
807                    // Respond to each next request with the next chunk.
808                    for chunk in response_chunks {
809                        let chunk = chunk
810                            .into_iter()
811                            .map(fidl_fuchsia_pkg::BlobInfo::from)
812                            .collect::<Vec<_>>();
813
814                        let BlobInfoIteratorRequest::Next { responder } =
815                            stream.next().await.unwrap().unwrap();
816                        responder.send(&chunk).unwrap();
817                    }
818
819                    // Then respond with an empty chunk.
820                    let BlobInfoIteratorRequest::Next { responder } =
821                        stream.next().await.unwrap().unwrap();
822                    responder.send(&[]).unwrap();
823
824                    // Expect the client to stop asking.
825                    assert_matches!(stream.next().await, None);
826                }
827                r => panic!("Unexpected request: {:?}", r),
828            }
829            self
830        }
831
832        async fn expect_get_missing_blobs_client_closes_channel(
833            mut self,
834            response_chunks: Vec<Vec<BlobInfo>>,
835        ) -> Self {
836            match self.stream.next().await {
837                Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
838                    let mut stream = iterator.into_stream();
839
840                    // Respond to each next request with the next chunk.
841                    for chunk in response_chunks {
842                        let chunk = chunk
843                            .into_iter()
844                            .map(fidl_fuchsia_pkg::BlobInfo::from)
845                            .collect::<Vec<_>>();
846
847                        let BlobInfoIteratorRequest::Next { responder } =
848                            stream.next().await.unwrap().unwrap();
849                        responder.send(&chunk).unwrap();
850                    }
851
852                    // The client closes the channel before we can respond with an empty chunk.
853                    assert_matches!(stream.next().await, None);
854                }
855                r => panic!("Unexpected request: {:?}", r),
856            }
857            self
858        }
859
860        async fn expect_get_missing_blobs_inject_iterator_error(mut self) -> Self {
861            match self.stream.next().await {
862                Some(Ok(NeededBlobsRequest::GetMissingBlobs { iterator, control_handle: _ })) => {
863                    iterator
864                        .into_stream_and_control_handle()
865                        .1
866                        .shutdown_with_epitaph(Status::ADDRESS_IN_USE);
867                }
868                r => panic!("Unexpected request: {:?}", r),
869            }
870            self
871        }
872
873        #[cfg(target_os = "fuchsia")]
874        async fn expect_abort(mut self) -> Self {
875            match self.stream.next().await {
876                Some(Ok(NeededBlobsRequest::Abort { responder })) => {
877                    responder.send().unwrap();
878                }
879                r => panic!("Unexpected request: {:?}", r),
880            }
881            self
882        }
883    }
884
885    struct PackageDirProvider {
886        stream: fio::DirectoryRequestStream,
887    }
888
889    impl PackageDirProvider {
890        fn close_pkg_dir(self) {
891            self.stream.control_handle().shutdown_with_epitaph(Status::NOT_EMPTY);
892        }
893    }
894
895    fn blob_id(n: u8) -> BlobId {
896        BlobId::from([n; 32])
897    }
898
899    fn blob_info(n: u8) -> BlobInfo {
900        BlobInfo { blob_id: blob_id(n), length: 0 }
901    }
902
903    #[fuchsia_async::run_singlethreaded(test)]
904    async fn constructor() {
905        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<PackageCacheMarker>();
906        let client = Client::from_proxy(proxy);
907
908        drop(stream);
909        assert_matches!(client.proxy().sync().await, Err(_));
910    }
911
912    #[fuchsia_async::run_singlethreaded(test)]
913    async fn get_present_package() {
914        let (client, mut server) = MockPackageCache::new();
915
916        let ((), ()) = future::join(
917            async {
918                server
919                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
920                    .await
921                    .finish()
922                    .close_pkg_dir();
923                server.expect_closed().await;
924            },
925            async move {
926                let mut get =
927                    client.get(blob_info(2), fpkg::GcProtection::OpenPackageTracking).unwrap();
928
929                assert_matches!(get.open_meta_blob().await.unwrap(), None);
930                assert_eq!(get.get_missing_blobs().try_concat().await.unwrap(), vec![]);
931                let pkg_dir = get.finish().await.unwrap();
932
933                assert_matches!(
934                    pkg_dir.into_proxy().take_event_stream().next().await,
935                    Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
936                );
937            },
938        )
939        .await;
940    }
941
942    #[fuchsia_async::run_singlethreaded(test)]
943    async fn get_present_package_handles_slow_stream_close() {
944        let (client, mut server) = MockPackageCache::new();
945
946        let (send, recv) = futures::channel::oneshot::channel::<()>();
947
948        let ((), ()) = future::join(
949            async {
950                let (needed_blobs_stream, pkg_dir) = server
951                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
952                    .await
953                    .finish_hold_stream_open();
954                pkg_dir.close_pkg_dir();
955
956                // wait until `send` is dropped to drop the request stream.
957                let _ = recv.await;
958                drop(needed_blobs_stream);
959            },
960            async move {
961                let mut get =
962                    client.get(blob_info(2), fpkg::GcProtection::OpenPackageTracking).unwrap();
963
964                assert_matches!(get.open_meta_blob().await.unwrap(), None);
965
966                // ensure sending the request doesn't fail, then unblock closing the channel, then
967                // ensure the get_missing_blobs call detects the closed iterator as success instead
968                // of a PEER_CLOSED error.
969                let missing_blobs_stream = get.get_missing_blobs();
970                drop(send);
971                assert_eq!(missing_blobs_stream.try_concat().await.unwrap(), vec![]);
972                let pkg_dir = get.finish().await.unwrap();
973
974                assert_matches!(
975                    pkg_dir.into_proxy().take_event_stream().next().await,
976                    Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
977                );
978            },
979        )
980        .await;
981    }
982
983    #[fuchsia_async::run_singlethreaded(test)]
984    async fn needed_blobs_open_meta_far() {
985        let (mut get, pending_get) = PendingGet::new().await;
986
987        let ((), ()) = future::join(
988            async {
989                pending_get
990                    .expect_open_meta_blob(Ok(None))
991                    .await
992                    .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
993                    .await
994                    .expect_open_meta_blob(Ok(None))
995                    .await
996                    .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
997                    .await
998                    .expect_open_meta_blob(Err(fpkg::OpenBlobError::OutOfSpace))
999                    .await
1000                    .expect_open_meta_blob(Err(fpkg::OpenBlobError::UnspecifiedIo))
1001                    .await;
1002            },
1003            async {
1004                {
1005                    let opener = get.make_open_meta_blob();
1006                    assert_matches!(opener.open().await.unwrap(), None);
1007                    assert_matches!(opener.open().await.unwrap(), Some(_));
1008                }
1009                assert_matches!(get.open_meta_blob().await.unwrap(), None);
1010                assert_matches!(get.open_meta_blob().await.unwrap(), Some(_));
1011                assert_matches!(get.open_meta_blob().await, Err(OpenBlobError::OutOfSpace));
1012                assert_matches!(get.open_meta_blob().await, Err(OpenBlobError::UnspecifiedIo));
1013            },
1014        )
1015        .await;
1016    }
1017
1018    #[fuchsia_async::run_singlethreaded(test)]
1019    async fn needed_blobs_open_content_blob() {
1020        let (mut get, pending_get) = PendingGet::new().await;
1021
1022        let ((), ()) = future::join(
1023            async {
1024                pending_get
1025                    .expect_open_blob(blob_id(2), Ok(None))
1026                    .await
1027                    .expect_open_blob(blob_id(2), Ok(Some(fidl::endpoints::create_endpoints().0)))
1028                    .await
1029                    .expect_open_blob(blob_id(10), Ok(None))
1030                    .await
1031                    .expect_open_blob(blob_id(11), Ok(Some(fidl::endpoints::create_endpoints().0)))
1032                    .await
1033                    .expect_open_blob(blob_id(12), Err(fpkg::OpenBlobError::OutOfSpace))
1034                    .await
1035                    .expect_open_blob(blob_id(13), Err(fpkg::OpenBlobError::UnspecifiedIo))
1036                    .await;
1037            },
1038            async {
1039                {
1040                    let opener = get.make_open_blob(blob_id(2));
1041                    assert_matches!(opener.open().await.unwrap(), None);
1042                    assert_matches!(opener.open().await.unwrap(), Some(_));
1043                }
1044                assert_matches!(get.open_blob(blob_id(10),).await.unwrap(), None);
1045                assert_matches!(get.open_blob(blob_id(11),).await.unwrap(), Some(_));
1046                assert_matches!(get.open_blob(blob_id(12),).await, Err(OpenBlobError::OutOfSpace));
1047                assert_matches!(
1048                    get.open_blob(blob_id(13),).await,
1049                    Err(OpenBlobError::UnspecifiedIo)
1050                );
1051            },
1052        )
1053        .await;
1054    }
1055
1056    #[fuchsia_async::run_singlethreaded(test)]
1057    async fn needed_blobs_get_missing_blobs_on_closed_ok() {
1058        let (mut get, pending_get) = PendingGet::new().await;
1059        let _ = pending_get.finish();
1060
1061        assert_matches!(get.open_meta_blob().await, Ok(None));
1062        assert_eq!(get.get_missing_blobs().try_concat().await.unwrap(), vec![]);
1063    }
1064
1065    #[fuchsia_async::run_singlethreaded(test)]
1066    async fn needed_blobs_get_missing_blobs() {
1067        let (mut get, pending_get) = PendingGet::new().await;
1068
1069        let ((), ()) = future::join(
1070            async {
1071                pending_get
1072                    .expect_get_missing_blobs(vec![
1073                        vec![blob_info(1), blob_info(2)],
1074                        vec![blob_info(3)],
1075                    ])
1076                    .await;
1077            },
1078            async {
1079                assert_eq!(
1080                    get.get_missing_blobs().try_concat().await.unwrap(),
1081                    vec![blob_info(1), blob_info(2), blob_info(3)]
1082                );
1083            },
1084        )
1085        .await;
1086    }
1087
1088    #[fuchsia_async::run_singlethreaded(test)]
1089    async fn needed_blobs_get_missing_blobs_fail_to_obtain_iterator() {
1090        let (mut get, pending_get) = PendingGet::new().await;
1091        drop(pending_get);
1092
1093        assert_matches!(
1094            get.get_missing_blobs().try_concat().await,
1095            Err(ListMissingBlobsError::CallNextOnBlobIterator(
1096                fidl::Error::ClientChannelClosed{status, ..})
1097            )
1098                if status == Status::PEER_CLOSED
1099        );
1100    }
1101
1102    #[fuchsia_async::run_singlethreaded(test)]
1103    async fn needed_blobs_get_missing_blobs_iterator_contains_error() {
1104        let (mut get, pending_get) = PendingGet::new().await;
1105
1106        let (_, ()) =
1107            future::join(pending_get.expect_get_missing_blobs_inject_iterator_error(), async {
1108                assert_matches!(
1109                    get.get_missing_blobs().try_concat().await,
1110                    Err(ListMissingBlobsError::CallNextOnBlobIterator(
1111                        fidl::Error::ClientChannelClosed{status, ..}
1112                    ))
1113                        if status == Status::ADDRESS_IN_USE
1114                );
1115            })
1116            .await;
1117    }
1118
1119    #[cfg(target_os = "fuchsia")]
1120    #[test]
1121    fn needed_blobs_abort() {
1122        use futures::future::Either;
1123        use futures::pin_mut;
1124        use std::task::Poll;
1125
1126        let mut executor = fuchsia_async::TestExecutor::new_with_fake_time();
1127
1128        let fut = async {
1129            let (get, pending_get) = PendingGet::new().await;
1130
1131            let abort_fut = get.abort().boxed();
1132            let expect_abort_fut = pending_get.expect_abort();
1133            pin_mut!(expect_abort_fut);
1134
1135            match futures::future::select(abort_fut, expect_abort_fut).await {
1136                Either::Left(((), _expect_abort_fut)) => {
1137                    panic!("abort should wait for the get future to complete")
1138                }
1139                Either::Right((pending_get, abort_fut)) => (abort_fut, pending_get),
1140            }
1141        };
1142        pin_mut!(fut);
1143
1144        let (mut abort_fut, pending_get) = match executor.run_until_stalled(&mut fut) {
1145            Poll::Pending => panic!("should complete"),
1146            Poll::Ready((abort_fut, pending_get)) => (abort_fut, pending_get),
1147        };
1148
1149        // NeededBlobs.Abort should wait until PackageCache.Get returns
1150        assert_matches!(executor.run_until_stalled(&mut abort_fut), Poll::Pending);
1151        pending_get.fail_the_get();
1152        assert_matches!(executor.run_until_stalled(&mut abort_fut), Poll::Ready(()));
1153    }
1154
1155    struct MockNeededBlob {
1156        blob: fio::FileRequestStream,
1157        needed_blobs: fpkg::NeededBlobsRequestStream,
1158    }
1159
1160    impl MockNeededBlob {
1161        fn mock_hash() -> BlobId {
1162            [7; 32].into()
1163        }
1164
1165        fn new() -> (NeededBlob, Self) {
1166            let (blob_proxy, blob) = fidl::endpoints::create_proxy_and_stream::<fio::FileMarker>();
1167            let (needed_blobs_proxy, needed_blobs) =
1168                fidl::endpoints::create_proxy_and_stream::<fpkg::NeededBlobsMarker>();
1169            (
1170                NeededBlob {
1171                    blob: Blob {
1172                        writer: Box::new(Clone::clone(&blob_proxy)),
1173                        needed_blobs: needed_blobs_proxy,
1174                        blob_id: Self::mock_hash(),
1175                        state: NeedsTruncate,
1176                    },
1177                    closer: BlobCloser { closer: Box::new(blob_proxy), closed: false },
1178                },
1179                Self { blob, needed_blobs },
1180            )
1181        }
1182
1183        async fn fail_truncate(mut self) -> Self {
1184            match self.blob.next().await {
1185                Some(Ok(fio::FileRequest::Resize { length: _, responder })) => {
1186                    responder.send(Err(Status::NO_SPACE.into_raw())).unwrap();
1187                }
1188                r => panic!("Unexpected request: {:?}", r),
1189            }
1190            self
1191        }
1192
1193        async fn expect_truncate(mut self, expected_length: u64) -> Self {
1194            match self.blob.next().await {
1195                Some(Ok(fio::FileRequest::Resize { length, responder })) => {
1196                    assert_eq!(length, expected_length);
1197                    responder.send(Ok(())).unwrap();
1198                }
1199                r => panic!("Unexpected request: {:?}", r),
1200            }
1201            self
1202        }
1203
1204        async fn fail_write(mut self) -> Self {
1205            match self.blob.next().await {
1206                Some(Ok(fio::FileRequest::Write { data: _, responder })) => {
1207                    responder.send(Err(Status::NO_SPACE.into_raw())).unwrap();
1208                }
1209                r => panic!("Unexpected request: {:?}", r),
1210            }
1211            self
1212        }
1213
1214        async fn expect_write(mut self, expected_payload: &[u8]) -> Self {
1215            match self.blob.next().await {
1216                Some(Ok(fio::FileRequest::Write { data, responder })) => {
1217                    assert_eq!(data, expected_payload);
1218                    responder.send(Ok(data.len() as u64)).unwrap();
1219                }
1220                r => panic!("Unexpected request: {:?}", r),
1221            }
1222            self
1223        }
1224
1225        async fn expect_write_partial(
1226            mut self,
1227            expected_payload: &[u8],
1228            bytes_to_consume: u64,
1229        ) -> Self {
1230            match self.blob.next().await {
1231                Some(Ok(fio::FileRequest::Write { data, responder })) => {
1232                    assert_eq!(data, expected_payload);
1233                    responder.send(Ok(bytes_to_consume)).unwrap();
1234                }
1235                r => panic!("Unexpected request: {:?}", r),
1236            }
1237            self
1238        }
1239
1240        async fn expect_close(mut self) {
1241            match self.blob.next().await {
1242                Some(Ok(fio::FileRequest::Close { responder })) => {
1243                    responder.send(Ok(())).unwrap();
1244                }
1245                r => panic!("Unexpected request: {:?}", r),
1246            }
1247        }
1248
1249        async fn expect_blob_written(mut self) -> Self {
1250            match self.needed_blobs.next().await {
1251                Some(Ok(fpkg::NeededBlobsRequest::BlobWritten { blob_id, responder })) => {
1252                    assert_eq!(blob_id, Self::mock_hash().into());
1253                    responder.send(Ok(())).unwrap();
1254                }
1255                r => panic!("Unexpected request: {:?}", r),
1256            }
1257            self
1258        }
1259    }
1260
1261    #[fuchsia_async::run_singlethreaded(test)]
1262    async fn empty_blob_write() {
1263        let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1264
1265        let ((), ()) = future::join(
1266            async {
1267                blob_server
1268                    .expect_truncate(0)
1269                    .await
1270                    .expect_blob_written()
1271                    .await
1272                    .expect_close()
1273                    .await;
1274            },
1275            async {
1276                let blob = match blob.truncate(0).await.unwrap() {
1277                    TruncateBlobSuccess::AllWritten(blob) => blob,
1278                    other => panic!("empty blob shouldn't need bytes {other:?}"),
1279                };
1280                let () = blob.blob_written().await.unwrap();
1281                closer.close().await;
1282            },
1283        )
1284        .await;
1285    }
1286
1287    impl TruncateBlobSuccess {
1288        fn unwrap_needs_data(self) -> Blob<NeedsData> {
1289            match self {
1290                TruncateBlobSuccess::NeedsData(blob) => blob,
1291                TruncateBlobSuccess::AllWritten(_) => panic!("blob should need data"),
1292            }
1293        }
1294    }
1295
1296    impl BlobWriteSuccess {
1297        fn unwrap_needs_data(self) -> Blob<NeedsData> {
1298            match self {
1299                BlobWriteSuccess::NeedsData(blob) => blob,
1300                BlobWriteSuccess::AllWritten(_) => panic!("blob should need data"),
1301            }
1302        }
1303
1304        fn unwrap_all_written(self) -> Blob<NeedsBlobWritten> {
1305            match self {
1306                BlobWriteSuccess::NeedsData(_) => panic!("blob should be completely written"),
1307                BlobWriteSuccess::AllWritten(blob) => blob,
1308            }
1309        }
1310    }
1311
1312    #[fuchsia_async::run_singlethreaded(test)]
1313    async fn small_blob_write() {
1314        let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1315
1316        let ((), ()) = future::join(
1317            async {
1318                blob_server
1319                    .expect_truncate(4)
1320                    .await
1321                    .expect_write(b"test")
1322                    .await
1323                    .expect_blob_written()
1324                    .await
1325                    .expect_close()
1326                    .await;
1327            },
1328            async {
1329                let blob = blob.truncate(4).await.unwrap().unwrap_needs_data();
1330                let blob = blob.write(b"test").await.unwrap().unwrap_all_written();
1331                let () = blob.blob_written().await.unwrap();
1332                closer.close().await;
1333            },
1334        )
1335        .await;
1336    }
1337
1338    #[fuchsia_async::run_singlethreaded(test)]
1339    async fn blob_truncate_no_space() {
1340        let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1341
1342        let ((), ()) = future::join(
1343            async {
1344                blob_server.fail_truncate().await;
1345            },
1346            async {
1347                assert_matches!(blob.truncate(4).await, Err(TruncateBlobError::NoSpace));
1348                closer.close().await;
1349            },
1350        )
1351        .await;
1352    }
1353
1354    #[fuchsia_async::run_singlethreaded(test)]
1355    async fn blob_write_no_space() {
1356        let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1357
1358        let ((), ()) = future::join(
1359            async {
1360                blob_server.expect_truncate(4).await.fail_write().await;
1361            },
1362            async {
1363                let blob = blob.truncate(4).await.unwrap().unwrap_needs_data();
1364                assert_matches!(blob.write(b"test").await, Err(WriteBlobError::NoSpace));
1365                closer.close().await;
1366            },
1367        )
1368        .await;
1369    }
1370
1371    #[fuchsia_async::run_singlethreaded(test)]
1372    async fn blob_write_server_partial_write() {
1373        let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1374
1375        let ((), ()) = future::join(
1376            async {
1377                blob_server
1378                    .expect_truncate(6)
1379                    .await
1380                    .expect_write_partial(b"abc123", 3)
1381                    .await
1382                    .expect_write(b"123")
1383                    .await
1384                    .expect_blob_written()
1385                    .await;
1386            },
1387            async {
1388                let blob = blob.truncate(6).await.unwrap().unwrap_needs_data();
1389                let blob = blob.write(b"abc123").await.unwrap().unwrap_all_written();
1390                let () = blob.blob_written().await.unwrap();
1391                closer.close().await;
1392            },
1393        )
1394        .await;
1395    }
1396
1397    #[fuchsia_async::run_singlethreaded(test)]
1398    async fn blob_write_client_partial_write() {
1399        let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1400
1401        let ((), ()) = future::join(
1402            async {
1403                blob_server
1404                    .expect_truncate(6)
1405                    .await
1406                    .expect_write(b"abc")
1407                    .await
1408                    .expect_write(b"123")
1409                    .await
1410                    .expect_blob_written()
1411                    .await;
1412            },
1413            async {
1414                let blob = blob.truncate(6).await.unwrap().unwrap_needs_data();
1415                let blob = blob.write(b"abc").await.unwrap().unwrap_needs_data();
1416                let blob = blob.write(b"123").await.unwrap().unwrap_all_written();
1417                let () = blob.blob_written().await.unwrap();
1418                closer.close().await;
1419            },
1420        )
1421        .await;
1422    }
1423
1424    #[fuchsia_async::run_singlethreaded(test)]
1425    async fn blob_write_chunkize_payload() {
1426        const CHUNK_SIZE: usize = fio::MAX_BUF as usize;
1427
1428        let (NeededBlob { blob, closer }, blob_server) = MockNeededBlob::new();
1429
1430        let ((), ()) = future::join(
1431            async {
1432                blob_server
1433                    .expect_truncate(3 * CHUNK_SIZE as u64)
1434                    .await
1435                    .expect_write(&[0u8; CHUNK_SIZE])
1436                    .await
1437                    .expect_write(&[1u8; CHUNK_SIZE])
1438                    .await
1439                    .expect_write(&[2u8; CHUNK_SIZE])
1440                    .await
1441                    .expect_blob_written()
1442                    .await;
1443            },
1444            async {
1445                let payload =
1446                    (0..3).flat_map(|n| std::iter::repeat(n).take(CHUNK_SIZE)).collect::<Vec<u8>>();
1447                let blob = blob.truncate(3 * CHUNK_SIZE as u64).await.unwrap().unwrap_needs_data();
1448                let blob = blob.write(&payload).await.unwrap().unwrap_all_written();
1449                let () = blob.blob_written().await.unwrap();
1450                closer.close().await;
1451            },
1452        )
1453        .await;
1454    }
1455
1456    #[fuchsia_async::run_singlethreaded(test)]
1457    async fn get_already_cached_success() {
1458        let (client, mut server) = MockPackageCache::new();
1459
1460        let ((), ()) = future::join(
1461            async {
1462                server
1463                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1464                    .await
1465                    .finish()
1466                    .close_pkg_dir();
1467                server.expect_closed().await;
1468            },
1469            async move {
1470                let pkg_dir = client.get_already_cached(blob_id(2)).await.unwrap();
1471
1472                assert_matches!(
1473                    pkg_dir.into_proxy().take_event_stream().next().await,
1474                    Some(Err(fidl::Error::ClientChannelClosed { status: Status::NOT_EMPTY, .. }))
1475                );
1476            },
1477        )
1478        .await;
1479    }
1480
1481    #[fuchsia_async::run_singlethreaded(test)]
1482    async fn get_already_cached_missing_meta_far() {
1483        let (client, mut server) = MockPackageCache::new();
1484
1485        let ((), ()) = future::join(
1486            async {
1487                server
1488                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1489                    .await
1490                    .expect_open_meta_blob(Ok(Some(fidl::endpoints::create_endpoints().0)))
1491                    .await;
1492            },
1493            async move {
1494                assert_matches!(
1495                    client.get_already_cached(blob_id(2)).await,
1496                    Err(GetAlreadyCachedError::MissingMetaFar)
1497                );
1498            },
1499        )
1500        .await;
1501    }
1502
1503    #[fuchsia_async::run_singlethreaded(test)]
1504    async fn get_already_cached_missing_content_blob() {
1505        let (client, mut server) = MockPackageCache::new();
1506
1507        let ((), ()) = future::join(
1508            async {
1509                server
1510                    .expect_get(blob_info(2), fpkg::GcProtection::OpenPackageTracking)
1511                    .await
1512                    .expect_open_meta_blob(Ok(None))
1513                    .await
1514                    .expect_get_missing_blobs_client_closes_channel(vec![vec![BlobInfo {
1515                        blob_id: [0; 32].into(),
1516                        length: 0,
1517                    }]])
1518                    .await;
1519            },
1520            async move {
1521                assert_matches!(
1522                    client.get_already_cached(blob_id(2)).await,
1523                    Err(GetAlreadyCachedError::MissingContentBlobs(v))
1524                        if v == vec![BlobInfo {
1525                            blob_id: [0; 32].into(),
1526                            length: 0,
1527                        }]
1528                );
1529            },
1530        )
1531        .await;
1532    }
1533}