socket_to_stdio/
lib.rs
1use anyhow::Context as _;
6use fidl_fuchsia_io as fio;
7use futures::future::Either;
8use futures::stream::StreamExt as _;
9use futures::{AsyncReadExt as _, AsyncWriteExt as _};
10use std::io::StdoutLock;
11use termion::raw::IntoRawMode as _;
12
13pub enum Stdout<'a> {
16 Raw(termion::raw::RawTerminal<StdoutLock<'a>>),
19 Buffered,
21}
22
23impl std::io::Write for Stdout<'_> {
24 fn flush(&mut self) -> Result<(), std::io::Error> {
25 match self {
26 Self::Raw(r) => r.flush(),
27 Self::Buffered => std::io::stdout().flush(),
28 }
29 }
30 fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
31 match self {
32 Self::Raw(r) => r.write(buf),
33 Self::Buffered => std::io::stdout().write(buf),
34 }
35 }
36}
37
38impl Stdout<'_> {
39 pub fn raw() -> anyhow::Result<Self> {
40 let stdout = std::io::stdout();
41
42 if !termion::is_tty(&stdout) {
43 anyhow::bail!("interactive mode does not support piping");
44 }
45
46 let term_out =
49 stdout.lock().into_raw_mode().context("could not set raw mode on terminal")?;
50
51 Ok(Self::Raw(term_out))
52 }
53
54 pub fn buffered() -> Self {
55 Self::Buffered
56 }
57}
58
59pub async fn connect_socket_to_stdio(
64 socket: fidl::Socket,
65 stdout: Stdout<'_>,
66) -> anyhow::Result<()> {
67 #[allow(clippy::large_futures)]
68 connect_socket_to_stdio_impl(socket, || std::io::stdin().lock(), stdout)?.await
69}
70
71fn connect_socket_to_stdio_impl<R>(
72 socket: fidl::Socket,
73 stdin: impl FnOnce() -> R + Send + 'static,
74 mut stdout: impl std::io::Write,
75) -> anyhow::Result<impl futures::Future<Output = anyhow::Result<()>>>
76where
77 R: std::io::Read,
78{
79 let (stdin_send, mut stdin_recv) = futures::channel::mpsc::unbounded();
81 let _: std::thread::JoinHandle<_> = std::thread::Builder::new()
82 .name("connect_socket_to_stdio stdin thread".into())
83 .spawn(move || {
84 let mut stdin = stdin();
85 let mut buf = [0u8; fio::MAX_BUF as usize];
86 loop {
87 let bytes_read = stdin.read(&mut buf)?;
88 if bytes_read == 0 {
89 return Ok::<(), anyhow::Error>(());
90 }
91 let () = stdin_send.unbounded_send(buf[..bytes_read].to_vec())?;
92 }
93 })
94 .context("spawning stdin thread")?;
95
96 let (mut socket_in, mut socket_out) = fuchsia_async::Socket::from_socket(socket).split();
97
98 let stdin_to_socket = async move {
99 while let Some(stdin) = stdin_recv.next().await {
100 socket_out.write_all(&stdin).await.context("writing to socket")?;
101 socket_out.flush().await.context("flushing socket")?;
102 }
103 Ok::<(), anyhow::Error>(())
104 };
105
106 let socket_to_stdout = async move {
107 loop {
108 let mut buf = [0u8; fio::MAX_BUF as usize];
109 let bytes_read = socket_in.read(&mut buf).await.context("reading from socket")?;
110 if bytes_read == 0 {
111 break;
112 }
113 stdout.write_all(&buf[..bytes_read]).context("writing to stdout")?;
114 stdout.flush().context("flushing stdout")?;
115 }
116 Ok::<(), anyhow::Error>(())
117 };
118
119 Ok(async move {
120 futures::pin_mut!(stdin_to_socket);
121 futures::pin_mut!(socket_to_stdout);
122 Ok(match futures::future::select(stdin_to_socket, socket_to_stdout).await {
123 Either::Left((stdin_to_socket, socket_to_stdout)) => {
124 let () = stdin_to_socket?;
125 let () = socket_to_stdout.await?;
129 }
130 Either::Right((socket_to_stdout, _)) => {
131 let () = socket_to_stdout?;
132 }
135 })
136 })
137}
138
139#[cfg(test)]
140mod tests {
141 use super::*;
142
143 #[fuchsia::test]
144 async fn stdin_to_socket() {
145 let (socket, socket_remote) = fidl::Socket::create_stream();
146
147 let connect_fut =
148 connect_socket_to_stdio_impl(socket_remote, || &b"test input"[..], vec![]).unwrap();
149
150 let (connect_res, bytes_from_socket) = futures::join!(connect_fut, async move {
151 let mut socket = fuchsia_async::Socket::from_socket(socket);
152 let mut out = vec![0u8; 100];
153 let bytes_read = socket.read(&mut out).await.unwrap();
154 drop(socket);
155 out.resize(bytes_read, 0);
156 out
157 });
158 let () = connect_res.unwrap();
159
160 assert_eq!(bytes_from_socket, &b"test input"[..]);
161 }
162
163 #[fuchsia::test]
164 async fn socket_to_stdout() {
165 let (socket, socket_remote) = fidl::Socket::create_stream();
166 assert_eq!(socket.write(&b"test input"[..]).unwrap(), 10);
167 drop(socket);
168 let mut stdout = vec![];
169 let (unblocker, block_until) = std::sync::mpsc::channel();
170
171 #[allow(clippy::large_futures)]
172 let () = connect_socket_to_stdio_impl(
173 socket_remote,
174 move || {
175 let () = block_until.recv().unwrap();
176 &[][..]
177 },
178 &mut stdout,
179 )
180 .unwrap()
181 .await
182 .unwrap();
183
184 unblocker.send(()).unwrap();
186
187 assert_eq!(&stdout[..], &b"test input"[..]);
188 }
189}