Skip to main content

archivist_lib/logs/
shared_buffer.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5mod cursor;
6
7pub use cursor::{FilterCursor, FilterCursorStream};
8
9use crate::identity::ComponentIdentity;
10use crate::logs::repository::ARCHIVIST_MONIKER;
11use crate::logs::stats::{LogStreamStats, SaturationCurve};
12use derivative::Derivative;
13use diagnostics_log_encoding::encode::add_dropped_count;
14use diagnostics_log_encoding::{Header, TRACING_FORMAT_LOG_RECORD_TYPE};
15use fidl_fuchsia_diagnostics::{ComponentSelector, StreamMode};
16use fidl_fuchsia_logger::MAX_DATAGRAM_LEN_BYTES;
17use fuchsia_async as fasync;
18use fuchsia_async::condition::{Condition, ConditionGuard};
19use fuchsia_inspect::Node;
20use fuchsia_sync::Mutex;
21use futures::channel::oneshot;
22use log::debug;
23use ring_buffer::{self, RingBuffer, ring_buffer_record_len};
24use std::collections::VecDeque;
25use std::mem::ManuallyDrop;
26use std::ops::{Deref, DerefMut, Range};
27use std::sync::{Arc, Weak};
28use std::time::Duration;
29use zerocopy::FromBytes;
30
// Aim to keep 25% of the buffer free. This is expressed as a fraction: numerator / denominator.
// The same fraction is used when sizing the ring buffer and when deciding how much to pop.
const SPACE_THRESHOLD_NUMERATOR: usize = 1;
const SPACE_THRESHOLD_DENOMINATOR: usize = 4;

// The default amount of time that Archivist will sleep for to reduce how often it wakes up to
// handle log messages.
const DEFAULT_SLEEP_TIME: Duration = Duration::from_millis(200);
38
39pub fn create_ring_buffer(capacity: usize) -> ring_buffer::Reader {
40    RingBuffer::create(calculate_real_size_given_desired_capacity(capacity))
41}
42
43fn calculate_real_size_given_desired_capacity(capacity: usize) -> usize {
44    // We always keep spare space in the buffer so that we don't drop messages.  This is controlled
45    // by SPACE_THRESHOLD_NUMERATOR & SPACE_THRESHOLD_DENOMINATOR.  Round up capacity so that
46    // `capacity` reflects the actual amount of log data we can store.
47    let page_size = zx::system_get_page_size() as usize;
48    (capacity * SPACE_THRESHOLD_DENOMINATOR
49        / (SPACE_THRESHOLD_DENOMINATOR - SPACE_THRESHOLD_NUMERATOR))
50        .next_multiple_of(page_size)
51}
52
// Bit set in the port packet key of IOBuffer peer-closed waits; the low bits hold the index into
// `Inner::iob_peers`. Socket waits use the socket ID as the key (without this bit). The socket
// thread's interrupt key is `u64::MAX`, which also has this bit set, so it is checked first.
const IOB_PEER_CLOSED_KEY_BASE: u64 = 0x8000_0000_0000_0000;

/// Callback invoked with the identity of a container that has become inactive.
pub type OnInactive = Box<dyn Fn(Arc<ComponentIdentity>) + Send + Sync>;
56
/// A log buffer shared by all containers. Messages from every container are stored in a single
/// ring buffer; sockets are serviced by a dedicated thread and free space is maintained by a
/// monitor task.
pub struct SharedBuffer {
    // State protected by the lock; the condition also holds the wakers that are woken when an
    // `InnerGuard` with `wake` set is dropped.
    inner: Condition<Inner>,

    // Sockets. This *must* be locked after `inner` and the lock *must* be dropped before
    // `InnerGuard` is dropped.
    sockets: Mutex<Slab<Socket>>,

    // Callback for when a container is inactive (i.e. no logs and no sockets).
    on_inactive: OnInactive,

    // The port that we use to service the sockets.
    port: zx::Port,

    // Event used for termination. `USER_0` is also signalled on this event whenever a message is
    // queued for the socket thread (see `flush` and `terminate`).
    event: zx::Event,

    // The thread that we use to service the sockets. This is `None` once `terminate` has taken
    // the join handle.
    socket_thread: Mutex<Option<std::thread::JoinHandle<()>>>,

    // The task responsible for monitoring the space in the IOBuffer and popping messages when it
    // gets full. It will also wake cursors whenever new data arrives.
    _buffer_monitor_task: fasync::Task<()>,

    /// Metric for tracking buffer saturation.
    pub(crate) saturation_curve: Arc<SaturationCurve>,
}
83
// A wrapper around the lock guard for `Inner` that defers work until the lock is released: when
// dropped it optionally wakes the registered wakers, releases the lock, and then reports the
// collected inactive components via `SharedBuffer::on_inactive`.
struct InnerGuard<'a> {
    // The buffer that this guard belongs to.
    buffer: &'a Arc<SharedBuffer>,

    // The underlying lock guard. It is wrapped in `ManuallyDrop` so that `Drop for InnerGuard`
    // controls exactly when the lock is released.
    guard: ManuallyDrop<ConditionGuard<'a, Inner>>,

    // The list of components that should be reported as inactive once the lock is dropped.
    on_inactive: Vec<Arc<ComponentIdentity>>,

    // If true, wake all the wakers registered with the ConditionGuard.
    wake: bool,
}
95
96impl Drop for InnerGuard<'_> {
97    fn drop(&mut self) {
98        if self.wake {
99            for waker in self.guard.drain_wakers() {
100                waker.wake();
101            }
102        }
103        // SAFETY: This is the only place we drop the guard.
104        unsafe {
105            ManuallyDrop::drop(&mut self.guard);
106        }
107        for identity in self.on_inactive.drain(..) {
108            (*self.buffer.on_inactive)(identity);
109        }
110    }
111}
112
113impl Deref for InnerGuard<'_> {
114    type Target = Inner;
115
116    fn deref(&self) -> &Self::Target {
117        &self.guard
118    }
119}
120
121impl DerefMut for InnerGuard<'_> {
122    fn deref_mut(&mut self) -> &mut Self::Target {
123        &mut self.guard
124    }
125}
126
// The state of a `SharedBuffer` that is protected by the `inner` lock.
struct Inner {
    // The ring buffer.
    ring_buffer: Arc<RingBuffer>,

    // Registered containers.
    containers: Containers,

    // Socket thread message queue.
    thread_msg_queue: VecDeque<ThreadMessage>,

    // The index in the buffer that we have scanned for messages. This is always >= `tail`.
    last_scanned: u64,

    // The ID of the message that was last scanned.
    last_scanned_message_id: u64,

    // A copy of the tail index in the ring buffer.
    tail: u64,

    // The ID of the message at the tail.  This is just a counter that increments one per message.
    tail_message_id: u64,

    // The IOBuffer peers that we must watch. Each entry records the container that the IOBuffer
    // was handed out for.
    iob_peers: Slab<(ContainerId, zx::Iob)>,

    // True, when the buffer has been terminated.
    terminated: bool,
}
155
// Messages that can be queued for the socket thread via `Inner::thread_msg_queue`. The thread
// handles a message only after it has processed the sockets that were ready.
enum ThreadMessage {
    // The thread should terminate.
    Terminate,

    // Process all pending socket messages and report via the Sender when done.
    Flush(oneshot::Sender<()>),
}
163
/// Options used when creating a `SharedBuffer`.
pub struct SharedBufferOptions {
    /// To reduce how often Archivist wakes when new messages are written, Archivist will sleep for
    /// this time. This will impact how quickly messages show up via the cursors.
    pub sleep_time: Duration,
    /// The size of the circular buffer for the saturation curve.
    pub saturation_curve_size: usize,
}
171
172impl Default for SharedBufferOptions {
173    fn default() -> Self {
174        Self { sleep_time: DEFAULT_SLEEP_TIME, saturation_curve_size: 254 }
175    }
176}
177
178impl SharedBuffer {
179    /// Returns a new shared buffer and the container for Archivist.
180    pub fn new(
181        ring_buffer: ring_buffer::Reader,
182        on_inactive: OnInactive,
183        options: SharedBufferOptions,
184        inspect_node: &Node,
185    ) -> Arc<Self> {
186        let saturation_curve =
187            Arc::new(SaturationCurve::new(inspect_node, options.saturation_curve_size));
188        let this = Arc::new_cyclic(|weak: &Weak<Self>| Self {
189            inner: Condition::new(Inner {
190                ring_buffer: Arc::clone(&ring_buffer),
191                containers: Containers::default(),
192                thread_msg_queue: VecDeque::default(),
193                last_scanned: 0,
194                last_scanned_message_id: 0,
195                tail: 0,
196                tail_message_id: 0,
197                iob_peers: Slab::default(),
198                terminated: false,
199            }),
200            sockets: Mutex::new(Slab::default()),
201            on_inactive,
202            port: zx::Port::create(),
203            event: zx::Event::create(),
204            socket_thread: Mutex::default(),
205            saturation_curve,
206            _buffer_monitor_task: fasync::Task::spawn(Self::buffer_monitor_task(
207                Weak::clone(weak),
208                ring_buffer,
209                options.sleep_time,
210            )),
211        });
212
213        *this.socket_thread.lock() = Some({
214            let this = Arc::clone(&this);
215            std::thread::spawn(move || this.socket_thread(options.sleep_time))
216        });
217        this
218    }
219
220    pub fn new_container_buffer(
221        self: &Arc<Self>,
222        identity: Arc<ComponentIdentity>,
223        stats: Arc<LogStreamStats>,
224    ) -> ContainerBuffer {
225        let mut inner = self.inner.lock();
226        let Inner { containers, ring_buffer, .. } = &mut *inner;
227        ContainerBuffer {
228            shared_buffer: Arc::clone(self),
229            container_id: containers.new_container(ring_buffer, Arc::clone(&identity), stats),
230        }
231    }
232
233    pub async fn flush(&self) {
234        let (sender, receiver) = oneshot::channel();
235        self.inner.lock().thread_msg_queue.push_back(ThreadMessage::Flush(sender));
236        self.event.signal(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
237        // Ignore failure if Archivist is shutting down.
238        let _ = receiver.await;
239    }
240
241    /// Returns the number of registered containers in use by the buffer.
242    #[cfg(test)]
243    pub fn container_count(&self) -> usize {
244        self.inner.lock().containers.len()
245    }
246
247    /// Terminates the socket thread. The socket thread will drain the sockets before terminating.
248    /// Returns a future that may be awaited if the caller wants to wait for the socket thread to
249    /// terminate.
250    pub fn terminate(self: &Arc<Self>) -> impl Future<Output = ()> {
251        {
252            let mut inner = InnerGuard::new(self);
253            self.flush_sockets(&mut inner);
254            inner.thread_msg_queue.push_back(ThreadMessage::Terminate);
255        }
256        self.event.signal(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
257        let join_handle = self.socket_thread.lock().take().unwrap();
258        fasync::unblock(|| {
259            let _ = join_handle.join();
260        })
261    }
262
263    /// Returns a cursor that will return messages that match selector. There will be a limited
264    /// attempt to reorder messages by timestamp.  If no selectors are specified, *all* messages
265    /// will be returned.
266    pub fn cursor(
267        self: &Arc<Self>,
268        mode: StreamMode,
269        selectors: Vec<ComponentSelector>,
270    ) -> FilterCursor {
271        InnerGuard::new(self).cursor(mode, selectors)
272    }
273
    /// The body of the thread that services the sockets.
    ///
    /// Each iteration gathers the packets that are queued on the port, handles IOBuffer peers
    /// that have closed, reads the sockets that are ready and finally handles at most one
    /// `ThreadMessage`. The function returns when a `ThreadMessage::Terminate` is handled.
    ///
    /// `sleep_time` is how long the thread waits at the start of an iteration (unless it is
    /// interrupted via `USER_0` on the event) so that it does not wake for every message.
    ///
    /// # Panics
    ///
    /// Panics if waiting on the port fails with anything other than a timeout, or if any of the
    /// waits cannot be registered.
    fn socket_thread(self: Arc<Self>, sleep_time: Duration) {
        // The port key used for the wait on `self.event`.
        const INTERRUPT_KEY: u64 = u64::MAX;
        let mut sockets_ready = Vec::new();
        let mut iob_peer_closed = Vec::new();
        let mut interrupt_needs_arming = true;
        // The message that will be handled at the end of the current iteration, if any.
        let mut msg = None;

        loop {
            let mut deadline = if msg.is_some() {
                // There is already a message to handle, so only poll the port.
                zx::MonotonicInstant::INFINITE_PAST
            } else {
                if interrupt_needs_arming {
                    self.event
                        .wait_async(
                            &self.port,
                            INTERRUPT_KEY,
                            zx::Signals::USER_0,
                            zx::WaitAsyncOpts::empty(),
                        )
                        .unwrap();
                    interrupt_needs_arming = false;
                }

                // Wait so that we're not constantly waking up for every log message that is queued.
                // Ignore errors here.
                let _ = self
                    .event
                    .wait_one(zx::Signals::USER_0, zx::MonotonicInstant::after(sleep_time.into()));
                // Block on the port until at least one packet arrives.
                zx::MonotonicInstant::INFINITE
            };

            // Gather the list of sockets that are ready to read.
            loop {
                match self.port.wait(deadline) {
                    Ok(packet) => {
                        if packet.key() == INTERRUPT_KEY {
                            interrupt_needs_arming = true;
                            // To maintain proper ordering, we must capture the message here whilst
                            // we are still gathering the list of sockets that are ready to read.
                            // If we wait till later, we introduce windows where we might miss a
                            // socket that should be read.
                            if msg.is_none() {
                                msg = self.inner.lock().thread_msg_queue.pop_front();
                            }
                        } else if packet.key() & IOB_PEER_CLOSED_KEY_BASE != 0 {
                            // The cast discards `IOB_PEER_CLOSED_KEY_BASE`, leaving the index into
                            // `iob_peers`.
                            iob_peer_closed.push(packet.key() as u32);
                        } else {
                            sockets_ready.push(SocketId(packet.key() as u32))
                        }
                    }
                    // There are no more packets queued.
                    Err(zx::Status::TIMED_OUT) => break,
                    Err(status) => panic!("port wait error: {status:?}"),
                }
                // After the first packet, only drain what is already queued.
                deadline = zx::MonotonicInstant::INFINITE_PAST;
            }

            let mut inner = InnerGuard::new(&self);

            if !iob_peer_closed.is_empty() {
                // See the comment on `is_active()` for why this is required.
                inner.update_message_ids(inner.ring_buffer.head());

                for iob_peer_closed in iob_peer_closed.drain(..) {
                    let container_id = inner.iob_peers.free(iob_peer_closed).0;
                    if let Some(container) = inner.containers.get_mut(container_id) {
                        container.iob_count -= 1;
                        if container.iob_count == 0 && !container.is_active() {
                            if container.should_free() {
                                inner.containers.free(container_id);
                            } else {
                                // Reported via `on_inactive` when `inner` is dropped.
                                let identity = Arc::clone(&container.identity);
                                inner.on_inactive.push(identity);
                            }
                        }
                    }
                }
            }

            {
                // `sockets` is locked after `inner` and unlocked before `inner` is dropped.
                let mut sockets = self.sockets.lock();
                for socket_id in sockets_ready.drain(..) {
                    inner.read_socket(&mut sockets, socket_id, |socket| {
                        // The socket has been drained, so wait for it to become readable again.
                        socket
                            .socket
                            .wait_async(
                                &self.port,
                                socket_id.0 as u64,
                                zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
                                zx::WaitAsyncOpts::empty(),
                            )
                            .unwrap();
                    });
                }
            }

            // Now that we've processed the sockets, we can process the message.
            if let Some(m) = msg.take() {
                match m {
                    ThreadMessage::Terminate => {
                        inner.terminated = true;
                        // Wake the registered wakers when `inner` is dropped on return.
                        inner.wake = true;
                        return;
                    }
                    ThreadMessage::Flush(sender) => {
                        let _ = sender.send(());
                    }
                }

                // See if there's another message.
                msg = inner.thread_msg_queue.pop_front();
                if msg.is_none() {
                    // If there are no more messages, we must clear the signal so that we get
                    // notified when the next message arrives. This must be done whilst we are
                    // holding the lock.
                    self.event.signal(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
                }
            }
        }
    }
393
394    async fn buffer_monitor_task(
395        this: Weak<Self>,
396        mut ring_buffer: ring_buffer::Reader,
397        sleep_time: Duration,
398    ) {
399        let mut last_head = 0;
400        loop {
401            // Sleep to limit how often we wake up.
402            fasync::Timer::new(sleep_time).await;
403            let head = ring_buffer.wait(last_head).await;
404            let Some(this) = this.upgrade() else { return };
405            let mut inner = InnerGuard::new(&this);
406            inner.check_space(head);
407            last_head = head;
408        }
409    }
410
411    /// Flushes sockets and returns the head index.
412    fn flush_sockets(&self, inner: &mut InnerGuard<'_>) -> u64 {
413        let mut sockets = self.sockets.lock();
414        let mut socket_id = 0;
415        while let Some(next_id) = sockets.next_used(socket_id) {
416            // The socket doesn't need to be rearmed here.
417            inner.read_socket(&mut sockets, SocketId(next_id), |_| {});
418            socket_id = next_id + 1;
419        }
420        let head = inner.ring_buffer.head();
421        inner.update_message_ids(head);
422        head
423    }
424}
425
426impl Inner {
427    // Returns list of containers that no longer have any messages.
428    fn ingest(&mut self, msg: &[u8], container_id: ContainerId) {
429        if msg.len() < std::mem::size_of::<Header>() {
430            debug!("message too short ({})", msg.len());
431            if let Some(container) = self.containers.get(container_id) {
432                container.stats.increment_invalid(msg.len());
433            }
434            return;
435        }
436
437        let header = Header::read_from_bytes(&msg[..std::mem::size_of::<Header>()]).unwrap();
438
439        // NOTE: Some tests send messages that are bigger than the header indicates. We ignore the
440        // tail of any such messages.
441        let msg_len = header.size_words() as usize * 8;
442
443        // Check the correct type and size.
444        if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg.len() < msg_len {
445            debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, msg.len());
446            if let Some(container) = self.containers.get(container_id) {
447                container.stats.increment_invalid(msg.len());
448            }
449            return;
450        }
451
452        let Some(container) = self.containers.get_mut(container_id) else {
453            return;
454        };
455
456        let mut data;
457        let msg = if container.dropped_count > 0 {
458            data = msg.to_vec();
459            if !add_dropped_count(&mut data, container.dropped_count) {
460                debug!("unable to add dropped count to invalid message");
461                container.stats.increment_invalid(data.len());
462                return;
463            }
464            &data
465        } else {
466            msg
467        };
468
469        if container.iob.write(Default::default(), 0, msg).is_err() {
470            // We were unable to write the message to the buffer, most likely due to lack of
471            // space. We drop this message and then we'll add to the dropped count for the next
472            // message.
473            container.dropped_count += 1
474        } else {
475            container.dropped_count = 0;
476        }
477    }
478
479    /// Parses the first message within `range` returns (container_id, message, timestamp).
480    ///
481    /// # Panics
482    ///
483    /// This will panic if the ring buffer has been corrupted. Only the kernel and Archivist can
484    /// write to the ring buffer and so we trust both not to corrupt the ring buffer.
485    ///
486    /// # Safety
487    ///
488    /// `range` *must* be within the written range of the ring buffer so that there is no concurrent
489    /// write access to that range. The returned slice is only valid whilst this remains true.
490    unsafe fn parse_message(
491        &self,
492        range: Range<u64>,
493    ) -> (ContainerId, &[u8], Option<zx::BootInstant>) {
494        let (tag, msg) = unsafe { self.ring_buffer.first_message_in(range) }
495            .expect("Unable to read message from ring buffer");
496        (
497            ContainerId(tag as u32),
498            msg,
499            (msg.len() >= 16)
500                .then(|| zx::BootInstant::from_nanos(i64::read_from_bytes(&msg[8..16]).unwrap())),
501        )
502    }
503}
504
505impl<'a> InnerGuard<'a> {
506    fn new(buffer: &'a Arc<SharedBuffer>) -> Self {
507        Self {
508            buffer,
509            guard: ManuallyDrop::new(buffer.inner.lock()),
510            on_inactive: Vec::new(),
511            wake: false,
512        }
513    }
514
    /// Pops a message and returns its total size. The caller should call `update_message_ids(head)`
    /// prior to calling this.
    ///
    /// Returns `None` if the buffer is empty (i.e. `head` is the same as the tail).
    ///
    /// NOTE: This will pop the oldest message in terms of when it was inserted which is *not*
    /// necessarily the message with the *oldest* timestamp because we might not have received the
    /// messages in perfect timestamp order. This should be close enough for all use cases we care
    /// about, and besides, the timestamps can't be trusted anyway.
    ///
    /// # Panics
    ///
    /// Panics if the container for the message no longer exists, or if the tail would move past
    /// `last_scanned`.
    fn pop(&mut self, head: u64) -> Option<usize> {
        if head == self.tail {
            return None;
        }

        // SAFETY: There can be no concurrent writes between `tail..head` and the *only* place
        // we increment the tail index is just before we leave this function.
        let record_len = {
            let (container_id, message, _) = unsafe { self.parse_message(self.tail..head) };
            let record_len = ring_buffer_record_len(message.len());

            let container = self.containers.get_mut(container_id).unwrap();

            container.stats.increment_rolled_out(record_len);
            // The container's oldest message is the one being removed.
            container.msg_ids.start += 1;
            if !container.is_active() {
                if container.should_free() {
                    self.containers.free(container_id);
                } else {
                    // Reported via `on_inactive` when this guard is dropped.
                    let identity = Arc::clone(&container.identity);
                    self.on_inactive.push(identity);
                }
            }

            record_len
        };

        // NOTE: This should go last. After incrementing `tail`, the `message` can be overwritten.
        self.ring_buffer.increment_tail(record_len);
        self.tail += record_len as u64;
        self.tail_message_id += 1;

        // The caller should have called `update_message_ids(head)` prior to calling this.
        assert!(self.last_scanned >= self.tail);

        Some(record_len)
    }
559
560    /// Reads a socket. Calls `rearm` to rearm the socket once it has been drained.
561    fn read_socket(
562        &mut self,
563        sockets: &mut Slab<Socket>,
564        socket_id: SocketId,
565        rearm: impl FnOnce(&mut Socket),
566    ) {
567        let Some(socket) = sockets.get_mut(socket_id.0) else { return };
568        let container_id = socket.container_id;
569
570        loop {
571            self.check_space(self.ring_buffer.head());
572
573            let mut data = Vec::with_capacity(MAX_DATAGRAM_LEN_BYTES as usize);
574
575            // Read directly into the buffer leaving space for the header.
576            let len = match socket.socket.read_uninit(data.spare_capacity_mut()) {
577                Ok(d) => d.len(),
578                Err(zx::Status::SHOULD_WAIT) => {
579                    // The socket has been drained.
580                    rearm(socket);
581                    return;
582                }
583                Err(_) => break,
584            };
585
586            // SAFETY: `read_uninit` will have written to `len` bytes.
587            unsafe {
588                data.set_len(len);
589            }
590
591            let container = self.containers.get_mut(container_id).unwrap();
592            if data.len() < 16 {
593                container.stats.increment_invalid(data.len());
594                continue;
595            }
596
597            let header = Header::read_from_bytes(&data[..std::mem::size_of::<Header>()]).unwrap();
598            let msg_len = header.size_words() as usize * 8;
599            if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg_len != data.len() {
600                debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, data.len());
601                container.stats.increment_invalid(data.len());
602                continue;
603            }
604
605            if container.dropped_count > 0 && !add_dropped_count(&mut data, container.dropped_count)
606            {
607                debug!("unable to add dropped count to invalid message");
608                container.stats.increment_invalid(data.len());
609                continue;
610            }
611
612            if container.iob.write(Default::default(), 0, &data).is_err() {
613                // We were unable to write the message to the buffer, most likely due to lack of
614                // space. We drop this message and then we'll add to the dropped count for the next
615                // message.
616                container.dropped_count += 1
617            } else {
618                container.dropped_count = 0;
619            }
620        }
621
622        // This path is taken when the socket should be closed.
623
624        // See the comment on `is_active()` for why this is required.
625        self.update_message_ids(self.ring_buffer.head());
626
627        let container = self.containers.get_mut(container_id).unwrap();
628        container.remove_socket(socket_id, sockets);
629        if !container.is_active() {
630            if container.should_free() {
631                self.containers.free(container_id);
632            } else {
633                let identity = Arc::clone(&container.identity);
634                self.on_inactive.push(identity);
635            }
636        }
637    }
638
639    /// Scans the ring buffer and updates `msg_ids` for the containers.
640    fn update_message_ids(&mut self, head: u64) {
641        while self.last_scanned < head {
642            // SAFETY: This is safe because `head` must be within the ring buffer range and we make
643            // sure that `self.last_scanned` is always >= `tail` in `pop()`.
644            let (container_id, message, _) = unsafe { self.parse_message(self.last_scanned..head) };
645            let msg_len = message.len();
646            let severity = (msg_len >= 8)
647                .then(|| Header::read_from_bytes(&message[0..8]).unwrap().severity().into());
648            let container = self.containers.get_mut(container_id).unwrap();
649            container.msg_ids.end += 1;
650            if let Some(severity) = severity {
651                container.stats.ingest_message(msg_len, severity);
652            }
653            self.last_scanned += ring_buffer_record_len(msg_len) as u64;
654            self.last_scanned_message_id += 1;
655            self.wake = true;
656        }
657        if self.wake {
658            let boot_time = zx::BootInstant::get().into_nanos();
659            let count = self.last_scanned_message_id - self.tail_message_id;
660            self.buffer.saturation_curve.record(boot_time, count);
661        }
662    }
663
664    /// Ensures the buffer keeps the required amount of space.
665    fn check_space(&mut self, head: u64) {
666        self.update_message_ids(head);
667        let capacity = self.ring_buffer.capacity();
668        let mut space = capacity
669            .checked_sub((head - self.tail) as usize)
670            .unwrap_or_else(|| panic!("bad range: {:?}", self.tail..head));
671        let required_space = capacity * SPACE_THRESHOLD_NUMERATOR / SPACE_THRESHOLD_DENOMINATOR;
672        while space < required_space {
673            let Some(amount) = self.pop(head) else { break };
674            space += amount;
675        }
676    }
677
678    /// Returns a cursor.
679    fn cursor(&mut self, mode: StreamMode, selectors: Vec<ComponentSelector>) -> FilterCursor {
680        // NOTE: It is not safe to use on_inactive in this function because this function can be
681        // called whilst locks are held which are the same locks that the on_inactive notification
682        // uses.
683        let (index, message_id) = match mode {
684            StreamMode::Snapshot => (self.tail, self.tail_message_id),
685            StreamMode::Subscribe => {
686                let head = self.ring_buffer.head();
687                self.update_message_ids(head);
688                (self.last_scanned, self.last_scanned_message_id)
689            }
690            StreamMode::SnapshotThenSubscribe => (self.tail, self.tail_message_id),
691        };
692        FilterCursor::new(
693            Arc::clone(self.buffer),
694            index,
695            message_id,
696            matches!(mode, StreamMode::Snapshot),
697            selectors,
698        )
699    }
700}
701
// The set of registered containers, keyed by `ContainerId`.
#[derive(Default)]
struct Containers {
    // Storage for the containers; a container's ID is its index in the slab.
    slab: Slab<ContainerInfo>,
}
706
// Identifies a container. This is the container's index in `Containers::slab` and it is also the
// tag used for the container's messages in the ring buffer.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct ContainerId(u32);
709
710impl Containers {
711    #[cfg(test)]
712    fn len(&self) -> usize {
713        self.slab.len()
714    }
715
716    fn get(&self, id: ContainerId) -> Option<&ContainerInfo> {
717        self.slab.get(id.0)
718    }
719
720    fn get_mut(&mut self, id: ContainerId) -> Option<&mut ContainerInfo> {
721        self.slab.get_mut(id.0)
722    }
723
724    fn new_container(
725        &mut self,
726        buffer: &RingBuffer,
727        identity: Arc<ComponentIdentity>,
728        stats: Arc<LogStreamStats>,
729    ) -> ContainerId {
730        ContainerId(self.slab.insert(|id| {
731            let (iob, _) = buffer.new_iob_writer(id as u64).unwrap();
732            ContainerInfo::new(identity, stats, iob)
733        }))
734    }
735
736    fn free(&mut self, id: ContainerId) {
737        self.slab.free(id.0);
738    }
739}
740
// The state that the shared buffer keeps for each container.
#[derive(Derivative)]
#[derivative(Debug)]
struct ContainerInfo {
    // The identity of the container.
    identity: Arc<ComponentIdentity>,

    // The first and last message IDs stored in the shared buffer. The last message ID can be out of
    // date if there are concurrent writers. The container has buffered messages when the start
    // and end differ.
    msg_ids: Range<u64>,

    // Whether the container is terminated. A terminated container is freed once it is no longer
    // active.
    terminated: bool,

    // Inspect instrumentation.
    #[derivative(Debug = "ignore")]
    stats: Arc<LogStreamStats>,

    // The first socket ID for this container. This is `SocketId::NULL` when the container has no
    // sockets.
    first_socket_id: SocketId,

    // The IOBuffer used for writing.
    iob: zx::Iob,

    // The number of client IOBuffers.
    iob_count: usize,

    // The number of messages dropped when forwarding from a socket to an IOBuffer. This is reset
    // once a message has been written successfully.
    dropped_count: u64,
}
770
771impl ContainerInfo {
772    fn new(identity: Arc<ComponentIdentity>, stats: Arc<LogStreamStats>, iob: zx::Iob) -> Self {
773        Self {
774            identity,
775            msg_ids: 0..0,
776            terminated: false,
777            stats,
778            first_socket_id: SocketId::NULL,
779            iob,
780            iob_count: 0,
781            dropped_count: 0,
782        }
783    }
784
785    fn should_free(&self) -> bool {
786        self.terminated && !self.is_active()
787    }
788
789    // Returns true if the container is considered active which is the case if it has sockets, io
790    // buffers, or buffered messages.
791    //
792    // NOTE: Whenever a socket or iob is closed, `update_message_ids` must called to ensure
793    // `msg_ids.end` is correctly set.
794    fn is_active(&self) -> bool {
795        self.first_socket_id != SocketId::NULL
796            || self.iob_count > 0
797            || self.msg_ids.end != self.msg_ids.start
798            || ARCHIVIST_MONIKER.get().is_some_and(|m| *self.identity == *m)
799    }
800
801    // # Panics
802    //
803    // This will panic if the socket isn't found.
804    fn remove_socket(&mut self, socket_id: SocketId, sockets: &mut Slab<Socket>) {
805        let Socket { prev, next, .. } = *sockets.get(socket_id.0).unwrap();
806        if prev == SocketId::NULL {
807            self.first_socket_id = next;
808        } else {
809            sockets.get_mut(prev.0).unwrap().next = next;
810        }
811        if next != SocketId::NULL {
812            sockets
813                .get_mut(next.0)
814                .unwrap_or_else(|| panic!("next {next:?} has been freed!"))
815                .prev = prev;
816        }
817        sockets.free(socket_id.0);
818        self.stats.close_socket();
819        debug!(identity:% = self.identity; "Socket closed.");
820    }
821}
822
823pub struct ContainerBuffer {
824    shared_buffer: Arc<SharedBuffer>,
825    container_id: ContainerId,
826}
827
828impl ContainerBuffer {
829    /// Returns the tag used by IOBuffers used for this component.
830    pub fn iob_tag(&self) -> u64 {
831        self.container_id.0 as u64
832    }
833
834    /// Ingests a new message.
835    ///
836    /// If the message is invalid, it is dropped.
837    pub fn push_back(&self, msg: &[u8]) {
838        self.shared_buffer.inner.lock().ingest(msg, self.container_id);
839    }
840
841    /// Returns the identity of the container if it still exists.
842    pub fn identity(&self) -> Option<Arc<ComponentIdentity>> {
843        self.shared_buffer
844            .inner
845            .lock()
846            .containers
847            .get(self.container_id)
848            .map(|c| Arc::clone(&c.identity))
849    }
850
851    /// Returns an IOBuffer for the container.
852    pub fn iob(&self) -> zx::Iob {
853        let mut inner = self.shared_buffer.inner.lock();
854
855        inner.containers.get_mut(self.container_id).unwrap().iob_count += 1;
856
857        let (ep0, ep1) = inner.ring_buffer.new_iob_writer(self.container_id.0 as u64).unwrap();
858
859        inner.iob_peers.insert(|idx| {
860            ep1.wait_async(
861                &self.shared_buffer.port,
862                idx as u64 | IOB_PEER_CLOSED_KEY_BASE,
863                zx::Signals::IOB_PEER_CLOSED,
864                zx::WaitAsyncOpts::empty(),
865            )
866            .unwrap();
867
868            (self.container_id, ep1)
869        });
870
871        ep0
872    }
873
874    /// Marks the buffer as terminated which will force all cursors to end and close all sockets.
875    /// The component's data will remain in the buffer until the messages are rolled out. This will
876    /// *not* drain sockets or close IOBuffers.
877    pub fn terminate(&self) {
878        let mut inner = InnerGuard::new(&self.shared_buffer);
879
880        // See the comment on `is_active()` for why this is required.
881        inner.update_message_ids(inner.ring_buffer.head());
882
883        if let Some(container) = inner.containers.get_mut(self.container_id) {
884            container.terminated = true;
885            if container.first_socket_id != SocketId::NULL {
886                let mut sockets = self.shared_buffer.sockets.lock();
887                loop {
888                    container.remove_socket(container.first_socket_id, &mut sockets);
889                    if container.first_socket_id == SocketId::NULL {
890                        break;
891                    }
892                }
893            }
894            if container.should_free() {
895                inner.containers.free(self.container_id);
896            }
897            inner.wake = true;
898        }
899    }
900
901    /// Returns true if the container has messages, sockets or IOBuffers.
902    pub fn is_active(&self) -> bool {
903        self.shared_buffer
904            .inner
905            .lock()
906            .containers
907            .get(self.container_id)
908            .is_some_and(|c| c.is_active())
909    }
910
911    /// Adds a socket for this container.
912    pub fn add_socket(&self, socket: zx::Socket) {
913        let mut inner = self.shared_buffer.inner.lock();
914        let Some(container) = inner.containers.get_mut(self.container_id) else { return };
915        container.stats.open_socket();
916        let next = container.first_socket_id;
917        let mut sockets = self.shared_buffer.sockets.lock();
918        let socket_id = SocketId(sockets.insert(|socket_id| {
919            socket
920                .wait_async(
921                    &self.shared_buffer.port,
922                    socket_id as u64,
923                    zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
924                    zx::WaitAsyncOpts::empty(),
925                )
926                .unwrap();
927            Socket { socket, container_id: self.container_id, prev: SocketId::NULL, next }
928        }));
929        if next != SocketId::NULL {
930            sockets.get_mut(next.0).unwrap().prev = socket_id;
931        }
932        container.first_socket_id = socket_id;
933    }
934}
935
936impl Drop for ContainerBuffer {
937    fn drop(&mut self) {
938        self.terminate();
939    }
940}
941
/// Implements a simple Slab allocator.
///
/// Values are stored in a `Vec` and addressed by their `u32` index. Freed slots are chained into a
/// free list (each free slot stores the index of the next one) and are reused by later insertions
/// before the `Vec` is grown.
struct Slab<T> {
    slab: Vec<Slot<T>>,
    // Index of the first free slot, or `usize::MAX` if there are no free slots.
    free_index: usize,
}

impl<T> Default for Slab<T> {
    /// Returns an empty slab with an empty free list.
    fn default() -> Self {
        Self { slab: Vec::new(), free_index: usize::MAX }
    }
}

/// A single entry in a [`Slab`].
enum Slot<T> {
    /// The slot holds a value.
    Used(T),
    /// The slot is free; the payload is the index of the next free slot, or `usize::MAX` if this
    /// is the last entry in the free list.
    Free(usize),
}

impl<T> Slab<T> {
    /// Returns the number of used entries. This is not performant.
    #[cfg(test)]
    fn len(&self) -> usize {
        self.slab.iter().filter(|c| matches!(c, Slot::Used(_))).count()
    }

    /// Removes and returns the value stored at `index`, pushing the slot onto the free list.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of range or if the slot is already free. The slab is left
    /// untouched in both cases.
    fn free(&mut self, index: u32) -> T {
        let index = index as usize;
        // Check before mutating: overwriting an already-free slot would rewire the free list
        // (possibly into a cycle) before the panic is raised.
        if let Slot::Free(_) = self.slab[index] {
            panic!("Slot already free");
        }
        match std::mem::replace(&mut self.slab[index], Slot::Free(self.free_index)) {
            Slot::Used(value) => {
                self.free_index = index;
                value
            }
            Slot::Free(_) => unreachable!(),
        }
    }

    /// Returns a reference to the value at `id`, or `None` if the slot is free or out of range.
    fn get(&self, id: u32) -> Option<&T> {
        self.slab.get(id as usize).and_then(|s| match s {
            Slot::Used(s) => Some(s),
            _ => None,
        })
    }

    /// Returns a mutable reference to the value at `id`, or `None` if the slot is free or out of
    /// range.
    fn get_mut(&mut self, id: u32) -> Option<&mut T> {
        self.slab.get_mut(id as usize).and_then(|s| match s {
            Slot::Used(s) => Some(s),
            _ => None,
        })
    }

    /// Stores the value produced by `value` and returns the index it was stored at.
    ///
    /// `value` is called with the index the new entry will occupy. The most recently freed slot is
    /// reused if there is one; otherwise the slab grows by one slot.
    ///
    /// # Panics
    ///
    /// Panics if the slab would need more than `u32::MAX` slots.
    fn insert(&mut self, value: impl FnOnce(u32) -> T) -> u32 {
        let free_index = self.free_index;
        if free_index != usize::MAX {
            // Pop the head of the free list and fill it.
            self.free_index = match std::mem::replace(
                &mut self.slab[free_index],
                Slot::Used(value(free_index as u32)),
            ) {
                Slot::Free(next) => next,
                _ => unreachable!(),
            };
            free_index as u32
        } else {
            // This is < rather than <= because we reserve 0xffff_ffff to be used as a NULL value
            // (see SocketId::NULL below).
            assert!(self.slab.len() < u32::MAX as usize);
            self.slab.push(Slot::Used(value(self.slab.len() as u32)));
            (self.slab.len() - 1) as u32
        }
    }

    /// Returns the next used slot starting from `id` (inclusive).
    ///
    /// Returns `None` if there is no used slot at or after `id`, including when `id` is past the
    /// end of the slab.
    fn next_used(&self, id: u32) -> Option<u32> {
        let start = id as usize;
        self.slab
            .get(start..)?
            .iter()
            .position(|slot| matches!(slot, Slot::Used(_)))
            .map(|offset| (start + offset) as u32)
    }
}
1022
/// Identifies a socket by its index in the slab of sockets.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct SocketId(u32);

impl SocketId {
    /// Sentinel meaning "no socket". The slab never hands out this index.
    const NULL: Self = Self(u32::MAX);
}
1029
1030struct Socket {
1031    socket: zx::Socket,
1032    container_id: ContainerId,
1033    // Sockets are stored as a linked list for each container.
1034    prev: SocketId,
1035    next: SocketId,
1036}
1037
1038#[cfg(test)]
1039mod tests {
1040    use super::{SharedBuffer, SharedBufferOptions, Slab, create_ring_buffer};
1041    use crate::identity::ComponentIdentity;
1042    use crate::logs::stats::LogStreamStats;
1043    use crate::logs::testing::make_message;
1044    use assert_matches::assert_matches;
1045    use diagnostics_assertions::{AnyProperty, assert_data_tree};
1046    use fidl_fuchsia_diagnostics::StreamMode;
1047    use fuchsia_async as fasync;
1048    use fuchsia_async::TimeoutExt;
1049    use fuchsia_inspect::Inspector;
1050    use futures::FutureExt;
1051    use futures::channel::mpsc;
1052    use futures::future::OptionFuture;
1053    use futures::stream::{FuturesUnordered, StreamExt as _};
1054    use moniker::ExtendedMoniker;
1055    use ring_buffer::MAX_MESSAGE_SIZE;
1056    use selectors::SelectorExt;
1057    use std::future::poll_fn;
1058    use std::iter::repeat_with;
1059    use std::pin::pin;
1060    use std::sync::Arc;
1061    use std::sync::atomic::{AtomicU64, Ordering};
1062    use std::task::Poll;
1063    use std::time::Duration;
1064    use zerocopy::FromBytes;
1065
1066    fn test_stats() -> Arc<LogStreamStats> {
1067        Arc::new(LogStreamStats::new(
1068            &fuchsia_inspect::Node::default(),
1069            &ComponentIdentity::unknown(),
1070        ))
1071    }
1072
1073    fn container_cursor(
1074        buffer: &Arc<SharedBuffer>,
1075        container: &super::ContainerBuffer,
1076        mode: StreamMode,
1077    ) -> Option<impl futures::Stream<Item = Box<[u8]>>> {
1078        let selectors = vec![container.identity()?.moniker.clone().into_component_selector()];
1079        let mut cursor = Box::pin(buffer.cursor(mode, selectors));
1080        Some(futures::stream::poll_fn(move |cx| {
1081            cursor.as_mut().poll_next(cx).map(|m| m.map(|m| m.parse().unwrap().2.into()))
1082        }))
1083    }
1084
1085    async fn yield_to_executor() {
1086        let mut first_time = true;
1087        poll_fn(|cx| {
1088            if first_time {
1089                cx.waker().wake_by_ref();
1090                first_time = false;
1091                Poll::Pending
1092            } else {
1093                Poll::Ready(())
1094            }
1095        })
1096        .await;
1097    }
1098
1099    #[fuchsia::test]
1100    async fn push_one_message() {
1101        let buffer = SharedBuffer::new(
1102            create_ring_buffer(MAX_MESSAGE_SIZE),
1103            Box::new(|_| {}),
1104            Default::default(),
1105            &fuchsia_inspect::Node::default(),
1106        );
1107        let container_buffer =
1108            buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
1109        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1110        container_buffer.push_back(msg.bytes());
1111
1112        // Make sure the cursor can find it.
1113        let cursor = container_cursor(&buffer, &container_buffer, StreamMode::Snapshot).unwrap();
1114        assert_eq!(cursor.map(|item| assert_eq!(&*item, msg.bytes())).count().await, 1);
1115    }
1116
1117    #[fuchsia::test]
1118    async fn message_too_short() {
1119        let buffer = SharedBuffer::new(
1120            create_ring_buffer(MAX_MESSAGE_SIZE),
1121            Box::new(|_| {}),
1122            Default::default(),
1123            &fuchsia_inspect::Node::default(),
1124        );
1125
1126        let container_buffer =
1127            buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
1128        container_buffer.push_back(&[0]);
1129
1130        assert_eq!(
1131            container_cursor(&buffer, &container_buffer, StreamMode::Snapshot)
1132                .unwrap()
1133                .count()
1134                .await,
1135            0
1136        );
1137    }
1138
1139    #[fuchsia::test]
1140    async fn bad_type() {
1141        let buffer = SharedBuffer::new(
1142            create_ring_buffer(MAX_MESSAGE_SIZE),
1143            Box::new(|_| {}),
1144            Default::default(),
1145            &fuchsia_inspect::Node::default(),
1146        );
1147        let container_buffer =
1148            buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
1149        container_buffer.push_back(&[0x77; 16]);
1150
1151        assert_eq!(
1152            container_cursor(&buffer, &container_buffer, StreamMode::Snapshot)
1153                .unwrap()
1154                .count()
1155                .await,
1156            0
1157        );
1158    }
1159
1160    #[fuchsia::test]
1161    async fn message_truncated() {
1162        let buffer = SharedBuffer::new(
1163            create_ring_buffer(MAX_MESSAGE_SIZE),
1164            Box::new(|_| {}),
1165            Default::default(),
1166            &fuchsia_inspect::Node::default(),
1167        );
1168        let container_buffer =
1169            buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
1170        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1171        container_buffer.push_back(&msg.bytes()[..msg.bytes().len() - 1]);
1172
1173        assert_eq!(
1174            container_cursor(&buffer, &container_buffer, StreamMode::Snapshot)
1175                .unwrap()
1176                .count()
1177                .await,
1178            0
1179        );
1180    }
1181
1182    #[fuchsia::test]
1183    async fn buffer_wrapping() {
1184        let buffer = SharedBuffer::new(
1185            create_ring_buffer(MAX_MESSAGE_SIZE),
1186            Box::new(|_| {}),
1187            SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
1188            &fuchsia_inspect::Node::default(),
1189        );
1190        let container_buffer =
1191            buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
1192
1193        // Keep writing messages until we wrap.
1194        let mut i = 0;
1195        loop {
1196            let msg = make_message(&format!("{i}"), None, zx::BootInstant::from_nanos(i));
1197            container_buffer.push_back(msg.bytes());
1198            i += 1;
1199
1200            // Yield to the executor to allow messages to be rolled out.
1201            yield_to_executor().await;
1202
1203            let inner = buffer.inner.lock();
1204            if inner.ring_buffer.head() > inner.ring_buffer.capacity() as u64 {
1205                break;
1206            }
1207        }
1208
1209        // Read back all the messages.
1210        let mut cursor =
1211            pin!(container_cursor(&buffer, &container_buffer, StreamMode::Snapshot).unwrap());
1212
1213        let mut j;
1214        let item = cursor.next().await;
1215        assert_matches!(
1216            item,
1217            Some(item) => {
1218                j = i64::read_from_bytes(&item[8..16]).unwrap();
1219                let msg = make_message(&format!("{j}"),
1220                                       None,
1221                                       zx::BootInstant::from_nanos(j));
1222                assert_eq!(&*item, msg.bytes());
1223            }
1224        );
1225
1226        j += 1;
1227        while j != i {
1228            assert_matches!(
1229                cursor.next().await,
1230                Some(item) => {
1231                    assert_eq!(&*item, make_message(&format!("{j}"),
1232                                                         None,
1233                                                         zx::BootInstant::from_nanos(j)).bytes());
1234                }
1235            );
1236            j += 1;
1237        }
1238
1239        assert!(cursor.next().await.is_none());
1240    }
1241
1242    #[fuchsia::test]
1243    async fn on_inactive() {
1244        let identity = Arc::new(vec!["a"].into());
1245        let on_inactive = Arc::new(AtomicU64::new(0));
1246        let buffer = {
1247            let on_inactive = Arc::clone(&on_inactive);
1248            let identity = Arc::clone(&identity);
1249            Arc::new(SharedBuffer::new(
1250                create_ring_buffer(MAX_MESSAGE_SIZE),
1251                Box::new(move |i| {
1252                    assert_eq!(i, identity);
1253                    on_inactive.fetch_add(1, Ordering::Relaxed);
1254                }),
1255                SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
1256                &fuchsia_inspect::Node::default(),
1257            ))
1258        };
1259        let container_a = buffer.new_container_buffer(identity, test_stats());
1260        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), test_stats());
1261
1262        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1263        container_a.push_back(msg.bytes());
1264
1265        // Repeatedly write messages to b until a is rolled out.
1266        while container_cursor(&buffer, &container_a, StreamMode::Snapshot).unwrap().count().await
1267            == 1
1268        {
1269            container_b.push_back(msg.bytes());
1270
1271            // Yield to the executor to allow messages to be rolled out.
1272            yield_to_executor().await;
1273        }
1274
1275        assert_eq!(on_inactive.load(Ordering::Relaxed), 1);
1276    }
1277
1278    #[fuchsia::test]
1279    async fn terminate_drops_container() {
1280        // Silence a clippy warning; SharedBuffer needs an executor.
1281        async {}.await;
1282
1283        let buffer = SharedBuffer::new(
1284            create_ring_buffer(MAX_MESSAGE_SIZE),
1285            Box::new(|_| {}),
1286            SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
1287            &fuchsia_inspect::Node::default(),
1288        );
1289
1290        // terminate when buffer has no logs.
1291        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
1292        assert_eq!(buffer.container_count(), 1);
1293        container_a.terminate();
1294
1295        assert_eq!(buffer.container_count(), 0);
1296
1297        // terminate when buffer has logs.
1298        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
1299        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1300        container_a.push_back(msg.bytes());
1301        assert_eq!(buffer.container_count(), 1);
1302        container_a.terminate();
1303
1304        // The container should still be there because it has logs.
1305        assert_eq!(buffer.container_count(), 1);
1306
1307        // Roll out the logs.
1308        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), test_stats());
1309        assert_eq!(buffer.container_count(), 2);
1310
1311        // Repeatedly write messages to b until a's message is dropped and then the container will
1312        // be dropped.
1313        while buffer.container_count() != 1 {
1314            container_b.push_back(msg.bytes());
1315
1316            // Yield to the executor to allow messages to be rolled out.
1317            yield_to_executor().await;
1318        }
1319
1320        assert!(container_cursor(&buffer, &container_a, StreamMode::Subscribe).is_none());
1321    }
1322
1323    #[fuchsia::test]
1324    async fn cursor_subscribe() {
1325        for mode in [StreamMode::Subscribe, StreamMode::SnapshotThenSubscribe] {
1326            let buffer = SharedBuffer::new(
1327                create_ring_buffer(MAX_MESSAGE_SIZE),
1328                Box::new(|_| {}),
1329                Default::default(),
1330                &fuchsia_inspect::Node::default(),
1331            );
1332            let container =
1333                Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats()));
1334            let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1335            container.push_back(msg.bytes());
1336
1337            let (sender, mut receiver) = mpsc::unbounded();
1338
1339            // Run the cursor in a separate task so that we can test it gets woken correctly.
1340            {
1341                let container = Arc::clone(&container);
1342                fasync::Task::spawn(async move {
1343                    let mut cursor =
1344                        pin!(container_cursor(&container.shared_buffer, &container, mode).unwrap());
1345                    while let Some(item) = cursor.next().await {
1346                        sender.unbounded_send(item).unwrap();
1347                    }
1348                })
1349                .detach();
1350            }
1351
1352            // The existing message should only be returned with SnapshotThenSubscribe
1353            if mode == StreamMode::SnapshotThenSubscribe {
1354                assert_matches!(
1355                    receiver.next().await,
1356                    Some(item) if item.as_ref() == msg.bytes()
1357                );
1358            }
1359
1360            // No message should arrive. We can only use a timeout here.
1361            assert!(
1362                OptionFuture::from(Some(receiver.next()))
1363                    .on_timeout(Duration::from_millis(500), || None)
1364                    .await
1365                    .is_none()
1366            );
1367
1368            container.push_back(msg.bytes());
1369
1370            // The message should arrive now.
1371            assert_matches!(
1372                receiver.next().await,
1373                Some(item) if item.as_ref() == msg.bytes()
1374            );
1375        }
1376    }
1377
1378    #[fuchsia::test]
1379    async fn drained_post_termination_cursors() {
1380        let buffer = SharedBuffer::new(
1381            create_ring_buffer(MAX_MESSAGE_SIZE),
1382            Box::new(|_| {}),
1383            Default::default(),
1384            &fuchsia_inspect::Node::default(),
1385        );
1386        let container =
1387            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats()));
1388        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1389
1390        let mut cursor_a =
1391            pin!(container_cursor(&buffer, &container, StreamMode::Subscribe).unwrap());
1392        let mut cursor_b =
1393            pin!(container_cursor(&buffer, &container, StreamMode::SnapshotThenSubscribe).unwrap());
1394
1395        container.push_back(msg.bytes());
1396        container.push_back(msg.bytes());
1397        container.push_back(msg.bytes());
1398        container.push_back(msg.bytes());
1399        container.push_back(msg.bytes());
1400
1401        let mut cursor_c =
1402            pin!(container_cursor(&buffer, &container, StreamMode::Snapshot).unwrap());
1403        assert!(cursor_a.next().await.is_some());
1404        assert!(cursor_b.next().await.is_some());
1405        assert!(cursor_c.next().await.is_some());
1406
1407        drop(buffer.terminate());
1408
1409        // All cursors should return the 4 remaining messages.
1410        assert_eq!(cursor_a.count().await, 4);
1411        assert_eq!(cursor_b.count().await, 4);
1412        assert_eq!(cursor_c.count().await, 4);
1413    }
1414
1415    #[fuchsia::test]
1416    async fn empty_post_termination_cursors() {
1417        let buffer = SharedBuffer::new(
1418            create_ring_buffer(MAX_MESSAGE_SIZE),
1419            Box::new(|_| {}),
1420            Default::default(),
1421            &fuchsia_inspect::Node::default(),
1422        );
1423        let container =
1424            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats()));
1425
1426        let cursor_a = container_cursor(&buffer, &container, StreamMode::Subscribe).unwrap();
1427        let cursor_b =
1428            container_cursor(&buffer, &container, StreamMode::SnapshotThenSubscribe).unwrap();
1429        let cursor_c = container_cursor(&buffer, &container, StreamMode::Snapshot).unwrap();
1430
1431        drop(buffer.terminate());
1432
1433        assert_eq!(cursor_a.count().await, 0);
1434        assert_eq!(cursor_b.count().await, 0);
1435        assert_eq!(cursor_c.count().await, 0);
1436    }
1437
1438    #[fuchsia::test]
1439    async fn recycled_container_slot() {
1440        let buffer = Arc::new(SharedBuffer::new(
1441            create_ring_buffer(MAX_MESSAGE_SIZE),
1442            Box::new(|_| {}),
1443            SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
1444            &fuchsia_inspect::Node::default(),
1445        ));
1446        let container_a =
1447            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats()));
1448        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1449        container_a.push_back(msg.bytes());
1450
1451        let mut cursor = pin!(
1452            container_cursor(&buffer, &container_a, StreamMode::SnapshotThenSubscribe).unwrap()
1453        );
1454        assert_matches!(cursor.next().await, Some(_));
1455
1456        // Roll out all the messages.
1457        let container_b =
1458            Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), test_stats()));
1459        while container_cursor(&buffer, &container_a, StreamMode::Snapshot).unwrap().count().await
1460            > 0
1461        {
1462            container_b.push_back(msg.bytes());
1463
1464            // Yield to the executor to allow messages to be rolled out.
1465            yield_to_executor().await;
1466        }
1467
1468        container_a.terminate();
1469
1470        // This should create a new container that uses a new slot and shouldn't interfere with
1471        // container_a.
1472        let container_c =
1473            Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), test_stats()));
1474        container_c.push_back(msg.bytes());
1475        container_c.push_back(msg.bytes());
1476    }
1477
1478    #[fuchsia::test]
1479    async fn socket_increments_logstats() {
1480        let inspector = Inspector::default();
1481        let identity =
1482            Arc::new(ComponentIdentity::new(ExtendedMoniker::parse_str("./test").unwrap(), ""));
1483        let stats = Arc::new(LogStreamStats::new(inspector.root(), &identity));
1484        let buffer = Arc::new(SharedBuffer::new(
1485            create_ring_buffer(65536),
1486            Box::new(|_| {}),
1487            Default::default(),
1488            &fuchsia_inspect::Node::default(),
1489        ));
1490        let container_a = Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats));
1491        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1492
1493        let (local, remote) = zx::Socket::create_datagram();
1494        container_a.add_socket(remote);
1495
1496        let cursor_a = container_cursor(&buffer, &container_a, StreamMode::Subscribe).unwrap();
1497
1498        // Use FuturesUnordered so that we can make sure that the cursor is woken when a message is
1499        // received (FuturesUnordered uses separate wakers for all the futures it manages).
1500        let mut futures = FuturesUnordered::new();
1501        futures.push(async move {
1502            let mut cursor_a = pin!(cursor_a);
1503            cursor_a.next().await
1504        });
1505        let mut next = futures.next();
1506        assert!(futures::poll!(&mut next).is_pending());
1507
1508        local.write(msg.bytes()).unwrap();
1509
1510        let cursor_b = pin!(container_cursor(&buffer, &container_a, StreamMode::Snapshot).unwrap());
1511
1512        assert_eq!(cursor_b.map(|item| assert_eq!(&*item, msg.bytes())).count().await, 1);
1513
1514        // If cursor_a wasn't woken, this will hang.
1515        next.await;
1516        // Validate logstats (must happen after the socket was handled)
1517        assert_data_tree!(
1518            inspector,
1519            root: contains {
1520                test: {
1521                    url: "",
1522                    last_timestamp: AnyProperty,
1523                    sockets_closed: 0u64,
1524                    sockets_opened: 1u64,
1525                    invalid: {
1526                        number: 0u64,
1527                        bytes: 0u64,
1528                    },
1529                    total: {
1530                        number: 1u64,
1531                        bytes: 88u64,
1532                    },
1533                    rolled_out: {
1534                        number: 0u64,
1535                        bytes: 0u64,
1536                    },
1537                    trace: {
1538                        number: 0u64,
1539                        bytes: 0u64,
1540                    },
1541                    debug: {
1542                        number: 1u64,
1543                        bytes: 88u64,
1544                    },
1545                    info: {
1546                        number: 0u64,
1547                        bytes: 0u64,
1548                    },
1549                    warn: {
1550                        number: 0u64,
1551                        bytes: 0u64,
1552                    },
1553                    error: {
1554                        number: 0u64,
1555                        bytes: 0u64,
1556                    },
1557                    fatal: {
1558                        number: 0u64,
1559                        bytes: 0u64,
1560                    },
1561                }
1562            }
1563        );
1564    }
1565
1566    #[fuchsia::test]
1567    async fn socket() {
1568        let buffer = Arc::new(SharedBuffer::new(
1569            create_ring_buffer(MAX_MESSAGE_SIZE),
1570            Box::new(|_| {}),
1571            Default::default(),
1572            &fuchsia_inspect::Node::default(),
1573        ));
1574        let container_a =
1575            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats()));
1576        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1577
1578        let (local, remote) = zx::Socket::create_datagram();
1579        container_a.add_socket(remote);
1580
1581        let cursor_a = container_cursor(&buffer, &container_a, StreamMode::Subscribe).unwrap();
1582
1583        // Use FuturesUnordered so that we can make sure that the cursor is woken when a message is
1584        // received (FuturesUnordered uses separate wakers for all the futures it manages).
1585        let mut futures = FuturesUnordered::new();
1586        futures.push(async move {
1587            let mut cursor_a = pin!(cursor_a);
1588            cursor_a.next().await
1589        });
1590        let mut next = futures.next();
1591        assert!(futures::poll!(&mut next).is_pending());
1592
1593        local.write(msg.bytes()).unwrap();
1594
1595        let cursor_b = pin!(container_cursor(&buffer, &container_a, StreamMode::Snapshot).unwrap());
1596
1597        assert_eq!(cursor_b.map(|item| assert_eq!(&*item, msg.bytes())).count().await, 1);
1598
1599        // If cursor_a wasn't woken, this will hang.
1600        next.await;
1601    }
1602
1603    #[fuchsia::test]
1604    async fn socket_on_inactive() {
1605        let on_inactive = Arc::new(AtomicU64::new(0));
1606        let a_identity = Arc::new(vec!["a"].into());
1607        let buffer = Arc::new(SharedBuffer::new(
1608            create_ring_buffer(MAX_MESSAGE_SIZE),
1609            {
1610                let on_inactive = Arc::clone(&on_inactive);
1611                let a_identity = Arc::clone(&a_identity);
1612                Box::new(move |id| {
1613                    assert_eq!(id, a_identity);
1614                    on_inactive.fetch_add(1, Ordering::Relaxed);
1615                })
1616            },
1617            SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
1618            &fuchsia_inspect::Node::default(),
1619        ));
1620        let container_a = Arc::new(buffer.new_container_buffer(a_identity, test_stats()));
1621        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1622
1623        let (local, remote) = zx::Socket::create_datagram();
1624        container_a.add_socket(remote);
1625
1626        local.write(msg.bytes()).unwrap();
1627
1628        let cursor = pin!(container_cursor(&buffer, &container_a, StreamMode::Snapshot).unwrap());
1629
1630        assert_eq!(cursor.map(|item| assert_eq!(&*item, msg.bytes())).count().await, 1);
1631
1632        // Now roll out a's messages.
1633        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), test_stats());
1634        while container_cursor(&buffer, &container_a, StreamMode::Snapshot).unwrap().count().await
1635            == 1
1636        {
1637            container_b.push_back(msg.bytes());
1638
1639            // Yield to the executor to allow messages to be rolled out.
1640            yield_to_executor().await;
1641        }
1642
1643        assert_eq!(on_inactive.load(Ordering::Relaxed), 0);
1644
1645        // Close the socket.
1646        std::mem::drop(local);
1647
1648        // We don't know when the socket thread will run so we have to loop.
1649        while on_inactive.load(Ordering::Relaxed) != 1 {
1650            fasync::Timer::new(Duration::from_millis(50)).await;
1651        }
1652    }
1653
1654    #[fuchsia::test]
1655    async fn flush() {
1656        let a_identity = Arc::new(vec!["a"].into());
1657        let buffer = Arc::new(SharedBuffer::new(
1658            create_ring_buffer(1024 * 1024),
1659            Box::new(|_| {}),
1660            Default::default(),
1661            &fuchsia_inspect::Node::default(),
1662        ));
1663        let container_a = Arc::new(buffer.new_container_buffer(a_identity, test_stats()));
1664        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1665
1666        let (local, remote) = zx::Socket::create_datagram();
1667        container_a.add_socket(remote);
1668
1669        let cursor = pin!(container_cursor(&buffer, &container_a, StreamMode::Subscribe).unwrap());
1670
1671        const COUNT: usize = 1000;
1672        for _ in 0..COUNT {
1673            local.write(msg.bytes()).unwrap();
1674        }
1675
1676        // Race two flush futures.
1677        let mut flush_futures = FuturesUnordered::from_iter([buffer.flush(), buffer.flush()]);
1678        flush_futures.next().await;
1679
1680        let messages: Option<Vec<_>> = cursor.take(COUNT).collect().now_or_never();
1681        assert!(messages.is_some());
1682
1683        // Make sure the other one finishes too.
1684        flush_futures.next().await;
1685
1686        // Make sure we can still terminate the buffer.
1687        buffer.terminate().await;
1688    }
1689
1690    #[fuchsia::test]
1691    fn test_slab_next_used() {
1692        let mut slab = Slab::default();
1693        let mut ids: Vec<_> = repeat_with(|| slab.insert(|_| ())).take(4).collect();
1694        ids.sort();
1695        slab.free(ids[0]);
1696        slab.free(ids[2]);
1697        assert_eq!(slab.next_used(0), Some(ids[1]));
1698        assert_eq!(slab.next_used(ids[1]), Some(ids[1]));
1699        assert_eq!(slab.next_used(ids[1] + 1), Some(ids[3]));
1700        assert_eq!(slab.next_used(ids[3] + 1), None);
1701    }
1702}