diagnostics_log/fuchsia/
filter.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
3
4use crate::OnInterestChanged;
5use diagnostics_log_encoding::encode::TestRecord;
6use diagnostics_log_types::Severity;
7use fidl_fuchsia_logger::LogSinkProxy;
8use fuchsia_sync::Mutex;
9
10#[cfg(fuchsia_api_level_less_than = "27")]
11use fidl_fuchsia_diagnostics as fdiagnostics;
12#[cfg(fuchsia_api_level_at_least = "27")]
13use fidl_fuchsia_diagnostics_types as fdiagnostics;
14
/// Applies interest (minimum severity) updates to the global `log` max level and forwards them
/// to an optional listener.
pub(crate) struct InterestFilter {
    /// Severity that `update_interest` falls back to when an interest has no `min_severity`.
    default_severity: Severity,
    /// Callback invoked by `update_interest`; stays `None` until `set_interest_listener` is
    /// called.
    listener: Mutex<Option<Box<dyn OnInterestChanged + Send + Sync + 'static>>>,
}
19
20impl InterestFilter {
21    /// Returns a new `InterestFilter` with the default interest.
22    ///
23    /// NOTE: This does not update the global maximum log level because some callers don't want
24    /// that. Calling `update_interest` *will* update the global maximum log level.
25    pub fn new(default_interest: fdiagnostics::Interest) -> Self {
26        Self {
27            default_severity: default_interest.min_severity.map_or(Severity::Info, Severity::from),
28            listener: Mutex::default(),
29        }
30    }
31
32    /// Sets the interest listener.
33    pub fn set_interest_listener<T>(&self, listener: T)
34    where
35        T: OnInterestChanged + Send + Sync + 'static,
36    {
37        let mut listener_guard = self.listener.lock();
38        *listener_guard = Some(Box::new(listener));
39    }
40
41    /// Listen for interest updates.
42    pub async fn listen_for_interest_updates(&self, proxy: LogSinkProxy) {
43        while let Ok(Ok(interest)) = proxy.wait_for_interest_change().await {
44            self.update_interest(interest);
45        }
46    }
47
48    /// Updates the global interest.
49    pub fn update_interest(&self, interest: fdiagnostics::Interest) {
50        let new_min_severity = interest.min_severity.map_or(self.default_severity, Severity::from);
51        log::set_max_level(new_min_severity.into());
52        let callback_guard = self.listener.lock();
53        if let Some(callback) = &*callback_guard {
54            callback.on_changed(new_min_severity);
55        }
56    }
57
58    pub fn enabled_for_testing(&self, record: &TestRecord<'_>) -> bool {
59        let min_severity = Severity::try_from(log::max_level()).map(|s| s as u8).unwrap_or(u8::MAX);
60        min_severity <= record.severity
61    }
62}
63
64#[cfg(test)]
65mod tests {
66    use super::*;
67    use fidl::endpoints::create_proxy_and_stream;
68    use fidl_fuchsia_logger::{LogSinkMarker, LogSinkRequest, LogSinkRequestStream};
69    use futures::channel::mpsc;
70    use futures::{StreamExt, TryStreamExt};
71    use log::{debug, error, info, trace, warn};
72    use std::sync::Arc;
73
74    struct SeverityTracker {
75        _filter: Arc<InterestFilter>,
76        severity_counts: Arc<Mutex<SeverityCount>>,
77    }
78
79    impl log::Log for SeverityTracker {
80        fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
81            true
82        }
83
84        fn log(&self, record: &log::Record<'_>) {
85            let mut count = self.severity_counts.lock();
86            let to_increment = match record.level() {
87                log::Level::Trace => &mut count.trace,
88                log::Level::Debug => &mut count.debug,
89                log::Level::Info => &mut count.info,
90                log::Level::Warn => &mut count.warn,
91                log::Level::Error => &mut count.error,
92            };
93            *to_increment += 1;
94        }
95
96        fn flush(&self) {}
97    }
98
    /// Number of log records observed at each `log::Level`; all counters start at zero.
    #[derive(Debug, Default, Eq, PartialEq)]
    struct SeverityCount {
        /// Records seen at `log::Level::Trace`.
        trace: u64,
        /// Records seen at `log::Level::Debug`.
        debug: u64,
        /// Records seen at `log::Level::Info`.
        info: u64,
        /// Records seen at `log::Level::Warn`.
        warn: u64,
        /// Records seen at `log::Level::Error`.
        error: u64,
    }
107
108    struct InterestChangedListener(mpsc::UnboundedSender<()>);
109
110    impl OnInterestChanged for InterestChangedListener {
111        fn on_changed(&self, _: crate::Severity) {
112            self.0.unbounded_send(()).unwrap();
113        }
114    }
115
116    #[fuchsia::test(logging = false)]
117    async fn default_filter_is_info_when_unspecified() {
118        let filter = Arc::new(InterestFilter::new(fdiagnostics::Interest::default()));
119        filter.update_interest(fdiagnostics::Interest::default());
120        let observed = Arc::new(Mutex::new(SeverityCount::default()));
121        log::set_boxed_logger(Box::new(SeverityTracker {
122            severity_counts: observed.clone(),
123            _filter: filter,
124        }))
125        .unwrap();
126        let mut expected = SeverityCount::default();
127
128        error!("oops");
129        expected.error += 1;
130        assert_eq!(&*observed.lock(), &expected);
131
132        warn!("maybe");
133        expected.warn += 1;
134        assert_eq!(&*observed.lock(), &expected);
135
136        info!("ok");
137        expected.info += 1;
138        assert_eq!(&*observed.lock(), &expected);
139
140        debug!("hint");
141        assert_eq!(&*observed.lock(), &expected, "should not increment counters");
142
143        trace!("spew");
144        assert_eq!(&*observed.lock(), &expected, "should not increment counters");
145    }
146
147    async fn send_interest_change(stream: &mut LogSinkRequestStream, severity: Option<Severity>) {
148        match stream.try_next().await {
149            Ok(Some(LogSinkRequest::WaitForInterestChange { responder })) => {
150                responder
151                    .send(Ok(&fdiagnostics::Interest {
152                        min_severity: severity.map(fdiagnostics::Severity::from),
153                        ..Default::default()
154                    }))
155                    .expect("send response");
156            }
157            other => panic!("Expected WaitForInterestChange but got {:?}", other),
158        }
159    }
160
161    #[fuchsia::test(logging = false)]
162    async fn default_filter_on_interest_changed() {
163        let (proxy, mut requests) = create_proxy_and_stream::<LogSinkMarker>();
164        let filter = Arc::new(InterestFilter::new(fdiagnostics::Interest {
165            min_severity: Some(fdiagnostics::Severity::Warn),
166            ..Default::default()
167        }));
168        let (send, mut recv) = mpsc::unbounded();
169        filter.set_interest_listener(InterestChangedListener(send));
170        let _on_changes_task = fuchsia_async::Task::spawn({
171            let filter = filter.clone();
172            async move { filter.listen_for_interest_updates(proxy).await }
173        });
174        let observed = Arc::new(Mutex::new(SeverityCount::default()));
175        log::set_boxed_logger(Box::new(SeverityTracker {
176            severity_counts: observed.clone(),
177            _filter: filter,
178        }))
179        .expect("set logger");
180
181        // After overriding to info, filtering is at info level. The mpsc channel is used to
182        // get a signal as to when the filter has processed the update.
183        send_interest_change(&mut requests, Some(Severity::Info)).await;
184        recv.next().await.unwrap();
185
186        let mut expected = SeverityCount::default();
187        error!("oops");
188        expected.error += 1;
189        assert_eq!(&*observed.lock(), &expected);
190
191        warn!("maybe");
192        expected.warn += 1;
193        assert_eq!(&*observed.lock(), &expected);
194
195        info!("ok");
196        expected.info += 1;
197        assert_eq!(&*observed.lock(), &expected);
198
199        debug!("hint");
200        assert_eq!(&*observed.lock(), &expected, "should not increment counters");
201
202        trace!("spew");
203        assert_eq!(&*observed.lock(), &expected, "should not increment counters");
204
205        // After resetting to default, filtering is at warn level.
206        send_interest_change(&mut requests, None).await;
207        recv.next().await.unwrap();
208
209        error!("oops");
210        expected.error += 1;
211        assert_eq!(&*observed.lock(), &expected);
212
213        warn!("maybe");
214        expected.warn += 1;
215        assert_eq!(&*observed.lock(), &expected);
216
217        info!("ok");
218        assert_eq!(&*observed.lock(), &expected, "should not increment counters");
219
220        debug!("hint");
221        assert_eq!(&*observed.lock(), &expected, "should not increment counters");
222
223        trace!("spew");
224        assert_eq!(&*observed.lock(), &expected, "should not increment counters");
225    }
226
227    #[fuchsia::test(logging = false)]
228    async fn log_frontend_tracks_severity() {
229        // Manually set to a known value.
230        log::set_max_level(log::LevelFilter::Off);
231
232        let (proxy, mut requests) = create_proxy_and_stream::<LogSinkMarker>();
233        let filter = Arc::new(InterestFilter::new(fdiagnostics::Interest {
234            min_severity: Some(fdiagnostics::Severity::Warn),
235            ..Default::default()
236        }));
237        // The filter shouldn't set the global level until it receives an interest update.
238        assert_eq!(log::max_level(), log::LevelFilter::Off);
239        assert_eq!(filter.default_severity, Severity::Warn);
240
241        let (send, mut recv) = mpsc::unbounded();
242        filter.set_interest_listener(InterestChangedListener(send));
243        let _on_changes_task = fuchsia_async::Task::spawn(async move {
244            filter.listen_for_interest_updates(proxy).await;
245        });
246
247        send_interest_change(&mut requests, Some(Severity::Trace)).await;
248        recv.next().await.unwrap();
249        assert_eq!(log::max_level(), log::LevelFilter::Trace);
250
251        send_interest_change(&mut requests, Some(Severity::Info)).await;
252        recv.next().await.unwrap();
253        assert_eq!(log::max_level(), log::LevelFilter::Info);
254    }
255}