1use crate::{Event, EventSource};
6#[cfg(target_os = "fuchsia")]
7use fidl_fuchsia_net_http as fnet_http;
8#[cfg(target_os = "fuchsia")]
9use fuchsia_async as fasync;
10use fuchsia_hyper::HttpsClient;
11#[cfg(target_os = "fuchsia")]
12use futures::io::AsyncReadExt as _;
13use futures::stream::{Stream, StreamExt as _, TryStreamExt as _};
14use futures::task::{Context, Poll};
15use hyper::{Body, Request, StatusCode};
16use std::pin::Pin;
17use thiserror::Error;
18
19#[derive(derivative::Derivative)]
21#[derivative(Debug)]
22pub struct Client {
23 #[derivative(Debug = "ignore")]
24 chunks: futures::stream::BoxStream<'static, Result<hyper::body::Bytes, anyhow::Error>>,
25 source: EventSource,
26 events: std::vec::IntoIter<Event>,
27}
28
29impl Client {
30 pub async fn from_hyper_client(
32 https_client: &HttpsClient,
33 url: impl AsRef<str>,
34 ) -> Result<Self, FromHyperClientError> {
35 let request = Request::get(url.as_ref())
36 .header("accept", "text/event-stream")
37 .body(Body::empty())
38 .map_err(|e| FromHyperClientError::CreateRequest(e))?;
39 let response = https_client
40 .request(request)
41 .await
42 .map_err(|e| FromHyperClientError::MakeRequest(e))?;
43 if response.status() != StatusCode::OK {
44 return Err(FromHyperClientError::HttpStatus(response.status()));
45 }
46 Ok(Self {
47 chunks: response.into_body().map_err(|e| anyhow::anyhow!(e)).boxed(),
48 source: EventSource::new(),
49 events: vec![].into_iter(),
50 })
51 }
52
53 #[cfg(target_os = "fuchsia")]
59 pub async fn from_http_loader(
60 loader: &fnet_http::LoaderProxy,
61 url: String,
62 stream_buf_size: usize,
63 ) -> Result<Self, FromHttpLoaderError> {
64 let resp = loader
65 .fetch(fnet_http::Request {
66 method: None,
67 url: Some(url),
68 headers: Some(vec![fnet_http::Header {
69 name: b"accept".to_vec(),
70 value: b"text/event-stream".to_vec(),
71 }]),
72 body: None,
73 deadline: None,
74 ..Default::default()
75 })
76 .await
77 .map_err(FromHttpLoaderError::FetchFidl)?;
78 let socket = match resp {
79 fnet_http::Response {
80 error: None, body: Some(body), status_code: Some(200), ..
81 } => body,
82 fnet_http::Response { error, status_code, status_line, .. } => {
83 return Err(FromHttpLoaderError::FetchError { error, status_code, status_line });
84 }
85 };
86 Ok(Self {
87 chunks: stream_from_socket(fasync::Socket::from_socket(socket), stream_buf_size),
88 source: EventSource::new(),
89 events: vec![].into_iter(),
90 })
91 }
92}
93
/// Turns `socket` into a stream of byte chunks.
///
/// Every item is the data returned by one read into a buffer of `stream_buf_size` bytes, so
/// no chunk is longer than `stream_buf_size`. A read that returns zero bytes ends the stream;
/// a read error is yielded as an `anyhow::Error` with the context "reading from socket".
///
/// NOTE(review): with `stream_buf_size == 0` every read is into an empty buffer, which
/// presumably reports zero bytes and therefore ends the stream immediately -- confirm.
#[cfg(target_os = "fuchsia")]
fn stream_from_socket(
    socket: fasync::Socket,
    stream_buf_size: usize,
) -> futures::stream::BoxStream<'static, Result<hyper::body::Bytes, anyhow::Error>> {
    // State threaded through `try_unfold`: the socket and the reusable read buffer.
    type State = (fasync::Socket, Vec<u8>);

    // Performs a single read and returns the bytes read together with the state for the next
    // read, or `None` once a read returns zero bytes.
    async fn read_chunk(
        (mut socket, mut buf): State,
    ) -> Result<Option<(hyper::body::Bytes, State)>, anyhow::Error> {
        let len = socket
            .read(&mut buf)
            .await
            .map_err(|e| anyhow::anyhow!(e).context("reading from socket"))?;
        if len == 0 {
            return Ok(None);
        }
        // Copy out only the bytes that were read so the buffer can be reused.
        let chunk = hyper::body::Bytes::copy_from_slice(&buf[..len]);
        Ok(Some((chunk, (socket, buf))))
    }

    futures::stream::try_unfold((socket, vec![0; stream_buf_size]), read_chunk).boxed()
}
112
113#[derive(Debug, Error)]
114pub enum FromHyperClientError {
115 #[error("error creating http request")]
116 CreateRequest(#[source] hyper::http::Error),
117
118 #[error("error making http request")]
119 MakeRequest(#[source] hyper::Error),
120
121 #[error("http server responded with status other than OK: {0}")]
122 HttpStatus(hyper::StatusCode),
123}
124
/// Errors returned by `Client::from_http_loader`.
#[cfg(target_os = "fuchsia")]
#[derive(Error, Debug)]
pub enum FromHttpLoaderError {
    /// The `fuchsia.net.http/Loader.Fetch` FIDL call itself failed.
    #[error("fuchsia.net.http/Loader.Fetch fidl error")]
    FetchFidl(#[source] fidl::Error),

    /// The fetch completed but the response was not an error-free 200 with a body.
    ///
    /// The fields are copied unchanged from the `fuchsia.net.http` response.
    #[error(
        "fuchsia.net.http/Loader.Fetch error: status {status_code:?} {error:?} {status_line:?}"
    )]
    FetchError {
        /// Error reported in the response, if any.
        error: Option<fnet_http::Error>,
        /// Status code of the response, if any.
        status_code: Option<u32>,
        /// Raw status line of the response, if any.
        status_line: Option<Vec<u8>>,
    },
}
140
141impl Stream for Client {
142 type Item = Result<Event, ClientPollError>;
143
144 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
145 loop {
146 if let Some(event) = self.events.next() {
147 return Poll::Ready(Some(Ok(event)));
148 }
149 match Pin::new(&mut self.chunks).poll_next(cx) {
150 Poll::Ready(Some(Ok(chunk))) => {
151 self.events = self.source.parse(&chunk).into_iter();
152 }
153 Poll::Ready(Some(Err(e))) => {
154 return Poll::Ready(Some(Err(ClientPollError::NextChunk(e))));
155 }
156 Poll::Ready(None) => {
157 return Poll::Ready(None);
158 }
159 Poll::Pending => {
160 return Poll::Pending;
161 }
162 }
163 }
164 }
165}
166
167#[derive(Debug, Error)]
168pub enum ClientPollError {
169 #[error("error downloading next chunk")]
170 NextChunk(#[source] anyhow::Error),
171}
172
173#[cfg(test)]
174mod tests {
175 use super::*;
176 use assert_matches::assert_matches;
177 use fuchsia_async as fasync;
178 use fuchsia_async::net::TcpListener;
179 use fuchsia_hyper::new_https_client;
180 use futures::future::{Future, TryFutureExt as _};
181 use hyper::Response;
182 use hyper::server::Server;
183 use hyper::server::accept::from_stream;
184 use hyper::service::{make_service_fn, service_fn};
185 use std::convert::Infallible;
186 use std::net::{Ipv4Addr, SocketAddr};
187 use test_case::test_case;
188
189 fn spawn_server<F>(handle_req: fn(Request<Body>) -> F) -> String
190 where
191 F: Future<Output = Result<Response<Body>, hyper::Error>> + Send + 'static,
192 {
193 let (connections, url) = {
194 let listener =
195 TcpListener::bind(&SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0)).unwrap();
196 let local_addr = listener.local_addr().unwrap();
197 (
198 listener
199 .accept_stream()
200 .map_ok(|(conn, _addr)| fuchsia_hyper::TcpStream { stream: conn }),
201 format!("http://{}", local_addr),
202 )
203 };
204 let server = Server::builder(from_stream(connections))
205 .executor(fuchsia_hyper::Executor)
206 .serve(make_service_fn(move |_socket: &fuchsia_hyper::TcpStream| async move {
207 Ok::<_, Infallible>(service_fn(handle_req))
208 }))
209 .unwrap_or_else(|e| panic!("mock sse server failed: {:?}", e));
210 fasync::Task::spawn(server).detach();
211 url
212 }
213
214 fn make_event() -> Event {
215 Event::from_type_and_data("event_type", "data_contents").unwrap()
216 }
217
218 async fn make_client(byte_source: &str, url: String) -> Client {
219 match byte_source {
220 "hyper" => Client::from_hyper_client(&new_https_client(), url).await.unwrap(),
221 #[cfg(target_os = "fuchsia")]
222 "loader" => Client::from_http_loader(
223 &fuchsia_component::client::connect_to_protocol::<fnet_http::LoaderMarker>()
224 .unwrap(),
225 url,
226 50,
227 )
228 .await
229 .unwrap(),
230 s => panic!("unexpected byte_soure {s}"),
231 }
232 }
233
234 #[test_case("hyper")]
235 #[cfg(target_os = "fuchsia")]
236 #[test_case("loader")]
237 #[fasync::run_singlethreaded(test)]
238 async fn receive_one_event(byte_source: &str) {
239 async fn handle_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
240 Ok(Response::builder()
241 .status(StatusCode::OK)
242 .header("content-type", "text/event-stream")
243 .body(make_event().to_vec().into())
244 .unwrap())
245 }
246 let url = spawn_server(handle_req);
247
248 let client = make_client(byte_source, url).await;
249 let events: Result<Vec<_>, _> = client.collect::<Vec<_>>().await.into_iter().collect();
250
251 assert_eq!(events.unwrap(), vec![make_event()]);
252 }
253
254 #[test_case("hyper")]
255 #[cfg(target_os = "fuchsia")]
256 #[test_case("loader")]
257 #[fasync::run_singlethreaded(test)]
258 async fn client_sends_correct_http_headers(byte_source: &str) {
259 async fn handle_req(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
260 assert_eq!(req.method(), &hyper::Method::GET);
261 assert_eq!(
262 req.headers().get("accept").map(|h| h.as_bytes()),
263 Some(&b"text/event-stream"[..])
264 );
265 Ok(Response::builder()
266 .status(StatusCode::OK)
267 .header("content-type", "text/event-stream")
268 .body(make_event().to_vec().into())
269 .unwrap())
270 }
271 let url = spawn_server(handle_req);
272
273 let client = make_client(byte_source, url).await;
274 client.collect::<Vec<_>>().await;
275 }
276
277 #[fasync::run_singlethreaded(test)]
278 async fn error_create_request() {
279 assert_matches!(
280 Client::from_hyper_client(&new_https_client(), "\n").await,
281 Err(FromHyperClientError::CreateRequest(_))
282 );
283 }
284
285 #[fasync::run_singlethreaded(test)]
286 async fn error_make_request() {
287 assert_matches!(
288 Client::from_hyper_client(&new_https_client(), "bad_url2").await,
289 Err(FromHyperClientError::MakeRequest(_))
290 );
291 }
292
293 #[fasync::run_singlethreaded(test)]
294 async fn hyper_error_http_status() {
295 async fn handle_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
296 Ok(Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty()).unwrap())
297 }
298 let url = spawn_server(handle_req);
299
300 assert_matches!(
301 Client::from_hyper_client(&new_https_client(), url).await,
302 Err(FromHyperClientError::HttpStatus(_))
303 );
304 }
305
/// A 404 response makes `Client::from_http_loader` fail with a `FetchError` carrying the
/// status code.
//
// `Client::from_http_loader` and `fnet_http` only exist on Fuchsia, so the test is gated the
// same way; without the gate the test module does not compile on other targets.
#[cfg(target_os = "fuchsia")]
#[fasync::run_singlethreaded(test)]
async fn loader_error_http_status() {
    async fn handle_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
        Ok(Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty()).unwrap())
    }
    let url = spawn_server(handle_req);

    assert_matches!(
        Client::from_http_loader(
            &fuchsia_component::client::connect_to_protocol::<fnet_http::LoaderMarker>()
                .unwrap(),
            url,
            50,
        )
        .await,
        Err(FromHttpLoaderError::FetchError { status_code: Some(404), .. })
    );
}
324
325 #[fasync::run_singlethreaded(test)]
329 async fn error_downloading_chunk() {
330 const BODY_SIZE_LARGE_ENOUGH_TO_TRIGGER_DELAYED_STREAMING: usize = 1_000_000;
336
337 async fn handle_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
338 Ok(Response::builder()
339 .status(StatusCode::OK)
340 .header(
341 "content-length",
342 &format!("{}", BODY_SIZE_LARGE_ENOUGH_TO_TRIGGER_DELAYED_STREAMING),
343 )
344 .header("content-type", "text/event-stream")
345 .body(Body::wrap_stream(futures::stream::iter(vec![
346 Ok(vec![b' '; BODY_SIZE_LARGE_ENOUGH_TO_TRIGGER_DELAYED_STREAMING - 1]),
347 Err("error-text".to_string()),
348 ])))
349 .unwrap())
350 }
351 let url = spawn_server(handle_req);
352 let mut client = Client::from_hyper_client(&new_https_client(), url).await.unwrap();
353
354 assert_matches!(client.try_next().await, Err(ClientPollError::NextChunk(_)));
355 }
356
/// `stream_from_socket` is pending while the socket is empty, splits data larger than the
/// read buffer into several chunks, and ends when the write end of the socket is closed.
//
// `stream_from_socket` and `zx::Socket` only exist on Fuchsia, so the test is gated the same
// way; without the gate the test module does not compile on other targets.
#[cfg(target_os = "fuchsia")]
#[test]
fn test_stream_from_socket() {
    let mut executor = fasync::TestExecutor::new_with_fake_time();
    let (snd, rcv) = zx::Socket::create_stream();
    // 10-byte read buffer, so no chunk may exceed 10 bytes.
    let mut stream = stream_from_socket(fasync::Socket::from_socket(rcv), 10);

    let mut next = stream.next();
    // Nothing has been written yet, so the stream has nothing to yield.
    assert_matches!(executor.run_until_stalled(&mut next), Poll::Pending);

    assert_eq!(snd.write(b"test msg").unwrap(), 8);
    // The same pending future now resolves with the bytes that were written.
    assert_matches!(
        executor.run_until_stalled(&mut next),
        Poll::Ready(Some(Ok(b))) if (&*b)[..] == *b"test msg".as_slice()
    );
    drop(next);

    assert_eq!(snd.write(b"0000000000111").unwrap(), 13);
    // 13 bytes do not fit the 10-byte buffer: they arrive as a 10-byte and a 3-byte chunk.
    assert_matches!(
        executor.run_until_stalled(&mut stream.next()),
        Poll::Ready(Some(Ok(b))) if (&*b)[..] == *b"0000000000".as_slice()
    );
    assert_matches!(
        executor.run_until_stalled(&mut stream.next()),
        Poll::Ready(Some(Ok(b))) if (&*b)[..] == *b"111".as_slice()
    );

    drop(snd);
    // Closing the write end terminates the stream.
    assert_matches!(executor.run_until_stalled(&mut stream.next()), Poll::Ready(None));
}
390}