1use super::config::StreamSink;
6use super::logger::{LogWriter, OutputLevel, SyslogWriter, create_namespace_logger};
7use diagnostics_log::Publisher;
8use fuchsia_runtime::{HandleInfo, HandleType};
9use futures::{Future, StreamExt};
10use log::warn;
11use namespace::Namespace;
12use socket_parsing::{NewlineChunker, NewlineChunkerError};
13use vfs::ExecutionScope;
14use zx::HandleBased;
15use {fidl_fuchsia_process as fproc, fuchsia_async as fasync};
16
/// File descriptor number that the stdout socket is bound to.
const STDOUT_FD: i32 = 1;
/// File descriptor number that the stderr socket is bound to.
const STDERR_FD: i32 = 2;

/// Size limit, in bytes, handed to `NewlineChunker` when draining an output socket
/// (30 KiB). Input without a newline within this many bytes is emitted in pieces.
const MAX_MESSAGE_SIZE: usize = 30 * 1024;
23
24pub fn bind_streams_to_syslog(
35 ns: &Namespace,
36 stdout_sink: StreamSink,
37 stderr_sink: StreamSink,
38) -> (ExecutionScope, Vec<fproc::HandleInfo>) {
39 let scope = ExecutionScope::new();
40 let mut handles: Vec<fproc::HandleInfo> = Vec::new();
41
42 let mut logger = None;
43 let mut forward_stream = |sink, fd, level| {
44 if matches!(sink, StreamSink::Log) {
45 let (socket, handle_info) =
48 new_socket_bound_to_fd(fd).expect("failed to create socket");
49 handles.push(handle_info);
50
51 if let Some(logger) = logger.get_or_insert_with(|| create_namespace_logger(ns)) {
52 scope.spawn(forward_socket_to_syslog(logger.clone(), socket, level));
53 }
54 }
55 };
56
57 forward_stream(stdout_sink, STDOUT_FD, OutputLevel::Info);
58 forward_stream(stderr_sink, STDERR_FD, OutputLevel::Warn);
59
60 (scope, handles)
61}
62
63fn forward_socket_to_syslog(
64 logger: impl Future<Output = Option<Publisher>> + Send + 'static,
65 socket: fasync::Socket,
66 level: OutputLevel,
67) -> impl Future<Output = ()> + 'static {
68 async move {
69 let Some(logger) = logger.await else { return };
70 let mut writer = SyslogWriter::new(logger, level);
71 if let Err(error) = drain_lines(socket, &mut writer).await {
72 warn!(error:%; "Draining output stream failed");
73 }
74 }
75}
76
77fn new_socket_bound_to_fd(fd: i32) -> Result<(fasync::Socket, fproc::HandleInfo), zx::Status> {
78 let (tx, rx) = zx::Socket::create_stream();
79 let rx = fasync::Socket::from_socket(rx);
80 Ok((
81 rx,
82 fproc::HandleInfo {
83 handle: tx.into_handle(),
84 id: HandleInfo::new(HandleType::FileDescriptor, fd as u16).as_raw(),
85 },
86 ))
87}
88
89async fn drain_lines(
93 socket: fasync::Socket,
94 writer: &mut impl LogWriter,
95) -> Result<(), NewlineChunkerError> {
96 let chunker = NewlineChunker::new(socket, MAX_MESSAGE_SIZE);
97 futures::pin_mut!(chunker);
98
99 while let Some(chunk_or_line) = chunker.next().await {
100 writer.write(&chunk_or_line?).await;
101 }
102
103 Ok(())
104}
105
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::{Context, Error, format_err};
    use fuchsia_async::Task;
    use futures::channel::mpsc;
    use futures::{FutureExt, SinkExt, try_join};
    use rand::distr::{Alphanumeric, SampleString as _};
    use rand::rng;

    // Test-only writer: every chunk handed to `write` is decoded as UTF-8 and sent over the
    // channel so a test can observe exactly what `drain_lines` logged.
    impl LogWriter for mpsc::Sender<String> {
        async fn write(&mut self, bytes: &[u8]) {
            let decoded = std::str::from_utf8(bytes).expect("Failed to decode bytes to utf8.");
            self.send(decoded.to_owned())
                .await
                .expect("Failed to send message to other end of mpsc.");
        }
    }

    /// Input with no newline is split into pieces of exactly `MAX_MESSAGE_SIZE` bytes.
    #[fuchsia::test]
    async fn drain_lines_splits_into_max_size_chunks() -> Result<(), Error> {
        let (write_end, read_end) = zx::Socket::create_stream();
        let read_end = fasync::Socket::from_socket(read_end);
        let payload = get_random_string(MAX_MESSAGE_SIZE * 4);
        let expected: Vec<String> = payload
            .as_bytes()
            .chunks(MAX_MESSAGE_SIZE)
            .map(|piece| std::str::from_utf8(piece).expect("Bytes are not utf8.").to_owned())
            .collect();

        take_and_write_to_socket(write_end, &payload)?;
        let actual = drain_and_collect(read_end).await?;

        assert_eq!(actual, expected);
        Ok(())
    }

    /// Lines just under the size limit come out one per newline-separated segment.
    #[fuchsia::test]
    async fn drain_lines_splits_at_newline() -> Result<(), Error> {
        let (write_end, read_end) = zx::Socket::create_stream();
        let read_end = fasync::Socket::from_socket(read_end);
        let payload = (0..3)
            .map(|_| get_random_string(MAX_MESSAGE_SIZE - 1))
            .collect::<Vec<_>>()
            .join("\n");
        let expected: Vec<String> = payload.split('\n').map(String::from).collect();

        take_and_write_to_socket(write_end, &payload)?;
        let actual = drain_and_collect(read_end).await?;

        assert_eq!(actual, expected);
        Ok(())
    }

    /// Each complete line is logged as soon as it is written, without its trailing newline.
    #[fuchsia::test]
    async fn drain_lines_writes_when_message_is_received() -> Result<(), Error> {
        let (tx, rx) = zx::Socket::create_stream();
        let rx = fasync::Socket::from_socket(rx);
        let (mut sender, mut receiver) = create_mock_logger();

        let drain = async move { drain_lines(rx, &mut sender).await.map_err(Error::from) };
        // `tx` is moved into this future, so the socket closes (and `drain` finishes) once
        // every line has been checked.
        let write_and_check = async move {
            for line in ["Hello!\n", "World!\n"] {
                write_to_socket(&tx, line)?;
                let logged_message = next_message(&mut receiver).await?;
                // The logged text has no trailing newline, so strip it from the expectation.
                let mut expected = line.to_owned();
                expected.pop();
                assert_eq!(logged_message, expected);
            }
            Ok::<(), Error>(())
        };
        let ((), ()) = try_join!(drain, write_and_check)?;

        Ok(())
    }

    /// A partial line is held back until its newline arrives or the socket closes.
    #[fuchsia::test]
    async fn drain_lines_waits_for_entire_lines() -> Result<(), Error> {
        let (tx, rx) = zx::Socket::create_stream();
        let rx = fasync::Socket::from_socket(rx);
        let (mut sender, mut receiver) = create_mock_logger();

        let drain = async move { drain_lines(rx, &mut sender).await.map_err(Error::from) };
        let write_and_check = async move {
            write_to_socket(&tx, "Hello\nWorld")?;
            assert_eq!(next_message(&mut receiver).await?, "Hello");

            write_to_socket(&tx, "Hello\nAgain")?;
            // Closing the write end lets the unterminated "Again" be flushed.
            drop(tx);
            assert_eq!(next_message(&mut receiver).await?, "WorldHello");
            assert_eq!(next_message(&mut receiver).await?, "Again");
            Ok::<(), Error>(())
        };
        let ((), ()) = try_join!(drain, write_and_check)?;

        Ok(())
    }

    /// Consecutive newlines do not produce an empty message between them.
    #[fuchsia::test]
    async fn drain_lines_collapses_repeated_newlines() -> Result<(), Error> {
        let (tx, rx) = zx::Socket::create_stream();
        let rx = fasync::Socket::from_socket(rx);
        let (mut sender, mut receiver) = create_mock_logger();

        let drainer = Task::spawn(async move { drain_lines(rx, &mut sender).await });

        write_to_socket(&tx, "Hello\n\nWorld\n")?;
        for expected in ["Hello", "World"] {
            assert_eq!(receiver.next().await.unwrap(), expected);
        }

        drop(tx);
        drainer.await?;
        assert_eq!(receiver.next().await, None);

        Ok(())
    }

    /// Runs `drain_lines` on `socket` until it finishes and returns every message it logged,
    /// in order.
    async fn drain_and_collect(socket: fasync::Socket) -> Result<Vec<String>, Error> {
        let (mut sender, receiver) = create_mock_logger();
        let (logged, ()) = try_join!(
            receiver.collect::<Vec<String>>().map(Ok::<Vec<String>, Error>),
            async move { drain_lines(socket, &mut sender).await.map_err(Error::from) }
        )?;
        Ok(logged)
    }

    /// Waits for the next logged message, failing if the channel has been closed.
    async fn next_message(receiver: &mut mpsc::Receiver<String>) -> Result<String, Error> {
        receiver.next().await.context("Receiver channel closed. Got no message.")
    }

    /// Writes `message` and then drops `socket`, closing the write end.
    fn take_and_write_to_socket(socket: zx::Socket, message: &str) -> Result<(), Error> {
        write_to_socket(&socket, message)
    }

    /// Writes `message` to `socket`, failing unless the whole message was accepted.
    fn write_to_socket(socket: &zx::Socket, message: &str) -> Result<(), Error> {
        let bytes_written =
            socket.write(message.as_bytes()).context("Failed to write to socket")?;
        if bytes_written != message.len() {
            return Err(format_err!(
                "Bytes written to socket doesn't match len of message. Message len = {}. Bytes written = {}",
                message.len(),
                bytes_written
            ));
        }
        Ok(())
    }

    /// Channel pair standing in for a logger: the sender is the `LogWriter`, the receiver is
    /// what the test inspects.
    fn create_mock_logger() -> (mpsc::Sender<String>, mpsc::Receiver<String>) {
        mpsc::channel(20)
    }

    /// Returns `size` random alphanumeric characters.
    fn get_random_string(size: usize) -> String {
        let mut generator = rng();
        Alphanumeric.sample_string(&mut generator, size)
    }
}