archivist_lib/logs/
shared_buffer.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5mod cursor;
6
7pub use cursor::{FilterCursor, FilterCursorStream, FxtMessage};
8
9use crate::identity::ComponentIdentity;
10use crate::logs::repository::ARCHIVIST_MONIKER;
11use crate::logs::stats::LogStreamStats;
12use derivative::Derivative;
13use diagnostics_log_encoding::encode::add_dropped_count;
14use diagnostics_log_encoding::{Header, TRACING_FORMAT_LOG_RECORD_TYPE};
15use fidl_fuchsia_diagnostics::{ComponentSelector, StreamMode};
16use fidl_fuchsia_logger::MAX_DATAGRAM_LEN_BYTES;
17use fuchsia_async as fasync;
18use fuchsia_async::condition::{Condition, ConditionGuard};
19use fuchsia_sync::Mutex;
20use futures::channel::oneshot;
21use log::debug;
22use ring_buffer::{self, RingBuffer, ring_buffer_record_len};
23use std::collections::VecDeque;
24use std::mem::ManuallyDrop;
25use std::ops::{Deref, DerefMut, Range};
26use std::sync::{Arc, Weak};
27use std::time::Duration;
28use zerocopy::FromBytes;
29
// Aim to keep 25% of the buffer free. This is expressed as a fraction: numerator / denominator
// (i.e. free space should be at least capacity * NUMERATOR / DENOMINATOR).
const SPACE_THRESHOLD_NUMERATOR: usize = 1;
const SPACE_THRESHOLD_DENOMINATOR: usize = 4;

// The default amount of time that Archivist will sleep for to reduce how often it wakes up to
// handle log messages. See `SharedBufferOptions::sleep_time`.
const DEFAULT_SLEEP_TIME: Duration = Duration::from_millis(200);
37
38pub fn create_ring_buffer(capacity: usize) -> ring_buffer::Reader {
39    RingBuffer::create(calculate_real_size_given_desired_capacity(capacity))
40}
41
42fn calculate_real_size_given_desired_capacity(capacity: usize) -> usize {
43    // We always keep spare space in the buffer so that we don't drop messages.  This is controlled
44    // by SPACE_THRESHOLD_NUMERATOR & SPACE_THRESHOLD_DENOMINATOR.  Round up capacity so that
45    // `capacity` reflects the actual amount of log data we can store.
46    let page_size = zx::system_get_page_size() as usize;
47    (capacity * SPACE_THRESHOLD_DENOMINATOR
48        / (SPACE_THRESHOLD_DENOMINATOR - SPACE_THRESHOLD_NUMERATOR))
49        .next_multiple_of(page_size)
50}
51
// Port-packet key namespace: keys with this bit set identify IOB-peer-closed packets (the low
// bits index into `Inner::iob_peers`); keys without it are socket IDs.
const IOB_PEER_CLOSED_KEY_BASE: u64 = 0x8000_0000_0000_0000;

// Callback invoked (outside the `inner` lock) when a container becomes inactive
// (i.e. no logs and no sockets).
pub type OnInactive = Box<dyn Fn(Arc<ComponentIdentity>) + Send + Sync>;
55
// The shared log buffer: containers write via sockets or IOBuffers into a single ring buffer,
// and cursors read filtered views of it.
pub struct SharedBuffer {
    // Protects all mutable buffer state; also carries the wakers used to wake blocked cursors.
    inner: Condition<Inner>,

    // Sockets. This *must* be locked after `inner` and the lock *must* be dropped before
    // `InnerGuard` is dropped.
    sockets: Mutex<Slab<Socket>>,

    // Callback for when a container is inactive (i.e. no logs and no sockets).
    on_inactive: OnInactive,

    // The port that we use to service the sockets (and IOB-peer-closed notifications).
    port: zx::Port,

    // Event used for termination and to interrupt the socket thread (USER_0 is signalled when a
    // `ThreadMessage` is queued).
    event: zx::Event,

    // The thread that we use to service the sockets.
    socket_thread: Mutex<Option<std::thread::JoinHandle<()>>>,

    // The task responsible for monitoring the space in the IOBuffer and popping messages when it
    // gets full. It will also wake cursors whenever new data arrives.
    _buffer_monitor_task: fasync::Task<()>,
}
79
// A wrapper around the `inner` lock guard that defers side effects until the lock is released:
// waking cursor wakers and delivering `on_inactive` callbacks (the latter must run outside the
// lock — see `Drop`).
struct InnerGuard<'a> {
    buffer: &'a Arc<SharedBuffer>,

    // ManuallyDrop so `Drop` can control ordering: wakers drain first, then the lock is
    // released, then the on_inactive callbacks run.
    guard: ManuallyDrop<ConditionGuard<'a, Inner>>,

    // The list of components that should be reported as inactive once the lock is dropped.
    on_inactive: Vec<Arc<ComponentIdentity>>,

    // If true, wake all the wakers registered with the ConditionGuard.
    wake: bool,
}
91
impl Drop for InnerGuard<'_> {
    fn drop(&mut self) {
        // Drain and fire wakers whilst the lock is still held (draining requires the guard).
        if self.wake {
            for waker in self.guard.drain_wakers() {
                waker.wake();
            }
        }
        // SAFETY: This is the only place we drop the guard.
        unsafe {
            ManuallyDrop::drop(&mut self.guard);
        }
        // Deliver inactive notifications *after* releasing the lock: the callback may take locks
        // that would deadlock if `inner` were still held.
        for identity in self.on_inactive.drain(..) {
            (*self.buffer.on_inactive)(identity);
        }
    }
}
108
// Allow `InnerGuard` to be used like the underlying lock guard.
impl Deref for InnerGuard<'_> {
    type Target = Inner;

    fn deref(&self) -> &Self::Target {
        &self.guard
    }
}
116
// Mutable access delegates to the wrapped lock guard.
impl DerefMut for InnerGuard<'_> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.guard
    }
}
122
// State protected by the `SharedBuffer::inner` lock.
struct Inner {
    // The ring buffer.
    ring_buffer: Arc<RingBuffer>,

    // Registered containers.
    containers: Containers,

    // Socket thread message queue.
    thread_msg_queue: VecDeque<ThreadMessage>,

    // The index in the buffer up to which we have scanned for messages. `pop()` asserts this is
    // always >= `tail`.
    last_scanned: u64,

    // The ID of the message that was last scanned.
    last_scanned_message_id: u64,

    // A copy of the tail index in the ring buffer.
    tail: u64,

    // The ID of the message at the tail.  This is just a counter that increments one per message.
    tail_message_id: u64,

    // The IOBuffer peers that we must watch (so we learn when a writer closes its endpoint).
    iob_peers: Slab<(ContainerId, zx::Iob)>,

    // True, when the buffer has been terminated.
    terminated: bool,
}
151
// Messages sent to the socket thread via `Inner::thread_msg_queue` (the sender signals USER_0 on
// `SharedBuffer::event` after queuing).
enum ThreadMessage {
    // The thread should terminate.
    Terminate,

    // Process all pending socket messages and report via the Sender when done.
    Flush(oneshot::Sender<()>),
}
159
/// Tuning options for `SharedBuffer::new`.
pub struct SharedBufferOptions {
    // To reduce how often Archivist wakes when new messages are written, Archivist will sleep for
    // this time. This will impact how quickly messages show up via the cursors.
    pub sleep_time: Duration,
}
165
166impl Default for SharedBufferOptions {
167    fn default() -> Self {
168        Self { sleep_time: DEFAULT_SLEEP_TIME }
169    }
170}
171
172impl SharedBuffer {
    /// Returns a new shared buffer, spawning both the buffer-monitor task and the socket thread.
    pub fn new(
        ring_buffer: ring_buffer::Reader,
        on_inactive: OnInactive,
        options: SharedBufferOptions,
    ) -> Arc<Self> {
        // `new_cyclic` lets the buffer-monitor task hold a *weak* reference back to the buffer so
        // the task doesn't keep the buffer alive.
        let this = Arc::new_cyclic(|weak: &Weak<Self>| Self {
            inner: Condition::new(Inner {
                ring_buffer: Arc::clone(&ring_buffer),
                containers: Containers::default(),
                thread_msg_queue: VecDeque::default(),
                last_scanned: 0,
                last_scanned_message_id: 0,
                tail: 0,
                tail_message_id: 0,
                iob_peers: Slab::default(),
                terminated: false,
            }),
            sockets: Mutex::new(Slab::default()),
            on_inactive,
            port: zx::Port::create(),
            event: zx::Event::create(),
            socket_thread: Mutex::default(),
            _buffer_monitor_task: fasync::Task::spawn(Self::buffer_monitor_task(
                Weak::clone(weak),
                ring_buffer,
                options.sleep_time,
            )),
        });

        // The socket thread holds a *strong* reference; it is released when the thread exits
        // (see `terminate`).
        *this.socket_thread.lock() = Some({
            let this = Arc::clone(&this);
            std::thread::spawn(move || this.socket_thread(options.sleep_time))
        });
        this
    }
209
210    pub fn new_container_buffer(
211        self: &Arc<Self>,
212        identity: Arc<ComponentIdentity>,
213        stats: Arc<LogStreamStats>,
214    ) -> ContainerBuffer {
215        let mut inner = self.inner.lock();
216        let Inner { containers, ring_buffer, .. } = &mut *inner;
217        ContainerBuffer {
218            shared_buffer: Arc::clone(self),
219            container_id: containers.new_container(ring_buffer, Arc::clone(&identity), stats),
220        }
221    }
222
223    pub async fn flush(&self) {
224        let (sender, receiver) = oneshot::channel();
225        self.inner.lock().thread_msg_queue.push_back(ThreadMessage::Flush(sender));
226        self.event.signal(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
227        // Ignore failure if Archivist is shutting down.
228        let _ = receiver.await;
229    }
230
231    /// Returns the number of registered containers in use by the buffer.
232    #[cfg(test)]
233    pub fn container_count(&self) -> usize {
234        self.inner.lock().containers.len()
235    }
236
    /// Terminates the socket thread. The socket thread will drain the sockets before terminating.
    /// Returns a future that may be awaited if the caller wants to wait for the socket thread to
    /// terminate.
    pub fn terminate(self: &Arc<Self>) -> impl Future<Output = ()> {
        {
            // Drain all sockets now so no queued log data is lost, then queue the terminate
            // message.
            let mut inner = InnerGuard::new(self);
            self.flush_sockets(&mut inner);
            inner.thread_msg_queue.push_back(ThreadMessage::Terminate);
        }
        // Wake the socket thread so it sees the terminate message.
        self.event.signal(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
        let join_handle = self.socket_thread.lock().take().unwrap();
        // `join` blocks, so hand it to a blocking-friendly executor wrapper.
        fasync::unblock(|| {
            let _ = join_handle.join();
        })
    }
252
253    /// Returns a cursor that will return messages that match selector. There will be a limited
254    /// attempt to reorder messages by timestamp.  If no selectors are specified, *all* messages
255    /// will be returned.
256    pub fn cursor(
257        self: &Arc<Self>,
258        mode: StreamMode,
259        selectors: Vec<ComponentSelector>,
260    ) -> FilterCursor {
261        InnerGuard::new(self).cursor(mode, selectors)
262    }
263
    /// Body of the dedicated socket-servicing thread: repeatedly sleeps, drains ready sockets
    /// into the ring buffer, handles IOB-peer-closed notifications, and processes queued
    /// `ThreadMessage`s (flush/terminate).
    fn socket_thread(self: Arc<Self>, sleep_time: Duration) {
        // Port key reserved for the USER_0 interrupt; socket IDs and IOB keys never use it.
        const INTERRUPT_KEY: u64 = u64::MAX;
        let mut sockets_ready = Vec::new();
        let mut iob_peer_closed = Vec::new();
        let mut interrupt_needs_arming = true;
        let mut msg = None;

        loop {
            let mut deadline = if msg.is_some() {
                // A message is pending: just poll the port without blocking.
                zx::MonotonicInstant::INFINITE_PAST
            } else {
                if interrupt_needs_arming {
                    self.event
                        .wait_async(
                            &self.port,
                            INTERRUPT_KEY,
                            zx::Signals::USER_0,
                            zx::WaitAsyncOpts::empty(),
                        )
                        .unwrap();
                    interrupt_needs_arming = false;
                }

                // Wait so that we're not constantly waking up for every log message that is queued.
                // Ignore errors here.
                let _ = self
                    .event
                    .wait_one(zx::Signals::USER_0, zx::MonotonicInstant::after(sleep_time.into()));
                zx::MonotonicInstant::INFINITE
            };

            // Gather the list of sockets that are ready to read.
            loop {
                match self.port.wait(deadline) {
                    Ok(packet) => {
                        if packet.key() == INTERRUPT_KEY {
                            interrupt_needs_arming = true;
                            // To maintain proper ordering, we must capture the message here whilst
                            // we are still gathering the list of sockets that are ready to read.
                            // If we wait till later, we introduce windows where we might miss a
                            // socket that should be read.
                            if msg.is_none() {
                                msg = self.inner.lock().thread_msg_queue.pop_front();
                            }
                        } else if packet.key() & IOB_PEER_CLOSED_KEY_BASE != 0 {
                            // Low 32 bits index into `iob_peers`.
                            iob_peer_closed.push(packet.key() as u32);
                        } else {
                            sockets_ready.push(SocketId(packet.key() as u32))
                        }
                    }
                    Err(zx::Status::TIMED_OUT) => break,
                    Err(status) => panic!("port wait error: {status:?}"),
                }
                // After the first packet, only poll for already-queued packets.
                deadline = zx::MonotonicInstant::INFINITE_PAST;
            }

            let mut inner = InnerGuard::new(&self);

            if !iob_peer_closed.is_empty() {
                // See the comment on `is_active()` for why this is required.
                inner.update_message_ids(inner.ring_buffer.head());

                for iob_peer_closed in iob_peer_closed.drain(..) {
                    let container_id = inner.iob_peers.free(iob_peer_closed).0;
                    if let Some(container) = inner.containers.get_mut(container_id) {
                        container.iob_count -= 1;
                        if container.iob_count == 0 && !container.is_active() {
                            if container.should_free() {
                                inner.containers.free(container_id);
                            } else {
                                // Reported once the guard is dropped (outside the lock).
                                let identity = Arc::clone(&container.identity);
                                inner.on_inactive.push(identity);
                            }
                        }
                    }
                }
            }

            {
                // NOTE: `sockets` is locked after `inner` and released before the guard,
                // matching the lock-order requirement documented on `SharedBuffer::sockets`.
                let mut sockets = self.sockets.lock();
                for socket_id in sockets_ready.drain(..) {
                    inner.read_socket(&mut sockets, socket_id, |socket| {
                        socket
                            .socket
                            .wait_async(
                                &self.port,
                                socket_id.0 as u64,
                                zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
                                zx::WaitAsyncOpts::empty(),
                            )
                            .unwrap();
                    });
                }
            }

            // Now that we've processed the sockets, we can process the message.
            if let Some(m) = msg.take() {
                match m {
                    ThreadMessage::Terminate => {
                        inner.terminated = true;
                        inner.wake = true;
                        return;
                    }
                    ThreadMessage::Flush(sender) => {
                        // Receiver may have gone away; that's fine.
                        let _ = sender.send(());
                    }
                }

                // See if there's another message.
                msg = inner.thread_msg_queue.pop_front();
                if msg.is_none() {
                    // If there are no more messages, we must clear the signal so that we get
                    // notified when the next message arrives. This must be done whilst we are
                    // holding the lock.
                    self.event.signal(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
                }
            }
        }
    }
383
    /// Monitors the ring buffer: whenever the head advances past `last_head`, frees up space via
    /// `check_space` (which also updates message IDs and, through `InnerGuard`, wakes cursors).
    async fn buffer_monitor_task(
        this: Weak<Self>,
        mut ring_buffer: ring_buffer::Reader,
        sleep_time: Duration,
    ) {
        let mut last_head = 0;
        loop {
            // Sleep to limit how often we wake up.
            fasync::Timer::new(sleep_time).await;
            // Wait until something has been written beyond what we last saw.
            let head = ring_buffer.wait(last_head).await;
            // If the buffer has gone away, this task is done (we only hold a weak reference).
            let Some(this) = this.upgrade() else { return };
            let mut inner = InnerGuard::new(&this);
            inner.check_space(head);
            last_head = head;
        }
    }
400
401    /// Flushes sockets and returns the head index.
402    fn flush_sockets(&self, inner: &mut InnerGuard<'_>) -> u64 {
403        let mut sockets = self.sockets.lock();
404        let mut socket_id = 0;
405        while let Some(next_id) = sockets.next_used(socket_id) {
406            // The socket doesn't need to be rearmed here.
407            inner.read_socket(&mut sockets, SocketId(next_id), |_| {});
408            socket_id = next_id + 1;
409        }
410        let head = inner.ring_buffer.head();
411        inner.update_message_ids(head);
412        head
413    }
414}
415
416impl Inner {
    // Validates `msg` and writes it to the container's IOBuffer. Invalid or undeliverable
    // messages are dropped (and counted in the container's stats / dropped count).
    fn ingest(&mut self, msg: &[u8], container_id: ContainerId) {
        if msg.len() < std::mem::size_of::<Header>() {
            debug!("message too short ({})", msg.len());
            if let Some(container) = self.containers.get(container_id) {
                container.stats.increment_invalid(msg.len());
            }
            return;
        }

        let header = Header::read_from_bytes(&msg[..std::mem::size_of::<Header>()]).unwrap();

        // NOTE: Some tests send messages that are bigger than the header indicates. We ignore the
        // tail of any such messages.
        let msg_len = header.size_words() as usize * 8;

        // Check the correct type and size.
        if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg.len() < msg_len {
            debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, msg.len());
            if let Some(container) = self.containers.get(container_id) {
                container.stats.increment_invalid(msg.len());
            }
            return;
        }

        let Some(container) = self.containers.get_mut(container_id) else {
            return;
        };

        // If earlier messages were dropped, amend this message with the dropped count (which
        // requires a mutable copy of the message).
        let mut data;
        let msg = if container.dropped_count > 0 {
            data = msg.to_vec();
            if !add_dropped_count(&mut data, container.dropped_count) {
                debug!("unable to add dropped count to invalid message");
                container.stats.increment_invalid(data.len());
                return;
            }
            &data
        } else {
            msg
        };

        if container.iob.write(Default::default(), 0, msg).is_err() {
            // We were unable to write the message to the buffer, most likely due to lack of
            // space. We drop this message and then we'll add to the dropped count for the next
            // message.
            container.dropped_count += 1
        } else {
            container.dropped_count = 0;
        }
    }
468
    /// Parses the first message within `range` and returns (container_id, message, timestamp).
    ///
    /// # Panics
    ///
    /// This will panic if the ring buffer has been corrupted. Only the kernel and Archivist can
    /// write to the ring buffer and so we trust both not to corrupt the ring buffer.
    ///
    /// # Safety
    ///
    /// `range` *must* be within the written range of the ring buffer so that there is no concurrent
    /// write access to that range. The returned slice is only valid whilst this remains true.
    unsafe fn parse_message(
        &self,
        range: Range<u64>,
    ) -> (ContainerId, &[u8], Option<zx::BootInstant>) {
        // The record's tag identifies the container that wrote it.
        let (tag, msg) = unsafe { self.ring_buffer.first_message_in(range) }
            .expect("Unable to read message from ring buffer");
        (
            ContainerId(tag as u32),
            msg,
            // The timestamp occupies bytes 8..16; shorter messages have no timestamp.
            (msg.len() >= 16)
                .then(|| zx::BootInstant::from_nanos(i64::read_from_bytes(&msg[8..16]).unwrap())),
        )
    }
493}
494
495impl<'a> InnerGuard<'a> {
496    fn new(buffer: &'a Arc<SharedBuffer>) -> Self {
497        Self {
498            buffer,
499            guard: ManuallyDrop::new(buffer.inner.lock()),
500            on_inactive: Vec::new(),
501            wake: false,
502        }
503    }
504
    /// Pops a message and returns its total size. The caller should call `update_message_ids(head)`
    /// prior to calling this.
    ///
    /// NOTE: This will pop the oldest message in terms of when it was inserted which is *not*
    /// necessarily the message with the *oldest* timestamp because we might not have received the
    /// messages in perfect timestamp order. This should be close enough for all use cases we care
    /// about, and besides, the timestamps can't be trusted anyway.
    fn pop(&mut self, head: u64) -> Option<usize> {
        if head == self.tail {
            // Nothing buffered.
            return None;
        }

        // SAFETY: There can be no concurrent writes between `tail..head` and the *only* place
        // we increment the tail index is just before we leave this function.
        let record_len = {
            let (container_id, message, _) = unsafe { self.parse_message(self.tail..head) };
            let record_len = ring_buffer_record_len(message.len());

            let container = self.containers.get_mut(container_id).unwrap();

            container.stats.increment_rolled_out(record_len);
            // Rolling out the message advances the container's first buffered message ID.
            container.msg_ids.start += 1;
            if !container.is_active() {
                if container.should_free() {
                    self.containers.free(container_id);
                } else {
                    // Reported once the guard is dropped (outside the lock).
                    let identity = Arc::clone(&container.identity);
                    self.on_inactive.push(identity);
                }
            }

            record_len
        };

        // NOTE: This should go last. After incrementing `tail`, the `message` can be overwritten.
        self.ring_buffer.increment_tail(record_len);
        self.tail += record_len as u64;
        self.tail_message_id += 1;

        // The caller should have called `update_message_ids(head)` prior to calling this.
        assert!(self.last_scanned >= self.tail);

        Some(record_len)
    }
549
    /// Reads a socket. Calls `rearm` to rearm the socket once it has been drained.
    ///
    /// If reading fails with anything other than SHOULD_WAIT (e.g. the peer closed), the socket
    /// is removed from its container and freed.
    fn read_socket(
        &mut self,
        sockets: &mut Slab<Socket>,
        socket_id: SocketId,
        rearm: impl FnOnce(&mut Socket),
    ) {
        // The socket may already have been freed (e.g. by an earlier close); nothing to do.
        let Some(socket) = sockets.get_mut(socket_id.0) else { return };
        let container_id = socket.container_id;

        loop {
            // Make room in the ring buffer before each datagram.
            self.check_space(self.ring_buffer.head());

            let mut data = Vec::with_capacity(MAX_DATAGRAM_LEN_BYTES as usize);

            // Read one datagram directly into `data`'s spare capacity.
            let len = match socket.socket.read_uninit(data.spare_capacity_mut()) {
                Ok(d) => d.len(),
                Err(zx::Status::SHOULD_WAIT) => {
                    // The socket has been drained.
                    rearm(socket);
                    return;
                }
                // Any other error (e.g. peer closed) falls through to socket removal below.
                Err(_) => break,
            };

            // SAFETY: `read_uninit` will have written to `len` bytes.
            unsafe {
                data.set_len(len);
            }

            let container = self.containers.get_mut(container_id).unwrap();
            if data.len() < 16 {
                container.stats.increment_invalid(data.len());
                continue;
            }

            let header = Header::read_from_bytes(&data[..std::mem::size_of::<Header>()]).unwrap();
            let msg_len = header.size_words() as usize * 8;
            if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg_len != data.len() {
                debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, data.len());
                container.stats.increment_invalid(data.len());
                continue;
            }

            // If earlier messages were dropped, amend this one with the dropped count.
            if container.dropped_count > 0 && !add_dropped_count(&mut data, container.dropped_count)
            {
                debug!("unable to add dropped count to invalid message");
                container.stats.increment_invalid(data.len());
                continue;
            }

            if container.iob.write(Default::default(), 0, &data).is_err() {
                // We were unable to write the message to the buffer, most likely due to lack of
                // space. We drop this message and then we'll add to the dropped count for the next
                // message.
                container.dropped_count += 1
            } else {
                container.dropped_count = 0;
            }
        }

        // This path is taken when the socket should be closed.

        // See the comment on `is_active()` for why this is required.
        self.update_message_ids(self.ring_buffer.head());

        let container = self.containers.get_mut(container_id).unwrap();
        container.remove_socket(socket_id, sockets);
        if !container.is_active() {
            if container.should_free() {
                self.containers.free(container_id);
            } else {
                // Reported once the guard is dropped (outside the lock).
                let identity = Arc::clone(&container.identity);
                self.on_inactive.push(identity);
            }
        }
    }
628
    /// Scans the ring buffer and updates `msg_ids` for the containers.
    fn update_message_ids(&mut self, head: u64) {
        while self.last_scanned < head {
            // SAFETY: This is safe because `head` must be within the ring buffer range and we make
            // sure that `self.last_scanned` is always >= `tail` in `pop()`.
            let (container_id, message, _) = unsafe { self.parse_message(self.last_scanned..head) };
            let msg_len = message.len();
            // The severity comes from the 8-byte header; shorter messages have none.
            let severity = (msg_len >= 8)
                .then(|| Header::read_from_bytes(&message[0..8]).unwrap().severity().into());
            let container = self.containers.get_mut(container_id).unwrap();
            container.msg_ids.end += 1;
            if let Some(severity) = severity {
                container.stats.ingest_message(msg_len, severity);
            }
            self.last_scanned += ring_buffer_record_len(msg_len) as u64;
            self.last_scanned_message_id += 1;
            // New messages were scanned, so wake any waiting cursors when the guard drops.
            self.wake = true;
        }
    }
648
649    /// Ensures the buffer keeps the required amount of space.
650    fn check_space(&mut self, head: u64) {
651        self.update_message_ids(head);
652        let capacity = self.ring_buffer.capacity();
653        let mut space = capacity
654            .checked_sub((head - self.tail) as usize)
655            .unwrap_or_else(|| panic!("bad range: {:?}", self.tail..head));
656        let required_space = capacity * SPACE_THRESHOLD_NUMERATOR / SPACE_THRESHOLD_DENOMINATOR;
657        while space < required_space {
658            let Some(amount) = self.pop(head) else { break };
659            space += amount;
660        }
661    }
662
663    /// Returns a cursor.
664    fn cursor(&mut self, mode: StreamMode, selectors: Vec<ComponentSelector>) -> FilterCursor {
665        // NOTE: It is not safe to use on_inactive in this function because this function can be
666        // called whilst locks are held which are the same locks that the on_inactive notification
667        // uses.
668        let (index, message_id) = match mode {
669            StreamMode::Snapshot => (self.tail, self.tail_message_id),
670            StreamMode::Subscribe => {
671                let head = self.ring_buffer.head();
672                self.update_message_ids(head);
673                (self.last_scanned, self.last_scanned_message_id)
674            }
675            StreamMode::SnapshotThenSubscribe => (self.tail, self.tail_message_id),
676        };
677        FilterCursor::new(
678            Arc::clone(self.buffer),
679            index,
680            message_id,
681            matches!(mode, StreamMode::Snapshot),
682            selectors,
683        )
684    }
685}
686
// Slab-allocated registry of containers; a container's `ContainerId` is its slab index.
#[derive(Default)]
struct Containers {
    slab: Slab<ContainerInfo>,
}
691
// Identifies a container; the same value is used as the tag on IOBuffer records so messages in
// the ring buffer can be attributed back to their container.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct ContainerId(u32);
694
impl Containers {
    // Returns the number of registered containers.
    #[cfg(test)]
    fn len(&self) -> usize {
        self.slab.len()
    }

    fn get(&self, id: ContainerId) -> Option<&ContainerInfo> {
        self.slab.get(id.0)
    }

    fn get_mut(&mut self, id: ContainerId) -> Option<&mut ContainerInfo> {
        self.slab.get_mut(id.0)
    }

    // Registers a new container; the slab index doubles as the IOBuffer tag (see `ContainerId`).
    fn new_container(
        &mut self,
        buffer: &RingBuffer,
        identity: Arc<ComponentIdentity>,
        stats: Arc<LogStreamStats>,
    ) -> ContainerId {
        ContainerId(self.slab.insert(|id| {
            let (iob, _) = buffer.new_iob_writer(id as u64).unwrap();
            ContainerInfo::new(identity, stats, iob)
        }))
    }

    fn free(&mut self, id: ContainerId) {
        self.slab.free(id.0);
    }
}
725
// Per-container bookkeeping held in `Inner::containers`.
#[derive(Derivative)]
#[derivative(Debug)]
struct ContainerInfo {
    // The identity of the container.
    identity: Arc<ComponentIdentity>,

    // The first and last message IDs stored in the shared buffer. The last message ID can be out of
    // date if there are concurrent writers.
    msg_ids: Range<u64>,

    // Whether the container is terminated. Once terminated *and* inactive, the container can be
    // freed (see `should_free`).
    terminated: bool,

    // Inspect instrumentation.
    #[derivative(Debug = "ignore")]
    stats: Arc<LogStreamStats>,

    // The first socket ID for this container (head of a doubly-linked list threaded through the
    // `Socket` entries).
    first_socket_id: SocketId,

    // The IOBuffer used for writing.
    iob: zx::Iob,

    // The number of client IOBuffers.
    iob_count: usize,

    // The number of messages dropped when forwarding from a socket to an IOBuffer.
    dropped_count: u64,
}
755
impl ContainerInfo {
    // Creates bookkeeping for a freshly registered container.
    fn new(identity: Arc<ComponentIdentity>, stats: Arc<LogStreamStats>, iob: zx::Iob) -> Self {
        Self {
            identity,
            msg_ids: 0..0,
            terminated: false,
            stats,
            first_socket_id: SocketId::NULL,
            iob,
            iob_count: 0,
            dropped_count: 0,
        }
    }

    // True when the container has been terminated and has nothing left to keep it alive.
    fn should_free(&self) -> bool {
        self.terminated && !self.is_active()
    }

    // Returns true if the container is considered active which is the case if it has sockets, io
    // buffers, or buffered messages. Archivist's own container is always considered active.
    //
    // NOTE: Whenever a socket or iob is closed, `update_message_ids` must be called to ensure
    // `msg_ids.end` is correctly set.
    fn is_active(&self) -> bool {
        self.first_socket_id != SocketId::NULL
            || self.iob_count > 0
            || self.msg_ids.end != self.msg_ids.start
            || ARCHIVIST_MONIKER.get().is_some_and(|m| *self.identity == *m)
    }

    // Unlinks `socket_id` from this container's doubly-linked socket list and frees it.
    //
    // # Panics
    //
    // This will panic if the socket isn't found.
    fn remove_socket(&mut self, socket_id: SocketId, sockets: &mut Slab<Socket>) {
        let Socket { prev, next, .. } = *sockets.get(socket_id.0).unwrap();
        if prev == SocketId::NULL {
            // Removing the head of the list.
            self.first_socket_id = next;
        } else {
            sockets.get_mut(prev.0).unwrap().next = next;
        }
        if next != SocketId::NULL {
            sockets
                .get_mut(next.0)
                .unwrap_or_else(|| panic!("next {next:?} has been freed!"))
                .prev = prev;
        }
        sockets.free(socket_id.0);
        self.stats.close_socket();
        debug!(identity:% = self.identity; "Socket closed.");
    }
}
807
// Per-container handle onto the shared buffer, returned by `SharedBuffer::new_container_buffer`.
pub struct ContainerBuffer {
    shared_buffer: Arc<SharedBuffer>,
    container_id: ContainerId,
}
812
impl ContainerBuffer {
    /// Returns the tag used by IOBuffers used for this component.
    pub fn iob_tag(&self) -> u64 {
        self.container_id.0 as u64
    }

    /// Ingests a new message.
    ///
    /// If the message is invalid, it is dropped.
    pub fn push_back(&self, msg: &[u8]) {
        self.shared_buffer.inner.lock().ingest(msg, self.container_id);
    }

    /// Returns an IOBuffer for the container.
    ///
    /// One endpoint is returned to the caller; the peer endpoint is retained in
    /// `iob_peers` so we get a port notification when the caller's end closes.
    pub fn iob(&self) -> zx::Iob {
        let mut inner = self.shared_buffer.inner.lock();

        inner.containers.get_mut(self.container_id).unwrap().iob_count += 1;

        let (ep0, ep1) = inner.ring_buffer.new_iob_writer(self.container_id.0 as u64).unwrap();

        // Watch the retained endpoint for IOB_PEER_CLOSED; the port key encodes
        // the slab index so the event can be routed back to this entry.
        inner.iob_peers.insert(|idx| {
            ep1.wait_async(
                &self.shared_buffer.port,
                idx as u64 | IOB_PEER_CLOSED_KEY_BASE,
                zx::Signals::IOB_PEER_CLOSED,
                zx::WaitAsyncOpts::empty(),
            )
            .unwrap();

            (self.container_id, ep1)
        });

        ep0
    }

    /// Returns a cursor for messages that only apply to this container.
    #[cfg(test)]
    fn cursor(&self, mode: StreamMode) -> Option<FilterCursorStream<FxtMessage>> {
        use selectors::SelectorExt;

        // NOTE: It is not safe to use on_inactive in this function because this function can be
        // called whilst locks are held which are the same locks that the on_inactive notification
        // uses.
        let mut inner = InnerGuard::new(&self.shared_buffer);
        let Some(container) = inner.containers.get_mut(self.container_id) else {
            // We've hit a race where the container has terminated.
            return None;
        };
        // Restrict the cursor to this component's moniker.
        let selectors = vec![container.identity.moniker.clone().into_component_selector()];
        Some(inner.cursor(mode, selectors).into())
    }

    /// Marks the buffer as terminated which will force all cursors to end and close all sockets.
    /// The component's data will remain in the buffer until the messages are rolled out. This will
    /// *not* drain sockets or close IOBuffers.
    pub fn terminate(&self) {
        let mut inner = InnerGuard::new(&self.shared_buffer);

        // See the comment on `is_active()` for why this is required.
        inner.update_message_ids(inner.ring_buffer.head());

        if let Some(container) = inner.containers.get_mut(self.container_id) {
            container.terminated = true;
            // Close all sockets. `remove_socket` on the list head advances
            // `first_socket_id`, so this loop drains the whole list.
            if container.first_socket_id != SocketId::NULL {
                let mut sockets = self.shared_buffer.sockets.lock();
                loop {
                    container.remove_socket(container.first_socket_id, &mut sockets);
                    if container.first_socket_id == SocketId::NULL {
                        break;
                    }
                }
            }
            // Reclaim the slot right away if nothing else keeps it alive.
            if container.should_free() {
                inner.containers.free(self.container_id);
            }
            inner.wake = true;
        }
    }

    /// Returns true if the container has messages, sockets or IOBuffers.
    pub fn is_active(&self) -> bool {
        self.shared_buffer
            .inner
            .lock()
            .containers
            .get(self.container_id)
            .is_some_and(|c| c.is_active())
    }

    /// Adds a socket for this container.
    pub fn add_socket(&self, socket: zx::Socket) {
        // NB: lock order is `inner` then `sockets`, matching `terminate`.
        let mut inner = self.shared_buffer.inner.lock();
        let Some(container) = inner.containers.get_mut(self.container_id) else { return };
        container.stats.open_socket();
        let next = container.first_socket_id;
        let mut sockets = self.shared_buffer.sockets.lock();
        // Register the socket on the shared port (readable / peer-closed) and
        // push it onto the front of this container's socket list.
        let socket_id = SocketId(sockets.insert(|socket_id| {
            socket
                .wait_async(
                    &self.shared_buffer.port,
                    socket_id as u64,
                    zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
                    zx::WaitAsyncOpts::empty(),
                )
                .unwrap();
            Socket { socket, container_id: self.container_id, prev: SocketId::NULL, next }
        }));
        if next != SocketId::NULL {
            sockets.get_mut(next.0).unwrap().prev = socket_id;
        }
        container.first_socket_id = socket_id;
    }
}
927
impl Drop for ContainerBuffer {
    fn drop(&mut self) {
        // Dropping the handle terminates the container: cursors end and sockets
        // close, but buffered data stays until rolled out (see `terminate`).
        self.terminate();
    }
}
933
/// Implements a simple Slab allocator.
///
/// Vacant entries form a singly-linked free list threaded through
/// `Slot::Free`; `free_index` is the head of that list (`usize::MAX` when the
/// list is empty).
struct Slab<T> {
    slab: Vec<Slot<T>>,
    free_index: usize,
}

impl<T> Default for Slab<T> {
    fn default() -> Self {
        // `usize::MAX` marks an empty free list.
        Self { slab: Vec::new(), free_index: usize::MAX }
    }
}

enum Slot<T> {
    /// An occupied slot.
    Used(T),
    /// A vacant slot holding the index of the next entry in the free list.
    Free(usize),
}

impl<T> Slab<T> {
    /// Returns the number of used entries. This is not performant.
    #[cfg(test)]
    fn len(&self) -> usize {
        self.slab.iter().filter(|slot| matches!(slot, Slot::Used(_))).count()
    }

    /// Releases the entry at `index`, returning its value and pushing the slot
    /// onto the free list.
    ///
    /// # Panics
    ///
    /// Panics if the slot is already free (or `index` is out of range).
    fn free(&mut self, index: u32) -> T {
        let index = index as usize;
        let slot = std::mem::replace(&mut self.slab[index], Slot::Free(self.free_index));
        let Slot::Used(value) = slot else {
            panic!("Slot already free");
        };
        self.free_index = index;
        value
    }

    /// Returns a reference to the entry at `id`, or `None` if the slot is out
    /// of range or free.
    fn get(&self, id: u32) -> Option<&T> {
        match self.slab.get(id as usize) {
            Some(Slot::Used(value)) => Some(value),
            _ => None,
        }
    }

    /// Mutable variant of [`Slab::get`].
    fn get_mut(&mut self, id: u32) -> Option<&mut T> {
        match self.slab.get_mut(id as usize) {
            Some(Slot::Used(value)) => Some(value),
            _ => None,
        }
    }

    /// Inserts a new entry, reusing a free slot when one is available. The
    /// `value` callback receives the identifier the entry will live under.
    fn insert(&mut self, value: impl FnOnce(u32) -> T) -> u32 {
        if self.free_index == usize::MAX {
            // This is < rather than <= because we reserve 0xffff_ffff to be used as a NULL value
            // (see SocketId::NULL below).
            assert!(self.slab.len() < u32::MAX as usize);
            let id = self.slab.len() as u32;
            self.slab.push(Slot::Used(value(id)));
            id
        } else {
            // Pop the head of the free list and store the new value in it.
            let id = self.free_index;
            match std::mem::replace(&mut self.slab[id], Slot::Used(value(id as u32))) {
                Slot::Free(next) => self.free_index = next,
                _ => unreachable!(),
            }
            id as u32
        }
    }

    /// Returns the next used slot starting from `id` (inclusive).
    fn next_used(&self, id: u32) -> Option<u32> {
        (id as usize..self.slab.len())
            .find(|&index| matches!(self.slab[index], Slot::Used(_)))
            .map(|index| index as u32)
    }
}
1014
/// Identifier for an entry in the socket slab.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct SocketId(u32);

impl SocketId {
    /// Sentinel meaning "no socket". This index is never handed out by the
    /// slab (see the assertion in `Slab::insert`).
    const NULL: Self = Self(u32::MAX);
}
1021
/// A datagram socket that delivers log records for a single container.
struct Socket {
    socket: zx::Socket,
    // The container whose buffer this socket feeds.
    container_id: ContainerId,
    // Sockets are stored as a linked list for each container.
    prev: SocketId,
    next: SocketId,
}
1029
1030#[cfg(test)]
1031mod tests {
1032    use super::{SharedBuffer, SharedBufferOptions, Slab, create_ring_buffer};
1033    use crate::logs::stats::LogStreamStats;
1034    use crate::logs::testing::make_message;
1035    use assert_matches::assert_matches;
1036    use diagnostics_assertions::{AnyProperty, assert_data_tree};
1037    use fidl_fuchsia_diagnostics::StreamMode;
1038    use fuchsia_async as fasync;
1039    use fuchsia_async::TimeoutExt;
1040    use fuchsia_inspect::Inspector;
1041    use fuchsia_inspect_derive::WithInspect;
1042    use futures::FutureExt;
1043    use futures::channel::mpsc;
1044    use futures::future::OptionFuture;
1045    use futures::stream::{FuturesUnordered, StreamExt as _};
1046    use ring_buffer::MAX_MESSAGE_SIZE;
1047    use std::future::poll_fn;
1048    use std::iter::repeat_with;
1049    use std::pin::pin;
1050    use std::sync::Arc;
1051    use std::sync::atomic::{AtomicU64, Ordering};
1052    use std::task::Poll;
1053    use std::time::Duration;
1054
1055    async fn yield_to_executor() {
1056        let mut first_time = true;
1057        poll_fn(|cx| {
1058            if first_time {
1059                cx.waker().wake_by_ref();
1060                first_time = false;
1061                Poll::Pending
1062            } else {
1063                Poll::Ready(())
1064            }
1065        })
1066        .await;
1067    }
1068
    #[fuchsia::test]
    async fn push_one_message() {
        // A single valid message pushed into a container is returned by a
        // snapshot cursor, byte-for-byte.
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_buffer.push_back(msg.bytes());

        // Make sure the cursor can find it.
        let cursor = container_buffer.cursor(StreamMode::Snapshot).unwrap();
        assert_eq!(cursor.map(|item| assert_eq!(item.data(), msg.bytes())).count().await, 1);
    }

    #[fuchsia::test]
    async fn message_too_short() {
        // A 1-byte payload is too short to be a valid record and is dropped.
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );

        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        container_buffer.push_back(&[0]);

        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
    }

    #[fuchsia::test]
    async fn bad_type() {
        // A record whose header carries an unrecognized type is dropped.
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        container_buffer.push_back(&[0x77; 16]);

        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
    }

    #[fuchsia::test]
    async fn message_truncated() {
        // A valid message with the last byte cut off must be rejected.
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_buffer.push_back(&msg.bytes()[..msg.bytes().len() - 1]);

        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
    }
1129
    #[fuchsia::test]
    async fn buffer_wrapping() {
        // Fills the ring buffer past one full lap, then verifies that a
        // snapshot cursor returns a contiguous suffix of the written messages
        // (oldest entries rolled out, the rest intact and in order).
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO },
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());

        // Keep writing messages until we wrap.  Each message's timestamp (and
        // its text) encodes its sequence number `i`.
        let mut i = 0;
        loop {
            let msg = make_message(&format!("{i}"), None, zx::BootInstant::from_nanos(i));
            container_buffer.push_back(msg.bytes());
            i += 1;

            // Yield to the executor to allow messages to be rolled out.
            yield_to_executor().await;

            // NB: the lock guard is scoped to this iteration and dropped before
            // the next push.
            let inner = buffer.inner.lock();
            if inner.ring_buffer.head() > inner.ring_buffer.capacity() as u64 {
                break;
            }
        }

        // Read back all the messages.
        let mut cursor = pin!(container_buffer.cursor(StreamMode::Snapshot).unwrap());

        // The first surviving message tells us where the retained suffix
        // starts; `j` tracks the expected sequence number from there on.
        let mut j;
        let item = cursor.next().await;
        assert_matches!(
            item,
            Some(item) => {
                j = item.timestamp().into_nanos();
                let msg = make_message(&format!("{j}"),
                                       None,
                                       item.timestamp());
                assert_eq!(item.data(), msg.bytes());
            }
        );

        j += 1;
        while j != i {
            assert_matches!(
                cursor.next().await,
                Some(item) => {
                    assert_eq!(item.data(), make_message(&format!("{j}"),
                                                         None,
                                                         item.timestamp()).bytes());
                }
            );
            j += 1;
        }

        assert!(cursor.next().await.is_none());
    }
1187
    #[fuchsia::test]
    async fn on_inactive() {
        // The on_inactive callback must fire exactly once, with the right
        // identity, when a container's last message is rolled out of the
        // buffer.
        let identity = Arc::new(vec!["a"].into());
        let on_inactive = Arc::new(AtomicU64::new(0));
        let buffer = {
            let on_inactive = Arc::clone(&on_inactive);
            let identity = Arc::clone(&identity);
            Arc::new(SharedBuffer::new(
                create_ring_buffer(MAX_MESSAGE_SIZE),
                Box::new(move |i| {
                    assert_eq!(i, identity);
                    on_inactive.fetch_add(1, Ordering::Relaxed);
                }),
                SharedBufferOptions { sleep_time: Duration::ZERO },
            ))
        };
        let container_a = buffer.new_container_buffer(identity, Arc::default());
        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());

        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());

        // Repeatedly write messages to b until a is rolled out.
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
            container_b.push_back(msg.bytes());

            // Yield to the executor to allow messages to be rolled out.
            yield_to_executor().await;
        }

        assert_eq!(on_inactive.load(Ordering::Relaxed), 1);
    }
1220
    #[fuchsia::test]
    async fn terminate_drops_container() {
        // terminate() frees the container slot immediately when it holds no
        // logs, but defers the free until the logs roll out otherwise.
        // Silence a clippy warning; SharedBuffer needs an executor.
        async {}.await;

        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO },
        );

        // terminate when buffer has no logs.
        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        assert_eq!(buffer.container_count(), 1);
        container_a.terminate();

        assert_eq!(buffer.container_count(), 0);

        // terminate when buffer has logs.
        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());
        assert_eq!(buffer.container_count(), 1);
        container_a.terminate();

        // The container should still be there because it has logs.
        assert_eq!(buffer.container_count(), 1);

        // Roll out the logs.
        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
        assert_eq!(buffer.container_count(), 2);

        // Repeatedly write messages to b until a's message is dropped and then the container will
        // be dropped.
        while buffer.container_count() != 1 {
            container_b.push_back(msg.bytes());

            // Yield to the executor to allow messages to be rolled out.
            yield_to_executor().await;
        }

        // A cursor on the freed container must report it as gone.
        assert!(container_a.cursor(StreamMode::Subscribe).is_none());
    }
1264
    #[fuchsia::test]
    async fn cursor_subscribe() {
        // Subscribe cursors must ignore pre-existing messages (Subscribe) or
        // return them first (SnapshotThenSubscribe), and in both cases be
        // woken for messages pushed after the cursor was created.
        for mode in [StreamMode::Subscribe, StreamMode::SnapshotThenSubscribe] {
            let buffer = SharedBuffer::new(
                create_ring_buffer(MAX_MESSAGE_SIZE),
                Box::new(|_| {}),
                Default::default(),
            );
            let container =
                Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
            let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
            container.push_back(msg.bytes());

            let (sender, mut receiver) = mpsc::unbounded();

            // Run the cursor in a separate task so that we can test it gets woken correctly.
            {
                let container = Arc::clone(&container);
                fasync::Task::spawn(async move {
                    let mut cursor = pin!(container.cursor(mode).unwrap());
                    while let Some(item) = cursor.next().await {
                        sender.unbounded_send(item).unwrap();
                    }
                })
                .detach();
            }

            // The existing message should only be returned with SnapshotThenSubscribe
            if mode == StreamMode::SnapshotThenSubscribe {
                assert_matches!(
                    receiver.next().await,
                    Some(item) if item.data() == msg.bytes()
                );
            }

            // No message should arrive. We can only use a timeout here.
            assert!(
                OptionFuture::from(Some(receiver.next()))
                    .on_timeout(Duration::from_millis(500), || None)
                    .await
                    .is_none()
            );

            container.push_back(msg.bytes());

            // The message should arrive now.
            assert_matches!(
                receiver.next().await,
                Some(item) if item.data() == msg.bytes()
            );
        }
    }
1317
    #[fuchsia::test]
    async fn drained_post_termination_cursors() {
        // After the whole buffer is terminated, live cursors must still drain
        // the messages that were already buffered, then end.
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );
        let container =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let mut cursor_a = pin!(container.cursor(StreamMode::Subscribe).unwrap());
        let mut cursor_b = pin!(container.cursor(StreamMode::SnapshotThenSubscribe).unwrap());

        container.push_back(msg.bytes());
        container.push_back(msg.bytes());
        container.push_back(msg.bytes());
        container.push_back(msg.bytes());
        container.push_back(msg.bytes());

        let mut cursor_c = pin!(container.cursor(StreamMode::Snapshot).unwrap());
        assert!(cursor_a.next().await.is_some());
        assert!(cursor_b.next().await.is_some());
        assert!(cursor_c.next().await.is_some());

        // The future returned by terminate() is deliberately dropped without
        // being awaited; cursors must still observe termination.
        drop(buffer.terminate());

        // All cursors should return the 4 remaining messages.
        assert_eq!(cursor_a.count().await, 4);
        assert_eq!(cursor_b.count().await, 4);
        assert_eq!(cursor_c.count().await, 4);
    }

    #[fuchsia::test]
    async fn empty_post_termination_cursors() {
        // Cursors created before termination on an empty container must simply
        // end without yielding anything.
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );
        let container =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));

        let cursor_a = container.cursor(StreamMode::Subscribe).unwrap();
        let cursor_b = container.cursor(StreamMode::SnapshotThenSubscribe).unwrap();
        let cursor_c = container.cursor(StreamMode::Snapshot).unwrap();

        drop(buffer.terminate());

        assert_eq!(cursor_a.count().await, 0);
        assert_eq!(cursor_b.count().await, 0);
        assert_eq!(cursor_c.count().await, 0);
    }
1371
    #[fuchsia::test]
    async fn recycled_container_slot() {
        // After a container is rolled out and terminated, a newly created
        // container must not collide with the old one's state.
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO },
        ));
        let container_a =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());

        // Keep a live subscription on container_a while its data rolls out.
        let mut cursor = pin!(container_a.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
        assert_matches!(cursor.next().await, Some(_));

        // Roll out all the messages.
        let container_b =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await > 0 {
            container_b.push_back(msg.bytes());

            // Yield to the executor to allow messages to be rolled out.
            yield_to_executor().await;
        }

        container_a.terminate();

        // This should create a new container that uses a new slot and shouldn't interfere with
        // container_a.
        let container_c =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), Arc::default()));
        container_c.push_back(msg.bytes());
        container_c.push_back(msg.bytes());
    }
1406
    #[fuchsia::test]
    async fn socket_increments_logstats() {
        // Messages written through a socket must be attributed to the
        // container's inspect stats (sockets_opened, per-severity counters).
        let inspector = Inspector::default();
        let stats: Arc<LogStreamStats> =
            Arc::new(LogStreamStats::default().with_inspect(inspector.root(), "test").unwrap());
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(65536),
            Box::new(|_| {}),
            Default::default(),
        ));
        let container_a = Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();

        // Use FuturesUnordered so that we can make sure that the cursor is woken when a message is
        // received (FuturesUnordered uses separate wakers for all the futures it manages).
        let mut futures = FuturesUnordered::new();
        futures.push(async move {
            let mut cursor_a = pin!(cursor_a);
            cursor_a.next().await
        });
        let mut next = futures.next();
        assert!(futures::poll!(&mut next).is_pending());

        local.write(msg.bytes()).unwrap();

        let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

        assert_eq!(cursor_b.map(|item| assert_eq!(item.data(), msg.bytes())).count().await, 1);

        // If cursor_a wasn't woken, this will hang.
        next.await;
        // Validate logstats (must happen after the socket was handled)
        assert_data_tree!(
            inspector,
            root: contains {
                test: {
                    url: "",
                    last_timestamp: AnyProperty,
                    sockets_closed: 0u64,
                    sockets_opened: 1u64,
                    invalid: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    total: {
                        number: 1u64,
                        bytes: 88u64,
                    },
                    rolled_out: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    trace: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    debug: {
                        number: 1u64,
                        bytes: 88u64,
                    },
                    info: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    warn: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    error: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    fatal: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                }
            }
        );
    }
1492
    #[fuchsia::test]
    async fn socket() {
        // A message written into an attached socket must reach the container's
        // buffer and wake a pending subscribe cursor.
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        ));
        let container_a =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();

        // Use FuturesUnordered so that we can make sure that the cursor is woken when a message is
        // received (FuturesUnordered uses separate wakers for all the futures it manages).
        let mut futures = FuturesUnordered::new();
        futures.push(async move {
            let mut cursor_a = pin!(cursor_a);
            cursor_a.next().await
        });
        let mut next = futures.next();
        assert!(futures::poll!(&mut next).is_pending());

        local.write(msg.bytes()).unwrap();

        let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

        assert_eq!(cursor_b.map(|item| assert_eq!(item.data(), msg.bytes())).count().await, 1);

        // If cursor_a wasn't woken, this will hang.
        next.await;
    }
1528
    #[fuchsia::test]
    async fn socket_on_inactive() {
        // A container with an open socket stays active even after its messages
        // roll out; on_inactive fires only once the socket closes.
        let on_inactive = Arc::new(AtomicU64::new(0));
        let a_identity = Arc::new(vec!["a"].into());
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            {
                let on_inactive = Arc::clone(&on_inactive);
                let a_identity = Arc::clone(&a_identity);
                Box::new(move |id| {
                    assert_eq!(id, a_identity);
                    on_inactive.fetch_add(1, Ordering::Relaxed);
                })
            },
            SharedBufferOptions { sleep_time: Duration::ZERO },
        ));
        let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        local.write(msg.bytes()).unwrap();

        let cursor = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

        assert_eq!(cursor.map(|item| assert_eq!(item.data(), msg.bytes())).count().await, 1);

        // Now roll out a's messages.
        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
            container_b.push_back(msg.bytes());

            // Yield to the executor to allow messages to be rolled out.
            yield_to_executor().await;
        }

        // The open socket keeps the container active, so no callback yet.
        assert_eq!(on_inactive.load(Ordering::Relaxed), 0);

        // Close the socket.
        std::mem::drop(local);

        // We don't know when the socket thread will run so we have to loop.
        while on_inactive.load(Ordering::Relaxed) != 1 {
            fasync::Timer::new(Duration::from_millis(50)).await;
        }
    }
1576
    #[fuchsia::test]
    async fn flush() {
        // flush() must drain all pending socket data into the buffer, support
        // concurrent flush calls, and leave the buffer terminable.
        let a_identity = Arc::new(vec!["a"].into());
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(1024 * 1024),
            Box::new(|_| {}),
            Default::default(),
        ));
        let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        let cursor = pin!(container_a.cursor(StreamMode::Subscribe).unwrap());

        const COUNT: usize = 1000;
        for _ in 0..COUNT {
            local.write(msg.bytes()).unwrap();
        }

        // Race two flush futures.
        let mut flush_futures = FuturesUnordered::from_iter([buffer.flush(), buffer.flush()]);
        flush_futures.next().await;

        // After one flush completes, all COUNT messages must already be
        // readable without blocking (hence now_or_never).
        let messages: Option<Vec<_>> = cursor.take(COUNT).collect().now_or_never();
        assert!(messages.is_some());

        // Make sure the other one finishes too.
        flush_futures.next().await;

        // Make sure we can still terminate the buffer.
        buffer.terminate().await;
    }
1611
1612    #[fuchsia::test]
1613    fn test_slab_next_used() {
1614        let mut slab = Slab::default();
1615        let mut ids: Vec<_> = repeat_with(|| slab.insert(|_| ())).take(4).collect();
1616        ids.sort();
1617        slab.free(ids[0]);
1618        slab.free(ids[2]);
1619        assert_eq!(slab.next_used(0), Some(ids[1]));
1620        assert_eq!(slab.next_used(ids[1]), Some(ids[1]));
1621        assert_eq!(slab.next_used(ids[1] + 1), Some(ids[3]));
1622        assert_eq!(slab.next_used(ids[3] + 1), None);
1623    }
1624}