archivist_lib/logs/
shared_buffer.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::identity::ComponentIdentity;
6use crate::logs::repository::ARCHIVIST_MONIKER;
7use crate::logs::stats::LogStreamStats;
8use crate::logs::stored_message::StoredMessage;
9use derivative::Derivative;
10use diagnostics_log_encoding::encode::add_dropped_count;
11use diagnostics_log_encoding::{Header, TRACING_FORMAT_LOG_RECORD_TYPE};
12use fidl_fuchsia_diagnostics::StreamMode;
13use fidl_fuchsia_logger::MAX_DATAGRAM_LEN_BYTES;
14use fuchsia_async as fasync;
15use fuchsia_async::condition::{Condition, ConditionGuard, WakerEntry};
16use fuchsia_sync::Mutex;
17use futures::Stream;
18use futures::channel::oneshot;
19use log::debug;
20use pin_project::{pin_project, pinned_drop};
21use ring_buffer::{self, RingBuffer, ring_buffer_record_len};
22use std::collections::VecDeque;
23use std::mem::ManuallyDrop;
24use std::ops::{Deref, DerefMut, Range};
25use std::pin::Pin;
26use std::sync::{Arc, Weak};
27use std::task::{Context, Poll};
28use std::time::Duration;
29use zerocopy::FromBytes;
30use zx::AsHandleRef as _;
31
// Aim to keep 25% of the buffer free. This is expressed as a fraction: numerator / denominator.
// `InnerGuard::check_space` pops the oldest messages until at least this fraction of the ring
// buffer's capacity is free, and `calculate_real_size_given_desired_capacity` enlarges the buffer
// by the matching amount.
const SPACE_THRESHOLD_NUMERATOR: usize = 1;
const SPACE_THRESHOLD_DENOMINATOR: usize = 4;

// The default amount of time that Archivist will sleep for to reduce how often it wakes up to
// handle log messages. Used by `SharedBufferOptions::default()`.
const DEFAULT_SLEEP_TIME: Duration = Duration::from_millis(200);
39
40pub fn create_ring_buffer(capacity: usize) -> ring_buffer::Reader {
41    RingBuffer::create(calculate_real_size_given_desired_capacity(capacity))
42}
43
44fn calculate_real_size_given_desired_capacity(capacity: usize) -> usize {
45    // We always keep spare space in the buffer so that we don't drop messages.  This is controlled
46    // by SPACE_THRESHOLD_NUMERATOR & SPACE_THRESHOLD_DENOMINATOR.  Round up capacity so that
47    // `capacity` reflects the actual amount of log data we can store.
48    let page_size = zx::system_get_page_size() as usize;
49    (capacity * SPACE_THRESHOLD_DENOMINATOR
50        / (SPACE_THRESHOLD_DENOMINATOR - SPACE_THRESHOLD_NUMERATOR))
51        .next_multiple_of(page_size)
52}
53
// Bit that is OR'd into the port packet key when waiting for IOB_PEER_CLOSED on an IOBuffer
// endpoint (see `ContainerBuffer::iob`). The socket thread tests this bit to tell those packets
// apart from socket packets, whose keys are plain socket IDs.
const IOB_PEER_CLOSED_KEY_BASE: u64 = 0x8000_0000_0000_0000;

// Callback invoked with the identity of a container that has become inactive. It is called by
// `InnerGuard`'s `Drop` implementation after the inner lock has been released.
pub type OnInactive = Box<dyn Fn(Arc<ComponentIdentity>) + Send + Sync>;
57
/// Log storage that is shared between all containers.
///
/// All messages are stored in a single ring buffer (held in `Inner`). A dedicated thread reads
/// messages from sockets and forwards them into the ring buffer, and an async task keeps the
/// required amount of space free in the buffer.
pub struct SharedBuffer {
    // The state protected by the lock. `InnerGuard` wakes the wakers registered with this
    // condition when its `wake` flag is set.
    inner: Condition<Inner>,

    // Sockets. This *must* be locked after `inner` and the lock *must* be dropped before
    // `InnerGuard` is dropped.
    sockets: Mutex<Slab<Socket>>,

    // Callback for when a container is inactive (i.e. no logs and no sockets).
    on_inactive: OnInactive,

    // The port that we use to service the sockets. It also receives the IOBuffer peer-closed
    // packets and the interrupt packet raised via `event`.
    port: zx::Port,

    // Event used for termination. USER_0 is signalled on it whenever a `ThreadMessage` is queued
    // for the socket thread.
    event: zx::Event,

    // The thread that we use to service the sockets. The handle is taken by `terminate()`.
    socket_thread: Mutex<Option<std::thread::JoinHandle<()>>>,

    // The task responsible for monitoring the space in the IOBuffer and popping messages when it
    // gets full. It will also wake cursors whenever new data arrives.
    _buffer_monitor_task: fasync::Task<()>,
}
81
/// A lock guard for `SharedBuffer::inner` that defers some work until the guard is dropped:
/// waking the registered wakers (if `wake` is set) and reporting inactive containers.
struct InnerGuard<'a> {
    // The buffer this guard belongs to; used to reach the `on_inactive` callback on drop.
    buffer: &'a SharedBuffer,

    // The underlying lock guard. It is wrapped in `ManuallyDrop` so that `Drop` can release the
    // lock before the `on_inactive` callbacks are invoked.
    guard: ManuallyDrop<ConditionGuard<'a, Inner>>,

    // The list of components that should be reported as inactive once the lock is dropped.
    on_inactive: Vec<Arc<ComponentIdentity>>,

    // If true, wake all the wakers registered with the ConditionGuard.
    wake: bool,
}
93
94impl Drop for InnerGuard<'_> {
95    fn drop(&mut self) {
96        if self.wake {
97            for waker in self.guard.drain_wakers() {
98                waker.wake();
99            }
100        }
101        // SAFETY: This is the only place we drop the guard.
102        unsafe {
103            ManuallyDrop::drop(&mut self.guard);
104        }
105        for identity in self.on_inactive.drain(..) {
106            (*self.buffer.on_inactive)(identity);
107        }
108    }
109}
110
111impl Deref for InnerGuard<'_> {
112    type Target = Inner;
113
114    fn deref(&self) -> &Self::Target {
115        &self.guard
116    }
117}
118
119impl DerefMut for InnerGuard<'_> {
120    fn deref_mut(&mut self) -> &mut Self::Target {
121        &mut self.guard
122    }
123}
124
/// The state of a `SharedBuffer` that is protected by its lock.
struct Inner {
    // The ring buffer.
    ring_buffer: Arc<RingBuffer>,

    // Registered containers.
    containers: Containers,

    // Socket thread message queue.
    thread_msg_queue: VecDeque<ThreadMessage>,

    // The index in the buffer that we have scanned for messages. Records before this index have
    // been counted in their container's `msg_ids` (see `update_message_ids`).
    last_scanned: u64,

    // A copy of the tail index in the ring buffer. It is advanced by `pop`.
    tail: u64,

    // The IOBuffer peers that we must watch. The slab index, combined with
    // `IOB_PEER_CLOSED_KEY_BASE`, is the port key for the peer-closed wait.
    iob_peers: Slab<(ContainerId, zx::Iob)>,
}
144
/// Messages sent to the socket thread via `Inner::thread_msg_queue`. The sender signals USER_0
/// on `SharedBuffer::event` after queueing a message.
enum ThreadMessage {
    // The thread should terminate.
    Terminate,

    // Process all pending socket messages and report via the Sender when done.
    Flush(oneshot::Sender<()>),
}
152
/// Options for `SharedBuffer::new`.
pub struct SharedBufferOptions {
    // To reduce how often Archivist wakes when new messages are written, Archivist will sleep for
    // this time. This will impact how quickly messages show up via the cursors. The value is used
    // by both the socket thread and the buffer monitor task.
    pub sleep_time: Duration,
}
158
159impl Default for SharedBufferOptions {
160    fn default() -> Self {
161        Self { sleep_time: DEFAULT_SLEEP_TIME }
162    }
163}
164
165impl SharedBuffer {
    /// Returns a new shared buffer.
    ///
    /// This spawns the buffer monitor task and the socket thread. `on_inactive` is called with
    /// the identity of any container that becomes inactive, and `options.sleep_time` throttles
    /// both the task and the thread.
    pub fn new(
        ring_buffer: ring_buffer::Reader,
        on_inactive: OnInactive,
        options: SharedBufferOptions,
    ) -> Arc<Self> {
        // `new_cyclic` lets the monitor task, which is stored inside the buffer itself, hold a
        // `Weak` reference back to the buffer.
        let this = Arc::new_cyclic(|weak: &Weak<Self>| Self {
            inner: Condition::new(Inner {
                ring_buffer: Arc::clone(&ring_buffer),
                containers: Containers::default(),
                thread_msg_queue: VecDeque::default(),
                last_scanned: 0,
                tail: 0,
                iob_peers: Slab::default(),
            }),
            sockets: Mutex::new(Slab::default()),
            on_inactive,
            port: zx::Port::create(),
            event: zx::Event::create(),
            socket_thread: Mutex::default(),
            _buffer_monitor_task: fasync::Task::spawn(Self::buffer_monitor_task(
                Weak::clone(weak),
                ring_buffer,
                options.sleep_time,
            )),
        });

        // The socket thread holds a strong reference to the buffer until it returns, which it
        // does when it processes `ThreadMessage::Terminate`.
        *this.socket_thread.lock() = Some({
            let this = Arc::clone(&this);
            std::thread::spawn(move || this.socket_thread(options.sleep_time))
        });
        this
    }
199
200    pub fn new_container_buffer(
201        self: &Arc<Self>,
202        identity: Arc<ComponentIdentity>,
203        stats: Arc<LogStreamStats>,
204    ) -> ContainerBuffer {
205        let mut inner = self.inner.lock();
206        let Inner { containers, ring_buffer, .. } = &mut *inner;
207        ContainerBuffer {
208            shared_buffer: Arc::clone(self),
209            container_id: containers.new_container(ring_buffer, Arc::clone(&identity), stats),
210        }
211    }
212
213    pub async fn flush(&self) {
214        let (sender, receiver) = oneshot::channel();
215        self.inner.lock().thread_msg_queue.push_back(ThreadMessage::Flush(sender));
216        self.event.signal_handle(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
217        // Ignore failure if Archivist is shutting down.
218        let _ = receiver.await;
219    }
220
221    /// Returns the number of registered containers in use by the buffer.
222    #[cfg(test)]
223    pub fn container_count(&self) -> usize {
224        self.inner.lock().containers.len()
225    }
226
227    /// Terminates the socket thread. The socket thread will drain the sockets before terminating.
228    pub async fn terminate(&self) {
229        self.inner.lock().thread_msg_queue.push_back(ThreadMessage::Terminate);
230        self.event.signal_handle(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
231        let join_handle = self.socket_thread.lock().take().unwrap();
232        fasync::unblock(|| {
233            let _ = join_handle.join();
234        })
235        .await;
236    }
237
    /// The body of the socket thread.
    ///
    /// Each iteration gathers the packets that are queued on the port, handles closed IOBuffer
    /// peers, drains the sockets that are ready to read, and then processes at most one
    /// `ThreadMessage`. The function returns when it processes `ThreadMessage::Terminate`.
    fn socket_thread(&self, sleep_time: Duration) {
        // The port key used for the wait on `self.event`.
        const INTERRUPT_KEY: u64 = u64::MAX;
        let mut sockets_ready = Vec::new();
        let mut iob_peer_closed = Vec::new();
        let mut interrupt_needs_arming = true;
        // The thread message that will be processed at the end of the current iteration.
        let mut msg = None;

        loop {
            // If a message is already pending, don't block: just collect whatever packets are
            // already queued. Otherwise sleep and then block until the first packet arrives.
            let mut deadline = if msg.is_some() {
                zx::MonotonicInstant::INFINITE_PAST
            } else {
                if interrupt_needs_arming {
                    self.event
                        .wait_async_handle(
                            &self.port,
                            INTERRUPT_KEY,
                            zx::Signals::USER_0,
                            zx::WaitAsyncOpts::empty(),
                        )
                        .unwrap();
                    interrupt_needs_arming = false;
                }

                // Wait so that we're not constantly waking up for every log message that is queued.
                // Ignore errors here. The wait ends early if USER_0 is signalled, i.e. when a
                // thread message has been queued.
                let _ = self.event.wait_handle(
                    zx::Signals::USER_0,
                    zx::MonotonicInstant::after(sleep_time.into()),
                );
                zx::MonotonicInstant::INFINITE
            };

            // Gather the list of sockets that are ready to read.
            loop {
                match self.port.wait(deadline) {
                    Ok(packet) => {
                        if packet.key() == INTERRUPT_KEY {
                            interrupt_needs_arming = true;
                            // To maintain proper ordering, we must capture the message here whilst
                            // we are still gathering the list of sockets that are ready to read.
                            // If we wait till later, we introduce windows where we might miss a
                            // socket that should be read.
                            if msg.is_none() {
                                msg = self.inner.lock().thread_msg_queue.pop_front();
                            }
                        } else if packet.key() & IOB_PEER_CLOSED_KEY_BASE != 0 {
                            // The cast to u32 discards the IOB_PEER_CLOSED_KEY_BASE bit, leaving
                            // the index into `iob_peers`.
                            iob_peer_closed.push(packet.key() as u32);
                        } else {
                            sockets_ready.push(SocketId(packet.key() as u32))
                        }
                    }
                    // There are no more packets queued.
                    Err(zx::Status::TIMED_OUT) => break,
                    Err(status) => panic!("port wait error: {status:?}"),
                }
                // After the first packet, only collect packets that are already queued.
                deadline = zx::MonotonicInstant::INFINITE_PAST;
            }

            let mut inner = InnerGuard::new(self);

            if !iob_peer_closed.is_empty() {
                // See the comment on `is_active()` for why this is required.
                inner.update_message_ids(inner.ring_buffer.head());

                for iob_peer_closed in iob_peer_closed.drain(..) {
                    let container_id = inner.iob_peers.free(iob_peer_closed).0;
                    if let Some(container) = inner.containers.get_mut(container_id) {
                        container.iob_count -= 1;
                        if container.iob_count == 0 && !container.is_active() {
                            if container.should_free() {
                                inner.containers.free(container_id);
                            } else {
                                // Reported via the callback when `inner` is dropped.
                                let identity = Arc::clone(&container.identity);
                                inner.on_inactive.push(identity);
                            }
                        }
                    }
                }
            }

            {
                // This scope ensures the sockets lock is released before `inner` is dropped.
                let mut sockets = self.sockets.lock();
                for socket_id in sockets_ready.drain(..) {
                    // The closure re-arms the wait once the socket has been drained.
                    inner.read_socket(&mut sockets, socket_id, |socket| {
                        socket
                            .socket
                            .wait_async_handle(
                                &self.port,
                                socket_id.0 as u64,
                                zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
                                zx::WaitAsyncOpts::empty(),
                            )
                            .unwrap();
                    });
                }
            }

            // Now that we've processed the sockets, we can process the message.
            if let Some(m) = msg.take() {
                match m {
                    ThreadMessage::Terminate => return,
                    ThreadMessage::Flush(sender) => {
                        // The receiver may have gone away; that's not an error.
                        let _ = sender.send(());
                    }
                }

                // See if there's another message.
                msg = inner.thread_msg_queue.pop_front();
                if msg.is_none() {
                    // If there are no more messages, we must clear the signal so that we get
                    // notified when the next message arrives. This must be done whilst we are
                    // holding the lock.
                    self.event.signal_handle(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
                }
            }
        }
    }
354
355    async fn buffer_monitor_task(
356        this: Weak<Self>,
357        mut ring_buffer: ring_buffer::Reader,
358        sleep_time: Duration,
359    ) {
360        let mut last_head = 0;
361        loop {
362            // Sleep to limit how often we wake up.
363            fasync::Timer::new(sleep_time).await;
364            let head = ring_buffer.wait(last_head).await;
365            let Some(this) = this.upgrade() else { return };
366            let mut inner = InnerGuard::new(&this);
367            inner.check_space(head);
368            last_head = head;
369        }
370    }
371}
372
373impl Inner {
374    // Returns list of containers that no longer have any messages.
375    fn ingest(&mut self, msg: &[u8], container_id: ContainerId) {
376        if msg.len() < std::mem::size_of::<Header>() {
377            debug!("message too short ({})", msg.len());
378            if let Some(container) = self.containers.get(container_id) {
379                container.stats.increment_invalid(msg.len());
380            }
381            return;
382        }
383
384        let header = Header::read_from_bytes(&msg[..std::mem::size_of::<Header>()]).unwrap();
385
386        // NOTE: Some tests send messages that are bigger than the header indicates. We ignore the
387        // tail of any such messages.
388        let msg_len = header.size_words() as usize * 8;
389
390        // Check the correct type and size.
391        if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg.len() < msg_len {
392            debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, msg.len());
393            if let Some(container) = self.containers.get(container_id) {
394                container.stats.increment_invalid(msg.len());
395            }
396            return;
397        }
398
399        let Some(container) = self.containers.get_mut(container_id) else {
400            return;
401        };
402
403        let mut data;
404        let msg = if container.dropped_count > 0 {
405            data = msg.to_vec();
406            if !add_dropped_count(&mut data, container.dropped_count) {
407                debug!("unable to add dropped count to invalid message");
408                container.stats.increment_invalid(data.len());
409                return;
410            }
411            &data
412        } else {
413            msg
414        };
415
416        if container.iob.write(Default::default(), 0, msg).is_err() {
417            // We were unable to write the message to the buffer, most likely due to lack of
418            // space. We drop this message and then we'll add to the dropped count for the next
419            // message.
420            container.dropped_count += 1
421        } else {
422            container.dropped_count = 0;
423        }
424    }
425
426    /// Parses the first message within `range` returns (container_id, message, timestamp).
427    ///
428    /// # Panics
429    ///
430    /// This will panic if the ring buffer has been corrupted. Only the kernel and Archivist can
431    /// write to the ring buffer and so we trust both not to corrupt the ring buffer.
432    ///
433    /// # Safety
434    ///
435    /// `range` *must* be within the written range of the ring buffer so that there is no concurrent
436    /// write access to that range. The returned slice is only valid whilst this remains true.
437    unsafe fn parse_message(
438        &self,
439        range: Range<u64>,
440    ) -> (ContainerId, &[u8], Option<zx::BootInstant>) {
441        let (tag, msg) = unsafe { self.ring_buffer.first_message_in(range) }
442            .expect("Unable to read message from ring buffer");
443        (
444            ContainerId(tag as u32),
445            msg,
446            (msg.len() >= 16)
447                .then(|| zx::BootInstant::from_nanos(i64::read_from_bytes(&msg[8..16]).unwrap())),
448        )
449    }
450}
451
452impl<'a> InnerGuard<'a> {
453    fn new(buffer: &'a SharedBuffer) -> Self {
454        Self {
455            buffer,
456            guard: ManuallyDrop::new(buffer.inner.lock()),
457            on_inactive: Vec::new(),
458            wake: false,
459        }
460    }
461
462    /// Pops a message and returns its total size. The caller should call `update_message_ids(head)`
463    /// prior to calling this.
464    ///
465    /// NOTE: This will pop the oldest message in terms of when it was inserted which is *not*
466    /// necessarily the message with the *oldest* timestamp because we might not have received the
467    /// messages in perfect timestamp order. This should be close enough for all use cases we care
468    /// about, and besides, the timestamps can't be trusted anyway.
469    fn pop(&mut self, head: u64) -> Option<usize> {
470        if head == self.tail {
471            return None;
472        }
473
474        // SAFETY: There can be no concurrent writes between `tail..head` and the *only* place
475        // we increment the tail index is just before we leave this function.
476        let record_len = {
477            let (container_id, message, timestamp) = unsafe { self.parse_message(self.tail..head) };
478            let record_len = ring_buffer_record_len(message.len());
479
480            let container = self.containers.get_mut(container_id).unwrap();
481
482            container.stats.increment_rolled_out(record_len);
483            container.msg_ids.start += 1;
484            if let Some(timestamp) = timestamp {
485                container.last_rolled_out_timestamp = timestamp;
486            }
487            if !container.is_active() {
488                if container.should_free() {
489                    self.containers.free(container_id);
490                } else {
491                    let identity = Arc::clone(&container.identity);
492                    self.on_inactive.push(identity);
493                }
494            }
495
496            record_len
497        };
498
499        // NOTE: This should go last. After incrementing `tail`, the `message` can be overwritten.
500        self.ring_buffer.increment_tail(record_len);
501        self.tail += record_len as u64;
502
503        // The caller should have called `update_message_ids(head)` prior to calling this.
504        assert!(self.last_scanned >= self.tail);
505
506        Some(record_len)
507    }
508
509    /// Reads a socket. Calls `rearm` to rearm the socket once it has been drained.
510    fn read_socket(
511        &mut self,
512        sockets: &mut Slab<Socket>,
513        socket_id: SocketId,
514        rearm: impl FnOnce(&mut Socket),
515    ) {
516        let Some(socket) = sockets.get_mut(socket_id.0) else { return };
517        let container_id = socket.container_id;
518
519        loop {
520            self.check_space(self.ring_buffer.head());
521
522            let mut data = Vec::with_capacity(MAX_DATAGRAM_LEN_BYTES as usize);
523
524            // Read directly into the buffer leaving space for the header.
525            let len = match socket.socket.read_uninit(data.spare_capacity_mut()) {
526                Ok(d) => d.len(),
527                Err(zx::Status::SHOULD_WAIT) => {
528                    // The socket has been drained.
529                    rearm(socket);
530                    return;
531                }
532                Err(_) => break,
533            };
534
535            // SAFETY: `read_uninit` will have written to `len` bytes.
536            unsafe {
537                data.set_len(len);
538            }
539
540            let container = self.containers.get_mut(container_id).unwrap();
541            if data.len() < 16 {
542                container.stats.increment_invalid(data.len());
543                continue;
544            }
545
546            let header = Header::read_from_bytes(&data[..std::mem::size_of::<Header>()]).unwrap();
547            let msg_len = header.size_words() as usize * 8;
548            if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg_len != data.len() {
549                debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, data.len());
550                container.stats.increment_invalid(data.len());
551                continue;
552            }
553
554            if container.dropped_count > 0 && !add_dropped_count(&mut data, container.dropped_count)
555            {
556                debug!("unable to add dropped count to invalid message");
557                container.stats.increment_invalid(data.len());
558                continue;
559            }
560
561            if container.iob.write(Default::default(), 0, &data).is_err() {
562                // We were unable to write the message to the buffer, most likely due to lack of
563                // space. We drop this message and then we'll add to the dropped count for the next
564                // message.
565                container.dropped_count += 1
566            } else {
567                container.dropped_count = 0;
568            }
569        }
570
571        // This path is taken when the socket should be closed.
572
573        // See the comment on `is_active()` for why this is required.
574        self.update_message_ids(self.ring_buffer.head());
575
576        let container = self.containers.get_mut(container_id).unwrap();
577        container.remove_socket(socket_id, sockets);
578        if !container.is_active() {
579            if container.should_free() {
580                self.containers.free(container_id);
581            } else {
582                let identity = Arc::clone(&container.identity);
583                self.on_inactive.push(identity);
584            }
585        }
586    }
587
588    /// Scans the ring buffer and updates `msg_ids` for the containers.
589    fn update_message_ids(&mut self, head: u64) {
590        while self.last_scanned < head {
591            // SAFETY: This is safe because `head` must be within the ring buffer range and we make
592            // sure that `self.last_scanned` is always >= `tail` in `pop()`.
593            let (container_id, message, _) = unsafe { self.parse_message(self.last_scanned..head) };
594            let msg_len = message.len();
595            let severity = (msg_len >= 8)
596                .then(|| Header::read_from_bytes(&message[0..8]).unwrap().severity().into());
597            let container = self.containers.get_mut(container_id).unwrap();
598            container.msg_ids.end += 1;
599            if let Some(severity) = severity {
600                container.stats.ingest_message(msg_len, severity);
601            }
602            self.last_scanned += ring_buffer_record_len(msg_len) as u64;
603            self.wake = true;
604        }
605    }
606
607    /// Ensures the buffer keeps the required amount of space.
608    fn check_space(&mut self, head: u64) {
609        self.update_message_ids(head);
610        let capacity = self.ring_buffer.capacity();
611        let mut space = capacity
612            .checked_sub((head - self.tail) as usize)
613            .unwrap_or_else(|| panic!("bad range: {:?}", self.tail..head));
614        let required_space = capacity * SPACE_THRESHOLD_NUMERATOR / SPACE_THRESHOLD_DENOMINATOR;
615        while space < required_space {
616            let Some(amount) = self.pop(head) else { break };
617            space += amount;
618        }
619    }
620}
621
/// The set of registered containers, addressed by `ContainerId`.
#[derive(Default)]
struct Containers {
    // Storage for the containers; a container's ID is its index in this slab.
    slab: Slab<ContainerInfo>,
}
626
/// Identifies a container within `Containers`. The same value is used as the tag for the
/// container's IOBuffer writers, which is how records in the ring buffer are attributed.
#[derive(Clone, Copy, Debug)]
struct ContainerId(u32);
629
630impl Containers {
631    #[cfg(test)]
632    fn len(&self) -> usize {
633        self.slab.len()
634    }
635
636    fn get(&self, id: ContainerId) -> Option<&ContainerInfo> {
637        self.slab.get(id.0)
638    }
639
640    fn get_mut(&mut self, id: ContainerId) -> Option<&mut ContainerInfo> {
641        self.slab.get_mut(id.0)
642    }
643
644    fn new_container(
645        &mut self,
646        buffer: &RingBuffer,
647        identity: Arc<ComponentIdentity>,
648        stats: Arc<LogStreamStats>,
649    ) -> ContainerId {
650        ContainerId(self.slab.insert(|id| {
651            let (iob, _) = buffer.new_iob_writer(id as u64).unwrap();
652            ContainerInfo::new(identity, stats, iob)
653        }))
654    }
655
656    fn free(&mut self, id: ContainerId) {
657        self.slab.free(id.0);
658    }
659}
660
/// The bookkeeping that the shared buffer keeps for each registered container.
#[derive(Derivative)]
#[derivative(Debug)]
struct ContainerInfo {
    // The number of references (typically cursors) that prevent this object from being freed.
    refs: usize,

    // The identity of the container.
    identity: Arc<ComponentIdentity>,

    // The first index in the shared buffer for a message for this container. This index
    // might have been rolled out. This is used as an optimisation to set the starting
    // cursor position.
    first_index: u64,

    // The first and last message IDs stored in the shared buffer. The last message ID can be out of
    // date if there are concurrent writers. `start` is advanced when a message is popped and `end`
    // is advanced when the ring buffer is scanned.
    msg_ids: Range<u64>,

    // Whether the container is terminated.
    terminated: bool,

    // Inspect instrumentation.
    #[derivative(Debug = "ignore")]
    stats: Arc<LogStreamStats>,

    // The timestamp of the message most recently rolled out.
    last_rolled_out_timestamp: zx::BootInstant,

    // The first socket ID for this container, or `SocketId::NULL` if it has no sockets. The
    // container's sockets are linked together via their `prev` and `next` fields.
    first_socket_id: SocketId,

    // The IOBuffer used for writing.
    iob: zx::Iob,

    // The number of client IOBuffers.
    iob_count: usize,

    // The number of messages dropped when forwarding from a socket to an IOBuffer.
    dropped_count: u64,
}
701
702impl ContainerInfo {
703    fn new(identity: Arc<ComponentIdentity>, stats: Arc<LogStreamStats>, iob: zx::Iob) -> Self {
704        Self {
705            refs: 0,
706            identity,
707            first_index: 0,
708            msg_ids: 0..0,
709            terminated: false,
710            stats,
711            last_rolled_out_timestamp: zx::BootInstant::ZERO,
712            first_socket_id: SocketId::NULL,
713            iob,
714            iob_count: 0,
715            dropped_count: 0,
716        }
717    }
718
719    fn should_free(&self) -> bool {
720        self.terminated && self.refs == 0 && !self.is_active()
721    }
722
723    // Returns true if the container is considered active which is the case if it has sockets, io
724    // buffers, or buffered messages.
725    //
726    // NOTE: Whenever a socket or iob is closed, `update_message_ids` must called to ensure
727    // `msg_ids.end` is correctly set.
728    fn is_active(&self) -> bool {
729        self.first_socket_id != SocketId::NULL
730            || self.iob_count > 0
731            || self.msg_ids.end != self.msg_ids.start
732            || ARCHIVIST_MONIKER.get().is_some_and(|m| *self.identity == *m)
733    }
734
735    // # Panics
736    //
737    // This will panic if the socket isn't found.
738    fn remove_socket(&mut self, socket_id: SocketId, sockets: &mut Slab<Socket>) {
739        let Socket { prev, next, .. } = *sockets.get(socket_id.0).unwrap();
740        if prev == SocketId::NULL {
741            self.first_socket_id = next;
742        } else {
743            sockets.get_mut(prev.0).unwrap().next = next;
744        }
745        if next != SocketId::NULL {
746            sockets
747                .get_mut(next.0)
748                .unwrap_or_else(|| panic!("next {next:?} has been freed!"))
749                .prev = prev;
750        }
751        sockets.free(socket_id.0);
752        self.stats.close_socket();
753        debug!(identity:% = self.identity; "Socket closed.");
754    }
755}
756
/// A handle to a single container's data within a `SharedBuffer`.
pub struct ContainerBuffer {
    // The buffer that holds the container's messages.
    shared_buffer: Arc<SharedBuffer>,

    // The ID of the container within `shared_buffer`.
    container_id: ContainerId,
}
761
762impl ContainerBuffer {
763    /// Returns the tag used by IOBuffers used for this component.
764    pub fn iob_tag(&self) -> u64 {
765        self.container_id.0 as u64
766    }
767
768    /// Ingests a new message.
769    ///
770    /// If the message is invalid, it is dropped.
771    pub fn push_back(&self, msg: &[u8]) {
772        self.shared_buffer.inner.lock().ingest(msg, self.container_id);
773    }
774
775    /// Returns an IOBuffer for the container.
776    pub fn iob(&self) -> zx::Iob {
777        let mut inner = self.shared_buffer.inner.lock();
778
779        inner.containers.get_mut(self.container_id).unwrap().iob_count += 1;
780
781        let (ep0, ep1) = inner.ring_buffer.new_iob_writer(self.container_id.0 as u64).unwrap();
782
783        inner.iob_peers.insert(|idx| {
784            ep1.wait_async_handle(
785                &self.shared_buffer.port,
786                idx as u64 | IOB_PEER_CLOSED_KEY_BASE,
787                zx::Signals::IOB_PEER_CLOSED,
788                zx::WaitAsyncOpts::empty(),
789            )
790            .unwrap();
791
792            (self.container_id, ep1)
793        });
794
795        ep0
796    }
797
798    /// Returns a cursor.
799    pub fn cursor(&self, mode: StreamMode) -> Option<Cursor> {
800        // NOTE: It is not safe to use on_inactive in this function because this function can be
801        // called whilst locks are held which are the same locks that the on_inactive notification
802        // uses.
803        let mut inner = InnerGuard::new(&self.shared_buffer);
804        let Some(mut container) = inner.containers.get_mut(self.container_id) else {
805            // We've hit a race where the container has terminated.
806            return None;
807        };
808
809        container.refs += 1;
810        let stats = Arc::clone(&container.stats);
811
812        let (index, next_id, end) = match mode {
813            StreamMode::Snapshot => {
814                (container.first_index, container.msg_ids.start, CursorEnd::Snapshot(None))
815            }
816            StreamMode::Subscribe => {
817                // Call `update_message_ids` so that `msg_ids.end` is up to date.
818                let head = inner.ring_buffer.head();
819                inner.update_message_ids(head);
820                container = inner.containers.get_mut(self.container_id).unwrap();
821                (head, container.msg_ids.end, CursorEnd::Stream)
822            }
823            StreamMode::SnapshotThenSubscribe => {
824                (container.first_index, container.msg_ids.start, CursorEnd::Stream)
825            }
826        };
827
828        Some(Cursor {
829            index,
830            container_id: self.container_id,
831            next_id,
832            end,
833            buffer: Arc::clone(&self.shared_buffer),
834            waker_entry: self.shared_buffer.inner.waker_entry(),
835            stats,
836        })
837    }
838
839    /// Marks the buffer as terminated which will force all cursors to end and close all sockets.
840    /// The component's data will remain in the buffer until the messages are rolled out. This will
841    /// *not* drain sockets or close IOBuffers.
842    pub fn terminate(&self) {
843        let mut inner = InnerGuard::new(&self.shared_buffer);
844
845        // See the comment on `is_active()` for why this is required.
846        inner.update_message_ids(inner.ring_buffer.head());
847
848        if let Some(container) = inner.containers.get_mut(self.container_id) {
849            container.terminated = true;
850            if container.first_socket_id != SocketId::NULL {
851                let mut sockets = self.shared_buffer.sockets.lock();
852                loop {
853                    container.remove_socket(container.first_socket_id, &mut sockets);
854                    if container.first_socket_id == SocketId::NULL {
855                        break;
856                    }
857                }
858            }
859            if container.should_free() {
860                inner.containers.free(self.container_id);
861            }
862            inner.wake = true;
863        }
864    }
865
866    /// Returns true if the container has messages, sockets or IOBuffers.
867    pub fn is_active(&self) -> bool {
868        self.shared_buffer
869            .inner
870            .lock()
871            .containers
872            .get(self.container_id)
873            .is_some_and(|c| c.is_active())
874    }
875
876    /// Adds a socket for this container.
877    pub fn add_socket(&self, socket: zx::Socket) {
878        let mut inner = self.shared_buffer.inner.lock();
879        let Some(container) = inner.containers.get_mut(self.container_id) else { return };
880        container.stats.open_socket();
881        let next = container.first_socket_id;
882        let mut sockets = self.shared_buffer.sockets.lock();
883        let socket_id = SocketId(sockets.insert(|socket_id| {
884            socket
885                .wait_async_handle(
886                    &self.shared_buffer.port,
887                    socket_id as u64,
888                    zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
889                    zx::WaitAsyncOpts::empty(),
890                )
891                .unwrap();
892            Socket { socket, container_id: self.container_id, prev: SocketId::NULL, next }
893        }));
894        if next != SocketId::NULL {
895            sockets.get_mut(next.0).unwrap().prev = socket_id;
896        }
897        container.first_socket_id = socket_id;
898    }
899}
900
901impl Drop for ContainerBuffer {
902    fn drop(&mut self) {
903        self.terminate();
904    }
905}
906
907#[pin_project(PinnedDrop)]
908#[derive(Derivative)]
909#[derivative(Debug)]
910pub struct Cursor {
911    // Index in the buffer that we should continue searching from.
912    index: u64,
913
914    container_id: ContainerId,
915
916    // The next expected message ID.
917    next_id: u64,
918
919    // The ID, if any, that we should end at (exclusive).
920    end: CursorEnd,
921
922    #[derivative(Debug = "ignore")]
923    buffer: Arc<SharedBuffer>,
924
925    // waker_entry is used to register a waker for subscribing cursors.
926    #[pin]
927    #[derivative(Debug = "ignore")]
928    waker_entry: WakerEntry<Inner>,
929
930    #[derivative(Debug = "ignore")]
931    stats: Arc<LogStreamStats>,
932}
933
934/// The next element in the stream or a marker of the number of items rolled out since last polled.
935#[derive(Debug, PartialEq)]
936pub enum LazyItem<T> {
937    /// The next item in the stream.
938    Next(Arc<T>),
939    /// A count of the items dropped between the last call to poll_next and this one.
940    ItemsRolledOut(u64, zx::BootInstant),
941}
942
943impl Stream for Cursor {
944    type Item = LazyItem<StoredMessage>;
945
946    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
947        let mut this = self.project();
948        let mut inner = InnerGuard::new(this.buffer);
949
950        let mut head = inner.ring_buffer.head();
951        inner.check_space(head);
952
953        let mut container = match inner.containers.get(*this.container_id) {
954            None => return Poll::Ready(None),
955            Some(container) => container,
956        };
957
958        let end_id = match &mut this.end {
959            CursorEnd::Snapshot(None) => {
960                let mut sockets = this.buffer.sockets.lock();
961                // Drain the sockets associated with this container first so that we capture
962                // pending messages. Some tests rely on this.
963                let mut socket_id = container.first_socket_id;
964                while socket_id != SocketId::NULL {
965                    let socket = sockets.get_mut(socket_id.0).unwrap();
966                    let next = socket.next;
967                    // The socket doesn't need to be rearmed here.
968                    inner.read_socket(&mut sockets, socket_id, |_| {});
969                    socket_id = next;
970                }
971
972                // Call `update_message_ids` so that `msg_ids.end` is up to date.
973                head = inner.ring_buffer.head();
974                inner.update_message_ids(head);
975                container = inner.containers.get(*this.container_id).unwrap();
976                *this.end = CursorEnd::Snapshot(Some(container.msg_ids.end));
977                container.msg_ids.end
978            }
979            CursorEnd::Snapshot(Some(end)) => *end,
980            CursorEnd::Stream => u64::MAX,
981        };
982
983        if *this.next_id == end_id {
984            return Poll::Ready(None);
985        }
986
987        // See if messages have been rolled out.
988        if container.msg_ids.start > *this.next_id {
989            let mut next_id = container.msg_ids.start;
990            if end_id < next_id {
991                next_id = end_id;
992            }
993            let rolled_out = next_id - *this.next_id;
994            *this.next_id = next_id;
995            return Poll::Ready(Some(LazyItem::ItemsRolledOut(
996                rolled_out,
997                container.last_rolled_out_timestamp,
998            )));
999        }
1000
1001        if inner.tail > *this.index {
1002            *this.index = inner.tail;
1003        }
1004
1005        // Some optimisations to reduce the amount of searching we might have to do.
1006        if container.first_index > *this.index {
1007            *this.index = container.first_index;
1008        }
1009
1010        if *this.next_id == container.msg_ids.end && *this.index < inner.last_scanned {
1011            *this.index = inner.last_scanned;
1012        }
1013
1014        // Scan forward until we find a record matching this container.
1015        while *this.index < head {
1016            // SAFETY: `*this.index..head` must be within the written region of the ring-buffer.
1017            let (container_id, message, _timestamp) =
1018                unsafe { inner.parse_message(*this.index..head) };
1019
1020            // Move index to the next record.
1021            *this.index += ring_buffer_record_len(message.len()) as u64;
1022            assert!(*this.index <= head);
1023
1024            if container_id.0 == this.container_id.0 {
1025                *this.next_id += 1;
1026                if let Some(msg) = StoredMessage::new(message.into(), this.stats) {
1027                    return Poll::Ready(Some(LazyItem::Next(Arc::new(msg))));
1028                } else {
1029                    // The message is corrupt. Just skip it.
1030                }
1031            }
1032        }
1033
1034        if container.terminated {
1035            Poll::Ready(None)
1036        } else {
1037            inner.guard.add_waker(this.waker_entry, cx.waker().clone());
1038            Poll::Pending
1039        }
1040    }
1041}
1042
1043#[pinned_drop]
1044impl PinnedDrop for Cursor {
1045    fn drop(self: Pin<&mut Self>) {
1046        let mut inner = self.buffer.inner.lock();
1047        if let Some(container) = inner.containers.get_mut(self.container_id) {
1048            container.refs -= 1;
1049            if container.should_free() {
1050                inner.containers.free(self.container_id);
1051            }
1052        }
1053    }
1054}
1055
/// Describes where a cursor stops.
#[derive(Debug)]
enum CursorEnd {
    /// Stop at a fixed message ID. The ID is `None` when the cursor is first created and is
    /// filled in the first time the cursor is polled.
    Snapshot(Option<u64>),
    /// No fixed end message ID.
    Stream,
}
1063
/// Implements a simple Slab allocator.
///
/// Values are stored in a `Vec` and addressed by a stable `u32` index. Freed slots are chained
/// into a singly-linked free list (threaded through the slots themselves) and are reused before
/// the backing vector grows.
struct Slab<T> {
    /// Backing storage; each entry is either a live value or a link in the free list.
    slab: Vec<Slot<T>>,
    /// Index of the first free slot, or `usize::MAX` when the free list is empty.
    free_index: usize,
}

impl<T> Default for Slab<T> {
    /// Creates an empty slab with an empty free list.
    fn default() -> Self {
        Self { slab: Vec::new(), free_index: usize::MAX }
    }
}

/// A single entry of a [`Slab`].
enum Slot<T> {
    /// The slot holds a live value.
    Used(T),
    /// The slot is vacant; the payload is the index of the next free slot, or `usize::MAX` at the
    /// end of the free list.
    Free(usize),
}

impl<T> Slab<T> {
    /// Returns the number of used entries. This is not performant.
    #[cfg(test)]
    fn len(&self) -> usize {
        self.slab.iter().filter(|c| matches!(c, Slot::Used(_))).count()
    }

    /// Removes and returns the value stored at `index`, pushing the slot onto the free list.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of range or if the slot is already free. The slab is left
    /// untouched in both cases.
    fn free(&mut self, index: u32) -> T {
        let index = index as usize;

        // Validate *before* modifying anything. Overwriting the slot first would replace the
        // existing free-list link with one that points at the slot itself, corrupting the free
        // list if the panic below is ever caught.
        if matches!(self.slab[index], Slot::Free(_)) {
            panic!("Slot already free");
        }

        let value = match std::mem::replace(&mut self.slab[index], Slot::Free(self.free_index)) {
            Slot::Used(value) => value,
            Slot::Free(_) => unreachable!("slot was just checked to be in use"),
        };
        self.free_index = index;
        value
    }

    /// Returns a shared reference to the value at `id`, or `None` if the slot is free or out of
    /// range.
    fn get(&self, id: u32) -> Option<&T> {
        self.slab.get(id as usize).and_then(|s| match s {
            Slot::Used(s) => Some(s),
            Slot::Free(_) => None,
        })
    }

    /// Returns a mutable reference to the value at `id`, or `None` if the slot is free or out of
    /// range.
    fn get_mut(&mut self, id: u32) -> Option<&mut T> {
        self.slab.get_mut(id as usize).and_then(|s| match s {
            Slot::Used(s) => Some(s),
            Slot::Free(_) => None,
        })
    }

    /// Stores the value produced by `value` and returns the index it was stored at.
    ///
    /// `value` is called with the index the new entry will occupy. Free slots are reused (most
    /// recently freed first) before the slab grows.
    fn insert(&mut self, value: impl FnOnce(u32) -> T) -> u32 {
        let free_index = self.free_index;
        if free_index != usize::MAX {
            // Pop the head of the free list and put the new value in its place.
            self.free_index = match std::mem::replace(
                &mut self.slab[free_index],
                Slot::Used(value(free_index as u32)),
            ) {
                Slot::Free(next) => next,
                Slot::Used(_) => unreachable!("free list points at a used slot"),
            };
            free_index as u32
        } else {
            // This is < rather than <= because we reserve 0xffff_ffff to be used as a NULL value
            // (see SocketId::NULL below).
            assert!(self.slab.len() < u32::MAX as usize);
            let index = self.slab.len() as u32;
            self.slab.push(Slot::Used(value(index)));
            index
        }
    }
}
1132
/// Index of a socket in the socket slab.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct SocketId(u32);

impl SocketId {
    /// Sentinel meaning "no socket"; the largest `u32`, which the slab never hands out.
    const NULL: Self = Self(u32::MAX);
}
1139
1140struct Socket {
1141    socket: zx::Socket,
1142    container_id: ContainerId,
1143    // Sockets are stored as a linked list for each container.
1144    prev: SocketId,
1145    next: SocketId,
1146}
1147
1148#[cfg(test)]
1149mod tests {
1150    use super::{SharedBuffer, SharedBufferOptions, create_ring_buffer};
1151    use crate::logs::shared_buffer::LazyItem;
1152    use crate::logs::stats::LogStreamStats;
1153    use crate::logs::testing::make_message;
1154    use assert_matches::assert_matches;
1155    use diagnostics_assertions::{AnyProperty, assert_data_tree};
1156    use fidl_fuchsia_diagnostics::StreamMode;
1157    use fuchsia_async as fasync;
1158    use fuchsia_async::TimeoutExt;
1159    use fuchsia_inspect::{Inspector, InspectorConfig};
1160    use fuchsia_inspect_derive::WithInspect;
1161    use futures::channel::mpsc;
1162    use futures::future::OptionFuture;
1163    use futures::stream::{FuturesUnordered, StreamExt as _};
1164    use futures::{FutureExt, poll};
1165    use ring_buffer::MAX_MESSAGE_SIZE;
1166    use std::future::poll_fn;
1167    use std::pin::pin;
1168    use std::sync::Arc;
1169    use std::sync::atomic::{AtomicU64, Ordering};
1170    use std::task::Poll;
1171    use std::time::Duration;
1172
1173    async fn yield_to_executor() {
1174        let mut first_time = true;
1175        poll_fn(|cx| {
1176            if first_time {
1177                cx.waker().wake_by_ref();
1178                first_time = false;
1179                Poll::Pending
1180            } else {
1181                Poll::Ready(())
1182            }
1183        })
1184        .await;
1185    }
1186
1187    #[fuchsia::test]
1188    async fn push_one_message() {
1189        let buffer = SharedBuffer::new(
1190            create_ring_buffer(MAX_MESSAGE_SIZE),
1191            Box::new(|_| {}),
1192            Default::default(),
1193        );
1194        let container_buffer =
1195            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1196        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1197        container_buffer.push_back(msg.bytes());
1198
1199        // Make sure the cursor can find it.
1200        let cursor = container_buffer.cursor(StreamMode::Snapshot).unwrap();
1201        assert_eq!(
1202            cursor
1203                .map(|item| {
1204                    match item {
1205                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
1206                        _ => panic!("Unexpected item {item:?}"),
1207                    }
1208                })
1209                .count()
1210                .await,
1211            1
1212        );
1213    }
1214
1215    #[fuchsia::test]
1216    async fn message_too_short() {
1217        let buffer = SharedBuffer::new(
1218            create_ring_buffer(MAX_MESSAGE_SIZE),
1219            Box::new(|_| {}),
1220            Default::default(),
1221        );
1222
1223        let container_buffer =
1224            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1225        container_buffer.push_back(&[0]);
1226
1227        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
1228    }
1229
1230    #[fuchsia::test]
1231    async fn bad_type() {
1232        let buffer = SharedBuffer::new(
1233            create_ring_buffer(MAX_MESSAGE_SIZE),
1234            Box::new(|_| {}),
1235            Default::default(),
1236        );
1237        let container_buffer =
1238            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1239        container_buffer.push_back(&[0x77; 16]);
1240
1241        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
1242    }
1243
1244    #[fuchsia::test]
1245    async fn message_truncated() {
1246        let buffer = SharedBuffer::new(
1247            create_ring_buffer(MAX_MESSAGE_SIZE),
1248            Box::new(|_| {}),
1249            Default::default(),
1250        );
1251        let container_buffer =
1252            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1253        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1254        container_buffer.push_back(&msg.bytes()[..msg.bytes().len() - 1]);
1255
1256        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
1257    }
1258
1259    #[fuchsia::test]
1260    async fn buffer_wrapping() {
1261        let buffer = SharedBuffer::new(
1262            create_ring_buffer(MAX_MESSAGE_SIZE),
1263            Box::new(|_| {}),
1264            SharedBufferOptions { sleep_time: Duration::ZERO },
1265        );
1266        let container_buffer =
1267            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1268
1269        // Keep writing messages until we wrap.
1270        let mut i = 0;
1271        loop {
1272            let msg = make_message(&format!("{i}"), None, zx::BootInstant::from_nanos(i));
1273            container_buffer.push_back(msg.bytes());
1274            i += 1;
1275
1276            // Yield to the executor to allow messages to be rolled out.
1277            yield_to_executor().await;
1278
1279            let inner = buffer.inner.lock();
1280            if inner.ring_buffer.head() > inner.ring_buffer.capacity() as u64 {
1281                break;
1282            }
1283        }
1284
1285        // Read back all the messages.
1286        let mut cursor = pin!(container_buffer.cursor(StreamMode::Snapshot).unwrap());
1287
1288        let mut j;
1289        let mut item = cursor.next().await;
1290        // Calling `cursor.next()` can cause messages to be rolled out, so skip those if that has
1291        // happened.
1292        if let Some(LazyItem::ItemsRolledOut(_, _)) = item {
1293            item = cursor.next().await;
1294        }
1295        assert_matches!(
1296            item,
1297            Some(LazyItem::Next(item)) => {
1298                j = item.timestamp().into_nanos();
1299                let msg = make_message(&format!("{j}"),
1300                                       None,
1301                                       item.timestamp());
1302                assert_eq!(&*item, &msg);
1303            }
1304        );
1305
1306        j += 1;
1307        while j != i {
1308            assert_matches!(
1309                cursor.next().await,
1310                Some(LazyItem::Next(item)) => {
1311                    assert_eq!(&*item, &make_message(&format!("{j}"),
1312                                                     None,
1313                                                     item.timestamp()));
1314                }
1315            );
1316            j += 1;
1317        }
1318
1319        assert_eq!(cursor.next().await, None);
1320    }
1321
1322    #[fuchsia::test]
1323    async fn on_inactive() {
1324        let identity = Arc::new(vec!["a"].into());
1325        let on_inactive = Arc::new(AtomicU64::new(0));
1326        let buffer = {
1327            let on_inactive = Arc::clone(&on_inactive);
1328            let identity = Arc::clone(&identity);
1329            Arc::new(SharedBuffer::new(
1330                create_ring_buffer(MAX_MESSAGE_SIZE),
1331                Box::new(move |i| {
1332                    assert_eq!(i, identity);
1333                    on_inactive.fetch_add(1, Ordering::Relaxed);
1334                }),
1335                SharedBufferOptions { sleep_time: Duration::ZERO },
1336            ))
1337        };
1338        let container_a = buffer.new_container_buffer(identity, Arc::default());
1339        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
1340
1341        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1342        container_a.push_back(msg.bytes());
1343
1344        // Repeatedly write messages to b until a is rolled out.
1345        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
1346            container_b.push_back(msg.bytes());
1347
1348            // Yield to the executor to allow messages to be rolled out.
1349            yield_to_executor().await;
1350        }
1351
1352        assert_eq!(on_inactive.load(Ordering::Relaxed), 1);
1353    }
1354
1355    #[fuchsia::test]
1356    async fn terminate_drops_container() {
1357        // Silence a clippy warning; SharedBuffer needs an executor.
1358        async {}.await;
1359
1360        let buffer = SharedBuffer::new(
1361            create_ring_buffer(MAX_MESSAGE_SIZE),
1362            Box::new(|_| {}),
1363            SharedBufferOptions { sleep_time: Duration::ZERO },
1364        );
1365
1366        // terminate when buffer has no logs.
1367        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1368        assert_eq!(buffer.container_count(), 1);
1369        container_a.terminate();
1370
1371        assert_eq!(buffer.container_count(), 0);
1372
1373        // terminate when buffer has logs.
1374        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1375        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1376        container_a.push_back(msg.bytes());
1377        assert_eq!(buffer.container_count(), 1);
1378        container_a.terminate();
1379
1380        // The container should still be there because it has logs.
1381        assert_eq!(buffer.container_count(), 1);
1382
1383        // Roll out the logs.
1384        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
1385        assert_eq!(buffer.container_count(), 2);
1386
1387        // Repeatedly write messages to b until a's message is dropped and then the container will
1388        // be dropped.
1389        while buffer.container_count() != 1 {
1390            container_b.push_back(msg.bytes());
1391
1392            // Yield to the executor to allow messages to be rolled out.
1393            yield_to_executor().await;
1394        }
1395
1396        assert!(container_a.cursor(StreamMode::Subscribe).is_none());
1397    }
1398
1399    #[fuchsia::test]
1400    async fn cursor_subscribe() {
1401        for mode in [StreamMode::Subscribe, StreamMode::SnapshotThenSubscribe] {
1402            let buffer = SharedBuffer::new(
1403                create_ring_buffer(MAX_MESSAGE_SIZE),
1404                Box::new(|_| {}),
1405                Default::default(),
1406            );
1407            let container =
1408                Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1409            let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1410            container.push_back(msg.bytes());
1411
1412            let (sender, mut receiver) = mpsc::unbounded();
1413
1414            // Run the cursor in a separate task so that we can test it gets woken correctly.
1415            {
1416                let container = Arc::clone(&container);
1417                fasync::Task::spawn(async move {
1418                    let mut cursor = pin!(container.cursor(mode).unwrap());
1419                    while let Some(item) = cursor.next().await {
1420                        sender.unbounded_send(item).unwrap();
1421                    }
1422                })
1423                .detach();
1424            }
1425
1426            // The existing message should only be returned with SnapshotThenSubscribe
1427            if mode == StreamMode::SnapshotThenSubscribe {
1428                assert_matches!(
1429                    receiver.next().await,
1430                    Some(LazyItem::Next(item)) if item.bytes() == msg.bytes()
1431                );
1432            }
1433
1434            // No message should arrive. We can only use a timeout here.
1435            assert_eq!(
1436                OptionFuture::from(Some(receiver.next()))
1437                    .on_timeout(Duration::from_millis(500), || None)
1438                    .await,
1439                None
1440            );
1441
1442            container.push_back(msg.bytes());
1443
1444            // The message should arrive now.
1445            assert_matches!(
1446                receiver.next().await,
1447                Some(LazyItem::Next(item)) if item.bytes() == msg.bytes()
1448            );
1449
1450            container.terminate();
1451
1452            // The cursor should terminate now.
1453            assert!(receiver.next().await.is_none());
1454        }
1455    }
1456
1457    #[fuchsia::test]
1458    async fn cursor_rolled_out() {
1459        // On the first pass we roll out before the cursor has started. On the second pass, we roll
1460        // out after the cursor has started.
1461        for pass in 0..2 {
1462            let buffer = SharedBuffer::new(
1463                create_ring_buffer(MAX_MESSAGE_SIZE),
1464                Box::new(|_| {}),
1465                SharedBufferOptions { sleep_time: Duration::ZERO },
1466            );
1467            let container_a =
1468                Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1469            let container_b =
1470                Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
1471            let container_c =
1472                Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), Arc::default()));
1473            let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1474
1475            container_a.push_back(msg.bytes());
1476            container_a.push_back(msg.bytes());
1477            container_b.push_back(msg.bytes());
1478
1479            const A_MESSAGE_COUNT: usize = 50;
1480            for _ in 0..A_MESSAGE_COUNT - 2 {
1481                container_a.push_back(msg.bytes());
1482            }
1483
1484            let mut cursor = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1485
1486            let mut expected = A_MESSAGE_COUNT;
1487
1488            // Get the first stored message on the first pass.
1489            if pass == 0 {
1490                assert!(cursor.next().await.is_some());
1491                expected -= 1;
1492            }
1493
1494            // Roll out messages until container_b is rolled out.
1495            while container_b.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
1496                container_c.push_back(msg.bytes());
1497
1498                // Yield to the executor to allow messages to be rolled out.
1499                yield_to_executor().await;
1500            }
1501
1502            // We should have rolled out some messages in container a.
1503            assert_matches!(
1504                cursor.next().await,
1505                Some(LazyItem::ItemsRolledOut(rolled_out, t))
1506                    if t == zx::BootInstant::from_nanos(1) && rolled_out > 0
1507                    => expected -= rolled_out as usize
1508            );
1509
1510            // And check how many are remaining.
1511            assert_eq!(cursor.count().await, expected);
1512        }
1513    }
1514
1515    #[fuchsia::test]
1516    async fn drained_post_termination_cursors() {
1517        let buffer = SharedBuffer::new(
1518            create_ring_buffer(MAX_MESSAGE_SIZE),
1519            Box::new(|_| {}),
1520            Default::default(),
1521        );
1522        let container =
1523            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1524        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1525
1526        let mut cursor_a = pin!(container.cursor(StreamMode::Subscribe).unwrap());
1527        let mut cursor_b = pin!(container.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
1528
1529        container.push_back(msg.bytes());
1530        container.push_back(msg.bytes());
1531        container.push_back(msg.bytes());
1532        container.push_back(msg.bytes());
1533        container.push_back(msg.bytes());
1534
1535        let mut cursor_c = pin!(container.cursor(StreamMode::Snapshot).unwrap());
1536        assert!(cursor_a.next().await.is_some());
1537        assert!(cursor_b.next().await.is_some());
1538        assert!(cursor_c.next().await.is_some());
1539
1540        container.terminate();
1541
1542        // All cursors should return the 4 remaining messages.
1543        assert_eq!(cursor_a.count().await, 4);
1544        assert_eq!(cursor_b.count().await, 4);
1545        assert_eq!(cursor_c.count().await, 4);
1546    }
1547
1548    #[fuchsia::test]
1549    async fn empty_post_termination_cursors() {
1550        let buffer = SharedBuffer::new(
1551            create_ring_buffer(MAX_MESSAGE_SIZE),
1552            Box::new(|_| {}),
1553            Default::default(),
1554        );
1555        let container =
1556            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1557
1558        let cursor_a = container.cursor(StreamMode::Subscribe).unwrap();
1559        let cursor_b = container.cursor(StreamMode::SnapshotThenSubscribe).unwrap();
1560        let cursor_c = container.cursor(StreamMode::Snapshot).unwrap();
1561
1562        container.terminate();
1563
1564        assert_eq!(cursor_a.count().await, 0);
1565        assert_eq!(cursor_b.count().await, 0);
1566        assert_eq!(cursor_c.count().await, 0);
1567    }
1568
1569    #[fuchsia::test]
1570    async fn snapshot_then_subscribe_works_when_only_dropped_notifications_are_returned() {
1571        let buffer = SharedBuffer::new(
1572            create_ring_buffer(MAX_MESSAGE_SIZE),
1573            Box::new(|_| {}),
1574            SharedBufferOptions { sleep_time: Duration::ZERO },
1575        );
1576        let container_a =
1577            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1578        let container_b =
1579            Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
1580        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1581        container_a.push_back(msg.bytes());
1582        container_a.push_back(msg.bytes());
1583        container_a.push_back(msg.bytes());
1584        let mut cursor = pin!(container_a.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
1585
1586        // Roll out all the messages.
1587        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await > 0 {
1588            container_b.push_back(msg.bytes());
1589
1590            // Yield to the executor to allow messages to be rolled out.
1591            yield_to_executor().await;
1592        }
1593
1594        assert_matches!(cursor.next().await, Some(LazyItem::ItemsRolledOut(3, _)));
1595
1596        assert!(poll!(cursor.next()).is_pending());
1597
1598        container_a.terminate();
1599        assert_eq!(cursor.count().await, 0);
1600    }
1601
1602    #[fuchsia::test]
1603    async fn recycled_container_slot() {
1604        let buffer = Arc::new(SharedBuffer::new(
1605            create_ring_buffer(MAX_MESSAGE_SIZE),
1606            Box::new(|_| {}),
1607            SharedBufferOptions { sleep_time: Duration::ZERO },
1608        ));
1609        let container_a =
1610            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1611        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1612        container_a.push_back(msg.bytes());
1613
1614        let mut cursor = pin!(container_a.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
1615        assert_matches!(cursor.next().await, Some(LazyItem::Next(_)));
1616
1617        // Roll out all the messages.
1618        let container_b =
1619            Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
1620        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await > 0 {
1621            container_b.push_back(msg.bytes());
1622
1623            // Yield to the executor to allow messages to be rolled out.
1624            yield_to_executor().await;
1625        }
1626
1627        container_a.terminate();
1628
1629        // This should create a new container that uses a new slot and shouldn't interfere with
1630        // container_a.
1631        let container_c =
1632            Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), Arc::default()));
1633        container_c.push_back(msg.bytes());
1634        container_c.push_back(msg.bytes());
1635
1636        // The original cursor should have finished.
1637        assert_matches!(cursor.next().await, None);
1638    }
1639
1640    #[fuchsia::test]
1641    async fn socket_increments_logstats() {
1642        let inspector = Inspector::new(InspectorConfig::default());
1643        let stats: Arc<LogStreamStats> =
1644            Arc::new(LogStreamStats::default().with_inspect(inspector.root(), "test").unwrap());
1645        let buffer = Arc::new(SharedBuffer::new(
1646            create_ring_buffer(65536),
1647            Box::new(|_| {}),
1648            Default::default(),
1649        ));
1650        let container_a = Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats));
1651        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1652
1653        let (local, remote) = zx::Socket::create_datagram();
1654        container_a.add_socket(remote);
1655
1656        let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();
1657
1658        // Use FuturesUnordered so that we can make sure that the cursor is woken when a message is
1659        // received (FuturesUnordered uses separate wakers for all the futures it manages).
1660        let mut futures = FuturesUnordered::new();
1661        futures.push(async move {
1662            let mut cursor_a = pin!(cursor_a);
1663            cursor_a.next().await
1664        });
1665        let mut next = futures.next();
1666        assert!(futures::poll!(&mut next).is_pending());
1667
1668        local.write(msg.bytes()).unwrap();
1669
1670        let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1671
1672        assert_eq!(
1673            cursor_b
1674                .map(|item| {
1675                    match item {
1676                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
1677                        _ => panic!("Unexpected item {item:?}"),
1678                    }
1679                })
1680                .count()
1681                .await,
1682            1
1683        );
1684
1685        // If cursor_a wasn't woken, this will hang.
1686        next.await;
1687        // Validate logstats (must happen after the socket was handled)
1688        assert_data_tree!(
1689            inspector,
1690            root: contains {
1691                test: {
1692                    url: "",
1693                    last_timestamp: AnyProperty,
1694                    sockets_closed: 0u64,
1695                    sockets_opened: 1u64,
1696                    invalid: {
1697                        number: 0u64,
1698                        bytes: 0u64,
1699                    },
1700                    total: {
1701                        number: 1u64,
1702                        bytes: 88u64,
1703                    },
1704                    rolled_out: {
1705                        number: 0u64,
1706                        bytes: 0u64,
1707                    },
1708                    trace: {
1709                        number: 0u64,
1710                        bytes: 0u64,
1711                    },
1712                    debug: {
1713                        number: 1u64,
1714                        bytes: 88u64,
1715                    },
1716                    info: {
1717                        number: 0u64,
1718                        bytes: 0u64,
1719                    },
1720                    warn: {
1721                        number: 0u64,
1722                        bytes: 0u64,
1723                    },
1724                    error: {
1725                        number: 0u64,
1726                        bytes: 0u64,
1727                    },
1728                    fatal: {
1729                        number: 0u64,
1730                        bytes: 0u64,
1731                    },
1732                }
1733            }
1734        );
1735    }
1736
1737    #[fuchsia::test]
1738    async fn socket() {
1739        let buffer = Arc::new(SharedBuffer::new(
1740            create_ring_buffer(MAX_MESSAGE_SIZE),
1741            Box::new(|_| {}),
1742            Default::default(),
1743        ));
1744        let container_a =
1745            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1746        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1747
1748        let (local, remote) = zx::Socket::create_datagram();
1749        container_a.add_socket(remote);
1750
1751        let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();
1752
1753        // Use FuturesUnordered so that we can make sure that the cursor is woken when a message is
1754        // received (FuturesUnordered uses separate wakers for all the futures it manages).
1755        let mut futures = FuturesUnordered::new();
1756        futures.push(async move {
1757            let mut cursor_a = pin!(cursor_a);
1758            cursor_a.next().await
1759        });
1760        let mut next = futures.next();
1761        assert!(futures::poll!(&mut next).is_pending());
1762
1763        local.write(msg.bytes()).unwrap();
1764
1765        let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1766
1767        assert_eq!(
1768            cursor_b
1769                .map(|item| {
1770                    match item {
1771                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
1772                        _ => panic!("Unexpected item {item:?}"),
1773                    }
1774                })
1775                .count()
1776                .await,
1777            1
1778        );
1779
1780        // If cursor_a wasn't woken, this will hang.
1781        next.await;
1782    }
1783
1784    #[fuchsia::test]
1785    async fn socket_on_inactive() {
1786        let on_inactive = Arc::new(AtomicU64::new(0));
1787        let a_identity = Arc::new(vec!["a"].into());
1788        let buffer = Arc::new(SharedBuffer::new(
1789            create_ring_buffer(MAX_MESSAGE_SIZE),
1790            {
1791                let on_inactive = Arc::clone(&on_inactive);
1792                let a_identity = Arc::clone(&a_identity);
1793                Box::new(move |id| {
1794                    assert_eq!(id, a_identity);
1795                    on_inactive.fetch_add(1, Ordering::Relaxed);
1796                })
1797            },
1798            SharedBufferOptions { sleep_time: Duration::ZERO },
1799        ));
1800        let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
1801        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1802
1803        let (local, remote) = zx::Socket::create_datagram();
1804        container_a.add_socket(remote);
1805
1806        local.write(msg.bytes()).unwrap();
1807
1808        let cursor = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1809
1810        assert_eq!(
1811            cursor
1812                .map(|item| {
1813                    match item {
1814                        LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
1815                        _ => panic!("Unexpected item {item:?}"),
1816                    }
1817                })
1818                .count()
1819                .await,
1820            1
1821        );
1822
1823        // Now roll out a's messages.
1824        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
1825        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
1826            container_b.push_back(msg.bytes());
1827
1828            // Yield to the executor to allow messages to be rolled out.
1829            yield_to_executor().await;
1830        }
1831
1832        assert_eq!(on_inactive.load(Ordering::Relaxed), 0);
1833
1834        // Close the socket.
1835        std::mem::drop(local);
1836
1837        // We don't know when the socket thread will run so we have to loop.
1838        while on_inactive.load(Ordering::Relaxed) != 1 {
1839            fasync::Timer::new(Duration::from_millis(50)).await;
1840        }
1841    }
1842
1843    #[fuchsia::test]
1844    async fn flush() {
1845        let a_identity = Arc::new(vec!["a"].into());
1846        let buffer = Arc::new(SharedBuffer::new(
1847            create_ring_buffer(1024 * 1024),
1848            Box::new(|_| {}),
1849            Default::default(),
1850        ));
1851        let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
1852        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1853
1854        let (local, remote) = zx::Socket::create_datagram();
1855        container_a.add_socket(remote);
1856
1857        let cursor = pin!(container_a.cursor(StreamMode::Subscribe).unwrap());
1858
1859        const COUNT: usize = 1000;
1860        for _ in 0..COUNT {
1861            local.write(msg.bytes()).unwrap();
1862        }
1863
1864        // Race two flush futures.
1865        let mut flush_futures = FuturesUnordered::from_iter([buffer.flush(), buffer.flush()]);
1866        flush_futures.next().await;
1867
1868        let messages: Option<Vec<_>> = cursor.take(COUNT).collect().now_or_never();
1869        assert!(messages.is_some());
1870
1871        // Make sure the other one finishes too.
1872        flush_futures.next().await;
1873
1874        // Make sure we can still terminate the buffer.
1875        buffer.terminate().await;
1876    }
1877}