fidl_fuchsia_pkg_ext/cache/
storage.rs
1use super::{OpenBlobError, TruncateBlobError, WriteBlobError};
6use anyhow::Context as _;
7use zx_status::Status;
8use {fidl_fuchsia_fxfs as ffxfs, fidl_fuchsia_io as fio, fidl_fuchsia_pkg as fpkg};
9
10pub(super) fn into_blob_writer_and_closer(
11 fidl: fpkg::BlobWriter,
12) -> Result<(Box<dyn Writer>, Box<dyn Closer>), OpenBlobError> {
13 use fpkg::BlobWriter::*;
14 match fidl {
15 File(file) => {
16 let proxy = file.into_proxy();
17 Ok((Box::new(Clone::clone(&proxy)), Box::new(proxy)))
18 }
19 Writer(writer) => {
20 Ok((Box::new(FxBlob::new(writer.into_proxy())), Box::new(())))
23 }
24 }
25}
26
27#[async_trait::async_trait]
28pub(super) trait Closer: Send + Sync + std::fmt::Debug {
29 async fn close(&mut self);
31
32 fn best_effort_close(&mut self);
35}
36
37#[async_trait::async_trait]
38impl Closer for fio::FileProxy {
39 async fn close(&mut self) {
40 let _: Result<Result<(), i32>, fidl::Error> = fio::FileProxy::close(self).await;
41 }
42
43 fn best_effort_close(&mut self) {
44 let _: fidl::client::QueryResponseFut<Result<(), i32>> = fio::FileProxy::close(self);
45 }
46}
47
48#[async_trait::async_trait]
51impl Closer for () {
52 async fn close(&mut self) {}
53
54 fn best_effort_close(&mut self) {}
55}
56
57#[async_trait::async_trait]
58pub(super) trait Writer: Send + std::fmt::Debug {
59 async fn truncate(&mut self, size: u64) -> Result<(), TruncateBlobError>;
63 async fn write(
67 &mut self,
68 bytes: &[u8],
69 after_write: &(dyn Fn(u64) + Send + Sync),
70 after_write_ack: &(dyn Fn() + Send + Sync),
71 ) -> Result<(), WriteBlobError>;
72}
73
74#[async_trait::async_trait]
75impl Writer for fio::FileProxy {
76 async fn truncate(&mut self, size: u64) -> Result<(), TruncateBlobError> {
77 self.resize(size).await?.map_err(|i| match Status::from_raw(i) {
78 Status::NO_SPACE => TruncateBlobError::NoSpace,
79 other => TruncateBlobError::UnexpectedResponse(other),
80 })
81 }
82
83 async fn write(
84 &mut self,
85 mut bytes: &[u8],
86 after_write: &(dyn Fn(u64) + Send + Sync),
87 after_write_ack: &(dyn Fn() + Send + Sync),
88 ) -> Result<(), WriteBlobError> {
89 while !bytes.is_empty() {
90 let limit = bytes.len().min(fio::MAX_BUF as usize);
91
92 let result_fut = fio::FileProxy::write(self, &bytes[..limit]);
93 after_write(bytes.len() as u64);
94
95 let result = result_fut.await;
96 after_write_ack();
97
98 let written = result?.map_err(|i| match Status::from_raw(i) {
99 Status::IO_DATA_INTEGRITY => WriteBlobError::Corrupt,
100 Status::NO_SPACE => WriteBlobError::NoSpace,
101 other => WriteBlobError::UnexpectedResponse(other),
102 })? as usize;
103
104 if written > bytes.len() {
105 return Err(WriteBlobError::Overwrite);
106 }
107 bytes = &bytes[written..];
108 }
109
110 Ok(())
111 }
112}
113
114#[allow(clippy::large_enum_variant)] #[derive(Debug)]
116enum FxBlob {
117 NeedsTruncate(ffxfs::BlobWriterProxy),
118 NeedsBytes(blob_writer::BlobWriter),
119 Invalid,
120}
121
122impl FxBlob {
123 fn new(proxy: ffxfs::BlobWriterProxy) -> Self {
124 Self::NeedsTruncate(proxy)
125 }
126
127 fn state_str(&self) -> &'static str {
128 match self {
129 Self::NeedsTruncate(_) => "needs truncate",
130 Self::NeedsBytes(_) => "needs bytes",
131 Self::Invalid => "invalid",
132 }
133 }
134}
135
136#[async_trait::async_trait]
137impl Writer for FxBlob {
138 async fn truncate(&mut self, size: u64) -> Result<(), TruncateBlobError> {
139 *self = match std::mem::replace(self, Self::Invalid) {
140 Self::NeedsTruncate(proxy) => Self::NeedsBytes(
141 blob_writer::BlobWriter::create(proxy, size)
142 .await
143 .context("creating a BlobWriter")
144 .map_err(TruncateBlobError::Other)?,
145 ),
146 Self::NeedsBytes(_) => {
147 return Err(TruncateBlobError::AlreadyTruncated(self.state_str()))
148 }
149 Self::Invalid => return Err(TruncateBlobError::BadState),
150 };
151 Ok(())
152 }
153
154 async fn write(
155 &mut self,
156 bytes: &[u8],
157 after_write: &(dyn Fn(u64) + Send + Sync),
158 after_write_ack: &(dyn Fn() + Send + Sync),
159 ) -> Result<(), WriteBlobError> {
160 let Self::NeedsBytes(writer) = self else {
161 return Err(WriteBlobError::BytesNotNeeded(self.state_str()));
162 };
163 let fut = writer.write(bytes);
164 let () = after_write(bytes.len() as u64);
165 let res = fut.await;
166 let () = after_write_ack();
167 res.map_err(|e| match e {
168 e @ blob_writer::WriteError::BytesReady(s) => match s {
169 Status::IO_DATA_INTEGRITY => WriteBlobError::Corrupt,
170 Status::NO_SPACE => WriteBlobError::NoSpace,
171 _ => WriteBlobError::FxBlob(e),
172 },
173 e => WriteBlobError::FxBlob(e),
174 })
175 }
176}
177
178#[cfg(test)]
179mod tests {
180 use super::*;
181 use futures::stream::TryStreamExt as _;
182
183 #[fuchsia_async::run_singlethreaded(test)]
184 async fn file_proxy_chunks_writes() {
185 let (mut proxy, mut server) = fidl::endpoints::create_proxy_and_stream::<fio::FileMarker>();
186 let bytes = vec![0; fio::MAX_BUF as usize + 1];
187
188 let write_fut = async move {
189 <fio::FileProxy as Writer>::write(&mut proxy, &bytes, &|_| (), &|| ()).await.unwrap()
190 };
191 let server_fut = async move {
192 match server.try_next().await.unwrap().unwrap() {
193 fio::FileRequest::Write { data, responder } => {
194 assert_eq!(data, vec![0; fio::MAX_BUF as usize]);
196 let () = responder.send(Ok(fio::MAX_BUF)).unwrap();
197 }
198 req => panic!("unexpected request {req:?}"),
199 }
200 match server.try_next().await.unwrap().unwrap() {
201 fio::FileRequest::Write { data, responder } => {
202 assert_eq!(data, vec![0; 1]);
203 let () = responder.send(Ok(1)).unwrap();
204 }
205 req => panic!("unexpected request {req:?}"),
206 }
207 assert!(server.try_next().await.unwrap().is_none());
208 };
209
210 let ((), ()) = futures::future::join(write_fut, server_fut).await;
211 }
212
213 #[fuchsia_async::run_singlethreaded(test)]
214 async fn file_proxy_handles_short_writes() {
215 let (mut proxy, mut server) = fidl::endpoints::create_proxy_and_stream::<fio::FileMarker>();
216 let bytes = [0; 10];
217
218 let write_fut = async move {
219 <fio::FileProxy as Writer>::write(&mut proxy, &bytes, &|_| (), &|| ()).await.unwrap()
220 };
221 let server_fut = async move {
222 match server.try_next().await.unwrap().unwrap() {
223 fio::FileRequest::Write { data, responder } => {
224 assert_eq!(data, [0; 10]);
225 let () = responder.send(Ok(8)).unwrap();
227 }
228 req => panic!("unexpected request {req:?}"),
229 }
230 match server.try_next().await.unwrap().unwrap() {
231 fio::FileRequest::Write { data, responder } => {
232 assert_eq!(data, [0; 2]);
233 let () = responder.send(Ok(2)).unwrap();
234 }
235 req => panic!("unexpected request {req:?}"),
236 }
237 assert!(server.try_next().await.unwrap().is_none());
238 };
239
240 let ((), ()) = futures::future::join(write_fut, server_fut).await;
241 }
242
243 #[fuchsia_async::run_singlethreaded(test)]
244 async fn fxblob_writer() {
245 let blobfs = blobfs_ramdisk::BlobfsRamdisk::builder().fxblob().start().await.unwrap();
246 assert_eq!(blobfs.list_blobs().unwrap(), std::collections::BTreeSet::new());
247 let contents = [0u8; 7];
248 let hash = fuchsia_merkle::from_slice(&contents).root();
249 let compressed = delivery_blob::Type1Blob::generate(
250 &contents[..],
251 delivery_blob::CompressionMode::Attempt,
252 );
253 let writer = blobfs
254 .blob_creator_proxy()
255 .unwrap()
256 .unwrap()
257 .create(&hash.into(), false)
258 .await
259 .unwrap()
260 .unwrap();
261
262 let (mut writer, _closer) =
263 into_blob_writer_and_closer(fpkg::BlobWriter::Writer(writer)).unwrap();
264 let () = writer.truncate(compressed.len().try_into().unwrap()).await.unwrap();
265 let () = writer.write(&compressed, &|_| (), &|| ()).await.unwrap();
266
267 assert_eq!(blobfs.list_blobs().unwrap(), std::collections::BTreeSet::from([hash]));
268
269 let () = blobfs.stop().await.unwrap();
270 }
271}