archivist_lib/logs/
socket.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
3
4use super::stats::LogStreamStats;
5use crate::logs::stored_message::StoredMessage;
6use fuchsia_async as fasync;
7use futures::Stream;
8use std::marker::PhantomData;
9use std::pin::Pin;
10use std::sync::Arc;
11use std::task::{Context, Poll};
12
13/// An `Encoding` is able to parse a `Message` from raw bytes.
14pub trait Encoding {
15    /// Attempt to parse a message from the given buffer
16    fn wrap_bytes(bytes: Vec<u8>, stats: &Arc<LogStreamStats>) -> Option<StoredMessage>;
17}
18
/// Marker type selecting the legacy [logger/syslog wire format] as the message encoding.
///
/// It carries no data; it only exists so that [`Encoding`] can be implemented for it.
///
/// [logger/syslog wire format]: https://fuchsia.googlesource.com/fuchsia/+/HEAD/zircon/system/ulib/syslog/include/lib/syslog/wire_format.h
#[derive(Clone, Debug)]
pub struct LegacyEncoding;
24
25impl Encoding for LegacyEncoding {
26    fn wrap_bytes(buf: Vec<u8>, stats: &Arc<LogStreamStats>) -> Option<StoredMessage> {
27        StoredMessage::from_legacy(buf.into_boxed_slice(), stats)
28    }
29}
30
31#[must_use = "don't drop logs on the floor please!"]
32pub struct LogMessageSocket<E> {
33    buffer: Vec<u8>,
34    stats: Arc<LogStreamStats>,
35    socket: fasync::Socket,
36    _encoder: PhantomData<E>,
37}
38
39impl LogMessageSocket<LegacyEncoding> {
40    /// Creates a new `LogMessageSocket` from the given `socket` that reads the legacy format.
41    pub fn new(socket: fasync::Socket, stats: Arc<LogStreamStats>) -> Self {
42        stats.open_socket();
43        Self { socket, stats, _encoder: PhantomData, buffer: Vec::new() }
44    }
45}
46
47impl<E> Stream for LogMessageSocket<E>
48where
49    E: Encoding + Unpin,
50{
51    type Item = Result<StoredMessage, zx::Status>;
52
53    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
54        let this = self.get_mut();
55        loop {
56            match this.socket.poll_datagram(cx, &mut this.buffer) {
57                // If the socket is pending, return Pending.
58                Poll::Pending => return Poll::Pending,
59                // If the socket got a PEER_CLOSED then finalize the stream.
60                Poll::Ready(Err(zx::Status::PEER_CLOSED)) => return Poll::Ready(None),
61                // If the socket got some other error, return that error.
62                Poll::Ready(Err(status)) => return Poll::Ready(Some(Err(status))),
63                // If the socket read 0 bytes, then retry until we get some data or an error. This
64                // can happen when the zx_object_get_info call returns 0 outstanding read bytes,
65                // but by the time we do zx_socket_read there's data available.
66                Poll::Ready(Ok(0)) => continue,
67                // If we got data, then return the data we read.
68                Poll::Ready(Ok(_len)) => {
69                    let buf = std::mem::take(&mut this.buffer);
70                    let Some(msg) = E::wrap_bytes(buf, &this.stats) else {
71                        continue;
72                    };
73                    return Poll::Ready(Some(Ok(msg)));
74                }
75            }
76        }
77    }
78}
79
80impl<E> Drop for LogMessageSocket<E> {
81    fn drop(&mut self) {
82        self.stats.close_socket();
83    }
84}
85
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::TEST_IDENTITY;
    use diagnostics_data::Severity;
    use diagnostics_message::fx_log_packet_t;
    use futures::StreamExt;

    /// Sends the same legacy packet through a datagram socket twice and checks that each
    /// copy comes out of the stream and parses to the expected log entry.
    #[fasync::run_until_stalled(test)]
    async fn logger_stream_test() {
        let (sin, sout) = zx::Socket::create_datagram();

        // Build a legacy packet that is expected to parse as tag "AAAAA", message "BBBBB".
        let mut packet = fx_log_packet_t::default();
        packet.metadata.pid = 1;
        packet.metadata.severity = 0x30; // INFO
        packet.data[0] = 5;
        packet.fill_data(1..6, b'A' as _);
        packet.fill_data(7..12, b'B' as _);

        // What every copy of the packet should parse to.
        let expected = diagnostics_data::LogsDataBuilder::new(diagnostics_data::BuilderArgs {
            timestamp: zx::BootInstant::from_nanos(packet.metadata.time),
            component_url: Some(TEST_IDENTITY.url.clone()),
            moniker: TEST_IDENTITY.moniker.clone(),
            severity: Severity::Info,
        })
        .set_pid(packet.metadata.pid)
        .set_tid(packet.metadata.tid)
        .add_tag("AAAAA")
        .set_message("BBBBB".to_string())
        .build();

        let reader = fasync::Socket::from_socket(sout);
        let mut stream = LogMessageSocket::new(reader, Default::default());

        // Two rounds: the second write checks the stream keeps working after the first read.
        for _round in 0..2 {
            sin.write(packet.as_bytes()).unwrap();

            let stored = stream.next().await.unwrap().unwrap();
            let parsed = stored.parse(&TEST_IDENTITY).unwrap();
            assert_eq!(parsed, expected);
        }
    }
}