1use super::{ContainerId, Inner, InnerGuard, SharedBuffer};
6use crate::identity::ComponentIdentity;
7use diagnostics_data::{LogError, LogsData};
8use diagnostics_log_encoding::parse::ParseError;
9use diagnostics_log_encoding::{Header, TRACING_FORMAT_LOG_RECORD_TYPE};
10use diagnostics_message::error::MessageError;
11use fidl_fuchsia_diagnostics::ComponentSelector;
12use fuchsia_async::condition::{ConditionGuard, WakerEntry};
13use futures::Stream;
14use futures::stream::FusedStream;
15use pin_project::pin_project;
16use ring_buffer::ring_buffer_record_len;
17use selectors::matches_selectors;
18use std::cmp::Ordering;
19use std::collections::BinaryHeap;
20use std::marker::PhantomData;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::{Context, Poll, ready};
24use zerocopy::FromBytes;
25
26#[pin_project]
28pub struct FilterCursor {
29 buffer: Arc<SharedBuffer>,
30 index: u64,
31 message_id: u64,
32 end: Option<u64>,
33 selectors: Vec<ComponentSelector>,
34 messages: BinaryHeap<MessageRef>,
35 flush_sockets_for_snapshot: bool,
36 #[pin]
37 waker_entry: WakerEntry<Inner>,
38}
39
40impl FilterCursor {
41 pub fn new(
42 buffer: Arc<SharedBuffer>,
43 index: u64,
44 message_id: u64,
45 snapshot: bool,
46 selectors: Vec<ComponentSelector>,
47 ) -> Self {
48 let waker_entry = buffer.inner.waker_entry();
49 Self {
50 buffer,
51 index,
52 message_id,
53 end: None,
54 selectors,
55 messages: BinaryHeap::new(),
56 waker_entry,
57 flush_sockets_for_snapshot: snapshot,
58 }
59 }
60
61 pub fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Message<'_>>> {
65 let this = self.project();
66
67 if *this.flush_sockets_for_snapshot {
68 let mut inner = InnerGuard::new(this.buffer);
69 *this.end = Some(this.buffer.flush_sockets(&mut inner));
70 *this.flush_sockets_for_snapshot = false;
71 }
72
73 let mut inner = this.buffer.inner.lock();
74
75 struct ContainerIds {
77 ids: [ContainerId; 8],
78 result: u8,
79 pos: usize,
80 }
81
82 impl ContainerIds {
83 fn memoize(&mut self, id: ContainerId, predicate: impl FnOnce() -> bool) -> bool {
85 if let Some(pos) = self.ids.iter().position(|i| i == &id) {
86 self.result & (1 << pos) != 0
87 } else {
88 let result = predicate();
89 self.ids[self.pos] = id;
90 if result {
91 self.result |= 1 << self.pos;
92 } else {
93 self.result &= !(1 << self.pos);
94 }
95 self.pos = (self.pos + 1) % self.ids.len();
96 result
97 }
98 }
99 }
100
101 let mut container_ids = ContainerIds { ids: [ContainerId(0xffff); 8], result: 0, pos: 0 };
102
103 let mut dropped = 0;
108 if *this.index < inner.tail {
109 *this.index = inner.tail;
110 dropped += inner.tail_message_id - *this.message_id;
111 *this.message_id = inner.tail_message_id;
112 }
113 while *this.index < inner.last_scanned && this.end.is_none_or(|e| *this.index < e) {
114 let (component, msg, timestamp) =
117 unsafe { inner.parse_message(*this.index..inner.last_scanned) };
118
119 if let Some(timestamp) = timestamp
120 && let Some(container) = inner.containers.get(component)
121 && (this.selectors.is_empty()
122 || container_ids.memoize(component, || {
123 matches_selectors(&container.identity.moniker, this.selectors)
124 }))
125 {
126 this.messages.push(MessageRef { index: *this.index, timestamp });
127 }
128
129 *this.index += ring_buffer_record_len(msg.len()) as u64;
130 *this.message_id += 1;
131 }
132 let message = loop {
133 let Some(message) = this.messages.pop() else { break None };
134 if message.index < inner.tail {
135 dropped += 1;
137 } else {
138 break Some(message);
139 }
140 };
141
142 let live_messages = (inner.last_scanned_message_id - inner.tail_message_id) as usize;
143 const HEAP_PRUNE_THRESHOLD: usize = 256;
144
145 if this.messages.len() > live_messages + HEAP_PRUNE_THRESHOLD {
150 let tail = inner.tail;
151 this.messages.retain(|m| m.index >= tail);
152 }
153
154 if let Some(message) = message {
155 return Poll::Ready(Some(Message { inner, message, dropped }));
156 }
157
158 if this.end.is_some_and(|e| *this.index >= e) || inner.terminated {
159 *this.index = u64::MAX;
160 Poll::Ready(None)
161 } else {
162 inner.add_waker(this.waker_entry, cx.waker().clone());
163 Poll::Pending
164 }
165 }
166
167 fn is_terminated(&self) -> bool {
168 self.index == u64::MAX
169 }
170}
171
172#[derive(Eq)]
173struct MessageRef {
174 timestamp: zx::BootInstant,
175 index: u64,
176}
177
178impl PartialEq for MessageRef {
179 fn eq(&self, other: &Self) -> bool {
180 self.timestamp == other.timestamp && self.index == other.index
181 }
182}
183
184impl PartialOrd for MessageRef {
185 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
186 Some(self.cmp(other))
187 }
188}
189
190impl Ord for MessageRef {
191 fn cmp(&self, other: &Self) -> Ordering {
192 other.timestamp.cmp(&self.timestamp).then_with(|| other.index.cmp(&self.index))
194 }
195}
196
197pub struct Message<'a> {
198 inner: ConditionGuard<'a, Inner>,
199 message: MessageRef,
200 pub dropped: u64,
201}
202
203impl Message<'_> {
204 pub fn parse(&self) -> Result<(&Arc<ComponentIdentity>, Header, &[u8]), MessageError> {
207 let (container, data, _) =
210 unsafe { self.inner.parse_message(self.message.index..self.inner.last_scanned) };
211 let container_id = container;
212 let (mut header, _) = Header::read_from_prefix(data)
213 .map_err(|_| MessageError::from(ParseError::InvalidHeader))?;
214 let msg_len = header.size_words() as usize * 8;
215 if msg_len > data.len() || msg_len < 16 {
216 return Err(ParseError::ValueOutOfValidRange.into());
217 }
218 if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE {
219 return Err(ParseError::InvalidRecordType.into());
220 }
221 header.set_tag(container_id.0);
222 let container = self.inner.containers.get(container).unwrap();
223 Ok((&container.identity, header, &data[..msg_len]))
224 }
225}
226
227impl TryFrom<Message<'_>> for LogsData {
228 type Error = MessageError;
229
230 fn try_from(value: Message<'_>) -> Result<Self, Self::Error> {
231 let (container, _header, data) = value.parse()?;
232 let mut data = diagnostics_message::from_structured(container.as_ref().into(), data)?;
233 if value.dropped > 0 {
234 data.metadata
235 .errors
236 .get_or_insert(vec![])
237 .push(LogError::RolledOutLogs { count: value.dropped });
238 }
239 Ok(data)
240 }
241}
242
243#[pin_project]
245pub struct FilterCursorStream<T> {
246 #[pin]
247 cursor: FilterCursor,
248 phantom: PhantomData<T>,
249}
250
251impl<T> Stream for FilterCursorStream<T>
252where
253 T: for<'a> TryFrom<Message<'a>>,
254{
255 type Item = T;
256
257 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
258 let mut this = self.project();
259 loop {
260 match ready!(this.cursor.as_mut().poll_next(cx)) {
261 Some(item) => {
262 if let Ok(data) = T::try_from(item) {
263 return Poll::Ready(Some(data));
264 }
265 }
267 None => return Poll::Ready(None),
268 }
269 }
270 }
271}
272
273impl<T> FusedStream for FilterCursorStream<T>
274where
275 T: for<'a> TryFrom<Message<'a>>,
276{
277 fn is_terminated(&self) -> bool {
278 self.cursor.is_terminated()
279 }
280}
281
282impl<T> From<FilterCursor> for FilterCursorStream<T> {
283 fn from(cursor: FilterCursor) -> Self {
284 Self { cursor, phantom: PhantomData }
285 }
286}
287
288#[cfg(test)]
289mod tests {
290 use super::*;
291 use crate::logs::shared_buffer::{
292 InnerGuard, SharedBuffer, SharedBufferOptions, create_ring_buffer,
293 };
294 use crate::logs::stats::LogStreamStats;
295 use crate::logs::testing::make_message;
296 use assert_matches::assert_matches;
297 use fidl_fuchsia_diagnostics::StreamMode;
298 use futures::StreamExt;
299 use selectors::{FastError, parse_component_selector};
300 use std::future::poll_fn;
301 use std::pin::pin;
302 use std::time::Duration;
303
304 fn test_stats() -> Arc<LogStreamStats> {
305 Arc::new(LogStreamStats::new(
306 &fuchsia_inspect::Node::default(),
307 &ComponentIdentity::unknown(),
308 ))
309 }
310
311 #[fuchsia::test]
312 async fn cursor_basic() {
313 let buffer = SharedBuffer::new(
314 create_ring_buffer(65536),
315 Box::new(|_| {}),
316 Default::default(),
317 &fuchsia_inspect::Node::default(),
318 );
319 let container = buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
320 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
321 container.push_back(msg.bytes());
322
323 let cursor = buffer.cursor(StreamMode::Snapshot, vec![]);
324 let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));
325
326 let item = stream.next().await.unwrap();
327 assert_eq!(item.msg().unwrap(), "a");
328 assert!(stream.next().await.is_none());
329 }
330
331 #[fuchsia::test]
332 async fn cursor_filter() {
333 let buffer = SharedBuffer::new(
334 create_ring_buffer(65536),
335 Box::new(|_| {}),
336 Default::default(),
337 &fuchsia_inspect::Node::default(),
338 );
339 let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
340 let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), test_stats());
341
342 container_a.push_back(make_message("msg_a", None, zx::BootInstant::from_nanos(1)).bytes());
343 container_b.push_back(make_message("msg_b", None, zx::BootInstant::from_nanos(2)).bytes());
344
345 let selector = parse_component_selector::<FastError>("a").unwrap();
346 let cursor = buffer.cursor(StreamMode::Snapshot, vec![selector]);
347 let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));
348
349 let item = stream.next().await.unwrap();
350 assert_eq!(item.msg().unwrap(), "msg_a");
351 assert!(stream.next().await.is_none());
352 }
353
354 #[fuchsia::test]
355 async fn cursor_subscribe() {
356 let buffer = SharedBuffer::new(
357 create_ring_buffer(65536),
358 Box::new(|_| {}),
359 Default::default(),
360 &fuchsia_inspect::Node::default(),
361 );
362 let container = buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
363
364 let cursor = buffer.cursor(StreamMode::Subscribe, vec![]);
365 let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));
366
367 assert!(futures::poll!(stream.next()).is_pending());
368
369 container.push_back(make_message("msg", None, zx::BootInstant::from_nanos(1)).bytes());
370
371 let item = stream.next().await.unwrap();
372 assert_eq!(item.msg().unwrap(), "msg");
373 }
374
375 #[fuchsia::test]
376 async fn cursor_snapshot_then_subscribe() {
377 let buffer = SharedBuffer::new(
378 create_ring_buffer(65536),
379 Box::new(|_| {}),
380 Default::default(),
381 &fuchsia_inspect::Node::default(),
382 );
383 let container = buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
384
385 container.push_back(make_message("msg1", None, zx::BootInstant::from_nanos(1)).bytes());
386
387 let cursor = buffer.cursor(StreamMode::SnapshotThenSubscribe, vec![]);
388 let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));
389
390 let item = stream.next().await.unwrap();
391 assert_eq!(item.msg().unwrap(), "msg1");
392
393 assert!(futures::poll!(stream.next()).is_pending());
394
395 container.push_back(make_message("msg2", None, zx::BootInstant::from_nanos(2)).bytes());
396
397 let item = stream.next().await.unwrap();
398 assert_eq!(item.msg().unwrap(), "msg2");
399 }
400
401 #[fuchsia::test]
402 async fn cursor_dropped_logs() {
403 let buffer = SharedBuffer::new(
404 create_ring_buffer(65536),
405 Box::new(|_| {}),
406 SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
407 &fuchsia_inspect::Node::default(),
408 );
409 let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
410 let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), test_stats());
411
412 let msg = make_message("msg", None, zx::BootInstant::from_nanos(1));
413
414 container_a.push_back(msg.bytes()); container_a.push_back(msg.bytes()); container_a.push_back(msg.bytes()); let cursor = buffer.cursor(StreamMode::SnapshotThenSubscribe, vec![]);
419 let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));
420
421 let item = stream.next().await.unwrap();
423 assert_eq!(item.msg().unwrap(), "msg");
424
425 let has_messages = async |component| {
426 let mut cursor = pin!(buffer.cursor(
427 StreamMode::Snapshot,
428 vec![parse_component_selector::<FastError>(component).unwrap()],
429 ));
430 poll_fn(|cx| cursor.as_mut().poll_next(cx).map(|i| i.is_some())).await
431 };
432
433 while has_messages("a").await {
435 container_b.push_back(msg.bytes());
436 {
438 let mut inner = InnerGuard::new(&buffer);
439 inner.check_space(inner.ring_buffer.head());
440 }
441 }
442
443 let item = stream.next().await.unwrap();
445 assert_eq!(item.metadata.errors.as_ref().unwrap().len(), 1);
446 assert_matches!(
447 item.metadata.errors.as_ref().unwrap()[0],
448 LogError::RolledOutLogs { count } if count == 2
449 );
450 }
451
452 #[fuchsia::test]
453 async fn terminate_terminates_cursor() {
454 let buffer = SharedBuffer::new(
455 create_ring_buffer(65536),
456 Box::new(|_| {}),
457 SharedBufferOptions { sleep_time: Duration::from_secs(10), ..Default::default() },
458 &fuchsia_inspect::Node::default(),
459 );
460 let cursor = pin!(FilterCursorStream::<LogsData>::from(
461 buffer.cursor(StreamMode::SnapshotThenSubscribe, vec![])
462 ));
463 let container = buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
464 container.push_back(make_message("msg", None, zx::BootInstant::from_nanos(1)).bytes());
465 drop(buffer.terminate());
466 assert_eq!(cursor.count().await, 1);
467 }
468
469 #[fuchsia::test]
470 async fn cursor_descending_timestamps_bounded_heap() {
471 let buffer = SharedBuffer::new(
472 create_ring_buffer(65536),
473 Box::new(|_| {}),
474 SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
475 &fuchsia_inspect::Node::default(),
476 );
477 let identity = Arc::new(vec!["a"].into());
478 let container = buffer.new_container_buffer(identity, test_stats());
479 let cursor = buffer.cursor(StreamMode::SnapshotThenSubscribe, vec![]);
480 let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));
481
482 for i in 0..5000 {
485 let msg_large = make_message(
486 &format!("msg_large_{}", i),
487 None,
488 zx::BootInstant::from_nanos(2000000 + i as i64),
489 );
490 container.push_back(msg_large.bytes());
491 let msg_small = make_message(
492 &format!("msg_small_{}", i),
493 None,
494 zx::BootInstant::from_nanos(1000000 - i as i64),
495 );
496 container.push_back(msg_small.bytes());
497
498 {
499 let mut inner = InnerGuard::new(&buffer);
500 inner.check_space(inner.ring_buffer.head());
501 }
502
503 let item = stream.next().await.unwrap();
504 assert_eq!(item.msg().unwrap(), &format!("msg_small_{}", i));
505 }
506
507 let heap_len = stream.as_mut().project().cursor.project().messages.len();
511 assert!(heap_len <= 3000, "Heap size: {}", heap_len);
512 }
513}