Skip to main content

test_diagnostics/
lib.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! This crate provides helper functions to collect test diagnostics.
6
7use fuchsia_async as fasync;
8use fuchsia_sync::Mutex;
9use futures::channel::mpsc;
10use futures::prelude::*;
11use futures::ready;
12use futures::task::{Context, Poll};
13use std::cell::RefCell;
14use std::io::Write;
15use std::pin::Pin;
16use std::sync::Arc;
17
18pub use crate::diagnostics::LogStream;
19
20mod diagnostics;
21pub mod zstd_compress;
22
thread_local! {
    /// Per-thread scratch buffer (2 MiB, zero-filled) that socket reads are staged in.
    static BUFFER: RefCell<Vec<u8>> = RefCell::new(vec![0u8; 2 * 1024 * 1024]);
}
26
/// Future that executes a function when bytes are available on a socket.
///
/// The callback receives `Some(bytes)` holding the data that was just read, or `None` when
/// the read returned zero bytes. The callback's return value becomes the future's output.
pub struct SocketReadFut<'a, T, F>
where
    F: FnMut(Option<&[u8]>) -> Result<T, std::io::Error> + Unpin,
{
    /// Socket the future reads from.
    socket: &'a mut flex_client::AsyncSocket,
    /// Callback invoked once per completed read.
    on_read_fn: F,
}
35
36impl<'a, T, F> SocketReadFut<'a, T, F>
37where
38    F: FnMut(Option<&[u8]>) -> Result<T, std::io::Error> + Unpin,
39{
40    pub fn new(socket: &'a mut flex_client::AsyncSocket, on_read_fn: F) -> Self {
41        Self { socket, on_read_fn }
42    }
43}
44
45impl<'a, T, F> Future for SocketReadFut<'a, T, F>
46where
47    F: FnMut(Option<&[u8]>) -> Result<T, std::io::Error> + Unpin,
48{
49    type Output = Result<T, std::io::Error>;
50    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
51        BUFFER.with(|b| {
52            let mut b = b.borrow_mut();
53            let mut_self = self.get_mut();
54            let len = ready!(Pin::new(&mut mut_self.socket).poll_read(cx, &mut *b)?);
55            match len {
56                0 => Poll::Ready((mut_self.on_read_fn)(None)),
57                l => Poll::Ready((mut_self.on_read_fn)(Some(&b[..l]))),
58            }
59        })
60    }
61}
62
63pub async fn collect_string_from_socket(
64    socket: flex_client::Socket,
65) -> Result<String, anyhow::Error> {
66    let (s, mut r) = mpsc::channel(1024);
67    let task = fasync::Task::spawn(collect_and_send_string_output(socket, s));
68    let mut ret = String::new();
69    while let Some(content) = r.next().await {
70        ret.push_str(&content);
71    }
72    task.await?;
73    Ok(ret)
74}
75
76pub async fn collect_and_send_string_output(
77    socket: flex_client::Socket,
78    mut sender: mpsc::Sender<String>,
79) -> Result<(), anyhow::Error> {
80    let mut async_socket = flex_client::socket_to_async(socket);
81    loop {
82        let maybe_string = SocketReadFut::new(&mut async_socket, |maybe_buf| {
83            Ok(maybe_buf.map(|buf| String::from_utf8_lossy(buf).into()))
84        })
85        .await?;
86        match maybe_string {
87            Some(string) => sender.send(string).await?,
88            None => return Ok(()),
89        }
90    }
91}
92
/// A writer that buffers content in memory for some duration before flushing the contents to
/// an inner writer. After the duration elapses, any new bytes are written immediately to the
/// inner writer. Calling flush() also immediately flushes the contents.
/// Errors that occur when flushing on timeout are returned at the next write() or flush()
/// call. Therefore, callers should make sure to call flush before the StdoutBuffer goes out of
/// scope.
pub struct StdoutBuffer<W: Write + Send + 'static> {
    /// Buffering state, shared with the timer task.
    inner: Arc<Mutex<StdoutBufferInner<W>>>,
    /// Handle to the spawned timer task, held for as long as the buffer exists.
    /// NOTE(review): presumably dropping this handle cancels the task — confirm against the
    /// `fuchsia_async::Task` documentation.
    _timer: fuchsia_async::Task<()>,
}
103
104impl<W: Write + Send + 'static> StdoutBuffer<W> {
105    /// Crates new StdoutBuffer and starts the timer on log buffering.
106    /// `duration`: Buffers log for this duration or till done() is called.
107    /// `sender`: Channel to send logs on.
108    /// `max_capacity`: Flush log if buffer size exceeds this value. This will not cancel the timer
109    /// and all the logs would be flushed once timer expires.
110    pub fn new(duration: std::time::Duration, writer: W, max_capacity: usize) -> Self {
111        let (inner, timer) = StdoutBufferInner::new(duration, writer, max_capacity);
112        Self { inner, _timer: timer }
113    }
114}
115
116impl<W: Write + Send + 'static> Write for StdoutBuffer<W> {
117    fn flush(&mut self) -> Result<(), std::io::Error> {
118        self.inner.lock().flush()
119    }
120
121    fn write(&mut self, bytes: &[u8]) -> Result<usize, std::io::Error> {
122        self.inner.lock().write(bytes)
123    }
124}
125
/// State behind a `StdoutBuffer`: the destination writer plus the bytes currently held back.
struct StdoutBufferInner<W: Write + Send + 'static> {
    /// Destination that buffered and unbuffered output is written to.
    writer: W,
    /// Whether to buffer logs or not: `Some` holds the buffered bytes while buffering is
    /// active, `None` once buffering has been stopped.
    buffer: Option<Vec<u8>>,
    /// Error from writing out the buffer when buffering stopped; returned by the next
    /// `write()` or `flush()` call.
    stop_buffer_error: Option<std::io::Error>,
    /// Size limit beyond which a write bypasses the buffer.
    max_capacity: usize,
}
133
134impl<W: Write + Send + 'static> StdoutBufferInner<W> {
135    fn new(
136        duration: std::time::Duration,
137        writer: W,
138        max_capacity: usize,
139    ) -> (Arc<Mutex<Self>>, fuchsia_async::Task<()>) {
140        let new_self = Arc::new(Mutex::new(StdoutBufferInner {
141            writer,
142            buffer: Some(Vec::with_capacity(max_capacity)),
143            stop_buffer_error: None,
144            max_capacity,
145        }));
146
147        let timer = fuchsia_async::Timer::new(duration);
148        let log_buffer = Arc::downgrade(&new_self);
149        let f = async move {
150            timer.await;
151            if let Some(log_buffer) = log_buffer.upgrade() {
152                log_buffer.lock().stop_buffering();
153            }
154        };
155
156        (new_self, fuchsia_async::Task::spawn(f))
157    }
158
159    fn stop_buffering(&mut self) {
160        if let Some(buf) = self.buffer.take() {
161            if let Err(e) = self.writer.write_all(&buf) {
162                self.stop_buffer_error = Some(e);
163            }
164        }
165    }
166}
167
168impl<W: Write + Send + 'static> Write for StdoutBufferInner<W> {
169    fn flush(&mut self) -> Result<(), std::io::Error> {
170        self.stop_buffering();
171        match self.stop_buffer_error.take() {
172            Some(e) => Err(e),
173            None => self.writer.flush(),
174        }
175    }
176
177    fn write(&mut self, bytes: &[u8]) -> Result<usize, std::io::Error> {
178        if let Some(e) = self.stop_buffer_error.take() {
179            return Err(e);
180        }
181        match self.buffer.as_mut() {
182            None => self.writer.write(bytes),
183            Some(buf) if buf.len() + bytes.len() > self.max_capacity => {
184                self.writer.write_all(&buf)?;
185                buf.truncate(0);
186                self.writer.write(bytes)
187            }
188            Some(buf) => Write::write(buf, bytes),
189        }
190    }
191}
192
193impl<W: Write + Send + 'static> Drop for StdoutBufferInner<W> {
194    fn drop(&mut self) {
195        let _ = self.flush();
196    }
197}
198
199#[cfg(test)]
200mod tests {
201    use super::*;
202    use futures::StreamExt;
203    use pretty_assertions::assert_eq;
204
205    async fn collect_until_eq<S: Stream<Item = String> + Unpin>(mut stream: S, target: &str) {
206        let mut accumulator = "".to_string();
207        while accumulator.len() < target.len() {
208            match stream.next().await {
209                Some(string) => accumulator.push_str(&string),
210                None => panic!(
211                    "Expected string '{}' but stream terminated after '{}'",
212                    target, accumulator
213                ),
214            }
215        }
216        assert_eq!(target, accumulator);
217    }
218
    /// Writes several messages to one end of a stream socket and checks that
    /// `collect_and_send_string_output` forwards them, in order, until the socket closes.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn collect_test_stdout() {
        // Pick the socket provider that matches the build configuration.
        #[cfg(feature = "fdomain")]
        let client = fdomain_local::local_client_empty();
        #[cfg(not(feature = "fdomain"))]
        let client = flex_client::fidl::ZirconClient;
        let (sock_server, sock_client) = client.create_stream_socket();
        let mut sock_server = flex_client::socket_to_async(sock_server);

        let (sender, mut recv) = mpsc::channel(1);

        // The collector reads from the client end and forwards what it reads to `recv`.
        let fut =
            fuchsia_async::Task::spawn(collect_and_send_string_output(sock_client, sender.into()));

        sock_server.write_all(b"test message 1").await.expect("Can't write msg to socket");
        sock_server.write_all(b"test message 2").await.expect("Can't write msg to socket");
        sock_server.write_all(b"test message 3").await.expect("Can't write msg to socket");

        // The messages may arrive split or merged, so compare the concatenation.
        collect_until_eq(&mut recv, "test message 1test message 2test message 3").await;

        // can receive messages multiple times
        sock_server.write_all(b"test message 4").await.expect("Can't write msg to socket");
        collect_until_eq(&mut recv, "test message 4").await;

        // messages can be read after socket server is closed.
        sock_server.write_all(b"test message 5").await.expect("Can't write msg to socket");
        std::mem::drop(sock_server); // this will drop this handle and close it.
        fut.await.expect("log collection should not fail");
        collect_until_eq(&mut recv, "test message 5").await;

        // socket was closed, this should return None
        let msg = recv.next().await;
        assert_eq!(msg, None);
    }
253
254    /// Host side executor doesn't have a fake timer, so these tests only run on device for now.
255    #[cfg(target_os = "fuchsia")]
256    mod stdout {
257        use super::*;
258        use fuchsia_async::TestExecutor;
259
260        use pretty_assertions::assert_eq;
261        use std::ops::Add;
262
263        struct MutexBytes(Arc<Mutex<Vec<u8>>>);
264
265        impl Write for MutexBytes {
266            fn flush(&mut self) -> Result<(), std::io::Error> {
267                Write::flush(&mut *self.0.lock())
268            }
269
270            fn write(&mut self, bytes: &[u8]) -> Result<usize, std::io::Error> {
271                Write::write(&mut *self.0.lock(), bytes)
272            }
273        }
274
        /// Output stays buffered while the timer is pending and is written out by an explicit
        /// `flush()`.
        #[test]
        fn log_buffer_without_timeout() {
            let mut executor = TestExecutor::new_with_fake_time();
            let output = Arc::new(Mutex::new(vec![]));
            let writer = MutexBytes(output.clone());
            let (log_buffer, mut timeout_task) =
                StdoutBufferInner::new(std::time::Duration::from_secs(5), writer, 100);

            // Both messages fit within the capacity of 100, so nothing reaches `output` yet.
            write!(log_buffer.lock(), "message1").expect("write message");
            assert_eq!(*output.lock(), b"");
            write!(log_buffer.lock(), "message2").expect("write message");
            assert_eq!(*output.lock(), b"");

            // Fake time has not advanced, so the timeout task is still pending.
            assert_eq!(executor.run_until_stalled(&mut timeout_task), Poll::Pending);
            assert_eq!(*output.lock(), b"");

            log_buffer.lock().flush().expect("flush buffer");
            assert_eq!(*output.lock(), b"message1message2");
        }
294
        /// Buffered output is written out when the buffer state is dropped.
        #[test]
        fn log_buffer_flush_on_drop() {
            let mut executor = TestExecutor::new_with_fake_time();
            let output = Arc::new(Mutex::new(vec![]));
            let writer = MutexBytes(output.clone());
            let (log_buffer, mut timeout_task) =
                StdoutBufferInner::new(std::time::Duration::from_secs(5), writer, 100);

            // Both messages fit within the capacity of 100, so nothing reaches `output` yet.
            write!(log_buffer.lock(), "message1").expect("write message");
            assert_eq!(*output.lock(), b"");
            write!(log_buffer.lock(), "message2").expect("write message");
            assert_eq!(*output.lock(), b"");

            // Fake time has not advanced, so the timeout task is still pending.
            assert_eq!(executor.run_until_stalled(&mut timeout_task), Poll::Pending);
            assert_eq!(*output.lock(), b"");

            // Dropping the only strong reference runs `Drop`, which flushes the buffer.
            drop(log_buffer);
            assert_eq!(*output.lock(), b"message1message2");
        }
314
        /// Buffered output is written out once the buffering timer fires.
        #[test]
        fn log_buffer_with_timeout() {
            let mut executor = TestExecutor::new_with_fake_time();
            let output = Arc::new(Mutex::new(vec![]));
            let writer = MutexBytes(output.clone());
            let (log_buffer, mut timeout_task) =
                StdoutBufferInner::new(std::time::Duration::from_secs(5), writer, 100);

            // Both messages fit within the capacity of 100, so nothing reaches `output` yet.
            write!(log_buffer.lock(), "message1").expect("write message");
            assert_eq!(*output.lock(), b"");
            write!(log_buffer.lock(), "message2").expect("write message");
            assert_eq!(*output.lock(), b"");

            // Fake time has not advanced, so the timeout task is still pending.
            assert_eq!(executor.run_until_stalled(&mut timeout_task), Poll::Pending);
            assert_eq!(*output.lock(), b"");

            // Move fake time past the 5 second window and wake the timer so the task completes.
            executor.set_fake_time(executor.now().add(zx::MonotonicDuration::from_seconds(6)));
            executor.wake_next_timer();
            assert_eq!(executor.run_until_stalled(&mut timeout_task), Poll::Ready(()));
            assert_eq!(*output.lock(), b"message1message2");
        }
336
        /// A write that would exceed `max_capacity` pushes out the buffered bytes plus the new
        /// bytes, and buffering continues afterwards.
        #[test]
        fn log_buffer_capacity_reached() {
            let _executor = TestExecutor::new_with_fake_time();
            let output = Arc::new(Mutex::new(vec![]));
            let writer = MutexBytes(output.clone());
            // Capacity of 10 holds one 8-byte message but not two.
            let (log_buffer, _timeout_task) =
                StdoutBufferInner::new(std::time::Duration::from_secs(5), writer, 10);

            write!(log_buffer.lock(), "message1").expect("write message");
            assert_eq!(*output.lock(), b"");
            // 8 + 8 > 10: the buffered message is written out, followed by the new one.
            write!(log_buffer.lock(), "message2").expect("write message");
            assert_eq!(*output.lock(), b"message1message2");

            // capacity was reached but buffering is still on, so next msg should buffer
            write!(log_buffer.lock(), "message1").expect("write message");
            assert_eq!(*output.lock(), b"message1message2");
            write!(log_buffer.lock(), "message2").expect("write message");
            assert_eq!(*output.lock(), b"message1message2message1message2");
        }
356    }
357}