1use crate::Measurable;
6use anyhow::{Context as _, Result};
7use fidl_fuchsia_pkg::{
8 BlobIdIteratorNextResponder, BlobIdIteratorRequest, BlobIdIteratorRequestStream,
9 BlobInfoIteratorNextResponder, BlobInfoIteratorRequest, BlobInfoIteratorRequestStream,
10 PackageIndexEntry, PackageIndexIteratorNextResponder, PackageIndexIteratorRequest,
11 PackageIndexIteratorRequestStream,
12};
13use futures::prelude::*;
14use zx_types::ZX_CHANNEL_MAX_MSG_BYTES;
15
16pub async fn serve_fidl_iterator_from_slice<I>(
36 mut fidl_iterator: I,
37 mut items: impl AsMut<[<I::Responder as FidlIteratorNextResponder>::Item]>,
38) -> Result<()>
39where
40 I: FidlIteratorRequestStream,
41{
42 let mut items = SliceChunker::new(items.as_mut());
43
44 loop {
45 let chunk = items.next();
46
47 let responder =
48 match fidl_iterator.try_next().await.context("while waiting for next() request")? {
49 None => break,
50 Some(request) => I::request_to_responder(request),
51 };
52
53 let () = responder.send_chunk(&chunk).context("while responding")?;
54
55 if chunk.is_empty() {
57 break;
58 }
59 }
60
61 Ok(())
62}
63
64pub async fn serve_fidl_iterator_from_stream<I>(
92 mut fidl_iterator: I,
93 stream: impl futures::stream::Stream<Item = Vec<<I::Responder as FidlIteratorNextResponder>::Item>>
94 + Unpin,
95 max_stream_chunks: usize,
96) -> Result<()>
97where
98 I: FidlIteratorRequestStream,
99{
100 let mut chunked_stream = stream.ready_chunks(std::cmp::max(max_stream_chunks, 1));
101 let mut fidl_chunker = OwningChunker::new();
102
103 loop {
104 let responder =
105 match fidl_iterator.try_next().await.context("while waiting for next() request")? {
106 None => break,
107 Some(request) => I::request_to_responder(request),
108 };
109
110 if fidl_chunker.is_empty() {
113 loop {
114 if let Some(xss) = chunked_stream.next().await {
115 fidl_chunker.extend(xss.into_iter().flatten());
116 if fidl_chunker.is_empty() {
117 continue;
118 }
119 }
120 break;
121 }
122 } else {
123 if let Some(Some(xss)) = chunked_stream.next().now_or_never() {
124 fidl_chunker.extend(xss.into_iter().flatten());
125 }
126 }
127
128 let mut chunk = fidl_chunker.next();
129 let () = responder.send_chunk(chunk.make_contiguous()).context("while responding")?;
130 if chunk.is_empty() {
131 break;
132 }
133 }
134
135 Ok(())
136}
137
/// A FIDL request stream for an iterator-style protocol that the `serve_fidl_iterator_*`
/// functions in this file know how to drive.
///
/// An implementation pairs the request stream with the responder type used to answer each of its
/// requests.
pub trait FidlIteratorRequestStream:
    fidl::endpoints::RequestStream + TryStream<Error = fidl::Error>
{
    /// Responder used to answer a single request taken from this stream.
    type Responder: FidlIteratorNextResponder;

    /// Converts a request yielded by the stream into the responder that answers it.
    fn request_to_responder(request: <Self as TryStream>::Ok) -> Self::Responder;
}
146
/// A responder that answers one iterator request with a chunk of items.
pub trait FidlIteratorNextResponder {
    /// Element type carried in each response. It must be `Measurable` so that the chunkers in
    /// this file can account for the bytes each element adds to a response.
    type Item: Measurable;

    /// Consumes the responder and sends `chunk` as the response.
    fn send_chunk(self, chunk: &[Self::Item]) -> Result<(), fidl::Error>;
}
153
154impl FidlIteratorRequestStream for PackageIndexIteratorRequestStream {
155 type Responder = PackageIndexIteratorNextResponder;
156
157 fn request_to_responder(request: PackageIndexIteratorRequest) -> Self::Responder {
158 let PackageIndexIteratorRequest::Next { responder } = request;
159 responder
160 }
161}
162
/// Answers a `PackageIndexIterator` request with a chunk of `PackageIndexEntry` values.
impl FidlIteratorNextResponder for PackageIndexIteratorNextResponder {
    type Item = PackageIndexEntry;

    /// Forwards `chunk` to the responder's own `send` method.
    fn send_chunk(self, chunk: &[Self::Item]) -> Result<(), fidl::Error> {
        self.send(chunk)
    }
}
170
171impl FidlIteratorRequestStream for BlobInfoIteratorRequestStream {
172 type Responder = BlobInfoIteratorNextResponder;
173
174 fn request_to_responder(request: BlobInfoIteratorRequest) -> Self::Responder {
175 let BlobInfoIteratorRequest::Next { responder } = request;
176 responder
177 }
178}
179
180impl FidlIteratorRequestStream for BlobIdIteratorRequestStream {
181 type Responder = BlobIdIteratorNextResponder;
182
183 fn request_to_responder(request: BlobIdIteratorRequest) -> Self::Responder {
184 let BlobIdIteratorRequest::Next { responder } = request;
185 responder
186 }
187}
188
/// Answers a `BlobInfoIterator` request with a chunk of `fidl_fuchsia_pkg::BlobInfo` values.
impl FidlIteratorNextResponder for BlobInfoIteratorNextResponder {
    type Item = fidl_fuchsia_pkg::BlobInfo;

    /// Forwards `chunk` to the responder's own `send` method.
    fn send_chunk(self, chunk: &[Self::Item]) -> Result<(), fidl::Error> {
        self.send(chunk)
    }
}
196
/// Answers a `BlobIdIterator` request with a chunk of `fidl_fuchsia_pkg::BlobId` values.
impl FidlIteratorNextResponder for BlobIdIteratorNextResponder {
    type Item = fidl_fuchsia_pkg::BlobId;

    /// Forwards `chunk` to the responder's own `send` method.
    fn send_chunk(self, chunk: &[Self::Item]) -> Result<(), fidl::Error> {
        self.send(chunk)
    }
}
204
/// Number of bytes of every vector response that are not available to the items themselves.
///
/// `how_many_items_fit_in_fidl_vec_response` starts its byte count at this value, and the
/// `verify_fidl_vec_response_overhead` test checks it against the encoded length of a transaction
/// message whose body is an empty vector.
// NOTE(review): presumably the FIDL transaction header plus the vector header — confirm against
// the FIDL wire format.
const FIDL_VEC_RESPONSE_OVERHEAD_BYTES: usize = 32;
208
209fn how_many_items_fit_in_fidl_vec_response<'a>(
212 items: impl Iterator<Item = &'a (impl Measurable + 'a)>,
213) -> usize {
214 let mut bytes_used: usize = FIDL_VEC_RESPONSE_OVERHEAD_BYTES;
215 let mut count = 0;
216
217 for item in items {
218 bytes_used += item.measure();
219 if bytes_used > ZX_CHANNEL_MAX_MSG_BYTES as usize {
220 break;
221 }
222 count += 1;
223 }
224 count
225}
226
/// Hands out a mutable slice in consecutive chunks, each sized by
/// `how_many_items_fit_in_fidl_vec_response`.
struct SliceChunker<'a, I> {
    /// The items that have not been handed out yet.
    items: &'a mut [I],
}
234
235impl<'a, I> SliceChunker<'a, I>
236where
237 I: Measurable,
238{
239 fn new(items: &'a mut [I]) -> Self {
240 Self { items }
241 }
242
243 fn next(&mut self) -> &'a mut [I] {
251 let entry_count = how_many_items_fit_in_fidl_vec_response(self.items.iter());
252 let tmp = std::mem::replace(&mut self.items, &mut []);
254 let (chunk, rest) = tmp.split_at_mut(entry_count);
255 self.items = rest;
256 chunk
257 }
258}
259
/// Owns a queue of items and hands them out in consecutive chunks, each sized by
/// `how_many_items_fit_in_fidl_vec_response`. More items can be appended between chunks.
struct OwningChunker<I> {
    /// The items that have not been handed out yet, in delivery order.
    items: std::collections::VecDeque<I>,
}
267
268impl<I> OwningChunker<I>
269where
270 I: Measurable,
271{
272 fn new() -> Self {
273 Self { items: std::collections::VecDeque::new() }
274 }
275
276 fn next(&mut self) -> std::collections::VecDeque<I> {
282 let count = how_many_items_fit_in_fidl_vec_response(self.items.iter());
283 let mut other = self.items.split_off(count);
284 std::mem::swap(&mut self.items, &mut other);
285 other
286 }
287
288 fn is_empty(&self) -> bool {
289 self.items.is_empty()
290 }
291
292 fn extend(&mut self, iter: impl IntoIterator<Item = I>) {
293 self.items.extend(iter)
294 }
295}
296
297#[cfg(test)]
298mod tests {
299 use super::*;
300 use fidl_fuchsia_pkg::{BlobInfoIteratorMarker, PackageIndexIteratorMarker};
301 use fuchsia_async::Task;
302 use fuchsia_hash::HashRangeFull;
303 use fuchsia_pkg::PackagePath;
304 use proptest::prelude::*;
305
306 #[test]
307 fn zx_channel_max_msg_bytes_fits_in_usize() {
308 let _: usize = ZX_CHANNEL_MAX_MSG_BYTES.try_into().unwrap();
309 }
310
    /// Test item wrapping a single `u8`; its `Measurable` impl reports a size of one byte.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    struct Byte(u8);
313
    impl Measurable for Byte {
        /// Always one byte, so chunk lengths in the tests equal byte counts.
        fn measure(&self) -> usize {
            1
        }
    }
319
320 #[test]
321 fn slice_chunker_fuses() {
322 let items = &mut [Byte(42)];
323 let mut chunker = SliceChunker::new(items);
324
325 assert_eq!(chunker.next(), &mut [Byte(42)]);
326 assert_eq!(chunker.next(), &mut []);
327 assert_eq!(chunker.next(), &mut []);
328 }
329
330 #[test]
331 fn slice_chunker_chunks_at_expected_boundary() {
332 const BYTES_PER_CHUNK: usize =
333 ZX_CHANNEL_MAX_MSG_BYTES as usize - FIDL_VEC_RESPONSE_OVERHEAD_BYTES;
334
335 let mut items =
337 (0..=(BYTES_PER_CHUNK as u64 * 2)).map(|n| Byte(n as u8)).collect::<Vec<Byte>>();
338 let expected = items.clone();
339 let mut chunker = SliceChunker::new(&mut items);
340
341 let mut actual: Vec<Byte> = vec![];
342
343 for _ in 0..2 {
344 let chunk = chunker.next();
345 assert_eq!(chunk.len(), BYTES_PER_CHUNK);
346
347 actual.extend(&*chunk);
348 }
349
350 let chunk = chunker.next();
351 assert_eq!(chunk.len(), 1);
352 actual.extend(&*chunk);
353
354 assert_eq!(actual, expected);
355 }
356
357 #[test]
358 fn slice_chunker_terminates_at_too_large_item() {
359 #[derive(Debug, PartialEq, Eq)]
360 struct TooBig;
361 impl Measurable for TooBig {
362 fn measure(&self) -> usize {
363 ZX_CHANNEL_MAX_MSG_BYTES as usize
364 }
365 }
366
367 let items = &mut [TooBig];
368 let mut chunker = SliceChunker::new(items);
369 assert_eq!(chunker.next(), &mut []);
370 }
371
372 #[test]
373 fn owning_chunker_fuses() {
374 let items = [Byte(42)];
375 let mut chunker = OwningChunker::new();
376 chunker.extend(items);
377
378 assert_eq!(chunker.next().make_contiguous(), &[Byte(42)]);
379 assert_eq!(chunker.next().make_contiguous(), &[]);
380 assert_eq!(chunker.next().make_contiguous(), &[]);
381 }
382
383 #[test]
384 fn owning_chunker_chunks_at_expected_boundary() {
385 const BYTES_PER_CHUNK: usize =
386 ZX_CHANNEL_MAX_MSG_BYTES as usize - FIDL_VEC_RESPONSE_OVERHEAD_BYTES;
387
388 let items =
390 (0..=(BYTES_PER_CHUNK as u64 * 2)).map(|n| Byte(n as u8)).collect::<Vec<Byte>>();
391 let expected = items.clone();
392 let mut chunker = OwningChunker::new();
393 chunker.extend(items.into_iter());
394
395 let mut actual: Vec<Byte> = vec![];
396
397 for _ in 0..2 {
398 let chunk = chunker.next();
399 assert_eq!(chunk.len(), BYTES_PER_CHUNK);
400
401 actual.extend(chunk);
402 }
403
404 let chunk = chunker.next();
405 assert_eq!(chunk.len(), 1);
406 actual.extend(chunk);
407
408 assert_eq!(actual, expected);
409 }
410
411 #[test]
412 fn owning_chunker_terminates_at_too_large_item() {
413 #[derive(Debug, PartialEq, Eq)]
414 struct TooBig;
415 impl Measurable for TooBig {
416 fn measure(&self) -> usize {
417 ZX_CHANNEL_MAX_MSG_BYTES as usize
418 }
419 }
420
421 let items = [TooBig];
422 let mut chunker = OwningChunker::new();
423 chunker.extend(items);
424 assert_eq!(chunker.next().make_contiguous(), &mut []);
425 }
426
427 #[test]
428 fn owning_chunker_extend_after_next() {
429 let mut chunker = OwningChunker::new();
430 chunker.extend([Byte(0)]);
431 chunker.extend([Byte(1)]);
432
433 assert_eq!(chunker.next().make_contiguous(), &[Byte(0), Byte(1)]);
434 assert_eq!(chunker.next().make_contiguous(), &[]);
435
436 chunker.extend([Byte(2)]);
437
438 assert_eq!(chunker.next().make_contiguous(), &[Byte(2)]);
439 }
440
    /// Checks `FIDL_VEC_RESPONSE_OVERHEAD_BYTES` against the fidl crate: encodes a transaction
    /// message whose body is an empty unbounded byte vector and compares the encoded length with
    /// the constant.
    #[test]
    fn verify_fidl_vec_response_overhead() {
        let vec_response_overhead = {
            use fidl::encoding::{
                DefaultFuchsiaResourceDialect, DynamicFlags, TransactionHeader, TransactionMessage,
                TransactionMessageType, UnboundedVector,
            };

            type Msg = TransactionMessageType<UnboundedVector<u8>>;
            // Header plus an empty vector body: everything in the message except the items.
            let msg = TransactionMessage {
                header: TransactionHeader::new(0, 0, DynamicFlags::empty()),
                body: &[] as &[u8],
            };
            // Only the number of encoded bytes matters; handles are ignored.
            fidl::encoding::with_tls_encoded::<Msg, DefaultFuchsiaResourceDialect, _>(
                msg,
                |bytes, _handles| Ok(bytes.len()),
            )
            .unwrap()
        };
        assert_eq!(vec_response_overhead, FIDL_VEC_RESPONSE_OVERHEAD_BYTES);
    }
462
    // Property tests: whatever items are generated, a client draining the iterator must receive
    // exactly those items, in order.
    proptest! {
        #![proptest_config(ProptestConfig{
            // No failure persistence is configured for these tests.
            failure_persistence: None,
            .. ProptestConfig::default()
        })]

        // Serves the generated items from a slice and checks that the client receives all of
        // them, in order, before the terminating empty chunk.
        #[test]
        fn serve_fidl_iterator_from_slice_yields_expected_entries(items: Vec<crate::BlobInfo>) {
            let mut executor = fuchsia_async::TestExecutor::new();
            executor.run_singlethreaded(async move {
                let (proxy, stream) =
                    fidl::endpoints::create_proxy_and_stream::<BlobInfoIteratorMarker>();
                let mut actual_items = vec![];

                // Run the server and the draining client concurrently.
                let ((), ()) = futures::future::join(
                    async {
                        let items = items
                            .iter()
                            .cloned()
                            .map(fidl_fuchsia_pkg::BlobInfo::from)
                            .collect::<Vec<_>>();
                        serve_fidl_iterator_from_slice(stream, items).await.unwrap()
                    },
                    async {
                        // Client side: keep asking until the empty chunk arrives.
                        loop {
                            let chunk = proxy.next().await.unwrap();
                            if chunk.is_empty() {
                                break;
                            }
                            let chunk = chunk.into_iter().map(crate::BlobInfo::from);
                            actual_items.extend(chunk);
                        }
                    },
                )
                .await;

                assert_eq!(items, actual_items);
            })
        }

        // Sends the generated items `repetition` times through a channel-backed stream and checks
        // that the client receives the concatenation, for several `max_stream_chunks` values
        // (including zero).
        #[test]
        fn serve_fidl_iterator_from_stream_yields_expected_entries(
            items: Vec<crate::BlobInfo>,
            repetition in 0..4usize,
            max_chunking in 0..4usize,
        ) {
            let mut executor = fuchsia_async::TestExecutor::new();
            executor.run_singlethreaded(async move {
                let (proxy, fidl_stream) =
                    fidl::endpoints::create_proxy_and_stream::<BlobInfoIteratorMarker>();
                let (mut item_sender, item_stream) = futures::channel::mpsc::unbounded();
                let mut actual_items = vec![];

                // Producer, server and draining client all run concurrently.
                let ((), (), ()) = futures::future::join3(
                    async {
                        for _ in 0..repetition {
                            let () = item_sender.send(items
                                .iter()
                                .cloned()
                                .map(fidl_fuchsia_pkg::BlobInfo::from)
                                .collect::<Vec<_>>()).await.unwrap();
                        }
                        // Dropping the sender ends the item stream.
                        drop(item_sender);
                    },
                    async {
                        let () = serve_fidl_iterator_from_stream(
                            fidl_stream,
                            item_stream,
                            max_chunking
                        )
                        .await
                        .unwrap();
                    },
                    async {
                        // Client side: keep asking until the empty chunk arrives.
                        loop {
                            let chunk = proxy.next().await.unwrap();
                            if chunk.is_empty() {
                                break;
                            }
                            let chunk = chunk.into_iter().map(crate::BlobInfo::from);
                            actual_items.extend(chunk);
                        }
                    },
                )
                .await;

                let expected_items = {
                    let mut expected_items = vec![];
                    for _ in 0..repetition {
                        expected_items.extend(items.iter().cloned())
                    }
                    expected_items
                };
                assert_eq!(expected_items, actual_items);
            })
        }
    }
562
    /// Number of `PackageIndexEntry` items per response that
    /// `package_index_iterator_paginates_shortest_entries` expects, i.e. when every entry uses a
    /// one-letter package name.
    // NOTE(review): presumably derived from the per-entry size reported by the `Measurable` impl
    // for `PackageIndexEntry` — confirm the derivation there.
    const PACKAGE_INDEX_CHUNK_SIZE_MAX: usize = 818;

    /// Number of `PackageIndexEntry` items per response that
    /// `package_index_iterator_paginates_longest_entries` expects, i.e. when every entry uses a
    /// package name of `PackagePath::MAX_NAME_BYTES` characters.
    const PACKAGE_INDEX_CHUNK_SIZE_MIN: usize = 194;
583
584 #[fuchsia_async::run_singlethreaded(test)]
585 async fn package_index_iterator_paginates_shortest_entries() {
586 let names = ('a'..='z').cycle().map(|c| c.to_string());
587 let paths = names.map(|name| {
588 PackagePath::from_name_and_variant(name.parse().unwrap(), "0".parse().unwrap())
589 });
590
591 verify_package_index_iterator_pagination(paths, PACKAGE_INDEX_CHUNK_SIZE_MAX).await;
592 }
593
594 #[fuchsia_async::run_singlethreaded(test)]
595 async fn package_index_iterator_paginates_longest_entries() {
596 let names = ('a'..='z')
597 .map(|c| std::iter::repeat(c).take(PackagePath::MAX_NAME_BYTES).collect::<String>())
598 .cycle();
599 let paths = names.map(|name| {
600 PackagePath::from_name_and_variant(name.parse().unwrap(), "0".parse().unwrap())
601 });
602
603 verify_package_index_iterator_pagination(paths, PACKAGE_INDEX_CHUNK_SIZE_MIN).await;
604 }
605
606 async fn verify_package_index_iterator_pagination(
607 paths: impl Iterator<Item = PackagePath>,
608 expected_chunk_size: usize,
609 ) {
610 let package_entries: Vec<fidl_fuchsia_pkg::PackageIndexEntry> = paths
611 .zip(HashRangeFull::default())
612 .take(expected_chunk_size * 2)
613 .map(|(path, hash)| fidl_fuchsia_pkg::PackageIndexEntry {
614 package_url: fidl_fuchsia_pkg::PackageUrl {
615 url: format!("fuchsia-pkg://fuchsia.com/{}", path),
616 },
617 meta_far_blob_id: crate::BlobId::from(hash).into(),
618 })
619 .collect();
620
621 let (iter, stream) =
622 fidl::endpoints::create_proxy_and_stream::<PackageIndexIteratorMarker>();
623 let task = Task::local(serve_fidl_iterator_from_slice(stream, package_entries));
624
625 let chunk = iter.next().await.unwrap();
626 assert_eq!(chunk.len(), expected_chunk_size);
627
628 let chunk = iter.next().await.unwrap();
629 assert_eq!(chunk.len(), expected_chunk_size);
630
631 let chunk = iter.next().await.unwrap();
632 assert_eq!(chunk.len(), 0);
633
634 let () = task.await.unwrap();
635 }
636
637 #[cfg(target_os = "fuchsia")]
639 use assert_matches::assert_matches;
640
    /// An empty `Vec` arriving on the item stream must not be turned into a response: the server
    /// keeps waiting until the stream delivers at least one real item.
    #[cfg(target_os = "fuchsia")]
    #[test]
    fn serve_fidl_iterator_from_stream_ignores_empty_vec() {
        let mut executor = fuchsia_async::TestExecutor::new();
        let (proxy, fidl_stream) =
            fidl::endpoints::create_proxy_and_stream::<BlobInfoIteratorMarker>();
        let (item_sender, item_stream) = futures::channel::mpsc::unbounded();
        let mut serve_task = serve_fidl_iterator_from_stream(fidl_stream, item_stream, 10).boxed();

        // Only an empty batch is available: neither the server nor the pending client request may
        // complete.
        let () = item_sender.unbounded_send(vec![]).unwrap();
        let mut chunk_fut = proxy.next();
        assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
        assert_matches!(executor.run_until_stalled(&mut chunk_fut), std::task::Poll::Pending);

        let blob_info = crate::BlobInfo { blob_id: [0; 32].into(), length: 0 };
        // Once a real item shows up, the outstanding request is answered with exactly that item.
        let () = item_sender.unbounded_send(vec![blob_info.into()]).unwrap();
        assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
        assert_matches!(
            executor.run_until_stalled(&mut chunk_fut),
            std::task::Poll::Ready(Ok(chunk))
                if chunk == vec![fidl_fuchsia_pkg::BlobInfo::from(blob_info)]
        );
    }
668
    /// While leftover items are buffered in the chunker, a request must be answered from that
    /// buffer without waiting for the item stream; the server only waits on the stream once the
    /// buffer is empty.
    #[cfg(target_os = "fuchsia")]
    #[test]
    fn serve_fidl_iterator_from_stream_does_not_block_if_chunker_not_empty() {
        let mut executor = fuchsia_async::TestExecutor::new();
        let (proxy, fidl_stream) =
            fidl::endpoints::create_proxy_and_stream::<BlobInfoIteratorMarker>();
        let (item_sender, item_stream) = futures::channel::mpsc::unbounded();
        let mut serve_task = serve_fidl_iterator_from_stream(fidl_stream, item_stream, 10).boxed();

        let blob_info = fidl_fuchsia_pkg::BlobInfo::from(crate::BlobInfo {
            blob_id: [0; 32].into(),
            length: 0,
        });
        // One payload more than fits in a single response, so one item is left over after the
        // first chunk.
        let max_payloads_per_fidl_response = (ZX_CHANNEL_MAX_MSG_BYTES as usize
            - FIDL_VEC_RESPONSE_OVERHEAD_BYTES)
            / measure_fuchsia_pkg::Measurable::measure(&blob_info).num_bytes;
        let payloads = vec![blob_info; max_payloads_per_fidl_response + 1];
        assert_eq!(
            how_many_items_fit_in_fidl_vec_response(payloads.iter()),
            max_payloads_per_fidl_response
        );

        // First request: a completely full chunk.
        let () = item_sender.unbounded_send(payloads).unwrap();
        let mut chunk_fut = proxy.next();
        assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
        assert_matches!(
            executor.run_until_stalled(&mut chunk_fut),
            std::task::Poll::Ready(Ok(chunk))
                if chunk.len() == max_payloads_per_fidl_response
        );

        // Second request: the stream has nothing new, yet the leftover item is delivered right
        // away instead of the server blocking on the stream.
        let mut chunk_fut = proxy.next();
        assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
        assert_matches!(
            executor.run_until_stalled(&mut chunk_fut),
            std::task::Poll::Ready(Ok(chunk))
                if chunk.len() == 1
        );

        // Third request: the buffer is empty now, so the server waits for the stream.
        let mut chunk_fut = proxy.next();
        assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
        assert_matches!(executor.run_until_stalled(&mut chunk_fut), std::task::Poll::Pending);

        // New items unblock the outstanding request.
        let () = item_sender.unbounded_send(vec![blob_info; 2]).unwrap();
        assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
        assert_matches!(
            executor.run_until_stalled(&mut chunk_fut),
            std::task::Poll::Ready(Ok(chunk))
                if chunk.len() == 2
        );
    }
726}