Skip to main content

reachability_core/
fetch.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{Context, format_err};
6use async_trait::async_trait;
7use fuchsia_async::TimeoutExt;
8use fuchsia_async::net::TcpStream;
9
10use futures::{AsyncReadExt, AsyncWriteExt, TryFutureExt};
11use log::log;
12use std::net;
13
14const FETCH_TIMEOUT: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(10);
15
16fn http_request(path: &str, host: &str) -> String {
17    [
18        &format!("HEAD {path} HTTP/1.1"),
19        &format!("host: {host}"),
20        "connection: close",
21        "user-agent: fuchsia reachability probe",
22    ]
23    .join("\r\n")
24        + "\r\n\r\n"
25}
26
27async fn fetch<FA: FetchAddr + std::marker::Sync>(
28    interface_name: &str,
29    host: &str,
30    path: &str,
31    addr: &FA,
32) -> anyhow::Result<u16> {
33    let timeout = zx::MonotonicInstant::after(FETCH_TIMEOUT);
34    let addr = addr.as_socket_addr();
35    let socket = socket2::Socket::new(
36        match addr {
37            net::SocketAddr::V4(_) => socket2::Domain::IPV4,
38            net::SocketAddr::V6(_) => socket2::Domain::IPV6,
39        },
40        socket2::Type::STREAM,
41        Some(socket2::Protocol::TCP),
42    )
43    .context("while constructing socket")?;
44    socket.bind_device(Some(interface_name.as_bytes()))?;
45    let mut stream = TcpStream::connect_from_raw(socket, addr)
46        .context("while constructing tcp stream")?
47        .map_err(|e| format_err!("Opening TcpStream connection failed: {e:?}"))
48        .on_timeout(timeout, || Err(format_err!("Opening TcpStream timed out")))
49        .await?;
50    let message = http_request(path, host);
51    stream
52        .write_all(message.as_bytes())
53        .map_err(|e| format_err!("Writing to TcpStream failed: {e:?}"))
54        .on_timeout(timeout, || Err(format_err!("Writing data to TcpStream timed out")))
55        .await?;
56
57    let mut bytes = Vec::new();
58    let _: usize = stream
59        .read_to_end(&mut bytes)
60        .map_err(|e| format_err!("Reading response from TcpStream failed: {e:?}"))
61        .on_timeout(timeout, || Err(format_err!("Reading response from TcpStream timed out")))
62        .await?;
63    let resp = String::from_utf8(bytes)?;
64    let first_line = resp.split("\r\n").next().expect("split always returns at least one item");
65    if let [http, code, ..] = first_line.split(' ').collect::<Vec<_>>().as_slice() {
66        if !http.starts_with("HTTP/") {
67            return Err(format_err!("Response header malformed: {first_line}"));
68        }
69        Ok(code.parse().map_err(|e| format_err!("While parsing status code: {e:?}"))?)
70    } else {
71        Err(format_err!("Response header malformed: {first_line}"))
72    }
73}
74
75pub trait FetchAddr {
76    fn as_socket_addr(&self) -> net::SocketAddr;
77}
78
79impl FetchAddr for net::SocketAddr {
80    fn as_socket_addr(&self) -> net::SocketAddr {
81        *self
82    }
83}
84
85impl FetchAddr for net::IpAddr {
86    fn as_socket_addr(&self) -> net::SocketAddr {
87        net::SocketAddr::from((*self, 80))
88    }
89}
90
91#[async_trait]
92pub trait Fetch {
93    async fn fetch<FA: FetchAddr + std::marker::Sync>(
94        &self,
95        interface_name: &str,
96        host: &str,
97        path: &str,
98        addr: &FA,
99    ) -> Option<u16>;
100}
101
102pub struct Fetcher;
103
104#[async_trait]
105impl Fetch for Fetcher {
106    async fn fetch<FA: FetchAddr + std::marker::Sync>(
107        &self,
108        interface_name: &str,
109        host: &str,
110        path: &str,
111        addr: &FA,
112    ) -> Option<u16> {
113        let r = fetch(interface_name, host, path, addr).await;
114        match r {
115            Ok(code) => Some(code),
116            Err(e) => {
117                // Check to see if the error is due to the host/network being
118                // unreachable. In that case, this error is likely unconcerning
119                // and signifies a network may not have connectivity across
120                // one of the IP protocols, which can be common for home
121                // network configurations.
122                let level = if let Some(io_error) = e.downcast_ref::<std::io::Error>()
123                    && (io_error.raw_os_error() == Some(libc::ENETUNREACH)
124                        || io_error.raw_os_error() == Some(libc::EHOSTUNREACH))
125                {
126                    log::Level::Info
127                } else {
128                    log::Level::Warn
129                };
130
131                let addr = addr.as_socket_addr();
132                log!(
133                    level,
134                    "err fetching {host}{path} from {addr} \
135                     through {interface_name}: {e:#}"
136                );
137
138                None
139            }
140        }
141    }
142}
143
144#[cfg(test)]
145mod test {
146    use super::*;
147
148    use std::net::{Ipv4Addr, SocketAddr};
149    use std::pin::pin;
150
151    use fuchsia_async::net::TcpListener;
152    use fuchsia_async::{self as fasync};
153    use futures::future::Fuse;
154    use futures::io::BufReader;
155    use futures::{AsyncBufReadExt, FutureExt, StreamExt};
156    use test_case::test_case;
157
158    fn server(
159        code: u16,
160    ) -> anyhow::Result<(SocketAddr, Fuse<impl futures::Future<Output = Vec<String>>>)> {
161        let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0);
162        let listener = TcpListener::bind(&addr).context("binding TCP")?;
163        let addr = listener.local_addr()?;
164
165        let server_fut = async move {
166            let timeout = zx::MonotonicInstant::after(FETCH_TIMEOUT);
167            let mut incoming = listener.accept_stream();
168            if let Some(result) = incoming
169                .next()
170                .on_timeout(timeout, || panic!("timeout waiting for connection"))
171                .await
172            {
173                let (stream, _addr) = result.expect("accept incoming TCP connection");
174                let mut stream = BufReader::new(stream);
175                let mut request = Vec::new();
176                loop {
177                    let mut s = String::new();
178                    let _: usize = stream
179                        .read_line(&mut s)
180                        .on_timeout(timeout, || panic!("timeout waiting to read data"))
181                        .await
182                        .expect("read data");
183                    if s == "\r\n" {
184                        break;
185                    }
186                    request.push(s.trim().to_string());
187                }
188                let data = format!("HTTP/1.1 {} OK\r\n\r\n", code);
189                stream
190                    .write_all(data.as_bytes())
191                    .on_timeout(timeout, || panic!("timeout waiting to write response"))
192                    .await
193                    .expect("reply to request");
194                request
195            } else {
196                Vec::new()
197            }
198        }
199        .fuse();
200
201        Ok((addr, server_fut))
202    }
203
204    #[test_case("http://reachability.test/", 200; "base path 200")]
205    #[test_case("http://reachability.test/path/", 200; "sub path 200")]
206    #[test_case("http://reachability.test/", 400; "base path 400")]
207    #[test_case("http://reachability.test/path/", 400; "sub path 400")]
208    #[fasync::run_singlethreaded(test)]
209    async fn test_fetch(url_str: &'static str, code: u16) -> anyhow::Result<()> {
210        let url = url::Url::parse(url_str)?;
211        let (addr, server_fut) = server(code)?;
212        let domain = url.host().expect("no host").to_string();
213        let path = url.path().to_string();
214
215        let mut fetch_fut = pin!(fetch("", &domain, &path, &addr).fuse());
216
217        let mut server_fut = pin!(server_fut);
218
219        let mut request = None;
220        let result = loop {
221            futures::select! {
222                req = server_fut => request = Some(req),
223                result = fetch_fut => break result
224            };
225        };
226
227        assert!(result.is_ok(), "Expected OK, got: {result:?}");
228        assert_eq!(result.ok(), Some(code));
229        let request = request.expect("no request body");
230        assert!(request.contains(&format!("HEAD {path} HTTP/1.1")));
231        assert!(request.contains(&format!("host: {domain}")));
232
233        Ok(())
234    }
235}