archivist_lib/logs/shared_buffer/
cursor.rs1use super::{ContainerId, Inner, InnerGuard, SharedBuffer};
6use crate::identity::ComponentIdentity;
7use diagnostics_data::{LogError, LogsData};
8use diagnostics_log_encoding::parse::ParseError;
9use diagnostics_log_encoding::{Header, TRACING_FORMAT_LOG_RECORD_TYPE};
10use diagnostics_message::error::MessageError;
11use fidl_fuchsia_diagnostics::ComponentSelector;
12use fuchsia_async::condition::{ConditionGuard, WakerEntry};
13use futures::Stream;
14use futures::stream::FusedStream;
15use pin_project::pin_project;
16use ring_buffer::ring_buffer_record_len;
17use selectors::matches_selectors;
18use std::cmp::Ordering;
19use std::collections::BinaryHeap;
20use std::marker::PhantomData;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::{Context, Poll, ready};
24use zerocopy::{FromBytes, IntoBytes};
25
26#[pin_project]
28pub struct FilterCursor {
29 buffer: Arc<SharedBuffer>,
30 index: u64,
31 message_id: u64,
32 end: Option<u64>,
33 selectors: Vec<ComponentSelector>,
34 messages: BinaryHeap<MessageRef>,
35 flush_sockets_for_snapshot: bool,
36 #[pin]
37 waker_entry: WakerEntry<Inner>,
38}
39
40impl FilterCursor {
41 pub fn new(
42 buffer: Arc<SharedBuffer>,
43 index: u64,
44 message_id: u64,
45 snapshot: bool,
46 selectors: Vec<ComponentSelector>,
47 ) -> Self {
48 let waker_entry = buffer.inner.waker_entry();
49 Self {
50 buffer,
51 index,
52 message_id,
53 end: None,
54 selectors,
55 messages: BinaryHeap::new(),
56 waker_entry,
57 flush_sockets_for_snapshot: snapshot,
58 }
59 }
60
61 pub fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Message<'_>>> {
65 let this = self.project();
66
67 if *this.flush_sockets_for_snapshot {
68 let mut inner = InnerGuard::new(this.buffer);
69 *this.end = Some(this.buffer.flush_sockets(&mut inner));
70 *this.flush_sockets_for_snapshot = false;
71 }
72
73 let mut inner = this.buffer.inner.lock();
74
75 struct ContainerIds {
77 ids: [ContainerId; 8],
78 result: u8,
79 pos: usize,
80 }
81
82 impl ContainerIds {
83 fn memoize(&mut self, id: ContainerId, predicate: impl FnOnce() -> bool) -> bool {
85 if let Some(pos) = self.ids.iter().position(|i| i == &id) {
86 self.result & (1 << pos) != 0
87 } else {
88 let result = predicate();
89 self.ids[self.pos] = id;
90 if result {
91 self.result |= 1 << self.pos;
92 } else {
93 self.result &= !(1 << self.pos);
94 }
95 self.pos = (self.pos + 1) % self.ids.len();
96 result
97 }
98 }
99 }
100
101 let mut container_ids = ContainerIds { ids: [ContainerId(0xffff); 8], result: 0, pos: 0 };
102
103 let mut dropped = 0;
108 if *this.index < inner.tail {
109 *this.index = inner.tail;
110 dropped += inner.tail_message_id - *this.message_id;
111 *this.message_id = inner.tail_message_id;
112 }
113 while *this.index < inner.last_scanned && this.end.is_none_or(|e| *this.index < e) {
114 let (component, msg, timestamp) =
117 unsafe { inner.parse_message(*this.index..inner.last_scanned) };
118
119 if let Some(timestamp) = timestamp
120 && let Some(container) = inner.containers.get(component)
121 && (this.selectors.is_empty()
122 || container_ids.memoize(component, || {
123 matches_selectors(&container.identity.moniker, this.selectors)
124 }))
125 {
126 this.messages.push(MessageRef { index: *this.index, timestamp });
127 }
128
129 *this.index += ring_buffer_record_len(msg.len()) as u64;
130 *this.message_id += 1;
131 }
132 while let Some(message) = this.messages.pop() {
133 if message.index < inner.tail {
134 dropped += 1;
136 continue;
137 }
138 return Poll::Ready(Some(Message { inner, message, dropped }));
139 }
140 if this.end.is_some_and(|e| *this.index >= e) || inner.terminated {
141 *this.index = u64::MAX;
142 Poll::Ready(None)
143 } else {
144 inner.add_waker(this.waker_entry, cx.waker().clone());
145 Poll::Pending
146 }
147 }
148
149 fn is_terminated(&self) -> bool {
150 self.index == u64::MAX
151 }
152}
153
154#[derive(Eq)]
155struct MessageRef {
156 timestamp: zx::BootInstant,
157 index: u64,
158}
159
160impl PartialEq for MessageRef {
161 fn eq(&self, other: &Self) -> bool {
162 self.timestamp == other.timestamp && self.index == other.index
163 }
164}
165
166impl PartialOrd for MessageRef {
167 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
168 Some(self.cmp(other))
169 }
170}
171
172impl Ord for MessageRef {
173 fn cmp(&self, other: &Self) -> Ordering {
174 other.timestamp.cmp(&self.timestamp).then_with(|| other.index.cmp(&self.index))
176 }
177}
178
179pub struct Message<'a> {
180 inner: ConditionGuard<'a, Inner>,
181 message: MessageRef,
182 dropped: u64,
183}
184
185impl Message<'_> {
186 fn parse(&self) -> Result<(&Arc<ComponentIdentity>, u64, &[u8]), MessageError> {
189 let (container, data, _) =
192 unsafe { self.inner.parse_message(self.message.index..self.inner.last_scanned) };
193 let container_id = container;
194 let (header, _) = Header::read_from_prefix(data)
195 .map_err(|_| MessageError::from(ParseError::InvalidHeader))?;
196 let msg_len = header.size_words() as usize * 8;
197 if msg_len > data.len() || msg_len < 16 {
198 return Err(ParseError::ValueOutOfValidRange.into());
199 }
200 if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE {
201 return Err(ParseError::InvalidRecordType.into());
202 }
203 let container = self.inner.containers.get(container).unwrap();
204 Ok((&container.identity, container_id.0 as u64, &data[..msg_len]))
205 }
206}
207
208impl TryFrom<Message<'_>> for LogsData {
209 type Error = MessageError;
210
211 fn try_from(value: Message<'_>) -> Result<Self, Self::Error> {
212 let (container, _tag, data) = value.parse()?;
213 let mut data = diagnostics_message::from_structured(container.as_ref().into(), data)?;
214 if value.dropped > 0 {
215 data.metadata
216 .errors
217 .get_or_insert(vec![])
218 .push(LogError::RolledOutLogs { count: value.dropped });
219 }
220 Ok(data)
221 }
222}
223
224#[derive(Debug)]
226pub struct FxtMessage {
227 data: Box<[u8]>,
228 dropped: u64,
229 component_identity: Arc<ComponentIdentity>,
230 tag: u64,
231}
232
233impl FxtMessage {
234 pub fn data(&self) -> &[u8] {
235 &self.data
236 }
237
238 pub fn tag(&self) -> u64 {
239 self.tag
240 }
241
242 pub fn dropped(&self) -> u64 {
243 self.dropped
244 }
245
246 pub fn component_identity(&self) -> &Arc<ComponentIdentity> {
247 &self.component_identity
248 }
249
250 pub fn timestamp(&self) -> zx::BootInstant {
251 zx::BootInstant::from_nanos(i64::read_from_bytes(&self.data[8..16]).unwrap())
252 }
253
254 #[cfg(test)]
255 pub fn new_test(
256 data: Box<[u8]>,
257 dropped: u64,
258 component_identity: Arc<ComponentIdentity>,
259 tag: u64,
260 ) -> Self {
261 Self { data, dropped, component_identity, tag }
262 }
263}
264
265impl TryFrom<Message<'_>> for FxtMessage {
266 type Error = MessageError;
267
268 fn try_from(value: Message<'_>) -> Result<Self, Self::Error> {
269 let (component, tag, data) = value.parse()?;
270 let mut data: Box<[u8]> = data.into();
271 if data.len() >= 8 {
272 let mut header = Header::read_from_bytes(&data[0..8]).unwrap();
273 header.set_tag(tag as u32);
274 data[0..8].copy_from_slice(header.as_bytes());
275 }
276 Ok(FxtMessage {
277 data,
278 dropped: value.dropped,
279 tag,
280 component_identity: Arc::clone(component),
281 })
282 }
283}
284
285#[pin_project]
287pub struct FilterCursorStream<T> {
288 #[pin]
289 cursor: FilterCursor,
290 phantom: PhantomData<T>,
291}
292
293impl<T> Stream for FilterCursorStream<T>
294where
295 T: for<'a> TryFrom<Message<'a>>,
296{
297 type Item = T;
298
299 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
300 let mut this = self.project();
301 loop {
302 match ready!(this.cursor.as_mut().poll_next(cx)) {
303 Some(item) => {
304 if let Ok(data) = T::try_from(item) {
305 return Poll::Ready(Some(data));
306 }
307 }
309 None => return Poll::Ready(None),
310 }
311 }
312 }
313}
314
315impl<T> FusedStream for FilterCursorStream<T>
316where
317 T: for<'a> TryFrom<Message<'a>>,
318{
319 fn is_terminated(&self) -> bool {
320 self.cursor.is_terminated()
321 }
322}
323
324impl<T> From<FilterCursor> for FilterCursorStream<T> {
325 fn from(cursor: FilterCursor) -> Self {
326 Self { cursor, phantom: PhantomData }
327 }
328}
329
330#[cfg(test)]
331mod tests {
332 use super::*;
333 use crate::logs::shared_buffer::{
334 InnerGuard, SharedBuffer, SharedBufferOptions, create_ring_buffer,
335 };
336 use crate::logs::testing::make_message;
337 use assert_matches::assert_matches;
338 use fidl_fuchsia_diagnostics::StreamMode;
339 use futures::StreamExt;
340 use selectors::{FastError, parse_component_selector};
341 use std::pin::pin;
342 use std::time::Duration;
343
344 #[fuchsia::test]
345 async fn cursor_basic() {
346 let buffer =
347 SharedBuffer::new(create_ring_buffer(65536), Box::new(|_| {}), Default::default());
348 let container = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
349 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
350 container.push_back(msg.bytes());
351
352 let cursor = buffer.cursor(StreamMode::Snapshot, vec![]);
353 let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));
354
355 let item = stream.next().await.unwrap();
356 assert_eq!(item.msg().unwrap(), "a");
357 assert!(stream.next().await.is_none());
358 }
359
360 #[fuchsia::test]
361 async fn cursor_filter() {
362 let buffer =
363 SharedBuffer::new(create_ring_buffer(65536), Box::new(|_| {}), Default::default());
364 let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
365 let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
366
367 container_a.push_back(make_message("msg_a", None, zx::BootInstant::from_nanos(1)).bytes());
368 container_b.push_back(make_message("msg_b", None, zx::BootInstant::from_nanos(2)).bytes());
369
370 let selector = parse_component_selector::<FastError>("a").unwrap();
371 let cursor = buffer.cursor(StreamMode::Snapshot, vec![selector]);
372 let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));
373
374 let item = stream.next().await.unwrap();
375 assert_eq!(item.msg().unwrap(), "msg_a");
376 assert!(stream.next().await.is_none());
377 }
378
379 #[fuchsia::test]
380 async fn cursor_subscribe() {
381 let buffer =
382 SharedBuffer::new(create_ring_buffer(65536), Box::new(|_| {}), Default::default());
383 let container = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
384
385 let cursor = buffer.cursor(StreamMode::Subscribe, vec![]);
386 let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));
387
388 assert!(futures::poll!(stream.next()).is_pending());
389
390 container.push_back(make_message("msg", None, zx::BootInstant::from_nanos(1)).bytes());
391
392 let item = stream.next().await.unwrap();
393 assert_eq!(item.msg().unwrap(), "msg");
394 }
395
396 #[fuchsia::test]
397 async fn cursor_snapshot_then_subscribe() {
398 let buffer =
399 SharedBuffer::new(create_ring_buffer(65536), Box::new(|_| {}), Default::default());
400 let container = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
401
402 container.push_back(make_message("msg1", None, zx::BootInstant::from_nanos(1)).bytes());
403
404 let cursor = buffer.cursor(StreamMode::SnapshotThenSubscribe, vec![]);
405 let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));
406
407 let item = stream.next().await.unwrap();
408 assert_eq!(item.msg().unwrap(), "msg1");
409
410 assert!(futures::poll!(stream.next()).is_pending());
411
412 container.push_back(make_message("msg2", None, zx::BootInstant::from_nanos(2)).bytes());
413
414 let item = stream.next().await.unwrap();
415 assert_eq!(item.msg().unwrap(), "msg2");
416 }
417
418 #[fuchsia::test]
419 async fn cursor_dropped_logs() {
420 let buffer = SharedBuffer::new(
421 create_ring_buffer(65536),
422 Box::new(|_| {}),
423 SharedBufferOptions { sleep_time: Duration::ZERO },
424 );
425 let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
426 let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
427
428 let msg = make_message("msg", None, zx::BootInstant::from_nanos(1));
429
430 container_a.push_back(msg.bytes()); container_a.push_back(msg.bytes()); container_a.push_back(msg.bytes()); let cursor = buffer.cursor(StreamMode::SnapshotThenSubscribe, vec![]);
435 let mut stream = pin!(FilterCursorStream::<LogsData>::from(cursor));
436
437 let item = stream.next().await.unwrap();
439 assert_eq!(item.msg().unwrap(), "msg");
440
441 while pin!(FilterCursorStream::<FxtMessage>::from(buffer.cursor(
443 StreamMode::Snapshot,
444 vec![parse_component_selector::<FastError>("a").unwrap()]
445 )))
446 .next()
447 .await
448 .is_some()
449 {
450 container_b.push_back(msg.bytes());
451 {
453 let mut inner = InnerGuard::new(&buffer);
454 inner.check_space(inner.ring_buffer.head());
455 }
456 }
457
458 let item = stream.next().await.unwrap();
460 assert_eq!(item.metadata.errors.as_ref().unwrap().len(), 1);
461 assert_matches!(
462 item.metadata.errors.as_ref().unwrap()[0],
463 LogError::RolledOutLogs { count } if count == 2
464 );
465 }
466
467 #[fuchsia::test]
468 async fn terminate_terminates_cursor() {
469 let buffer = SharedBuffer::new(
470 create_ring_buffer(65536),
471 Box::new(|_| {}),
472 SharedBufferOptions { sleep_time: Duration::from_secs(10) },
473 );
474 let cursor = pin!(FilterCursorStream::<LogsData>::from(
475 buffer.cursor(StreamMode::SnapshotThenSubscribe, vec![])
476 ));
477 let container = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
478 container.push_back(make_message("msg", None, zx::BootInstant::from_nanos(1)).bytes());
479 drop(buffer.terminate());
480 assert_eq!(cursor.count().await, 1);
481 }
482}