// archivist_lib/logs/shared_buffer.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5mod cursor;
6
7pub use cursor::{FilterCursor, FilterCursorStream, FxtMessage};
8
9use crate::identity::ComponentIdentity;
10use crate::logs::repository::ARCHIVIST_MONIKER;
11use crate::logs::stats::{LogStreamStats, SaturationCurve};
12use derivative::Derivative;
13use diagnostics_log_encoding::encode::add_dropped_count;
14use diagnostics_log_encoding::{Header, TRACING_FORMAT_LOG_RECORD_TYPE};
15use fidl_fuchsia_diagnostics::{ComponentSelector, StreamMode};
16use fidl_fuchsia_logger::MAX_DATAGRAM_LEN_BYTES;
17use fuchsia_async as fasync;
18use fuchsia_async::condition::{Condition, ConditionGuard};
19use fuchsia_inspect::Node;
20use fuchsia_sync::Mutex;
21use futures::channel::oneshot;
22use log::debug;
23use ring_buffer::{self, RingBuffer, ring_buffer_record_len};
24use std::collections::VecDeque;
25use std::mem::ManuallyDrop;
26use std::ops::{Deref, DerefMut, Range};
27use std::sync::{Arc, Weak};
28use std::time::Duration;
29use zerocopy::FromBytes;
30
// Aim to keep 25% of the buffer free. This is expressed as a fraction: numerator / denominator.
// `check_space` evicts the oldest messages until at least this fraction of the capacity is free.
const SPACE_THRESHOLD_NUMERATOR: usize = 1;
const SPACE_THRESHOLD_DENOMINATOR: usize = 4;

// The default amount of time that Archivist will sleep for to reduce how often it wakes up to
// handle log messages. Used by both the socket thread and the buffer monitor task.
const DEFAULT_SLEEP_TIME: Duration = Duration::from_millis(200);
38
39pub fn create_ring_buffer(capacity: usize) -> ring_buffer::Reader {
40    RingBuffer::create(calculate_real_size_given_desired_capacity(capacity))
41}
42
43fn calculate_real_size_given_desired_capacity(capacity: usize) -> usize {
44    // We always keep spare space in the buffer so that we don't drop messages.  This is controlled
45    // by SPACE_THRESHOLD_NUMERATOR & SPACE_THRESHOLD_DENOMINATOR.  Round up capacity so that
46    // `capacity` reflects the actual amount of log data we can store.
47    let page_size = zx::system_get_page_size() as usize;
48    (capacity * SPACE_THRESHOLD_DENOMINATOR
49        / (SPACE_THRESHOLD_DENOMINATOR - SPACE_THRESHOLD_NUMERATOR))
50        .next_multiple_of(page_size)
51}
52
// Port packet keys with this bit set identify IOBuffer-peer-closed notifications; the low 32
// bits carry the peer's index in `Inner::iob_peers` (see `ContainerBuffer::iob`). Socket keys
// never have this bit set.
const IOB_PEER_CLOSED_KEY_BASE: u64 = 0x8000_0000_0000_0000;

// Callback invoked when a container becomes inactive (i.e. no logs and no sockets). Invoked by
// `InnerGuard::drop` after the `inner` lock has been released.
pub type OnInactive = Box<dyn Fn(Arc<ComponentIdentity>) + Send + Sync>;
56
/// The shared log buffer: a single ring buffer holding the log messages of every component,
/// together with the machinery to drain legacy log sockets into it, watch for closed IOBuffer
/// peers, evict old messages when space runs low, and wake cursors when new data arrives.
pub struct SharedBuffer {
    inner: Condition<Inner>,

    // Sockets. This *must* be locked after `inner` and the lock *must* be dropped before
    // `InnerGuard` is dropped.
    sockets: Mutex<Slab<Socket>>,

    // Callback for when a container is inactive (i.e. no logs and no sockets).
    on_inactive: OnInactive,

    // The port that we use to service the sockets.
    port: zx::Port,

    // Event used for termination. USER_0 is also raised to interrupt the socket thread whenever
    // a message is pushed onto `thread_msg_queue` (see `flush` and `terminate`).
    event: zx::Event,

    // The thread that we use to service the sockets.
    socket_thread: Mutex<Option<std::thread::JoinHandle<()>>>,

    // The task responsible for monitoring the space in the IOBuffer and popping messages when it
    // gets full. It will also wake cursors whenever new data arrives.
    _buffer_monitor_task: fasync::Task<()>,

    /// Metric for tracking buffer saturation.
    pub(crate) saturation_curve: Arc<SaturationCurve>,
}
83
// Wrapper around the `inner` lock guard that defers side effects — waking cursors and reporting
// inactive containers — until after the lock has been released (see the `Drop` impl).
struct InnerGuard<'a> {
    buffer: &'a Arc<SharedBuffer>,

    // ManuallyDrop so `Drop` can control the ordering: wakers are drained whilst still locked,
    // the guard is dropped, and only then are the `on_inactive` callbacks run.
    guard: ManuallyDrop<ConditionGuard<'a, Inner>>,

    // The list of components that should be reported as inactive once the lock is dropped.
    on_inactive: Vec<Arc<ComponentIdentity>>,

    // If true, wake all the wakers registered with the ConditionGuard.
    wake: bool,
}
95
impl Drop for InnerGuard<'_> {
    fn drop(&mut self) {
        // Wake any cursors that registered wakers whilst the lock was held.
        if self.wake {
            for waker in self.guard.drain_wakers() {
                waker.wake();
            }
        }
        // SAFETY: This is the only place we drop the guard.
        unsafe {
            ManuallyDrop::drop(&mut self.guard);
        }
        // Run the callbacks *after* releasing the lock: `on_inactive` may take locks that must
        // not be acquired whilst `inner` is held (see the NOTE in `InnerGuard::cursor`).
        for identity in self.on_inactive.drain(..) {
            (*self.buffer.on_inactive)(identity);
        }
    }
}
112
impl Deref for InnerGuard<'_> {
    type Target = Inner;

    // Coerces through ManuallyDrop and the condition guard down to the protected `Inner`.
    fn deref(&self) -> &Self::Target {
        &self.guard
    }
}
120
impl DerefMut for InnerGuard<'_> {
    // Mutable counterpart of `Deref`; yields the locked `Inner`.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.guard
    }
}
126
// State protected by the `inner` condition/lock. Invariant maintained by `pop` and
// `update_message_ids`: `tail <= last_scanned <= head` (asserted in `InnerGuard::pop`).
struct Inner {
    // The ring buffer.
    ring_buffer: Arc<RingBuffer>,

    // Registered containers.
    containers: Containers,

    // Socket thread message queue.
    thread_msg_queue: VecDeque<ThreadMessage>,

    // The index in the buffer that we have scanned for messages.
    last_scanned: u64,

    // The ID of the message that was last scanned.
    last_scanned_message_id: u64,

    // A copy of the tail index in the ring buffer.
    tail: u64,

    // The ID of the message at the tail.  This is just a counter that increments one per message.
    tail_message_id: u64,

    // The IOBuffer peers that we must watch. The slab index is encoded in the port-packet key
    // together with `IOB_PEER_CLOSED_KEY_BASE`.
    iob_peers: Slab<(ContainerId, zx::Iob)>,

    // True, when the buffer has been terminated.
    terminated: bool,
}
155
// Messages sent to the socket thread via `Inner::thread_msg_queue` (paired with raising USER_0
// on `SharedBuffer::event`).
enum ThreadMessage {
    // The thread should terminate.
    Terminate,

    // Process all pending socket messages and report via the Sender when done.
    Flush(oneshot::Sender<()>),
}
163
/// Tunables accepted by `SharedBuffer::new`.
pub struct SharedBufferOptions {
    // To reduce how often Archivist wakes when new messages are written, Archivist will sleep for
    // this time. This will impact how quickly messages show up via the cursors.
    pub sleep_time: Duration,
    /// The size of the circular buffer for the saturation curve.
    pub saturation_curve_size: usize,
}
171
172impl Default for SharedBufferOptions {
173    fn default() -> Self {
174        Self { sleep_time: DEFAULT_SLEEP_TIME, saturation_curve_size: 254 }
175    }
176}
177
impl SharedBuffer {
    /// Returns a new shared buffer.
    ///
    /// Spawns the buffer monitor task on the current executor and a dedicated OS thread that
    /// services the log sockets; both are shut down via `terminate` / by dropping the buffer.
    pub fn new(
        ring_buffer: ring_buffer::Reader,
        on_inactive: OnInactive,
        options: SharedBufferOptions,
        inspect_node: &Node,
    ) -> Arc<Self> {
        let saturation_curve =
            Arc::new(SaturationCurve::new(inspect_node, options.saturation_curve_size));
        // `new_cyclic` lets the monitor task hold a Weak reference back to this buffer without
        // creating a cycle that would keep the buffer alive forever.
        let this = Arc::new_cyclic(|weak: &Weak<Self>| Self {
            inner: Condition::new(Inner {
                ring_buffer: Arc::clone(&ring_buffer),
                containers: Containers::default(),
                thread_msg_queue: VecDeque::default(),
                last_scanned: 0,
                last_scanned_message_id: 0,
                tail: 0,
                tail_message_id: 0,
                iob_peers: Slab::default(),
                terminated: false,
            }),
            sockets: Mutex::new(Slab::default()),
            on_inactive,
            port: zx::Port::create(),
            event: zx::Event::create(),
            socket_thread: Mutex::default(),
            saturation_curve,
            _buffer_monitor_task: fasync::Task::spawn(Self::buffer_monitor_task(
                Weak::clone(weak),
                ring_buffer,
                options.sleep_time,
            )),
        });

        // The socket thread needs a strong reference, so it is started after the Arc exists.
        *this.socket_thread.lock() = Some({
            let this = Arc::clone(&this);
            std::thread::spawn(move || this.socket_thread(options.sleep_time))
        });
        this
    }

    /// Registers a new container (component) with the buffer and returns its handle. The
    /// container is tagged with a fresh `ContainerId` used to attribute ring-buffer records.
    pub fn new_container_buffer(
        self: &Arc<Self>,
        identity: Arc<ComponentIdentity>,
        stats: Arc<LogStreamStats>,
    ) -> ContainerBuffer {
        let mut inner = self.inner.lock();
        // Destructure so we can borrow `containers` mutably and `ring_buffer` at the same time.
        let Inner { containers, ring_buffer, .. } = &mut *inner;
        ContainerBuffer {
            shared_buffer: Arc::clone(self),
            container_id: containers.new_container(ring_buffer, Arc::clone(&identity), stats),
        }
    }

    /// Asks the socket thread to process all pending socket messages and waits until it has
    /// done so.
    pub async fn flush(&self) {
        let (sender, receiver) = oneshot::channel();
        self.inner.lock().thread_msg_queue.push_back(ThreadMessage::Flush(sender));
        // USER_0 interrupts the socket thread so it picks up the queued message.
        self.event.signal(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
        // Ignore failure if Archivist is shutting down.
        let _ = receiver.await;
    }

    /// Returns the number of registered containers in use by the buffer.
    #[cfg(test)]
    pub fn container_count(&self) -> usize {
        self.inner.lock().containers.len()
    }

    /// Terminates the socket thread. The socket thread will drain the sockets before terminating.
    /// Returns a future that may be awaited if the caller wants to wait for the socket thread to
    /// terminate.
    pub fn terminate(self: &Arc<Self>) -> impl Future<Output = ()> {
        {
            // Drain the sockets synchronously before queueing Terminate so no messages are lost.
            let mut inner = InnerGuard::new(self);
            self.flush_sockets(&mut inner);
            inner.thread_msg_queue.push_back(ThreadMessage::Terminate);
        }
        self.event.signal(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
        let join_handle = self.socket_thread.lock().take().unwrap();
        // `join` blocks, so run it on the blocking thread pool.
        fasync::unblock(|| {
            let _ = join_handle.join();
        })
    }

    /// Returns a cursor that will return messages that match selector. There will be a limited
    /// attempt to reorder messages by timestamp.  If no selectors are specified, *all* messages
    /// will be returned.
    pub fn cursor(
        self: &Arc<Self>,
        mode: StreamMode,
        selectors: Vec<ComponentSelector>,
    ) -> FilterCursor {
        InnerGuard::new(self).cursor(mode, selectors)
    }

    /// Body of the dedicated socket-servicing thread.
    ///
    /// Waits on `port` for socket readability, IOBuffer-peer-closed notifications, and
    /// interrupts (USER_0 on `event`); drains ready sockets into the ring buffer and processes
    /// queued `ThreadMessage`s (Flush/Terminate).
    fn socket_thread(self: Arc<Self>, sleep_time: Duration) {
        const INTERRUPT_KEY: u64 = u64::MAX;
        let mut sockets_ready = Vec::new();
        let mut iob_peer_closed = Vec::new();
        let mut interrupt_needs_arming = true;
        let mut msg = None;

        loop {
            // With a pending message, poll the port without blocking; otherwise (re)arm the
            // interrupt and throttle before blocking indefinitely.
            let mut deadline = if msg.is_some() {
                zx::MonotonicInstant::INFINITE_PAST
            } else {
                if interrupt_needs_arming {
                    self.event
                        .wait_async(
                            &self.port,
                            INTERRUPT_KEY,
                            zx::Signals::USER_0,
                            zx::WaitAsyncOpts::empty(),
                        )
                        .unwrap();
                    interrupt_needs_arming = false;
                }

                // Wait so that we're not constantly waking up for every log message that is queued.
                // Ignore errors here.
                let _ = self
                    .event
                    .wait_one(zx::Signals::USER_0, zx::MonotonicInstant::after(sleep_time.into()));
                zx::MonotonicInstant::INFINITE
            };

            // Gather the list of sockets that are ready to read.
            loop {
                match self.port.wait(deadline) {
                    Ok(packet) => {
                        if packet.key() == INTERRUPT_KEY {
                            interrupt_needs_arming = true;
                            // To maintain proper ordering, we must capture the message here whilst
                            // we are still gathering the list of sockets that are ready to read.
                            // If we wait till later, we introduce windows where we might miss a
                            // socket that should be read.
                            if msg.is_none() {
                                msg = self.inner.lock().thread_msg_queue.pop_front();
                            }
                        } else if packet.key() & IOB_PEER_CLOSED_KEY_BASE != 0 {
                            // Low 32 bits of the key are the index in `iob_peers`.
                            iob_peer_closed.push(packet.key() as u32);
                        } else {
                            sockets_ready.push(SocketId(packet.key() as u32))
                        }
                    }
                    Err(zx::Status::TIMED_OUT) => break,
                    Err(status) => panic!("port wait error: {status:?}"),
                }
                // After the first packet, drain whatever else is already queued without blocking.
                deadline = zx::MonotonicInstant::INFINITE_PAST;
            }

            let mut inner = InnerGuard::new(&self);

            if !iob_peer_closed.is_empty() {
                // See the comment on `is_active()` for why this is required.
                inner.update_message_ids(inner.ring_buffer.head());

                for iob_peer_closed in iob_peer_closed.drain(..) {
                    let container_id = inner.iob_peers.free(iob_peer_closed).0;
                    if let Some(container) = inner.containers.get_mut(container_id) {
                        container.iob_count -= 1;
                        if container.iob_count == 0 && !container.is_active() {
                            if container.should_free() {
                                inner.containers.free(container_id);
                            } else {
                                let identity = Arc::clone(&container.identity);
                                inner.on_inactive.push(identity);
                            }
                        }
                    }
                }
            }

            {
                // NOTE: `sockets` must be locked after `inner` and released before `inner`
                // (see the field comment on `SharedBuffer::sockets`).
                let mut sockets = self.sockets.lock();
                for socket_id in sockets_ready.drain(..) {
                    inner.read_socket(&mut sockets, socket_id, |socket| {
                        // Rearm the socket for the next readable/peer-closed notification.
                        socket
                            .socket
                            .wait_async(
                                &self.port,
                                socket_id.0 as u64,
                                zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
                                zx::WaitAsyncOpts::empty(),
                            )
                            .unwrap();
                    });
                }
            }

            // Now that we've processed the sockets, we can process the message.
            if let Some(m) = msg.take() {
                match m {
                    ThreadMessage::Terminate => {
                        inner.terminated = true;
                        inner.wake = true;
                        return;
                    }
                    ThreadMessage::Flush(sender) => {
                        let _ = sender.send(());
                    }
                }

                // See if there's another message.
                msg = inner.thread_msg_queue.pop_front();
                if msg.is_none() {
                    // If there are no more messages, we must clear the signal so that we get
                    // notified when the next message arrives. This must be done whilst we are
                    // holding the lock.
                    self.event.signal(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
                }
            }
        }
    }

    /// Task that watches the ring buffer head and enforces the free-space threshold (evicting
    /// old messages via `check_space`), which in turn wakes cursors when new data arrives.
    /// Exits when the owning `SharedBuffer` has been dropped (the Weak fails to upgrade).
    async fn buffer_monitor_task(
        this: Weak<Self>,
        mut ring_buffer: ring_buffer::Reader,
        sleep_time: Duration,
    ) {
        let mut last_head = 0;
        loop {
            // Sleep to limit how often we wake up.
            fasync::Timer::new(sleep_time).await;
            let head = ring_buffer.wait(last_head).await;
            let Some(this) = this.upgrade() else { return };
            let mut inner = InnerGuard::new(&this);
            inner.check_space(head);
            last_head = head;
        }
    }

    /// Flushes sockets and returns the head index.
    fn flush_sockets(&self, inner: &mut InnerGuard<'_>) -> u64 {
        let mut sockets = self.sockets.lock();
        // Walk every live slab entry; `next_used` skips freed slots.
        let mut socket_id = 0;
        while let Some(next_id) = sockets.next_used(socket_id) {
            // The socket doesn't need to be rearmed here.
            inner.read_socket(&mut sockets, SocketId(next_id), |_| {});
            socket_id = next_id + 1;
        }
        let head = inner.ring_buffer.head();
        inner.update_message_ids(head);
        head
    }
}
425
impl Inner {
    /// Validates a message pushed via `ContainerBuffer::push_back` and writes it into the ring
    /// buffer through the container's IOBuffer. Invalid messages are dropped and counted in the
    /// container's stats; messages that don't fit are dropped and folded into the next message's
    /// dropped count.
    fn ingest(&mut self, msg: &[u8], container_id: ContainerId) {
        if msg.len() < std::mem::size_of::<Header>() {
            debug!("message too short ({})", msg.len());
            if let Some(container) = self.containers.get(container_id) {
                container.stats.increment_invalid(msg.len());
            }
            return;
        }

        let header = Header::read_from_bytes(&msg[..std::mem::size_of::<Header>()]).unwrap();

        // NOTE: Some tests send messages that are bigger than the header indicates. We ignore the
        // tail of any such messages.
        let msg_len = header.size_words() as usize * 8;

        // Check the correct type and size.
        if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg.len() < msg_len {
            debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, msg.len());
            if let Some(container) = self.containers.get(container_id) {
                container.stats.increment_invalid(msg.len());
            }
            return;
        }

        let Some(container) = self.containers.get_mut(container_id) else {
            return;
        };

        // If earlier messages were dropped, the record must be rewritten to carry the dropped
        // count, which requires copying it.
        let mut data;
        let msg = if container.dropped_count > 0 {
            data = msg.to_vec();
            if !add_dropped_count(&mut data, container.dropped_count) {
                debug!("unable to add dropped count to invalid message");
                container.stats.increment_invalid(data.len());
                return;
            }
            &data
        } else {
            msg
        };

        if container.iob.write(Default::default(), 0, msg).is_err() {
            // We were unable to write the message to the buffer, most likely due to lack of
            // space. We drop this message and then we'll add to the dropped count for the next
            // message.
            container.dropped_count += 1
        } else {
            container.dropped_count = 0;
        }
    }

    /// Parses the first message within `range` returns (container_id, message, timestamp).
    ///
    /// # Panics
    ///
    /// This will panic if the ring buffer has been corrupted. Only the kernel and Archivist can
    /// write to the ring buffer and so we trust both not to corrupt the ring buffer.
    ///
    /// # Safety
    ///
    /// `range` *must* be within the written range of the ring buffer so that there is no concurrent
    /// write access to that range. The returned slice is only valid whilst this remains true.
    unsafe fn parse_message(
        &self,
        range: Range<u64>,
    ) -> (ContainerId, &[u8], Option<zx::BootInstant>) {
        let (tag, msg) = unsafe { self.ring_buffer.first_message_in(range) }
            .expect("Unable to read message from ring buffer");
        // The record tag is the container's slab index; the timestamp, when present, occupies
        // bytes 8..16 of the record.
        (
            ContainerId(tag as u32),
            msg,
            (msg.len() >= 16)
                .then(|| zx::BootInstant::from_nanos(i64::read_from_bytes(&msg[8..16]).unwrap())),
        )
    }
}
504
impl<'a> InnerGuard<'a> {
    /// Acquires the `inner` lock and wraps the guard so that wakes and `on_inactive`
    /// notifications are deferred until the guard is dropped.
    fn new(buffer: &'a Arc<SharedBuffer>) -> Self {
        Self {
            buffer,
            guard: ManuallyDrop::new(buffer.inner.lock()),
            on_inactive: Vec::new(),
            wake: false,
        }
    }

    /// Pops a message and returns its total size. The caller should call `update_message_ids(head)`
    /// prior to calling this.
    ///
    /// NOTE: This will pop the oldest message in terms of when it was inserted which is *not*
    /// necessarily the message with the *oldest* timestamp because we might not have received the
    /// messages in perfect timestamp order. This should be close enough for all use cases we care
    /// about, and besides, the timestamps can't be trusted anyway.
    fn pop(&mut self, head: u64) -> Option<usize> {
        // Empty buffer: nothing to pop.
        if head == self.tail {
            return None;
        }

        // SAFETY: There can be no concurrent writes between `tail..head` and the *only* place
        // we increment the tail index is just before we leave this function.
        let record_len = {
            let (container_id, message, _) = unsafe { self.parse_message(self.tail..head) };
            let record_len = ring_buffer_record_len(message.len());

            let container = self.containers.get_mut(container_id).unwrap();

            // Account the eviction against the owning container.
            container.stats.increment_rolled_out(record_len);
            container.msg_ids.start += 1;
            if !container.is_active() {
                if container.should_free() {
                    self.containers.free(container_id);
                } else {
                    let identity = Arc::clone(&container.identity);
                    self.on_inactive.push(identity);
                }
            }

            record_len
        };

        // NOTE: This should go last. After incrementing `tail`, the `message` can be overwritten.
        self.ring_buffer.increment_tail(record_len);
        self.tail += record_len as u64;
        self.tail_message_id += 1;

        // The caller should have called `update_message_ids(head)` prior to calling this.
        assert!(self.last_scanned >= self.tail);

        Some(record_len)
    }

    /// Reads a socket. Calls `rearm` to rearm the socket once it has been drained.
    fn read_socket(
        &mut self,
        sockets: &mut Slab<Socket>,
        socket_id: SocketId,
        rearm: impl FnOnce(&mut Socket),
    ) {
        let Some(socket) = sockets.get_mut(socket_id.0) else { return };
        let container_id = socket.container_id;

        // Drain datagrams until SHOULD_WAIT (drained: rearm and return) or any other error
        // (socket closed: fall through to the removal code below the loop).
        loop {
            // Make room in the ring buffer before each read.
            self.check_space(self.ring_buffer.head());

            let mut data = Vec::with_capacity(MAX_DATAGRAM_LEN_BYTES as usize);

            // Read directly into the buffer leaving space for the header.
            let len = match socket.socket.read_uninit(data.spare_capacity_mut()) {
                Ok(d) => d.len(),
                Err(zx::Status::SHOULD_WAIT) => {
                    // The socket has been drained.
                    rearm(socket);
                    return;
                }
                Err(_) => break,
            };

            // SAFETY: `read_uninit` will have written to `len` bytes.
            unsafe {
                data.set_len(len);
            }

            let container = self.containers.get_mut(container_id).unwrap();
            // A valid record needs at least the 8-byte header plus the 8-byte timestamp.
            if data.len() < 16 {
                container.stats.increment_invalid(data.len());
                continue;
            }

            let header = Header::read_from_bytes(&data[..std::mem::size_of::<Header>()]).unwrap();
            let msg_len = header.size_words() as usize * 8;
            if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg_len != data.len() {
                debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, data.len());
                container.stats.increment_invalid(data.len());
                continue;
            }

            // Fold any previously dropped messages into this record's dropped count.
            if container.dropped_count > 0 && !add_dropped_count(&mut data, container.dropped_count)
            {
                debug!("unable to add dropped count to invalid message");
                container.stats.increment_invalid(data.len());
                continue;
            }

            if container.iob.write(Default::default(), 0, &data).is_err() {
                // We were unable to write the message to the buffer, most likely due to lack of
                // space. We drop this message and then we'll add to the dropped count for the next
                // message.
                container.dropped_count += 1
            } else {
                container.dropped_count = 0;
            }
        }

        // This path is taken when the socket should be closed.

        // See the comment on `is_active()` for why this is required.
        self.update_message_ids(self.ring_buffer.head());

        let container = self.containers.get_mut(container_id).unwrap();
        container.remove_socket(socket_id, sockets);
        if !container.is_active() {
            if container.should_free() {
                self.containers.free(container_id);
            } else {
                let identity = Arc::clone(&container.identity);
                self.on_inactive.push(identity);
            }
        }
    }

    /// Scans the ring buffer and updates `msg_ids` for the containers.
    fn update_message_ids(&mut self, head: u64) {
        while self.last_scanned < head {
            // SAFETY: This is safe because `head` must be within the ring buffer range and we make
            // sure that `self.last_scanned` is always >= `tail` in `pop()`.
            let (container_id, message, _) = unsafe { self.parse_message(self.last_scanned..head) };
            let msg_len = message.len();
            let severity = (msg_len >= 8)
                .then(|| Header::read_from_bytes(&message[0..8]).unwrap().severity().into());
            let container = self.containers.get_mut(container_id).unwrap();
            container.msg_ids.end += 1;
            if let Some(severity) = severity {
                container.stats.ingest_message(msg_len, severity);
            }
            self.last_scanned += ring_buffer_record_len(msg_len) as u64;
            self.last_scanned_message_id += 1;
            // New data was seen; cursors should be woken when the guard drops.
            self.wake = true;
        }
        if self.wake {
            // Record how many messages are currently buffered for the saturation metric.
            let boot_time = zx::BootInstant::get().into_nanos();
            let count = self.last_scanned_message_id - self.tail_message_id;
            self.buffer.saturation_curve.record(boot_time, count);
        }
    }

    /// Ensures the buffer keeps the required amount of space.
    fn check_space(&mut self, head: u64) {
        self.update_message_ids(head);
        let capacity = self.ring_buffer.capacity();
        let mut space = capacity
            .checked_sub((head - self.tail) as usize)
            .unwrap_or_else(|| panic!("bad range: {:?}", self.tail..head));
        let required_space = capacity * SPACE_THRESHOLD_NUMERATOR / SPACE_THRESHOLD_DENOMINATOR;
        // Evict oldest messages until the free-space threshold is met (or the buffer is empty).
        while space < required_space {
            let Some(amount) = self.pop(head) else { break };
            space += amount;
        }
    }

    /// Returns a cursor.
    fn cursor(&mut self, mode: StreamMode, selectors: Vec<ComponentSelector>) -> FilterCursor {
        // NOTE: It is not safe to use on_inactive in this function because this function can be
        // called whilst locks are held which are the same locks that the on_inactive notification
        // uses.
        // Snapshot modes start from the tail; Subscribe starts from the latest scanned position.
        let (index, message_id) = match mode {
            StreamMode::Snapshot => (self.tail, self.tail_message_id),
            StreamMode::Subscribe => {
                let head = self.ring_buffer.head();
                self.update_message_ids(head);
                (self.last_scanned, self.last_scanned_message_id)
            }
            StreamMode::SnapshotThenSubscribe => (self.tail, self.tail_message_id),
        };
        FilterCursor::new(
            Arc::clone(self.buffer),
            index,
            message_id,
            matches!(mode, StreamMode::Snapshot),
            selectors,
        )
    }
}
701
// Slab-backed registry of all containers, indexed by `ContainerId`.
#[derive(Default)]
struct Containers {
    slab: Slab<ContainerInfo>,
}

// Index of a container in `Containers::slab`. Doubles as the tag stamped on ring-buffer records
// so messages can be attributed back to their container (see `Inner::parse_message`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct ContainerId(u32);
709
710impl Containers {
711    #[cfg(test)]
712    fn len(&self) -> usize {
713        self.slab.len()
714    }
715
716    fn get(&self, id: ContainerId) -> Option<&ContainerInfo> {
717        self.slab.get(id.0)
718    }
719
720    fn get_mut(&mut self, id: ContainerId) -> Option<&mut ContainerInfo> {
721        self.slab.get_mut(id.0)
722    }
723
724    fn new_container(
725        &mut self,
726        buffer: &RingBuffer,
727        identity: Arc<ComponentIdentity>,
728        stats: Arc<LogStreamStats>,
729    ) -> ContainerId {
730        ContainerId(self.slab.insert(|id| {
731            let (iob, _) = buffer.new_iob_writer(id as u64).unwrap();
732            ContainerInfo::new(identity, stats, iob)
733        }))
734    }
735
736    fn free(&mut self, id: ContainerId) {
737        self.slab.free(id.0);
738    }
739}
740
// Per-container bookkeeping: identity, message-range accounting, stats, and the writer-side
// resources (sockets and IOBuffers) whose liveness determines whether the container is active.
#[derive(Derivative)]
#[derivative(Debug)]
struct ContainerInfo {
    // The identity of the container.
    identity: Arc<ComponentIdentity>,

    // The first and last message IDs stored in the shared buffer. The last message ID can be out of
    // date if there are concurrent writers.
    msg_ids: Range<u64>,

    // Whether the container is terminated. A terminated, inactive container is freed.
    terminated: bool,

    // Inspect instrumentation.
    #[derivative(Debug = "ignore")]
    stats: Arc<LogStreamStats>,

    // The first socket ID for this container. Head of an intrusive doubly-linked list threaded
    // through the `sockets` slab; `SocketId::NULL` when the container has no sockets.
    first_socket_id: SocketId,

    // The IOBuffer used for writing.
    iob: zx::Iob,

    // The number of client IOBuffers.
    iob_count: usize,

    // The number of messages dropped when forwarding from a socket to an IOBuffer.
    dropped_count: u64,
}
770
impl ContainerInfo {
    // Creates a container with no sockets, no client IOBuffers and no buffered messages.
    fn new(identity: Arc<ComponentIdentity>, stats: Arc<LogStreamStats>, iob: zx::Iob) -> Self {
        Self {
            identity,
            msg_ids: 0..0,
            terminated: false,
            stats,
            first_socket_id: SocketId::NULL,
            iob,
            iob_count: 0,
            dropped_count: 0,
        }
    }

    // Returns true when the container's slab entry can be reclaimed: it has been terminated and
    // holds no remaining resources or messages.
    fn should_free(&self) -> bool {
        self.terminated && !self.is_active()
    }

    // Returns true if the container is considered active which is the case if it has sockets, io
    // buffers, or buffered messages.
    //
    // NOTE: Whenever a socket or iob is closed, `update_message_ids` must called to ensure
    // `msg_ids.end` is correctly set.
    fn is_active(&self) -> bool {
        // Archivist's own container is always treated as active.
        self.first_socket_id != SocketId::NULL
            || self.iob_count > 0
            || self.msg_ids.end != self.msg_ids.start
            || ARCHIVIST_MONIKER.get().is_some_and(|m| *self.identity == *m)
    }

    // Unlinks `socket_id` from this container's intrusive socket list and frees its slab entry.
    //
    // # Panics
    //
    // This will panic if the socket isn't found.
    fn remove_socket(&mut self, socket_id: SocketId, sockets: &mut Slab<Socket>) {
        let Socket { prev, next, .. } = *sockets.get(socket_id.0).unwrap();
        // No predecessor means this was the list head.
        if prev == SocketId::NULL {
            self.first_socket_id = next;
        } else {
            sockets.get_mut(prev.0).unwrap().next = next;
        }
        if next != SocketId::NULL {
            sockets
                .get_mut(next.0)
                .unwrap_or_else(|| panic!("next {next:?} has been freed!"))
                .prev = prev;
        }
        sockets.free(socket_id.0);
        self.stats.close_socket();
        debug!(identity:% = self.identity; "Socket closed.");
    }
}
822
/// A per-component handle onto the shared log buffer. Messages pushed through
/// this handle are attributed to `container_id`.
pub struct ContainerBuffer {
    // The shared buffer that stores this container's messages.
    shared_buffer: Arc<SharedBuffer>,
    // Index of this container's `ContainerInfo` within the shared buffer.
    container_id: ContainerId,
}
827
impl ContainerBuffer {
    /// Returns the tag used by IOBuffers used for this component.
    pub fn iob_tag(&self) -> u64 {
        self.container_id.0 as u64
    }

    /// Ingests a new message.
    ///
    /// If the message is invalid, it is dropped.
    pub fn push_back(&self, msg: &[u8]) {
        self.shared_buffer.inner.lock().ingest(msg, self.container_id);
    }

    /// Returns an IOBuffer for the container.
    ///
    /// The returned endpoint (`ep0`) goes to the client; the peer (`ep1`) is
    /// retained in `iob_peers` and registered on the port so a packet (keyed
    /// with `IOB_PEER_CLOSED_KEY_BASE`) arrives when the client closes its end.
    pub fn iob(&self) -> zx::Iob {
        let mut inner = self.shared_buffer.inner.lock();

        inner.containers.get_mut(self.container_id).unwrap().iob_count += 1;

        let (ep0, ep1) = inner.ring_buffer.new_iob_writer(self.container_id.0 as u64).unwrap();

        inner.iob_peers.insert(|idx| {
            ep1.wait_async(
                &self.shared_buffer.port,
                // The slab index is OR'd with the base so IOB keys don't
                // collide with socket keys (which use the bare slab index).
                idx as u64 | IOB_PEER_CLOSED_KEY_BASE,
                zx::Signals::IOB_PEER_CLOSED,
                zx::WaitAsyncOpts::empty(),
            )
            .unwrap();

            (self.container_id, ep1)
        });

        ep0
    }

    /// Returns a cursor for messages that only apply to this container.
    #[cfg(test)]
    fn cursor(&self, mode: StreamMode) -> Option<FilterCursorStream<FxtMessage>> {
        use selectors::SelectorExt;

        // NOTE: It is not safe to use on_inactive in this function because this function can be
        // called whilst locks are held which are the same locks that the on_inactive notification
        // uses.
        let mut inner = InnerGuard::new(&self.shared_buffer);
        let Some(container) = inner.containers.get_mut(self.container_id) else {
            // We've hit a race where the container has terminated.
            return None;
        };
        // Restrict the cursor to this container's moniker only.
        let selectors = vec![container.identity.moniker.clone().into_component_selector()];
        Some(inner.cursor(mode, selectors).into())
    }

    /// Marks the buffer as terminated which will force all cursors to end and close all sockets.
    /// The component's data will remain in the buffer until the messages are rolled out. This will
    /// *not* drain sockets or close IOBuffers.
    pub fn terminate(&self) {
        let mut inner = InnerGuard::new(&self.shared_buffer);

        // See the comment on `is_active()` for why this is required.
        inner.update_message_ids(inner.ring_buffer.head());

        if let Some(container) = inner.containers.get_mut(self.container_id) {
            container.terminated = true;
            if container.first_socket_id != SocketId::NULL {
                // Lock order: `inner` is acquired before `sockets`, matching
                // `add_socket` below.
                let mut sockets = self.shared_buffer.sockets.lock();
                loop {
                    // Removing the head socket (its `prev` is NULL) advances
                    // `first_socket_id`, so this walks the entire list.
                    container.remove_socket(container.first_socket_id, &mut sockets);
                    if container.first_socket_id == SocketId::NULL {
                        break;
                    }
                }
            }
            if container.should_free() {
                inner.containers.free(self.container_id);
            }
            // NOTE(review): presumably signals the background task so it
            // observes the termination; `wake` is consumed elsewhere — confirm.
            inner.wake = true;
        }
    }

    /// Returns true if the container has messages, sockets or IOBuffers.
    pub fn is_active(&self) -> bool {
        self.shared_buffer
            .inner
            .lock()
            .containers
            .get(self.container_id)
            .is_some_and(|c| c.is_active())
    }

    /// Adds a socket for this container.
    ///
    /// The socket becomes the new head of the container's socket list and is
    /// registered on the port for readable/peer-closed notifications.
    pub fn add_socket(&self, socket: zx::Socket) {
        let mut inner = self.shared_buffer.inner.lock();
        // The container may already have terminated; drop the socket silently.
        let Some(container) = inner.containers.get_mut(self.container_id) else { return };
        container.stats.open_socket();
        let next = container.first_socket_id;
        let mut sockets = self.shared_buffer.sockets.lock();
        let socket_id = SocketId(sockets.insert(|socket_id| {
            // The slab index doubles as the port packet key for this socket.
            socket
                .wait_async(
                    &self.shared_buffer.port,
                    socket_id as u64,
                    zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
                    zx::WaitAsyncOpts::empty(),
                )
                .unwrap();
            Socket { socket, container_id: self.container_id, prev: SocketId::NULL, next }
        }));
        if next != SocketId::NULL {
            sockets.get_mut(next.0).unwrap().prev = socket_id;
        }
        container.first_socket_id = socket_id;
    }
}
942
impl Drop for ContainerBuffer {
    fn drop(&mut self) {
        // Dropping the handle behaves like an explicit `terminate` call:
        // cursors end and any remaining sockets are closed.
        self.terminate();
    }
}
948
/// Implements a simple Slab allocator.
struct Slab<T> {
    // Backing storage; each slot is either `Used` or a link in the free list.
    slab: Vec<Slot<T>>,
    // Head of the intrusive free list; `usize::MAX` means the list is empty.
    free_index: usize,
}
954
955impl<T> Default for Slab<T> {
956    fn default() -> Self {
957        Self { slab: Vec::new(), free_index: usize::MAX }
958    }
959}
960
// A slab slot: either an occupied entry, or a free-list link holding the index
// of the next free slot (terminated by `usize::MAX`).
enum Slot<T> {
    Used(T),
    Free(usize),
}
965
966impl<T> Slab<T> {
967    /// Returns the number of used entries. This is not performant.
968    #[cfg(test)]
969    fn len(&self) -> usize {
970        self.slab.iter().filter(|c| matches!(c, Slot::Used(_))).count()
971    }
972
973    fn free(&mut self, index: u32) -> T {
974        let index = index as usize;
975        let value = match std::mem::replace(&mut self.slab[index], Slot::Free(self.free_index)) {
976            Slot::Free(_) => panic!("Slot already free"),
977            Slot::Used(value) => value,
978        };
979        self.free_index = index;
980        value
981    }
982
983    fn get(&self, id: u32) -> Option<&T> {
984        self.slab.get(id as usize).and_then(|s| match s {
985            Slot::Used(s) => Some(s),
986            _ => None,
987        })
988    }
989
990    fn get_mut(&mut self, id: u32) -> Option<&mut T> {
991        self.slab.get_mut(id as usize).and_then(|s| match s {
992            Slot::Used(s) => Some(s),
993            _ => None,
994        })
995    }
996
997    fn insert(&mut self, value: impl FnOnce(u32) -> T) -> u32 {
998        let free_index = self.free_index;
999        if free_index != usize::MAX {
1000            self.free_index = match std::mem::replace(
1001                &mut self.slab[free_index],
1002                Slot::Used(value(free_index as u32)),
1003            ) {
1004                Slot::Free(next) => next,
1005                _ => unreachable!(),
1006            };
1007            free_index as u32
1008        } else {
1009            // This is < rather than <= because we reserve 0xffff_ffff to be used as a NULL value
1010            // (see SocketId::NULL below).
1011            assert!(self.slab.len() < u32::MAX as usize);
1012            self.slab.push(Slot::Used(value(self.slab.len() as u32)));
1013            (self.slab.len() - 1) as u32
1014        }
1015    }
1016
1017    /// Returns the next used slot starting from `id` (inclusive).
1018    fn next_used(&self, id: u32) -> Option<u32> {
1019        let mut id = id as usize;
1020        while id < self.slab.len() {
1021            if matches!(self.slab[id], Slot::Used(_)) {
1022                return Some(id as u32);
1023            }
1024            id += 1;
1025        }
1026        None
1027    }
1028}
1029
// Index of a `Socket` within the socket `Slab`, with `NULL` as a sentinel.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct SocketId(u32);
1032
impl SocketId {
    // Sentinel meaning "no socket"; 0xffff_ffff is never a valid slab index
    // (see the assertion in `Slab::insert`).
    const NULL: Self = SocketId(0xffff_ffff);
}
1036
struct Socket {
    // The zircon socket log messages arrive on (see `add_socket`).
    socket: zx::Socket,
    // The container this socket belongs to.
    container_id: ContainerId,
    // Sockets are stored as a linked list for each container.
    prev: SocketId,
    next: SocketId,
}
1044
1045#[cfg(test)]
1046mod tests {
1047    use super::{SharedBuffer, SharedBufferOptions, Slab, create_ring_buffer};
1048    use crate::logs::stats::LogStreamStats;
1049    use crate::logs::testing::make_message;
1050    use assert_matches::assert_matches;
1051    use diagnostics_assertions::{AnyProperty, assert_data_tree};
1052    use fidl_fuchsia_diagnostics::StreamMode;
1053    use fuchsia_async as fasync;
1054    use fuchsia_async::TimeoutExt;
1055    use fuchsia_inspect::Inspector;
1056    use fuchsia_inspect_derive::WithInspect;
1057    use futures::FutureExt;
1058    use futures::channel::mpsc;
1059    use futures::future::OptionFuture;
1060    use futures::stream::{FuturesUnordered, StreamExt as _};
1061    use ring_buffer::MAX_MESSAGE_SIZE;
1062    use std::future::poll_fn;
1063    use std::iter::repeat_with;
1064    use std::pin::pin;
1065    use std::sync::Arc;
1066    use std::sync::atomic::{AtomicU64, Ordering};
1067    use std::task::Poll;
1068    use std::time::Duration;
1069
1070    async fn yield_to_executor() {
1071        let mut first_time = true;
1072        poll_fn(|cx| {
1073            if first_time {
1074                cx.waker().wake_by_ref();
1075                first_time = false;
1076                Poll::Pending
1077            } else {
1078                Poll::Ready(())
1079            }
1080        })
1081        .await;
1082    }
1083
    // A single pushed message must be returned, byte-for-byte, by a snapshot
    // cursor.
    #[fuchsia::test]
    async fn push_one_message() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
            &fuchsia_inspect::Node::default(),
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_buffer.push_back(msg.bytes());

        // Make sure the cursor can find it.
        let cursor = container_buffer.cursor(StreamMode::Snapshot).unwrap();
        assert_eq!(cursor.map(|item| assert_eq!(item.data(), msg.bytes())).count().await, 1);
    }
1101
    // A message shorter than a valid record header is dropped, not surfaced.
    #[fuchsia::test]
    async fn message_too_short() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
            &fuchsia_inspect::Node::default(),
        );

        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        container_buffer.push_back(&[0]);

        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
    }
1117
    // A 16-byte record whose header does not carry the expected record type is
    // dropped.
    #[fuchsia::test]
    async fn bad_type() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
            &fuchsia_inspect::Node::default(),
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        container_buffer.push_back(&[0x77; 16]);

        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
    }
1132
    // A valid message with its last byte cut off is dropped.
    #[fuchsia::test]
    async fn message_truncated() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
            &fuchsia_inspect::Node::default(),
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_buffer.push_back(&msg.bytes()[..msg.bytes().len() - 1]);

        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
    }
1148
    // Fills the ring buffer until the head wraps past capacity, then verifies
    // that a snapshot cursor returns all surviving (not rolled out) messages
    // in order.
    #[fuchsia::test]
    async fn buffer_wrapping() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
            &fuchsia_inspect::Node::default(),
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());

        // Keep writing messages until we wrap.
        let mut i = 0;
        loop {
            let msg = make_message(&format!("{i}"), None, zx::BootInstant::from_nanos(i));
            container_buffer.push_back(msg.bytes());
            i += 1;

            // Yield to the executor to allow messages to be rolled out.
            yield_to_executor().await;

            let inner = buffer.inner.lock();
            if inner.ring_buffer.head() > inner.ring_buffer.capacity() as u64 {
                break;
            }
        }

        // Read back all the messages.
        let mut cursor = pin!(container_buffer.cursor(StreamMode::Snapshot).unwrap());

        // Messages encode their index in both text and timestamp, so the first
        // surviving message tells us where the replay should start.
        let mut j;
        let item = cursor.next().await;
        assert_matches!(
            item,
            Some(item) => {
                j = item.timestamp().into_nanos();
                let msg = make_message(&format!("{j}"),
                                       None,
                                       item.timestamp());
                assert_eq!(item.data(), msg.bytes());
            }
        );

        // Every message from there up to the last one written must be present.
        j += 1;
        while j != i {
            assert_matches!(
                cursor.next().await,
                Some(item) => {
                    assert_eq!(item.data(), make_message(&format!("{j}"),
                                                         None,
                                                         item.timestamp()).bytes());
                }
            );
            j += 1;
        }

        assert!(cursor.next().await.is_none());
    }
1207
    // Rolling out a container's last buffered message must fire the
    // `on_inactive` callback exactly once, with that container's identity.
    #[fuchsia::test]
    async fn on_inactive() {
        let identity = Arc::new(vec!["a"].into());
        let on_inactive = Arc::new(AtomicU64::new(0));
        let buffer = {
            let on_inactive = Arc::clone(&on_inactive);
            let identity = Arc::clone(&identity);
            Arc::new(SharedBuffer::new(
                create_ring_buffer(MAX_MESSAGE_SIZE),
                Box::new(move |i| {
                    assert_eq!(i, identity);
                    on_inactive.fetch_add(1, Ordering::Relaxed);
                }),
                SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
                &fuchsia_inspect::Node::default(),
            ))
        };
        let container_a = buffer.new_container_buffer(identity, Arc::default());
        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());

        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());

        // Repeatedly write messages to b until a is rolled out.
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
            container_b.push_back(msg.bytes());

            // Yield to the executor to allow messages to be rolled out.
            yield_to_executor().await;
        }

        assert_eq!(on_inactive.load(Ordering::Relaxed), 1);
    }
1241
    // `terminate` frees the container immediately when it has no buffered
    // messages; otherwise the slot survives until the messages roll out.
    #[fuchsia::test]
    async fn terminate_drops_container() {
        // Silence a clippy warning; SharedBuffer needs an executor.
        async {}.await;

        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
            &fuchsia_inspect::Node::default(),
        );

        // terminate when buffer has no logs.
        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        assert_eq!(buffer.container_count(), 1);
        container_a.terminate();

        assert_eq!(buffer.container_count(), 0);

        // terminate when buffer has logs.
        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());
        assert_eq!(buffer.container_count(), 1);
        container_a.terminate();

        // The container should still be there because it has logs.
        assert_eq!(buffer.container_count(), 1);

        // Roll out the logs.
        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
        assert_eq!(buffer.container_count(), 2);

        // Repeatedly write messages to b until a's message is dropped and then the container will
        // be dropped.
        while buffer.container_count() != 1 {
            container_b.push_back(msg.bytes());

            // Yield to the executor to allow messages to be rolled out.
            yield_to_executor().await;
        }

        // Once freed, the terminated container can no longer produce cursors.
        assert!(container_a.cursor(StreamMode::Subscribe).is_none());
    }
1286
    // Subscribe-mode cursors must be woken when new messages arrive;
    // SnapshotThenSubscribe additionally replays messages that predate the
    // cursor.
    #[fuchsia::test]
    async fn cursor_subscribe() {
        for mode in [StreamMode::Subscribe, StreamMode::SnapshotThenSubscribe] {
            let buffer = SharedBuffer::new(
                create_ring_buffer(MAX_MESSAGE_SIZE),
                Box::new(|_| {}),
                Default::default(),
                &fuchsia_inspect::Node::default(),
            );
            let container =
                Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
            let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
            container.push_back(msg.bytes());

            let (sender, mut receiver) = mpsc::unbounded();

            // Run the cursor in a separate task so that we can test it gets woken correctly.
            {
                let container = Arc::clone(&container);
                fasync::Task::spawn(async move {
                    let mut cursor = pin!(container.cursor(mode).unwrap());
                    while let Some(item) = cursor.next().await {
                        sender.unbounded_send(item).unwrap();
                    }
                })
                .detach();
            }

            // The existing message should only be returned with SnapshotThenSubscribe
            if mode == StreamMode::SnapshotThenSubscribe {
                assert_matches!(
                    receiver.next().await,
                    Some(item) if item.data() == msg.bytes()
                );
            }

            // No message should arrive. We can only use a timeout here.
            assert!(
                OptionFuture::from(Some(receiver.next()))
                    .on_timeout(Duration::from_millis(500), || None)
                    .await
                    .is_none()
            );

            container.push_back(msg.bytes());

            // The message should arrive now.
            assert_matches!(
                receiver.next().await,
                Some(item) if item.data() == msg.bytes()
            );
        }
    }
1340
    // After the shared buffer is terminated, existing cursors must still drain
    // the messages that remain buffered, then end.
    #[fuchsia::test]
    async fn drained_post_termination_cursors() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
            &fuchsia_inspect::Node::default(),
        );
        let container =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let mut cursor_a = pin!(container.cursor(StreamMode::Subscribe).unwrap());
        let mut cursor_b = pin!(container.cursor(StreamMode::SnapshotThenSubscribe).unwrap());

        container.push_back(msg.bytes());
        container.push_back(msg.bytes());
        container.push_back(msg.bytes());
        container.push_back(msg.bytes());
        container.push_back(msg.bytes());

        let mut cursor_c = pin!(container.cursor(StreamMode::Snapshot).unwrap());
        assert!(cursor_a.next().await.is_some());
        assert!(cursor_b.next().await.is_some());
        assert!(cursor_c.next().await.is_some());

        // NOTE(review): `terminate()` returns a future that is deliberately
        // not awaited here; its synchronous part appears sufficient for
        // cursors to end — confirm.
        drop(buffer.terminate());

        // All cursors should return the 4 remaining messages.
        assert_eq!(cursor_a.count().await, 4);
        assert_eq!(cursor_b.count().await, 4);
        assert_eq!(cursor_c.count().await, 4);
    }
1374
    // Cursors on an empty, terminated buffer must end immediately with no
    // items, in every stream mode.
    #[fuchsia::test]
    async fn empty_post_termination_cursors() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
            &fuchsia_inspect::Node::default(),
        );
        let container =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));

        let cursor_a = container.cursor(StreamMode::Subscribe).unwrap();
        let cursor_b = container.cursor(StreamMode::SnapshotThenSubscribe).unwrap();
        let cursor_c = container.cursor(StreamMode::Snapshot).unwrap();

        // NOTE(review): the returned terminate future is deliberately dropped
        // without being awaited — confirm the synchronous part suffices.
        drop(buffer.terminate());

        assert_eq!(cursor_a.count().await, 0);
        assert_eq!(cursor_b.count().await, 0);
        assert_eq!(cursor_c.count().await, 0);
    }
1396
    // Creating a new container after another terminates must not reuse state
    // in a way that interferes with the old container.
    #[fuchsia::test]
    async fn recycled_container_slot() {
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
            &fuchsia_inspect::Node::default(),
        ));
        let container_a =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());

        // Keep a live subscription on container_a across its termination.
        let mut cursor = pin!(container_a.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
        assert_matches!(cursor.next().await, Some(_));

        // Roll out all the messages.
        let container_b =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await > 0 {
            container_b.push_back(msg.bytes());

            // Yield to the executor to allow messages to be rolled out.
            yield_to_executor().await;
        }

        container_a.terminate();

        // This should create a new container that uses a new slot and shouldn't interfere with
        // container_a.
        let container_c =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), Arc::default()));
        container_c.push_back(msg.bytes());
        container_c.push_back(msg.bytes());
    }
1432
    // A message delivered through a socket must update the container's inspect
    // log stats (socket open count and per-severity message totals).
    #[fuchsia::test]
    async fn socket_increments_logstats() {
        let inspector = Inspector::default();
        let stats: Arc<LogStreamStats> =
            Arc::new(LogStreamStats::default().with_inspect(inspector.root(), "test").unwrap());
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(65536),
            Box::new(|_| {}),
            Default::default(),
            &fuchsia_inspect::Node::default(),
        ));
        let container_a = Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();

        // Use FuturesUnordered so that we can make sure that the cursor is woken when a message is
        // received (FuturesUnordered uses separate wakers for all the futures it manages).
        let mut futures = FuturesUnordered::new();
        futures.push(async move {
            let mut cursor_a = pin!(cursor_a);
            cursor_a.next().await
        });
        let mut next = futures.next();
        assert!(futures::poll!(&mut next).is_pending());

        local.write(msg.bytes()).unwrap();

        let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

        assert_eq!(cursor_b.map(|item| assert_eq!(item.data(), msg.bytes())).count().await, 1);

        // If cursor_a wasn't woken, this will hang.
        next.await;
        // Validate logstats (must happen after the socket was handled)
        assert_data_tree!(
            inspector,
            root: contains {
                test: {
                    url: "",
                    last_timestamp: AnyProperty,
                    sockets_closed: 0u64,
                    sockets_opened: 1u64,
                    invalid: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    total: {
                        number: 1u64,
                        bytes: 88u64,
                    },
                    rolled_out: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    trace: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    debug: {
                        number: 1u64,
                        bytes: 88u64,
                    },
                    info: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    warn: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    error: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    fatal: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                }
            }
        );
    }
1519
    // A message written to a datagram socket must reach cursors, and a pending
    // subscribe cursor must be woken by it.
    #[fuchsia::test]
    async fn socket() {
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
            &fuchsia_inspect::Node::default(),
        ));
        let container_a =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();

        // Use FuturesUnordered so that we can make sure that the cursor is woken when a message is
        // received (FuturesUnordered uses separate wakers for all the futures it manages).
        let mut futures = FuturesUnordered::new();
        futures.push(async move {
            let mut cursor_a = pin!(cursor_a);
            cursor_a.next().await
        });
        let mut next = futures.next();
        assert!(futures::poll!(&mut next).is_pending());

        local.write(msg.bytes()).unwrap();

        let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

        assert_eq!(cursor_b.map(|item| assert_eq!(item.data(), msg.bytes())).count().await, 1);

        // If cursor_a wasn't woken, this will hang.
        next.await;
    }
1556
    // A container whose messages rolled out stays active while it still has an
    // open socket; `on_inactive` fires only after the socket closes.
    #[fuchsia::test]
    async fn socket_on_inactive() {
        let on_inactive = Arc::new(AtomicU64::new(0));
        let a_identity = Arc::new(vec!["a"].into());
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            {
                let on_inactive = Arc::clone(&on_inactive);
                let a_identity = Arc::clone(&a_identity);
                Box::new(move |id| {
                    assert_eq!(id, a_identity);
                    on_inactive.fetch_add(1, Ordering::Relaxed);
                })
            },
            SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
            &fuchsia_inspect::Node::default(),
        ));
        let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        local.write(msg.bytes()).unwrap();

        let cursor = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

        assert_eq!(cursor.map(|item| assert_eq!(item.data(), msg.bytes())).count().await, 1);

        // Now roll out a's messages.
        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
            container_b.push_back(msg.bytes());

            // Yield to the executor to allow messages to be rolled out.
            yield_to_executor().await;
        }

        // The open socket keeps the container active, so no callback yet.
        assert_eq!(on_inactive.load(Ordering::Relaxed), 0);

        // Close the socket.
        std::mem::drop(local);

        // We don't know when the socket thread will run so we have to loop.
        while on_inactive.load(Ordering::Relaxed) != 1 {
            fasync::Timer::new(Duration::from_millis(50)).await;
        }
    }
1605
    // After `flush` completes, every message queued on a socket must be
    // readable synchronously; concurrent flushes must both complete, and the
    // buffer must still terminate cleanly afterwards.
    #[fuchsia::test]
    async fn flush() {
        let a_identity = Arc::new(vec!["a"].into());
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(1024 * 1024),
            Box::new(|_| {}),
            Default::default(),
            &fuchsia_inspect::Node::default(),
        ));
        let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        let cursor = pin!(container_a.cursor(StreamMode::Subscribe).unwrap());

        const COUNT: usize = 1000;
        for _ in 0..COUNT {
            local.write(msg.bytes()).unwrap();
        }

        // Race two flush futures.
        let mut flush_futures = FuturesUnordered::from_iter([buffer.flush(), buffer.flush()]);
        flush_futures.next().await;

        // `now_or_never` proves the messages are available without awaiting.
        let messages: Option<Vec<_>> = cursor.take(COUNT).collect().now_or_never();
        assert!(messages.is_some());

        // Make sure the other one finishes too.
        flush_futures.next().await;

        // Make sure we can still terminate the buffer.
        buffer.terminate().await;
    }
1641
1642    #[fuchsia::test]
1643    fn test_slab_next_used() {
1644        let mut slab = Slab::default();
1645        let mut ids: Vec<_> = repeat_with(|| slab.insert(|_| ())).take(4).collect();
1646        ids.sort();
1647        slab.free(ids[0]);
1648        slab.free(ids[2]);
1649        assert_eq!(slab.next_used(0), Some(ids[1]));
1650        assert_eq!(slab.next_used(ids[1]), Some(ids[1]));
1651        assert_eq!(slab.next_used(ids[1] + 1), Some(ids[3]));
1652        assert_eq!(slab.next_used(ids[3] + 1), None);
1653    }
1654}