validating_log_listener/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! A utility crate for validating the behavior of LogSink & Log implementations.

#![warn(missing_docs)]

use fidl_fuchsia_logger::{
    LogFilterOptions, LogListenerSafeMarker, LogListenerSafeRequest, LogListenerSafeRequestStream,
    LogMessage, LogProxy,
};
use fuchsia_async as fasync;
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::sink::SinkExt;
use futures::stream::StreamExt;

/// Checks that every message in `expected` is delivered over `proxy` while listening to the log
/// stream, and that no other message is delivered in the meantime. Completes as soon as the last
/// expected message has been observed.
///
/// # Panics
///
/// Panics if a message that is not in `expected` is received, if the listener cannot be
/// registered, or if the listener connection ends before every expected message was seen.
pub async fn validate_log_stream(
    expected: impl IntoIterator<Item = LogMessage>,
    proxy: LogProxy,
    filter_options: Option<LogFilterOptions>,
) {
    // Streaming mode: register the listener without requesting a dump.
    let dump_logs = false;
    let listener = ValidatingListener::new(expected);
    listener.run(proxy, filter_options, dump_logs).await
}

/// Requests a log dump over `proxy` and checks that the dumped messages are the ones in
/// `expected`, with no other record appearing. Completes once every expected message has been
/// observed and the sink has signalled that the dump is done.
///
/// # Panics
///
/// Panics if a message that is not in `expected` is received, if the listener cannot be
/// registered, or if the listener connection ends without the sink having signalled that it is
/// done dumping.
///
/// NOTE(review): expected messages that never arrive are not asserted on in dump mode at the
/// moment; see FIXME(41966) in `ValidatingListener::run`.
pub async fn validate_log_dump(
    expected: impl IntoIterator<Item = LogMessage>,
    proxy: LogProxy,
    filter_options: Option<LogFilterOptions>,
) {
    // Dump mode: ask the log for its buffered messages and wait for the `Done` signal.
    let dump_logs = true;
    let listener = ValidatingListener::new(expected);
    listener.run(proxy, filter_options, dump_logs).await
}

/// Events reported by the request-handling code to the loop in `ValidatingListener::run`.
enum Outcome {
    /// The last remaining expected message has just been matched.
    AllExpectedReceived,
    /// The listener received a `Done` request.
    LogSentDone,
    /// A message arrived that matched none of the remaining expected messages.
    UnexpectedMessage(LogMessage),
}

/// Listens to all log messages sent during test, and verifies that they match what's expected.
struct ValidatingListener {
    /// Messages that have not been observed yet; a matching message removes its entry.
    expected: Vec<LogMessage>,
    /// Receiving half of the outcome channel. `run` takes it (leaving `None`) before the listener
    /// is moved into the task that handles requests.
    outcomes: Option<Receiver<Outcome>>,
    /// Sending half of the outcome channel, used while handling listener requests.
    send_outcomes: Sender<Outcome>,
}

impl ValidatingListener {
    /// Creates a listener that expects the messages yielded by `expected`.
    ///
    /// The outcome channel is created with a buffer of 3. Its receiving half is kept in an
    /// `Option` so that `run` can take it before moving the listener into the handler task.
    fn new(expected: impl IntoIterator<Item = LogMessage>) -> Self {
        let (send_outcomes, outcomes) = channel(3);
        Self { expected: expected.into_iter().collect(), send_outcomes, outcomes: Some(outcomes) }
    }

    /// Drive a LogListenerSafe request stream. Signals for channel close and test completion are
    /// sent on the futures-aware channels with which ValidatingListener is constructed.
    ///
    /// When `dump_logs` is true the listener is registered through `dump_logs_safe`, otherwise
    /// through `listen_safe`. Requests are handled on a detached task; this function only
    /// observes the outcomes that task reports.
    ///
    /// If no messages are expected, the "all expected received" condition is satisfied from the
    /// start: in streaming mode this returns right after registering the listener, and in dump
    /// mode it returns as soon as the sink signals that the dump is done.
    ///
    /// # Panics
    ///
    /// Panics if the listener cannot be registered, if an unexpected message is reported, or if
    /// the outcome channel closes before the mode-specific completion condition was met.
    async fn run(
        mut self,
        proxy: LogProxy,
        filter_options: Option<LogFilterOptions>,
        dump_logs: bool,
    ) {
        let (client_end, stream) =
            fidl::endpoints::create_request_stream::<LogListenerSafeMarker>();
        let filter_options = filter_options.as_ref();

        if dump_logs {
            proxy.dump_logs_safe(client_end, filter_options).expect("failed to register listener");
        } else {
            proxy.listen_safe(client_end, filter_options).expect("failed to register listener");
        }

        let mut sink_says_done = false;
        // `Outcome::AllExpectedReceived` is only emitted after a match empties the list, so an
        // initially empty list has to count as already satisfied or it would never be.
        let mut all_expected = self.expected.is_empty();
        let mut outcomes =
            self.outcomes.take().expect("outcomes receiver is present until run takes it");
        fasync::Task::spawn(self.handle_stream(stream)).detach();

        // Only stop looking at outcomes once we have all the messages we expect AND we either
        // don't care about a log dump terminating (because we didn't ask for one) or it has
        // terminated as we expect.
        while !(all_expected && (!dump_logs || sink_says_done)) {
            match outcomes.next().await {
                Some(Outcome::AllExpectedReceived) => all_expected = true,
                Some(Outcome::LogSentDone) => sink_says_done = true,
                Some(Outcome::UnexpectedMessage(msg)) => {
                    panic!("unexpected log message {:?}", msg)
                }
                // The handler task finished and dropped its sender: nothing more will arrive.
                None => break,
            }
        }

        if dump_logs {
            assert!(sink_says_done, "log sink must have signalled that the dump is done");
        } else {
            // FIXME(41966): this should be tested for both streaming and dumping modes
            assert!(all_expected, "must have received all expected messages");
        }
    }

    /// Handles requests from `stream` one at a time until the stream ends or yields an error.
    async fn handle_stream(mut self, mut stream: LogListenerSafeRequestStream) {
        while let Some(Ok(req)) = stream.next().await {
            self.handle_request(req).await;
        }
    }

    /// Dispatches a single listener request.
    ///
    /// `Log` and `LogMany` messages are checked against the expected list and then acknowledged;
    /// `Done` is reported as `Outcome::LogSentDone`.
    async fn handle_request(&mut self, req: LogListenerSafeRequest) {
        match req {
            LogListenerSafeRequest::Log { log, responder } => {
                self.log(log).await;
                // Best effort: a failure to acknowledge is deliberately ignored.
                responder.send().ok();
            }
            LogListenerSafeRequest::LogMany { log, responder } => {
                for msg in log {
                    self.log(msg).await;
                }
                // Best effort: a failure to acknowledge is deliberately ignored.
                responder.send().ok();
            }
            LogListenerSafeRequest::Done { .. } => {
                self.send_outcomes.send(Outcome::LogSentDone).await.unwrap();
            }
        }
    }

    /// Checks `received` against the messages that are still expected.
    ///
    /// A message matches when its `msg`, `tags` and `severity` are all equal to those of an
    /// expected entry; the first matching entry is removed. Emptying the list reports
    /// `Outcome::AllExpectedReceived`, and a message without a match reports
    /// `Outcome::UnexpectedMessage`.
    ///
    /// # Panics
    ///
    /// Panics if an outcome cannot be sent.
    /// NOTE(review): once `run` has returned, its receiver is dropped, so a message handled
    /// afterwards presumably fails to send and panics inside the detached task. Confirm this is
    /// the intended way to treat late messages.
    async fn log(&mut self, received: LogMessage) {
        let matched = self.expected.iter().position(|expected| {
            expected.msg == received.msg
                && expected.tags == received.tags
                && expected.severity == received.severity
        });

        match matched {
            Some(i) => {
                self.expected.remove(i);
                if self.expected.is_empty() {
                    self.send_outcomes.send(Outcome::AllExpectedReceived).await.unwrap();
                }
            }
            None => {
                self.send_outcomes.send(Outcome::UnexpectedMessage(received)).await.unwrap();
            }
        }
    }
}