archivist_lib/logs/listener.rs

use diagnostics_data::LogsData;
use diagnostics_message::error::MessageError;
use fidl::endpoints::ClientEnd;
use fidl_fuchsia_logger::{
    LogFilterOptions, LogListenerSafeMarker, LogListenerSafeProxy, LogMessage,
};
use futures::prelude::*;
use log::{debug, error, trace};
use logmessage_measure_tape::Measurable as _;
use std::sync::Arc;
use std::task::Poll;
use thiserror::Error;

mod filter;

use filter::MessageFilter;

const FIDL_VECTOR_HEADER_BYTES: usize = 16;

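/// A single `fuchsia.logger/LogListenerSafe` client, along with the message
/// filter built from the options it supplied and its current connection status.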
pub struct Listener {
    listener: LogListenerSafeProxy,
    filter: MessageFilter,
    status: Status,
}

#[derive(Debug, PartialEq)]
enum Status {
    Fine,
    Stale,
}

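/// Returns `false` when `message` cannot be encoded within FIDL limits: it has
/// more than `MAX_TAGS` tags, a tag longer than `MAX_TAG_LEN_BYTES`, or its
/// encoded size alone exceeds `MAX_LOG_MANY_SIZE_BYTES`.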
fn is_valid(message: &LogMessage) -> bool {
    if message.tags.len() > fidl_fuchsia_logger::MAX_TAGS.into() {
        debug!("Unable to encode message, it exceeded our MAX_TAGS");
        return false;
    }
    for tag in &message.tags {
        if tag.len() > fidl_fuchsia_logger::MAX_TAG_LEN_BYTES.into() {
            debug!("Unable to encode message, it exceeded our MAX_TAG_LEN_BYTES");
            return false;
        }
    }

    let msg_size = message.measure().num_bytes;
    if msg_size + FIDL_VECTOR_HEADER_BYTES > fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize {
        debug!("Unable to encode message, it exceeded our MAX_LOG_MANY_SIZE_BYTES by itself.");
        return false;
    }
    true
}

impl Listener {
    pub fn new(
        log_listener: ClientEnd<LogListenerSafeMarker>,
        options: Option<Box<LogFilterOptions>>,
    ) -> Result<Self, ListenerError> {
        debug!("New listener with options {:?}", &options);
        Ok(Self {
            status: Status::Fine,
            listener: log_listener.into_proxy(),
            filter: MessageFilter::new(options)?,
        })
    }

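    /// Drains everything currently buffered in `logs` and sends it as batched
    /// backfill, then forwards new messages one at a time until the stream ends
    /// or the client goes away.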
    pub async fn run(mut self, mut logs: impl Stream<Item = Arc<LogsData>> + Unpin) {
        debug!("Backfilling from cursor until pending.");
        let mut backlog = vec![];
        futures::future::poll_fn(|cx| {
            while let Poll::Ready(Some(next)) = logs.poll_next_unpin(cx) {
                backlog.push(next);
            }

            Poll::Ready(())
        })
        .await;

        self.backfill(backlog).await;
        debug!("Done backfilling.");
        if !self.is_healthy() {
            return;
        }

        self.send_new_logs(logs).await;
        debug!("Listener exiting.");
    }

    fn is_healthy(&self) -> bool {
        self.status == Status::Fine
    }

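    /// Forwards each new message individually, stopping as soon as the client
    /// channel is observed to be closed.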
    async fn send_new_logs<S>(&mut self, mut logs: S)
    where
        S: Stream<Item = Arc<LogsData>> + Unpin,
    {
        while let Some(message) = logs.next().await {
            self.send_log(&message).await;
            if !self.is_healthy() {
                break;
            }
        }
    }

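    /// Sends the accumulated backlog sorted by timestamp, batching messages so
    /// that each `LogMany` call stays within `MAX_LOG_MANY_SIZE_BYTES`.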
    async fn backfill(&mut self, mut messages: Vec<Arc<LogsData>>) {
        messages.sort_by_key(|m| m.metadata.timestamp);

        let mut batch_size = FIDL_VECTOR_HEADER_BYTES;
        let mut filtered_batch = vec![];
        for msg in messages {
            if self.filter.should_send(&msg) {
                let legacy_msg: LogMessage = msg.as_ref().into();
                let msg_size = legacy_msg.measure().num_bytes;

                if !is_valid(&legacy_msg) {
                    continue;
                }

                if batch_size + msg_size > fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize {
                    self.send_filtered_logs(&filtered_batch).await;
                    if !self.is_healthy() {
                        return;
                    }
                    filtered_batch.clear();
                    batch_size = FIDL_VECTOR_HEADER_BYTES;
                }

                batch_size += msg_size;
                filtered_batch.push(legacy_msg);
            }
        }

        if !filtered_batch.is_empty() {
            self.send_filtered_logs(&filtered_batch).await;
        }
    }

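    /// Flushes a pre-filtered batch to the client in a single `LogMany` call.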
    async fn send_filtered_logs(&mut self, log_messages: &[LogMessage]) {
        trace!("Flushing batch.");
        self.check_result(self.listener.log_many(log_messages).await);
    }

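    /// Sends a single message if it passes this listener's filter and fits the
    /// FIDL size limits.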
    async fn send_log(&mut self, log_message: &LogsData) {
        if self.filter.should_send(log_message) {
            let to_send: LogMessage = log_message.into();
            if !is_valid(&to_send) {
                return;
            }
            self.check_result(self.listener.log(&to_send).await);
        }
    }

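    /// Marks the listener as stale when the client channel is closed; any other
    /// FIDL error is only logged.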
    fn check_result(&mut self, result: Result<(), fidl::Error>) {
        if let Err(e) = result {
            if e.is_closed() {
                self.status = Status::Stale;
            } else {
                error!(e:?; "Error calling listener");
            }
        }
    }
}

#[derive(Debug, Error)]
pub enum ListenerError {
    #[error("{count} tags provided, max {}", fidl_fuchsia_logger::MAX_TAGS)]
    TooManyTags { count: usize },

    #[error("tag at index {index} is too long, max {}", fidl_fuchsia_logger::MAX_TAG_LEN_BYTES)]
    TagTooLong { index: usize },

    #[error("couldn't create LogListenerProxy")]
    CreatingListenerProxy { source: fidl::Error },

    #[error("couldn't decode value: {source}")]
    Decode {
        #[from]
        source: MessageError,
    },

    #[error("error while forwarding unsafe log requests: {source}")]
    AsbestosIo { source: fidl::Error },
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::identity::ComponentIdentity;
    use diagnostics_message::{LoggerMessage, METADATA_SIZE, fx_log_packet_t};
    use fidl::endpoints::ServerEnd;
    use fidl_fuchsia_logger::{LogLevelFilter, LogListenerSafeRequest};
    use fuchsia_async as fasync;
    use libc::c_char;
    use moniker::ExtendedMoniker;

    #[fuchsia::test]
    async fn normal_behavior_test() {
        let message_vec =
            provide_messages(fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize, 4);

        assert_eq!(run_and_consume_backfill(message_vec).await, 4);
    }

    #[fuchsia::test]
    async fn packet_fits_but_converted_struct_would_cause_overflow_test() {
        let message_vec =
            provide_messages(fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize, 1);

        assert_eq!(run_and_consume_backfill(message_vec).await, 0);
    }

    #[fuchsia::test]
    async fn one_packet_would_overflow_but_others_fit_test() {
        let mut message_vec =
            provide_messages(fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize, 1);

        message_vec.append(&mut provide_messages(
            fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize,
            4,
        ));

        assert_eq!(run_and_consume_backfill(message_vec).await, 4);
    }

    #[fuchsia::test]
    async fn verify_client_disconnect() {
        let message_vec =
            provide_messages(fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize, 3);
        let logs = stream::iter(message_vec);

        let (client_end, mut requests) =
            fidl::endpoints::create_request_stream::<LogListenerSafeMarker>();
        let mut listener = Listener::new(client_end, None).unwrap();

        let listener_task = fasync::Task::spawn(async move {
            listener.send_new_logs(logs).await;
        });

        match requests.next().await.unwrap() {
            Ok(LogListenerSafeRequest::Log { log: _, responder }) => {
                responder.send().unwrap();
            }
            other => panic!("Unexpected request: {other:?}"),
        }
        drop(requests);

        listener_task.await;
    }

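    /// Runs `backfill` against a fake listener server and returns how many
    /// messages were delivered via `LogMany`.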
    async fn run_and_consume_backfill(message_vec: Vec<Arc<LogsData>>) -> usize {
        let (client, server) = zx::Channel::create();
        let client_end = ClientEnd::<LogListenerSafeMarker>::new(client);
        let mut listener_server = ServerEnd::<LogListenerSafeMarker>::new(server).into_stream();
        let mut listener = Listener::new(client_end, None).unwrap();

        fasync::Task::spawn(async move {
            listener.backfill(message_vec).await;
        })
        .detach();

        let mut observed_logs: usize = 0;
        while let Some(req) = listener_server.try_next().await.unwrap() {
            match req {
                LogListenerSafeRequest::LogMany { log, responder } => {
                    observed_logs += log.len();
                    responder.send().unwrap();
                }
                _ => panic!("only testing backfill mode."),
            }
        }

        observed_logs
    }

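    /// Builds `num_messages` log messages whose encoded sizes add up to roughly
    /// `summed_msg_size_bytes`.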
    fn provide_messages(summed_msg_size_bytes: usize, num_messages: usize) -> Vec<Arc<LogsData>> {
        let per_msg_size = summed_msg_size_bytes / num_messages;
        let mut message_vec = Vec::new();
        for _ in 0..num_messages {
            let byte_encoding = generate_byte_encoded_log(per_msg_size);
            message_vec.push(Arc::new(diagnostics_message::from_logger(
                get_test_identity().into(),
                LoggerMessage::try_from(byte_encoding.as_bytes()).unwrap(),
            )))
        }

        message_vec
    }

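    /// Produces a wire-format log packet close to `target_size` bytes, splitting
    /// the payload between a tag and a message body.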
    fn generate_byte_encoded_log(target_size: usize) -> fx_log_packet_t {
        let mut test_packet = test_packet();
        let data_size = target_size - METADATA_SIZE;
        let tag_size =
            core::cmp::min(data_size / 2, fidl_fuchsia_logger::MAX_TAG_LEN_BYTES as usize);
        let message_size = data_size - tag_size;

        populate_packet(&mut test_packet, tag_size, message_size);
        test_packet
    }

    fn test_packet() -> fx_log_packet_t {
        let mut packet: fx_log_packet_t = Default::default();
        packet.metadata.pid = 1;
        packet.metadata.tid = 2;
        packet.metadata.time = 3;
        packet.metadata.severity = LogLevelFilter::Debug as i32;
        packet.metadata.dropped_logs = 10;
        packet
    }

    fn populate_packet(packet: &mut fx_log_packet_t, tag_count: usize, message_size: usize) {
        let tag_start = 1;
        let tag_end = tag_start + tag_count;

        packet.data[0] = tag_count as c_char;
        packet.fill_data(tag_start..tag_end, b'T' as _);
        packet.data[tag_end] = 0;

        let message_start = tag_start + tag_count + 1;
        let message_end = message_start + message_size;
        packet.fill_data(message_start..message_end, b'D' as _);
    }

    fn get_test_identity() -> ComponentIdentity {
        ComponentIdentity::new(
            ExtendedMoniker::parse_str("./fake-test-env/bleebloo").unwrap(),
            "fuchsia-pkg://fuchsia.com/testing123#test-component.cm",
        )
    }
}