fidl_fuchsia_pkg_ext/cache/
storage.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::{OpenBlobError, TruncateBlobError, WriteBlobError};
6use anyhow::Context as _;
7use zx_status::Status;
8use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio, fidl_fuchsia_pkg as fpkg};
9
10pub(super) fn into_blob_writer_and_closer(
11    fidl: fpkg::BlobWriter,
12) -> Result<(Box<dyn Writer>, Box<dyn Closer>), OpenBlobError> {
13    use fpkg::BlobWriter::*;
14    match fidl {
15        File(file) => {
16            let proxy = file.into_proxy();
17            Ok((Box::new(Clone::clone(&proxy)), Box::new(proxy)))
18        }
19        Writer(writer) => {
20            // fuchsia.fxfs/BlobCreator allows concurrent creation attempts, so we don't need to
21            // cancel an ongoing attempt before trying again.
22            Ok((Box::new(FxBlob::new(writer.into_proxy())), Box::new(())))
23        }
24    }
25}
26
27#[async_trait::async_trait]
28pub(super) trait Closer: Send + Sync + std::fmt::Debug {
29    /// Close the blob to enable immediate retry of create and write.
30    async fn close(&mut self);
31
32    /// Attempt to close the blob. Function may return before blob is closed if closing requires
33    /// async.
34    fn best_effort_close(&mut self);
35}
36
37#[async_trait::async_trait]
38impl Closer for fio::FileProxy {
39    async fn close(&mut self) {
40        let _: Result<Result<(), i32>, fidl::Error> = fio::FileProxy::close(self).await;
41    }
42
43    fn best_effort_close(&mut self) {
44        let _: fidl::client::QueryResponseFut<Result<(), i32>> = fio::FileProxy::close(self);
45    }
46}
47
48// fuchsia.fxfs/BlobCreator allows concurrent creation attempts, so we don't need to cancel an
49// ongoing attempt before trying again.
50#[async_trait::async_trait]
51impl Closer for () {
52    async fn close(&mut self) {}
53
54    fn best_effort_close(&mut self) {}
55}
56
57#[async_trait::async_trait]
58pub(super) trait Writer: Send + std::fmt::Debug {
59    /// Set the size of the blob.
60    /// If the blob is size zero, the returned Future should not complete until the blob
61    /// is readable.
62    async fn truncate(&mut self, size: u64) -> Result<(), TruncateBlobError>;
63    /// Write `bytes` to the blob.
64    /// The Future returned by the `write` call that writes the final bytes should
65    /// not complete until the blob is readable.
66    async fn write(
67        &mut self,
68        bytes: &[u8],
69        after_write: &(dyn Fn(u64) + Send + Sync),
70        after_write_ack: &(dyn Fn() + Send + Sync),
71    ) -> Result<(), WriteBlobError>;
72}
73
74#[async_trait::async_trait]
75impl Writer for fio::FileProxy {
76    async fn truncate(&mut self, size: u64) -> Result<(), TruncateBlobError> {
77        self.resize(size).await?.map_err(|i| match Status::from_raw(i) {
78            Status::NO_SPACE => TruncateBlobError::NoSpace,
79            other => TruncateBlobError::UnexpectedResponse(other),
80        })
81    }
82
83    async fn write(
84        &mut self,
85        mut bytes: &[u8],
86        after_write: &(dyn Fn(u64) + Send + Sync),
87        after_write_ack: &(dyn Fn() + Send + Sync),
88    ) -> Result<(), WriteBlobError> {
89        while !bytes.is_empty() {
90            let limit = bytes.len().min(fio::MAX_BUF as usize);
91
92            let result_fut = fio::FileProxy::write(self, &bytes[..limit]);
93            after_write(bytes.len() as u64);
94
95            let result = result_fut.await;
96            after_write_ack();
97
98            let written = result?.map_err(|i| match Status::from_raw(i) {
99                Status::IO_DATA_INTEGRITY => WriteBlobError::Corrupt,
100                Status::NO_SPACE => WriteBlobError::NoSpace,
101                other => WriteBlobError::UnexpectedResponse(other),
102            })? as usize;
103
104            if written > bytes.len() {
105                return Err(WriteBlobError::Overwrite);
106            }
107            bytes = &bytes[written..];
108        }
109
110        Ok(())
111    }
112}
113
114#[allow(clippy::large_enum_variant)] // TODO(https://fxbug.dev/401087293)
115#[derive(Debug)]
116enum FxBlob {
117    NeedsTruncate(ffxfs::BlobWriterProxy),
118    NeedsBytes(blob_writer::BlobWriter),
119    Invalid,
120}
121
122impl FxBlob {
123    fn new(proxy: ffxfs::BlobWriterProxy) -> Self {
124        Self::NeedsTruncate(proxy)
125    }
126
127    fn state_str(&self) -> &'static str {
128        match self {
129            Self::NeedsTruncate(_) => "needs truncate",
130            Self::NeedsBytes(_) => "needs bytes",
131            Self::Invalid => "invalid",
132        }
133    }
134}
135
136#[async_trait::async_trait]
137impl Writer for FxBlob {
138    async fn truncate(&mut self, size: u64) -> Result<(), TruncateBlobError> {
139        *self = match std::mem::replace(self, Self::Invalid) {
140            Self::NeedsTruncate(proxy) => Self::NeedsBytes(
141                blob_writer::BlobWriter::create(proxy, size)
142                    .await
143                    .context("creating a BlobWriter")
144                    .map_err(TruncateBlobError::Other)?,
145            ),
146            Self::NeedsBytes(_) => {
147                return Err(TruncateBlobError::AlreadyTruncated(self.state_str()))
148            }
149            Self::Invalid => return Err(TruncateBlobError::BadState),
150        };
151        Ok(())
152    }
153
154    async fn write(
155        &mut self,
156        bytes: &[u8],
157        after_write: &(dyn Fn(u64) + Send + Sync),
158        after_write_ack: &(dyn Fn() + Send + Sync),
159    ) -> Result<(), WriteBlobError> {
160        let Self::NeedsBytes(writer) = self else {
161            return Err(WriteBlobError::BytesNotNeeded(self.state_str()));
162        };
163        let fut = writer.write(bytes);
164        let () = after_write(bytes.len() as u64);
165        let res = fut.await;
166        let () = after_write_ack();
167        res.map_err(|e| match e {
168            e @ blob_writer::WriteError::BytesReady(s) => match s {
169                Status::IO_DATA_INTEGRITY => WriteBlobError::Corrupt,
170                Status::NO_SPACE => WriteBlobError::NoSpace,
171                _ => WriteBlobError::FxBlob(e),
172            },
173            e => WriteBlobError::FxBlob(e),
174        })
175    }
176}
177
#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream::TryStreamExt as _;

    /// Takes the next request from `stream`, asserts that it is a `Write` carrying exactly
    /// `expected`, and replies that `ack` bytes were written.
    async fn serve_one_write(stream: &mut fio::FileRequestStream, expected: &[u8], ack: u64) {
        match stream.try_next().await.unwrap().unwrap() {
            fio::FileRequest::Write { data, responder } => {
                assert_eq!(data, expected);
                let () = responder.send(Ok(ack)).unwrap();
            }
            req => panic!("unexpected request {req:?}"),
        }
    }

    /// A payload one byte larger than MAX_BUF is split into a MAX_BUF write and a 1-byte write.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn file_proxy_chunks_writes() {
        let max_chunk = fio::MAX_BUF as usize;
        let (mut proxy, mut server) = fidl::endpoints::create_proxy_and_stream::<fio::FileMarker>();
        let payload = vec![0u8; max_chunk + 1];

        let client = async move {
            <fio::FileProxy as Writer>::write(&mut proxy, &payload, &|_| (), &|| ()).await.unwrap()
        };
        let fake_server = async move {
            // Proxy limited writes to MAX_BUF bytes.
            serve_one_write(&mut server, &vec![0u8; max_chunk], fio::MAX_BUF).await;
            serve_one_write(&mut server, &[0u8], 1).await;
            assert!(server.try_next().await.unwrap().is_none());
        };

        let ((), ()) = futures::future::join(client, fake_server).await;
    }

    /// When the server acks fewer bytes than it was sent, the remainder is sent again.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn file_proxy_handles_short_writes() {
        let (mut proxy, mut server) = fidl::endpoints::create_proxy_and_stream::<fio::FileMarker>();
        let payload = [0u8; 10];

        let client = async move {
            <fio::FileProxy as Writer>::write(&mut proxy, &payload, &|_| (), &|| ()).await.unwrap()
        };
        let fake_server = async move {
            // Ack only 8 of the 10 bytes.
            serve_one_write(&mut server, &[0u8; 10], 8).await;
            serve_one_write(&mut server, &[0u8; 2], 2).await;
            assert!(server.try_next().await.unwrap().is_none());
        };

        let ((), ()) = futures::future::join(client, fake_server).await;
    }

    /// Writes a small delivery blob through the fxblob writer and checks that it gets listed.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn fxblob_writer() {
        let blobfs = blobfs_ramdisk::BlobfsRamdisk::builder().fxblob().start().await.unwrap();
        assert_eq!(blobfs.list_blobs().unwrap(), std::collections::BTreeSet::new());

        let contents = [0u8; 7];
        let hash = fuchsia_merkle::from_slice(&contents).root();
        let delivery_blob = delivery_blob::Type1Blob::generate(
            &contents[..],
            delivery_blob::CompressionMode::Attempt,
        );
        let writer_client_end = {
            let creator = blobfs.blob_creator_proxy().unwrap().unwrap();
            creator.create(&hash.into(), false).await.unwrap().unwrap()
        };

        let (mut writer, _closer) =
            into_blob_writer_and_closer(fpkg::BlobWriter::Writer(writer_client_end)).unwrap();
        let () = writer.truncate(delivery_blob.len().try_into().unwrap()).await.unwrap();
        let () = writer.write(&delivery_blob, &|_| (), &|| ()).await.unwrap();

        assert_eq!(blobfs.list_blobs().unwrap(), std::collections::BTreeSet::from([hash]));

        let () = blobfs.stop().await.unwrap();
    }
}
271}