1use fidl_fuchsia_fxfs::BlobWriterProxy;
6
7use futures::future::BoxFuture;
8use futures::stream::{FuturesOrdered, StreamExt as _, TryStreamExt as _};
9
10mod errors;
11pub use errors::{CreateError, WriteError};
12
13#[derive(Debug)]
16pub struct BlobWriter {
17 blob_writer_proxy: BlobWriterProxy,
18 vmo: zx::Vmo,
19 outstanding_writes:
32 FuturesOrdered<BoxFuture<'static, Result<Result<u64, zx::Status>, fidl::Error>>>,
33 bytes_sent: u64,
35 available: u64,
38 blob_len: u64,
40 vmo_len: u64,
42}
43
44impl BlobWriter {
45 pub async fn create(
47 blob_writer_proxy: BlobWriterProxy,
48 size: u64,
49 ) -> Result<Self, CreateError> {
50 let vmo = blob_writer_proxy
51 .get_vmo(size)
52 .await
53 .map_err(CreateError::Fidl)?
54 .map_err(zx::Status::from_raw)
55 .map_err(CreateError::GetVmo)?;
56 let vmo_len = vmo.get_size().map_err(CreateError::GetSize)?;
57 Ok(BlobWriter {
58 blob_writer_proxy,
59 vmo,
60 outstanding_writes: FuturesOrdered::new(),
61 bytes_sent: 0,
62 available: vmo_len,
63 blob_len: size,
64 vmo_len,
65 })
66 }
67
68 pub async fn write(&mut self, mut bytes: &[u8]) -> Result<(), WriteError> {
80 if self.bytes_sent + bytes.len() as u64 > self.blob_len {
81 return Err(WriteError::EndOfBlob);
82 }
83 while !bytes.is_empty() {
84 debug_assert!(self.outstanding_writes.len() <= 2);
85 if self.available == 0 || self.outstanding_writes.len() == 2 {
87 let bytes_ackd = self
88 .outstanding_writes
89 .next()
90 .await
91 .ok_or_else(|| WriteError::QueueEnded)?
92 .map_err(WriteError::Fidl)?
93 .map_err(WriteError::BytesReady)?;
94 self.available += bytes_ackd;
95 }
96
97 let bytes_to_send_len = {
98 let mut bytes_to_send_len = std::cmp::min(self.available, bytes.len() as u64);
99 if self.blob_len - self.bytes_sent > self.vmo_len {
102 bytes_to_send_len = std::cmp::min(bytes_to_send_len, self.vmo_len / 2)
103 }
104 bytes_to_send_len
105 };
106
107 let (bytes_to_send, remaining_bytes) = bytes.split_at(bytes_to_send_len as usize);
108 bytes = remaining_bytes;
109
110 let vmo_index = self.bytes_sent % self.vmo_len;
111 let (bytes_to_send_before_wrap, bytes_to_send_after_wrap) = bytes_to_send
112 .split_at(std::cmp::min((self.vmo_len - vmo_index) as usize, bytes_to_send.len()));
113
114 self.vmo.write(bytes_to_send_before_wrap, vmo_index).map_err(WriteError::VmoWrite)?;
115 if !bytes_to_send_after_wrap.is_empty() {
116 self.vmo.write(bytes_to_send_after_wrap, 0).map_err(WriteError::VmoWrite)?;
117 }
118
119 let write_fut = self.blob_writer_proxy.bytes_ready(bytes_to_send_len);
120 self.outstanding_writes.push_back(Box::pin(async move {
121 write_fut
122 .await
123 .map(|res| res.map(|()| bytes_to_send_len).map_err(zx::Status::from_raw))
124 }));
125 self.available -= bytes_to_send_len;
126 self.bytes_sent += bytes_to_send_len;
127 }
128 debug_assert!(self.bytes_sent <= self.blob_len);
129
130 if self.bytes_sent == self.blob_len {
132 while let Some(result) =
133 self.outstanding_writes.try_next().await.map_err(WriteError::Fidl)?
134 {
135 match result {
136 Ok(bytes_ackd) => self.available += bytes_ackd,
137 Err(e) => return Err(WriteError::BytesReady(e)),
138 }
139 }
140 if self.available != self.vmo_len {
142 return Err(WriteError::EndOfBlob);
143 }
144 }
145 Ok(())
146 }
147
148 pub fn vmo_size(&self) -> u64 {
149 self.vmo_len
150 }
151}
152
153#[cfg(test)]
154mod tests {
155 use super::*;
156 use assert_matches::assert_matches;
157 use fidl::endpoints::create_proxy_and_stream;
158 use fidl_fuchsia_fxfs::{BlobWriterMarker, BlobWriterRequest};
159 use fuchsia_sync::Mutex;
160 use futures::{FutureExt, pin_mut, select};
161 use std::sync::Arc;
162
163 const VMO_SIZE: usize = 4096;
164
165 async fn check_blob_writer(
166 write_fun: impl AsyncFnOnce(BlobWriterProxy, &[u8]),
167 data: &[u8],
168 writes: &[(usize, usize)],
169 ) {
170 let (proxy, mut stream) = create_proxy_and_stream::<BlobWriterMarker>();
171 let count = Arc::new(Mutex::new(0));
172 let count_clone = count.clone();
173 let expected_count = writes.len();
174 let mut check_vmo = None;
175 let mock_server = async move {
176 while let Some(request) = stream.next().await {
177 match request {
178 Ok(BlobWriterRequest::GetVmo { responder, .. }) => {
179 let vmo = zx::Vmo::create(VMO_SIZE as u64).expect("failed to create vmo");
180 let vmo_dup = vmo
181 .duplicate_handle(zx::Rights::SAME_RIGHTS)
182 .expect("failed to duplicate VMO");
183 check_vmo = Some(vmo);
184 responder.send(Ok(vmo_dup)).unwrap();
185 }
186 Ok(BlobWriterRequest::BytesReady { responder, bytes_written, .. }) => {
187 let vmo = check_vmo.as_ref().unwrap();
188 let mut count_locked = count.lock();
189 let mut buf = vec![0; bytes_written as usize];
190 let data_range = writes[*count_locked];
191 let vmo_offset = data_range.0 % VMO_SIZE;
192 if vmo_offset + bytes_written as usize > VMO_SIZE {
193 let split = VMO_SIZE - vmo_offset;
194 vmo.read(&mut buf[0..split], vmo_offset as u64).unwrap();
195 vmo.read(&mut buf[split..], 0).unwrap();
196 } else {
197 vmo.read(&mut buf, vmo_offset as u64).unwrap();
198 }
199 assert_eq!(bytes_written, (data_range.1 - data_range.0) as u64);
200 assert_eq!(&data[data_range.0..data_range.1], buf);
201 *count_locked += 1;
202 responder.send(Ok(())).unwrap();
203 }
204 _ => {
205 unreachable!()
206 }
207 }
208 }
209 }
210 .fuse();
211
212 pin_mut!(mock_server);
213
214 select! {
215 _ = mock_server => unreachable!(),
216 _ = write_fun(proxy, data).fuse() => {
217 assert_eq!(*count_clone.lock(), expected_count);
218 }
219 }
220 }
221
222 #[fuchsia::test]
223 async fn invalid_write_past_end_of_blob() {
224 let mut data = [0; VMO_SIZE];
225 rand::fill(&mut data[..]);
226
227 let write_fun = async move |proxy: BlobWriterProxy, data: &[u8]| {
228 let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
229 .await
230 .expect("failed to create BlobWriter");
231 let () = blob_writer.write(&data).await.unwrap();
232 let invalid_write = [0; 4096];
233 assert_matches!(blob_writer.write(&invalid_write).await, Err(WriteError::EndOfBlob));
234 };
235
236 check_blob_writer(write_fun, &data, &[(0, VMO_SIZE)]).await;
237 }
238
239 #[fuchsia::test]
240 async fn do_not_split_writes_if_blob_fits_in_vmo() {
241 let mut data = vec![0; VMO_SIZE - 1];
242 rand::fill(&mut data[..]);
243
244 let write_fun = async move |proxy: BlobWriterProxy, data: &[u8]| {
245 let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
246 .await
247 .expect("failed to create BlobWriter");
248 let () = blob_writer.write(&data[..]).await.unwrap();
249 };
250
251 check_blob_writer(write_fun, &data, &[(0, 4095)]).await;
252 }
253
254 #[fuchsia::test]
255 async fn split_writes_if_blob_does_not_fit_in_vmo() {
256 let mut data = vec![0; VMO_SIZE + 1];
257 rand::fill(&mut data[..]);
258
259 let write_fun = async move |proxy: BlobWriterProxy, data: &[u8]| {
260 let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
261 .await
262 .expect("failed to create BlobWriter");
263 let () = blob_writer.write(&data[..]).await.unwrap();
264 };
265
266 check_blob_writer(write_fun, &data, &[(0, 2048), (2048, 4096), (4096, 4097)]).await;
267 }
268
269 #[fuchsia::test]
270 async fn third_write_wraps() {
271 let mut data = vec![0; 1024 * 6];
272 rand::fill(&mut data[..]);
273
274 let writes =
275 [(0, 1024 * 2), (1024 * 2, 1024 * 3), (1024 * 3, 1024 * 5), (1024 * 5, 1024 * 6)];
276
277 let write_fun = async move |proxy: BlobWriterProxy, data: &[u8]| {
278 let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
279 .await
280 .expect("failed to create BlobWriter");
281 for (i, j) in writes {
282 let () = blob_writer.write(&data[i..j]).await.unwrap();
283 }
284 };
285
286 check_blob_writer(write_fun, &data, &writes).await;
287 }
288
289 #[fuchsia::test]
290 async fn many_wraps() {
291 let mut data = vec![0; VMO_SIZE * 3];
292 rand::fill(&mut data[..]);
293
294 let write_fun = async move |proxy: BlobWriterProxy, data: &[u8]| {
295 let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
296 .await
297 .expect("failed to create BlobWriter");
298 let () = blob_writer.write(&data[0..1]).await.unwrap();
299 let () = blob_writer.write(&data[1..]).await.unwrap();
300 };
301
302 check_blob_writer(
303 write_fun,
304 &data,
305 &[
306 (0, 1),
307 (1, 2049),
308 (2049, 4097),
309 (4097, 6145),
310 (6145, 8193),
311 (8193, 10241),
312 (10241, 12288),
313 ],
314 )
315 .await;
316 }
317}