Skip to main content

test_runners_lib/
logs.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Helpers for capturing logs from Fuchsia processes.
6
7use fuchsia_async as fasync;
8use futures::{AsyncReadExt as _, AsyncWriteExt as _, FutureExt as _, future};
9use std::num::NonZeroUsize;
10use thiserror::Error;
11
/// Number of bytes requested from the socket by each read issued in
/// `LoggerStream::buffer_drain_and_peek`.
const SOCKET_BUFFER_SIZE: usize = 2 * 1024;

/// Upper bound on the bytes held back while waiting for a newline. Data that
/// accumulates beyond this length is emitted as more than one message.
const MAX_LINE_BUFFER_LENGTH: usize = 4 * 1024;
18
19/// Error returned by this library.
20#[derive(Debug, PartialEq, Eq, Error, Clone)]
21pub enum LoggerError {
22    #[error("cannot create socket: {:?}", _0)]
23    CreateSocket(zx::Status),
24
25    #[error("cannot duplicate socket: {:?}", _0)]
26    DuplicateSocket(zx::Status),
27
28    #[error("invalid socket: {:?}", _0)]
29    InvalidSocket(zx::Status),
30}
31
32/// Error returned from draining LoggerStream or writing to LogWriter.
33#[derive(Debug, Error)]
34pub enum LogError {
35    /// Error encountered when draining LoggerStream.
36    #[error("can't get logs: {:?}", _0)]
37    Read(std::io::Error),
38
39    /// Error encountered when writing to LogWriter.
40    #[error("can't write logs: {:?}", _0)]
41    Write(std::io::Error),
42}
43
44/// Creates a combined socket handle for stdout and stderr and hooks them to same socket.
45/// It also wraps the socket into stream and returns it back.
46pub fn create_std_combined_log_stream()
47-> Result<(LoggerStream, zx::NullableHandle, zx::NullableHandle), LoggerError> {
48    let (client, log) = zx::Socket::create_stream();
49
50    let stream = LoggerStream::new(client).map_err(LoggerError::InvalidSocket)?;
51    let clone =
52        log.duplicate_handle(zx::Rights::SAME_RIGHTS).map_err(LoggerError::DuplicateSocket)?;
53
54    Ok((stream, log.into_handle(), clone.into_handle()))
55}
56
57/// Creates a socket handle for stdout/stderr and hooks it to a file handle.
58/// It also wraps the socket into stream and returns it back.
59pub fn create_log_stream() -> Result<(LoggerStream, zx::NullableHandle), LoggerError> {
60    let (client, log) = zx::Socket::create_stream();
61
62    let stream = LoggerStream::new(client).map_err(LoggerError::InvalidSocket)?;
63
64    Ok((stream, log.into_handle()))
65}
66/// Collects logs in background and gives a way to collect those logs.
67pub struct LogStreamReader {
68    fut: future::RemoteHandle<Result<Vec<u8>, LogError>>,
69}
70
71impl LogStreamReader {
72    pub fn new(logger: LoggerStream) -> Self {
73        let (logger_handle, logger_fut) = logger.read_to_end().remote_handle();
74        fasync::Task::spawn(logger_handle).detach();
75        Self { fut: logger_fut }
76    }
77
78    /// Retrieve all logs.
79    pub async fn get_logs(self) -> Result<Vec<u8>, LogError> {
80        self.fut.await
81    }
82}
83
84/// A stream bound to a socket where a source stream is captured.
85/// For example, stdout and stderr streams can be redirected to the contained
86/// socket and captured.
87pub struct LoggerStream {
88    socket: fasync::Socket,
89}
90
91impl Unpin for LoggerStream {}
92
93impl LoggerStream {
94    /// Create a LoggerStream from the provided zx::Socket. The `socket` object
95    /// should be bound to its intended source stream (e.g. "stdout").
96    pub fn new(socket: zx::Socket) -> Result<LoggerStream, zx::Status> {
97        let l = LoggerStream { socket: fasync::Socket::from_socket(socket) };
98        Ok(l)
99    }
100
101    /// Reads all bytes from socket.
102    pub async fn read_to_end(mut self) -> Result<Vec<u8>, LogError> {
103        let mut buffer: Vec<u8> = Vec::new();
104        let _bytes_read = self.socket.read_to_end(&mut buffer).await.map_err(LogError::Read)?;
105        Ok(buffer)
106    }
107
108    /// Drain the `stream` and write all of its contents to `writer`. Bytes are
109    /// delimited by newline and each line will be passed to `writer.write` individually.
110    /// An optional `peek_fn` may be specified which is passed a reference to each line before
111    /// it is written.
112    pub async fn buffer_drain_and_peek(
113        mut self,
114        writer: &mut SocketLogWriter,
115        peek_fn: Option<impl Fn(&[u8])>,
116    ) -> Result<(), LogError> {
117        let mut line_buffer: Vec<u8> = Vec::with_capacity(MAX_LINE_BUFFER_LENGTH);
118        let mut socket_buffer: Vec<u8> = vec![0; SOCKET_BUFFER_SIZE];
119
120        while let Some(bytes_read) = NonZeroUsize::new(
121            self.socket.read(&mut socket_buffer[..]).await.map_err(LogError::Read)?,
122        ) {
123            let bytes_read = bytes_read.get();
124
125            let newline_iter = socket_buffer[..bytes_read]
126                .iter()
127                .enumerate()
128                .filter_map(|(i, &b)| if b == b'\n' { Some(i) } else { None });
129
130            let mut prev_offset = 0;
131            for idx in newline_iter {
132                let line = &socket_buffer[prev_offset..idx + 1];
133                if !line_buffer.is_empty() {
134                    writer.write(line_buffer.drain(..).as_slice()).await?;
135                }
136                if let Some(peek) = &peek_fn {
137                    peek(line);
138                }
139                writer.write(line).await?;
140                prev_offset = idx + 1;
141            }
142            if prev_offset != bytes_read {
143                line_buffer.extend_from_slice(&socket_buffer[prev_offset..bytes_read]);
144            }
145
146            if line_buffer.len() > MAX_LINE_BUFFER_LENGTH {
147                let bytes = &line_buffer[..MAX_LINE_BUFFER_LENGTH];
148                if let Some(peek) = &peek_fn {
149                    peek(bytes);
150                }
151                writer.write(bytes).await?;
152                line_buffer.drain(..MAX_LINE_BUFFER_LENGTH);
153            }
154        }
155
156        if !line_buffer.is_empty() {
157            let bytes = &line_buffer[..];
158            if let Some(peek) = &peek_fn {
159                peek(bytes);
160            }
161            writer.write(bytes).await?;
162        }
163
164        Ok(())
165    }
166
167    /// Convenience function for buffer_drain_and_peek without a peek function.
168    pub async fn buffer_and_drain(self, writer: &mut SocketLogWriter) -> Result<(), LogError> {
169        self.buffer_drain_and_peek(writer, None::<fn(&[u8])>).await
170    }
171
172    /// Take the underlying socket of this object.
173    pub fn take_socket(self) -> fasync::Socket {
174        self.socket
175    }
176}
177
178/// Utility struct to write to socket asynchrously.
179pub struct SocketLogWriter {
180    logger: fasync::Socket,
181}
182
183impl SocketLogWriter {
184    pub fn new(logger: fasync::Socket) -> Self {
185        Self { logger }
186    }
187
188    pub async fn write_str(&mut self, s: &str) -> Result<(), LogError> {
189        self.write(s.as_bytes()).await
190    }
191
192    pub async fn write(&mut self, bytes: &[u8]) -> Result<(), LogError> {
193        self.logger.write_all(bytes).await.map_err(LogError::Write)
194    }
195}
196
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::{Context as _, Error, format_err};
    use assert_matches::assert_matches;
    use futures::{TryStreamExt as _, try_join};
    use rand::distr::{Alphanumeric, SampleString as _};
    use rand::rng;
    use std::sync::mpsc;
    use test_case::test_case;

    // Bytes written through `SocketLogWriter` come back, concatenated, from
    // `LogStreamReader::get_logs` once the writer is dropped.
    #[fasync::run_singlethreaded(test)]
    async fn log_writer_reader_work() {
        let (sock1, sock2) = zx::Socket::create_stream();
        let mut log_writer = SocketLogWriter::new(fasync::Socket::from_socket(sock1));

        let reader = LoggerStream::new(sock2).unwrap();
        let reader = LogStreamReader::new(reader);

        log_writer.write_str("this is string one.").await.unwrap();
        log_writer.write_str("this is string two.").await.unwrap();
        drop(log_writer);

        let actual = reader.get_logs().await.unwrap();
        let actual = std::str::from_utf8(&actual).unwrap();
        assert_eq!(actual, "this is string one.this is string two.");
    }

    // `read_to_end` returns exactly what was written, for both a short and a
    // multi-read message.
    #[test_case(String::from("Hello World!") ; "consumes_simple_msg")]
    #[test_case(get_random_string(10000) ; "consumes_large_msg")]
    #[fasync::run_singlethreaded(test)]
    async fn logger_stream_read_to_end(msg: String) -> Result<(), Error> {
        let (stream, tx) = create_logger_stream()?;

        let () = take_and_write_to_socket(tx, &msg)?;
        let result = stream.read_to_end().await.context("Failed to read from socket")?;
        let actual = std::str::from_utf8(&result).context("Failed to parse bytes")?.to_owned();

        assert_eq!(actual, msg);
        Ok(())
    }

    // Several separate writes are returned by `read_to_end` as one
    // concatenated buffer.
    #[fasync::run_singlethreaded(test)]
    async fn logger_stream_read_to_end_consumes_concat_msgs() -> Result<(), Error> {
        let (stream, tx) = create_logger_stream()?;
        let msgs =
            vec!["Hello World!".to_owned(), "Hola Mundo!".to_owned(), "你好,世界!".to_owned()];

        for msg in msgs.iter() {
            let () = write_to_socket(&tx, msg)?;
        }
        // Close the write end so `read_to_end` can finish.
        drop(tx);
        let result = stream.read_to_end().await.context("Failed to read from socket")?;
        let actual = std::str::from_utf8(&result).context("Failed to parse bytes")?.to_owned();

        assert_eq!(actual, msgs.join(""));
        Ok(())
    }

    // Each newline-terminated line, plus the unterminated tail, arrives as
    // its own datagram, and the peek function sees a slice of the same length
    // for each.
    #[fasync::run_singlethreaded(test)]
    async fn buffer_and_drain_reads_each_line_as_a_new_message() -> Result<(), Error> {
        let (stream, tx) = create_logger_stream()?;
        let (mut logger, rx) = create_datagram_logger()?;
        let msg = "Hello World\nHola Mundo!\n你好,世界!";

        let (tx_peeks, rx_peeks) = mpsc::channel();

        let () = take_and_write_to_socket(tx, msg)?;
        let (actual, ()) = try_join!(read_all_messages(rx), async move {
            stream
                .buffer_drain_and_peek(
                    &mut logger,
                    Some(move |line: &[u8]| tx_peeks.send(line.len()).unwrap()),
                )
                .await
                .context("Failed to drain stream")
        },)?;

        let expected = vec![
            "Hello World\n".to_string(),
            "Hola Mundo!\n".to_string(),
            "你好,世界!".to_string(),
        ];
        assert_eq!(actual, expected);

        // The sender was moved into the drain future, which has completed, so
        // this iteration terminates.
        let lengths = rx_peeks.iter().collect::<Vec<_>>();

        assert_eq!(lengths, expected.iter().map(|v| v.len()).collect::<Vec<_>>());

        Ok(())
    }

    // A newline-free message longer than MAX_LINE_BUFFER_LENGTH is split into
    // a full-size message followed by the remainder.
    #[fasync::run_singlethreaded(test)]
    async fn buffer_and_drain_does_not_buffer_past_maximum_size() -> Result<(), Error> {
        let msg = get_random_string(MAX_LINE_BUFFER_LENGTH + 10);
        let (stream, tx) = create_logger_stream()?;
        let (mut logger, rx) = create_datagram_logger()?;

        let (tx_peeks, rx_peeks) = mpsc::channel();

        let () = take_and_write_to_socket(tx, &msg)?;
        let (actual, ()) = try_join!(read_all_messages(rx), async move {
            stream
                .buffer_drain_and_peek(
                    &mut logger,
                    Some(move |line: &[u8]| {
                        tx_peeks.send(line.len()).unwrap();
                    }),
                )
                .await
                .context("Failed to drain stream")
        },)?;

        let lengths = rx_peeks.iter().collect::<Vec<_>>();

        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0], msg[0..MAX_LINE_BUFFER_LENGTH]);
        assert_eq!(actual[1], msg[MAX_LINE_BUFFER_LENGTH..]);

        assert_eq!(lengths, vec![MAX_LINE_BUFFER_LENGTH, 10]);

        Ok(())
    }

    // Without a newline, buffered data is only written out once the source
    // socket is closed.
    #[fasync::run_singlethreaded(test)]
    async fn buffer_and_drain_dumps_full_buffer_if_no_newline_seen() -> Result<(), Error> {
        let (stream, tx) = create_logger_stream()?;
        let (mut logger, rx) = create_datagram_logger()?;

        let ((), ()) = try_join!(
            async move {
                let msg = get_random_string(SOCKET_BUFFER_SIZE);
                // First write up to (SOCKET_BUFFER_SIZE - 1) so that we can
                // assert that buffer isn't drained prematurely.
                let () = write_to_socket(&tx, &msg[..SOCKET_BUFFER_SIZE - 1])?;

                // NOTE(review): this block reaches both non-blocking reads
                // below without awaiting anything first, so the drain future
                // may not have been polled yet when they run — confirm these
                // two assertions actually exercise the drain logic.

                // Temporarily convert fasync::Socket back to zx::Socket so that
                // we can use non-blocking `read` call.
                let rx = rx.into_zx_socket();
                let mut buffer = vec![0u8; SOCKET_BUFFER_SIZE];
                let maybe_bytes_read = rx.read(&mut buffer);
                assert_eq!(maybe_bytes_read, Err(zx::Status::SHOULD_WAIT));

                // Write last byte
                let () = write_to_socket(&tx, &msg[SOCKET_BUFFER_SIZE - 1..SOCKET_BUFFER_SIZE])?;

                // Confirm we still didn't write, waiting for newline.
                let maybe_bytes_read = rx.read(&mut buffer);
                assert_eq!(maybe_bytes_read, Err(zx::Status::SHOULD_WAIT));

                // Drop socket to unblock the read routine.
                drop(tx);

                // Convert zx::Socket back to fasync::Socket.
                let mut rx = fasync::Socket::from_socket(rx);
                let bytes_read =
                    rx.read(&mut buffer).await.context("Failed to read from socket")?;
                let msg_written = std::str::from_utf8(&buffer).context("Failed to parse bytes")?;

                assert_eq!(bytes_read, SOCKET_BUFFER_SIZE);
                assert_eq!(msg_written, msg);

                Ok(())
            },
            async move { stream.buffer_and_drain(&mut logger).await.context("Failed to drain stream") },
        )?;

        Ok(())
    }

    // A failing socket read surfaces as `LogError::Read`.
    #[fasync::run_singlethreaded(test)]
    async fn buffer_and_drain_return_error_if_stream_polls_err() -> Result<(), Error> {
        let (_tx, rx) = zx::Socket::create_stream();
        // If we don't have read rights the socket returns an error when polled.
        let rx = rx.duplicate_handle(zx::Rights::BASIC).expect("duplicate");
        let stream = LoggerStream::new(rx).context("Failed to create LoggerStream")?;
        let (mut logger, _rx) = create_datagram_logger()?;

        let result = stream.buffer_and_drain(&mut logger).await;

        assert_matches!(result, Err(LogError::Read(_)));
        Ok(())
    }

    /// Collects every datagram from `socket` as a UTF-8 string until the
    /// datagram stream ends.
    async fn read_all_messages(socket: fasync::Socket) -> Result<Vec<String>, Error> {
        let mut results = Vec::new();
        let mut stream = socket.into_datagram_stream();
        while let Some(bytes) = stream.try_next().await.context("Failed to read socket stream")? {
            results.push(
                std::str::from_utf8(&bytes).context("Failed to parse bytes into utf8")?.to_owned(),
            );
        }

        Ok(results)
    }

    /// Writes `message` to `socket`; the socket is taken by value, so it is
    /// dropped when this function returns.
    fn take_and_write_to_socket(socket: zx::Socket, message: &str) -> Result<(), Error> {
        write_to_socket(&socket, message)
    }

    /// Writes `message` to `socket` and fails unless the whole message was
    /// accepted in a single write.
    fn write_to_socket(socket: &zx::Socket, message: &str) -> Result<(), Error> {
        let bytes_written =
            socket.write(message.as_bytes()).context("Failed to write to socket")?;
        if bytes_written == message.len() {
            Ok(())
        } else {
            Err(format_err!(
                "Bytes written to socket doesn't match len of message. Message len = {}. Bytes written = {}",
                message.len(),
                bytes_written
            ))
        }
    }

    /// Creates a datagram socket pair: a `SocketLogWriter` on one end and the
    /// async socket to read the written messages from on the other.
    fn create_datagram_logger() -> Result<(SocketLogWriter, fasync::Socket), Error> {
        let (tx, rx) = zx::Socket::create_datagram();
        let logger = SocketLogWriter::new(fasync::Socket::from_socket(tx));
        let rx = fasync::Socket::from_socket(rx);
        Ok((logger, rx))
    }

    /// Creates a stream socket pair: a `LoggerStream` on one end and the raw
    /// socket used to feed it on the other.
    fn create_logger_stream() -> Result<(LoggerStream, zx::Socket), Error> {
        let (tx, rx) = zx::Socket::create_stream();
        let stream = LoggerStream::new(rx).context("Failed to create LoggerStream")?;
        Ok((stream, tx))
    }

    /// Returns a random alphanumeric string of `size` characters.
    fn get_random_string(size: usize) -> String {
        Alphanumeric.sample_string(&mut rng(), size)
    }
}