archivist_lib/logs/
listener.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
3
4use diagnostics_data::LogsData;
5use diagnostics_message::error::MessageError;
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_logger::{
8    LogFilterOptions, LogListenerSafeMarker, LogListenerSafeProxy, LogMessage,
9};
10use futures::prelude::*;
11use log::{debug, error, trace};
12use logmessage_measure_tape::Measurable as _;
13use std::sync::Arc;
14use std::task::Poll;
15use thiserror::Error;
16
17mod filter;
18
19use filter::MessageFilter;
20
21// Number of bytes the header of a vector occupies in a fidl message.
22const FIDL_VECTOR_HEADER_BYTES: usize = 16;
23
24/// An individual log listener. Wraps the FIDL type `LogListenerProxy` in filtering options provided
25/// when connecting.
26pub struct Listener {
27    listener: LogListenerSafeProxy,
28    filter: MessageFilter,
29    status: Status,
30}
31
32#[derive(Debug, PartialEq)]
33enum Status {
34    Fine,
35    Stale,
36}
37
38fn is_valid(message: &LogMessage) -> bool {
39    // Check that the tags fit in FIDL.
40    if message.tags.len() > fidl_fuchsia_logger::MAX_TAGS.into() {
41        debug!("Unable to encode message, it exceeded our MAX_TAGS");
42        return false;
43    }
44    for tag in &message.tags {
45        if tag.len() > fidl_fuchsia_logger::MAX_TAG_LEN_BYTES.into() {
46            debug!("Unable to encode message, it exceeded our MAX_TAG_LEN_BYTES");
47            return false;
48        }
49    }
50
51    // If a message by itself is too big to fit into fidl, warn and skip.
52    let msg_size = message.measure().num_bytes;
53    if msg_size + FIDL_VECTOR_HEADER_BYTES > fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize {
54        debug!("Unable to encode message, it exceeded our MAX_LOG_MANY_SIZE_BYTES by itself.");
55        return false;
56    }
57    true
58}
59
60impl Listener {
61    /// Create a new `Listener`. Fails if `client` can't be converted into a `LogListenerProxy` or
62    /// if `LogFilterOptions` are invalid.
63    pub fn new(
64        log_listener: ClientEnd<LogListenerSafeMarker>,
65        options: Option<Box<LogFilterOptions>>,
66    ) -> Result<Self, ListenerError> {
67        debug!("New listener with options {:?}", &options);
68        Ok(Self {
69            status: Status::Fine,
70            listener: log_listener.into_proxy(),
71            filter: MessageFilter::new(options)?,
72        })
73    }
74
75    /// Send messages to the listener. First eagerly collects any backlog and sends it out in
76    /// batches before waiting for wakeups.
77    pub async fn run(mut self, mut logs: impl Stream<Item = Arc<LogsData>> + Unpin) {
78        debug!("Backfilling from cursor until pending.");
79        let mut backlog = vec![];
80        futures::future::poll_fn(|cx| {
81            while let Poll::Ready(Some(next)) = logs.poll_next_unpin(cx) {
82                backlog.push(next);
83            }
84
85            Poll::Ready(())
86        })
87        .await;
88
89        self.backfill(backlog).await;
90        debug!("Done backfilling.");
91        if !self.is_healthy() {
92            return;
93        }
94
95        self.send_new_logs(logs).await;
96        debug!("Listener exiting.");
97    }
98
99    /// Returns whether this listener should continue receiving messages.
100    fn is_healthy(&self) -> bool {
101        self.status == Status::Fine
102    }
103
104    async fn send_new_logs<S>(&mut self, mut logs: S)
105    where
106        S: Stream<Item = Arc<LogsData>> + Unpin,
107    {
108        while let Some(message) = logs.next().await {
109            self.send_log(&message).await;
110            if !self.is_healthy() {
111                break;
112            }
113        }
114    }
115
116    /// Send all messages currently in the provided buffer to this listener. Attempts to batch up
117    /// to the message size limit. Returns early if the listener appears to be unhealthy.
118    async fn backfill(&mut self, mut messages: Vec<Arc<LogsData>>) {
119        messages.sort_by_key(|m| m.metadata.timestamp);
120
121        // Initialize batch size to the size of the vector header.
122        let mut batch_size = FIDL_VECTOR_HEADER_BYTES;
123        let mut filtered_batch = vec![];
124        for msg in messages {
125            if self.filter.should_send(&msg) {
126                // Convert archivist-encoded log message to legacy format expected
127                // by the listener, then use measure_tape to get true size.
128                let legacy_msg: LogMessage = msg.as_ref().into();
129                let msg_size = legacy_msg.measure().num_bytes;
130
131                if !is_valid(&legacy_msg) {
132                    continue;
133                }
134
135                if batch_size + msg_size > fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize {
136                    self.send_filtered_logs(&filtered_batch).await;
137                    if !self.is_healthy() {
138                        return;
139                    }
140                    filtered_batch.clear();
141                    batch_size = FIDL_VECTOR_HEADER_BYTES;
142                }
143
144                batch_size += msg_size;
145                filtered_batch.push(legacy_msg);
146            }
147        }
148
149        if !filtered_batch.is_empty() {
150            self.send_filtered_logs(&filtered_batch).await;
151        }
152    }
153
154    /// Send a batch of pre-filtered log messages to this listener.
155    async fn send_filtered_logs(&mut self, log_messages: &[LogMessage]) {
156        trace!("Flushing batch.");
157        self.check_result(self.listener.log_many(log_messages).await);
158    }
159
160    /// Send a single log message if it should be sent according to this listener's filter settings.
161    async fn send_log(&mut self, log_message: &LogsData) {
162        if self.filter.should_send(log_message) {
163            let to_send: LogMessage = log_message.into();
164            if !is_valid(&to_send) {
165                return;
166            }
167            self.check_result(self.listener.log(&to_send).await);
168        }
169    }
170
171    /// Consume the result of sending logs to this listener, potentially marking it stale.
172    fn check_result(&mut self, result: Result<(), fidl::Error>) {
173        if let Err(e) = result {
174            if e.is_closed() {
175                self.status = Status::Stale;
176            } else {
177                error!(e:?; "Error calling listener");
178            }
179        }
180    }
181}
182
183#[derive(Debug, Error)]
184pub enum ListenerError {
185    #[error("{count} tags provided, max {}", fidl_fuchsia_logger::MAX_TAGS)]
186    TooManyTags { count: usize },
187
188    #[error("tag at index {index} is too long, max {}", fidl_fuchsia_logger::MAX_TAG_LEN_BYTES)]
189    TagTooLong { index: usize },
190
191    #[error("couldn't create LogListenerProxy")]
192    CreatingListenerProxy { source: fidl::Error },
193
194    #[error("couldn't decode value: {source}")]
195    Decode {
196        #[from]
197        source: MessageError,
198    },
199
200    #[error("error while forwarding unsafe log requests: {source}")]
201    AsbestosIo { source: fidl::Error },
202}
203
204#[cfg(test)]
205mod tests {
206    use super::*;
207    use crate::identity::ComponentIdentity;
208    use diagnostics_message::{LoggerMessage, METADATA_SIZE, fx_log_packet_t};
209    use fidl::endpoints::ServerEnd;
210    use fidl_fuchsia_logger::{LogLevelFilter, LogListenerSafeRequest};
211    use fuchsia_async as fasync;
212    use libc::c_char;
213    use moniker::ExtendedMoniker;
214
215    #[fuchsia::test]
216    async fn normal_behavior_test() {
217        let message_vec =
218            provide_messages(fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize, 4);
219
220        assert_eq!(run_and_consume_backfill(message_vec).await, 4);
221    }
222
223    #[fuchsia::test]
224    async fn packet_fits_but_converted_struct_would_cause_overflow_test() {
225        let message_vec =
226            provide_messages(fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize, 1);
227
228        assert_eq!(run_and_consume_backfill(message_vec).await, 0);
229    }
230
231    #[fuchsia::test]
232    async fn one_packet_would_overflow_but_others_fit_test() {
233        let mut message_vec =
234            provide_messages(fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize, 1);
235
236        message_vec.append(&mut provide_messages(
237            fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize,
238            4,
239        ));
240
241        assert_eq!(run_and_consume_backfill(message_vec).await, 4);
242    }
243
244    #[fuchsia::test]
245    async fn verify_client_disconnect() {
246        let message_vec =
247            provide_messages(fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize, 3);
248        let logs = stream::iter(message_vec);
249
250        let (client_end, mut requests) =
251            fidl::endpoints::create_request_stream::<LogListenerSafeMarker>();
252        let mut listener = Listener::new(client_end, None).unwrap();
253
254        let listener_task = fasync::Task::spawn(async move {
255            listener.send_new_logs(logs).await;
256        });
257
258        match requests.next().await.unwrap() {
259            Ok(LogListenerSafeRequest::Log { log: _, responder }) => {
260                responder.send().unwrap();
261            }
262            other => panic!("Unexpected request: {other:?}"),
263        }
264        drop(requests);
265
266        // The task should finish since the `LogListenerSafe` server disconnected.
267        listener_task.await;
268    }
269
270    async fn run_and_consume_backfill(message_vec: Vec<Arc<LogsData>>) -> usize {
271        let (client, server) = zx::Channel::create();
272        let client_end = ClientEnd::<LogListenerSafeMarker>::new(client);
273        let mut listener_server = ServerEnd::<LogListenerSafeMarker>::new(server).into_stream();
274        let mut listener = Listener::new(client_end, None).unwrap();
275
276        fasync::Task::spawn(async move {
277            listener.backfill(message_vec).await;
278        })
279        .detach();
280
281        let mut observed_logs: usize = 0;
282        while let Some(req) = listener_server.try_next().await.unwrap() {
283            match req {
284                LogListenerSafeRequest::LogMany { log, responder } => {
285                    observed_logs += log.len();
286                    responder.send().unwrap();
287                }
288                _ => panic!("only testing backfill mode."),
289            }
290        }
291
292        observed_logs
293    }
294
295    fn provide_messages(summed_msg_size_bytes: usize, num_messages: usize) -> Vec<Arc<LogsData>> {
296        let per_msg_size = summed_msg_size_bytes / num_messages;
297        let mut message_vec = Vec::new();
298        for _ in 0..num_messages {
299            let byte_encoding = generate_byte_encoded_log(per_msg_size);
300            message_vec.push(Arc::new(diagnostics_message::from_logger(
301                get_test_identity().into(),
302                LoggerMessage::try_from(byte_encoding.as_bytes()).unwrap(),
303            )))
304        }
305
306        message_vec
307    }
308
309    // Generate an fx log packet of a target size with size split between tags and data.
310    fn generate_byte_encoded_log(target_size: usize) -> fx_log_packet_t {
311        let mut test_packet = test_packet();
312        let data_size = target_size - METADATA_SIZE;
313        let tag_size =
314            core::cmp::min(data_size / 2, fidl_fuchsia_logger::MAX_TAG_LEN_BYTES as usize);
315        let message_size = data_size - tag_size;
316
317        populate_packet(&mut test_packet, tag_size, message_size);
318        test_packet
319    }
320
321    fn test_packet() -> fx_log_packet_t {
322        let mut packet: fx_log_packet_t = Default::default();
323        packet.metadata.pid = 1;
324        packet.metadata.tid = 2;
325        packet.metadata.time = 3;
326        packet.metadata.severity = LogLevelFilter::Debug as i32;
327        packet.metadata.dropped_logs = 10;
328        packet
329    }
330
331    fn populate_packet(packet: &mut fx_log_packet_t, tag_count: usize, message_size: usize) {
332        let tag_start = 1;
333        let tag_end = tag_start + tag_count;
334
335        packet.data[0] = tag_count as c_char;
336        packet.fill_data(tag_start..tag_end, b'T' as _);
337        packet.data[tag_end] = 0; // terminate tags
338
339        let message_start = tag_start + tag_count + 1;
340        let message_end = message_start + message_size;
341        packet.fill_data(message_start..message_end, b'D' as _);
342    }
343
344    fn get_test_identity() -> ComponentIdentity {
345        ComponentIdentity::new(
346            ExtendedMoniker::parse_str("./fake-test-env/bleebloo").unwrap(),
347            "fuchsia-pkg://fuchsia.com/testing123#test-component.cm",
348        )
349    }
350}