archivist_lib/logs/
socket.rs
1use super::stats::LogStreamStats;
5use crate::logs::stored_message::StoredMessage;
6use fuchsia_async as fasync;
7use futures::Stream;
8use std::marker::PhantomData;
9use std::pin::Pin;
10use std::sync::Arc;
11use std::task::{Context, Poll};
12
13pub trait Encoding {
15 fn wrap_bytes(bytes: Vec<u8>, stats: &Arc<LogStreamStats>) -> Option<StoredMessage>;
17}
18
19#[derive(Clone, Debug)]
23pub struct LegacyEncoding;
24
25impl Encoding for LegacyEncoding {
26 fn wrap_bytes(buf: Vec<u8>, stats: &Arc<LogStreamStats>) -> Option<StoredMessage> {
27 StoredMessage::from_legacy(buf.into_boxed_slice(), stats)
28 }
29}
30
31#[must_use = "don't drop logs on the floor please!"]
32pub struct LogMessageSocket<E> {
33 buffer: Vec<u8>,
34 stats: Arc<LogStreamStats>,
35 socket: fasync::Socket,
36 _encoder: PhantomData<E>,
37}
38
39impl LogMessageSocket<LegacyEncoding> {
40 pub fn new(socket: fasync::Socket, stats: Arc<LogStreamStats>) -> Self {
42 stats.open_socket();
43 Self { socket, stats, _encoder: PhantomData, buffer: Vec::new() }
44 }
45}
46
47impl<E> Stream for LogMessageSocket<E>
48where
49 E: Encoding + Unpin,
50{
51 type Item = Result<StoredMessage, zx::Status>;
52
53 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
54 let this = self.get_mut();
55 loop {
56 match this.socket.poll_datagram(cx, &mut this.buffer) {
57 Poll::Pending => return Poll::Pending,
59 Poll::Ready(Err(zx::Status::PEER_CLOSED)) => return Poll::Ready(None),
61 Poll::Ready(Err(status)) => return Poll::Ready(Some(Err(status))),
63 Poll::Ready(Ok(0)) => continue,
67 Poll::Ready(Ok(_len)) => {
69 let buf = std::mem::take(&mut this.buffer);
70 let Some(msg) = E::wrap_bytes(buf, &this.stats) else {
71 continue;
72 };
73 return Poll::Ready(Some(Ok(msg)));
74 }
75 }
76 }
77 }
78}
79
80impl<E> Drop for LogMessageSocket<E> {
81 fn drop(&mut self) {
82 self.stats.close_socket();
83 }
84}
85
86#[cfg(test)]
87mod tests {
88 use super::*;
89 use crate::testing::TEST_IDENTITY;
90 use diagnostics_data::Severity;
91 use diagnostics_message::fx_log_packet_t;
92 use futures::StreamExt;
93
94 #[fasync::run_until_stalled(test)]
95 async fn logger_stream_test() {
96 let (sin, sout) = zx::Socket::create_datagram();
97 let mut packet: fx_log_packet_t = Default::default();
98 packet.metadata.pid = 1;
99 packet.metadata.severity = 0x30; packet.data[0] = 5;
101 packet.fill_data(1..6, b'A' as _);
102 packet.fill_data(7..12, b'B' as _);
103
104 let socket = fasync::Socket::from_socket(sout);
105 let mut ls = LogMessageSocket::new(socket, Default::default());
106 sin.write(packet.as_bytes()).unwrap();
107 let expected_p = diagnostics_data::LogsDataBuilder::new(diagnostics_data::BuilderArgs {
108 timestamp: zx::BootInstant::from_nanos(packet.metadata.time),
109 component_url: Some(TEST_IDENTITY.url.clone()),
110 moniker: TEST_IDENTITY.moniker.clone(),
111 severity: Severity::Info,
112 })
113 .set_pid(packet.metadata.pid)
114 .set_tid(packet.metadata.tid)
115 .add_tag("AAAAA")
116 .set_message("BBBBB".to_string())
117 .build();
118
119 let bytes = ls.next().await.unwrap().unwrap();
120 let result_message = bytes.parse(&TEST_IDENTITY).unwrap();
121 assert_eq!(result_message, expected_p);
122
123 sin.write(packet.as_bytes()).unwrap();
125
126 let result_message = ls.next().await.unwrap().unwrap().parse(&TEST_IDENTITY).unwrap();
127 assert_eq!(result_message, expected_p);
128 }
129}