fidl_fuchsia_pkg_ext/
fidl_iterator_to_stream.rs
1use fidl_fuchsia_pkg as fpkg;
6use futures::future::TryFutureExt as _;
7use futures::stream::Stream;
8
9pub fn fidl_iterator_to_stream<T: FidlIterator>(
23 iterator: T,
24) -> impl Stream<Item = Result<Vec<T::Item>, fidl::Error>> + Unpin {
25 futures::stream::try_unfold(iterator, |iterator| {
26 iterator.next().map_ok(|v| if v.is_empty() { None } else { Some((v, iterator)) })
27 })
28}
29
30pub trait FidlIterator {
32 type Item: Unpin;
33
34 fn next(&self) -> fidl::client::QueryResponseFut<Vec<Self::Item>>;
35}
36
37impl FidlIterator for fpkg::BlobInfoIteratorProxy {
38 type Item = fpkg::BlobInfo;
39
40 fn next(&self) -> fidl::client::QueryResponseFut<Vec<Self::Item>> {
41 self.next()
42 }
43}
44
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use fidl::endpoints::{ControlHandle as _, Responder as _};
    use futures::future::join;
    use futures::stream::{StreamExt as _, TryStreamExt as _};
    use zx_status::Status;

    /// Server end of a `BlobInfoIterator` connection, driven step by step from each test.
    struct MockIteratorServer {
        reqs: fpkg::BlobInfoIteratorRequestStream,
    }

    impl MockIteratorServer {
        /// Creates a connected pair: the mock server and the client-side stream under test.
        fn new() -> (Self, impl Stream<Item = Result<Vec<fpkg::BlobInfo>, fidl::Error>>) {
            let (proxy, reqs) =
                fidl::endpoints::create_proxy_and_stream::<fpkg::BlobInfoIteratorMarker>();
            let server = Self { reqs };
            (server, fidl_iterator_to_stream(proxy))
        }

        /// Waits for the next `Next` request and answers it.
        ///
        /// With `Some(items)` the request is answered with `items`; with `None`
        /// the channel is shut down with a `NO_RESOURCES` epitaph instead.
        async fn expect_next(&mut self, resp: Option<Vec<fpkg::BlobInfo>>) {
            let request = self.reqs.next().await.unwrap().unwrap();
            let fpkg::BlobInfoIteratorRequest::Next { responder } = request;
            if let Some(items) = resp {
                responder.send(&items).unwrap();
            } else {
                responder.control_handle().shutdown_with_epitaph(Status::NO_RESOURCES);
            }
        }
    }

    /// Builds a zero-length `BlobInfo` whose merkle root is 32 copies of `u`.
    fn blob_info(u: u8) -> fpkg::BlobInfo {
        let blob_id = fpkg::BlobId { merkle_root: [u; 32] };
        fpkg::BlobInfo { blob_id, length: 0 }
    }

    /// A single non-empty response is yielded as one `Ok` batch.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn read_one_item() {
        let (mut server, mut stream) = MockIteratorServer::new();

        let serve = server.expect_next(Some(vec![blob_info(1)]));
        let ((), item) = join(serve, stream.next()).await;

        let expected = vec![blob_info(1)];
        assert_matches!(item, Some(Ok(batch)) if batch == expected);
    }

    /// Two consecutive responses are yielded as two batches, in order.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn read_two_items() {
        let (mut server, mut stream) = MockIteratorServer::new();

        let serve = async {
            server.expect_next(Some(vec![blob_info(1)])).await;
            server.expect_next(Some(vec![blob_info(2)])).await
        };
        let consume = async {
            let first = stream.next().await;
            let second = stream.next().await;
            (first, second)
        };
        let ((), (first, second)) = join(serve, consume).await;

        let expected_first = vec![blob_info(1)];
        let expected_second = vec![blob_info(2)];
        assert_matches!(first, Some(Ok(batch)) if batch == expected_first);
        assert_matches!(second, Some(Ok(batch)) if batch == expected_second);
    }

    /// A channel closed with an epitaph surfaces as one error, then the stream ends.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn error_terminates() {
        let (mut server, mut stream) = MockIteratorServer::new();

        let serve = server.expect_next(None);
        let consume = async {
            let first = stream.next().await;
            let second = stream.next().await;
            (first, second)
        };
        let ((), (first, second)) = join(serve, consume).await;

        assert_matches!(
            first,
            Some(Err(fidl::Error::ClientChannelClosed { status, .. }))
                if status == Status::NO_RESOURCES
        );
        assert_matches!(second, None);
    }

    /// An empty response ends the stream without yielding anything.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn empty_response_terminates() {
        let (mut server, mut stream) = MockIteratorServer::new();

        let serve = server.expect_next(Some(vec![]));
        let ((), item) = join(serve, stream.next()).await;

        assert_matches!(item, None);
    }

    /// One batch followed by an empty response concatenates to just that batch.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn read_one_item_then_terminate_successfully() {
        let (mut server, stream) = MockIteratorServer::new();

        let serve = async {
            server.expect_next(Some(vec![blob_info(1)])).await;
            server.expect_next(Some(vec![])).await
        };
        let consume = stream.map_err(|_| ()).try_concat();
        let ((), items) = join(serve, consume).await;

        assert_eq!(items, Ok(vec![blob_info(1)]));
    }

    /// One batch followed by a channel error yields the batch, then the error, then ends.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn read_one_item_then_terminate_with_error() {
        let (mut server, stream) = MockIteratorServer::new();

        let serve = async {
            server.expect_next(Some(vec![blob_info(1)])).await;
            server.expect_next(None).await
        };
        let consume = stream.map_err(|_| ()).collect::<Vec<_>>();
        let ((), items) = join(serve, consume).await;

        assert_eq!(items, vec![Ok(vec![blob_info(1)]), Err(())]);
    }
}