1mod cursor;
6
7pub use cursor::{FilterCursor, FilterCursorStream, FxtMessage};
8
9use crate::identity::ComponentIdentity;
10use crate::logs::repository::ARCHIVIST_MONIKER;
11use crate::logs::stats::{LogStreamStats, SaturationCurve};
12use derivative::Derivative;
13use diagnostics_log_encoding::encode::add_dropped_count;
14use diagnostics_log_encoding::{Header, TRACING_FORMAT_LOG_RECORD_TYPE};
15use fidl_fuchsia_diagnostics::{ComponentSelector, StreamMode};
16use fidl_fuchsia_logger::MAX_DATAGRAM_LEN_BYTES;
17use fuchsia_async as fasync;
18use fuchsia_async::condition::{Condition, ConditionGuard};
19use fuchsia_inspect::Node;
20use fuchsia_sync::Mutex;
21use futures::channel::oneshot;
22use log::debug;
23use ring_buffer::{self, RingBuffer, ring_buffer_record_len};
24use std::collections::VecDeque;
25use std::mem::ManuallyDrop;
26use std::ops::{Deref, DerefMut, Range};
27use std::sync::{Arc, Weak};
28use std::time::Duration;
29use zerocopy::FromBytes;
30
/// Numerator of the fraction of the ring buffer's capacity that `InnerGuard::check_space`
/// keeps free: the oldest records are popped while the free space is below
/// `capacity * SPACE_THRESHOLD_NUMERATOR / SPACE_THRESHOLD_DENOMINATOR`.
const SPACE_THRESHOLD_NUMERATOR: usize = 1;
/// Denominator of the free-space fraction described on `SPACE_THRESHOLD_NUMERATOR`.
const SPACE_THRESHOLD_DENOMINATOR: usize = 4;

/// Default for `SharedBufferOptions::sleep_time`.
const DEFAULT_SLEEP_TIME: Duration = Duration::from_millis(200);
38
39pub fn create_ring_buffer(capacity: usize) -> ring_buffer::Reader {
40 RingBuffer::create(calculate_real_size_given_desired_capacity(capacity))
41}
42
43fn calculate_real_size_given_desired_capacity(capacity: usize) -> usize {
44 let page_size = zx::system_get_page_size() as usize;
48 (capacity * SPACE_THRESHOLD_DENOMINATOR
49 / (SPACE_THRESHOLD_DENOMINATOR - SPACE_THRESHOLD_NUMERATOR))
50 .next_multiple_of(page_size)
51}
52
/// Bit OR-ed into the port packet key of IOB peer-closed waits (see `ContainerBuffer::iob`).
/// The socket thread uses it to tell those packets apart from socket packets, whose keys are
/// plain socket ids.
const IOB_PEER_CLOSED_KEY_BASE: u64 = 0x8000_0000_0000_0000;
54
/// Callback invoked with a component's identity when its container is found to be inactive
/// but is not freed. It is called from `InnerGuard::drop`, after the inner lock is released.
pub type OnInactive = Box<dyn Fn(Arc<ComponentIdentity>) + Send + Sync>;
56
/// A ring buffer of log records shared by all containers.
///
/// Records are written through per-container IOB writers; a dedicated thread drains sockets
/// into the buffer, and the oldest records are popped as the buffer fills (see
/// `InnerGuard::check_space`).
pub struct SharedBuffer {
    /// Mutable state behind a lock; the wakers it holds are woken when an `InnerGuard` is
    /// dropped with `wake` set.
    inner: Condition<Inner>,

    /// Sockets registered through `ContainerBuffer::add_socket`, indexed by `SocketId`.
    sockets: Mutex<Slab<Socket>>,

    /// Called from `InnerGuard::drop` for each identity queued in `InnerGuard::on_inactive`.
    on_inactive: OnInactive,

    /// Port that delivers socket, IOB peer-closed and interrupt packets to the socket thread.
    port: zx::Port,

    /// `USER_0` is signalled on this event when a `ThreadMessage` has been queued.
    event: zx::Event,

    /// Join handle of the socket thread; taken by `terminate`.
    socket_thread: Mutex<Option<std::thread::JoinHandle<()>>>,

    /// Handle of the task running `buffer_monitor_task`, held for the lifetime of the buffer.
    _buffer_monitor_task: fasync::Task<()>,

    /// Receives a sample from `InnerGuard::update_message_ids`.
    pub(crate) saturation_curve: Arc<SaturationCurve>,
}
83
/// A lock guard over `Inner` that defers work to its `Drop` implementation: waking waiters
/// and running `on_inactive` callbacks.
struct InnerGuard<'a> {
    /// The buffer this guard belongs to.
    buffer: &'a Arc<SharedBuffer>,

    /// The underlying lock guard. Wrapped in `ManuallyDrop` so `Drop` can release the lock
    /// before the callbacks run.
    guard: ManuallyDrop<ConditionGuard<'a, Inner>>,

    /// Identities to pass to `SharedBuffer::on_inactive` once the lock has been released.
    on_inactive: Vec<Arc<ComponentIdentity>>,

    /// When true, all wakers registered on the condition are woken on drop.
    wake: bool,
}
95
impl Drop for InnerGuard<'_> {
    /// Wakes waiters if requested, releases the lock, then runs the queued `on_inactive`
    /// callbacks.
    fn drop(&mut self) {
        // Drain and wake the wakers while the lock is still held.
        if self.wake {
            for waker in self.guard.drain_wakers() {
                waker.wake();
            }
        }
        // SAFETY: this is the only place `guard` is dropped, and nothing below touches it
        // again.
        unsafe {
            ManuallyDrop::drop(&mut self.guard);
        }
        // The lock is released at this point, so the callbacks run without holding it.
        for identity in self.on_inactive.drain(..) {
            (*self.buffer.on_inactive)(identity);
        }
    }
}
112
impl Deref for InnerGuard<'_> {
    type Target = Inner;

    /// Gives shared access to the locked `Inner` state.
    fn deref(&self) -> &Self::Target {
        &self.guard
    }
}
120
impl DerefMut for InnerGuard<'_> {
    /// Gives exclusive access to the locked `Inner` state.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.guard
    }
}
126
/// State protected by `SharedBuffer::inner`.
struct Inner {
    /// The ring buffer holding the log records.
    ring_buffer: Arc<RingBuffer>,

    /// Per-container bookkeeping, indexed by `ContainerId`.
    containers: Containers,

    /// Messages queued for the socket thread.
    thread_msg_queue: VecDeque<ThreadMessage>,

    /// Ring buffer offset up to which `update_message_ids` has scanned.
    last_scanned: u64,

    /// Number of messages scanned so far by `update_message_ids`.
    last_scanned_message_id: u64,

    /// Ring buffer offset of the oldest record that has not been popped.
    tail: u64,

    /// Number of messages popped so far.
    tail_message_id: u64,

    /// IOB endpoints retained by `ContainerBuffer::iob`, each with the container it was issued
    /// for; an entry is removed when its peer-closed packet is handled.
    iob_peers: Slab<(ContainerId, zx::Iob)>,

    /// Set by the socket thread when it handles `ThreadMessage::Terminate`.
    terminated: bool,
}
155
/// Requests sent to the socket thread through `Inner::thread_msg_queue`.
enum ThreadMessage {
    /// Makes the socket thread set `Inner::terminated`, wake waiters and exit.
    Terminate,

    /// Makes the socket thread send `()` on the sender after it has processed the sockets that
    /// were ready in that iteration.
    Flush(oneshot::Sender<()>),
}
163
/// Tunables for `SharedBuffer::new`.
pub struct SharedBufferOptions {
    /// Delay used to limit wake-ups: the buffer monitor task sleeps this long before each
    /// check, and the socket thread waits on its event for up to this long before reading the
    /// port.
    pub sleep_time: Duration,
    /// Size passed to `SaturationCurve::new`.
    pub saturation_curve_size: usize,
}
171
172impl Default for SharedBufferOptions {
173 fn default() -> Self {
174 Self { sleep_time: DEFAULT_SLEEP_TIME, saturation_curve_size: 254 }
175 }
176}
177
178impl SharedBuffer {
    /// Creates a shared buffer on top of `ring_buffer`.
    ///
    /// Spawns the buffer monitor task and the socket thread. `on_inactive` is called when a
    /// container becomes inactive without being freed, `options` supplies the sleep time and
    /// saturation curve size, and `inspect_node` is handed to the saturation curve.
    pub fn new(
        ring_buffer: ring_buffer::Reader,
        on_inactive: OnInactive,
        options: SharedBufferOptions,
        inspect_node: &Node,
    ) -> Arc<Self> {
        let saturation_curve =
            Arc::new(SaturationCurve::new(inspect_node, options.saturation_curve_size));
        // `new_cyclic` lets the monitor task hold a `Weak` to the buffer being constructed.
        let this = Arc::new_cyclic(|weak: &Weak<Self>| Self {
            inner: Condition::new(Inner {
                ring_buffer: Arc::clone(&ring_buffer),
                containers: Containers::default(),
                thread_msg_queue: VecDeque::default(),
                last_scanned: 0,
                last_scanned_message_id: 0,
                tail: 0,
                tail_message_id: 0,
                iob_peers: Slab::default(),
                terminated: false,
            }),
            sockets: Mutex::new(Slab::default()),
            on_inactive,
            port: zx::Port::create(),
            event: zx::Event::create(),
            socket_thread: Mutex::default(),
            saturation_curve,
            _buffer_monitor_task: fasync::Task::spawn(Self::buffer_monitor_task(
                Weak::clone(weak),
                ring_buffer,
                options.sleep_time,
            )),
        });

        // The socket thread holds a strong reference, so it is started only once the `Arc`
        // exists.
        *this.socket_thread.lock() = Some({
            let this = Arc::clone(&this);
            std::thread::spawn(move || this.socket_thread(options.sleep_time))
        });
        this
    }
219
220 pub fn new_container_buffer(
221 self: &Arc<Self>,
222 identity: Arc<ComponentIdentity>,
223 stats: Arc<LogStreamStats>,
224 ) -> ContainerBuffer {
225 let mut inner = self.inner.lock();
226 let Inner { containers, ring_buffer, .. } = &mut *inner;
227 ContainerBuffer {
228 shared_buffer: Arc::clone(self),
229 container_id: containers.new_container(ring_buffer, Arc::clone(&identity), stats),
230 }
231 }
232
233 pub async fn flush(&self) {
234 let (sender, receiver) = oneshot::channel();
235 self.inner.lock().thread_msg_queue.push_back(ThreadMessage::Flush(sender));
236 self.event.signal(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
237 let _ = receiver.await;
239 }
240
241 #[cfg(test)]
243 pub fn container_count(&self) -> usize {
244 self.inner.lock().containers.len()
245 }
246
    /// Drains all sockets, asks the socket thread to terminate, and returns a future that
    /// resolves once the thread has been joined.
    ///
    /// # Panics
    ///
    /// Panics if the socket thread's join handle has already been taken, e.g. by an earlier
    /// call to `terminate`.
    pub fn terminate(self: &Arc<Self>) -> impl Future<Output = ()> {
        {
            // Read whatever is pending on the sockets before queueing the termination
            // request; the guard is dropped at the end of this scope.
            let mut inner = InnerGuard::new(self);
            self.flush_sockets(&mut inner);
            inner.thread_msg_queue.push_back(ThreadMessage::Terminate);
        }
        // Interrupt the socket thread so that it picks up the message.
        self.event.signal(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
        let join_handle = self.socket_thread.lock().take().unwrap();
        // Joining blocks, so it is handed to `fasync::unblock`.
        fasync::unblock(|| {
            let _ = join_handle.join();
        })
    }
262
263 pub fn cursor(
267 self: &Arc<Self>,
268 mode: StreamMode,
269 selectors: Vec<ComponentSelector>,
270 ) -> FilterCursor {
271 InnerGuard::new(self).cursor(mode, selectors)
272 }
273
    /// Body of the dedicated socket thread.
    ///
    /// Each iteration collects port packets (socket ready, IOB peer closed, interrupt). Then,
    /// under the inner lock, it handles closed IOB peers, drains the ready sockets and
    /// processes at most one queued `ThreadMessage`. Returns when `ThreadMessage::Terminate`
    /// is processed.
    ///
    /// # Panics
    ///
    /// Panics if waiting on the port fails with anything other than `TIMED_OUT`.
    fn socket_thread(self: Arc<Self>, sleep_time: Duration) {
        const INTERRUPT_KEY: u64 = u64::MAX;
        let mut sockets_ready = Vec::new();
        let mut iob_peer_closed = Vec::new();
        let mut interrupt_needs_arming = true;
        let mut msg = None;

        loop {
            // With a message already pending, only poll the port; otherwise block on it.
            let mut deadline = if msg.is_some() {
                zx::MonotonicInstant::INFINITE_PAST
            } else {
                if interrupt_needs_arming {
                    self.event
                        .wait_async(
                            &self.port,
                            INTERRUPT_KEY,
                            zx::Signals::USER_0,
                            zx::WaitAsyncOpts::empty(),
                        )
                        .unwrap();
                    interrupt_needs_arming = false;
                }

                // Wait up to `sleep_time` for an interrupt before reading the port; the result
                // is deliberately ignored.
                let _ = self
                    .event
                    .wait_one(zx::Signals::USER_0, zx::MonotonicInstant::after(sleep_time.into()));
                zx::MonotonicInstant::INFINITE
            };

            // Block for the first packet (per `deadline`), then drain the rest without
            // blocking.
            loop {
                match self.port.wait(deadline) {
                    Ok(packet) => {
                        // `INTERRUPT_KEY` also has the IOB peer-closed bit set, so it must be
                        // checked first.
                        if packet.key() == INTERRUPT_KEY {
                            interrupt_needs_arming = true;
                            if msg.is_none() {
                                msg = self.inner.lock().thread_msg_queue.pop_front();
                            }
                        } else if packet.key() & IOB_PEER_CLOSED_KEY_BASE != 0 {
                            // The truncating cast drops the marker bit and leaves the
                            // `iob_peers` index.
                            iob_peer_closed.push(packet.key() as u32);
                        } else {
                            sockets_ready.push(SocketId(packet.key() as u32))
                        }
                    }
                    Err(zx::Status::TIMED_OUT) => break,
                    Err(status) => panic!("port wait error: {status:?}"),
                }
                deadline = zx::MonotonicInstant::INFINITE_PAST;
            }

            let mut inner = InnerGuard::new(&self);

            if !iob_peer_closed.is_empty() {
                // Account for everything already written to the ring buffer before deciding
                // whether a container is still active.
                inner.update_message_ids(inner.ring_buffer.head());

                for iob_peer_closed in iob_peer_closed.drain(..) {
                    let container_id = inner.iob_peers.free(iob_peer_closed).0;
                    if let Some(container) = inner.containers.get_mut(container_id) {
                        container.iob_count -= 1;
                        if container.iob_count == 0 && !container.is_active() {
                            if container.should_free() {
                                inner.containers.free(container_id);
                            } else {
                                let identity = Arc::clone(&container.identity);
                                inner.on_inactive.push(identity);
                            }
                        }
                    }
                }
            }

            {
                let mut sockets = self.sockets.lock();
                for socket_id in sockets_ready.drain(..) {
                    // The closure re-registers the wait once the socket has been drained.
                    inner.read_socket(&mut sockets, socket_id, |socket| {
                        socket
                            .socket
                            .wait_async(
                                &self.port,
                                socket_id.0 as u64,
                                zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
                                zx::WaitAsyncOpts::empty(),
                            )
                            .unwrap();
                    });
                }
            }

            if let Some(m) = msg.take() {
                match m {
                    ThreadMessage::Terminate => {
                        inner.terminated = true;
                        inner.wake = true;
                        return;
                    }
                    ThreadMessage::Flush(sender) => {
                        let _ = sender.send(());
                    }
                }

                // Fetch the next message, if any; it is handled on the next iteration.
                msg = inner.thread_msg_queue.pop_front();
                if msg.is_none() {
                    // The queue is empty, so clear the interrupt signal.
                    self.event.signal(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
                }
            }
        }
    }
393
    /// Task that reclaims space whenever the ring buffer's head advances.
    ///
    /// Each round sleeps for `sleep_time`, waits for the head to move past the last observed
    /// value, then runs `check_space`. Exits once the `SharedBuffer` has been dropped.
    async fn buffer_monitor_task(
        this: Weak<Self>,
        mut ring_buffer: ring_buffer::Reader,
        sleep_time: Duration,
    ) {
        let mut last_head = 0;
        loop {
            // Limit how often the check below runs.
            fasync::Timer::new(sleep_time).await;
            let head = ring_buffer.wait(last_head).await;
            // Only a weak reference is held, so a failed upgrade means the buffer is gone.
            let Some(this) = this.upgrade() else { return };
            let mut inner = InnerGuard::new(&this);
            inner.check_space(head);
            last_head = head;
        }
    }
410
411 fn flush_sockets(&self, inner: &mut InnerGuard<'_>) -> u64 {
413 let mut sockets = self.sockets.lock();
414 let mut socket_id = 0;
415 while let Some(next_id) = sockets.next_used(socket_id) {
416 inner.read_socket(&mut sockets, SocketId(next_id), |_| {});
418 socket_id = next_id + 1;
419 }
420 let head = inner.ring_buffer.head();
421 inner.update_message_ids(head);
422 head
423 }
424}
425
426impl Inner {
427 fn ingest(&mut self, msg: &[u8], container_id: ContainerId) {
429 if msg.len() < std::mem::size_of::<Header>() {
430 debug!("message too short ({})", msg.len());
431 if let Some(container) = self.containers.get(container_id) {
432 container.stats.increment_invalid(msg.len());
433 }
434 return;
435 }
436
437 let header = Header::read_from_bytes(&msg[..std::mem::size_of::<Header>()]).unwrap();
438
439 let msg_len = header.size_words() as usize * 8;
442
443 if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg.len() < msg_len {
445 debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, msg.len());
446 if let Some(container) = self.containers.get(container_id) {
447 container.stats.increment_invalid(msg.len());
448 }
449 return;
450 }
451
452 let Some(container) = self.containers.get_mut(container_id) else {
453 return;
454 };
455
456 let mut data;
457 let msg = if container.dropped_count > 0 {
458 data = msg.to_vec();
459 if !add_dropped_count(&mut data, container.dropped_count) {
460 debug!("unable to add dropped count to invalid message");
461 container.stats.increment_invalid(data.len());
462 return;
463 }
464 &data
465 } else {
466 msg
467 };
468
469 if container.iob.write(Default::default(), 0, msg).is_err() {
470 container.dropped_count += 1
474 } else {
475 container.dropped_count = 0;
476 }
477 }
478
479 unsafe fn parse_message(
491 &self,
492 range: Range<u64>,
493 ) -> (ContainerId, &[u8], Option<zx::BootInstant>) {
494 let (tag, msg) = unsafe { self.ring_buffer.first_message_in(range) }
495 .expect("Unable to read message from ring buffer");
496 (
497 ContainerId(tag as u32),
498 msg,
499 (msg.len() >= 16)
500 .then(|| zx::BootInstant::from_nanos(i64::read_from_bytes(&msg[8..16]).unwrap())),
501 )
502 }
503}
504
505impl<'a> InnerGuard<'a> {
506 fn new(buffer: &'a Arc<SharedBuffer>) -> Self {
507 Self {
508 buffer,
509 guard: ManuallyDrop::new(buffer.inner.lock()),
510 on_inactive: Vec::new(),
511 wake: false,
512 }
513 }
514
    /// Removes the oldest record from the ring buffer.
    ///
    /// Returns the number of bytes freed, or `None` if the buffer is empty (`head` equals the
    /// tail). The owning container's `msg_ids.start` is advanced; if that leaves the container
    /// inactive it is either freed or queued for an `on_inactive` callback.
    ///
    /// # Panics
    ///
    /// Panics if the record's container no longer exists, or if the new tail is beyond what
    /// `update_message_ids` has scanned.
    fn pop(&mut self, head: u64) -> Option<usize> {
        if head == self.tail {
            return None;
        }

        let record_len = {
            // NOTE(review): soundness relies on `tail..head` being a valid range for
            // `parse_message` — confirm against its safety contract.
            let (container_id, message, _) = unsafe { self.parse_message(self.tail..head) };
            let record_len = ring_buffer_record_len(message.len());

            let container = self.containers.get_mut(container_id).unwrap();

            container.stats.increment_rolled_out(record_len);
            container.msg_ids.start += 1;
            if !container.is_active() {
                if container.should_free() {
                    self.containers.free(container_id);
                } else {
                    let identity = Arc::clone(&container.identity);
                    self.on_inactive.push(identity);
                }
            }

            record_len
        };

        self.ring_buffer.increment_tail(record_len);
        self.tail += record_len as u64;
        self.tail_message_id += 1;

        // Records must have been scanned before they are popped.
        assert!(self.last_scanned >= self.tail);

        Some(record_len)
    }
559
560 fn read_socket(
562 &mut self,
563 sockets: &mut Slab<Socket>,
564 socket_id: SocketId,
565 rearm: impl FnOnce(&mut Socket),
566 ) {
567 let Some(socket) = sockets.get_mut(socket_id.0) else { return };
568 let container_id = socket.container_id;
569
570 loop {
571 self.check_space(self.ring_buffer.head());
572
573 let mut data = Vec::with_capacity(MAX_DATAGRAM_LEN_BYTES as usize);
574
575 let len = match socket.socket.read_uninit(data.spare_capacity_mut()) {
577 Ok(d) => d.len(),
578 Err(zx::Status::SHOULD_WAIT) => {
579 rearm(socket);
581 return;
582 }
583 Err(_) => break,
584 };
585
586 unsafe {
588 data.set_len(len);
589 }
590
591 let container = self.containers.get_mut(container_id).unwrap();
592 if data.len() < 16 {
593 container.stats.increment_invalid(data.len());
594 continue;
595 }
596
597 let header = Header::read_from_bytes(&data[..std::mem::size_of::<Header>()]).unwrap();
598 let msg_len = header.size_words() as usize * 8;
599 if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg_len != data.len() {
600 debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, data.len());
601 container.stats.increment_invalid(data.len());
602 continue;
603 }
604
605 if container.dropped_count > 0 && !add_dropped_count(&mut data, container.dropped_count)
606 {
607 debug!("unable to add dropped count to invalid message");
608 container.stats.increment_invalid(data.len());
609 continue;
610 }
611
612 if container.iob.write(Default::default(), 0, &data).is_err() {
613 container.dropped_count += 1
617 } else {
618 container.dropped_count = 0;
619 }
620 }
621
622 self.update_message_ids(self.ring_buffer.head());
626
627 let container = self.containers.get_mut(container_id).unwrap();
628 container.remove_socket(socket_id, sockets);
629 if !container.is_active() {
630 if container.should_free() {
631 self.containers.free(container_id);
632 } else {
633 let identity = Arc::clone(&container.identity);
634 self.on_inactive.push(identity);
635 }
636 }
637 }
638
    /// Scans records from `last_scanned` up to `head` and attributes each to its container.
    ///
    /// For every record the container's `msg_ids.end` is advanced and, when the message is at
    /// least 8 bytes long, its severity is recorded in the container's stats. Sets `wake` if
    /// any record was scanned; whenever `wake` is set, a sample is recorded on the saturation
    /// curve.
    ///
    /// # Panics
    ///
    /// Panics if a record belongs to a container that no longer exists.
    fn update_message_ids(&mut self, head: u64) {
        while self.last_scanned < head {
            // NOTE(review): soundness relies on `last_scanned..head` being a valid range for
            // `parse_message` — confirm against its safety contract.
            let (container_id, message, _) = unsafe { self.parse_message(self.last_scanned..head) };
            let msg_len = message.len();
            // The severity comes from the header, which is the first 8 bytes of the message.
            let severity = (msg_len >= 8)
                .then(|| Header::read_from_bytes(&message[0..8]).unwrap().severity().into());
            let container = self.containers.get_mut(container_id).unwrap();
            container.msg_ids.end += 1;
            if let Some(severity) = severity {
                container.stats.ingest_message(msg_len, severity);
            }
            self.last_scanned += ring_buffer_record_len(msg_len) as u64;
            self.last_scanned_message_id += 1;
            self.wake = true;
        }
        if self.wake {
            let boot_time = zx::BootInstant::get().into_nanos();
            // Number of messages currently held in the buffer.
            let count = self.last_scanned_message_id - self.tail_message_id;
            self.buffer.saturation_curve.record(boot_time, count);
        }
    }
663
664 fn check_space(&mut self, head: u64) {
666 self.update_message_ids(head);
667 let capacity = self.ring_buffer.capacity();
668 let mut space = capacity
669 .checked_sub((head - self.tail) as usize)
670 .unwrap_or_else(|| panic!("bad range: {:?}", self.tail..head));
671 let required_space = capacity * SPACE_THRESHOLD_NUMERATOR / SPACE_THRESHOLD_DENOMINATOR;
672 while space < required_space {
673 let Some(amount) = self.pop(head) else { break };
674 space += amount;
675 }
676 }
677
678 fn cursor(&mut self, mode: StreamMode, selectors: Vec<ComponentSelector>) -> FilterCursor {
680 let (index, message_id) = match mode {
684 StreamMode::Snapshot => (self.tail, self.tail_message_id),
685 StreamMode::Subscribe => {
686 let head = self.ring_buffer.head();
687 self.update_message_ids(head);
688 (self.last_scanned, self.last_scanned_message_id)
689 }
690 StreamMode::SnapshotThenSubscribe => (self.tail, self.tail_message_id),
691 };
692 FilterCursor::new(
693 Arc::clone(self.buffer),
694 index,
695 message_id,
696 matches!(mode, StreamMode::Snapshot),
697 selectors,
698 )
699 }
700}
701
/// The set of containers sharing the ring buffer, indexed by `ContainerId`.
#[derive(Default)]
struct Containers {
    /// Backing storage; a container's id is its slot index.
    slab: Slab<ContainerInfo>,
}
706
/// Index of a container within `Containers`; also used as the tag of the container's records
/// in the ring buffer.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct ContainerId(u32);
709
710impl Containers {
711 #[cfg(test)]
712 fn len(&self) -> usize {
713 self.slab.len()
714 }
715
716 fn get(&self, id: ContainerId) -> Option<&ContainerInfo> {
717 self.slab.get(id.0)
718 }
719
720 fn get_mut(&mut self, id: ContainerId) -> Option<&mut ContainerInfo> {
721 self.slab.get_mut(id.0)
722 }
723
724 fn new_container(
725 &mut self,
726 buffer: &RingBuffer,
727 identity: Arc<ComponentIdentity>,
728 stats: Arc<LogStreamStats>,
729 ) -> ContainerId {
730 ContainerId(self.slab.insert(|id| {
731 let (iob, _) = buffer.new_iob_writer(id as u64).unwrap();
732 ContainerInfo::new(identity, stats, iob)
733 }))
734 }
735
736 fn free(&mut self, id: ContainerId) {
737 self.slab.free(id.0);
738 }
739}
740
/// Bookkeeping for one container's share of the ring buffer.
#[derive(Derivative)]
#[derivative(Debug)]
struct ContainerInfo {
    /// Identity of the component that owns the container.
    identity: Arc<ComponentIdentity>,

    /// Count of this container's messages popped (`start`) and scanned (`end`); the range is
    /// non-empty while the container still has messages in the buffer.
    msg_ids: Range<u64>,

    /// Set by `ContainerBuffer::terminate`; a terminated container is freed once inactive.
    terminated: bool,

    /// Statistics for the container's log stream.
    #[derivative(Debug = "ignore")]
    stats: Arc<LogStreamStats>,

    /// Head of the doubly-linked list of this container's sockets, or `SocketId::NULL`.
    first_socket_id: SocketId,

    /// IOB used to write this container's messages into the ring buffer.
    iob: zx::Iob,

    /// Number of IOBs handed out by `ContainerBuffer::iob` whose peers have not yet closed.
    iob_count: usize,

    /// Number of consecutive failed writes; added to the next message through
    /// `add_dropped_count`.
    dropped_count: u64,
}
770
771impl ContainerInfo {
772 fn new(identity: Arc<ComponentIdentity>, stats: Arc<LogStreamStats>, iob: zx::Iob) -> Self {
773 Self {
774 identity,
775 msg_ids: 0..0,
776 terminated: false,
777 stats,
778 first_socket_id: SocketId::NULL,
779 iob,
780 iob_count: 0,
781 dropped_count: 0,
782 }
783 }
784
785 fn should_free(&self) -> bool {
786 self.terminated && !self.is_active()
787 }
788
789 fn is_active(&self) -> bool {
795 self.first_socket_id != SocketId::NULL
796 || self.iob_count > 0
797 || self.msg_ids.end != self.msg_ids.start
798 || ARCHIVIST_MONIKER.get().is_some_and(|m| *self.identity == *m)
799 }
800
801 fn remove_socket(&mut self, socket_id: SocketId, sockets: &mut Slab<Socket>) {
805 let Socket { prev, next, .. } = *sockets.get(socket_id.0).unwrap();
806 if prev == SocketId::NULL {
807 self.first_socket_id = next;
808 } else {
809 sockets.get_mut(prev.0).unwrap().next = next;
810 }
811 if next != SocketId::NULL {
812 sockets
813 .get_mut(next.0)
814 .unwrap_or_else(|| panic!("next {next:?} has been freed!"))
815 .prev = prev;
816 }
817 sockets.free(socket_id.0);
818 self.stats.close_socket();
819 debug!(identity:% = self.identity; "Socket closed.");
820 }
821}
822
/// Handle to one container within a `SharedBuffer`. Dropping it terminates the container.
pub struct ContainerBuffer {
    /// The buffer the container lives in.
    shared_buffer: Arc<SharedBuffer>,
    /// Id of the container within `shared_buffer`.
    container_id: ContainerId,
}
827
828impl ContainerBuffer {
829 pub fn iob_tag(&self) -> u64 {
831 self.container_id.0 as u64
832 }
833
834 pub fn push_back(&self, msg: &[u8]) {
838 self.shared_buffer.inner.lock().ingest(msg, self.container_id);
839 }
840
    /// Returns a new IOB writer endpoint tagged with this container's id.
    ///
    /// The other endpoint is retained in `Inner::iob_peers` and watched for `IOB_PEER_CLOSED`,
    /// so the socket thread can decrement the container's `iob_count` when the returned
    /// endpoint is closed.
    ///
    /// # Panics
    ///
    /// Panics if the container has been freed, or if creating the IOB or registering the wait
    /// fails.
    pub fn iob(&self) -> zx::Iob {
        let mut inner = self.shared_buffer.inner.lock();

        inner.containers.get_mut(self.container_id).unwrap().iob_count += 1;

        let (ep0, ep1) = inner.ring_buffer.new_iob_writer(self.container_id.0 as u64).unwrap();

        inner.iob_peers.insert(|idx| {
            // The marker bit distinguishes this packet key from socket ids on the shared port.
            ep1.wait_async(
                &self.shared_buffer.port,
                idx as u64 | IOB_PEER_CLOSED_KEY_BASE,
                zx::Signals::IOB_PEER_CLOSED,
                zx::WaitAsyncOpts::empty(),
            )
            .unwrap();

            (self.container_id, ep1)
        });

        ep0
    }
863
864 #[cfg(test)]
866 fn cursor(&self, mode: StreamMode) -> Option<FilterCursorStream<FxtMessage>> {
867 use selectors::SelectorExt;
868
869 let mut inner = InnerGuard::new(&self.shared_buffer);
873 let Some(container) = inner.containers.get_mut(self.container_id) else {
874 return None;
876 };
877 let selectors = vec![container.identity.moniker.clone().into_component_selector()];
878 Some(inner.cursor(mode, selectors).into())
879 }
880
881 pub fn terminate(&self) {
885 let mut inner = InnerGuard::new(&self.shared_buffer);
886
887 inner.update_message_ids(inner.ring_buffer.head());
889
890 if let Some(container) = inner.containers.get_mut(self.container_id) {
891 container.terminated = true;
892 if container.first_socket_id != SocketId::NULL {
893 let mut sockets = self.shared_buffer.sockets.lock();
894 loop {
895 container.remove_socket(container.first_socket_id, &mut sockets);
896 if container.first_socket_id == SocketId::NULL {
897 break;
898 }
899 }
900 }
901 if container.should_free() {
902 inner.containers.free(self.container_id);
903 }
904 inner.wake = true;
905 }
906 }
907
908 pub fn is_active(&self) -> bool {
910 self.shared_buffer
911 .inner
912 .lock()
913 .containers
914 .get(self.container_id)
915 .is_some_and(|c| c.is_active())
916 }
917
    /// Registers `socket` with this container so the socket thread reads messages from it.
    ///
    /// The socket is inserted at the head of the container's socket list. Does nothing if the
    /// container has been freed.
    ///
    /// # Panics
    ///
    /// Panics if registering the wait on the port fails.
    pub fn add_socket(&self, socket: zx::Socket) {
        // The inner lock is taken before the sockets lock, as in the socket thread.
        let mut inner = self.shared_buffer.inner.lock();
        let Some(container) = inner.containers.get_mut(self.container_id) else { return };
        container.stats.open_socket();
        let next = container.first_socket_id;
        let mut sockets = self.shared_buffer.sockets.lock();
        let socket_id = SocketId(sockets.insert(|socket_id| {
            // The slot index is the port packet key, so the socket thread can map a packet
            // back to its socket.
            socket
                .wait_async(
                    &self.shared_buffer.port,
                    socket_id as u64,
                    zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
                    zx::WaitAsyncOpts::empty(),
                )
                .unwrap();
            Socket { socket, container_id: self.container_id, prev: SocketId::NULL, next }
        }));
        // Link the previous head, if any, back to the new socket.
        if next != SocketId::NULL {
            sockets.get_mut(next.0).unwrap().prev = socket_id;
        }
        container.first_socket_id = socket_id;
    }
941}
942
impl Drop for ContainerBuffer {
    /// Terminates the container when its handle is dropped.
    fn drop(&mut self) {
        self.terminate();
    }
}
948
/// A minimal slab allocator: values live in a `Vec` and are addressed by their `u32` index.
/// Freed slots are chained into a free list and reused, most recently freed first.
struct Slab<T> {
    /// The slots, either occupied or part of the free list.
    slab: Vec<Slot<T>>,
    /// Index of the first free slot, or `usize::MAX` when the free list is empty.
    free_index: usize,
}

impl<T> Default for Slab<T> {
    /// Creates an empty slab with an empty free list.
    fn default() -> Self {
        Self { slab: Vec::new(), free_index: usize::MAX }
    }
}

/// One slot of a `Slab`.
enum Slot<T> {
    /// The slot holds a value.
    Used(T),
    /// The slot is free; the payload is the next free index (`usize::MAX` ends the list).
    Free(usize),
}

impl<T> Slab<T> {
    /// Returns the number of occupied slots (test only).
    #[cfg(test)]
    fn len(&self) -> usize {
        self.slab.iter().filter(|slot| matches!(slot, Slot::Used(_))).count()
    }

    /// Removes and returns the value at `index`, pushing the slot onto the free list.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of range or the slot is already free.
    fn free(&mut self, index: u32) -> T {
        let index = index as usize;
        let previous_head = self.free_index;
        match std::mem::replace(&mut self.slab[index], Slot::Free(previous_head)) {
            Slot::Used(value) => {
                self.free_index = index;
                value
            }
            Slot::Free(_) => panic!("Slot already free"),
        }
    }

    /// Returns the value at `id`, or `None` if the slot is free or out of range.
    fn get(&self, id: u32) -> Option<&T> {
        match self.slab.get(id as usize)? {
            Slot::Used(value) => Some(value),
            Slot::Free(_) => None,
        }
    }

    /// Mutable counterpart of `get`.
    fn get_mut(&mut self, id: u32) -> Option<&mut T> {
        match self.slab.get_mut(id as usize)? {
            Slot::Used(value) => Some(value),
            Slot::Free(_) => None,
        }
    }

    /// Stores the value produced by `value`, which is given the index it will occupy, and
    /// returns that index. Free slots are reused before the slab grows.
    ///
    /// # Panics
    ///
    /// Panics if the slab would need `u32::MAX` or more slots.
    fn insert(&mut self, value: impl FnOnce(u32) -> T) -> u32 {
        if self.free_index == usize::MAX {
            // No free slot: append a new one.
            let id = self.slab.len();
            assert!(id < u32::MAX as usize);
            self.slab.push(Slot::Used(value(id as u32)));
            return id as u32;
        }

        // Reuse the head of the free list and advance the list to its successor.
        let id = self.free_index;
        let displaced = std::mem::replace(&mut self.slab[id], Slot::Used(value(id as u32)));
        let Slot::Free(next_free) = displaced else { unreachable!() };
        self.free_index = next_free;
        id as u32
    }

    /// Returns the index of the first occupied slot at or after `id`, if any.
    fn next_used(&self, id: u32) -> Option<u32> {
        self.slab
            .iter()
            .enumerate()
            .skip(id as usize)
            .find(|(_, slot)| matches!(slot, Slot::Used(_)))
            .map(|(index, _)| index as u32)
    }
}
1029
/// Index of a socket within `SharedBuffer::sockets`; also used as the socket's port packet
/// key.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct SocketId(u32);

impl SocketId {
    /// Sentinel meaning "no socket": marks an empty list and the ends of a socket list.
    const NULL: Self = SocketId(0xffff_ffff);
}
1036
/// A registered socket and its links in its container's doubly-linked socket list.
struct Socket {
    /// The socket that messages are read from.
    socket: zx::Socket,
    /// The container that messages from this socket belong to.
    container_id: ContainerId,
    /// Previous socket in the container's list, or `SocketId::NULL`.
    prev: SocketId,
    /// Next socket in the container's list, or `SocketId::NULL`.
    next: SocketId,
}
1044
1045#[cfg(test)]
1046mod tests {
1047 use super::{SharedBuffer, SharedBufferOptions, Slab, create_ring_buffer};
1048 use crate::logs::stats::LogStreamStats;
1049 use crate::logs::testing::make_message;
1050 use assert_matches::assert_matches;
1051 use diagnostics_assertions::{AnyProperty, assert_data_tree};
1052 use fidl_fuchsia_diagnostics::StreamMode;
1053 use fuchsia_async as fasync;
1054 use fuchsia_async::TimeoutExt;
1055 use fuchsia_inspect::Inspector;
1056 use fuchsia_inspect_derive::WithInspect;
1057 use futures::FutureExt;
1058 use futures::channel::mpsc;
1059 use futures::future::OptionFuture;
1060 use futures::stream::{FuturesUnordered, StreamExt as _};
1061 use ring_buffer::MAX_MESSAGE_SIZE;
1062 use std::future::poll_fn;
1063 use std::iter::repeat_with;
1064 use std::pin::pin;
1065 use std::sync::Arc;
1066 use std::sync::atomic::{AtomicU64, Ordering};
1067 use std::task::Poll;
1068 use std::time::Duration;
1069
1070 async fn yield_to_executor() {
1071 let mut first_time = true;
1072 poll_fn(|cx| {
1073 if first_time {
1074 cx.waker().wake_by_ref();
1075 first_time = false;
1076 Poll::Pending
1077 } else {
1078 Poll::Ready(())
1079 }
1080 })
1081 .await;
1082 }
1083
1084 #[fuchsia::test]
1085 async fn push_one_message() {
1086 let buffer = SharedBuffer::new(
1087 create_ring_buffer(MAX_MESSAGE_SIZE),
1088 Box::new(|_| {}),
1089 Default::default(),
1090 &fuchsia_inspect::Node::default(),
1091 );
1092 let container_buffer =
1093 buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1094 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1095 container_buffer.push_back(msg.bytes());
1096
1097 let cursor = container_buffer.cursor(StreamMode::Snapshot).unwrap();
1099 assert_eq!(cursor.map(|item| assert_eq!(item.data(), msg.bytes())).count().await, 1);
1100 }
1101
1102 #[fuchsia::test]
1103 async fn message_too_short() {
1104 let buffer = SharedBuffer::new(
1105 create_ring_buffer(MAX_MESSAGE_SIZE),
1106 Box::new(|_| {}),
1107 Default::default(),
1108 &fuchsia_inspect::Node::default(),
1109 );
1110
1111 let container_buffer =
1112 buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1113 container_buffer.push_back(&[0]);
1114
1115 assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
1116 }
1117
1118 #[fuchsia::test]
1119 async fn bad_type() {
1120 let buffer = SharedBuffer::new(
1121 create_ring_buffer(MAX_MESSAGE_SIZE),
1122 Box::new(|_| {}),
1123 Default::default(),
1124 &fuchsia_inspect::Node::default(),
1125 );
1126 let container_buffer =
1127 buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1128 container_buffer.push_back(&[0x77; 16]);
1129
1130 assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
1131 }
1132
1133 #[fuchsia::test]
1134 async fn message_truncated() {
1135 let buffer = SharedBuffer::new(
1136 create_ring_buffer(MAX_MESSAGE_SIZE),
1137 Box::new(|_| {}),
1138 Default::default(),
1139 &fuchsia_inspect::Node::default(),
1140 );
1141 let container_buffer =
1142 buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1143 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1144 container_buffer.push_back(&msg.bytes()[..msg.bytes().len() - 1]);
1145
1146 assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
1147 }
1148
    /// Once the ring buffer has wrapped, a snapshot cursor returns a contiguous run of the
    /// most recent messages, ending with the last one pushed.
    #[fuchsia::test]
    async fn buffer_wrapping() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
            &fuchsia_inspect::Node::default(),
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());

        let mut i = 0;
        // Push messages numbered by their timestamp until the head has passed the capacity.
        loop {
            let msg = make_message(&format!("{i}"), None, zx::BootInstant::from_nanos(i));
            container_buffer.push_back(msg.bytes());
            i += 1;

            // Presumably lets the buffer monitor task run and pop old messages.
            yield_to_executor().await;

            let inner = buffer.inner.lock();
            if inner.ring_buffer.head() > inner.ring_buffer.capacity() as u64 {
                break;
            }
        }

        let mut cursor = pin!(container_buffer.cursor(StreamMode::Snapshot).unwrap());

        // The first surviving message tells us where the sequence now starts.
        let mut j;
        let item = cursor.next().await;
        assert_matches!(
            item,
            Some(item) => {
                j = item.timestamp().into_nanos();
                let msg = make_message(&format!("{j}"),
                                       None,
                                       item.timestamp());
                assert_eq!(item.data(), msg.bytes());
            }
        );

        // Every later message must follow in order, with no gaps, up to the last one pushed.
        j += 1;
        while j != i {
            assert_matches!(
                cursor.next().await,
                Some(item) => {
                    assert_eq!(item.data(), make_message(&format!("{j}"),
                                                         None,
                                                         item.timestamp()).bytes());
                }
            );
            j += 1;
        }

        assert!(cursor.next().await.is_none());
    }
1207
    /// The `on_inactive` callback fires exactly once, with container A's identity, after A's
    /// only message has been rolled out of the buffer.
    #[fuchsia::test]
    async fn on_inactive() {
        let identity = Arc::new(vec!["a"].into());
        let on_inactive = Arc::new(AtomicU64::new(0));
        let buffer = {
            let on_inactive = Arc::clone(&on_inactive);
            let identity = Arc::clone(&identity);
            Arc::new(SharedBuffer::new(
                create_ring_buffer(MAX_MESSAGE_SIZE),
                Box::new(move |i| {
                    // Only container A is expected to become inactive.
                    assert_eq!(i, identity);
                    on_inactive.fetch_add(1, Ordering::Relaxed);
                }),
                SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
                &fuchsia_inspect::Node::default(),
            ))
        };
        let container_a = buffer.new_container_buffer(identity, Arc::default());
        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());

        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());

        // Fill the buffer through container B until A's message is no longer visible.
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
            container_b.push_back(msg.bytes());

            // Presumably lets the buffer monitor task run and pop old messages.
            yield_to_executor().await;
        }

        assert_eq!(on_inactive.load(Ordering::Relaxed), 1);
    }
1241
    /// A terminated container is freed immediately when it has no messages, and otherwise
    /// only after its messages have been rolled out of the buffer.
    #[fuchsia::test]
    async fn terminate_drops_container() {
        // NOTE(review): the purpose of this no-op await is not evident from this file.
        async {}.await;

        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
            &fuchsia_inspect::Node::default(),
        );

        // An empty container is freed as soon as it is terminated.
        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        assert_eq!(buffer.container_count(), 1);
        container_a.terminate();

        assert_eq!(buffer.container_count(), 0);

        // A container that still has a message in the buffer survives termination.
        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());
        assert_eq!(buffer.container_count(), 1);
        container_a.terminate();

        assert_eq!(buffer.container_count(), 1);

        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
        assert_eq!(buffer.container_count(), 2);

        // Push through container B until only one container remains.
        while buffer.container_count() != 1 {
            container_b.push_back(msg.bytes());

            // Presumably lets the buffer monitor task run and pop old messages.
            yield_to_executor().await;
        }

        // Container A has been freed, so no cursor can be created for it.
        assert!(container_a.cursor(StreamMode::Subscribe).is_none());
    }
1286
    /// Subscribing cursors deliver messages pushed after the cursor was created;
    /// `SnapshotThenSubscribe` also delivers the message that was already in the buffer.
    #[fuchsia::test]
    async fn cursor_subscribe() {
        for mode in [StreamMode::Subscribe, StreamMode::SnapshotThenSubscribe] {
            let buffer = SharedBuffer::new(
                create_ring_buffer(MAX_MESSAGE_SIZE),
                Box::new(|_| {}),
                Default::default(),
                &fuchsia_inspect::Node::default(),
            );
            let container =
                Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
            let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
            container.push_back(msg.bytes());

            let (sender, mut receiver) = mpsc::unbounded();

            {
                // Forward everything the cursor yields to the channel from a detached task.
                let container = Arc::clone(&container);
                fasync::Task::spawn(async move {
                    let mut cursor = pin!(container.cursor(mode).unwrap());
                    while let Some(item) = cursor.next().await {
                        sender.unbounded_send(item).unwrap();
                    }
                })
                .detach();
            }

            if mode == StreamMode::SnapshotThenSubscribe {
                // The message pushed before the cursor existed is delivered first.
                assert_matches!(
                    receiver.next().await,
                    Some(item) if item.data() == msg.bytes()
                );
            }

            // Nothing else arrives until another message is pushed.
            assert!(
                OptionFuture::from(Some(receiver.next()))
                    .on_timeout(Duration::from_millis(500), || None)
                    .await
                    .is_none()
            );

            container.push_back(msg.bytes());

            // The newly pushed message is delivered to the subscriber.
            assert_matches!(
                receiver.next().await,
                Some(item) if item.data() == msg.bytes()
            );
        }
    }
1340
1341 #[fuchsia::test]
1342 async fn drained_post_termination_cursors() {
1343 let buffer = SharedBuffer::new(
1344 create_ring_buffer(MAX_MESSAGE_SIZE),
1345 Box::new(|_| {}),
1346 Default::default(),
1347 &fuchsia_inspect::Node::default(),
1348 );
1349 let container =
1350 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1351 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1352
1353 let mut cursor_a = pin!(container.cursor(StreamMode::Subscribe).unwrap());
1354 let mut cursor_b = pin!(container.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
1355
1356 container.push_back(msg.bytes());
1357 container.push_back(msg.bytes());
1358 container.push_back(msg.bytes());
1359 container.push_back(msg.bytes());
1360 container.push_back(msg.bytes());
1361
1362 let mut cursor_c = pin!(container.cursor(StreamMode::Snapshot).unwrap());
1363 assert!(cursor_a.next().await.is_some());
1364 assert!(cursor_b.next().await.is_some());
1365 assert!(cursor_c.next().await.is_some());
1366
1367 drop(buffer.terminate());
1368
1369 assert_eq!(cursor_a.count().await, 4);
1371 assert_eq!(cursor_b.count().await, 4);
1372 assert_eq!(cursor_c.count().await, 4);
1373 }
1374
1375 #[fuchsia::test]
1376 async fn empty_post_termination_cursors() {
1377 let buffer = SharedBuffer::new(
1378 create_ring_buffer(MAX_MESSAGE_SIZE),
1379 Box::new(|_| {}),
1380 Default::default(),
1381 &fuchsia_inspect::Node::default(),
1382 );
1383 let container =
1384 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1385
1386 let cursor_a = container.cursor(StreamMode::Subscribe).unwrap();
1387 let cursor_b = container.cursor(StreamMode::SnapshotThenSubscribe).unwrap();
1388 let cursor_c = container.cursor(StreamMode::Snapshot).unwrap();
1389
1390 drop(buffer.terminate());
1391
1392 assert_eq!(cursor_a.count().await, 0);
1393 assert_eq!(cursor_b.count().await, 0);
1394 assert_eq!(cursor_c.count().await, 0);
1395 }
1396
1397 #[fuchsia::test]
1398 async fn recycled_container_slot() {
1399 let buffer = Arc::new(SharedBuffer::new(
1400 create_ring_buffer(MAX_MESSAGE_SIZE),
1401 Box::new(|_| {}),
1402 SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
1403 &fuchsia_inspect::Node::default(),
1404 ));
1405 let container_a =
1406 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1407 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1408 container_a.push_back(msg.bytes());
1409
1410 let mut cursor = pin!(container_a.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
1411 assert_matches!(cursor.next().await, Some(_));
1412
1413 let container_b =
1415 Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
1416 while container_a.cursor(StreamMode::Snapshot).unwrap().count().await > 0 {
1417 container_b.push_back(msg.bytes());
1418
1419 yield_to_executor().await;
1421 }
1422
1423 container_a.terminate();
1424
1425 let container_c =
1428 Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), Arc::default()));
1429 container_c.push_back(msg.bytes());
1430 container_c.push_back(msg.bytes());
1431 }
1432
1433 #[fuchsia::test]
1434 async fn socket_increments_logstats() {
1435 let inspector = Inspector::default();
1436 let stats: Arc<LogStreamStats> =
1437 Arc::new(LogStreamStats::default().with_inspect(inspector.root(), "test").unwrap());
1438 let buffer = Arc::new(SharedBuffer::new(
1439 create_ring_buffer(65536),
1440 Box::new(|_| {}),
1441 Default::default(),
1442 &fuchsia_inspect::Node::default(),
1443 ));
1444 let container_a = Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats));
1445 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1446
1447 let (local, remote) = zx::Socket::create_datagram();
1448 container_a.add_socket(remote);
1449
1450 let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();
1451
1452 let mut futures = FuturesUnordered::new();
1455 futures.push(async move {
1456 let mut cursor_a = pin!(cursor_a);
1457 cursor_a.next().await
1458 });
1459 let mut next = futures.next();
1460 assert!(futures::poll!(&mut next).is_pending());
1461
1462 local.write(msg.bytes()).unwrap();
1463
1464 let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1465
1466 assert_eq!(cursor_b.map(|item| assert_eq!(item.data(), msg.bytes())).count().await, 1);
1467
1468 next.await;
1470 assert_data_tree!(
1472 inspector,
1473 root: contains {
1474 test: {
1475 url: "",
1476 last_timestamp: AnyProperty,
1477 sockets_closed: 0u64,
1478 sockets_opened: 1u64,
1479 invalid: {
1480 number: 0u64,
1481 bytes: 0u64,
1482 },
1483 total: {
1484 number: 1u64,
1485 bytes: 88u64,
1486 },
1487 rolled_out: {
1488 number: 0u64,
1489 bytes: 0u64,
1490 },
1491 trace: {
1492 number: 0u64,
1493 bytes: 0u64,
1494 },
1495 debug: {
1496 number: 1u64,
1497 bytes: 88u64,
1498 },
1499 info: {
1500 number: 0u64,
1501 bytes: 0u64,
1502 },
1503 warn: {
1504 number: 0u64,
1505 bytes: 0u64,
1506 },
1507 error: {
1508 number: 0u64,
1509 bytes: 0u64,
1510 },
1511 fatal: {
1512 number: 0u64,
1513 bytes: 0u64,
1514 },
1515 }
1516 }
1517 );
1518 }
1519
1520 #[fuchsia::test]
1521 async fn socket() {
1522 let buffer = Arc::new(SharedBuffer::new(
1523 create_ring_buffer(MAX_MESSAGE_SIZE),
1524 Box::new(|_| {}),
1525 Default::default(),
1526 &fuchsia_inspect::Node::default(),
1527 ));
1528 let container_a =
1529 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1530 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1531
1532 let (local, remote) = zx::Socket::create_datagram();
1533 container_a.add_socket(remote);
1534
1535 let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();
1536
1537 let mut futures = FuturesUnordered::new();
1540 futures.push(async move {
1541 let mut cursor_a = pin!(cursor_a);
1542 cursor_a.next().await
1543 });
1544 let mut next = futures.next();
1545 assert!(futures::poll!(&mut next).is_pending());
1546
1547 local.write(msg.bytes()).unwrap();
1548
1549 let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1550
1551 assert_eq!(cursor_b.map(|item| assert_eq!(item.data(), msg.bytes())).count().await, 1);
1552
1553 next.await;
1555 }
1556
1557 #[fuchsia::test]
1558 async fn socket_on_inactive() {
1559 let on_inactive = Arc::new(AtomicU64::new(0));
1560 let a_identity = Arc::new(vec!["a"].into());
1561 let buffer = Arc::new(SharedBuffer::new(
1562 create_ring_buffer(MAX_MESSAGE_SIZE),
1563 {
1564 let on_inactive = Arc::clone(&on_inactive);
1565 let a_identity = Arc::clone(&a_identity);
1566 Box::new(move |id| {
1567 assert_eq!(id, a_identity);
1568 on_inactive.fetch_add(1, Ordering::Relaxed);
1569 })
1570 },
1571 SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
1572 &fuchsia_inspect::Node::default(),
1573 ));
1574 let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
1575 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1576
1577 let (local, remote) = zx::Socket::create_datagram();
1578 container_a.add_socket(remote);
1579
1580 local.write(msg.bytes()).unwrap();
1581
1582 let cursor = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1583
1584 assert_eq!(cursor.map(|item| assert_eq!(item.data(), msg.bytes())).count().await, 1);
1585
1586 let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
1588 while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
1589 container_b.push_back(msg.bytes());
1590
1591 yield_to_executor().await;
1593 }
1594
1595 assert_eq!(on_inactive.load(Ordering::Relaxed), 0);
1596
1597 std::mem::drop(local);
1599
1600 while on_inactive.load(Ordering::Relaxed) != 1 {
1602 fasync::Timer::new(Duration::from_millis(50)).await;
1603 }
1604 }
1605
1606 #[fuchsia::test]
1607 async fn flush() {
1608 let a_identity = Arc::new(vec!["a"].into());
1609 let buffer = Arc::new(SharedBuffer::new(
1610 create_ring_buffer(1024 * 1024),
1611 Box::new(|_| {}),
1612 Default::default(),
1613 &fuchsia_inspect::Node::default(),
1614 ));
1615 let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
1616 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1617
1618 let (local, remote) = zx::Socket::create_datagram();
1619 container_a.add_socket(remote);
1620
1621 let cursor = pin!(container_a.cursor(StreamMode::Subscribe).unwrap());
1622
1623 const COUNT: usize = 1000;
1624 for _ in 0..COUNT {
1625 local.write(msg.bytes()).unwrap();
1626 }
1627
1628 let mut flush_futures = FuturesUnordered::from_iter([buffer.flush(), buffer.flush()]);
1630 flush_futures.next().await;
1631
1632 let messages: Option<Vec<_>> = cursor.take(COUNT).collect().now_or_never();
1633 assert!(messages.is_some());
1634
1635 flush_futures.next().await;
1637
1638 buffer.terminate().await;
1640 }
1641
1642 #[fuchsia::test]
1643 fn test_slab_next_used() {
1644 let mut slab = Slab::default();
1645 let mut ids: Vec<_> = repeat_with(|| slab.insert(|_| ())).take(4).collect();
1646 ids.sort();
1647 slab.free(ids[0]);
1648 slab.free(ids[2]);
1649 assert_eq!(slab.next_used(0), Some(ids[1]));
1650 assert_eq!(slab.next_used(ids[1]), Some(ids[1]));
1651 assert_eq!(slab.next_used(ids[1] + 1), Some(ids[3]));
1652 assert_eq!(slab.next_used(ids[3] + 1), None);
1653 }
1654}