Skip to main content

http_sse/
client.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::{Event, EventSource};
6#[cfg(target_os = "fuchsia")]
7use fidl_fuchsia_net_http as fnet_http;
8#[cfg(target_os = "fuchsia")]
9use fuchsia_async as fasync;
10use fuchsia_hyper::HttpsClient;
11#[cfg(target_os = "fuchsia")]
12use futures::io::AsyncReadExt as _;
13use futures::stream::{Stream, StreamExt as _, TryStreamExt as _};
14use futures::task::{Context, Poll};
15use hyper::{Body, Request, StatusCode};
16use std::pin::Pin;
17use thiserror::Error;
18
19/// An http SSE client.
20#[derive(derivative::Derivative)]
21#[derivative(Debug)]
22pub struct Client {
23    #[derivative(Debug = "ignore")]
24    chunks: futures::stream::BoxStream<'static, Result<hyper::body::Bytes, anyhow::Error>>,
25    source: EventSource,
26    events: std::vec::IntoIter<Event>,
27}
28
29impl Client {
30    /// Connects to an http url and, on success, returns a `Stream` of SSE events.
31    pub async fn from_hyper_client(
32        https_client: &HttpsClient,
33        url: impl AsRef<str>,
34    ) -> Result<Self, FromHyperClientError> {
35        let request = Request::get(url.as_ref())
36            .header("accept", "text/event-stream")
37            .body(Body::empty())
38            .map_err(|e| FromHyperClientError::CreateRequest(e))?;
39        let response = https_client
40            .request(request)
41            .await
42            .map_err(|e| FromHyperClientError::MakeRequest(e))?;
43        if response.status() != StatusCode::OK {
44            return Err(FromHyperClientError::HttpStatus(response.status()));
45        }
46        Ok(Self {
47            chunks: response.into_body().map_err(|e| anyhow::anyhow!(e)).boxed(),
48            source: EventSource::new(),
49            events: vec![].into_iter(),
50        })
51    }
52
53    /// Connects to an http url and, on success, returns a `Stream` of SSE events.
54    ///
55    /// `stream_buf_size` is the size of the buffer to use when reading from the http response body.
56    /// Consider making it a small multiple of the expected event size to optimize for low memory
57    /// usage or a large multiple to optimize for event processing speed.
58    #[cfg(target_os = "fuchsia")]
59    pub async fn from_http_loader(
60        loader: &fnet_http::LoaderProxy,
61        url: String,
62        stream_buf_size: usize,
63    ) -> Result<Self, FromHttpLoaderError> {
64        let resp = loader
65            .fetch(fnet_http::Request {
66                method: None,
67                url: Some(url),
68                headers: Some(vec![fnet_http::Header {
69                    name: b"accept".to_vec(),
70                    value: b"text/event-stream".to_vec(),
71                }]),
72                body: None,
73                deadline: None,
74                ..Default::default()
75            })
76            .await
77            .map_err(FromHttpLoaderError::FetchFidl)?;
78        let socket = match resp {
79            fnet_http::Response {
80                error: None, body: Some(body), status_code: Some(200), ..
81            } => body,
82            fnet_http::Response { error, status_code, status_line, .. } => {
83                return Err(FromHttpLoaderError::FetchError { error, status_code, status_line });
84            }
85        };
86        Ok(Self {
87            chunks: stream_from_socket(fasync::Socket::from_socket(socket), stream_buf_size),
88            source: EventSource::new(),
89            events: vec![].into_iter(),
90        })
91    }
92}
93
94#[cfg(target_os = "fuchsia")]
95fn stream_from_socket(
96    socket: fasync::Socket,
97    stream_buf_size: usize,
98) -> futures::stream::BoxStream<'static, Result<hyper::body::Bytes, anyhow::Error>> {
99    let initial_buf = vec![0; stream_buf_size];
100    futures::stream::try_unfold((socket, initial_buf), |(mut socket, mut buf)| async move {
101        match socket.read(&mut buf).await {
102            Ok(0) => Ok(None),
103            Ok(n) => {
104                let chunk = hyper::body::Bytes::copy_from_slice(&buf[..n]);
105                Ok(Some((chunk, (socket, buf))))
106            }
107            Err(e) => Err(anyhow::anyhow!(e).context("reading from socket")),
108        }
109    })
110    .boxed()
111}
112
113#[derive(Debug, Error)]
114pub enum FromHyperClientError {
115    #[error("error creating http request")]
116    CreateRequest(#[source] hyper::http::Error),
117
118    #[error("error making http request")]
119    MakeRequest(#[source] hyper::Error),
120
121    #[error("http server responded with status other than OK: {0}")]
122    HttpStatus(hyper::StatusCode),
123}
124
125#[cfg(target_os = "fuchsia")]
126#[derive(Debug, Error)]
127pub enum FromHttpLoaderError {
128    #[error("fuchsia.net.http/Loader.Fetch fidl error")]
129    FetchFidl(#[source] fidl::Error),
130
131    #[error(
132        "fuchsia.net.http/Loader.Fetch error: status {status_code:?} {error:?} {status_line:?}"
133    )]
134    FetchError {
135        error: Option<fnet_http::Error>,
136        status_code: Option<u32>,
137        status_line: Option<Vec<u8>>,
138    },
139}
140
141impl Stream for Client {
142    type Item = Result<Event, ClientPollError>;
143
144    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
145        loop {
146            if let Some(event) = self.events.next() {
147                return Poll::Ready(Some(Ok(event)));
148            }
149            match Pin::new(&mut self.chunks).poll_next(cx) {
150                Poll::Ready(Some(Ok(chunk))) => {
151                    self.events = self.source.parse(&chunk).into_iter();
152                }
153                Poll::Ready(Some(Err(e))) => {
154                    return Poll::Ready(Some(Err(ClientPollError::NextChunk(e))));
155                }
156                Poll::Ready(None) => {
157                    return Poll::Ready(None);
158                }
159                Poll::Pending => {
160                    return Poll::Pending;
161                }
162            }
163        }
164    }
165}
166
167#[derive(Debug, Error)]
168pub enum ClientPollError {
169    #[error("error downloading next chunk")]
170    NextChunk(#[source] anyhow::Error),
171}
172
173#[cfg(test)]
174mod tests {
175    use super::*;
176    use assert_matches::assert_matches;
177    use fuchsia_async as fasync;
178    use fuchsia_async::net::TcpListener;
179    use fuchsia_hyper::new_https_client;
180    use futures::future::{Future, TryFutureExt as _};
181    use hyper::Response;
182    use hyper::server::Server;
183    use hyper::server::accept::from_stream;
184    use hyper::service::{make_service_fn, service_fn};
185    use std::convert::Infallible;
186    use std::net::{Ipv4Addr, SocketAddr};
187    use test_case::test_case;
188
189    fn spawn_server<F>(handle_req: fn(Request<Body>) -> F) -> String
190    where
191        F: Future<Output = Result<Response<Body>, hyper::Error>> + Send + 'static,
192    {
193        let (connections, url) = {
194            let listener =
195                TcpListener::bind(&SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0)).unwrap();
196            let local_addr = listener.local_addr().unwrap();
197            (
198                listener
199                    .accept_stream()
200                    .map_ok(|(conn, _addr)| fuchsia_hyper::TcpStream { stream: conn }),
201                format!("http://{}", local_addr),
202            )
203        };
204        let server = Server::builder(from_stream(connections))
205            .executor(fuchsia_hyper::Executor)
206            .serve(make_service_fn(move |_socket: &fuchsia_hyper::TcpStream| async move {
207                Ok::<_, Infallible>(service_fn(handle_req))
208            }))
209            .unwrap_or_else(|e| panic!("mock sse server failed: {:?}", e));
210        fasync::Task::spawn(server).detach();
211        url
212    }
213
214    fn make_event() -> Event {
215        Event::from_type_and_data("event_type", "data_contents").unwrap()
216    }
217
218    async fn make_client(byte_source: &str, url: String) -> Client {
219        match byte_source {
220            "hyper" => Client::from_hyper_client(&new_https_client(), url).await.unwrap(),
221            #[cfg(target_os = "fuchsia")]
222            "loader" => Client::from_http_loader(
223                &fuchsia_component::client::connect_to_protocol::<fnet_http::LoaderMarker>()
224                    .unwrap(),
225                url,
226                50,
227            )
228            .await
229            .unwrap(),
230            s => panic!("unexpected byte_soure {s}"),
231        }
232    }
233
234    #[test_case("hyper")]
235    #[cfg(target_os = "fuchsia")]
236    #[test_case("loader")]
237    #[fasync::run_singlethreaded(test)]
238    async fn receive_one_event(byte_source: &str) {
239        async fn handle_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
240            Ok(Response::builder()
241                .status(StatusCode::OK)
242                .header("content-type", "text/event-stream")
243                .body(make_event().to_vec().into())
244                .unwrap())
245        }
246        let url = spawn_server(handle_req);
247
248        let client = make_client(byte_source, url).await;
249        let events: Result<Vec<_>, _> = client.collect::<Vec<_>>().await.into_iter().collect();
250
251        assert_eq!(events.unwrap(), vec![make_event()]);
252    }
253
254    #[test_case("hyper")]
255    #[cfg(target_os = "fuchsia")]
256    #[test_case("loader")]
257    #[fasync::run_singlethreaded(test)]
258    async fn client_sends_correct_http_headers(byte_source: &str) {
259        async fn handle_req(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
260            assert_eq!(req.method(), &hyper::Method::GET);
261            assert_eq!(
262                req.headers().get("accept").map(|h| h.as_bytes()),
263                Some(&b"text/event-stream"[..])
264            );
265            Ok(Response::builder()
266                .status(StatusCode::OK)
267                .header("content-type", "text/event-stream")
268                .body(make_event().to_vec().into())
269                .unwrap())
270        }
271        let url = spawn_server(handle_req);
272
273        let client = make_client(byte_source, url).await;
274        client.collect::<Vec<_>>().await;
275    }
276
277    #[fasync::run_singlethreaded(test)]
278    async fn error_create_request() {
279        assert_matches!(
280            Client::from_hyper_client(&new_https_client(), "\n").await,
281            Err(FromHyperClientError::CreateRequest(_))
282        );
283    }
284
285    #[fasync::run_singlethreaded(test)]
286    async fn error_make_request() {
287        assert_matches!(
288            Client::from_hyper_client(&new_https_client(), "bad_url2").await,
289            Err(FromHyperClientError::MakeRequest(_))
290        );
291    }
292
293    #[fasync::run_singlethreaded(test)]
294    async fn hyper_error_http_status() {
295        async fn handle_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
296            Ok(Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty()).unwrap())
297        }
298        let url = spawn_server(handle_req);
299
300        assert_matches!(
301            Client::from_hyper_client(&new_https_client(), url).await,
302            Err(FromHyperClientError::HttpStatus(_))
303        );
304    }
305
306    #[fasync::run_singlethreaded(test)]
307    async fn loader_error_http_status() {
308        async fn handle_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
309            Ok(Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty()).unwrap())
310        }
311        let url = spawn_server(handle_req);
312
313        assert_matches!(
314            Client::from_http_loader(
315                &fuchsia_component::client::connect_to_protocol::<fnet_http::LoaderMarker>()
316                    .unwrap(),
317                url,
318                50,
319            )
320            .await,
321            Err(FromHttpLoaderError::FetchError { status_code: Some(404), .. })
322        );
323    }
324
325    // The fuchsia.net.http.Response contains the http response body as a zx::Socket of bytes,
326    // so it doesn't have stream reading errors unless there is an error reading from the actual
327    // socket.
328    #[fasync::run_singlethreaded(test)]
329    async fn error_downloading_chunk() {
330        // If the body of an http response is not large enough, hyper will download the body
331        // along with the header in the initial fuchsia_hyper::HttpsClient.request(). This means
332        // that even if the body is implemented with a stream that fails before the transfer is
333        // complete, the failure will occur during the initial request, before awaiting on the
334        // body chunk stream.
335        const BODY_SIZE_LARGE_ENOUGH_TO_TRIGGER_DELAYED_STREAMING: usize = 1_000_000;
336
337        async fn handle_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
338            Ok(Response::builder()
339                .status(StatusCode::OK)
340                .header(
341                    "content-length",
342                    &format!("{}", BODY_SIZE_LARGE_ENOUGH_TO_TRIGGER_DELAYED_STREAMING),
343                )
344                .header("content-type", "text/event-stream")
345                .body(Body::wrap_stream(futures::stream::iter(vec![
346                    Ok(vec![b' '; BODY_SIZE_LARGE_ENOUGH_TO_TRIGGER_DELAYED_STREAMING - 1]),
347                    Err("error-text".to_string()),
348                ])))
349                .unwrap())
350        }
351        let url = spawn_server(handle_req);
352        let mut client = Client::from_hyper_client(&new_https_client(), url).await.unwrap();
353
354        assert_matches!(client.try_next().await, Err(ClientPollError::NextChunk(_)));
355    }
356
357    #[test]
358    fn test_stream_from_socket() {
359        let mut executor = fasync::TestExecutor::new_with_fake_time();
360        let (snd, rcv) = zx::Socket::create_stream();
361        let mut stream = stream_from_socket(fasync::Socket::from_socket(rcv), 10);
362
363        // Empty socket is pending.
364        let mut next = stream.next();
365        assert_matches!(executor.run_until_stalled(&mut next), Poll::Pending);
366
367        // Non-empty socket yields correctly truncated Bytes.
368        assert_eq!(snd.write(b"test msg").unwrap(), 8);
369        assert_matches!(
370            executor.run_until_stalled(&mut next),
371            Poll::Ready(Some(Ok(b))) if (&*b)[..] == *b"test msg".as_slice()
372        );
373        drop(next);
374
375        // Write larger than buf split across multiple yields.
376        assert_eq!(snd.write(b"0000000000111").unwrap(), 13);
377        assert_matches!(
378            executor.run_until_stalled(&mut stream.next()),
379            Poll::Ready(Some(Ok(b))) if (&*b)[..] == *b"0000000000".as_slice()
380        );
381        assert_matches!(
382            executor.run_until_stalled(&mut stream.next()),
383            Poll::Ready(Some(Ok(b))) if (&*b)[..] == *b"111".as_slice()
384        );
385
386        // Closed socket is finished.
387        drop(snd);
388        assert_matches!(executor.run_until_stalled(&mut stream.next()), Poll::Ready(None));
389    }
390}