use crate::Measurable;
use anyhow::{Context as _, Result};
use fidl_fuchsia_pkg::{
BlobIdIteratorNextResponder, BlobIdIteratorRequest, BlobIdIteratorRequestStream,
BlobInfoIteratorNextResponder, BlobInfoIteratorRequest, BlobInfoIteratorRequestStream,
PackageIndexEntry, PackageIndexIteratorNextResponder, PackageIndexIteratorRequest,
PackageIndexIteratorRequestStream,
};
use futures::prelude::*;
use zx_types::ZX_CHANNEL_MAX_MSG_BYTES;
/// Serves a FIDL iterator protocol from an in-memory collection of items.
///
/// Every request received on `fidl_iterator` is answered with the next run of `items` that fits
/// in a single FIDL response. Serving finishes successfully when the request stream ends or
/// right after an empty chunk has been sent. An empty chunk is produced once `items` is
/// exhausted, and also when the next pending item does not fit in a response by itself.
///
/// # Errors
///
/// Fails if reading the next request from the stream fails or if sending a response fails.
pub async fn serve_fidl_iterator_from_slice<I>(
    mut fidl_iterator: I,
    mut items: impl AsMut<[<I::Responder as FidlIteratorNextResponder>::Item]>,
) -> Result<()>
where
    I: FidlIteratorRequestStream,
{
    let mut remaining = SliceChunker::new(items.as_mut());
    loop {
        // The chunk is carved off before waiting, so it is ready as soon as a request arrives.
        let chunk = remaining.next();
        let maybe_request =
            fidl_iterator.try_next().await.context("while waiting for next() request")?;
        let responder = if let Some(request) = maybe_request {
            I::request_to_responder(request)
        } else {
            // The client closed the channel; nothing more to serve.
            return Ok(());
        };
        responder.send_chunk(&*chunk).context("while responding")?;
        if chunk.is_empty() {
            // The empty response tells the client that iteration is over.
            return Ok(());
        }
    }
}
/// Serves a FIDL iterator protocol from an asynchronous stream of item batches.
///
/// Each request received on `fidl_iterator` is answered with as many buffered items as fit in a
/// single FIDL response:
///
/// * If nothing is buffered, the function waits on `stream` until it yields at least one item
///   (empty batches are skipped) or until it ends.
/// * If items are already buffered, `stream` is only polled once without waiting, so the
///   response is never delayed by a slow producer.
///
/// Up to `max_stream_chunks` ready batches are pulled from `stream` at a time; the value is
/// clamped to at least 1 before being handed to `ready_chunks`.
///
/// Serving finishes successfully when the request stream ends or right after an empty chunk has
/// been sent (which happens once `stream` has ended and the buffer is drained, or when the next
/// buffered item does not fit in a response by itself).
///
/// # Errors
///
/// Fails if reading the next request from the stream fails or if sending a response fails.
pub async fn serve_fidl_iterator_from_stream<I>(
    mut fidl_iterator: I,
    stream: impl futures::stream::Stream<Item = Vec<<I::Responder as FidlIteratorNextResponder>::Item>>
        + Unpin,
    max_stream_chunks: usize,
) -> Result<()>
where
    I: FidlIteratorRequestStream,
{
    let mut batched = stream.ready_chunks(max_stream_chunks.max(1));
    let mut pending = OwningChunker::new();

    while let Some(request) =
        fidl_iterator.try_next().await.context("while waiting for next() request")?
    {
        let responder = I::request_to_responder(request);

        if pending.is_empty() {
            // Nothing to send yet: wait until the source produces at least one item or ends.
            while let Some(batch) = batched.next().await {
                pending.extend(batch.into_iter().flatten());
                if !pending.is_empty() {
                    break;
                }
            }
        } else if let Some(Some(batch)) = batched.next().now_or_never() {
            // Opportunistically top up the buffer, but never wait while items are available.
            pending.extend(batch.into_iter().flatten());
        }

        let mut chunk = pending.next();
        responder.send_chunk(chunk.make_contiguous()).context("while responding")?;
        if chunk.is_empty() {
            // The empty response tells the client that iteration is over.
            break;
        }
    }
    Ok(())
}
/// A FIDL request stream for an iterator-style protocol whose requests can each be converted
/// into a responder that accepts one chunk of items.
///
/// Implemented in this file for the `fuchsia.pkg` package index, blob info and blob id iterator
/// request streams.
pub trait FidlIteratorRequestStream:
fidl::endpoints::RequestStream + TryStream<Error = fidl::Error>
{
/// Responder type obtained from each request on this stream.
type Responder: FidlIteratorNextResponder;
/// Extracts the responder from a single request yielded by the stream.
fn request_to_responder(request: <Self as TryStream>::Ok) -> Self::Responder;
}
/// A responder for a single iterator request that replies with a slice of items.
pub trait FidlIteratorNextResponder {
/// Element type sent in each response. It must be `Measurable` so that the serving code can
/// work out how many items fit in one response.
type Item: Measurable;
/// Consumes the responder and sends `chunk` as the response to the request.
fn send_chunk(self, chunk: &[Self::Item]) -> Result<(), fidl::Error>;
}
/// Lets the package index iterator request stream be served by the generic iterator helpers.
impl FidlIteratorRequestStream for PackageIndexIteratorRequestStream {
    type Responder = PackageIndexIteratorNextResponder;

    /// Unwraps the responder carried by a `Next` request.
    fn request_to_responder(request: PackageIndexIteratorRequest) -> Self::Responder {
        // Deliberately exhaustive without a wildcard arm: a new request variant must be handled
        // explicitly instead of compiling silently.
        match request {
            PackageIndexIteratorRequest::Next { responder } => responder,
        }
    }
}
/// Sends chunks of `PackageIndexEntry` values through the package index iterator responder.
impl FidlIteratorNextResponder for PackageIndexIteratorNextResponder {
type Item = PackageIndexEntry;
/// Forwards `chunk` to the responder's own `send` method.
fn send_chunk(self, chunk: &[Self::Item]) -> Result<(), fidl::Error> {
self.send(chunk)
}
}
/// Lets the blob info iterator request stream be served by the generic iterator helpers.
impl FidlIteratorRequestStream for BlobInfoIteratorRequestStream {
    type Responder = BlobInfoIteratorNextResponder;

    /// Unwraps the responder carried by a `Next` request.
    fn request_to_responder(request: BlobInfoIteratorRequest) -> Self::Responder {
        // Deliberately exhaustive without a wildcard arm: a new request variant must be handled
        // explicitly instead of compiling silently.
        match request {
            BlobInfoIteratorRequest::Next { responder } => responder,
        }
    }
}
/// Lets the blob id iterator request stream be served by the generic iterator helpers.
impl FidlIteratorRequestStream for BlobIdIteratorRequestStream {
    type Responder = BlobIdIteratorNextResponder;

    /// Unwraps the responder carried by a `Next` request.
    fn request_to_responder(request: BlobIdIteratorRequest) -> Self::Responder {
        // Deliberately exhaustive without a wildcard arm: a new request variant must be handled
        // explicitly instead of compiling silently.
        match request {
            BlobIdIteratorRequest::Next { responder } => responder,
        }
    }
}
/// Sends chunks of `fidl_fuchsia_pkg::BlobInfo` values through the blob info iterator responder.
impl FidlIteratorNextResponder for BlobInfoIteratorNextResponder {
type Item = fidl_fuchsia_pkg::BlobInfo;
/// Forwards `chunk` to the responder's own `send` method.
fn send_chunk(self, chunk: &[Self::Item]) -> Result<(), fidl::Error> {
self.send(chunk)
}
}
/// Sends chunks of `fidl_fuchsia_pkg::BlobId` values through the blob id iterator responder.
impl FidlIteratorNextResponder for BlobIdIteratorNextResponder {
type Item = fidl_fuchsia_pkg::BlobId;
/// Forwards `chunk` to the responder's own `send` method.
fn send_chunk(self, chunk: &[Self::Item]) -> Result<(), fidl::Error> {
self.send(chunk)
}
}
/// Bytes consumed by a FIDL response whose body is an empty vector, i.e. the fixed cost that is
/// paid before any item is added to a response. The unit test
/// `verify_fidl_vec_response_overhead` checks this value against the FIDL encoder.
const FIDL_VEC_RESPONSE_OVERHEAD_BYTES: usize = 32;
/// Returns how many leading elements of `items` fit in a single FIDL response whose body is a
/// vector of those elements.
///
/// The running total starts at [`FIDL_VEC_RESPONSE_OVERHEAD_BYTES`] and grows by each item's
/// `measure()`. Counting stops at the first item that would push the total past
/// `ZX_CHANNEL_MAX_MSG_BYTES`; that item and everything after it are not counted. The result is
/// therefore `0` when `items` is empty or when the very first item does not fit.
fn how_many_items_fit_in_fidl_vec_response<'a>(
    items: impl Iterator<Item = &'a (impl Measurable + 'a)>,
) -> usize {
    let limit = ZX_CHANNEL_MAX_MSG_BYTES as usize;
    let mut bytes_used: usize = FIDL_VEC_RESPONSE_OVERHEAD_BYTES;
    let mut count = 0;
    for item in items {
        // Saturate instead of overflowing: a pathological `measure()` must read as "does not
        // fit" rather than panicking (overflow-checked builds) or wrapping around to a small
        // total that would let the oversized item be counted.
        bytes_used = bytes_used.saturating_add(item.measure());
        if bytes_used > limit {
            break;
        }
        count += 1;
    }
    count
}
/// Hands out a mutable slice piece by piece, where every piece is the longest prefix of the
/// remaining items that fits in one FIDL vector response.
struct SliceChunker<'a, I> {
    /// Items that have not been handed out yet.
    items: &'a mut [I],
}

impl<'a, I: Measurable> SliceChunker<'a, I> {
    /// Creates a chunker over all of `items`.
    fn new(items: &'a mut [I]) -> Self {
        SliceChunker { items }
    }

    /// Splits off and returns the next chunk.
    ///
    /// The returned slice is empty once every item has been handed out, and also when the next
    /// item does not fit in a response by itself (in which case it stays in the chunker).
    fn next(&mut self) -> &'a mut [I] {
        let fit = how_many_items_fit_in_fidl_vec_response(self.items.iter());
        // Move the slice out of `self` so that both halves can keep the full `'a` lifetime.
        let (head, tail) = std::mem::take(&mut self.items).split_at_mut(fit);
        self.items = tail;
        head
    }
}
/// Buffers owned items and hands them out in pieces, where every piece is the longest prefix of
/// the buffered items that fits in one FIDL vector response. More items may be appended between
/// calls to `next`.
struct OwningChunker<I> {
    /// Items that have been buffered but not handed out yet, oldest first.
    items: std::collections::VecDeque<I>,
}

impl<I: Measurable> OwningChunker<I> {
    /// Creates a chunker with nothing buffered.
    fn new() -> Self {
        Self { items: Default::default() }
    }

    /// Removes and returns the next chunk.
    ///
    /// The returned queue is empty when nothing is buffered, and also when the oldest buffered
    /// item does not fit in a response by itself (in which case it stays buffered).
    fn next(&mut self) -> std::collections::VecDeque<I> {
        let fit = how_many_items_fit_in_fidl_vec_response(self.items.iter());
        // `split_off` leaves the first `fit` items in place and returns the rest, so keep the
        // returned tail as the new buffer and give the head to the caller.
        let remainder = self.items.split_off(fit);
        std::mem::replace(&mut self.items, remainder)
    }

    /// Reports whether nothing is currently buffered.
    fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    /// Appends every item produced by `iter` to the back of the buffer.
    fn extend(&mut self, iter: impl IntoIterator<Item = I>) {
        self.items.extend(iter);
    }
}
#[cfg(test)]
mod tests {
use super::*;
use fidl_fuchsia_pkg::{BlobInfoIteratorMarker, PackageIndexIteratorMarker};
use fuchsia_async::Task;
use fuchsia_hash::HashRangeFull;
use fuchsia_pkg::PackagePath;
use proptest::prelude::*;
// The production code casts `ZX_CHANNEL_MAX_MSG_BYTES` with `as usize`; this guards against
// that cast ever truncating.
#[test]
fn zx_channel_max_msg_bytes_fits_in_usize() {
    let _fits: usize = usize::try_from(ZX_CHANNEL_MAX_MSG_BYTES).unwrap();
}
// Test item that always measures as exactly one byte, so the number of items in a chunk equals
// the number of payload bytes in it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Byte(u8);
impl Measurable for Byte {
fn measure(&self) -> usize {
1
}
}
// Once the only item has been handed out, every further call keeps yielding an empty chunk.
#[test]
fn slice_chunker_fuses() {
    let mut backing = [Byte(42)];
    let mut chunker = SliceChunker::new(&mut backing);
    assert_eq!(chunker.next(), &mut [Byte(42)]);
    for _ in 0..2 {
        assert!(chunker.next().is_empty());
    }
}
// With one-byte items, a chunk must hold exactly the payload bytes left after the fixed
// response overhead; two full chunks plus one spare item must split as [full, full, 1].
#[test]
fn slice_chunker_chunks_at_expected_boundary() {
    const BYTES_PER_CHUNK: usize =
        ZX_CHANNEL_MAX_MSG_BYTES as usize - FIDL_VEC_RESPONSE_OVERHEAD_BYTES;
    let mut items: Vec<Byte> =
        (0..=(BYTES_PER_CHUNK as u64 * 2)).map(|n| Byte(n as u8)).collect();
    let expected = items.clone();
    let mut chunker = SliceChunker::new(&mut items);

    let mut actual: Vec<Byte> = Vec::new();
    for expected_len in [BYTES_PER_CHUNK, BYTES_PER_CHUNK, 1] {
        let chunk = chunker.next();
        assert_eq!(chunk.len(), expected_len);
        actual.extend_from_slice(chunk);
    }

    // The concatenated chunks reproduce the input in order.
    assert_eq!(actual, expected);
}
// An item that cannot fit in a response even by itself produces an empty chunk instead of being
// handed out.
#[test]
fn slice_chunker_terminates_at_too_large_item() {
    #[derive(Debug, PartialEq, Eq)]
    struct TooBig;
    impl Measurable for TooBig {
        fn measure(&self) -> usize {
            // Together with the fixed response overhead this exceeds the channel limit.
            ZX_CHANNEL_MAX_MSG_BYTES as usize
        }
    }

    let mut items = [TooBig];
    let mut chunker = SliceChunker::new(&mut items);
    assert!(chunker.next().is_empty());
}
// Once the only item has been handed out, every further call keeps yielding an empty chunk.
#[test]
fn owning_chunker_fuses() {
    let mut chunker = OwningChunker::new();
    chunker.extend([Byte(42)]);
    assert_eq!(Vec::from(chunker.next()), vec![Byte(42)]);
    for _ in 0..2 {
        assert!(chunker.next().is_empty());
    }
}
// With one-byte items, a chunk must hold exactly the payload bytes left after the fixed
// response overhead; two full chunks plus one spare item must split as [full, full, 1].
#[test]
fn owning_chunker_chunks_at_expected_boundary() {
    const BYTES_PER_CHUNK: usize =
        ZX_CHANNEL_MAX_MSG_BYTES as usize - FIDL_VEC_RESPONSE_OVERHEAD_BYTES;
    let expected: Vec<Byte> =
        (0..=(BYTES_PER_CHUNK as u64 * 2)).map(|n| Byte(n as u8)).collect();
    let mut chunker = OwningChunker::new();
    chunker.extend(expected.iter().copied());

    let mut actual: Vec<Byte> = Vec::new();
    for expected_len in [BYTES_PER_CHUNK, BYTES_PER_CHUNK, 1] {
        let chunk = chunker.next();
        assert_eq!(chunk.len(), expected_len);
        actual.extend(chunk);
    }

    // The concatenated chunks reproduce the input in order.
    assert_eq!(actual, expected);
}
// An item that cannot fit in a response even by itself produces an empty chunk instead of being
// handed out.
#[test]
fn owning_chunker_terminates_at_too_large_item() {
    #[derive(Debug, PartialEq, Eq)]
    struct TooBig;
    impl Measurable for TooBig {
        fn measure(&self) -> usize {
            // Together with the fixed response overhead this exceeds the channel limit.
            ZX_CHANNEL_MAX_MSG_BYTES as usize
        }
    }

    let mut chunker = OwningChunker::new();
    chunker.extend([TooBig]);
    assert!(chunker.next().is_empty());
}
// Items added by separate `extend` calls are merged into one chunk, and the chunker keeps
// working when more items arrive after it has already reported an empty chunk.
#[test]
fn owning_chunker_extend_after_next() {
    let mut chunker = OwningChunker::new();
    for byte in [Byte(0), Byte(1)] {
        chunker.extend([byte]);
    }
    assert_eq!(Vec::from(chunker.next()), vec![Byte(0), Byte(1)]);
    assert!(chunker.next().is_empty());

    chunker.extend([Byte(2)]);
    assert_eq!(Vec::from(chunker.next()), vec![Byte(2)]);
}
// Encodes a transaction message whose body is an empty unbounded byte vector and checks that
// the encoded length equals `FIDL_VEC_RESPONSE_OVERHEAD_BYTES`, so the constant cannot drift
// from what the FIDL encoder actually produces.
#[test]
fn verify_fidl_vec_response_overhead() {
let vec_response_overhead = {
use fidl::encoding::{
DefaultFuchsiaResourceDialect, DynamicFlags, TransactionHeader, TransactionMessage,
TransactionMessageType, UnboundedVector,
};
type Msg = TransactionMessageType<UnboundedVector<u8>>;
let msg = TransactionMessage {
header: TransactionHeader::new(0, 0, DynamicFlags::empty()),
// Empty body: only the fixed per-message cost is measured.
body: &[] as &[u8],
};
fidl::encoding::with_tls_encoded::<Msg, DefaultFuchsiaResourceDialect, _>(
msg,
// Only the number of encoded bytes matters; handles are ignored.
|bytes, _handles| Ok(bytes.len()),
)
.unwrap()
};
assert_eq!(vec_response_overhead, FIDL_VEC_RESPONSE_OVERHEAD_BYTES);
}
// Property tests: whatever items are fed to the serving functions, a client that keeps calling
// `next()` until it receives an empty chunk must observe exactly those items, in order.
proptest! {
#![proptest_config(ProptestConfig{
failure_persistence: None,
.. ProptestConfig::default()
})]
// Serves `items` from a slice and collects everything the client receives.
#[test]
fn serve_fidl_iterator_from_slice_yields_expected_entries(items: Vec<crate::BlobInfo>) {
let mut executor = fuchsia_async::TestExecutor::new();
executor.run_singlethreaded(async move {
let (proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<BlobInfoIteratorMarker>();
let mut actual_items = vec![];
// Run the server and the client concurrently on the same executor.
let ((), ()) = futures::future::join(
async {
let items = items
.iter()
.cloned()
.map(fidl_fuchsia_pkg::BlobInfo::from)
.collect::<Vec<_>>();
serve_fidl_iterator_from_slice(stream, items).await.unwrap()
},
async {
// Client: drain the iterator until the terminating empty chunk.
loop {
let chunk = proxy.next().await.unwrap();
if chunk.is_empty() {
break;
}
let chunk = chunk.into_iter().map(crate::BlobInfo::from);
actual_items.extend(chunk);
}
},
)
.await;
assert_eq!(items, actual_items);
})
}
// Sends `items` into the source stream `repetition` times (possibly zero) and serves that
// stream with `max_chunking` as the `max_stream_chunks` argument (0 is included in the range).
#[test]
fn serve_fidl_iterator_from_stream_yields_expected_entries(
items: Vec<crate::BlobInfo>,
repetition in 0..4usize,
max_chunking in 0..4usize,
) {
let mut executor = fuchsia_async::TestExecutor::new();
executor.run_singlethreaded(async move {
let (proxy, fidl_stream) =
fidl::endpoints::create_proxy_and_stream::<BlobInfoIteratorMarker>();
let (mut item_sender, item_stream) = futures::channel::mpsc::unbounded();
let mut actual_items = vec![];
// Run the producer, the server and the client concurrently on the same executor.
let ((), (), ()) = futures::future::join3(
async {
// Producer: one batch per repetition, then close the stream.
for _ in 0..repetition {
let () = item_sender.send(items
.iter()
.cloned()
.map(fidl_fuchsia_pkg::BlobInfo::from)
.collect::<Vec<_>>()).await.unwrap();
}
drop(item_sender);
},
async {
let () = serve_fidl_iterator_from_stream(
fidl_stream,
item_stream,
max_chunking
)
.await
.unwrap();
},
async {
// Client: drain the iterator until the terminating empty chunk.
loop {
let chunk = proxy.next().await.unwrap();
if chunk.is_empty() {
break;
}
let chunk = chunk.into_iter().map(crate::BlobInfo::from);
actual_items.extend(chunk);
}
},
)
.await;
// The client must see `items` repeated `repetition` times, in order.
let expected_items = {
let mut expected_items = vec![];
for _ in 0..repetition {
expected_items.extend(items.iter().cloned())
}
expected_items
};
assert_eq!(expected_items, actual_items);
})
}
}
// Expected number of package index entries per response when every entry uses a one-character
// package name (see `package_index_iterator_paginates_shortest_entries`).
const PACKAGE_INDEX_CHUNK_SIZE_MAX: usize = 818;
// Expected number of package index entries per response when every entry uses a package name
// of `PackagePath::MAX_NAME_BYTES` characters (see
// `package_index_iterator_paginates_longest_entries`).
const PACKAGE_INDEX_CHUNK_SIZE_MIN: usize = 194;
// Entries with one-character package names must be paginated at the largest expected chunk
// size.
#[fuchsia_async::run_singlethreaded(test)]
async fn package_index_iterator_paginates_shortest_entries() {
    // Endless supply of paths named "a/0", "b/0", ..., "z/0", "a/0", ...
    let paths = ('a'..='z').cycle().map(|c| {
        PackagePath::from_name_and_variant(c.to_string().parse().unwrap(), "0".parse().unwrap())
    });
    verify_package_index_iterator_pagination(paths, PACKAGE_INDEX_CHUNK_SIZE_MAX).await;
}
// Entries with maximum-length package names must be paginated at the smallest expected chunk
// size.
#[fuchsia_async::run_singlethreaded(test)]
async fn package_index_iterator_paginates_longest_entries() {
    // Endless supply of paths whose name is one letter repeated `MAX_NAME_BYTES` times.
    let paths = ('a'..='z').cycle().map(|c| {
        let name = c.to_string().repeat(PackagePath::MAX_NAME_BYTES);
        PackagePath::from_name_and_variant(name.parse().unwrap(), "0".parse().unwrap())
    });
    verify_package_index_iterator_pagination(paths, PACKAGE_INDEX_CHUNK_SIZE_MIN).await;
}
// Serves exactly two chunks' worth of package index entries built from `paths` and checks that
// the client receives two chunks of `expected_chunk_size` entries followed by the terminating
// empty chunk.
async fn verify_package_index_iterator_pagination(
    paths: impl Iterator<Item = PackagePath>,
    expected_chunk_size: usize,
) {
    let total_entries = expected_chunk_size * 2;
    let package_entries: Vec<fidl_fuchsia_pkg::PackageIndexEntry> = paths
        .zip(HashRangeFull::default())
        .take(total_entries)
        .map(|(path, hash)| {
            let url = format!("fuchsia-pkg://fuchsia.com/{}", path);
            fidl_fuchsia_pkg::PackageIndexEntry {
                package_url: fidl_fuchsia_pkg::PackageUrl { url },
                meta_far_blob_id: crate::BlobId::from(hash).into(),
            }
        })
        .collect();

    let (iter, stream) =
        fidl::endpoints::create_proxy_and_stream::<PackageIndexIteratorMarker>();
    let task = Task::local(serve_fidl_iterator_from_slice(stream, package_entries));

    for expected_len in [expected_chunk_size, expected_chunk_size, 0] {
        let chunk = iter.next().await.unwrap();
        assert_eq!(chunk.len(), expected_len);
    }

    // The server must finish cleanly after sending the empty chunk.
    let () = task.await.unwrap();
}
#[cfg(target_os = "fuchsia")]
use assert_matches::assert_matches;
// An empty batch from the source stream must not be answered with an empty chunk (which would
// end the iteration); the pending request is answered only once a real item arrives.
#[cfg(target_os = "fuchsia")]
#[test]
fn serve_fidl_iterator_from_stream_ignores_empty_vec() {
let mut executor = fuchsia_async::TestExecutor::new();
let (proxy, fidl_stream) =
fidl::endpoints::create_proxy_and_stream::<BlobInfoIteratorMarker>();
let (item_sender, item_stream) = futures::channel::mpsc::unbounded();
let mut serve_task = serve_fidl_iterator_from_stream(fidl_stream, item_stream, 10).boxed();
// Only an empty batch is available when the request is made.
let () = item_sender.unbounded_send(vec![]).unwrap();
let mut chunk_fut = proxy.next();
// Neither side can make progress: the server keeps waiting for items and the client gets no
// response.
assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
assert_matches!(executor.run_until_stalled(&mut chunk_fut), std::task::Poll::Pending);
let blob_info = crate::BlobInfo { blob_id: [0; 32].into(), length: 0 };
// Once a non-empty batch arrives, the pending request is answered with it.
let () = item_sender.unbounded_send(vec![blob_info.into()]).unwrap();
assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
assert_matches!(
executor.run_until_stalled(&mut chunk_fut),
std::task::Poll::Ready(Ok(chunk))
if chunk == vec![fidl_fuchsia_pkg::BlobInfo::from(blob_info)]
);
}
// When items left over from an earlier batch are still buffered, a request must be answered
// from the buffer right away even though the source stream has nothing new; the server only
// waits on the stream once the buffer is empty.
#[cfg(target_os = "fuchsia")]
#[test]
fn serve_fidl_iterator_from_stream_does_not_block_if_chunker_not_empty() {
let mut executor = fuchsia_async::TestExecutor::new();
let (proxy, fidl_stream) =
fidl::endpoints::create_proxy_and_stream::<BlobInfoIteratorMarker>();
let (item_sender, item_stream) = futures::channel::mpsc::unbounded();
let mut serve_task = serve_fidl_iterator_from_stream(fidl_stream, item_stream, 10).boxed();
let blob_info = fidl_fuchsia_pkg::BlobInfo::from(crate::BlobInfo {
blob_id: [0; 32].into(),
length: 0,
});
let max_payloads_per_fidl_response = (ZX_CHANNEL_MAX_MSG_BYTES as usize
- FIDL_VEC_RESPONSE_OVERHEAD_BYTES)
/ measure_fuchsia_pkg::Measurable::measure(&blob_info).num_bytes;
// One item more than fits in a single response, so one item is left buffered after the first
// response.
let payloads = vec![blob_info; max_payloads_per_fidl_response + 1];
assert_eq!(
how_many_items_fit_in_fidl_vec_response(payloads.iter()),
max_payloads_per_fidl_response
);
let () = item_sender.unbounded_send(payloads).unwrap();
// First request: a full response.
let mut chunk_fut = proxy.next();
assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
assert_matches!(
executor.run_until_stalled(&mut chunk_fut),
std::task::Poll::Ready(Ok(chunk))
if chunk.len() == max_payloads_per_fidl_response
);
// Second request: answered from the buffered leftover item although the stream has nothing
// new to offer.
let mut chunk_fut = proxy.next();
assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
assert_matches!(
executor.run_until_stalled(&mut chunk_fut),
std::task::Poll::Ready(Ok(chunk))
if chunk.len() == 1
);
// Third request: the buffer is empty and the stream is still open, so the request stays
// pending.
let mut chunk_fut = proxy.next();
assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
assert_matches!(executor.run_until_stalled(&mut chunk_fut), std::task::Poll::Pending);
// New items unblock the pending request.
let () = item_sender.unbounded_send(vec![blob_info; 2]).unwrap();
assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
assert_matches!(
executor.run_until_stalled(&mut chunk_fut),
std::task::Poll::Ready(Ok(chunk))
if chunk.len() == 2
);
}
}