use fidl_fuchsia_fxfs::BlobWriterProxy;
use futures::future::{BoxFuture, FutureExt as _};
use futures::stream::{FuturesOrdered, StreamExt as _, TryStreamExt as _};
mod errors;
pub use errors::{CreateError, WriteError};
#[derive(Debug)]
pub struct BlobWriter {
blob_writer_proxy: BlobWriterProxy,
vmo: zx::Vmo,
outstanding_writes:
FuturesOrdered<BoxFuture<'static, Result<Result<u64, zx::Status>, fidl::Error>>>,
bytes_sent: u64,
available: u64,
blob_len: u64,
vmo_len: u64,
}
impl BlobWriter {
pub async fn create(
blob_writer_proxy: BlobWriterProxy,
size: u64,
) -> Result<Self, CreateError> {
let vmo = blob_writer_proxy
.get_vmo(size)
.await
.map_err(CreateError::Fidl)?
.map_err(zx::Status::from_raw)
.map_err(CreateError::GetVmo)?;
let vmo_len = vmo.get_size().map_err(CreateError::GetSize)?;
Ok(BlobWriter {
blob_writer_proxy,
vmo,
outstanding_writes: FuturesOrdered::new(),
bytes_sent: 0,
available: vmo_len,
blob_len: size,
vmo_len,
})
}
pub async fn write(&mut self, mut bytes: &[u8]) -> Result<(), WriteError> {
if self.bytes_sent + bytes.len() as u64 > self.blob_len {
return Err(WriteError::EndOfBlob);
}
while !bytes.is_empty() {
debug_assert!(self.outstanding_writes.len() <= 2);
if self.available == 0 || self.outstanding_writes.len() == 2 {
let bytes_ackd = self
.outstanding_writes
.next()
.await
.ok_or_else(|| WriteError::QueueEnded)?
.map_err(WriteError::Fidl)?
.map_err(WriteError::BytesReady)?;
self.available += bytes_ackd;
}
let bytes_to_send_len = {
let mut bytes_to_send_len = std::cmp::min(self.available, bytes.len() as u64);
if self.blob_len - self.bytes_sent > self.vmo_len {
bytes_to_send_len = std::cmp::min(bytes_to_send_len, self.vmo_len / 2)
}
bytes_to_send_len
};
let (bytes_to_send, remaining_bytes) = bytes.split_at(bytes_to_send_len as usize);
bytes = remaining_bytes;
let vmo_index = self.bytes_sent % self.vmo_len;
let (bytes_to_send_before_wrap, bytes_to_send_after_wrap) = bytes_to_send
.split_at(std::cmp::min((self.vmo_len - vmo_index) as usize, bytes_to_send.len()));
self.vmo.write(bytes_to_send_before_wrap, vmo_index).map_err(WriteError::VmoWrite)?;
if !bytes_to_send_after_wrap.is_empty() {
self.vmo.write(bytes_to_send_after_wrap, 0).map_err(WriteError::VmoWrite)?;
}
let write_fut = self.blob_writer_proxy.bytes_ready(bytes_to_send_len);
self.outstanding_writes.push_back(
async move {
write_fut
.await
.map(|res| res.map(|()| bytes_to_send_len).map_err(zx::Status::from_raw))
}
.boxed(),
);
self.available -= bytes_to_send_len;
self.bytes_sent += bytes_to_send_len;
}
debug_assert!(self.bytes_sent <= self.blob_len);
if self.bytes_sent == self.blob_len {
while let Some(result) =
self.outstanding_writes.try_next().await.map_err(WriteError::Fidl)?
{
match result {
Ok(bytes_ackd) => self.available += bytes_ackd,
Err(e) => return Err(WriteError::BytesReady(e)),
}
}
if self.available != self.vmo_len {
return Err(WriteError::EndOfBlob);
}
}
Ok(())
}
pub fn vmo_size(&self) -> u64 {
self.vmo_len
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use fidl::endpoints::create_proxy_and_stream;
use fidl_fuchsia_fxfs::{BlobWriterMarker, BlobWriterRequest};
use futures::{pin_mut, select};
use rand::{thread_rng, Rng as _};
use std::sync::{Arc, Mutex};
use zx::HandleBased;
const VMO_SIZE: usize = 4096;
async fn check_blob_writer(
write_fun: impl FnOnce(BlobWriterProxy) -> BoxFuture<'static, ()>,
data: &[u8],
writes: &[(usize, usize)],
) {
let (proxy, mut stream) = create_proxy_and_stream::<BlobWriterMarker>().unwrap();
let count = Arc::new(Mutex::new(0));
let count_clone = count.clone();
let expected_count = writes.len();
let mut check_vmo = None;
let mock_server = async move {
while let Some(request) = stream.next().await {
match request {
Ok(BlobWriterRequest::GetVmo { responder, .. }) => {
let vmo = zx::Vmo::create(VMO_SIZE as u64).expect("failed to create vmo");
let vmo_dup = vmo
.duplicate_handle(zx::Rights::SAME_RIGHTS)
.expect("failed to duplicate VMO");
check_vmo = Some(vmo);
responder.send(Ok(vmo_dup)).unwrap();
}
Ok(BlobWriterRequest::BytesReady { responder, bytes_written, .. }) => {
let vmo = check_vmo.as_ref().unwrap();
let mut count_locked = count.lock().unwrap();
let mut buf = vec![0; bytes_written as usize];
let data_range = writes[*count_locked];
let vmo_offset = data_range.0 % VMO_SIZE;
if vmo_offset + bytes_written as usize > VMO_SIZE {
let split = VMO_SIZE - vmo_offset;
vmo.read(&mut buf[0..split], vmo_offset as u64).unwrap();
vmo.read(&mut buf[split..], 0).unwrap();
} else {
vmo.read(&mut buf, vmo_offset as u64).unwrap();
}
assert_eq!(bytes_written, (data_range.1 - data_range.0) as u64);
assert_eq!(&data[data_range.0..data_range.1], buf);
*count_locked += 1;
responder.send(Ok(())).unwrap();
}
_ => {
unreachable!()
}
}
}
}
.fuse();
pin_mut!(mock_server);
select! {
_ = mock_server => unreachable!(),
_ = write_fun(proxy).fuse() => {
assert_eq!(*count_clone.lock().unwrap(), expected_count);
}
}
}
#[fuchsia::test]
async fn invalid_write_past_end_of_blob() {
let mut data = [0; VMO_SIZE];
thread_rng().fill(&mut data[..]);
let write_fun = |proxy: BlobWriterProxy| {
async move {
let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
.await
.expect("failed to create BlobWriter");
let () = blob_writer.write(&data).await.unwrap();
let invalid_write = [0; 4096];
assert_matches!(
blob_writer.write(&invalid_write).await,
Err(WriteError::EndOfBlob)
);
}
.boxed()
};
check_blob_writer(write_fun, &data, &[(0, VMO_SIZE)]).await;
}
#[fuchsia::test]
async fn do_not_split_writes_if_blob_fits_in_vmo() {
let mut data = [0; VMO_SIZE - 1];
thread_rng().fill(&mut data[..]);
let write_fun = |proxy: BlobWriterProxy| {
async move {
let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
.await
.expect("failed to create BlobWriter");
let () = blob_writer.write(&data[..]).await.unwrap();
}
.boxed()
};
check_blob_writer(write_fun, &data, &[(0, 4095)]).await;
}
#[fuchsia::test]
async fn split_writes_if_blob_does_not_fit_in_vmo() {
let mut data = [0; VMO_SIZE + 1];
thread_rng().fill(&mut data[..]);
let write_fun = |proxy: BlobWriterProxy| {
async move {
let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
.await
.expect("failed to create BlobWriter");
let () = blob_writer.write(&data[..]).await.unwrap();
}
.boxed()
};
check_blob_writer(write_fun, &data, &[(0, 2048), (2048, 4096), (4096, 4097)]).await;
}
#[fuchsia::test]
async fn third_write_wraps() {
let mut data = [0; 1024 * 6];
thread_rng().fill(&mut data[..]);
let writes =
[(0, 1024 * 2), (1024 * 2, 1024 * 3), (1024 * 3, 1024 * 5), (1024 * 5, 1024 * 6)];
let write_fun = |proxy: BlobWriterProxy| {
async move {
let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
.await
.expect("failed to create BlobWriter");
for (i, j) in writes {
let () = blob_writer.write(&data[i..j]).await.unwrap();
}
}
.boxed()
};
check_blob_writer(write_fun, &data, &writes[..]).await;
}
#[fuchsia::test]
async fn many_wraps() {
let mut data = [0; VMO_SIZE * 3];
thread_rng().fill(&mut data[..]);
let write_fun = |proxy: BlobWriterProxy| {
async move {
let mut blob_writer = BlobWriter::create(proxy, data.len() as u64)
.await
.expect("failed to create BlobWriter");
let () = blob_writer.write(&data[0..1]).await.unwrap();
let () = blob_writer.write(&data[1..]).await.unwrap();
}
.boxed()
};
check_blob_writer(
write_fun,
&data,
&[
(0, 1),
(1, 2049),
(2049, 4097),
(4097, 6145),
(6145, 8193),
(8193, 10241),
(10241, 12288),
],
)
.await;
}
}