validating_log_listener/
lib.rs1#![warn(missing_docs)]
8
9use fidl_fuchsia_logger::{
10 LogFilterOptions, LogListenerSafeMarker, LogListenerSafeRequest, LogListenerSafeRequestStream,
11 LogMessage, LogProxy,
12};
13use fuchsia_async as fasync;
14use futures::channel::mpsc::{Receiver, Sender, channel};
15use futures::sink::SinkExt;
16use futures::stream::StreamExt;
17
18pub async fn validate_log_stream(
25 expected: impl IntoIterator<Item = LogMessage>,
26 proxy: LogProxy,
27 filter_options: Option<LogFilterOptions>,
28) {
29 ValidatingListener::new(expected).run(proxy, filter_options).await;
30}
31
/// Events reported by the request-handling side of [`ValidatingListener`] to
/// `ValidatingListener::run` over the outcome channel.
enum Outcome {
    /// The last outstanding expected message has just been matched and removed.
    AllExpectedReceived,
    /// The log sink sent a `Done` request on the listener channel.
    LogSentDone,
    /// A message arrived that matched none of the outstanding expected messages; carries the
    /// offending message.
    UnexpectedMessage(LogMessage),
}
37
/// Log listener that checks incoming messages against a list of expected messages and reports
/// the result of each check as an [`Outcome`].
struct ValidatingListener {
    /// Expected messages that have not been matched yet; an entry is removed when a received
    /// message matches it.
    expected: Vec<LogMessage>,
    /// Receiving half of the outcome channel. Held in an `Option` so that `run` can take it out
    /// before `self` is moved into the spawned request-handling task.
    outcomes: Option<Receiver<Outcome>>,
    /// Sending half of the outcome channel, used by the request handlers.
    send_outcomes: Sender<Outcome>,
}
44
45impl ValidatingListener {
46 fn new(expected: impl IntoIterator<Item = LogMessage>) -> Self {
47 let (send_outcomes, outcomes) = channel(3);
48 Self { expected: expected.into_iter().collect(), send_outcomes, outcomes: Some(outcomes) }
49 }
50
51 async fn run(mut self, proxy: LogProxy, filter_options: Option<LogFilterOptions>) {
54 let (client_end, stream) =
55 fidl::endpoints::create_request_stream::<LogListenerSafeMarker>();
56 let filter_options = filter_options.as_ref();
57
58 proxy.listen_safe(client_end, filter_options).expect("failed to register listener");
59
60 let mut sink_says_done = false;
61 let mut all_expected = false;
62 let mut outcomes = self.outcomes.take().unwrap();
63 fasync::Task::spawn(self.handle_stream(stream)).detach();
64
65 'observe_outcomes: while let Some(outcome) = outcomes.next().await {
66 match outcome {
67 Outcome::AllExpectedReceived => all_expected = true,
68 Outcome::LogSentDone => sink_says_done = true,
69 Outcome::UnexpectedMessage(msg) => panic!("unexpected log message {msg:?}"),
70 }
71
72 if all_expected || sink_says_done {
73 break 'observe_outcomes;
74 }
75 }
76
77 assert!(all_expected, "must have received all expected messages");
78 }
79
80 async fn handle_stream(mut self, mut stream: LogListenerSafeRequestStream) {
81 while let Some(Ok(req)) = stream.next().await {
82 self.handle_request(req).await;
83 }
84 }
85
86 async fn handle_request(&mut self, req: LogListenerSafeRequest) {
87 match req {
88 LogListenerSafeRequest::Log { log, responder } => {
89 self.log(log).await;
90 responder.send().ok();
91 }
92 LogListenerSafeRequest::LogMany { log, responder } => {
93 for msg in log {
94 self.log(msg).await;
95 }
96 responder.send().ok();
97 }
98 LogListenerSafeRequest::Done { .. } => {
99 self.send_outcomes.send(Outcome::LogSentDone).await.unwrap();
100 }
101 }
102 }
103
104 async fn log(&mut self, received: LogMessage) {
105 if let Some((i, _)) = self.expected.iter().enumerate().find(|(_, expected)| {
106 expected.msg == received.msg
107 && expected.tags == received.tags
108 && expected.severity == received.severity
109 }) {
110 self.expected.remove(i);
111 if self.expected.is_empty() {
112 self.send_outcomes.send(Outcome::AllExpectedReceived).await.unwrap();
113 }
114 } else {
115 self.send_outcomes.send(Outcome::UnexpectedMessage(received)).await.unwrap();
116 }
117 }
118}