blob_writer/
lib.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_fxfs::BlobWriterProxy;
6
7use futures::future::BoxFuture;
8use futures::stream::{FuturesOrdered, StreamExt as _, TryStreamExt as _};
9
10mod errors;
11pub use errors::{CreateError, WriteError};
12
13/// BlobWriter is a wrapper around the fuchsia.fxfs.BlobWriter fidl protocol. Clients will use this
14/// library to write blobs to disk.
15#[derive(Debug)]
16pub struct BlobWriter {
17    blob_writer_proxy: BlobWriterProxy,
18    vmo: zx::Vmo,
19    // Ordered queue of BytesReady requests. There are at most 2 outstanding requests on the
20    // queue at any point in time. Each BytesReady request takes up at most half the ring
21    // buffer (N).
22    //
23    // Our goal is to be constantly moving bytes out of the network and into storage without ever
24    // having to wait for a fidl roundtrip. Maintaining 2 outstanding requests on the queue allows
25    // us to pipeline requests, so that the server can respond to one request while the client is
26    // creating another. Limiting the size of any particular request to N/2 allows each of the
27    // two requests on the queue to be as big as they possibly can, which is particularly important
28    // when storage is the limiting factor. Namely, we want to avoid situations where the server
29    // has completed a small request and has to wait on a fidl roundtrip (i.e. has to wait for the
30    // network to receive the response, create a new request, and send the request back).
31    outstanding_writes:
32        FuturesOrdered<BoxFuture<'static, Result<Result<u64, zx::Status>, fidl::Error>>>,
33    // Number of bytes that have been written to the vmo, both acknowledged and unacknowledged.
34    bytes_sent: u64,
35    // Number of available bytes in the vmo (the size of the vmo minus the size of unacknowledged
36    // writes).
37    available: u64,
38    // Size of the blob being written.
39    blob_len: u64,
40    // Size of the vmo.
41    vmo_len: u64,
42}
43
44impl BlobWriter {
45    /// Creates a `BlobWriter`.  Exactly `size` bytes are expected to be written into the writer.
46    pub async fn create(
47        blob_writer_proxy: BlobWriterProxy,
48        size: u64,
49    ) -> Result<Self, CreateError> {
50        let vmo = blob_writer_proxy
51            .get_vmo(size)
52            .await
53            .map_err(CreateError::Fidl)?
54            .map_err(zx::Status::from_raw)
55            .map_err(CreateError::GetVmo)?;
56        let vmo_len = vmo.get_size().map_err(CreateError::GetSize)?;
57        Ok(BlobWriter {
58            blob_writer_proxy,
59            vmo,
60            outstanding_writes: FuturesOrdered::new(),
61            bytes_sent: 0,
62            available: vmo_len,
63            blob_len: size,
64            vmo_len,
65        })
66    }
67
68    /// Begins writing `bytes` to the server.
69    ///
70    /// If `bytes` contains all of the remaining unwritten bytes of the blob, i.e. the sum of the
71    /// lengths of the `bytes` slices from this and all prior calls to `write` is equal to the size
72    /// given to `create`, then the returned Future will not complete until all of the writes have
73    /// been acknowledged by the server and the blob can be opened for read.
74    /// Otherwise, the returned Future may complete before the write of `bytes` has been
75    /// acknowledged by the server.
76    ///
77    /// Returns an error if the length of `bytes` exceeds the remaining available space in the
78    /// blob, calculated as per `size`.
79    pub async fn write(&mut self, mut bytes: &[u8]) -> Result<(), WriteError> {
80        if self.bytes_sent + bytes.len() as u64 > self.blob_len {
81            return Err(WriteError::EndOfBlob);
82        }
83        while !bytes.is_empty() {
84            debug_assert!(self.outstanding_writes.len() <= 2);
85            // Wait until there is room in the vmo and fewer than 2 outstanding writes.
86            if self.available == 0 || self.outstanding_writes.len() == 2 {
87                let bytes_ackd = self
88                    .outstanding_writes
89                    .next()
90                    .await
91                    .ok_or_else(|| WriteError::QueueEnded)?
92                    .map_err(WriteError::Fidl)?
93                    .map_err(WriteError::BytesReady)?;
94                self.available += bytes_ackd;
95            }
96
97            let bytes_to_send_len = {
98                let mut bytes_to_send_len = std::cmp::min(self.available, bytes.len() as u64);
99                // If all the remaining bytes do not fit in the vmo, split writes to prevent
100                // blocking the server on an ack roundtrip.
101                if self.blob_len - self.bytes_sent > self.vmo_len {
102                    bytes_to_send_len = std::cmp::min(bytes_to_send_len, self.vmo_len / 2)
103                }
104                bytes_to_send_len
105            };
106
107            let (bytes_to_send, remaining_bytes) = bytes.split_at(bytes_to_send_len as usize);
108            bytes = remaining_bytes;
109
110            let vmo_index = self.bytes_sent % self.vmo_len;
111            let (bytes_to_send_before_wrap, bytes_to_send_after_wrap) = bytes_to_send
112                .split_at(std::cmp::min((self.vmo_len - vmo_index) as usize, bytes_to_send.len()));
113
114            self.vmo.write(bytes_to_send_before_wrap, vmo_index).map_err(WriteError::VmoWrite)?;
115            if !bytes_to_send_after_wrap.is_empty() {
116                self.vmo.write(bytes_to_send_after_wrap, 0).map_err(WriteError::VmoWrite)?;
117            }
118
119            let write_fut = self.blob_writer_proxy.bytes_ready(bytes_to_send_len);
120            self.outstanding_writes.push_back(Box::pin(async move {
121                write_fut
122                    .await
123                    .map(|res| res.map(|()| bytes_to_send_len).map_err(zx::Status::from_raw))
124            }));
125            self.available -= bytes_to_send_len;
126            self.bytes_sent += bytes_to_send_len;
127        }
128        debug_assert!(self.bytes_sent <= self.blob_len);
129
130        // The last write call should not complete until the blob is completely written.
131        if self.bytes_sent == self.blob_len {
132            while let Some(result) =
133                self.outstanding_writes.try_next().await.map_err(WriteError::Fidl)?
134            {
135                match result {
136                    Ok(bytes_ackd) => self.available += bytes_ackd,
137                    Err(e) => return Err(WriteError::BytesReady(e)),
138                }
139            }
140            // This should not be possible.
141            if self.available != self.vmo_len {
142                return Err(WriteError::EndOfBlob);
143            }
144        }
145        Ok(())
146    }
147
148    pub fn vmo_size(&self) -> u64 {
149        self.vmo_len
150    }
151}
152
#[cfg(test)]
mod tests {
    // Tests drive `BlobWriter` against an in-process mock `fuchsia.fxfs.BlobWriter` server and
    // check the exact sequence of `BytesReady` requests it produces.
    use super::*;
    use assert_matches::assert_matches;
    use fidl::endpoints::create_proxy_and_stream;
    use fidl_fuchsia_fxfs::{BlobWriterMarker, BlobWriterRequest};
    use fuchsia_sync::Mutex;
    use futures::{FutureExt, pin_mut, select};
    use std::sync::Arc;
    use zx::HandleBased;

    // Size of the VMO the mock server hands out in response to `GetVmo`.
    const VMO_SIZE: usize = 4096;

    /// Runs `write_fun` against a mock server and verifies the `BytesReady` requests it sends.
    ///
    /// The mock server behaves as follows:
    /// * It answers `GetVmo` with a fresh `VMO_SIZE`-byte VMO. It keeps the original handle,
    ///   sends the client a duplicate, and later reads back what the client wrote.
    /// * For the i-th `BytesReady` request, it asserts that `bytes_written` equals
    ///   `writes[i].1 - writes[i].0`.
    /// * It also asserts that the VMO contents starting at `writes[i].0 % VMO_SIZE` (wrapping to
    ///   offset 0) equal `data[writes[i].0..writes[i].1]`.
    ///
    /// Once `write_fun` completes, asserts that exactly `writes.len()` requests were checked.
    async fn check_blob_writer(
        write_fun: impl AsyncFnOnce(BlobWriterProxy, &[u8]),
        data: &[u8],
        writes: &[(usize, usize)],
    ) {
        let (proxy, mut stream) = create_proxy_and_stream::<BlobWriterMarker>();
        // Number of `BytesReady` requests the mock server has checked so far.
        let count = Arc::new(Mutex::new(0));
        let count_clone = count.clone();
        let expected_count = writes.len();
        // The server-side handle to the VMO, set when `GetVmo` is handled.
        let mut check_vmo = None;
        let mock_server = async move {
            while let Some(request) = stream.next().await {
                match request {
                    Ok(BlobWriterRequest::GetVmo { responder, .. }) => {
                        let vmo = zx::Vmo::create(VMO_SIZE as u64).expect("failed to create vmo");
                        let vmo_dup = vmo
                            .duplicate_handle(zx::Rights::SAME_RIGHTS)
                            .expect("failed to duplicate VMO");
                        check_vmo = Some(vmo);
                        responder.send(Ok(vmo_dup)).unwrap();
                    }
                    Ok(BlobWriterRequest::BytesReady { responder, bytes_written, .. }) => {
                        let vmo = check_vmo.as_ref().unwrap();
                        let mut count_locked = count.lock();
                        let mut buf = vec![0; bytes_written as usize];
                        let data_range = writes[*count_locked];
                        // Read back `bytes_written` bytes from this write's ring-buffer offset,
                        // wrapping to offset 0 if they run past the end of the VMO.
                        let vmo_offset = data_range.0 % VMO_SIZE;
                        if vmo_offset + bytes_written as usize > VMO_SIZE {
                            let split = VMO_SIZE - vmo_offset;
                            vmo.read(&mut buf[0..split], vmo_offset as u64).unwrap();
                            vmo.read(&mut buf[split..], 0).unwrap();
                        } else {
                            vmo.read(&mut buf, vmo_offset as u64).unwrap();
                        }
                        assert_eq!(bytes_written, (data_range.1 - data_range.0) as u64);
                        assert_eq!(&data[data_range.0..data_range.1], buf);
                        *count_locked += 1;
                        responder.send(Ok(())).unwrap();
                    }
                    _ => {
                        unreachable!()
                    }
                }
            }
        }
        .fuse();

        pin_mut!(mock_server);

        // The mock server loop only ends when its request stream ends, so `write_fun` is
        // expected to finish first.
        select! {
            _ = mock_server => unreachable!(),
            _ = write_fun(proxy, data).fuse() => {
                assert_eq!(*count_clone.lock(), expected_count);
            }
        }
    }

    /// Writing more bytes after the whole blob has been written fails with `EndOfBlob`.
    #[fuchsia::test]
    async fn invalid_write_past_end_of_blob() {
        let mut data = [0; VMO_SIZE];
        rand::fill(&mut data[..]);

        let write_fun = async move |proxy: BlobWriterProxy, data: &[u8]| {
            let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
                .await
                .expect("failed to create BlobWriter");
            let () = blob_writer.write(&data).await.unwrap();
            let invalid_write = [0; 4096];
            assert_matches!(blob_writer.write(&invalid_write).await, Err(WriteError::EndOfBlob));
        };

        check_blob_writer(write_fun, &data, &[(0, VMO_SIZE)]).await;
    }

    /// A blob smaller than the VMO is sent in a single `BytesReady` request.
    #[fuchsia::test]
    async fn do_not_split_writes_if_blob_fits_in_vmo() {
        let mut data = vec![0; VMO_SIZE - 1];
        rand::fill(&mut data[..]);

        let write_fun = async move |proxy: BlobWriterProxy, data: &[u8]| {
            let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
                .await
                .expect("failed to create BlobWriter");
            let () = blob_writer.write(&data[..]).await.unwrap();
        };

        check_blob_writer(write_fun, &data, &[(0, 4095)]).await;
    }

    /// A blob one byte larger than the VMO, written in one call, is split across several
    /// `BytesReady` requests.
    #[fuchsia::test]
    async fn split_writes_if_blob_does_not_fit_in_vmo() {
        let mut data = vec![0; VMO_SIZE + 1];
        rand::fill(&mut data[..]);

        let write_fun = async move |proxy: BlobWriterProxy, data: &[u8]| {
            let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
                .await
                .expect("failed to create BlobWriter");
            let () = blob_writer.write(&data[..]).await.unwrap();
        };

        check_blob_writer(write_fun, &data, &[(0, 2048), (2048, 4096), (4096, 4097)]).await;
    }

    /// Each caller write here produces exactly one request. The third one (bytes 3072..5120)
    /// runs past the end of the VMO and must wrap to offset 0.
    #[fuchsia::test]
    async fn third_write_wraps() {
        let mut data = vec![0; 1024 * 6];
        rand::fill(&mut data[..]);

        let writes =
            [(0, 1024 * 2), (1024 * 2, 1024 * 3), (1024 * 3, 1024 * 5), (1024 * 5, 1024 * 6)];

        let write_fun = async move |proxy: BlobWriterProxy, data: &[u8]| {
            let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
                .await
                .expect("failed to create BlobWriter");
            for (i, j) in writes {
                let () = blob_writer.write(&data[i..j]).await.unwrap();
            }
        };

        check_blob_writer(write_fun, &data, &writes).await;
    }

    /// Writes one byte, then the rest of a blob three times the VMO size. This wraps around the
    /// ring buffer several times, using chunks of at most `VMO_SIZE / 2` bytes.
    #[fuchsia::test]
    async fn many_wraps() {
        let mut data = vec![0; VMO_SIZE * 3];
        rand::fill(&mut data[..]);

        let write_fun = async move |proxy: BlobWriterProxy, data: &[u8]| {
            let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
                .await
                .expect("failed to create BlobWriter");
            let () = blob_writer.write(&data[0..1]).await.unwrap();
            let () = blob_writer.write(&data[1..]).await.unwrap();
        };

        check_blob_writer(
            write_fun,
            &data,
            &[
                (0, 1),
                (1, 2049),
                (2049, 4097),
                (4097, 6145),
                (6145, 8193),
                (8193, 10241),
                (10241, 12288),
            ],
        )
        .await;
    }
}