Skip to main content

archivist_lib/logs/shared_buffer/
cursor.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::{ContainerId, Inner, InnerGuard, SharedBuffer};
6use crate::identity::ComponentIdentity;
7use diagnostics_data::{LogError, LogsData};
8use diagnostics_log_encoding::parse::ParseError;
9use diagnostics_log_encoding::{Header, TRACING_FORMAT_LOG_RECORD_TYPE};
10use diagnostics_message::error::MessageError;
11use fidl_fuchsia_diagnostics::ComponentSelector;
12use fuchsia_async::condition::{ConditionGuard, WakerEntry};
13use futures::Stream;
14use futures::stream::FusedStream;
15use pin_project::pin_project;
16use ring_buffer::ring_buffer_record_len;
17use selectors::matches_selectors;
18use std::cmp::Ordering;
19use std::collections::BinaryHeap;
20use std::marker::PhantomData;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::{Context, Poll, ready};
24use zerocopy::{FromBytes, IntoBytes};
25
26/// FilterCursor is a cursor that returns all logs optionally filtered by component selectors.
27#[pin_project]
28pub struct FilterCursor {
29    buffer: Arc<SharedBuffer>,
30    index: u64,
31    message_id: u64,
32    end: Option<u64>,
33    selectors: Vec<ComponentSelector>,
34    messages: BinaryHeap<MessageRef>,
35    flush_sockets_for_snapshot: bool,
36    #[pin]
37    waker_entry: WakerEntry<Inner>,
38}
39
40impl FilterCursor {
41    pub fn new(
42        buffer: Arc<SharedBuffer>,
43        index: u64,
44        message_id: u64,
45        snapshot: bool,
46        selectors: Vec<ComponentSelector>,
47    ) -> Self {
48        let waker_entry = buffer.inner.waker_entry();
49        Self {
50            buffer,
51            index,
52            message_id,
53            end: None,
54            selectors,
55            messages: BinaryHeap::new(),
56            waker_entry,
57            flush_sockets_for_snapshot: snapshot,
58        }
59    }
60
61    /// Polls for the next message.  We can't use the Stream trait because this is a lending
62    /// iterator; it returns a reference to a message (and will hold a lock).  Depending on the use,
63    /// this can be more efficient than using a Stream which will involve copying the message.
64    pub fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Message<'_>>> {
65        let this = self.project();
66
67        if *this.flush_sockets_for_snapshot {
68            let mut inner = InnerGuard::new(this.buffer);
69            *this.end = Some(this.buffer.flush_sockets(&mut inner));
70            *this.flush_sockets_for_snapshot = false;
71        }
72
73        let mut inner = this.buffer.inner.lock();
74
75        // Comparing monikers can be expensive, so memoize the results.
76        struct ContainerIds {
77            ids: [ContainerId; 8],
78            result: u8,
79            pos: usize,
80        }
81
82        impl ContainerIds {
83            /// Memoizes the result of `predicate` for a given `id`.
84            fn memoize(&mut self, id: ContainerId, predicate: impl FnOnce() -> bool) -> bool {
85                if let Some(pos) = self.ids.iter().position(|i| i == &id) {
86                    self.result & (1 << pos) != 0
87                } else {
88                    let result = predicate();
89                    self.ids[self.pos] = id;
90                    if result {
91                        self.result |= 1 << self.pos;
92                    } else {
93                        self.result &= !(1 << self.pos);
94                    }
95                    self.pos = (self.pos + 1) % self.ids.len();
96                    result
97                }
98            }
99        }
100
101        let mut container_ids = ContainerIds { ids: [ContainerId(0xffff); 8], result: 0, pos: 0 };
102
103        // NOTE: If messages are dropped the dropped count won't account for filtering: it will
104        // include messages that have been dropped that don't match the filter.  Fixing this is
105        // difficult and not worth the effort.  Dropped messages should be rare and the common case
106        // is that there is no filtering.
107        let mut dropped = 0;
108        if *this.index < inner.tail {
109            *this.index = inner.tail;
110            dropped += inner.tail_message_id - *this.message_id;
111            *this.message_id = inner.tail_message_id;
112        }
113        while *this.index < inner.last_scanned && this.end.is_none_or(|e| *this.index < e) {
114            // SAFETY: We've checked index >= inner.tail, so the range must be valid.  `msg`
115            // will remain valid whilst we're holding the lock.
116            let (component, msg, timestamp) =
117                unsafe { inner.parse_message(*this.index..inner.last_scanned) };
118
119            if let Some(timestamp) = timestamp
120                && let Some(container) = inner.containers.get(component)
121                && (this.selectors.is_empty()
122                    || container_ids.memoize(component, || {
123                        matches_selectors(&container.identity.moniker, this.selectors)
124                    }))
125            {
126                this.messages.push(MessageRef { index: *this.index, timestamp });
127            }
128
129            *this.index += ring_buffer_record_len(msg.len()) as u64;
130            *this.message_id += 1;
131        }
132        while let Some(message) = this.messages.pop() {
133            if message.index < inner.tail {
134                // See note above regarding filtering.
135                dropped += 1;
136                continue;
137            }
138            return Poll::Ready(Some(Message { inner, message, dropped }));
139        }
140        if this.end.is_some_and(|e| *this.index >= e) || inner.terminated {
141            *this.index = u64::MAX;
142            Poll::Ready(None)
143        } else {
144            inner.add_waker(this.waker_entry, cx.waker().clone());
145            Poll::Pending
146        }
147    }
148
149    fn is_terminated(&self) -> bool {
150        self.index == u64::MAX
151    }
152}
153
154#[derive(Eq)]
155struct MessageRef {
156    timestamp: zx::BootInstant,
157    index: u64,
158}
159
160impl PartialEq for MessageRef {
161    fn eq(&self, other: &Self) -> bool {
162        self.timestamp == other.timestamp && self.index == other.index
163    }
164}
165
166impl PartialOrd for MessageRef {
167    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
168        Some(self.cmp(other))
169    }
170}
171
172impl Ord for MessageRef {
173    fn cmp(&self, other: &Self) -> Ordering {
174        // BinaryHeap is a max-heap, but we want min, hence this ordering.
175        other.timestamp.cmp(&self.timestamp).then_with(|| other.index.cmp(&self.index))
176    }
177}
178
179pub struct Message<'a> {
180    inner: ConditionGuard<'a, Inner>,
181    message: MessageRef,
182    dropped: u64,
183}
184
185impl Message<'_> {
186    /// Returns the component and FXT bytes.  The FXT record is validated to be the correct length
187    /// and type.
188    fn parse(&self) -> Result<(&Arc<ComponentIdentity>, u64, &[u8]), MessageError> {
189        // SAFETY: We hold a lock which prevents the buffer from being drained so
190        // it should be safe to read this range.
191        let (container, data, _) =
192            unsafe { self.inner.parse_message(self.message.index..self.inner.last_scanned) };
193        let container_id = container;
194        let (header, _) = Header::read_from_prefix(data)
195            .map_err(|_| MessageError::from(ParseError::InvalidHeader))?;
196        let msg_len = header.size_words() as usize * 8;
197        if msg_len > data.len() || msg_len < 16 {
198            return Err(ParseError::ValueOutOfValidRange.into());
199        }
200        if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE {
201            return Err(ParseError::InvalidRecordType.into());
202        }
203        let container = self.inner.containers.get(container).unwrap();
204        Ok((&container.identity, container_id.0 as u64, &data[..msg_len]))
205    }
206}
207
208impl TryFrom<Message<'_>> for LogsData {
209    type Error = MessageError;
210
211    fn try_from(value: Message<'_>) -> Result<Self, Self::Error> {
212        let (container, _tag, data) = value.parse()?;
213        let mut data = diagnostics_message::from_structured(container.as_ref().into(), data)?;
214        if value.dropped > 0 {
215            data.metadata
216                .errors
217                .get_or_insert(vec![])
218                .push(LogError::RolledOutLogs { count: value.dropped });
219        }
220        Ok(data)
221    }
222}
223
224/// A copy of the raw message.
225#[derive(Debug)]
226pub struct FxtMessage {
227    data: Box<[u8]>,
228    dropped: u64,
229    component_identity: Arc<ComponentIdentity>,
230    tag: u64,
231}
232
233impl FxtMessage {
234    pub fn data(&self) -> &[u8] {
235        &self.data
236    }
237
238    pub fn tag(&self) -> u64 {
239        self.tag
240    }
241
242    pub fn dropped(&self) -> u64 {
243        self.dropped
244    }
245
246    pub fn component_identity(&self) -> &Arc<ComponentIdentity> {
247        &self.component_identity
248    }
249
250    pub fn timestamp(&self) -> zx::BootInstant {
251        zx::BootInstant::from_nanos(i64::read_from_bytes(&self.data[8..16]).unwrap())
252    }
253
254    #[cfg(test)]
255    pub fn new_test(
256        data: Box<[u8]>,
257        dropped: u64,
258        component_identity: Arc<ComponentIdentity>,
259        tag: u64,
260    ) -> Self {
261        Self { data, dropped, component_identity, tag }
262    }
263}
264
265impl TryFrom<Message<'_>> for FxtMessage {
266    type Error = MessageError;
267
268    fn try_from(value: Message<'_>) -> Result<Self, Self::Error> {
269        let (component, tag, data) = value.parse()?;
270        let mut data: Box<[u8]> = data.into();
271        if data.len() >= 8 {
272            let mut header = Header::read_from_bytes(&data[0..8]).unwrap();
273            header.set_tag(tag as u32);
274            data[0..8].copy_from_slice(header.as_bytes());
275        }
276        Ok(FxtMessage {
277            data,
278            dropped: value.dropped,
279            tag,
280            component_identity: Arc::clone(component),
281        })
282    }
283}
284
285/// Like FilterCursor but returns a stream of LogsData.
286#[pin_project]
287pub struct FilterCursorStream<T> {
288    #[pin]
289    cursor: FilterCursor,
290    phantom: PhantomData<T>,
291}
292
293impl<T> Stream for FilterCursorStream<T>
294where
295    T: for<'a> TryFrom<Message<'a>>,
296{
297    type Item = T;
298
299    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
300        let mut this = self.project();
301        loop {
302            match ready!(this.cursor.as_mut().poll_next(cx)) {
303                Some(item) => {
304                    if let Ok(data) = T::try_from(item) {
305                        return Poll::Ready(Some(data));
306                    }
307                    // The message is bad, just ignore it.
308                }
309                None => return Poll::Ready(None),
310            }
311        }
312    }
313}
314
315impl<T> FusedStream for FilterCursorStream<T>
316where
317    T: for<'a> TryFrom<Message<'a>>,
318{
319    fn is_terminated(&self) -> bool {
320        self.cursor.is_terminated()
321    }
322}
323
324impl<T> From<FilterCursor> for FilterCursorStream<T> {
325    fn from(cursor: FilterCursor) -> Self {
326        Self { cursor, phantom: PhantomData }
327    }
328}
329
330#[cfg(test)]
331mod tests {
332    use super::*;
333    use crate::logs::shared_buffer::{
334        InnerGuard, SharedBuffer, SharedBufferOptions, create_ring_buffer,
335    };
336    use crate::logs::testing::make_message;
337    use assert_matches::assert_matches;
338    use fidl_fuchsia_diagnostics::StreamMode;
339    use futures::StreamExt;
340    use selectors::{FastError, parse_component_selector};
341    use std::pin::pin;
342    use std::time::Duration;
343
344    #[fuchsia::test]
345    async fn cursor_basic() {
346        let buffer =
347            SharedBuffer::new(create_ring_buffer(65536), Box::new(|_| {}), Default::default());
348        let container = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
349        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
350        container.push_back(msg.bytes());
351
352        let cursor = buffer.cursor(StreamMode::Snapshot, vec![]);
353        let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));
354
355        let item = stream.next().await.unwrap();
356        assert_eq!(item.msg().unwrap(), "a");
357        assert!(stream.next().await.is_none());
358    }
359
360    #[fuchsia::test]
361    async fn cursor_filter() {
362        let buffer =
363            SharedBuffer::new(create_ring_buffer(65536), Box::new(|_| {}), Default::default());
364        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
365        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
366
367        container_a.push_back(make_message("msg_a", None, zx::BootInstant::from_nanos(1)).bytes());
368        container_b.push_back(make_message("msg_b", None, zx::BootInstant::from_nanos(2)).bytes());
369
370        let selector = parse_component_selector::<FastError>("a").unwrap();
371        let cursor = buffer.cursor(StreamMode::Snapshot, vec![selector]);
372        let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));
373
374        let item = stream.next().await.unwrap();
375        assert_eq!(item.msg().unwrap(), "msg_a");
376        assert!(stream.next().await.is_none());
377    }
378
379    #[fuchsia::test]
380    async fn cursor_subscribe() {
381        let buffer =
382            SharedBuffer::new(create_ring_buffer(65536), Box::new(|_| {}), Default::default());
383        let container = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
384
385        let cursor = buffer.cursor(StreamMode::Subscribe, vec![]);
386        let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));
387
388        assert!(futures::poll!(stream.next()).is_pending());
389
390        container.push_back(make_message("msg", None, zx::BootInstant::from_nanos(1)).bytes());
391
392        let item = stream.next().await.unwrap();
393        assert_eq!(item.msg().unwrap(), "msg");
394    }
395
396    #[fuchsia::test]
397    async fn cursor_snapshot_then_subscribe() {
398        let buffer =
399            SharedBuffer::new(create_ring_buffer(65536), Box::new(|_| {}), Default::default());
400        let container = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
401
402        container.push_back(make_message("msg1", None, zx::BootInstant::from_nanos(1)).bytes());
403
404        let cursor = buffer.cursor(StreamMode::SnapshotThenSubscribe, vec![]);
405        let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));
406
407        let item = stream.next().await.unwrap();
408        assert_eq!(item.msg().unwrap(), "msg1");
409
410        assert!(futures::poll!(stream.next()).is_pending());
411
412        container.push_back(make_message("msg2", None, zx::BootInstant::from_nanos(2)).bytes());
413
414        let item = stream.next().await.unwrap();
415        assert_eq!(item.msg().unwrap(), "msg2");
416    }
417
418    #[fuchsia::test]
419    async fn cursor_dropped_logs() {
420        let buffer = SharedBuffer::new(
421            create_ring_buffer(65536),
422            Box::new(|_| {}),
423            SharedBufferOptions { sleep_time: Duration::ZERO },
424        );
425        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
426        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
427
428        let msg = make_message("msg", None, zx::BootInstant::from_nanos(1));
429
430        container_a.push_back(msg.bytes()); // 1
431        container_a.push_back(msg.bytes()); // 2
432        container_a.push_back(msg.bytes()); // 3
433
434        let cursor = buffer.cursor(StreamMode::SnapshotThenSubscribe, vec![]);
435        let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));
436
437        // Consume one
438        let item = stream.next().await.unwrap();
439        assert_eq!(item.msg().unwrap(), "msg");
440
441        // Force roll out of remaining A messages
442        while pin!(FilterCursorStream::<FxtMessage>::from(buffer.cursor(
443            StreamMode::Snapshot,
444            vec![parse_component_selector::<FastError>("a").unwrap()]
445        )))
446        .next()
447        .await
448        .is_some()
449        {
450            container_b.push_back(msg.bytes());
451            // Force the buffer to pop old messages.
452            {
453                let mut inner = InnerGuard::new(&buffer);
454                inner.check_space(inner.ring_buffer.head());
455            }
456        }
457
458        // Next item should indicate the two dropped messages for container A.
459        let item = stream.next().await.unwrap();
460        assert_eq!(item.metadata.errors.as_ref().unwrap().len(), 1);
461        assert_matches!(
462            item.metadata.errors.as_ref().unwrap()[0],
463            LogError::RolledOutLogs { count } if count == 2
464        );
465    }
466
467    #[fuchsia::test]
468    async fn terminate_terminates_cursor() {
469        let buffer = SharedBuffer::new(
470            create_ring_buffer(65536),
471            Box::new(|_| {}),
472            SharedBufferOptions { sleep_time: Duration::from_secs(10) },
473        );
474        let cursor = pin!(FilterCursorStream::<LogsData>::from(
475            buffer.cursor(StreamMode::SnapshotThenSubscribe, vec![])
476        ));
477        let container = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
478        container.push_back(make_message("msg", None, zx::BootInstant::from_nanos(1)).bytes());
479        drop(buffer.terminate());
480        assert_eq!(cursor.count().await, 1);
481    }
482}