// archivist_lib/logs/shared_buffer/cursor.rs

// Copyright 2025 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5use super::{ContainerId, Inner, InnerGuard, SharedBuffer};
6use crate::identity::ComponentIdentity;
7use diagnostics_data::{LogError, LogsData};
8use diagnostics_log_encoding::parse::ParseError;
9use diagnostics_log_encoding::{Header, TRACING_FORMAT_LOG_RECORD_TYPE};
10use diagnostics_message::error::MessageError;
11use fidl_fuchsia_diagnostics::ComponentSelector;
12use fuchsia_async::condition::{ConditionGuard, WakerEntry};
13use futures::Stream;
14use futures::stream::FusedStream;
15use pin_project::pin_project;
16use ring_buffer::ring_buffer_record_len;
17use selectors::matches_selectors;
18use std::cmp::Ordering;
19use std::collections::BinaryHeap;
20use std::marker::PhantomData;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::{Context, Poll, ready};
24use zerocopy::FromBytes;
25
/// FilterCursor is a cursor that returns all logs optionally filtered by component selectors.
///
/// Scanned messages are queued in a heap and handed out earliest-timestamp first (see
/// `FilterCursor::poll_next`).
#[pin_project]
pub struct FilterCursor {
    /// The shared buffer being read.
    buffer: Arc<SharedBuffer>,
    /// Ring-buffer offset of the next record to scan.  Set to `u64::MAX` once the cursor has
    /// returned `None`.
    index: u64,
    /// Message id corresponding to `index`; advanced by one per scanned record and used to count
    /// messages that rolled out before they could be scanned.
    message_id: u64,
    /// For snapshot cursors, the offset at which scanning stops (recorded on the first poll).
    end: Option<u64>,
    /// Component selectors to filter by; an empty list matches every component.
    selectors: Vec<ComponentSelector>,
    /// Scanned, matching messages that have not been returned yet.
    messages: BinaryHeap<MessageRef>,
    /// True until the first poll of a snapshot cursor has flushed the buffer's sockets.
    flush_sockets_for_snapshot: bool,
    /// Waker registration, passed to `add_waker` when a poll returns `Poll::Pending`.
    #[pin]
    waker_entry: WakerEntry<Inner>,
}
39
40impl FilterCursor {
41    pub fn new(
42        buffer: Arc<SharedBuffer>,
43        index: u64,
44        message_id: u64,
45        snapshot: bool,
46        selectors: Vec<ComponentSelector>,
47    ) -> Self {
48        let waker_entry = buffer.inner.waker_entry();
49        Self {
50            buffer,
51            index,
52            message_id,
53            end: None,
54            selectors,
55            messages: BinaryHeap::new(),
56            waker_entry,
57            flush_sockets_for_snapshot: snapshot,
58        }
59    }
60
61    /// Polls for the next message.  We can't use the Stream trait because this is a lending
62    /// iterator; it returns a reference to a message (and will hold a lock).  Depending on the use,
63    /// this can be more efficient than using a Stream which will involve copying the message.
64    pub fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Message<'_>>> {
65        let this = self.project();
66
67        if *this.flush_sockets_for_snapshot {
68            let mut inner = InnerGuard::new(this.buffer);
69            *this.end = Some(this.buffer.flush_sockets(&mut inner));
70            *this.flush_sockets_for_snapshot = false;
71        }
72
73        let mut inner = this.buffer.inner.lock();
74
75        // Comparing monikers can be expensive, so memoize the results.
76        struct ContainerIds {
77            ids: [ContainerId; 8],
78            result: u8,
79            pos: usize,
80        }
81
82        impl ContainerIds {
83            /// Memoizes the result of `predicate` for a given `id`.
84            fn memoize(&mut self, id: ContainerId, predicate: impl FnOnce() -> bool) -> bool {
85                if let Some(pos) = self.ids.iter().position(|i| i == &id) {
86                    self.result & (1 << pos) != 0
87                } else {
88                    let result = predicate();
89                    self.ids[self.pos] = id;
90                    if result {
91                        self.result |= 1 << self.pos;
92                    } else {
93                        self.result &= !(1 << self.pos);
94                    }
95                    self.pos = (self.pos + 1) % self.ids.len();
96                    result
97                }
98            }
99        }
100
101        let mut container_ids = ContainerIds { ids: [ContainerId(0xffff); 8], result: 0, pos: 0 };
102
103        // NOTE: If messages are dropped the dropped count won't account for filtering: it will
104        // include messages that have been dropped that don't match the filter.  Fixing this is
105        // difficult and not worth the effort.  Dropped messages should be rare and the common case
106        // is that there is no filtering.
107        let mut dropped = 0;
108        if *this.index < inner.tail {
109            *this.index = inner.tail;
110            dropped += inner.tail_message_id - *this.message_id;
111            *this.message_id = inner.tail_message_id;
112        }
113        while *this.index < inner.last_scanned && this.end.is_none_or(|e| *this.index < e) {
114            // SAFETY: We've checked index >= inner.tail, so the range must be valid.  `msg`
115            // will remain valid whilst we're holding the lock.
116            let (component, msg, timestamp) =
117                unsafe { inner.parse_message(*this.index..inner.last_scanned) };
118
119            if let Some(timestamp) = timestamp
120                && let Some(container) = inner.containers.get(component)
121                && (this.selectors.is_empty()
122                    || container_ids.memoize(component, || {
123                        matches_selectors(&container.identity.moniker, this.selectors)
124                    }))
125            {
126                this.messages.push(MessageRef { index: *this.index, timestamp });
127            }
128
129            *this.index += ring_buffer_record_len(msg.len()) as u64;
130            *this.message_id += 1;
131        }
132        let message = loop {
133            let Some(message) = this.messages.pop() else { break None };
134            if message.index < inner.tail {
135                // See note above regarding filtering.
136                dropped += 1;
137            } else {
138                break Some(message);
139            }
140        };
141
142        let live_messages = (inner.last_scanned_message_id - inner.tail_message_id) as usize;
143        const HEAP_PRUNE_THRESHOLD: usize = 256;
144
145        // Drain all stale entries if the heap has grown too large.
146        // Timestamps are attacker-controlled (see parse_message), so a hostile writer
147        // can order the heap such that stale entries never surface and the heap grows
148        // without bound. This keeps the heap bounded by the live ring-buffer window.
149        if this.messages.len() > live_messages + HEAP_PRUNE_THRESHOLD {
150            let tail = inner.tail;
151            this.messages.retain(|m| m.index >= tail);
152        }
153
154        if let Some(message) = message {
155            return Poll::Ready(Some(Message { inner, message, dropped }));
156        }
157
158        if this.end.is_some_and(|e| *this.index >= e) || inner.terminated {
159            *this.index = u64::MAX;
160            Poll::Ready(None)
161        } else {
162            inner.add_waker(this.waker_entry, cx.waker().clone());
163            Poll::Pending
164        }
165    }
166
167    fn is_terminated(&self) -> bool {
168        self.index == u64::MAX
169    }
170}
171
172#[derive(Eq)]
173struct MessageRef {
174    timestamp: zx::BootInstant,
175    index: u64,
176}
177
178impl PartialEq for MessageRef {
179    fn eq(&self, other: &Self) -> bool {
180        self.timestamp == other.timestamp && self.index == other.index
181    }
182}
183
184impl PartialOrd for MessageRef {
185    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
186        Some(self.cmp(other))
187    }
188}
189
190impl Ord for MessageRef {
191    fn cmp(&self, other: &Self) -> Ordering {
192        // BinaryHeap is a max-heap, but we want min, hence this ordering.
193        other.timestamp.cmp(&self.timestamp).then_with(|| other.index.cmp(&self.index))
194    }
195}
196
/// A message returned by `FilterCursor::poll_next`.
///
/// It holds the buffer lock (via `inner`) for as long as it is alive, which keeps the referenced
/// record readable.
pub struct Message<'a> {
    /// Lock guard over the shared buffer's state.
    inner: ConditionGuard<'a, Inner>,
    /// Location and timestamp of the record.
    message: MessageRef,
    /// Number of messages counted as rolled out of the buffer before this one was returned.
    /// Approximate when the cursor filters by selectors.
    pub dropped: u64,
}
202
203impl Message<'_> {
204    /// Returns the component and FXT bytes.  The FXT record is validated to be the correct length
205    /// and type.
206    pub fn parse(&self) -> Result<(&Arc<ComponentIdentity>, Header, &[u8]), MessageError> {
207        // SAFETY: We hold a lock which prevents the buffer from being drained so
208        // it should be safe to read this range.
209        let (container, data, _) =
210            unsafe { self.inner.parse_message(self.message.index..self.inner.last_scanned) };
211        let container_id = container;
212        let (mut header, _) = Header::read_from_prefix(data)
213            .map_err(|_| MessageError::from(ParseError::InvalidHeader))?;
214        let msg_len = header.size_words() as usize * 8;
215        if msg_len > data.len() || msg_len < 16 {
216            return Err(ParseError::ValueOutOfValidRange.into());
217        }
218        if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE {
219            return Err(ParseError::InvalidRecordType.into());
220        }
221        header.set_tag(container_id.0);
222        let container = self.inner.containers.get(container).unwrap();
223        Ok((&container.identity, header, &data[..msg_len]))
224    }
225}
226
227impl TryFrom<Message<'_>> for LogsData {
228    type Error = MessageError;
229
230    fn try_from(value: Message<'_>) -> Result<Self, Self::Error> {
231        let (container, _header, data) = value.parse()?;
232        let mut data = diagnostics_message::from_structured(container.as_ref().into(), data)?;
233        if value.dropped > 0 {
234            data.metadata
235                .errors
236                .get_or_insert(vec![])
237                .push(LogError::RolledOutLogs { count: value.dropped });
238        }
239        Ok(data)
240    }
241}
242
243/// Like FilterCursor but returns a stream of LogsData.
244#[pin_project]
245pub struct FilterCursorStream<T> {
246    #[pin]
247    cursor: FilterCursor,
248    phantom: PhantomData<T>,
249}
250
251impl<T> Stream for FilterCursorStream<T>
252where
253    T: for<'a> TryFrom<Message<'a>>,
254{
255    type Item = T;
256
257    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
258        let mut this = self.project();
259        loop {
260            match ready!(this.cursor.as_mut().poll_next(cx)) {
261                Some(item) => {
262                    if let Ok(data) = T::try_from(item) {
263                        return Poll::Ready(Some(data));
264                    }
265                    // The message is bad, just ignore it.
266                }
267                None => return Poll::Ready(None),
268            }
269        }
270    }
271}
272
273impl<T> FusedStream for FilterCursorStream<T>
274where
275    T: for<'a> TryFrom<Message<'a>>,
276{
277    fn is_terminated(&self) -> bool {
278        self.cursor.is_terminated()
279    }
280}
281
282impl<T> From<FilterCursor> for FilterCursorStream<T> {
283    fn from(cursor: FilterCursor) -> Self {
284        Self { cursor, phantom: PhantomData }
285    }
286}
287
// Tests for `FilterCursor` and `FilterCursorStream` against a real `SharedBuffer`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logs::shared_buffer::{
        InnerGuard, SharedBuffer, SharedBufferOptions, create_ring_buffer,
    };
    use crate::logs::stats::LogStreamStats;
    use crate::logs::testing::make_message;
    use assert_matches::assert_matches;
    use fidl_fuchsia_diagnostics::StreamMode;
    use futures::StreamExt;
    use selectors::{FastError, parse_component_selector};
    use std::future::poll_fn;
    use std::pin::pin;
    use std::time::Duration;

    // Stats object for container buffers, attached to a default inspect node.
    fn test_stats() -> Arc<LogStreamStats> {
        Arc::new(LogStreamStats::new(
            &fuchsia_inspect::Node::default(),
            &ComponentIdentity::unknown(),
        ))
    }

    // A snapshot cursor returns the single buffered message and then ends.
    #[fuchsia::test]
    async fn cursor_basic() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(65536),
            Box::new(|_| {}),
            Default::default(),
            &fuchsia_inspect::Node::default(),
        );
        let container = buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container.push_back(msg.bytes());

        let cursor = buffer.cursor(StreamMode::Snapshot, vec![]);
        let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));

        let item = stream.next().await.unwrap();
        assert_eq!(item.msg().unwrap(), "a");
        assert!(stream.next().await.is_none());
    }

    // A selector for component "a" excludes the message written by component "b".
    #[fuchsia::test]
    async fn cursor_filter() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(65536),
            Box::new(|_| {}),
            Default::default(),
            &fuchsia_inspect::Node::default(),
        );
        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), test_stats());

        container_a.push_back(make_message("msg_a", None, zx::BootInstant::from_nanos(1)).bytes());
        container_b.push_back(make_message("msg_b", None, zx::BootInstant::from_nanos(2)).bytes());

        let selector = parse_component_selector::<FastError>("a").unwrap();
        let cursor = buffer.cursor(StreamMode::Snapshot, vec![selector]);
        let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));

        let item = stream.next().await.unwrap();
        assert_eq!(item.msg().unwrap(), "msg_a");
        assert!(stream.next().await.is_none());
    }

    // A subscribing cursor stays pending until a message is written, then returns it.
    #[fuchsia::test]
    async fn cursor_subscribe() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(65536),
            Box::new(|_| {}),
            Default::default(),
            &fuchsia_inspect::Node::default(),
        );
        let container = buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());

        let cursor = buffer.cursor(StreamMode::Subscribe, vec![]);
        let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));

        assert!(futures::poll!(stream.next()).is_pending());

        container.push_back(make_message("msg", None, zx::BootInstant::from_nanos(1)).bytes());

        let item = stream.next().await.unwrap();
        assert_eq!(item.msg().unwrap(), "msg");
    }

    // SnapshotThenSubscribe returns the buffered message, goes pending, then returns a message
    // written afterwards.
    #[fuchsia::test]
    async fn cursor_snapshot_then_subscribe() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(65536),
            Box::new(|_| {}),
            Default::default(),
            &fuchsia_inspect::Node::default(),
        );
        let container = buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());

        container.push_back(make_message("msg1", None, zx::BootInstant::from_nanos(1)).bytes());

        let cursor = buffer.cursor(StreamMode::SnapshotThenSubscribe, vec![]);
        let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));

        let item = stream.next().await.unwrap();
        assert_eq!(item.msg().unwrap(), "msg1");

        assert!(futures::poll!(stream.next()).is_pending());

        container.push_back(make_message("msg2", None, zx::BootInstant::from_nanos(2)).bytes());

        let item = stream.next().await.unwrap();
        assert_eq!(item.msg().unwrap(), "msg2");
    }

    // Messages that roll out of the buffer before the cursor reads them are reported as a
    // `LogError::RolledOutLogs` error on the next returned item.
    #[fuchsia::test]
    async fn cursor_dropped_logs() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(65536),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
            &fuchsia_inspect::Node::default(),
        );
        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), test_stats());

        let msg = make_message("msg", None, zx::BootInstant::from_nanos(1));

        container_a.push_back(msg.bytes()); // 1
        container_a.push_back(msg.bytes()); // 2
        container_a.push_back(msg.bytes()); // 3

        let cursor = buffer.cursor(StreamMode::SnapshotThenSubscribe, vec![]);
        let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));

        // Consume one
        let item = stream.next().await.unwrap();
        assert_eq!(item.msg().unwrap(), "msg");

        // Returns whether a fresh snapshot cursor filtered to `component` yields any message.
        let has_messages = async |component| {
            let mut cursor = pin!(buffer.cursor(
                StreamMode::Snapshot,
                vec![parse_component_selector::<FastError>(component).unwrap()],
            ));
            poll_fn(|cx| cursor.as_mut().poll_next(cx).map(|i| i.is_some())).await
        };

        // Force roll out of remaining A messages
        while has_messages("a").await {
            container_b.push_back(msg.bytes());
            // Force the buffer to pop old messages.
            {
                let mut inner = InnerGuard::new(&buffer);
                inner.check_space(inner.ring_buffer.head());
            }
        }

        // Next item should indicate the two dropped messages for container A.
        let item = stream.next().await.unwrap();
        assert_eq!(item.metadata.errors.as_ref().unwrap().len(), 1);
        assert_matches!(
            item.metadata.errors.as_ref().unwrap()[0],
            LogError::RolledOutLogs { count } if count == 2
        );
    }

    // After the buffer is terminated, a subscribing cursor returns the buffered message and then
    // ends instead of waiting for more.
    #[fuchsia::test]
    async fn terminate_terminates_cursor() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(65536),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::from_secs(10), ..Default::default() },
            &fuchsia_inspect::Node::default(),
        );
        let cursor = pin!(FilterCursorStream::<LogsData>::from(
            buffer.cursor(StreamMode::SnapshotThenSubscribe, vec![])
        ));
        let container = buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
        container.push_back(make_message("msg", None, zx::BootInstant::from_nanos(1)).bytes());
        drop(buffer.terminate());
        assert_eq!(cursor.count().await, 1);
    }

    // Entries that never surface from the heap (because their timestamps are large) must be
    // pruned once they roll out, so the heap cannot grow without bound.
    #[fuchsia::test]
    async fn cursor_descending_timestamps_bounded_heap() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(65536),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
            &fuchsia_inspect::Node::default(),
        );
        let identity = Arc::new(vec!["a"].into());
        let container = buffer.new_container_buffer(identity, test_stats());
        let cursor = buffer.cursor(StreamMode::SnapshotThenSubscribe, vec![]);
        let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));

        // Write enough messages to roll out old messages and exceed the HEAP_PRUNE_THRESHOLD.
        // We interleave large and small timestamps so the large ones remain in the heap.
        for i in 0..5000 {
            let msg_large = make_message(
                &format!("msg_large_{}", i),
                None,
                zx::BootInstant::from_nanos(2000000 + i as i64),
            );
            container.push_back(msg_large.bytes());
            let msg_small = make_message(
                &format!("msg_small_{}", i),
                None,
                zx::BootInstant::from_nanos(1000000 - i as i64),
            );
            container.push_back(msg_small.bytes());

            {
                let mut inner = InnerGuard::new(&buffer);
                inner.check_space(inner.ring_buffer.head());
            }

            // The small-timestamp message always sorts before every queued large one.
            let item = stream.next().await.unwrap();
            assert_eq!(item.msg().unwrap(), &format!("msg_small_{}", i));
        }

        // Without the prune logic, the large messages would stay in the heap forever,
        // making the heap size >= 5000.
        // With the prune logic, they are pruned once the heap exceeds
        // live_messages + HEAP_PRUNE_THRESHOLD.
        // 3000 is a loose bound that is well below the unpruned size.
        let heap_len = stream.as_mut().project().cursor.project().messages.len();
        assert!(heap_len <= 3000, "Heap size: {}", heap_len);
    }
}