1mod cursor;
6
7pub use cursor::{FilterCursor, FilterCursorStream};
8
9use crate::identity::ComponentIdentity;
10use crate::logs::repository::ARCHIVIST_MONIKER;
11use crate::logs::stats::{LogStreamStats, SaturationCurve};
12use derivative::Derivative;
13use diagnostics_log_encoding::encode::add_dropped_count;
14use diagnostics_log_encoding::{Header, TRACING_FORMAT_LOG_RECORD_TYPE};
15use fidl_fuchsia_diagnostics::{ComponentSelector, StreamMode};
16use fidl_fuchsia_logger::MAX_DATAGRAM_LEN_BYTES;
17use fuchsia_async as fasync;
18use fuchsia_async::condition::{Condition, ConditionGuard};
19use fuchsia_inspect::Node;
20use fuchsia_sync::Mutex;
21use futures::channel::oneshot;
22use log::debug;
23use ring_buffer::{self, RingBuffer, ring_buffer_record_len};
24use std::collections::VecDeque;
25use std::mem::ManuallyDrop;
26use std::ops::{Deref, DerefMut, Range};
27use std::sync::{Arc, Weak};
28use std::time::Duration;
29use zerocopy::FromBytes;
30
31const SPACE_THRESHOLD_NUMERATOR: usize = 1;
33const SPACE_THRESHOLD_DENOMINATOR: usize = 4;
34
35const DEFAULT_SLEEP_TIME: Duration = Duration::from_millis(200);
38
39pub fn create_ring_buffer(capacity: usize) -> ring_buffer::Reader {
40 RingBuffer::create(calculate_real_size_given_desired_capacity(capacity))
41}
42
43fn calculate_real_size_given_desired_capacity(capacity: usize) -> usize {
44 let page_size = zx::system_get_page_size() as usize;
48 (capacity * SPACE_THRESHOLD_DENOMINATOR
49 / (SPACE_THRESHOLD_DENOMINATOR - SPACE_THRESHOLD_NUMERATOR))
50 .next_multiple_of(page_size)
51}
52
53const IOB_PEER_CLOSED_KEY_BASE: u64 = 0x8000_0000_0000_0000;
54
55pub type OnInactive = Box<dyn Fn(Arc<ComponentIdentity>) + Send + Sync>;
56
57pub struct SharedBuffer {
58 inner: Condition<Inner>,
59
60 sockets: Mutex<Slab<Socket>>,
63
64 on_inactive: OnInactive,
66
67 port: zx::Port,
69
70 event: zx::Event,
72
73 socket_thread: Mutex<Option<std::thread::JoinHandle<()>>>,
75
76 _buffer_monitor_task: fasync::Task<()>,
79
80 pub(crate) saturation_curve: Arc<SaturationCurve>,
82}
83
84struct InnerGuard<'a> {
85 buffer: &'a Arc<SharedBuffer>,
86
87 guard: ManuallyDrop<ConditionGuard<'a, Inner>>,
88
89 on_inactive: Vec<Arc<ComponentIdentity>>,
91
92 wake: bool,
94}
95
96impl Drop for InnerGuard<'_> {
97 fn drop(&mut self) {
98 if self.wake {
99 for waker in self.guard.drain_wakers() {
100 waker.wake();
101 }
102 }
103 unsafe {
105 ManuallyDrop::drop(&mut self.guard);
106 }
107 for identity in self.on_inactive.drain(..) {
108 (*self.buffer.on_inactive)(identity);
109 }
110 }
111}
112
113impl Deref for InnerGuard<'_> {
114 type Target = Inner;
115
116 fn deref(&self) -> &Self::Target {
117 &self.guard
118 }
119}
120
121impl DerefMut for InnerGuard<'_> {
122 fn deref_mut(&mut self) -> &mut Self::Target {
123 &mut self.guard
124 }
125}
126
127struct Inner {
128 ring_buffer: Arc<RingBuffer>,
130
131 containers: Containers,
133
134 thread_msg_queue: VecDeque<ThreadMessage>,
136
137 last_scanned: u64,
139
140 last_scanned_message_id: u64,
142
143 tail: u64,
145
146 tail_message_id: u64,
148
149 iob_peers: Slab<(ContainerId, zx::Iob)>,
151
152 terminated: bool,
154}
155
156enum ThreadMessage {
157 Terminate,
159
160 Flush(oneshot::Sender<()>),
162}
163
164pub struct SharedBufferOptions {
165 pub sleep_time: Duration,
168 pub saturation_curve_size: usize,
170}
171
172impl Default for SharedBufferOptions {
173 fn default() -> Self {
174 Self { sleep_time: DEFAULT_SLEEP_TIME, saturation_curve_size: 254 }
175 }
176}
177
178impl SharedBuffer {
179 pub fn new(
181 ring_buffer: ring_buffer::Reader,
182 on_inactive: OnInactive,
183 options: SharedBufferOptions,
184 inspect_node: &Node,
185 ) -> Arc<Self> {
186 let saturation_curve =
187 Arc::new(SaturationCurve::new(inspect_node, options.saturation_curve_size));
188 let this = Arc::new_cyclic(|weak: &Weak<Self>| Self {
189 inner: Condition::new(Inner {
190 ring_buffer: Arc::clone(&ring_buffer),
191 containers: Containers::default(),
192 thread_msg_queue: VecDeque::default(),
193 last_scanned: 0,
194 last_scanned_message_id: 0,
195 tail: 0,
196 tail_message_id: 0,
197 iob_peers: Slab::default(),
198 terminated: false,
199 }),
200 sockets: Mutex::new(Slab::default()),
201 on_inactive,
202 port: zx::Port::create(),
203 event: zx::Event::create(),
204 socket_thread: Mutex::default(),
205 saturation_curve,
206 _buffer_monitor_task: fasync::Task::spawn(Self::buffer_monitor_task(
207 Weak::clone(weak),
208 ring_buffer,
209 options.sleep_time,
210 )),
211 });
212
213 *this.socket_thread.lock() = Some({
214 let this = Arc::clone(&this);
215 std::thread::spawn(move || this.socket_thread(options.sleep_time))
216 });
217 this
218 }
219
220 pub fn new_container_buffer(
221 self: &Arc<Self>,
222 identity: Arc<ComponentIdentity>,
223 stats: Arc<LogStreamStats>,
224 ) -> ContainerBuffer {
225 let mut inner = self.inner.lock();
226 let Inner { containers, ring_buffer, .. } = &mut *inner;
227 ContainerBuffer {
228 shared_buffer: Arc::clone(self),
229 container_id: containers.new_container(ring_buffer, Arc::clone(&identity), stats),
230 }
231 }
232
233 pub async fn flush(&self) {
234 let (sender, receiver) = oneshot::channel();
235 self.inner.lock().thread_msg_queue.push_back(ThreadMessage::Flush(sender));
236 self.event.signal(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
237 let _ = receiver.await;
239 }
240
241 #[cfg(test)]
243 pub fn container_count(&self) -> usize {
244 self.inner.lock().containers.len()
245 }
246
247 pub fn terminate(self: &Arc<Self>) -> impl Future<Output = ()> {
251 {
252 let mut inner = InnerGuard::new(self);
253 self.flush_sockets(&mut inner);
254 inner.thread_msg_queue.push_back(ThreadMessage::Terminate);
255 }
256 self.event.signal(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
257 let join_handle = self.socket_thread.lock().take().unwrap();
258 fasync::unblock(|| {
259 let _ = join_handle.join();
260 })
261 }
262
263 pub fn cursor(
267 self: &Arc<Self>,
268 mode: StreamMode,
269 selectors: Vec<ComponentSelector>,
270 ) -> FilterCursor {
271 InnerGuard::new(self).cursor(mode, selectors)
272 }
273
274 fn socket_thread(self: Arc<Self>, sleep_time: Duration) {
275 const INTERRUPT_KEY: u64 = u64::MAX;
276 let mut sockets_ready = Vec::new();
277 let mut iob_peer_closed = Vec::new();
278 let mut interrupt_needs_arming = true;
279 let mut msg = None;
280
281 loop {
282 let mut deadline = if msg.is_some() {
283 zx::MonotonicInstant::INFINITE_PAST
284 } else {
285 if interrupt_needs_arming {
286 self.event
287 .wait_async(
288 &self.port,
289 INTERRUPT_KEY,
290 zx::Signals::USER_0,
291 zx::WaitAsyncOpts::empty(),
292 )
293 .unwrap();
294 interrupt_needs_arming = false;
295 }
296
297 let _ = self
300 .event
301 .wait_one(zx::Signals::USER_0, zx::MonotonicInstant::after(sleep_time.into()));
302 zx::MonotonicInstant::INFINITE
303 };
304
305 loop {
307 match self.port.wait(deadline) {
308 Ok(packet) => {
309 if packet.key() == INTERRUPT_KEY {
310 interrupt_needs_arming = true;
311 if msg.is_none() {
316 msg = self.inner.lock().thread_msg_queue.pop_front();
317 }
318 } else if packet.key() & IOB_PEER_CLOSED_KEY_BASE != 0 {
319 iob_peer_closed.push(packet.key() as u32);
320 } else {
321 sockets_ready.push(SocketId(packet.key() as u32))
322 }
323 }
324 Err(zx::Status::TIMED_OUT) => break,
325 Err(status) => panic!("port wait error: {status:?}"),
326 }
327 deadline = zx::MonotonicInstant::INFINITE_PAST;
328 }
329
330 let mut inner = InnerGuard::new(&self);
331
332 if !iob_peer_closed.is_empty() {
333 inner.update_message_ids(inner.ring_buffer.head());
335
336 for iob_peer_closed in iob_peer_closed.drain(..) {
337 let container_id = inner.iob_peers.free(iob_peer_closed).0;
338 if let Some(container) = inner.containers.get_mut(container_id) {
339 container.iob_count -= 1;
340 if container.iob_count == 0 && !container.is_active() {
341 if container.should_free() {
342 inner.containers.free(container_id);
343 } else {
344 let identity = Arc::clone(&container.identity);
345 inner.on_inactive.push(identity);
346 }
347 }
348 }
349 }
350 }
351
352 {
353 let mut sockets = self.sockets.lock();
354 for socket_id in sockets_ready.drain(..) {
355 inner.read_socket(&mut sockets, socket_id, |socket| {
356 socket
357 .socket
358 .wait_async(
359 &self.port,
360 socket_id.0 as u64,
361 zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
362 zx::WaitAsyncOpts::empty(),
363 )
364 .unwrap();
365 });
366 }
367 }
368
369 if let Some(m) = msg.take() {
371 match m {
372 ThreadMessage::Terminate => {
373 inner.terminated = true;
374 inner.wake = true;
375 return;
376 }
377 ThreadMessage::Flush(sender) => {
378 let _ = sender.send(());
379 }
380 }
381
382 msg = inner.thread_msg_queue.pop_front();
384 if msg.is_none() {
385 self.event.signal(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
389 }
390 }
391 }
392 }
393
394 async fn buffer_monitor_task(
395 this: Weak<Self>,
396 mut ring_buffer: ring_buffer::Reader,
397 sleep_time: Duration,
398 ) {
399 let mut last_head = 0;
400 loop {
401 fasync::Timer::new(sleep_time).await;
403 let head = ring_buffer.wait(last_head).await;
404 let Some(this) = this.upgrade() else { return };
405 let mut inner = InnerGuard::new(&this);
406 inner.check_space(head);
407 last_head = head;
408 }
409 }
410
411 fn flush_sockets(&self, inner: &mut InnerGuard<'_>) -> u64 {
413 let mut sockets = self.sockets.lock();
414 let mut socket_id = 0;
415 while let Some(next_id) = sockets.next_used(socket_id) {
416 inner.read_socket(&mut sockets, SocketId(next_id), |_| {});
418 socket_id = next_id + 1;
419 }
420 let head = inner.ring_buffer.head();
421 inner.update_message_ids(head);
422 head
423 }
424}
425
426impl Inner {
427 fn ingest(&mut self, msg: &[u8], container_id: ContainerId) {
429 if msg.len() < std::mem::size_of::<Header>() {
430 debug!("message too short ({})", msg.len());
431 if let Some(container) = self.containers.get(container_id) {
432 container.stats.increment_invalid(msg.len());
433 }
434 return;
435 }
436
437 let header = Header::read_from_bytes(&msg[..std::mem::size_of::<Header>()]).unwrap();
438
439 let msg_len = header.size_words() as usize * 8;
442
443 if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg.len() < msg_len {
445 debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, msg.len());
446 if let Some(container) = self.containers.get(container_id) {
447 container.stats.increment_invalid(msg.len());
448 }
449 return;
450 }
451
452 let Some(container) = self.containers.get_mut(container_id) else {
453 return;
454 };
455
456 let mut data;
457 let msg = if container.dropped_count > 0 {
458 data = msg.to_vec();
459 if !add_dropped_count(&mut data, container.dropped_count) {
460 debug!("unable to add dropped count to invalid message");
461 container.stats.increment_invalid(data.len());
462 return;
463 }
464 &data
465 } else {
466 msg
467 };
468
469 if container.iob.write(Default::default(), 0, msg).is_err() {
470 container.dropped_count += 1
474 } else {
475 container.dropped_count = 0;
476 }
477 }
478
479 unsafe fn parse_message(
491 &self,
492 range: Range<u64>,
493 ) -> (ContainerId, &[u8], Option<zx::BootInstant>) {
494 let (tag, msg) = unsafe { self.ring_buffer.first_message_in(range) }
495 .expect("Unable to read message from ring buffer");
496 (
497 ContainerId(tag as u32),
498 msg,
499 (msg.len() >= 16)
500 .then(|| zx::BootInstant::from_nanos(i64::read_from_bytes(&msg[8..16]).unwrap())),
501 )
502 }
503}
504
505impl<'a> InnerGuard<'a> {
506 fn new(buffer: &'a Arc<SharedBuffer>) -> Self {
507 Self {
508 buffer,
509 guard: ManuallyDrop::new(buffer.inner.lock()),
510 on_inactive: Vec::new(),
511 wake: false,
512 }
513 }
514
515 fn pop(&mut self, head: u64) -> Option<usize> {
523 if head == self.tail {
524 return None;
525 }
526
527 let record_len = {
530 let (container_id, message, _) = unsafe { self.parse_message(self.tail..head) };
531 let record_len = ring_buffer_record_len(message.len());
532
533 let container = self.containers.get_mut(container_id).unwrap();
534
535 container.stats.increment_rolled_out(record_len);
536 container.msg_ids.start += 1;
537 if !container.is_active() {
538 if container.should_free() {
539 self.containers.free(container_id);
540 } else {
541 let identity = Arc::clone(&container.identity);
542 self.on_inactive.push(identity);
543 }
544 }
545
546 record_len
547 };
548
549 self.ring_buffer.increment_tail(record_len);
551 self.tail += record_len as u64;
552 self.tail_message_id += 1;
553
554 assert!(self.last_scanned >= self.tail);
556
557 Some(record_len)
558 }
559
560 fn read_socket(
562 &mut self,
563 sockets: &mut Slab<Socket>,
564 socket_id: SocketId,
565 rearm: impl FnOnce(&mut Socket),
566 ) {
567 let Some(socket) = sockets.get_mut(socket_id.0) else { return };
568 let container_id = socket.container_id;
569
570 loop {
571 self.check_space(self.ring_buffer.head());
572
573 let mut data = Vec::with_capacity(MAX_DATAGRAM_LEN_BYTES as usize);
574
575 let len = match socket.socket.read_uninit(data.spare_capacity_mut()) {
577 Ok(d) => d.len(),
578 Err(zx::Status::SHOULD_WAIT) => {
579 rearm(socket);
581 return;
582 }
583 Err(_) => break,
584 };
585
586 unsafe {
588 data.set_len(len);
589 }
590
591 let container = self.containers.get_mut(container_id).unwrap();
592 if data.len() < 16 {
593 container.stats.increment_invalid(data.len());
594 continue;
595 }
596
597 let header = Header::read_from_bytes(&data[..std::mem::size_of::<Header>()]).unwrap();
598 let msg_len = header.size_words() as usize * 8;
599 if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg_len != data.len() {
600 debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, data.len());
601 container.stats.increment_invalid(data.len());
602 continue;
603 }
604
605 if container.dropped_count > 0 && !add_dropped_count(&mut data, container.dropped_count)
606 {
607 debug!("unable to add dropped count to invalid message");
608 container.stats.increment_invalid(data.len());
609 continue;
610 }
611
612 if container.iob.write(Default::default(), 0, &data).is_err() {
613 container.dropped_count += 1
617 } else {
618 container.dropped_count = 0;
619 }
620 }
621
622 self.update_message_ids(self.ring_buffer.head());
626
627 let container = self.containers.get_mut(container_id).unwrap();
628 container.remove_socket(socket_id, sockets);
629 if !container.is_active() {
630 if container.should_free() {
631 self.containers.free(container_id);
632 } else {
633 let identity = Arc::clone(&container.identity);
634 self.on_inactive.push(identity);
635 }
636 }
637 }
638
639 fn update_message_ids(&mut self, head: u64) {
641 while self.last_scanned < head {
642 let (container_id, message, _) = unsafe { self.parse_message(self.last_scanned..head) };
645 let msg_len = message.len();
646 let severity = (msg_len >= 8)
647 .then(|| Header::read_from_bytes(&message[0..8]).unwrap().severity().into());
648 let container = self.containers.get_mut(container_id).unwrap();
649 container.msg_ids.end += 1;
650 if let Some(severity) = severity {
651 container.stats.ingest_message(msg_len, severity);
652 }
653 self.last_scanned += ring_buffer_record_len(msg_len) as u64;
654 self.last_scanned_message_id += 1;
655 self.wake = true;
656 }
657 if self.wake {
658 let boot_time = zx::BootInstant::get().into_nanos();
659 let count = self.last_scanned_message_id - self.tail_message_id;
660 self.buffer.saturation_curve.record(boot_time, count);
661 }
662 }
663
664 fn check_space(&mut self, head: u64) {
666 self.update_message_ids(head);
667 let capacity = self.ring_buffer.capacity();
668 let mut space = capacity
669 .checked_sub((head - self.tail) as usize)
670 .unwrap_or_else(|| panic!("bad range: {:?}", self.tail..head));
671 let required_space = capacity * SPACE_THRESHOLD_NUMERATOR / SPACE_THRESHOLD_DENOMINATOR;
672 while space < required_space {
673 let Some(amount) = self.pop(head) else { break };
674 space += amount;
675 }
676 }
677
678 fn cursor(&mut self, mode: StreamMode, selectors: Vec<ComponentSelector>) -> FilterCursor {
680 let (index, message_id) = match mode {
684 StreamMode::Snapshot => (self.tail, self.tail_message_id),
685 StreamMode::Subscribe => {
686 let head = self.ring_buffer.head();
687 self.update_message_ids(head);
688 (self.last_scanned, self.last_scanned_message_id)
689 }
690 StreamMode::SnapshotThenSubscribe => (self.tail, self.tail_message_id),
691 };
692 FilterCursor::new(
693 Arc::clone(self.buffer),
694 index,
695 message_id,
696 matches!(mode, StreamMode::Snapshot),
697 selectors,
698 )
699 }
700}
701
702#[derive(Default)]
703struct Containers {
704 slab: Slab<ContainerInfo>,
705}
706
707#[derive(Clone, Copy, Debug, Eq, PartialEq)]
708struct ContainerId(u32);
709
710impl Containers {
711 #[cfg(test)]
712 fn len(&self) -> usize {
713 self.slab.len()
714 }
715
716 fn get(&self, id: ContainerId) -> Option<&ContainerInfo> {
717 self.slab.get(id.0)
718 }
719
720 fn get_mut(&mut self, id: ContainerId) -> Option<&mut ContainerInfo> {
721 self.slab.get_mut(id.0)
722 }
723
724 fn new_container(
725 &mut self,
726 buffer: &RingBuffer,
727 identity: Arc<ComponentIdentity>,
728 stats: Arc<LogStreamStats>,
729 ) -> ContainerId {
730 ContainerId(self.slab.insert(|id| {
731 let (iob, _) = buffer.new_iob_writer(id as u64).unwrap();
732 ContainerInfo::new(identity, stats, iob)
733 }))
734 }
735
736 fn free(&mut self, id: ContainerId) {
737 self.slab.free(id.0);
738 }
739}
740
741#[derive(Derivative)]
742#[derivative(Debug)]
743struct ContainerInfo {
744 identity: Arc<ComponentIdentity>,
746
747 msg_ids: Range<u64>,
750
751 terminated: bool,
753
754 #[derivative(Debug = "ignore")]
756 stats: Arc<LogStreamStats>,
757
758 first_socket_id: SocketId,
760
761 iob: zx::Iob,
763
764 iob_count: usize,
766
767 dropped_count: u64,
769}
770
771impl ContainerInfo {
772 fn new(identity: Arc<ComponentIdentity>, stats: Arc<LogStreamStats>, iob: zx::Iob) -> Self {
773 Self {
774 identity,
775 msg_ids: 0..0,
776 terminated: false,
777 stats,
778 first_socket_id: SocketId::NULL,
779 iob,
780 iob_count: 0,
781 dropped_count: 0,
782 }
783 }
784
785 fn should_free(&self) -> bool {
786 self.terminated && !self.is_active()
787 }
788
789 fn is_active(&self) -> bool {
795 self.first_socket_id != SocketId::NULL
796 || self.iob_count > 0
797 || self.msg_ids.end != self.msg_ids.start
798 || ARCHIVIST_MONIKER.get().is_some_and(|m| *self.identity == *m)
799 }
800
801 fn remove_socket(&mut self, socket_id: SocketId, sockets: &mut Slab<Socket>) {
805 let Socket { prev, next, .. } = *sockets.get(socket_id.0).unwrap();
806 if prev == SocketId::NULL {
807 self.first_socket_id = next;
808 } else {
809 sockets.get_mut(prev.0).unwrap().next = next;
810 }
811 if next != SocketId::NULL {
812 sockets
813 .get_mut(next.0)
814 .unwrap_or_else(|| panic!("next {next:?} has been freed!"))
815 .prev = prev;
816 }
817 sockets.free(socket_id.0);
818 self.stats.close_socket();
819 debug!(identity:% = self.identity; "Socket closed.");
820 }
821}
822
823pub struct ContainerBuffer {
824 shared_buffer: Arc<SharedBuffer>,
825 container_id: ContainerId,
826}
827
828impl ContainerBuffer {
829 pub fn iob_tag(&self) -> u64 {
831 self.container_id.0 as u64
832 }
833
834 pub fn push_back(&self, msg: &[u8]) {
838 self.shared_buffer.inner.lock().ingest(msg, self.container_id);
839 }
840
841 pub fn identity(&self) -> Option<Arc<ComponentIdentity>> {
843 self.shared_buffer
844 .inner
845 .lock()
846 .containers
847 .get(self.container_id)
848 .map(|c| Arc::clone(&c.identity))
849 }
850
851 pub fn iob(&self) -> zx::Iob {
853 let mut inner = self.shared_buffer.inner.lock();
854
855 inner.containers.get_mut(self.container_id).unwrap().iob_count += 1;
856
857 let (ep0, ep1) = inner.ring_buffer.new_iob_writer(self.container_id.0 as u64).unwrap();
858
859 inner.iob_peers.insert(|idx| {
860 ep1.wait_async(
861 &self.shared_buffer.port,
862 idx as u64 | IOB_PEER_CLOSED_KEY_BASE,
863 zx::Signals::IOB_PEER_CLOSED,
864 zx::WaitAsyncOpts::empty(),
865 )
866 .unwrap();
867
868 (self.container_id, ep1)
869 });
870
871 ep0
872 }
873
874 pub fn terminate(&self) {
878 let mut inner = InnerGuard::new(&self.shared_buffer);
879
880 inner.update_message_ids(inner.ring_buffer.head());
882
883 if let Some(container) = inner.containers.get_mut(self.container_id) {
884 container.terminated = true;
885 if container.first_socket_id != SocketId::NULL {
886 let mut sockets = self.shared_buffer.sockets.lock();
887 loop {
888 container.remove_socket(container.first_socket_id, &mut sockets);
889 if container.first_socket_id == SocketId::NULL {
890 break;
891 }
892 }
893 }
894 if container.should_free() {
895 inner.containers.free(self.container_id);
896 }
897 inner.wake = true;
898 }
899 }
900
901 pub fn is_active(&self) -> bool {
903 self.shared_buffer
904 .inner
905 .lock()
906 .containers
907 .get(self.container_id)
908 .is_some_and(|c| c.is_active())
909 }
910
911 pub fn add_socket(&self, socket: zx::Socket) {
913 let mut inner = self.shared_buffer.inner.lock();
914 let Some(container) = inner.containers.get_mut(self.container_id) else { return };
915 container.stats.open_socket();
916 let next = container.first_socket_id;
917 let mut sockets = self.shared_buffer.sockets.lock();
918 let socket_id = SocketId(sockets.insert(|socket_id| {
919 socket
920 .wait_async(
921 &self.shared_buffer.port,
922 socket_id as u64,
923 zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
924 zx::WaitAsyncOpts::empty(),
925 )
926 .unwrap();
927 Socket { socket, container_id: self.container_id, prev: SocketId::NULL, next }
928 }));
929 if next != SocketId::NULL {
930 sockets.get_mut(next.0).unwrap().prev = socket_id;
931 }
932 container.first_socket_id = socket_id;
933 }
934}
935
936impl Drop for ContainerBuffer {
937 fn drop(&mut self) {
938 self.terminate();
939 }
940}
941
942struct Slab<T> {
944 slab: Vec<Slot<T>>,
945 free_index: usize,
946}
947
948impl<T> Default for Slab<T> {
949 fn default() -> Self {
950 Self { slab: Vec::new(), free_index: usize::MAX }
951 }
952}
953
954enum Slot<T> {
955 Used(T),
956 Free(usize),
957}
958
959impl<T> Slab<T> {
960 #[cfg(test)]
962 fn len(&self) -> usize {
963 self.slab.iter().filter(|c| matches!(c, Slot::Used(_))).count()
964 }
965
966 fn free(&mut self, index: u32) -> T {
967 let index = index as usize;
968 let value = match std::mem::replace(&mut self.slab[index], Slot::Free(self.free_index)) {
969 Slot::Free(_) => panic!("Slot already free"),
970 Slot::Used(value) => value,
971 };
972 self.free_index = index;
973 value
974 }
975
976 fn get(&self, id: u32) -> Option<&T> {
977 self.slab.get(id as usize).and_then(|s| match s {
978 Slot::Used(s) => Some(s),
979 _ => None,
980 })
981 }
982
983 fn get_mut(&mut self, id: u32) -> Option<&mut T> {
984 self.slab.get_mut(id as usize).and_then(|s| match s {
985 Slot::Used(s) => Some(s),
986 _ => None,
987 })
988 }
989
990 fn insert(&mut self, value: impl FnOnce(u32) -> T) -> u32 {
991 let free_index = self.free_index;
992 if free_index != usize::MAX {
993 self.free_index = match std::mem::replace(
994 &mut self.slab[free_index],
995 Slot::Used(value(free_index as u32)),
996 ) {
997 Slot::Free(next) => next,
998 _ => unreachable!(),
999 };
1000 free_index as u32
1001 } else {
1002 assert!(self.slab.len() < u32::MAX as usize);
1005 self.slab.push(Slot::Used(value(self.slab.len() as u32)));
1006 (self.slab.len() - 1) as u32
1007 }
1008 }
1009
1010 fn next_used(&self, id: u32) -> Option<u32> {
1012 let mut id = id as usize;
1013 while id < self.slab.len() {
1014 if matches!(self.slab[id], Slot::Used(_)) {
1015 return Some(id as u32);
1016 }
1017 id += 1;
1018 }
1019 None
1020 }
1021}
1022
1023#[derive(Clone, Copy, Debug, Eq, PartialEq)]
1024struct SocketId(u32);
1025
1026impl SocketId {
1027 const NULL: Self = SocketId(0xffff_ffff);
1028}
1029
1030struct Socket {
1031 socket: zx::Socket,
1032 container_id: ContainerId,
1033 prev: SocketId,
1035 next: SocketId,
1036}
1037
1038#[cfg(test)]
1039mod tests {
1040 use super::{SharedBuffer, SharedBufferOptions, Slab, create_ring_buffer};
1041 use crate::identity::ComponentIdentity;
1042 use crate::logs::stats::LogStreamStats;
1043 use crate::logs::testing::make_message;
1044 use assert_matches::assert_matches;
1045 use diagnostics_assertions::{AnyProperty, assert_data_tree};
1046 use fidl_fuchsia_diagnostics::StreamMode;
1047 use fuchsia_async as fasync;
1048 use fuchsia_async::TimeoutExt;
1049 use fuchsia_inspect::Inspector;
1050 use futures::FutureExt;
1051 use futures::channel::mpsc;
1052 use futures::future::OptionFuture;
1053 use futures::stream::{FuturesUnordered, StreamExt as _};
1054 use moniker::ExtendedMoniker;
1055 use ring_buffer::MAX_MESSAGE_SIZE;
1056 use selectors::SelectorExt;
1057 use std::future::poll_fn;
1058 use std::iter::repeat_with;
1059 use std::pin::pin;
1060 use std::sync::Arc;
1061 use std::sync::atomic::{AtomicU64, Ordering};
1062 use std::task::Poll;
1063 use std::time::Duration;
1064 use zerocopy::FromBytes;
1065
1066 fn test_stats() -> Arc<LogStreamStats> {
1067 Arc::new(LogStreamStats::new(
1068 &fuchsia_inspect::Node::default(),
1069 &ComponentIdentity::unknown(),
1070 ))
1071 }
1072
1073 fn container_cursor(
1074 buffer: &Arc<SharedBuffer>,
1075 container: &super::ContainerBuffer,
1076 mode: StreamMode,
1077 ) -> Option<impl futures::Stream<Item = Box<[u8]>>> {
1078 let selectors = vec![container.identity()?.moniker.clone().into_component_selector()];
1079 let mut cursor = Box::pin(buffer.cursor(mode, selectors));
1080 Some(futures::stream::poll_fn(move |cx| {
1081 cursor.as_mut().poll_next(cx).map(|m| m.map(|m| m.parse().unwrap().2.into()))
1082 }))
1083 }
1084
1085 async fn yield_to_executor() {
1086 let mut first_time = true;
1087 poll_fn(|cx| {
1088 if first_time {
1089 cx.waker().wake_by_ref();
1090 first_time = false;
1091 Poll::Pending
1092 } else {
1093 Poll::Ready(())
1094 }
1095 })
1096 .await;
1097 }
1098
1099 #[fuchsia::test]
1100 async fn push_one_message() {
1101 let buffer = SharedBuffer::new(
1102 create_ring_buffer(MAX_MESSAGE_SIZE),
1103 Box::new(|_| {}),
1104 Default::default(),
1105 &fuchsia_inspect::Node::default(),
1106 );
1107 let container_buffer =
1108 buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
1109 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1110 container_buffer.push_back(msg.bytes());
1111
1112 let cursor = container_cursor(&buffer, &container_buffer, StreamMode::Snapshot).unwrap();
1114 assert_eq!(cursor.map(|item| assert_eq!(&*item, msg.bytes())).count().await, 1);
1115 }
1116
1117 #[fuchsia::test]
1118 async fn message_too_short() {
1119 let buffer = SharedBuffer::new(
1120 create_ring_buffer(MAX_MESSAGE_SIZE),
1121 Box::new(|_| {}),
1122 Default::default(),
1123 &fuchsia_inspect::Node::default(),
1124 );
1125
1126 let container_buffer =
1127 buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
1128 container_buffer.push_back(&[0]);
1129
1130 assert_eq!(
1131 container_cursor(&buffer, &container_buffer, StreamMode::Snapshot)
1132 .unwrap()
1133 .count()
1134 .await,
1135 0
1136 );
1137 }
1138
1139 #[fuchsia::test]
1140 async fn bad_type() {
1141 let buffer = SharedBuffer::new(
1142 create_ring_buffer(MAX_MESSAGE_SIZE),
1143 Box::new(|_| {}),
1144 Default::default(),
1145 &fuchsia_inspect::Node::default(),
1146 );
1147 let container_buffer =
1148 buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
1149 container_buffer.push_back(&[0x77; 16]);
1150
1151 assert_eq!(
1152 container_cursor(&buffer, &container_buffer, StreamMode::Snapshot)
1153 .unwrap()
1154 .count()
1155 .await,
1156 0
1157 );
1158 }
1159
1160 #[fuchsia::test]
1161 async fn message_truncated() {
1162 let buffer = SharedBuffer::new(
1163 create_ring_buffer(MAX_MESSAGE_SIZE),
1164 Box::new(|_| {}),
1165 Default::default(),
1166 &fuchsia_inspect::Node::default(),
1167 );
1168 let container_buffer =
1169 buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
1170 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1171 container_buffer.push_back(&msg.bytes()[..msg.bytes().len() - 1]);
1172
1173 assert_eq!(
1174 container_cursor(&buffer, &container_buffer, StreamMode::Snapshot)
1175 .unwrap()
1176 .count()
1177 .await,
1178 0
1179 );
1180 }
1181
1182 #[fuchsia::test]
1183 async fn buffer_wrapping() {
1184 let buffer = SharedBuffer::new(
1185 create_ring_buffer(MAX_MESSAGE_SIZE),
1186 Box::new(|_| {}),
1187 SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
1188 &fuchsia_inspect::Node::default(),
1189 );
1190 let container_buffer =
1191 buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
1192
1193 let mut i = 0;
1195 loop {
1196 let msg = make_message(&format!("{i}"), None, zx::BootInstant::from_nanos(i));
1197 container_buffer.push_back(msg.bytes());
1198 i += 1;
1199
1200 yield_to_executor().await;
1202
1203 let inner = buffer.inner.lock();
1204 if inner.ring_buffer.head() > inner.ring_buffer.capacity() as u64 {
1205 break;
1206 }
1207 }
1208
1209 let mut cursor =
1211 pin!(container_cursor(&buffer, &container_buffer, StreamMode::Snapshot).unwrap());
1212
1213 let mut j;
1214 let item = cursor.next().await;
1215 assert_matches!(
1216 item,
1217 Some(item) => {
1218 j = i64::read_from_bytes(&item[8..16]).unwrap();
1219 let msg = make_message(&format!("{j}"),
1220 None,
1221 zx::BootInstant::from_nanos(j));
1222 assert_eq!(&*item, msg.bytes());
1223 }
1224 );
1225
1226 j += 1;
1227 while j != i {
1228 assert_matches!(
1229 cursor.next().await,
1230 Some(item) => {
1231 assert_eq!(&*item, make_message(&format!("{j}"),
1232 None,
1233 zx::BootInstant::from_nanos(j)).bytes());
1234 }
1235 );
1236 j += 1;
1237 }
1238
1239 assert!(cursor.next().await.is_none());
1240 }
1241
1242 #[fuchsia::test]
1243 async fn on_inactive() {
1244 let identity = Arc::new(vec!["a"].into());
1245 let on_inactive = Arc::new(AtomicU64::new(0));
1246 let buffer = {
1247 let on_inactive = Arc::clone(&on_inactive);
1248 let identity = Arc::clone(&identity);
1249 Arc::new(SharedBuffer::new(
1250 create_ring_buffer(MAX_MESSAGE_SIZE),
1251 Box::new(move |i| {
1252 assert_eq!(i, identity);
1253 on_inactive.fetch_add(1, Ordering::Relaxed);
1254 }),
1255 SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
1256 &fuchsia_inspect::Node::default(),
1257 ))
1258 };
1259 let container_a = buffer.new_container_buffer(identity, test_stats());
1260 let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), test_stats());
1261
1262 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1263 container_a.push_back(msg.bytes());
1264
1265 while container_cursor(&buffer, &container_a, StreamMode::Snapshot).unwrap().count().await
1267 == 1
1268 {
1269 container_b.push_back(msg.bytes());
1270
1271 yield_to_executor().await;
1273 }
1274
1275 assert_eq!(on_inactive.load(Ordering::Relaxed), 1);
1276 }
1277
1278 #[fuchsia::test]
1279 async fn terminate_drops_container() {
1280 async {}.await;
1282
1283 let buffer = SharedBuffer::new(
1284 create_ring_buffer(MAX_MESSAGE_SIZE),
1285 Box::new(|_| {}),
1286 SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
1287 &fuchsia_inspect::Node::default(),
1288 );
1289
1290 let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
1292 assert_eq!(buffer.container_count(), 1);
1293 container_a.terminate();
1294
1295 assert_eq!(buffer.container_count(), 0);
1296
1297 let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats());
1299 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1300 container_a.push_back(msg.bytes());
1301 assert_eq!(buffer.container_count(), 1);
1302 container_a.terminate();
1303
1304 assert_eq!(buffer.container_count(), 1);
1306
1307 let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), test_stats());
1309 assert_eq!(buffer.container_count(), 2);
1310
1311 while buffer.container_count() != 1 {
1314 container_b.push_back(msg.bytes());
1315
1316 yield_to_executor().await;
1318 }
1319
1320 assert!(container_cursor(&buffer, &container_a, StreamMode::Subscribe).is_none());
1321 }
1322
1323 #[fuchsia::test]
1324 async fn cursor_subscribe() {
1325 for mode in [StreamMode::Subscribe, StreamMode::SnapshotThenSubscribe] {
1326 let buffer = SharedBuffer::new(
1327 create_ring_buffer(MAX_MESSAGE_SIZE),
1328 Box::new(|_| {}),
1329 Default::default(),
1330 &fuchsia_inspect::Node::default(),
1331 );
1332 let container =
1333 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats()));
1334 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1335 container.push_back(msg.bytes());
1336
1337 let (sender, mut receiver) = mpsc::unbounded();
1338
1339 {
1341 let container = Arc::clone(&container);
1342 fasync::Task::spawn(async move {
1343 let mut cursor =
1344 pin!(container_cursor(&container.shared_buffer, &container, mode).unwrap());
1345 while let Some(item) = cursor.next().await {
1346 sender.unbounded_send(item).unwrap();
1347 }
1348 })
1349 .detach();
1350 }
1351
1352 if mode == StreamMode::SnapshotThenSubscribe {
1354 assert_matches!(
1355 receiver.next().await,
1356 Some(item) if item.as_ref() == msg.bytes()
1357 );
1358 }
1359
1360 assert!(
1362 OptionFuture::from(Some(receiver.next()))
1363 .on_timeout(Duration::from_millis(500), || None)
1364 .await
1365 .is_none()
1366 );
1367
1368 container.push_back(msg.bytes());
1369
1370 assert_matches!(
1372 receiver.next().await,
1373 Some(item) if item.as_ref() == msg.bytes()
1374 );
1375 }
1376 }
1377
1378 #[fuchsia::test]
1379 async fn drained_post_termination_cursors() {
1380 let buffer = SharedBuffer::new(
1381 create_ring_buffer(MAX_MESSAGE_SIZE),
1382 Box::new(|_| {}),
1383 Default::default(),
1384 &fuchsia_inspect::Node::default(),
1385 );
1386 let container =
1387 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats()));
1388 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1389
1390 let mut cursor_a =
1391 pin!(container_cursor(&buffer, &container, StreamMode::Subscribe).unwrap());
1392 let mut cursor_b =
1393 pin!(container_cursor(&buffer, &container, StreamMode::SnapshotThenSubscribe).unwrap());
1394
1395 container.push_back(msg.bytes());
1396 container.push_back(msg.bytes());
1397 container.push_back(msg.bytes());
1398 container.push_back(msg.bytes());
1399 container.push_back(msg.bytes());
1400
1401 let mut cursor_c =
1402 pin!(container_cursor(&buffer, &container, StreamMode::Snapshot).unwrap());
1403 assert!(cursor_a.next().await.is_some());
1404 assert!(cursor_b.next().await.is_some());
1405 assert!(cursor_c.next().await.is_some());
1406
1407 drop(buffer.terminate());
1408
1409 assert_eq!(cursor_a.count().await, 4);
1411 assert_eq!(cursor_b.count().await, 4);
1412 assert_eq!(cursor_c.count().await, 4);
1413 }
1414
1415 #[fuchsia::test]
1416 async fn empty_post_termination_cursors() {
1417 let buffer = SharedBuffer::new(
1418 create_ring_buffer(MAX_MESSAGE_SIZE),
1419 Box::new(|_| {}),
1420 Default::default(),
1421 &fuchsia_inspect::Node::default(),
1422 );
1423 let container =
1424 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats()));
1425
1426 let cursor_a = container_cursor(&buffer, &container, StreamMode::Subscribe).unwrap();
1427 let cursor_b =
1428 container_cursor(&buffer, &container, StreamMode::SnapshotThenSubscribe).unwrap();
1429 let cursor_c = container_cursor(&buffer, &container, StreamMode::Snapshot).unwrap();
1430
1431 drop(buffer.terminate());
1432
1433 assert_eq!(cursor_a.count().await, 0);
1434 assert_eq!(cursor_b.count().await, 0);
1435 assert_eq!(cursor_c.count().await, 0);
1436 }
1437
1438 #[fuchsia::test]
1439 async fn recycled_container_slot() {
1440 let buffer = Arc::new(SharedBuffer::new(
1441 create_ring_buffer(MAX_MESSAGE_SIZE),
1442 Box::new(|_| {}),
1443 SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
1444 &fuchsia_inspect::Node::default(),
1445 ));
1446 let container_a =
1447 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats()));
1448 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1449 container_a.push_back(msg.bytes());
1450
1451 let mut cursor = pin!(
1452 container_cursor(&buffer, &container_a, StreamMode::SnapshotThenSubscribe).unwrap()
1453 );
1454 assert_matches!(cursor.next().await, Some(_));
1455
1456 let container_b =
1458 Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), test_stats()));
1459 while container_cursor(&buffer, &container_a, StreamMode::Snapshot).unwrap().count().await
1460 > 0
1461 {
1462 container_b.push_back(msg.bytes());
1463
1464 yield_to_executor().await;
1466 }
1467
1468 container_a.terminate();
1469
1470 let container_c =
1473 Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), test_stats()));
1474 container_c.push_back(msg.bytes());
1475 container_c.push_back(msg.bytes());
1476 }
1477
1478 #[fuchsia::test]
1479 async fn socket_increments_logstats() {
1480 let inspector = Inspector::default();
1481 let identity =
1482 Arc::new(ComponentIdentity::new(ExtendedMoniker::parse_str("./test").unwrap(), ""));
1483 let stats = Arc::new(LogStreamStats::new(inspector.root(), &identity));
1484 let buffer = Arc::new(SharedBuffer::new(
1485 create_ring_buffer(65536),
1486 Box::new(|_| {}),
1487 Default::default(),
1488 &fuchsia_inspect::Node::default(),
1489 ));
1490 let container_a = Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats));
1491 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1492
1493 let (local, remote) = zx::Socket::create_datagram();
1494 container_a.add_socket(remote);
1495
1496 let cursor_a = container_cursor(&buffer, &container_a, StreamMode::Subscribe).unwrap();
1497
1498 let mut futures = FuturesUnordered::new();
1501 futures.push(async move {
1502 let mut cursor_a = pin!(cursor_a);
1503 cursor_a.next().await
1504 });
1505 let mut next = futures.next();
1506 assert!(futures::poll!(&mut next).is_pending());
1507
1508 local.write(msg.bytes()).unwrap();
1509
1510 let cursor_b = pin!(container_cursor(&buffer, &container_a, StreamMode::Snapshot).unwrap());
1511
1512 assert_eq!(cursor_b.map(|item| assert_eq!(&*item, msg.bytes())).count().await, 1);
1513
1514 next.await;
1516 assert_data_tree!(
1518 inspector,
1519 root: contains {
1520 test: {
1521 url: "",
1522 last_timestamp: AnyProperty,
1523 sockets_closed: 0u64,
1524 sockets_opened: 1u64,
1525 invalid: {
1526 number: 0u64,
1527 bytes: 0u64,
1528 },
1529 total: {
1530 number: 1u64,
1531 bytes: 88u64,
1532 },
1533 rolled_out: {
1534 number: 0u64,
1535 bytes: 0u64,
1536 },
1537 trace: {
1538 number: 0u64,
1539 bytes: 0u64,
1540 },
1541 debug: {
1542 number: 1u64,
1543 bytes: 88u64,
1544 },
1545 info: {
1546 number: 0u64,
1547 bytes: 0u64,
1548 },
1549 warn: {
1550 number: 0u64,
1551 bytes: 0u64,
1552 },
1553 error: {
1554 number: 0u64,
1555 bytes: 0u64,
1556 },
1557 fatal: {
1558 number: 0u64,
1559 bytes: 0u64,
1560 },
1561 }
1562 }
1563 );
1564 }
1565
1566 #[fuchsia::test]
1567 async fn socket() {
1568 let buffer = Arc::new(SharedBuffer::new(
1569 create_ring_buffer(MAX_MESSAGE_SIZE),
1570 Box::new(|_| {}),
1571 Default::default(),
1572 &fuchsia_inspect::Node::default(),
1573 ));
1574 let container_a =
1575 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), test_stats()));
1576 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1577
1578 let (local, remote) = zx::Socket::create_datagram();
1579 container_a.add_socket(remote);
1580
1581 let cursor_a = container_cursor(&buffer, &container_a, StreamMode::Subscribe).unwrap();
1582
1583 let mut futures = FuturesUnordered::new();
1586 futures.push(async move {
1587 let mut cursor_a = pin!(cursor_a);
1588 cursor_a.next().await
1589 });
1590 let mut next = futures.next();
1591 assert!(futures::poll!(&mut next).is_pending());
1592
1593 local.write(msg.bytes()).unwrap();
1594
1595 let cursor_b = pin!(container_cursor(&buffer, &container_a, StreamMode::Snapshot).unwrap());
1596
1597 assert_eq!(cursor_b.map(|item| assert_eq!(&*item, msg.bytes())).count().await, 1);
1598
1599 next.await;
1601 }
1602
1603 #[fuchsia::test]
1604 async fn socket_on_inactive() {
1605 let on_inactive = Arc::new(AtomicU64::new(0));
1606 let a_identity = Arc::new(vec!["a"].into());
1607 let buffer = Arc::new(SharedBuffer::new(
1608 create_ring_buffer(MAX_MESSAGE_SIZE),
1609 {
1610 let on_inactive = Arc::clone(&on_inactive);
1611 let a_identity = Arc::clone(&a_identity);
1612 Box::new(move |id| {
1613 assert_eq!(id, a_identity);
1614 on_inactive.fetch_add(1, Ordering::Relaxed);
1615 })
1616 },
1617 SharedBufferOptions { sleep_time: Duration::ZERO, ..Default::default() },
1618 &fuchsia_inspect::Node::default(),
1619 ));
1620 let container_a = Arc::new(buffer.new_container_buffer(a_identity, test_stats()));
1621 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1622
1623 let (local, remote) = zx::Socket::create_datagram();
1624 container_a.add_socket(remote);
1625
1626 local.write(msg.bytes()).unwrap();
1627
1628 let cursor = pin!(container_cursor(&buffer, &container_a, StreamMode::Snapshot).unwrap());
1629
1630 assert_eq!(cursor.map(|item| assert_eq!(&*item, msg.bytes())).count().await, 1);
1631
1632 let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), test_stats());
1634 while container_cursor(&buffer, &container_a, StreamMode::Snapshot).unwrap().count().await
1635 == 1
1636 {
1637 container_b.push_back(msg.bytes());
1638
1639 yield_to_executor().await;
1641 }
1642
1643 assert_eq!(on_inactive.load(Ordering::Relaxed), 0);
1644
1645 std::mem::drop(local);
1647
1648 while on_inactive.load(Ordering::Relaxed) != 1 {
1650 fasync::Timer::new(Duration::from_millis(50)).await;
1651 }
1652 }
1653
1654 #[fuchsia::test]
1655 async fn flush() {
1656 let a_identity = Arc::new(vec!["a"].into());
1657 let buffer = Arc::new(SharedBuffer::new(
1658 create_ring_buffer(1024 * 1024),
1659 Box::new(|_| {}),
1660 Default::default(),
1661 &fuchsia_inspect::Node::default(),
1662 ));
1663 let container_a = Arc::new(buffer.new_container_buffer(a_identity, test_stats()));
1664 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1665
1666 let (local, remote) = zx::Socket::create_datagram();
1667 container_a.add_socket(remote);
1668
1669 let cursor = pin!(container_cursor(&buffer, &container_a, StreamMode::Subscribe).unwrap());
1670
1671 const COUNT: usize = 1000;
1672 for _ in 0..COUNT {
1673 local.write(msg.bytes()).unwrap();
1674 }
1675
1676 let mut flush_futures = FuturesUnordered::from_iter([buffer.flush(), buffer.flush()]);
1678 flush_futures.next().await;
1679
1680 let messages: Option<Vec<_>> = cursor.take(COUNT).collect().now_or_never();
1681 assert!(messages.is_some());
1682
1683 flush_futures.next().await;
1685
1686 buffer.terminate().await;
1688 }
1689
1690 #[fuchsia::test]
1691 fn test_slab_next_used() {
1692 let mut slab = Slab::default();
1693 let mut ids: Vec<_> = repeat_with(|| slab.insert(|_| ())).take(4).collect();
1694 ids.sort();
1695 slab.free(ids[0]);
1696 slab.free(ids[2]);
1697 assert_eq!(slab.next_used(0), Some(ids[1]));
1698 assert_eq!(slab.next_used(ids[1]), Some(ids[1]));
1699 assert_eq!(slab.next_used(ids[1] + 1), Some(ids[3]));
1700 assert_eq!(slab.next_used(ids[3] + 1), None);
1701 }
1702}