socket_to_stdio/
lib.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::Context as _;
6use fidl_fuchsia_io as fio;
7use futures::future::Either;
8use futures::stream::StreamExt as _;
9use futures::{AsyncReadExt as _, AsyncWriteExt as _};
10use std::io::StdoutLock;
11use termion::raw::IntoRawMode as _;
12
13/// Abstracts stdout for `connect_socket_to_stdio`. Allows callers to determine if stdout should be
14/// exclusively owned for the duration of the call.
15pub enum Stdout<'a> {
16    /// Exclusive ownership of stdout (nothing else can write to stdout while this exists),
17    /// put into raw mode.
18    Raw(termion::raw::RawTerminal<StdoutLock<'a>>),
19    /// Shared ownership of stdout (output may be interleaved with output from other sources).
20    Buffered,
21}
22
23impl std::io::Write for Stdout<'_> {
24    fn flush(&mut self) -> Result<(), std::io::Error> {
25        match self {
26            Self::Raw(r) => r.flush(),
27            Self::Buffered => std::io::stdout().flush(),
28        }
29    }
30    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
31        match self {
32            Self::Raw(r) => r.write(buf),
33            Self::Buffered => std::io::stdout().write(buf),
34        }
35    }
36}
37
38impl Stdout<'_> {
39    pub fn raw() -> anyhow::Result<Self> {
40        let stdout = std::io::stdout();
41
42        if !termion::is_tty(&stdout) {
43            anyhow::bail!("interactive mode does not support piping");
44        }
45
46        // Put the host terminal into raw mode, so input characters are not echoed, streams are not
47        // buffered and newlines are not changed.
48        let term_out =
49            stdout.lock().into_raw_mode().context("could not set raw mode on terminal")?;
50
51        Ok(Self::Raw(term_out))
52    }
53
54    pub fn buffered() -> Self {
55        Self::Buffered
56    }
57}
58
59/// Concurrently:
60///   1. locks stdin and copies the input to `socket`
61///   2. reads data from `socket` and writes it to `stdout`
62/// Finishes when the remote end of the socket closes (when (2) completes).
63pub async fn connect_socket_to_stdio(
64    socket: fidl::Socket,
65    stdout: Stdout<'_>,
66) -> anyhow::Result<()> {
67    #[allow(clippy::large_futures)]
68    connect_socket_to_stdio_impl(socket, || std::io::stdin().lock(), stdout)?.await
69}
70
71fn connect_socket_to_stdio_impl<R>(
72    socket: fidl::Socket,
73    stdin: impl FnOnce() -> R + Send + 'static,
74    mut stdout: impl std::io::Write,
75) -> anyhow::Result<impl futures::Future<Output = anyhow::Result<()>>>
76where
77    R: std::io::Read,
78{
79    // Use a separate thread to read from stdin without blocking the executor.
80    let (stdin_send, mut stdin_recv) = futures::channel::mpsc::unbounded();
81    let _: std::thread::JoinHandle<_> = std::thread::Builder::new()
82        .name("connect_socket_to_stdio stdin thread".into())
83        .spawn(move || {
84            let mut stdin = stdin();
85            let mut buf = [0u8; fio::MAX_BUF as usize];
86            loop {
87                let bytes_read = stdin.read(&mut buf)?;
88                if bytes_read == 0 {
89                    return Ok::<(), anyhow::Error>(());
90                }
91                let () = stdin_send.unbounded_send(buf[..bytes_read].to_vec())?;
92            }
93        })
94        .context("spawning stdin thread")?;
95
96    let (mut socket_in, mut socket_out) = fuchsia_async::Socket::from_socket(socket).split();
97
98    let stdin_to_socket = async move {
99        while let Some(stdin) = stdin_recv.next().await {
100            socket_out.write_all(&stdin).await.context("writing to socket")?;
101            socket_out.flush().await.context("flushing socket")?;
102        }
103        Ok::<(), anyhow::Error>(())
104    };
105
106    let socket_to_stdout = async move {
107        loop {
108            let mut buf = [0u8; fio::MAX_BUF as usize];
109            let bytes_read = socket_in.read(&mut buf).await.context("reading from socket")?;
110            if bytes_read == 0 {
111                break;
112            }
113            stdout.write_all(&buf[..bytes_read]).context("writing to stdout")?;
114            stdout.flush().context("flushing stdout")?;
115        }
116        Ok::<(), anyhow::Error>(())
117    };
118
119    Ok(async move {
120        futures::pin_mut!(stdin_to_socket);
121        futures::pin_mut!(socket_to_stdout);
122        Ok(match futures::future::select(stdin_to_socket, socket_to_stdout).await {
123            Either::Left((stdin_to_socket, socket_to_stdout)) => {
124                let () = stdin_to_socket?;
125                // Wait for output even after stdin closes. The remote may be responding to the
126                // final input, or the remote may not be reading from stdin at all (consider
127                // "bash -c $CMD").
128                let () = socket_to_stdout.await?;
129            }
130            Either::Right((socket_to_stdout, _)) => {
131                let () = socket_to_stdout?;
132                // No reason to wait for stdin because the socket is closed so writing stdin to it
133                // would fail.
134            }
135        })
136    })
137}
138
#[cfg(test)]
mod tests {
    use super::*;

    /// Bytes pushed through the socket in both tests.
    const PAYLOAD: &[u8] = b"test input";

    /// Bytes produced by the injected stdin must arrive at the other end of the socket.
    #[fuchsia::test]
    async fn stdin_to_socket() {
        let (local, remote) = fidl::Socket::create_stream();

        let connect_fut =
            connect_socket_to_stdio_impl(remote, || PAYLOAD, Vec::<u8>::new()).unwrap();

        let read_then_close = async move {
            let mut local = fuchsia_async::Socket::from_socket(local);
            let mut received = vec![0u8; 100];
            let len = local.read(&mut received).await.unwrap();
            // Closing this end is what allows the connect future to finish.
            drop(local);
            received.truncate(len);
            received
        };
        let (connect_res, received) = futures::join!(connect_fut, read_then_close);
        let () = connect_res.unwrap();

        assert_eq!(received, PAYLOAD);
    }

    /// Bytes already queued on the socket must be written to the injected stdout.
    #[fuchsia::test]
    async fn socket_to_stdout() {
        let (local, remote) = fidl::Socket::create_stream();
        let written = local.write(PAYLOAD).unwrap();
        assert_eq!(written, 10);
        drop(local);

        let mut captured = Vec::<u8>::new();
        let (release_stdin, stdin_gate) = std::sync::mpsc::channel();

        #[allow(clippy::large_futures)]
        let () = connect_socket_to_stdio_impl(
            remote,
            move || {
                // Hold the stdin thread here until the test releases it below.
                let () = stdin_gate.recv().unwrap();
                &b""[..]
            },
            &mut captured,
        )
        .unwrap()
        .await
        .unwrap();

        // let the stdin_to_socket thread finish before test cleanup
        release_stdin.send(()).unwrap();

        assert_eq!(captured.as_slice(), PAYLOAD);
    }
}
189}