elf_runner/
stdout.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::config::StreamSink;
6use super::logger::{LogWriter, OutputLevel, SyslogWriter, create_namespace_logger};
7use diagnostics_log::Publisher;
8use fuchsia_runtime::{HandleInfo, HandleType};
9use futures::{Future, StreamExt};
10use log::warn;
11use namespace::Namespace;
12use socket_parsing::{NewlineChunker, NewlineChunkerError};
13use vfs::ExecutionScope;
14use zx::HandleBased;
15use {fidl_fuchsia_process as fproc, fuchsia_async as fasync};
16
/// File descriptor number used when binding the stdout stream.
const STDOUT_FD: i32 = 1;
/// File descriptor number used when binding the stderr stream.
const STDERR_FD: i32 = 2;

/// Max size, in bytes, for a message when draining an input stream socket.
/// This number (30 KiB) is slightly smaller than the size allowed by
/// Archivist (LogSink service implementation).
const MAX_MESSAGE_SIZE: usize = 30720;
23
24/// Bind stdout or stderr streams to syslog. This function binds either or both
25/// output streams to syslog depending on value provided for each streams'
26/// StreamSink. If the value for an output stream is set to StreamSink::Log,
27/// that stream's file descriptor will be bound to syslog. All writes on that
28// fd will be forwarded to syslog and will register as log entries. For stdout,
29// the messages will be tagged with severity INFO. For stderr, the messages
30// will be tagged with severity WARN. A task is created to listen to writes on
31// the appropriate file descriptor and forward the message to syslog. This
32// function returns both the task for each file descriptor and its
33// corresponding HandleInfo.
34pub fn bind_streams_to_syslog(
35    ns: &Namespace,
36    stdout_sink: StreamSink,
37    stderr_sink: StreamSink,
38) -> (ExecutionScope, Vec<fproc::HandleInfo>) {
39    let scope = ExecutionScope::new();
40    let mut handles: Vec<fproc::HandleInfo> = Vec::new();
41
42    let mut logger = None;
43    let mut forward_stream = |sink, fd, level| {
44        if matches!(sink, StreamSink::Log) {
45            // create the handle before dealing with the logger so components still receive an inert
46            // handle if connecting to LogSink fails
47            let (socket, handle_info) =
48                new_socket_bound_to_fd(fd).expect("failed to create socket");
49            handles.push(handle_info);
50
51            if let Some(logger) = logger.get_or_insert_with(|| create_namespace_logger(ns)) {
52                scope.spawn(forward_socket_to_syslog(logger.clone(), socket, level));
53            }
54        }
55    };
56
57    forward_stream(stdout_sink, STDOUT_FD, OutputLevel::Info);
58    forward_stream(stderr_sink, STDERR_FD, OutputLevel::Warn);
59
60    (scope, handles)
61}
62
63fn forward_socket_to_syslog(
64    logger: impl Future<Output = Option<Publisher>> + Send + 'static,
65    socket: fasync::Socket,
66    level: OutputLevel,
67) -> impl Future<Output = ()> + 'static {
68    async move {
69        let Some(logger) = logger.await else { return };
70        let mut writer = SyslogWriter::new(logger, level);
71        if let Err(error) = drain_lines(socket, &mut writer).await {
72            warn!(error:%; "Draining output stream failed");
73        }
74    }
75}
76
77fn new_socket_bound_to_fd(fd: i32) -> Result<(fasync::Socket, fproc::HandleInfo), zx::Status> {
78    let (tx, rx) = zx::Socket::create_stream();
79    let rx = fasync::Socket::from_socket(rx);
80    Ok((
81        rx,
82        fproc::HandleInfo {
83            handle: tx.into_handle(),
84            id: HandleInfo::new(HandleType::FileDescriptor, fd as u16).as_raw(),
85        },
86    ))
87}
88
89/// Drains all bytes from socket and writes messages to writer. Bytes read
90/// are split into lines and separated into chunks no greater than
91/// MAX_MESSAGE_SIZE.
92async fn drain_lines(
93    socket: fasync::Socket,
94    writer: &mut impl LogWriter,
95) -> Result<(), NewlineChunkerError> {
96    let chunker = NewlineChunker::new(socket, MAX_MESSAGE_SIZE);
97    futures::pin_mut!(chunker);
98
99    while let Some(chunk_or_line) = chunker.next().await {
100        writer.write(&chunk_or_line?).await;
101    }
102
103    Ok(())
104}
105
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::{Context, Error, format_err};
    use fuchsia_async::Task;
    use futures::channel::mpsc;
    use futures::{FutureExt, SinkExt, try_join};
    use rand::distr::{Alphanumeric, SampleString as _};
    use rand::rng;

    // Mock log writer: every write is decoded as UTF-8 and sent over the
    // channel so a test can inspect what `drain_lines` emitted.
    impl LogWriter for mpsc::Sender<String> {
        async fn write(&mut self, bytes: &[u8]) {
            let message =
                std::str::from_utf8(bytes).expect("Failed to decode bytes to utf8.").to_owned();
            let () =
                self.send(message).await.expect("Failed to send message to other end of mpsc.");
        }
    }

    /// A message with no newlines is emitted in `MAX_MESSAGE_SIZE` chunks.
    #[fuchsia::test]
    async fn drain_lines_splits_into_max_size_chunks() -> Result<(), Error> {
        let (tx, rx) = zx::Socket::create_stream();
        let rx = fasync::Socket::from_socket(rx);
        let (mut sender, recv) = create_mock_logger();
        let msg = get_random_string(MAX_MESSAGE_SIZE * 4);

        let () = take_and_write_to_socket(tx, &msg)?;
        let (actual, ()) =
            try_join!(recv.collect().map(Result::<Vec<String>, Error>::Ok), async move {
                drain_lines(rx, &mut sender).await.map_err(Into::into)
            })?;

        assert_eq!(
            actual,
            msg.as_bytes()
                .chunks(MAX_MESSAGE_SIZE)
                .map(|bytes| std::str::from_utf8(bytes).expect("Bytes are not utf8.").to_owned())
                .collect::<Vec<String>>()
        );

        Ok(())
    }

    /// Lines shorter than `MAX_MESSAGE_SIZE` are emitted one per newline.
    #[fuchsia::test]
    async fn drain_lines_splits_at_newline() -> Result<(), Error> {
        let (tx, rx) = zx::Socket::create_stream();
        let rx = fasync::Socket::from_socket(rx);
        let (mut sender, recv) = create_mock_logger();
        let msg = std::iter::repeat_with(|| get_random_string(MAX_MESSAGE_SIZE - 1))
            .take(3)
            .collect::<Vec<_>>()
            .join("\n");

        let () = take_and_write_to_socket(tx, &msg)?;
        let (actual, ()) =
            try_join!(recv.collect().map(Result::<Vec<String>, Error>::Ok), async move {
                drain_lines(rx, &mut sender).await.map_err(Into::into)
            })?;

        assert_eq!(actual, msg.split('\n').map(str::to_owned).collect::<Vec<String>>());
        Ok(())
    }

    /// Each complete line is logged as soon as it is written, while the
    /// socket is still open.
    #[fuchsia::test]
    async fn drain_lines_writes_when_message_is_received() -> Result<(), Error> {
        let (tx, rx) = zx::Socket::create_stream();
        let rx = fasync::Socket::from_socket(rx);
        let (mut sender, mut recv) = create_mock_logger();
        let messages: Vec<String> = vec!["Hello!\n".to_owned(), "World!\n".to_owned()];

        let ((), ()) = try_join!(
            async move { drain_lines(rx, &mut sender).await.map_err(Error::from) },
            async move {
                for mut message in messages {
                    let () = write_to_socket(&tx, &message)?;
                    let logged_message =
                        recv.next().await.context("Receiver channel closed. Got no message.")?;
                    // Logged message should strip '\n' so we need to do the same before assertion.
                    message.pop();
                    assert_eq!(logged_message, message);
                }

                Ok(())
            }
        )?;

        Ok(())
    }

    /// A trailing partial line is held back until its newline arrives or the
    /// socket is closed.
    #[fuchsia::test]
    async fn drain_lines_waits_for_entire_lines() -> Result<(), Error> {
        let (tx, rx) = zx::Socket::create_stream();
        let rx = fasync::Socket::from_socket(rx);
        let (mut sender, mut recv) = create_mock_logger();

        let ((), ()) = try_join!(
            async move { drain_lines(rx, &mut sender).await.map_err(Error::from) },
            async move {
                let () = write_to_socket(&tx, "Hello\nWorld")?;
                let logged_message =
                    recv.next().await.context("Receiver channel closed. Got no message.")?;
                assert_eq!(logged_message, "Hello");
                let () = write_to_socket(&tx, "Hello\nAgain")?;
                drop(tx);
                let logged_message =
                    recv.next().await.context("Receiver channel closed. Got no message.")?;
                assert_eq!(logged_message, "WorldHello");
                let logged_message =
                    recv.next().await.context("Receiver channel closed. Got no message.")?;
                assert_eq!(logged_message, "Again");
                Ok(())
            }
        )?;

        Ok(())
    }

    /// Consecutive newlines do not produce empty log messages.
    #[fuchsia::test]
    async fn drain_lines_collapses_repeated_newlines() -> Result<(), Error> {
        let (tx, rx) = zx::Socket::create_stream();
        let rx = fasync::Socket::from_socket(rx);
        let (mut sender, mut recv) = create_mock_logger();

        let drainer = Task::spawn(async move { drain_lines(rx, &mut sender).await });

        write_to_socket(&tx, "Hello\n\nWorld\n")?;
        assert_eq!(recv.next().await.unwrap(), "Hello");
        assert_eq!(recv.next().await.unwrap(), "World");

        drop(tx);
        drainer.await?;
        assert_eq!(recv.next().await, None);

        Ok(())
    }

    /// Writes `message` to `socket`, consuming the socket so that it is
    /// dropped when this function returns.
    fn take_and_write_to_socket(socket: zx::Socket, message: &str) -> Result<(), Error> {
        write_to_socket(&socket, message)
    }

    /// Writes `message` to `socket`, failing if the write errors or if only
    /// part of the message was written.
    fn write_to_socket(socket: &zx::Socket, message: &str) -> Result<(), Error> {
        let bytes_written =
            socket.write(message.as_bytes()).context("Failed to write to socket")?;
        if bytes_written == message.len() {
            Ok(())
        } else {
            Err(format_err!(
                "Bytes written to socket doesn't match len of message. Message len = {}. Bytes written = {}",
                message.len(),
                bytes_written
            ))
        }
    }

    /// Creates the channel that stands in for a logger; the sender half
    /// implements `LogWriter` above.
    fn create_mock_logger() -> (mpsc::Sender<String>, mpsc::Receiver<String>) {
        mpsc::channel::<String>(20)
    }

    /// Returns a random alphanumeric string of `size` characters.
    fn get_random_string(size: usize) -> String {
        Alphanumeric.sample_string(&mut rng(), size)
    }
}