fidl_fuchsia_pkg_ext/
serve_fidl_iterator.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::Measurable;
6use anyhow::{Context as _, Result};
7use fidl_fuchsia_pkg::{
8    BlobIdIteratorNextResponder, BlobIdIteratorRequest, BlobIdIteratorRequestStream,
9    BlobInfoIteratorNextResponder, BlobInfoIteratorRequest, BlobInfoIteratorRequestStream,
10    PackageIndexEntry, PackageIndexIteratorNextResponder, PackageIndexIteratorRequest,
11    PackageIndexIteratorRequestStream,
12};
13use futures::prelude::*;
14use zx_types::ZX_CHANNEL_MAX_MSG_BYTES;
15
16/// Serves fidl iterators like:
17///
18/// protocol PayloadIterator {
19///    Next() -> (vector<Payload>:MAX payloads);
20/// };
21///
22/// from:
23///   `fidl_iterator`: effectively a stream of `PayloadIterator::Next` requests
24///   `items`: a slice of `Payload`s.
25///
26/// Fills each response to `Next()` with as many entries as will fit in a fidl message. The
27/// returned future completes after `Next()` yields an empty response or the iterator
28/// is interrupted (client closes the channel or the task encounters a FIDL layer error).
29///
30/// To use with a new protocol (e.g. `PayloadIterator`), in this crate:
31///   1. implement `FidlIteratorRequestStream` for `PayloadIteratorRequestStream`
32///   2. implement `FidlIteratorNextResponder` for `PayloadIteratorNextResponder`
33///   3. implement `Measurable` for `Payload` using functions generated by
34///      //tools/fidl/measure-tape
35pub async fn serve_fidl_iterator_from_slice<I>(
36    mut fidl_iterator: I,
37    mut items: impl AsMut<[<I::Responder as FidlIteratorNextResponder>::Item]>,
38) -> Result<()>
39where
40    I: FidlIteratorRequestStream,
41{
42    let mut items = SliceChunker::new(items.as_mut());
43
44    loop {
45        let chunk = items.next();
46
47        let responder =
48            match fidl_iterator.try_next().await.context("while waiting for next() request")? {
49                None => break,
50                Some(request) => I::request_to_responder(request),
51            };
52
53        let () = responder.send_chunk(&chunk).context("while responding")?;
54
55        // Yield a single empty chunk, then stop serving the protocol.
56        if chunk.is_empty() {
57            break;
58        }
59    }
60
61    Ok(())
62}
63
64/// Serves fidl iterators like:
65///
66/// protocol PayloadIterator {
67///    Next() -> (vector<Payload>:MAX payloads);
68/// };
69///
70/// from:
71///   `fidl_iterator`: effectively a stream of `PayloadIterator::Next` requests
72///   `stream`: a Stream<Vec<Payload>>
73///   `max_stream_chunks`: the maximum number of `Vec<Payload>`'s to pull from `stream` at a time.
74///     Making this number larger can pack more `Payload`s into the fidl response, decreasing
75///     overhead, but the buffer of `Vec<Payload>`s is pre-allocated, so if this number is e.g.
76///     `usize::MAX` the program will OOM. This number is the maximum, not the minimum, i.e
77///     `serve_fidl_iterator_from_stream` will not block on `stream` if there are available
78///     `Payload`s to send. Arguments of `0` will be converted to `1`.
79///
80///
81/// Fills each response to `Next()` with as many available entries as will fit in a fidl message.
82/// Only blocks on `stream` if there are no available entries.
83/// The returned future completes after `Next()` yields an empty response or the iterator
84/// is interrupted (client closes the channel or the task encounters a FIDL layer error).
85///
86/// To use with a new protocol (e.g. `PayloadIterator`), in this crate:
87///   1. implement `FidlIteratorRequestStream` for `PayloadIteratorRequestStream`
88///   2. implement `FidlIteratorNextResponder` for `PayloadIteratorNextResponder`
89///   3. implement `Measurable` for `Payload` using functions generated by
90///      //tools/fidl/measure-tape
91pub async fn serve_fidl_iterator_from_stream<I>(
92    mut fidl_iterator: I,
93    stream: impl futures::stream::Stream<Item = Vec<<I::Responder as FidlIteratorNextResponder>::Item>>
94        + Unpin,
95    max_stream_chunks: usize,
96) -> Result<()>
97where
98    I: FidlIteratorRequestStream,
99{
100    let mut chunked_stream = stream.ready_chunks(std::cmp::max(max_stream_chunks, 1));
101    let mut fidl_chunker = OwningChunker::new();
102
103    loop {
104        let responder =
105            match fidl_iterator.try_next().await.context("while waiting for next() request")? {
106                None => break,
107                Some(request) => I::request_to_responder(request),
108            };
109
110        // Get as many new items as possible, to minimize the number of FIDL messages, but don't
111        // block if we already have some.
112        if fidl_chunker.is_empty() {
113            loop {
114                if let Some(xss) = chunked_stream.next().await {
115                    fidl_chunker.extend(xss.into_iter().flatten());
116                    if fidl_chunker.is_empty() {
117                        continue;
118                    }
119                }
120                break;
121            }
122        } else {
123            if let Some(Some(xss)) = chunked_stream.next().now_or_never() {
124                fidl_chunker.extend(xss.into_iter().flatten());
125            }
126        }
127
128        let mut chunk = fidl_chunker.next();
129        let () = responder.send_chunk(chunk.make_contiguous()).context("while responding")?;
130        if chunk.is_empty() {
131            break;
132        }
133    }
134
135    Ok(())
136}
137
138/// A FIDL request stream for a FIDL protocol following the iterator pattern.
139pub trait FidlIteratorRequestStream:
140    fidl::endpoints::RequestStream + TryStream<Error = fidl::Error>
141{
142    type Responder: FidlIteratorNextResponder;
143
144    fn request_to_responder(request: <Self as TryStream>::Ok) -> Self::Responder;
145}
146
147/// A responder to a Next() request for a FIDL iterator.
148pub trait FidlIteratorNextResponder {
149    type Item: Measurable;
150
151    fn send_chunk(self, chunk: &[Self::Item]) -> Result<(), fidl::Error>;
152}
153
154impl FidlIteratorRequestStream for PackageIndexIteratorRequestStream {
155    type Responder = PackageIndexIteratorNextResponder;
156
157    fn request_to_responder(request: PackageIndexIteratorRequest) -> Self::Responder {
158        let PackageIndexIteratorRequest::Next { responder } = request;
159        responder
160    }
161}
162
163impl FidlIteratorNextResponder for PackageIndexIteratorNextResponder {
164    type Item = PackageIndexEntry;
165
166    fn send_chunk(self, chunk: &[Self::Item]) -> Result<(), fidl::Error> {
167        self.send(chunk)
168    }
169}
170
171impl FidlIteratorRequestStream for BlobInfoIteratorRequestStream {
172    type Responder = BlobInfoIteratorNextResponder;
173
174    fn request_to_responder(request: BlobInfoIteratorRequest) -> Self::Responder {
175        let BlobInfoIteratorRequest::Next { responder } = request;
176        responder
177    }
178}
179
180impl FidlIteratorRequestStream for BlobIdIteratorRequestStream {
181    type Responder = BlobIdIteratorNextResponder;
182
183    fn request_to_responder(request: BlobIdIteratorRequest) -> Self::Responder {
184        let BlobIdIteratorRequest::Next { responder } = request;
185        responder
186    }
187}
188
189impl FidlIteratorNextResponder for BlobInfoIteratorNextResponder {
190    type Item = fidl_fuchsia_pkg::BlobInfo;
191
192    fn send_chunk(self, chunk: &[Self::Item]) -> Result<(), fidl::Error> {
193        self.send(chunk)
194    }
195}
196
197impl FidlIteratorNextResponder for BlobIdIteratorNextResponder {
198    type Item = fidl_fuchsia_pkg::BlobId;
199
200    fn send_chunk(self, chunk: &[Self::Item]) -> Result<(), fidl::Error> {
201        self.send(chunk)
202    }
203}
204
205// FIXME(52297) This constant would ideally be exported by the `fidl` crate.
206// sizeof(TransactionHeader) + sizeof(VectorHeader)
207const FIDL_VEC_RESPONSE_OVERHEAD_BYTES: usize = 32;
208
209/// Assumes the fixed overhead of a single fidl response header and a single vec header per chunk.
210/// It must not be used with more complex responses.
211fn how_many_items_fit_in_fidl_vec_response<'a>(
212    items: impl Iterator<Item = &'a (impl Measurable + 'a)>,
213) -> usize {
214    let mut bytes_used: usize = FIDL_VEC_RESPONSE_OVERHEAD_BYTES;
215    let mut count = 0;
216
217    for item in items {
218        bytes_used += item.measure();
219        if bytes_used > ZX_CHANNEL_MAX_MSG_BYTES as usize {
220            break;
221        }
222        count += 1;
223    }
224    count
225}
226
227/// Helper to split a slice of items into chunks that will fit in a single FIDL vec response.
228///
229/// Note, SliceChunker assumes the fixed overhead of a single fidl response header and a single vec
230/// header per chunk.  It must not be used with more complex responses.
231struct SliceChunker<'a, I> {
232    items: &'a mut [I],
233}
234
235impl<'a, I> SliceChunker<'a, I>
236where
237    I: Measurable,
238{
239    fn new(items: &'a mut [I]) -> Self {
240        Self { items }
241    }
242
243    /// Produce the next chunk of items to respond with. Iteration stops when this method returns
244    /// an empty slice, which occurs when either:
245    /// * All items have been returned
246    /// * SliceChunker encounters an item so large that it cannot even be stored in a response
247    ///   dedicated to just that one item.
248    ///
249    /// Once next() returns an empty slice, it will continue to do so in future calls.
250    fn next(&mut self) -> &'a mut [I] {
251        let entry_count = how_many_items_fit_in_fidl_vec_response(self.items.iter());
252        // tmp/swap dance to appease the borrow checker.
253        let tmp = std::mem::replace(&mut self.items, &mut []);
254        let (chunk, rest) = tmp.split_at_mut(entry_count);
255        self.items = rest;
256        chunk
257    }
258}
259
260/// Helper to split a collection of items into chunks that will fit in a single FIDL vec response.
261///
262/// Note, OwningChunker assumes the fixed overhead of a single fidl response header and a single vec
263/// header per chunk.  It must not be used with more complex responses.
264struct OwningChunker<I> {
265    items: std::collections::VecDeque<I>,
266}
267
268impl<I> OwningChunker<I>
269where
270    I: Measurable,
271{
272    fn new() -> Self {
273        Self { items: std::collections::VecDeque::new() }
274    }
275
276    /// Produce the next chunk of items to respond with. Iteration stops when this method returns
277    /// an empty VecDeque, which occurs when either:
278    /// * All items have been returned (and no new items are added)
279    /// * OwningChunker encounters an item so large that it cannot even be stored in a response
280    ///   dedicated to just that one item.
281    fn next(&mut self) -> std::collections::VecDeque<I> {
282        let count = how_many_items_fit_in_fidl_vec_response(self.items.iter());
283        let mut other = self.items.split_off(count);
284        std::mem::swap(&mut self.items, &mut other);
285        other
286    }
287
288    fn is_empty(&self) -> bool {
289        self.items.is_empty()
290    }
291
292    fn extend(&mut self, iter: impl IntoIterator<Item = I>) {
293        self.items.extend(iter)
294    }
295}
296
297#[cfg(test)]
298mod tests {
299    use super::*;
300    use fidl_fuchsia_pkg::{BlobInfoIteratorMarker, PackageIndexIteratorMarker};
301    use fuchsia_async::Task;
302    use fuchsia_hash::HashRangeFull;
303    use fuchsia_pkg::PackagePath;
304    use proptest::prelude::*;
305
306    #[test]
307    fn zx_channel_max_msg_bytes_fits_in_usize() {
308        let _: usize = ZX_CHANNEL_MAX_MSG_BYTES.try_into().unwrap();
309    }
310
311    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
312    struct Byte(u8);
313
314    impl Measurable for Byte {
315        fn measure(&self) -> usize {
316            1
317        }
318    }
319
320    #[test]
321    fn slice_chunker_fuses() {
322        let items = &mut [Byte(42)];
323        let mut chunker = SliceChunker::new(items);
324
325        assert_eq!(chunker.next(), &mut [Byte(42)]);
326        assert_eq!(chunker.next(), &mut []);
327        assert_eq!(chunker.next(), &mut []);
328    }
329
330    #[test]
331    fn slice_chunker_chunks_at_expected_boundary() {
332        const BYTES_PER_CHUNK: usize =
333            ZX_CHANNEL_MAX_MSG_BYTES as usize - FIDL_VEC_RESPONSE_OVERHEAD_BYTES;
334
335        // Expect to fill 2 full chunks with 1 item left over.
336        let mut items =
337            (0..=(BYTES_PER_CHUNK as u64 * 2)).map(|n| Byte(n as u8)).collect::<Vec<Byte>>();
338        let expected = items.clone();
339        let mut chunker = SliceChunker::new(&mut items);
340
341        let mut actual: Vec<Byte> = vec![];
342
343        for _ in 0..2 {
344            let chunk = chunker.next();
345            assert_eq!(chunk.len(), BYTES_PER_CHUNK);
346
347            actual.extend(&*chunk);
348        }
349
350        let chunk = chunker.next();
351        assert_eq!(chunk.len(), 1);
352        actual.extend(&*chunk);
353
354        assert_eq!(actual, expected);
355    }
356
357    #[test]
358    fn slice_chunker_terminates_at_too_large_item() {
359        #[derive(Debug, PartialEq, Eq)]
360        struct TooBig;
361        impl Measurable for TooBig {
362            fn measure(&self) -> usize {
363                ZX_CHANNEL_MAX_MSG_BYTES as usize
364            }
365        }
366
367        let items = &mut [TooBig];
368        let mut chunker = SliceChunker::new(items);
369        assert_eq!(chunker.next(), &mut []);
370    }
371
372    #[test]
373    fn owning_chunker_fuses() {
374        let items = [Byte(42)];
375        let mut chunker = OwningChunker::new();
376        chunker.extend(items);
377
378        assert_eq!(chunker.next().make_contiguous(), &[Byte(42)]);
379        assert_eq!(chunker.next().make_contiguous(), &[]);
380        assert_eq!(chunker.next().make_contiguous(), &[]);
381    }
382
383    #[test]
384    fn owning_chunker_chunks_at_expected_boundary() {
385        const BYTES_PER_CHUNK: usize =
386            ZX_CHANNEL_MAX_MSG_BYTES as usize - FIDL_VEC_RESPONSE_OVERHEAD_BYTES;
387
388        // Expect to fill 2 full chunks with 1 item left over.
389        let items =
390            (0..=(BYTES_PER_CHUNK as u64 * 2)).map(|n| Byte(n as u8)).collect::<Vec<Byte>>();
391        let expected = items.clone();
392        let mut chunker = OwningChunker::new();
393        chunker.extend(items.into_iter());
394
395        let mut actual: Vec<Byte> = vec![];
396
397        for _ in 0..2 {
398            let chunk = chunker.next();
399            assert_eq!(chunk.len(), BYTES_PER_CHUNK);
400
401            actual.extend(chunk);
402        }
403
404        let chunk = chunker.next();
405        assert_eq!(chunk.len(), 1);
406        actual.extend(chunk);
407
408        assert_eq!(actual, expected);
409    }
410
411    #[test]
412    fn owning_chunker_terminates_at_too_large_item() {
413        #[derive(Debug, PartialEq, Eq)]
414        struct TooBig;
415        impl Measurable for TooBig {
416            fn measure(&self) -> usize {
417                ZX_CHANNEL_MAX_MSG_BYTES as usize
418            }
419        }
420
421        let items = [TooBig];
422        let mut chunker = OwningChunker::new();
423        chunker.extend(items);
424        assert_eq!(chunker.next().make_contiguous(), &mut []);
425    }
426
427    #[test]
428    fn owning_chunker_extend_after_next() {
429        let mut chunker = OwningChunker::new();
430        chunker.extend([Byte(0)]);
431        chunker.extend([Byte(1)]);
432
433        assert_eq!(chunker.next().make_contiguous(), &[Byte(0), Byte(1)]);
434        assert_eq!(chunker.next().make_contiguous(), &[]);
435
436        chunker.extend([Byte(2)]);
437
438        assert_eq!(chunker.next().make_contiguous(), &[Byte(2)]);
439    }
440
441    #[test]
442    fn verify_fidl_vec_response_overhead() {
443        let vec_response_overhead = {
444            use fidl::encoding::{
445                DefaultFuchsiaResourceDialect, DynamicFlags, TransactionHeader, TransactionMessage,
446                TransactionMessageType, UnboundedVector,
447            };
448
449            type Msg = TransactionMessageType<UnboundedVector<u8>>;
450            let msg = TransactionMessage {
451                header: TransactionHeader::new(0, 0, DynamicFlags::empty()),
452                body: &[] as &[u8],
453            };
454            fidl::encoding::with_tls_encoded::<Msg, DefaultFuchsiaResourceDialect, _>(
455                msg,
456                |bytes, _handles| Ok(bytes.len()),
457            )
458            .unwrap()
459        };
460        assert_eq!(vec_response_overhead, FIDL_VEC_RESPONSE_OVERHEAD_BYTES);
461    }
462
463    proptest! {
464        #![proptest_config(ProptestConfig{
465            // Disable persistence to avoid the warning for not running in the
466            // source code directory (since we're running on a Fuchsia target)
467            failure_persistence: None,
468            .. ProptestConfig::default()
469        })]
470
471        #[test]
472        fn serve_fidl_iterator_from_slice_yields_expected_entries(items: Vec<crate::BlobInfo>) {
473            let mut executor = fuchsia_async::TestExecutor::new();
474            executor.run_singlethreaded(async move {
475                let (proxy, stream) =
476                    fidl::endpoints::create_proxy_and_stream::<BlobInfoIteratorMarker>();
477                let mut actual_items = vec![];
478
479                let ((), ()) = futures::future::join(
480                    async {
481                        let items = items
482                            .iter()
483                            .cloned()
484                            .map(fidl_fuchsia_pkg::BlobInfo::from)
485                            .collect::<Vec<_>>();
486                        serve_fidl_iterator_from_slice(stream, items).await.unwrap()
487                    },
488                    async {
489                        loop {
490                            let chunk = proxy.next().await.unwrap();
491                            if chunk.is_empty() {
492                                break;
493                            }
494                            let chunk = chunk.into_iter().map(crate::BlobInfo::from);
495                            actual_items.extend(chunk);
496                        }
497                    },
498                )
499                .await;
500
501                assert_eq!(items, actual_items);
502            })
503        }
504
505        #[test]
506        fn serve_fidl_iterator_from_stream_yields_expected_entries(
507            items: Vec<crate::BlobInfo>,
508            repetition in 0..4usize,
509            max_chunking in 0..4usize,
510        ) {
511            let mut executor = fuchsia_async::TestExecutor::new();
512            executor.run_singlethreaded(async move {
513                let (proxy, fidl_stream) =
514                    fidl::endpoints::create_proxy_and_stream::<BlobInfoIteratorMarker>();
515                let (mut item_sender, item_stream) = futures::channel::mpsc::unbounded();
516                let mut actual_items = vec![];
517
518                let ((), (), ()) = futures::future::join3(
519                    async {
520                        for _ in 0..repetition {
521                            let () = item_sender.send(items
522                                .iter()
523                                .cloned()
524                                .map(fidl_fuchsia_pkg::BlobInfo::from)
525                                .collect::<Vec<_>>()).await.unwrap();
526                        }
527                        drop(item_sender);
528                    },
529                    async {
530                        let () = serve_fidl_iterator_from_stream(
531                            fidl_stream,
532                            item_stream,
533                            max_chunking
534                        )
535                        .await
536                        .unwrap();
537                    },
538                    async {
539                        loop {
540                            let chunk = proxy.next().await.unwrap();
541                            if chunk.is_empty() {
542                                break;
543                            }
544                            let chunk = chunk.into_iter().map(crate::BlobInfo::from);
545                            actual_items.extend(chunk);
546                        }
547                    },
548                )
549                .await;
550
551                let expected_items = {
552                    let mut expected_items = vec![];
553                    for _ in 0..repetition {
554                        expected_items.extend(items.iter().cloned())
555                    }
556                    expected_items
557                };
558                assert_eq!(expected_items, actual_items);
559            })
560        }
561    }
562
563    const PACKAGE_INDEX_CHUNK_SIZE_MAX: usize = 818;
564
565    // FIDL message is at most 65,536 bytes because of zx_channel_write [1].
566    // `PackageIndexIterator.Next()` return value size, encoded [2], is:
567    // 16 bytes FIDL transaction header +
568    // 16 bytes vector header +
569    // N * (16 bytes string header (from url field of struct PackageUrl) +
570    // L bytes string content +
571    // 32 bytes array.
572    // This totals in 32 + N * (48 + L), where L is 8-byte aligned
573    // because secondary objects (e.g. string contents) are 8-byte aligned.
574    //
575    // The shortest possible package url is 29 bytes "fuchsia-pkg://fuchsia.com/a/0".
576    //
577    // And the longest is 283 bytes, which is 288 bytes with 8-byte alignment, so
578    // PACKAGE_INDEX_CHUNK_SIZE_MIN => 65536 <= 32 + N * (48 + 288) => N = 194
579    //
580    // [1] https://fuchsia.dev/fuchsia-src/reference/syscalls/channel_write
581    // [2] https://fuchsia.dev/fuchsia-src/reference/fidl/language/wire-format
582    const PACKAGE_INDEX_CHUNK_SIZE_MIN: usize = 194;
583
584    #[fuchsia_async::run_singlethreaded(test)]
585    async fn package_index_iterator_paginates_shortest_entries() {
586        let names = ('a'..='z').cycle().map(|c| c.to_string());
587        let paths = names.map(|name| {
588            PackagePath::from_name_and_variant(name.parse().unwrap(), "0".parse().unwrap())
589        });
590
591        verify_package_index_iterator_pagination(paths, PACKAGE_INDEX_CHUNK_SIZE_MAX).await;
592    }
593
594    #[fuchsia_async::run_singlethreaded(test)]
595    async fn package_index_iterator_paginates_longest_entries() {
596        let names = ('a'..='z')
597            .map(|c| std::iter::repeat(c).take(PackagePath::MAX_NAME_BYTES).collect::<String>())
598            .cycle();
599        let paths = names.map(|name| {
600            PackagePath::from_name_and_variant(name.parse().unwrap(), "0".parse().unwrap())
601        });
602
603        verify_package_index_iterator_pagination(paths, PACKAGE_INDEX_CHUNK_SIZE_MIN).await;
604    }
605
606    async fn verify_package_index_iterator_pagination(
607        paths: impl Iterator<Item = PackagePath>,
608        expected_chunk_size: usize,
609    ) {
610        let package_entries: Vec<fidl_fuchsia_pkg::PackageIndexEntry> = paths
611            .zip(HashRangeFull::default())
612            .take(expected_chunk_size * 2)
613            .map(|(path, hash)| fidl_fuchsia_pkg::PackageIndexEntry {
614                package_url: fidl_fuchsia_pkg::PackageUrl {
615                    url: format!("fuchsia-pkg://fuchsia.com/{}", path),
616                },
617                meta_far_blob_id: crate::BlobId::from(hash).into(),
618            })
619            .collect();
620
621        let (iter, stream) =
622            fidl::endpoints::create_proxy_and_stream::<PackageIndexIteratorMarker>();
623        let task = Task::local(serve_fidl_iterator_from_slice(stream, package_entries));
624
625        let chunk = iter.next().await.unwrap();
626        assert_eq!(chunk.len(), expected_chunk_size);
627
628        let chunk = iter.next().await.unwrap();
629        assert_eq!(chunk.len(), expected_chunk_size);
630
631        let chunk = iter.next().await.unwrap();
632        assert_eq!(chunk.len(), 0);
633
634        let () = task.await.unwrap();
635    }
636
637    // TestExecutor.run_until_stalled is not available on host
638    #[cfg(target_os = "fuchsia")]
639    use assert_matches::assert_matches;
640
641    // TestExecutor.run_until_stalled is not available on host
642    #[cfg(target_os = "fuchsia")]
643    #[test]
644    fn serve_fidl_iterator_from_stream_ignores_empty_vec() {
645        let mut executor = fuchsia_async::TestExecutor::new();
646        let (proxy, fidl_stream) =
647            fidl::endpoints::create_proxy_and_stream::<BlobInfoIteratorMarker>();
648        let (item_sender, item_stream) = futures::channel::mpsc::unbounded();
649        let mut serve_task = serve_fidl_iterator_from_stream(fidl_stream, item_stream, 10).boxed();
650
651        // serve_fidl_iterator_from_stream should ignore the empty vec of Payloads, so
652        // chunk_fut should not complete.
653        let () = item_sender.unbounded_send(vec![]).unwrap();
654        let mut chunk_fut = proxy.next();
655        assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
656        assert_matches!(executor.run_until_stalled(&mut chunk_fut), std::task::Poll::Pending);
657
658        // chunk_fut should complete once serve_fidl_iterator_from_stream is given a Payload
659        let blob_info = crate::BlobInfo { blob_id: [0; 32].into(), length: 0 };
660        let () = item_sender.unbounded_send(vec![blob_info.into()]).unwrap();
661        assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
662        assert_matches!(
663            executor.run_until_stalled(&mut chunk_fut),
664            std::task::Poll::Ready(Ok(chunk))
665                if chunk == vec![fidl_fuchsia_pkg::BlobInfo::from(blob_info)]
666        );
667    }
668
669    // TestExecutor.run_until_stalled is not available on host
670    #[cfg(target_os = "fuchsia")]
671    #[test]
672    fn serve_fidl_iterator_from_stream_does_not_block_if_chunker_not_empty() {
673        let mut executor = fuchsia_async::TestExecutor::new();
674        let (proxy, fidl_stream) =
675            fidl::endpoints::create_proxy_and_stream::<BlobInfoIteratorMarker>();
676        let (item_sender, item_stream) = futures::channel::mpsc::unbounded();
677        let mut serve_task = serve_fidl_iterator_from_stream(fidl_stream, item_stream, 10).boxed();
678
679        let blob_info = fidl_fuchsia_pkg::BlobInfo::from(crate::BlobInfo {
680            blob_id: [0; 32].into(),
681            length: 0,
682        });
683        let max_payloads_per_fidl_response = (ZX_CHANNEL_MAX_MSG_BYTES as usize
684            - FIDL_VEC_RESPONSE_OVERHEAD_BYTES)
685            / measure_fuchsia_pkg::Measurable::measure(&blob_info).num_bytes;
686        let payloads = vec![blob_info; max_payloads_per_fidl_response + 1];
687        assert_eq!(
688            how_many_items_fit_in_fidl_vec_response(payloads.iter()),
689            max_payloads_per_fidl_response
690        );
691
692        // Send all the payloads, the first FIDL response should contain as many as will fit.
693        let () = item_sender.unbounded_send(payloads).unwrap();
694        let mut chunk_fut = proxy.next();
695        assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
696        assert_matches!(
697            executor.run_until_stalled(&mut chunk_fut),
698            std::task::Poll::Ready(Ok(chunk))
699                if chunk.len() == max_payloads_per_fidl_response
700        );
701
702        // There should be one payload left in the OwningChunker, so we should be able to obtain
703        // another FIDL response without sending more payloads.
704        let mut chunk_fut = proxy.next();
705        assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
706        assert_matches!(
707            executor.run_until_stalled(&mut chunk_fut),
708            std::task::Poll::Ready(Ok(chunk))
709                if chunk.len() == 1
710        );
711
712        // There should be no payloads left, so the next Next request should block.
713        let mut chunk_fut = proxy.next();
714        assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
715        assert_matches!(executor.run_until_stalled(&mut chunk_fut), std::task::Poll::Pending);
716
717        // The serving task should start providing payloads again when more are provided.
718        let () = item_sender.unbounded_send(vec![blob_info; 2]).unwrap();
719        assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
720        assert_matches!(
721            executor.run_until_stalled(&mut chunk_fut),
722            std::task::Poll::Ready(Ok(chunk))
723                if chunk.len() == 2
724        );
725    }
726}