http_sse/
client.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::{Event, EventSource};
6use fuchsia_hyper::HttpsClient;
7use futures::stream::Stream;
8use futures::task::{Context, Poll};
9use hyper::{Body, Request, StatusCode};
10use std::pin::Pin;
11use thiserror::Error;
12
13/// An http SSE client.
14#[derive(Debug)]
15pub struct Client {
16    source: EventSource,
17    chunks: Body,
18    events: std::vec::IntoIter<Event>,
19}
20
21impl Client {
22    /// Connects to an http url and, on success, returns a `Stream` of SSE events.
23    pub async fn connect(
24        https_client: HttpsClient,
25        url: impl AsRef<str>,
26    ) -> Result<Self, ClientConnectError> {
27        let request = Request::get(url.as_ref())
28            .header("accept", "text/event-stream")
29            .body(Body::empty())
30            .map_err(|e| ClientConnectError::CreateRequest(e))?;
31        let response =
32            https_client.request(request).await.map_err(|e| ClientConnectError::MakeRequest(e))?;
33        if response.status() != StatusCode::OK {
34            return Err(ClientConnectError::HttpStatus(response.status()));
35        }
36        Ok(Self {
37            source: EventSource::new(),
38            chunks: response.into_body(),
39            events: vec![].into_iter(),
40        })
41    }
42}
43
44#[derive(Debug, Error)]
45pub enum ClientConnectError {
46    #[error("error creating http request: {}", _0)]
47    CreateRequest(hyper::http::Error),
48
49    #[error("error making http request: {}", _0)]
50    MakeRequest(hyper::Error),
51
52    #[error("http server responded with status other than OK: {}", _0)]
53    HttpStatus(hyper::StatusCode),
54}
55
56impl Stream for Client {
57    type Item = Result<Event, ClientPollError>;
58
59    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
60        loop {
61            if let Some(event) = self.events.next() {
62                return Poll::Ready(Some(Ok(event)));
63            }
64            match Pin::new(&mut self.chunks).poll_next(cx) {
65                Poll::Ready(Some(Ok(chunk))) => {
66                    self.events = self.source.parse(&chunk).into_iter();
67                }
68                Poll::Ready(Some(Err(e))) => {
69                    return Poll::Ready(Some(Err(ClientPollError::NextChunk(e))))
70                }
71                Poll::Ready(None) => {
72                    return Poll::Ready(None);
73                }
74                Poll::Pending => {
75                    return Poll::Pending;
76                }
77            }
78        }
79    }
80}
81
82#[derive(Debug, Error)]
83pub enum ClientPollError {
84    #[error("error downloading next chunk: {}", _0)]
85    NextChunk(hyper::Error),
86}
87
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use fuchsia_async as fasync;
    use fuchsia_async::net::TcpListener;
    use fuchsia_hyper::new_https_client;
    use futures::future::{Future, TryFutureExt};
    use futures::stream::{StreamExt, TryStreamExt};
    use hyper::server::accept::from_stream;
    use hyper::server::Server;
    use hyper::service::{make_service_fn, service_fn};
    use hyper::Response;
    use std::convert::Infallible;
    use std::net::{Ipv4Addr, SocketAddr};

    /// Starts a mock http server on an ephemeral localhost port that answers
    /// every request with `handle_req`, and returns the server's base url.
    ///
    /// The server runs in a detached task and panics if serving fails.
    fn spawn_server<F>(handle_req: fn(Request<Body>) -> F) -> String
    where
        F: Future<Output = Result<Response<Body>, hyper::Error>> + Send + 'static,
    {
        // Port 0 lets the OS pick a free port; read it back to build the url.
        let listener = TcpListener::bind(&SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0)).unwrap();
        let url = format!("http://{}", listener.local_addr().unwrap());
        let connections = listener
            .accept_stream()
            .map_ok(|(conn, _addr)| fuchsia_hyper::TcpStream { stream: conn });

        let make_service = make_service_fn(move |_socket: &fuchsia_hyper::TcpStream| async move {
            Ok::<_, Infallible>(service_fn(handle_req))
        });
        let server = Server::builder(from_stream(connections))
            .executor(fuchsia_hyper::Executor)
            .serve(make_service)
            .unwrap_or_else(|e| panic!("mock sse server failed: {:?}", e));

        fasync::Task::spawn(server).detach();
        url
    }

    /// The single event served by the mock servers in these tests.
    fn make_event() -> Event {
        Event::from_type_and_data("event_type", "data_contents").unwrap()
    }

    /// A `200 OK` `text/event-stream` response whose body is the serialized
    /// `make_event()`.
    fn ok_event_response() -> Response<Body> {
        Response::builder()
            .status(StatusCode::OK)
            .header("content-type", "text/event-stream")
            .body(make_event().to_vec().into())
            .unwrap()
    }

    #[fasync::run_singlethreaded(test)]
    async fn receive_one_event() {
        async fn handle_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
            Ok(ok_event_response())
        }
        let url = spawn_server(handle_req);

        let client = Client::connect(new_https_client(), url).await.unwrap();
        let results: Vec<_> = client.collect().await;
        let events = results.into_iter().collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(events, vec![make_event()]);
    }

    #[fasync::run_singlethreaded(test)]
    async fn client_sends_correct_http_headers() {
        async fn handle_req(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
            assert_eq!(req.method(), &hyper::Method::GET);

            let accept = req.headers().get("accept").map(|value| value.as_bytes());
            let expected: &[u8] = b"text/event-stream";
            assert_eq!(accept, Some(expected));

            Ok(ok_event_response())
        }
        let url = spawn_server(handle_req);

        let client = Client::connect(new_https_client(), url).await.unwrap();
        // Drain the stream so the request is fully served by the handler above.
        client.collect::<Vec<_>>().await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn error_create_request() {
        let result = Client::connect(new_https_client(), "\n").await;

        assert_matches!(result, Err(ClientConnectError::CreateRequest(_)));
    }

    #[fasync::run_singlethreaded(test)]
    async fn error_make_request() {
        let result = Client::connect(new_https_client(), "bad_url").await;

        assert_matches!(result, Err(ClientConnectError::MakeRequest(_)));
    }

    #[fasync::run_singlethreaded(test)]
    async fn error_http_status() {
        async fn handle_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
            Ok(Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty()).unwrap())
        }
        let url = spawn_server(handle_req);

        let result = Client::connect(new_https_client(), url).await;

        assert_matches!(result, Err(ClientConnectError::HttpStatus(_)));
    }

    #[fasync::run_singlethreaded(test)]
    async fn error_downloading_chunk() {
        // When an http response body is small, hyper downloads it together with the
        // header during the initial fuchsia_hyper::HttpsClient.request(). A body stream
        // that fails part-way through would then fail that initial request instead of
        // failing later, while the body chunk stream is being awaited. The body is made
        // large enough to avoid that.
        const BODY_SIZE_LARGE_ENOUGH_TO_TRIGGER_DELAYED_STREAMING: usize = 1_000_000;

        async fn handle_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
            // One byte short of the advertised length, followed by a stream error.
            let chunks = vec![
                Ok(vec![b' '; BODY_SIZE_LARGE_ENOUGH_TO_TRIGGER_DELAYED_STREAMING - 1]),
                Err("error-text".to_string()),
            ];
            let content_length = BODY_SIZE_LARGE_ENOUGH_TO_TRIGGER_DELAYED_STREAMING.to_string();

            Ok(Response::builder()
                .status(StatusCode::OK)
                .header("content-length", &content_length)
                .header("content-type", "text/event-stream")
                .body(Body::wrap_stream(futures::stream::iter(chunks)))
                .unwrap())
        }
        let url = spawn_server(handle_req);
        let mut client = Client::connect(new_https_client(), url).await.unwrap();

        let next = client.try_next().await;

        assert_matches!(next, Err(ClientPollError::NextChunk(_)));
    }
}