fidl_fuchsia_pkg_ext/
fidl_iterator_to_stream.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_pkg as fpkg;
6use futures::future::TryFutureExt as _;
7use futures::stream::Stream;
8
9/// Converts a proxy to a FIDL iterator like:
10///
11/// protocol PayloadIterator {
12///    Next() -> (vector<Payload>:MAX payloads);
13/// };
14///
15/// into a `Stream` of `Result<Vec<Payload>, fidl::Error>`s.
16///
17/// The returned stream will never yield an empty `Vec`. When e.g. `PayloadIterator::Next` returns
18/// an empty Vec, the returned stream will yield `None` (signaling the end of the stream).
19///
20/// To use with a new protocol (e.g. `PayloadIterator`), implement `FidlIterator` for
21/// `PayloadIteratorProxy`.
22pub fn fidl_iterator_to_stream<T: FidlIterator>(
23    iterator: T,
24) -> impl Stream<Item = Result<Vec<T::Item>, fidl::Error>> + Unpin {
25    futures::stream::try_unfold(iterator, |iterator| {
26        iterator.next().map_ok(|v| if v.is_empty() { None } else { Some((v, iterator)) })
27    })
28}
29
30/// A FIDL proxy for a FIDL protocol following the iterator pattern.
31pub trait FidlIterator {
32    type Item: Unpin;
33
34    fn next(&self) -> fidl::client::QueryResponseFut<Vec<Self::Item>>;
35}
36
37impl FidlIterator for fpkg::BlobInfoIteratorProxy {
38    type Item = fpkg::BlobInfo;
39
40    fn next(&self) -> fidl::client::QueryResponseFut<Vec<Self::Item>> {
41        self.next()
42    }
43}
44
45#[cfg(test)]
46mod tests {
47    use super::*;
48    use assert_matches::assert_matches;
49    use fidl::endpoints::{ControlHandle as _, Responder as _};
50    use futures::future::join;
51    use futures::stream::{StreamExt as _, TryStreamExt as _};
52    use zx_status::Status;
53
54    struct MockIteratorServer {
55        reqs: fpkg::BlobInfoIteratorRequestStream,
56    }
57
58    impl MockIteratorServer {
59        fn new() -> (Self, impl Stream<Item = Result<Vec<fpkg::BlobInfo>, fidl::Error>>) {
60            let (proxy, reqs) =
61                fidl::endpoints::create_proxy_and_stream::<fpkg::BlobInfoIteratorMarker>();
62            (Self { reqs }, fidl_iterator_to_stream(proxy))
63        }
64
65        // On Some(resp) responds with resp, else closes channel with NO_RESOURCES.
66        async fn expect_next(&mut self, resp: Option<Vec<fpkg::BlobInfo>>) {
67            let fpkg::BlobInfoIteratorRequest::Next { responder } =
68                self.reqs.next().await.unwrap().unwrap();
69            match resp {
70                Some(resp) => responder.send(&resp).unwrap(),
71                None => responder.control_handle().shutdown_with_epitaph(Status::NO_RESOURCES),
72            }
73        }
74    }
75
76    fn blob_info(u: u8) -> fpkg::BlobInfo {
77        fpkg::BlobInfo { blob_id: fpkg::BlobId { merkle_root: [u; 32] }, length: 0 }
78    }
79
80    #[fuchsia_async::run_singlethreaded(test)]
81    async fn read_one_item() {
82        let (mut server, mut stream) = MockIteratorServer::new();
83
84        let ((), item) = join(server.expect_next(Some(vec![blob_info(1)])), stream.next()).await;
85
86        assert_matches!(item, Some(Ok(v)) if v == vec![blob_info(1)]);
87    }
88
89    #[fuchsia_async::run_singlethreaded(test)]
90    async fn read_two_items() {
91        let (mut server, mut stream) = MockIteratorServer::new();
92
93        let ((), (first, second)) = join(
94            async {
95                server.expect_next(Some(vec![blob_info(1)])).await;
96                server.expect_next(Some(vec![blob_info(2)])).await
97            },
98            async { (stream.next().await, stream.next().await) },
99        )
100        .await;
101
102        assert_matches!(first, Some(Ok(v)) if v == vec![blob_info(1)]);
103        assert_matches!(second, Some(Ok(v)) if v == vec![blob_info(2)]);
104    }
105
106    #[fuchsia_async::run_singlethreaded(test)]
107    async fn error_terminates() {
108        let (mut server, mut stream) = MockIteratorServer::new();
109
110        let ((), (first, second)) =
111            join(server.expect_next(None), async { (stream.next().await, stream.next().await) })
112                .await;
113
114        assert_matches!(
115            first,
116            Some(Err(fidl::Error::ClientChannelClosed{status, ..}))
117                if status == Status::NO_RESOURCES
118        );
119        assert_matches!(second, None);
120    }
121
122    #[fuchsia_async::run_singlethreaded(test)]
123    async fn empty_response_terminates() {
124        let (mut server, mut stream) = MockIteratorServer::new();
125
126        let ((), item) = join(server.expect_next(Some(vec![])), stream.next()).await;
127
128        assert_matches!(item, None);
129    }
130
131    #[fuchsia_async::run_singlethreaded(test)]
132    async fn read_one_item_then_terminate_successfully() {
133        let (mut server, stream) = MockIteratorServer::new();
134
135        let ((), items) = join(
136            async {
137                server.expect_next(Some(vec![blob_info(1)])).await;
138                server.expect_next(Some(vec![])).await
139            },
140            stream.map_err(|_| ()).try_concat(),
141        )
142        .await;
143
144        assert_eq!(items, Ok(vec![blob_info(1)]));
145    }
146
147    #[fuchsia_async::run_singlethreaded(test)]
148    async fn read_one_item_then_terminate_with_error() {
149        let (mut server, stream) = MockIteratorServer::new();
150
151        let ((), items) = join(
152            async {
153                server.expect_next(Some(vec![blob_info(1)])).await;
154                server.expect_next(None).await
155            },
156            stream.map_err(|_| ()).collect::<Vec<_>>(),
157        )
158        .await;
159
160        assert_eq!(items, vec![Ok(vec![blob_info(1)]), Err(())]);
161    }
162}