1use fidl_fuchsia_fxfs::BlobWriterProxy;
6
7use futures::future::BoxFuture;
8use futures::stream::{FuturesOrdered, StreamExt as _, TryStreamExt as _};
9
10mod errors;
11pub use errors::{CreateError, WriteError};
12
13#[derive(Debug)]
16pub struct BlobWriter {
17 blob_writer_proxy: BlobWriterProxy,
18 vmo: zx::Vmo,
19 outstanding_writes:
32 FuturesOrdered<BoxFuture<'static, Result<Result<u64, zx::Status>, fidl::Error>>>,
33 bytes_sent: u64,
35 available: u64,
38 blob_len: u64,
40 vmo_len: u64,
42}
43
44impl BlobWriter {
45 pub async fn create(
47 blob_writer_proxy: BlobWriterProxy,
48 size: u64,
49 ) -> Result<Self, CreateError> {
50 let vmo = blob_writer_proxy
51 .get_vmo(size)
52 .await
53 .map_err(CreateError::Fidl)?
54 .map_err(zx::Status::from_raw)
55 .map_err(CreateError::GetVmo)?;
56 let vmo_len = vmo.get_size().map_err(CreateError::GetSize)?;
57 Ok(BlobWriter {
58 blob_writer_proxy,
59 vmo,
60 outstanding_writes: FuturesOrdered::new(),
61 bytes_sent: 0,
62 available: vmo_len,
63 blob_len: size,
64 vmo_len,
65 })
66 }
67
68 pub async fn write(&mut self, mut bytes: &[u8]) -> Result<(), WriteError> {
80 if self.bytes_sent + bytes.len() as u64 > self.blob_len {
81 return Err(WriteError::EndOfBlob);
82 }
83 while !bytes.is_empty() {
84 debug_assert!(self.outstanding_writes.len() <= 2);
85 if self.available == 0 || self.outstanding_writes.len() == 2 {
87 let bytes_ackd = self
88 .outstanding_writes
89 .next()
90 .await
91 .ok_or_else(|| WriteError::QueueEnded)?
92 .map_err(WriteError::Fidl)?
93 .map_err(WriteError::BytesReady)?;
94 self.available += bytes_ackd;
95 }
96
97 let bytes_to_send_len = {
98 let mut bytes_to_send_len = std::cmp::min(self.available, bytes.len() as u64);
99 if self.blob_len - self.bytes_sent > self.vmo_len {
102 bytes_to_send_len = std::cmp::min(bytes_to_send_len, self.vmo_len / 2)
103 }
104 bytes_to_send_len
105 };
106
107 let (bytes_to_send, remaining_bytes) = bytes.split_at(bytes_to_send_len as usize);
108 bytes = remaining_bytes;
109
110 let vmo_index = self.bytes_sent % self.vmo_len;
111 let (bytes_to_send_before_wrap, bytes_to_send_after_wrap) = bytes_to_send
112 .split_at(std::cmp::min((self.vmo_len - vmo_index) as usize, bytes_to_send.len()));
113
114 self.vmo.write(bytes_to_send_before_wrap, vmo_index).map_err(WriteError::VmoWrite)?;
115 if !bytes_to_send_after_wrap.is_empty() {
116 self.vmo.write(bytes_to_send_after_wrap, 0).map_err(WriteError::VmoWrite)?;
117 }
118
119 let write_fut = self.blob_writer_proxy.bytes_ready(bytes_to_send_len);
120 self.outstanding_writes.push_back(Box::pin(async move {
121 write_fut
122 .await
123 .map(|res| res.map(|()| bytes_to_send_len).map_err(zx::Status::from_raw))
124 }));
125 self.available -= bytes_to_send_len;
126 self.bytes_sent += bytes_to_send_len;
127 }
128 debug_assert!(self.bytes_sent <= self.blob_len);
129
130 if self.bytes_sent == self.blob_len {
132 while let Some(result) =
133 self.outstanding_writes.try_next().await.map_err(WriteError::Fidl)?
134 {
135 match result {
136 Ok(bytes_ackd) => self.available += bytes_ackd,
137 Err(e) => return Err(WriteError::BytesReady(e)),
138 }
139 }
140 if self.available != self.vmo_len {
142 return Err(WriteError::EndOfBlob);
143 }
144 }
145 Ok(())
146 }
147
148 pub fn vmo_size(&self) -> u64 {
149 self.vmo_len
150 }
151}
152
153#[cfg(test)]
154mod tests {
155 use super::*;
156 use assert_matches::assert_matches;
157 use fidl::endpoints::create_proxy_and_stream;
158 use fidl_fuchsia_fxfs::{BlobWriterMarker, BlobWriterRequest};
159 use fuchsia_sync::Mutex;
160 use futures::{FutureExt, pin_mut, select};
161 use std::sync::Arc;
162 use zx::HandleBased;
163
164 const VMO_SIZE: usize = 4096;
165
166 async fn check_blob_writer(
167 write_fun: impl AsyncFnOnce(BlobWriterProxy, &[u8]),
168 data: &[u8],
169 writes: &[(usize, usize)],
170 ) {
171 let (proxy, mut stream) = create_proxy_and_stream::<BlobWriterMarker>();
172 let count = Arc::new(Mutex::new(0));
173 let count_clone = count.clone();
174 let expected_count = writes.len();
175 let mut check_vmo = None;
176 let mock_server = async move {
177 while let Some(request) = stream.next().await {
178 match request {
179 Ok(BlobWriterRequest::GetVmo { responder, .. }) => {
180 let vmo = zx::Vmo::create(VMO_SIZE as u64).expect("failed to create vmo");
181 let vmo_dup = vmo
182 .duplicate_handle(zx::Rights::SAME_RIGHTS)
183 .expect("failed to duplicate VMO");
184 check_vmo = Some(vmo);
185 responder.send(Ok(vmo_dup)).unwrap();
186 }
187 Ok(BlobWriterRequest::BytesReady { responder, bytes_written, .. }) => {
188 let vmo = check_vmo.as_ref().unwrap();
189 let mut count_locked = count.lock();
190 let mut buf = vec![0; bytes_written as usize];
191 let data_range = writes[*count_locked];
192 let vmo_offset = data_range.0 % VMO_SIZE;
193 if vmo_offset + bytes_written as usize > VMO_SIZE {
194 let split = VMO_SIZE - vmo_offset;
195 vmo.read(&mut buf[0..split], vmo_offset as u64).unwrap();
196 vmo.read(&mut buf[split..], 0).unwrap();
197 } else {
198 vmo.read(&mut buf, vmo_offset as u64).unwrap();
199 }
200 assert_eq!(bytes_written, (data_range.1 - data_range.0) as u64);
201 assert_eq!(&data[data_range.0..data_range.1], buf);
202 *count_locked += 1;
203 responder.send(Ok(())).unwrap();
204 }
205 _ => {
206 unreachable!()
207 }
208 }
209 }
210 }
211 .fuse();
212
213 pin_mut!(mock_server);
214
215 select! {
216 _ = mock_server => unreachable!(),
217 _ = write_fun(proxy, data).fuse() => {
218 assert_eq!(*count_clone.lock(), expected_count);
219 }
220 }
221 }
222
223 #[fuchsia::test]
224 async fn invalid_write_past_end_of_blob() {
225 let mut data = [0; VMO_SIZE];
226 rand::fill(&mut data[..]);
227
228 let write_fun = async move |proxy: BlobWriterProxy, data: &[u8]| {
229 let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
230 .await
231 .expect("failed to create BlobWriter");
232 let () = blob_writer.write(&data).await.unwrap();
233 let invalid_write = [0; 4096];
234 assert_matches!(blob_writer.write(&invalid_write).await, Err(WriteError::EndOfBlob));
235 };
236
237 check_blob_writer(write_fun, &data, &[(0, VMO_SIZE)]).await;
238 }
239
240 #[fuchsia::test]
241 async fn do_not_split_writes_if_blob_fits_in_vmo() {
242 let mut data = vec![0; VMO_SIZE - 1];
243 rand::fill(&mut data[..]);
244
245 let write_fun = async move |proxy: BlobWriterProxy, data: &[u8]| {
246 let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
247 .await
248 .expect("failed to create BlobWriter");
249 let () = blob_writer.write(&data[..]).await.unwrap();
250 };
251
252 check_blob_writer(write_fun, &data, &[(0, 4095)]).await;
253 }
254
255 #[fuchsia::test]
256 async fn split_writes_if_blob_does_not_fit_in_vmo() {
257 let mut data = vec![0; VMO_SIZE + 1];
258 rand::fill(&mut data[..]);
259
260 let write_fun = async move |proxy: BlobWriterProxy, data: &[u8]| {
261 let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
262 .await
263 .expect("failed to create BlobWriter");
264 let () = blob_writer.write(&data[..]).await.unwrap();
265 };
266
267 check_blob_writer(write_fun, &data, &[(0, 2048), (2048, 4096), (4096, 4097)]).await;
268 }
269
270 #[fuchsia::test]
271 async fn third_write_wraps() {
272 let mut data = vec![0; 1024 * 6];
273 rand::fill(&mut data[..]);
274
275 let writes =
276 [(0, 1024 * 2), (1024 * 2, 1024 * 3), (1024 * 3, 1024 * 5), (1024 * 5, 1024 * 6)];
277
278 let write_fun = async move |proxy: BlobWriterProxy, data: &[u8]| {
279 let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
280 .await
281 .expect("failed to create BlobWriter");
282 for (i, j) in writes {
283 let () = blob_writer.write(&data[i..j]).await.unwrap();
284 }
285 };
286
287 check_blob_writer(write_fun, &data, &writes).await;
288 }
289
290 #[fuchsia::test]
291 async fn many_wraps() {
292 let mut data = vec![0; VMO_SIZE * 3];
293 rand::fill(&mut data[..]);
294
295 let write_fun = async move |proxy: BlobWriterProxy, data: &[u8]| {
296 let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
297 .await
298 .expect("failed to create BlobWriter");
299 let () = blob_writer.write(&data[0..1]).await.unwrap();
300 let () = blob_writer.write(&data[1..]).await.unwrap();
301 };
302
303 check_blob_writer(
304 write_fun,
305 &data,
306 &[
307 (0, 1),
308 (1, 2049),
309 (2049, 4097),
310 (4097, 6145),
311 (6145, 8193),
312 (8193, 10241),
313 (10241, 12288),
314 ],
315 )
316 .await;
317 }
318}