1#![deny(missing_docs)]
6
7use fidl::endpoints::{Proxy, Request, RequestStream, create_proxy_and_stream};
10use fuchsia_async::Task;
11use futures::{FutureExt, TryFutureExt, TryStreamExt};
12use log::error;
13
14pub mod fdomain {
16 use fdomain_client::Client;
17 use fdomain_client::fidl::{Proxy, Request, RequestStream};
18 use fuchsia_async::Task;
19 use futures::{FutureExt, TryFutureExt, TryStreamExt};
20 use log::error;
21 use std::sync::Arc;
22
23 pub fn spawn_local_stream_handler<P, F, Fut>(client: &Arc<Client>, f: F) -> P
26 where
27 P: Proxy,
28 F: FnMut(Request<P::Protocol>) -> Fut + 'static,
29 Fut: Future<Output = ()> + 'static,
30 {
31 let (proxy, stream) = client.create_proxy_and_stream::<P::Protocol>();
32 Task::local(for_each_or_log(Arc::clone(client), stream, f)).detach();
33 proxy
34 }
35
36 pub fn spawn_stream_handler<P, F, Fut>(client: &Arc<Client>, f: F) -> P
39 where
40 P: Proxy,
41 F: FnMut(Request<P::Protocol>) -> Fut + 'static + Send,
42 Fut: Future<Output = ()> + 'static + Send,
43 {
44 let (proxy, stream) = client.create_proxy_and_stream::<P::Protocol>();
45 Task::spawn(for_each_or_log(Arc::clone(client), stream, f)).detach();
46 proxy
47 }
48
49 fn for_each_or_log<St, F, Fut>(
50 client: Arc<Client>,
51 stream: St,
52 mut f: F,
53 ) -> impl Future<Output = ()>
54 where
55 St: RequestStream,
56 F: FnMut(St::Ok) -> Fut,
57 Fut: Future<Output = ()>,
58 {
59 async move {
60 let _client = client;
63
64 stream
65 .try_for_each(move |r| f(r).map(Ok))
66 .unwrap_or_else(|e| error!("FIDL stream handler failed: {}", e))
67 .await
68 }
69 }
70}
71
72pub fn spawn_local_stream_handler<P, F, Fut>(f: F) -> P
75where
76 P: Proxy,
77 F: FnMut(Request<P::Protocol>) -> Fut + 'static,
78 Fut: Future<Output = ()> + 'static,
79{
80 let (proxy, stream) = create_proxy_and_stream::<P::Protocol>();
81 Task::local(for_each_or_log(stream, f)).detach();
82 proxy
83}
84
85pub fn spawn_stream_handler<P, F, Fut>(f: F) -> P
88where
89 P: Proxy,
90 F: FnMut(Request<P::Protocol>) -> Fut + 'static + Send,
91 Fut: Future<Output = ()> + 'static + Send,
92{
93 let (proxy, stream) = create_proxy_and_stream::<P::Protocol>();
94 Task::spawn(for_each_or_log(stream, f)).detach();
95 proxy
96}
97
98fn for_each_or_log<St, F, Fut>(stream: St, mut f: F) -> impl Future<Output = ()>
99where
100 St: RequestStream,
101 F: FnMut(St::Ok) -> Fut,
102 Fut: Future<Output = ()>,
103{
104 stream
105 .try_for_each(move |r| f(r).map(Ok))
106 .unwrap_or_else(|e| error!("FIDL stream handler failed: {}", e))
107}
108
#[cfg(test)]
mod test {
    use super::*;
    use fdomain_test_placeholders::{EchoProxy as FEchoProxy, EchoRequest as FEchoRequest};
    use fidl_test_placeholders::{EchoProxy, EchoRequest};

    // Each test installs a handler that echoes the request string back and then checks
    // two consecutive round trips through the returned proxy.

    #[fuchsia::test]
    async fn test_spawn_local_stream_handler() {
        let echo = |request: EchoRequest| {
            let EchoRequest::EchoString { value, responder } = request;
            async move {
                responder.send(Some(&value.unwrap())).expect("responder failed");
            }
        };
        let proxy: EchoProxy = spawn_local_stream_handler(echo);
        for message in ["hello world", "goodbye world"] {
            let reply = proxy.echo_string(Some(message)).await.expect("echo failed");
            assert_eq!(reply, Some(message.to_string()));
        }
    }

    #[fuchsia::test(threads = 2)]
    async fn test_spawn_stream_handler() {
        let echo = |request: EchoRequest| {
            let EchoRequest::EchoString { value, responder } = request;
            async move {
                responder.send(Some(&value.unwrap())).expect("responder failed");
            }
        };
        let proxy: EchoProxy = spawn_stream_handler(echo);
        for message in ["hello world", "goodbye world"] {
            let reply = proxy.echo_string(Some(message)).await.expect("echo failed");
            assert_eq!(reply, Some(message.to_string()));
        }
    }

    #[fuchsia::test]
    async fn test_spawn_local_stream_handler_fdomain() {
        // Capturing an `Rc` makes the closure `!Send`, so this test only compiles if
        // the local variant accepts handlers that are not `Send`.
        let thread_unsafe = std::rc::Rc::new(());
        let client = fdomain_local::local_client_empty();
        let echo = move |request: FEchoRequest| {
            let _thread_unsafe = thread_unsafe.clone();
            let FEchoRequest::EchoString { value, responder } = request;
            async move {
                responder.send(Some(&value.unwrap())).expect("responder failed");
            }
        };
        let proxy: FEchoProxy = fdomain::spawn_local_stream_handler(&client, echo);
        for message in ["hello world", "goodbye world"] {
            let reply = proxy.echo_string(Some(message)).await.expect("echo failed");
            assert_eq!(reply, Some(message.to_string()));
        }
    }

    #[fuchsia::test(threads = 2)]
    async fn test_spawn_stream_handler_fdomain() {
        let client = fdomain_local::local_client_empty();
        let echo = |request: FEchoRequest| {
            let FEchoRequest::EchoString { value, responder } = request;
            async move {
                responder.send(Some(&value.unwrap())).expect("responder failed");
            }
        };
        let proxy: FEchoProxy = fdomain::spawn_stream_handler(&client, echo);
        for message in ["hello world", "goodbye world"] {
            let reply = proxy.echo_string(Some(message)).await.expect("echo failed");
            assert_eq!(reply, Some(message.to_string()));
        }
    }
}