1use crate::{Event, EventSource};
6use fuchsia_hyper::HttpsClient;
7use futures::stream::Stream;
8use futures::task::{Context, Poll};
9use hyper::{Body, Request, StatusCode};
10use std::pin::Pin;
11use thiserror::Error;
12
13#[derive(Debug)]
15pub struct Client {
16 source: EventSource,
17 chunks: Body,
18 events: std::vec::IntoIter<Event>,
19}
20
21impl Client {
22 pub async fn connect(
24 https_client: HttpsClient,
25 url: impl AsRef<str>,
26 ) -> Result<Self, ClientConnectError> {
27 let request = Request::get(url.as_ref())
28 .header("accept", "text/event-stream")
29 .body(Body::empty())
30 .map_err(|e| ClientConnectError::CreateRequest(e))?;
31 let response =
32 https_client.request(request).await.map_err(|e| ClientConnectError::MakeRequest(e))?;
33 if response.status() != StatusCode::OK {
34 return Err(ClientConnectError::HttpStatus(response.status()));
35 }
36 Ok(Self {
37 source: EventSource::new(),
38 chunks: response.into_body(),
39 events: vec![].into_iter(),
40 })
41 }
42}
43
44#[derive(Debug, Error)]
45pub enum ClientConnectError {
46 #[error("error creating http request: {}", _0)]
47 CreateRequest(hyper::http::Error),
48
49 #[error("error making http request: {}", _0)]
50 MakeRequest(hyper::Error),
51
52 #[error("http server responded with status other than OK: {}", _0)]
53 HttpStatus(hyper::StatusCode),
54}
55
56impl Stream for Client {
57 type Item = Result<Event, ClientPollError>;
58
59 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
60 loop {
61 if let Some(event) = self.events.next() {
62 return Poll::Ready(Some(Ok(event)));
63 }
64 match Pin::new(&mut self.chunks).poll_next(cx) {
65 Poll::Ready(Some(Ok(chunk))) => {
66 self.events = self.source.parse(&chunk).into_iter();
67 }
68 Poll::Ready(Some(Err(e))) => {
69 return Poll::Ready(Some(Err(ClientPollError::NextChunk(e))))
70 }
71 Poll::Ready(None) => {
72 return Poll::Ready(None);
73 }
74 Poll::Pending => {
75 return Poll::Pending;
76 }
77 }
78 }
79 }
80}
81
82#[derive(Debug, Error)]
83pub enum ClientPollError {
84 #[error("error downloading next chunk: {}", _0)]
85 NextChunk(hyper::Error),
86}
87
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use fuchsia_async::net::TcpListener;
    use fuchsia_async::{self as fasync};
    use fuchsia_hyper::new_https_client;
    use futures::future::{Future, TryFutureExt};
    use futures::stream::{StreamExt, TryStreamExt};
    use hyper::server::accept::from_stream;
    use hyper::server::Server;
    use hyper::service::{make_service_fn, service_fn};
    use hyper::Response;
    use std::convert::Infallible;
    use std::net::{Ipv4Addr, SocketAddr};

    /// Starts a mock HTTP server on IPv4 localhost (port 0, so the OS picks the port)
    /// that answers every request with `handle_req`, and returns its `http://` base URL.
    ///
    /// The server runs as a detached task and panics if serving fails.
    fn spawn_server<F>(handle_req: fn(Request<Body>) -> F) -> String
    where
        F: Future<Output = Result<Response<Body>, hyper::Error>> + Send + 'static,
    {
        let listener =
            TcpListener::bind(&SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0)).unwrap();
        // Capture the address before the listener is turned into a connection stream.
        let local_addr = listener.local_addr().unwrap();
        let url = format!("http://{}", local_addr);
        let connections = listener
            .accept_stream()
            .map_ok(|(conn, _addr)| fuchsia_hyper::TcpStream { stream: conn });

        let server = Server::builder(from_stream(connections))
            .executor(fuchsia_hyper::Executor)
            .serve(make_service_fn(move |_socket: &fuchsia_hyper::TcpStream| async move {
                Ok::<_, Infallible>(service_fn(handle_req))
            }))
            .unwrap_or_else(|e| panic!("mock sse server failed: {:?}", e));
        fasync::Task::spawn(server).detach();

        url
    }

    /// The single event used by the tests in this module.
    fn make_event() -> Event {
        Event::from_type_and_data("event_type", "data_contents").unwrap()
    }

    /// Builds a `200 OK` `text/event-stream` response whose body is `make_event()`
    /// converted with `to_vec()`.
    fn ok_response_with_event() -> Response<Body> {
        Response::builder()
            .status(StatusCode::OK)
            .header("content-type", "text/event-stream")
            .body(make_event().to_vec().into())
            .unwrap()
    }

    /// A server that sends one event results in a stream of exactly that event.
    #[fasync::run_singlethreaded(test)]
    async fn receive_one_event() {
        async fn handle_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
            Ok(ok_response_with_event())
        }
        let url = spawn_server(handle_req);

        let client = Client::connect(new_https_client(), url).await.unwrap();
        let received: Vec<Result<Event, ClientPollError>> = client.collect().await;
        let received: Result<Vec<Event>, ClientPollError> = received.into_iter().collect();

        assert_eq!(received.unwrap(), vec![make_event()]);
    }

    /// The request is a `GET` carrying `accept: text/event-stream`; checked server-side.
    #[fasync::run_singlethreaded(test)]
    async fn client_sends_correct_http_headers() {
        async fn handle_req(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
            assert_eq!(req.method(), &hyper::Method::GET);
            let accept = req.headers().get("accept").map(|h| h.as_bytes());
            assert_eq!(accept, Some(&b"text/event-stream"[..]));
            Ok(ok_response_with_event())
        }
        let url = spawn_server(handle_req);

        let client = Client::connect(new_https_client(), url).await.unwrap();
        // Drain the stream so the request/response exchange runs to completion.
        client.collect::<Vec<_>>().await;
    }

    /// A url that cannot be turned into a request fails with `CreateRequest`.
    #[fasync::run_singlethreaded(test)]
    async fn error_create_request() {
        let result = Client::connect(new_https_client(), "\n").await;
        assert_matches!(result, Err(ClientConnectError::CreateRequest(_)));
    }

    /// A request that cannot be sent fails with `MakeRequest`.
    #[fasync::run_singlethreaded(test)]
    async fn error_make_request() {
        let result = Client::connect(new_https_client(), "bad_url").await;
        assert_matches!(result, Err(ClientConnectError::MakeRequest(_)));
    }

    /// A non-200 response fails with `HttpStatus`.
    #[fasync::run_singlethreaded(test)]
    async fn error_http_status() {
        async fn handle_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
            Ok(Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty()).unwrap())
        }
        let url = spawn_server(handle_req);

        let result = Client::connect(new_https_client(), url).await;
        assert_matches!(result, Err(ClientConnectError::HttpStatus(_)));
    }

    /// A body stream that fails part-way surfaces as `ClientPollError::NextChunk`.
    #[fasync::run_singlethreaded(test)]
    async fn error_downloading_chunk() {
        // NOTE(review): judging by the name, a smaller body would presumably be
        // delivered eagerly so the failure would not show up while polling the
        // stream -- confirm against hyper's behavior.
        const BODY_SIZE_LARGE_ENOUGH_TO_TRIGGER_DELAYED_STREAMING: usize = 1_000_000;

        async fn handle_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
            // One byte short of the advertised content-length, followed by an error.
            let body_chunks = vec![
                Ok(vec![b' '; BODY_SIZE_LARGE_ENOUGH_TO_TRIGGER_DELAYED_STREAMING - 1]),
                Err("error-text".to_string()),
            ];
            Ok(Response::builder()
                .status(StatusCode::OK)
                .header(
                    "content-length",
                    &BODY_SIZE_LARGE_ENOUGH_TO_TRIGGER_DELAYED_STREAMING.to_string(),
                )
                .header("content-type", "text/event-stream")
                .body(Body::wrap_stream(futures::stream::iter(body_chunks)))
                .unwrap())
        }
        let url = spawn_server(handle_req);
        let mut client = Client::connect(new_https_client(), url).await.unwrap();

        assert_matches!(client.try_next().await, Err(ClientPollError::NextChunk(_)));
    }
}