validating_log_listener/
lib.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! A utility crate for validating the behavior of LogSink & Log implementations.
6
7#![warn(missing_docs)]
8
9use fidl_fuchsia_logger::{
10    LogFilterOptions, LogListenerSafeMarker, LogListenerSafeRequest, LogListenerSafeRequestStream,
11    LogMessage, LogProxy,
12};
13use fuchsia_async as fasync;
14use futures::channel::mpsc::{channel, Receiver, Sender};
15use futures::sink::SinkExt;
16use futures::stream::StreamExt;
17
18/// Test that all of the expected message arrive over `proxy`, with no unexpected ones appearing.
19/// Returns once all expected messages have been observed.
20///
21/// # Panics
22///
23/// Panics when validation fails due to an unexpected message or due to connection failures.
24pub async fn validate_log_stream(
25    expected: impl IntoIterator<Item = LogMessage>,
26    proxy: LogProxy,
27    filter_options: Option<LogFilterOptions>,
28) {
29    ValidatingListener::new(expected).run(proxy, filter_options, false).await;
30}
31
32/// Test that all of the expected message arrive over `proxy` after requesting a log dump, with no
33/// unexpected records appearing. Returns once all expected messages have been observed.
34///
35/// # Panics
36///
37/// Panics when validation fails due to an unexpected message, missing messages when the sink says
38/// it is done dumping, or due to connection failures.
39pub async fn validate_log_dump(
40    expected: impl IntoIterator<Item = LogMessage>,
41    proxy: LogProxy,
42    filter_options: Option<LogFilterOptions>,
43) {
44    ValidatingListener::new(expected).run(proxy, filter_options, true).await;
45}
46
47enum Outcome {
48    AllExpectedReceived,
49    LogSentDone,
50    UnexpectedMessage(LogMessage),
51}
52
53/// Listens to all log messages sent during test, and verifies that they match what's expected.
54struct ValidatingListener {
55    expected: Vec<LogMessage>,
56    outcomes: Option<Receiver<Outcome>>,
57    send_outcomes: Sender<Outcome>,
58}
59
60impl ValidatingListener {
61    fn new(expected: impl IntoIterator<Item = LogMessage>) -> Self {
62        let (send_outcomes, outcomes) = channel(3);
63        Self { expected: expected.into_iter().collect(), send_outcomes, outcomes: Some(outcomes) }
64    }
65
66    /// Drive a LogListenerSafe request stream. Signals for channel close and test completion are
67    /// send on the futures-aware channels with which ValidatingListener is constructed.
68    async fn run(
69        mut self,
70        proxy: LogProxy,
71        filter_options: Option<LogFilterOptions>,
72        dump_logs: bool,
73    ) {
74        let (client_end, stream) =
75            fidl::endpoints::create_request_stream::<LogListenerSafeMarker>();
76        let filter_options = filter_options.as_ref();
77
78        if dump_logs {
79            proxy.dump_logs_safe(client_end, filter_options).expect("failed to register listener");
80        } else {
81            proxy.listen_safe(client_end, filter_options).expect("failed to register listener");
82        }
83
84        let mut sink_says_done = false;
85        let mut all_expected = false;
86        let mut outcomes = self.outcomes.take().unwrap();
87        fasync::Task::spawn(self.handle_stream(stream)).detach();
88
89        'observe_outcomes: while let Some(outcome) = outcomes.next().await {
90            match outcome {
91                Outcome::AllExpectedReceived => all_expected = true,
92                Outcome::LogSentDone => sink_says_done = true,
93                Outcome::UnexpectedMessage(msg) => panic!("unexpected log message {:?}", msg),
94            }
95
96            if all_expected && (!dump_logs || sink_says_done) {
97                // only stop looking at outcomes if we have all the messages we expect AND
98                // if we either don't care about log dumps terminating because we didn't ask for one
99                // or it has terminated as we expect
100                break 'observe_outcomes;
101            }
102        }
103
104        if dump_logs {
105            assert!(sink_says_done, "must have received all expected messages");
106        } else {
107            // FIXME(41966): this should be tested for both streaming and dumping modes
108            assert!(all_expected, "must have received all expected messages");
109        }
110    }
111
112    async fn handle_stream(mut self, mut stream: LogListenerSafeRequestStream) {
113        while let Some(Ok(req)) = stream.next().await {
114            self.handle_request(req).await;
115        }
116    }
117
118    async fn handle_request(&mut self, req: LogListenerSafeRequest) {
119        match req {
120            LogListenerSafeRequest::Log { log, responder } => {
121                self.log(log).await;
122                responder.send().ok();
123            }
124            LogListenerSafeRequest::LogMany { log, responder } => {
125                for msg in log {
126                    self.log(msg).await;
127                }
128                responder.send().ok();
129            }
130            LogListenerSafeRequest::Done { .. } => {
131                self.send_outcomes.send(Outcome::LogSentDone).await.unwrap();
132            }
133        }
134    }
135
136    async fn log(&mut self, received: LogMessage) {
137        if let Some((i, _)) = self.expected.iter().enumerate().find(|(_, expected)| {
138            expected.msg == received.msg
139                && expected.tags == received.tags
140                && expected.severity == received.severity
141        }) {
142            self.expected.remove(i);
143            if self.expected.is_empty() {
144                self.send_outcomes.send(Outcome::AllExpectedReceived).await.unwrap();
145            }
146        } else {
147            self.send_outcomes.send(Outcome::UnexpectedMessage(received)).await.unwrap();
148        }
149    }
150}