1use crate::identity::ComponentIdentity;
6use crate::logs::shared_buffer::ContainerBuffer;
7use crate::logs::stats::LogStreamStats;
8use crate::logs::stored_message::StoredMessage;
9use derivative::Derivative;
10use fidl::endpoints::RequestStream;
11use fidl_fuchsia_diagnostics::LogInterestSelector;
12use fidl_fuchsia_diagnostics_types::{Interest as FidlInterest, Severity as FidlSeverity};
13use fidl_fuchsia_logger::{LogSinkOnInitRequest, LogSinkRequest, LogSinkRequestStream};
14use fuchsia_async as fasync;
15use fuchsia_async::condition::Condition;
16use futures::future::{Fuse, FusedFuture};
17use futures::prelude::*;
18use futures::select;
19use futures::stream::StreamExt;
20use log::{debug, error};
21use selectors::SelectorExt;
22use std::cmp::Ordering;
23use std::collections::BTreeMap;
24use std::pin::pin;
25use std::sync::Arc;
26use std::task::Poll;
27
28pub type OnInactive = Box<dyn Fn(&LogsArtifactsContainer) + Send + Sync>;
29
30#[derive(Derivative)]
31#[derivative(Debug)]
32pub struct LogsArtifactsContainer {
33 pub identity: Arc<ComponentIdentity>,
35
36 pub stats: Arc<LogStreamStats>,
38
39 #[derivative(Debug = "ignore")]
41 buffer: ContainerBuffer,
42
43 #[derivative(Debug = "ignore")]
45 state: Arc<Condition<ContainerState>>,
46
47 #[derivative(Debug = "ignore")]
50 on_inactive: Option<OnInactive>,
51}
52
53#[derive(Debug)]
54struct ContainerState {
55 num_active_channels: u64,
57
58 interests: BTreeMap<Interest, usize>,
60
61 is_initializing: bool,
62}
63
64impl LogsArtifactsContainer {
65 pub fn new<'a>(
66 identity: Arc<ComponentIdentity>,
67 interest_selectors: impl Iterator<Item = &'a LogInterestSelector>,
68 initial_interest: Option<FidlSeverity>,
69 stats: Arc<LogStreamStats>,
70 buffer: ContainerBuffer,
71 on_inactive: Option<OnInactive>,
72 ) -> Self {
73 let mut interests = BTreeMap::new();
74 if let Some(severity) = initial_interest {
75 interests.insert(Interest::from(severity), 1);
76 }
77 let new = Self {
78 identity,
79 buffer,
80 state: Arc::new(Condition::new(ContainerState {
81 num_active_channels: 0,
82 interests,
83 is_initializing: true,
84 })),
85 stats,
86 on_inactive,
87 };
88
89 new.update_interest(interest_selectors, &[]);
91
92 new
93 }
94
95 pub fn handle_log_sink(
99 self: &Arc<Self>,
100 stream: LogSinkRequestStream,
101 scope: fasync::ScopeHandle,
102 ) {
103 if stream
104 .control_handle()
105 .send_on_init(LogSinkOnInitRequest {
106 buffer: Some(self.buffer.iob()),
107 interest: Some(self.state.lock().min_interest()),
108 ..Default::default()
109 })
110 .is_err()
111 {
112 return;
113 }
114
115 {
116 let mut guard = self.state.lock();
117 guard.num_active_channels += 1;
118 guard.is_initializing = false;
119 }
120 scope.spawn(Arc::clone(self).actually_handle_log_sink(stream));
121 }
122
123 async fn actually_handle_log_sink(self: Arc<Self>, mut stream: LogSinkRequestStream) {
125 let mut previous_interest_sent = None;
126 debug!(identity:% = self.identity; "Draining LogSink channel.");
127
128 let mut hanging_gets = Vec::new();
129 let mut interest_changed = pin!(Fuse::terminated());
130
131 loop {
132 select! {
133 next = stream.next() => {
134 let Some(next) = next else { break };
135 match next {
136 Ok(LogSinkRequest::ConnectStructured { socket, .. }) => {
137 self.buffer.add_socket(socket);
138 }
139 Ok(LogSinkRequest::WaitForInterestChange { responder }) => {
140 hanging_gets.push(responder);
143 }
144 Err(e) => error!(identity:% = self.identity, e:%; "error handling log sink"),
145 Ok(LogSinkRequest::_UnknownMethod { .. }) => {}
146 }
147 }
148 _ = interest_changed => {}
149 }
150
151 if !hanging_gets.is_empty() {
152 let min_interest = self.state.lock().min_interest();
153 if Some(&min_interest) != previous_interest_sent.as_ref() {
154 for responder in hanging_gets.drain(..) {
156 let _ = responder.send(Ok(&min_interest));
157 }
158 interest_changed.set(Fuse::terminated());
159 previous_interest_sent = Some(min_interest);
160 } else if interest_changed.is_terminated() {
161 let previous_interest_sent = previous_interest_sent.clone();
163 interest_changed.set(
164 self.state
165 .when(move |state| {
166 if previous_interest_sent != Some(state.min_interest()) {
167 Poll::Ready(())
168 } else {
169 Poll::Pending
170 }
171 })
172 .fuse(),
173 );
174 }
175 }
176 }
177
178 debug!(identity:% = self.identity; "LogSink channel closed.");
179 self.state.lock().num_active_channels -= 1;
180 self.check_inactive();
181 }
182
183 pub fn ingest_message(&self, message: StoredMessage) {
185 self.buffer.push_back(message.bytes());
186 }
187
188 pub fn update_interest<'a>(
193 &self,
194 interest_selectors: impl Iterator<Item = &'a LogInterestSelector>,
195 previous_selectors: &[LogInterestSelector],
196 ) {
197 let mut new_interest = FidlInterest::default();
198 let mut remove_interest = FidlInterest::default();
199 for selector in interest_selectors {
200 if self
201 .identity
202 .moniker
203 .matches_component_selector(&selector.selector)
204 .unwrap_or_default()
205 {
206 new_interest = selector.interest.clone();
207 break;
209 }
210 }
211
212 if let Some(previous_selector) = previous_selectors.iter().find(|s| {
213 self.identity.moniker.matches_component_selector(&s.selector).unwrap_or_default()
214 }) {
215 remove_interest = previous_selector.interest.clone();
216 }
217
218 let mut state = self.state.lock();
219 if new_interest == FidlInterest::default() && remove_interest != FidlInterest::default() {
223 state.erase(&remove_interest);
224 } else if new_interest != FidlInterest::default()
225 && remove_interest == FidlInterest::default()
226 {
227 state.push_interest(new_interest);
228 } else if new_interest != FidlInterest::default()
229 && remove_interest != FidlInterest::default()
230 {
231 state.erase(&remove_interest);
232 state.push_interest(new_interest);
233 } else {
234 return;
235 }
236
237 for waker in state.drain_wakers() {
238 waker.wake();
239 }
240 }
241
242 pub fn reset_interest(&self, interest_selectors: &[LogInterestSelector]) {
246 for selector in interest_selectors {
247 if self
248 .identity
249 .moniker
250 .matches_component_selector(&selector.selector)
251 .unwrap_or_default()
252 {
253 let mut state = self.state.lock();
254 state.erase(&selector.interest);
255 for waker in state.drain_wakers() {
256 waker.wake();
257 }
258 return;
259 }
260 }
261 }
262
263 pub fn is_active(&self) -> bool {
266 let state = self.state.lock();
267 state.is_initializing || state.num_active_channels > 0 || self.buffer.is_active()
268 }
269
270 fn check_inactive(&self) {
272 if !self.is_active()
273 && let Some(on_inactive) = &self.on_inactive
274 {
275 on_inactive(self);
276 }
277 }
278
279 pub fn terminate(&self) {
282 self.buffer.terminate();
283 }
284
285 #[cfg(test)]
286 pub fn mark_stopped(&self) {
287 self.state.lock().is_initializing = false;
288 self.check_inactive();
289 }
290
291 pub fn buffer(&self) -> &ContainerBuffer {
292 &self.buffer
293 }
294}
295
296impl ContainerState {
297 fn push_interest(&mut self, interest: FidlInterest) {
299 if interest != FidlInterest::default() {
300 let count = self.interests.entry(interest.into()).or_insert(0);
301 *count += 1;
302 }
303 }
304
305 fn erase(&mut self, interest: &FidlInterest) {
307 let interest = interest.clone().into();
308 if let Some(count) = self.interests.get_mut(&interest) {
309 if *count <= 1 {
310 self.interests.remove(&interest);
311 } else {
312 *count -= 1;
313 }
314 }
315 }
316
317 fn min_interest(&self) -> FidlInterest {
320 self.interests.keys().next().map(|i| i.0.clone()).unwrap_or_default()
322 }
323}
324
325#[derive(Debug, PartialEq)]
326struct Interest(FidlInterest);
327
328impl From<FidlInterest> for Interest {
329 fn from(interest: FidlInterest) -> Interest {
330 Interest(interest)
331 }
332}
333
334impl From<FidlSeverity> for Interest {
335 fn from(severity: FidlSeverity) -> Interest {
336 Interest(FidlInterest { min_severity: Some(severity), ..Default::default() })
337 }
338}
339
340impl std::ops::Deref for Interest {
341 type Target = FidlInterest;
342 fn deref(&self) -> &Self::Target {
343 &self.0
344 }
345}
346
347impl Eq for Interest {}
348
349impl Ord for Interest {
350 fn cmp(&self, other: &Self) -> Ordering {
351 self.min_severity.cmp(&other.min_severity)
352 }
353}
354
355impl PartialOrd for Interest {
356 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
357 Some(self.cmp(other))
358 }
359}
360
361#[cfg(test)]
362mod tests {
363 use super::*;
364 use crate::logs::shared_buffer::{SharedBuffer, create_ring_buffer};
365 use fidl_fuchsia_diagnostics::{ComponentSelector, StringSelector};
366 use fidl_fuchsia_diagnostics_types::Severity;
367 use fidl_fuchsia_logger::{LogSinkMarker, LogSinkProxy};
368 use fuchsia_async::{Task, TestExecutor};
369 use fuchsia_inspect as inspect;
370 use fuchsia_inspect_derive::WithInspect;
371 use moniker::ExtendedMoniker;
372
373 fn initialize_container(
374 severity: Option<Severity>,
375 scope: fasync::ScopeHandle,
376 ) -> (Arc<LogsArtifactsContainer>, LogSinkProxy) {
377 let identity = Arc::new(ComponentIdentity::new(
378 ExtendedMoniker::parse_str("/foo/bar").unwrap(),
379 "fuchsia-pkg://test",
380 ));
381 let stats = Arc::new(
382 LogStreamStats::default()
383 .with_inspect(inspect::component::inspector().root(), identity.moniker.as_ref())
384 .expect("failed to attach component log stats"),
385 );
386 let buffer = SharedBuffer::new(
387 create_ring_buffer(1024 * 1024),
388 Box::new(|_| {}),
389 Default::default(),
390 );
391 let container = Arc::new(LogsArtifactsContainer::new(
392 identity,
393 std::iter::empty(),
394 severity,
395 Arc::clone(&stats),
396 buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats),
397 None,
398 ));
399 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
401 container.handle_log_sink(stream, scope);
402 (container, proxy)
403 }
404
405 #[fuchsia::test(allow_stalls = false)]
406 async fn update_interest() {
407 let scope = fasync::Scope::new();
409 let (container, log_sink) = initialize_container(None, scope.to_handle());
410
411 let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
413
414 assert_eq!(initial_interest.min_severity, None);
416 let log_sink_clone = log_sink.clone();
417 let mut interest_future =
418 Task::spawn(async move { log_sink_clone.wait_for_interest_change().await });
419
420 assert!(TestExecutor::poll_until_stalled(&mut interest_future).await.is_pending());
422
423 container.update_interest([interest(&["foo", "bar"], Some(Severity::Info))].iter(), &[]);
425
426 assert_eq!(interest_future.await.unwrap().unwrap().min_severity, Some(Severity::Info));
428 }
429
430 #[fuchsia::test]
431 async fn initial_interest() {
432 let scope = fasync::Scope::new();
433 let (_container, log_sink) = initialize_container(Some(Severity::Info), scope.to_handle());
434 let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
435 assert_eq!(initial_interest.min_severity, Some(Severity::Info));
436 }
437
438 #[fuchsia::test]
439 async fn interest_serverity_semantics() {
440 let scope = fasync::Scope::new();
441 let (container, log_sink) = initialize_container(None, scope.to_handle());
442 let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
443 assert_eq!(initial_interest.min_severity, None);
444 container.update_interest([interest(&["foo", "bar"], Some(Severity::Info))].iter(), &[]);
446 assert_severity(&log_sink, Severity::Info).await;
447 assert_interests(&container, [(Severity::Info, 1)]);
448
449 container.update_interest([interest(&["foo", "bar"], Some(Severity::Warn))].iter(), &[]);
452 assert_interests(&container, [(Severity::Info, 1), (Severity::Warn, 1)]);
453
454 container.update_interest([interest(&["foo", "bar"], Some(Severity::Debug))].iter(), &[]);
456 assert_severity(&log_sink, Severity::Debug).await;
457 assert_interests(
458 &container,
459 [(Severity::Debug, 1), (Severity::Info, 1), (Severity::Warn, 1)],
460 );
461
462 container.update_interest([interest(&["foo", "bar"], Some(Severity::Debug))].iter(), &[]);
465 assert_interests(
466 &container,
467 [(Severity::Debug, 2), (Severity::Info, 1), (Severity::Warn, 1)],
468 );
469
470 container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Debug))]);
473 assert_interests(
474 &container,
475 [(Severity::Debug, 1), (Severity::Info, 1), (Severity::Warn, 1)],
476 );
477
478 container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Debug))]);
480 assert_severity(&log_sink, Severity::Info).await;
481 assert_interests(&container, [(Severity::Info, 1), (Severity::Warn, 1)]);
482
483 container.update_interest(
486 [interest(&["foo", "bar"], Some(Severity::Error))].iter(),
487 &[interest(&["foo", "bar"], Some(Severity::Info))],
488 );
489 assert_severity(&log_sink, Severity::Warn).await;
490 assert_interests(&container, [(Severity::Error, 1), (Severity::Warn, 1)]);
491
492 container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Warn))]);
494 assert_severity(&log_sink, Severity::Error).await;
495 assert_interests(&container, [(Severity::Error, 1)]);
496
497 container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Error))]);
500 assert_eq!(
501 log_sink.wait_for_interest_change().await.unwrap().unwrap(),
502 FidlInterest::default()
503 );
504
505 assert_interests(&container, []);
506 }
507
508 fn interest(moniker: &[&str], min_severity: Option<Severity>) -> LogInterestSelector {
509 LogInterestSelector {
510 selector: ComponentSelector {
511 moniker_segments: Some(
512 moniker.iter().map(|s| StringSelector::ExactMatch(s.to_string())).collect(),
513 ),
514 ..Default::default()
515 },
516 interest: FidlInterest { min_severity, ..Default::default() },
517 }
518 }
519
520 async fn assert_severity(proxy: &LogSinkProxy, severity: Severity) {
521 assert_eq!(
522 proxy.wait_for_interest_change().await.unwrap().unwrap().min_severity.unwrap(),
523 severity
524 );
525 }
526
527 fn assert_interests<const N: usize>(
528 container: &LogsArtifactsContainer,
529 severities: [(Severity, usize); N],
530 ) {
531 let mut expected_map = BTreeMap::new();
532 expected_map.extend(IntoIterator::into_iter(severities).map(|(s, c)| {
533 let interest = FidlInterest { min_severity: Some(s), ..Default::default() };
534 (interest.into(), c)
535 }));
536 assert_eq!(expected_map, container.state.lock().interests);
537 }
538}