1use crate::identity::ComponentIdentity;
6use crate::logs::repository::ARCHIVIST_MONIKER;
7use crate::logs::stats::LogStreamStats;
8use crate::logs::stored_message::StoredMessage;
9use derivative::Derivative;
10use diagnostics_log_encoding::encode::add_dropped_count;
11use diagnostics_log_encoding::{Header, TRACING_FORMAT_LOG_RECORD_TYPE};
12use fidl_fuchsia_diagnostics::StreamMode;
13use fidl_fuchsia_logger::MAX_DATAGRAM_LEN_BYTES;
14use fuchsia_async as fasync;
15use fuchsia_async::condition::{Condition, ConditionGuard, WakerEntry};
16use fuchsia_sync::Mutex;
17use futures::Stream;
18use futures::channel::oneshot;
19use log::debug;
20use pin_project::{pin_project, pinned_drop};
21use ring_buffer::{self, RingBuffer, ring_buffer_record_len};
22use std::collections::VecDeque;
23use std::mem::ManuallyDrop;
24use std::ops::{Deref, DerefMut, Range};
25use std::pin::Pin;
26use std::sync::{Arc, Weak};
27use std::task::{Context, Poll};
28use std::time::Duration;
29use zerocopy::FromBytes;
30use zx::AsHandleRef as _;
31
32const SPACE_THRESHOLD_NUMERATOR: usize = 1;
34const SPACE_THRESHOLD_DENOMINATOR: usize = 4;
35
36const DEFAULT_SLEEP_TIME: Duration = Duration::from_millis(200);
39
40pub fn create_ring_buffer(capacity: usize) -> ring_buffer::Reader {
41 RingBuffer::create(calculate_real_size_given_desired_capacity(capacity))
42}
43
44fn calculate_real_size_given_desired_capacity(capacity: usize) -> usize {
45 let page_size = zx::system_get_page_size() as usize;
49 (capacity * SPACE_THRESHOLD_DENOMINATOR
50 / (SPACE_THRESHOLD_DENOMINATOR - SPACE_THRESHOLD_NUMERATOR))
51 .next_multiple_of(page_size)
52}
53
54const IOB_PEER_CLOSED_KEY_BASE: u64 = 0x8000_0000_0000_0000;
55
56pub type OnInactive = Box<dyn Fn(Arc<ComponentIdentity>) + Send + Sync>;
57
58pub struct SharedBuffer {
59 inner: Condition<Inner>,
60
61 sockets: Mutex<Slab<Socket>>,
64
65 on_inactive: OnInactive,
67
68 port: zx::Port,
70
71 event: zx::Event,
73
74 socket_thread: Mutex<Option<std::thread::JoinHandle<()>>>,
76
77 _buffer_monitor_task: fasync::Task<()>,
80}
81
82struct InnerGuard<'a> {
83 buffer: &'a SharedBuffer,
84
85 guard: ManuallyDrop<ConditionGuard<'a, Inner>>,
86
87 on_inactive: Vec<Arc<ComponentIdentity>>,
89
90 wake: bool,
92}
93
94impl Drop for InnerGuard<'_> {
95 fn drop(&mut self) {
96 if self.wake {
97 for waker in self.guard.drain_wakers() {
98 waker.wake();
99 }
100 }
101 unsafe {
103 ManuallyDrop::drop(&mut self.guard);
104 }
105 for identity in self.on_inactive.drain(..) {
106 (*self.buffer.on_inactive)(identity);
107 }
108 }
109}
110
111impl Deref for InnerGuard<'_> {
112 type Target = Inner;
113
114 fn deref(&self) -> &Self::Target {
115 &self.guard
116 }
117}
118
119impl DerefMut for InnerGuard<'_> {
120 fn deref_mut(&mut self) -> &mut Self::Target {
121 &mut self.guard
122 }
123}
124
125struct Inner {
126 ring_buffer: Arc<RingBuffer>,
128
129 containers: Containers,
131
132 thread_msg_queue: VecDeque<ThreadMessage>,
134
135 last_scanned: u64,
137
138 tail: u64,
140
141 iob_peers: Slab<(ContainerId, zx::Iob)>,
143}
144
145enum ThreadMessage {
146 Terminate,
148
149 Flush(oneshot::Sender<()>),
151}
152
153pub struct SharedBufferOptions {
154 pub sleep_time: Duration,
157}
158
159impl Default for SharedBufferOptions {
160 fn default() -> Self {
161 Self { sleep_time: DEFAULT_SLEEP_TIME }
162 }
163}
164
165impl SharedBuffer {
166 pub fn new(
168 ring_buffer: ring_buffer::Reader,
169 on_inactive: OnInactive,
170 options: SharedBufferOptions,
171 ) -> Arc<Self> {
172 let this = Arc::new_cyclic(|weak: &Weak<Self>| Self {
173 inner: Condition::new(Inner {
174 ring_buffer: Arc::clone(&ring_buffer),
175 containers: Containers::default(),
176 thread_msg_queue: VecDeque::default(),
177 last_scanned: 0,
178 tail: 0,
179 iob_peers: Slab::default(),
180 }),
181 sockets: Mutex::new(Slab::default()),
182 on_inactive,
183 port: zx::Port::create(),
184 event: zx::Event::create(),
185 socket_thread: Mutex::default(),
186 _buffer_monitor_task: fasync::Task::spawn(Self::buffer_monitor_task(
187 Weak::clone(weak),
188 ring_buffer,
189 options.sleep_time,
190 )),
191 });
192
193 *this.socket_thread.lock() = Some({
194 let this = Arc::clone(&this);
195 std::thread::spawn(move || this.socket_thread(options.sleep_time))
196 });
197 this
198 }
199
200 pub fn new_container_buffer(
201 self: &Arc<Self>,
202 identity: Arc<ComponentIdentity>,
203 stats: Arc<LogStreamStats>,
204 ) -> ContainerBuffer {
205 let mut inner = self.inner.lock();
206 let Inner { containers, ring_buffer, .. } = &mut *inner;
207 ContainerBuffer {
208 shared_buffer: Arc::clone(self),
209 container_id: containers.new_container(ring_buffer, Arc::clone(&identity), stats),
210 }
211 }
212
213 pub async fn flush(&self) {
214 let (sender, receiver) = oneshot::channel();
215 self.inner.lock().thread_msg_queue.push_back(ThreadMessage::Flush(sender));
216 self.event.signal_handle(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
217 let _ = receiver.await;
219 }
220
221 #[cfg(test)]
223 pub fn container_count(&self) -> usize {
224 self.inner.lock().containers.len()
225 }
226
227 pub async fn terminate(&self) {
229 self.inner.lock().thread_msg_queue.push_back(ThreadMessage::Terminate);
230 self.event.signal_handle(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
231 let join_handle = self.socket_thread.lock().take().unwrap();
232 fasync::unblock(|| {
233 let _ = join_handle.join();
234 })
235 .await;
236 }
237
238 fn socket_thread(&self, sleep_time: Duration) {
239 const INTERRUPT_KEY: u64 = u64::MAX;
240 let mut sockets_ready = Vec::new();
241 let mut iob_peer_closed = Vec::new();
242 let mut interrupt_needs_arming = true;
243 let mut msg = None;
244
245 loop {
246 let mut deadline = if msg.is_some() {
247 zx::MonotonicInstant::INFINITE_PAST
248 } else {
249 if interrupt_needs_arming {
250 self.event
251 .wait_async_handle(
252 &self.port,
253 INTERRUPT_KEY,
254 zx::Signals::USER_0,
255 zx::WaitAsyncOpts::empty(),
256 )
257 .unwrap();
258 interrupt_needs_arming = false;
259 }
260
261 let _ = self.event.wait_handle(
264 zx::Signals::USER_0,
265 zx::MonotonicInstant::after(sleep_time.into()),
266 );
267 zx::MonotonicInstant::INFINITE
268 };
269
270 loop {
272 match self.port.wait(deadline) {
273 Ok(packet) => {
274 if packet.key() == INTERRUPT_KEY {
275 interrupt_needs_arming = true;
276 if msg.is_none() {
281 msg = self.inner.lock().thread_msg_queue.pop_front();
282 }
283 } else if packet.key() & IOB_PEER_CLOSED_KEY_BASE != 0 {
284 iob_peer_closed.push(packet.key() as u32);
285 } else {
286 sockets_ready.push(SocketId(packet.key() as u32))
287 }
288 }
289 Err(zx::Status::TIMED_OUT) => break,
290 Err(status) => panic!("port wait error: {status:?}"),
291 }
292 deadline = zx::MonotonicInstant::INFINITE_PAST;
293 }
294
295 let mut inner = InnerGuard::new(self);
296
297 if !iob_peer_closed.is_empty() {
298 inner.update_message_ids(inner.ring_buffer.head());
300
301 for iob_peer_closed in iob_peer_closed.drain(..) {
302 let container_id = inner.iob_peers.free(iob_peer_closed).0;
303 if let Some(container) = inner.containers.get_mut(container_id) {
304 container.iob_count -= 1;
305 if container.iob_count == 0 && !container.is_active() {
306 if container.should_free() {
307 inner.containers.free(container_id);
308 } else {
309 let identity = Arc::clone(&container.identity);
310 inner.on_inactive.push(identity);
311 }
312 }
313 }
314 }
315 }
316
317 {
318 let mut sockets = self.sockets.lock();
319 for socket_id in sockets_ready.drain(..) {
320 inner.read_socket(&mut sockets, socket_id, |socket| {
321 socket
322 .socket
323 .wait_async_handle(
324 &self.port,
325 socket_id.0 as u64,
326 zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
327 zx::WaitAsyncOpts::empty(),
328 )
329 .unwrap();
330 });
331 }
332 }
333
334 if let Some(m) = msg.take() {
336 match m {
337 ThreadMessage::Terminate => return,
338 ThreadMessage::Flush(sender) => {
339 let _ = sender.send(());
340 }
341 }
342
343 msg = inner.thread_msg_queue.pop_front();
345 if msg.is_none() {
346 self.event.signal_handle(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
350 }
351 }
352 }
353 }
354
355 async fn buffer_monitor_task(
356 this: Weak<Self>,
357 mut ring_buffer: ring_buffer::Reader,
358 sleep_time: Duration,
359 ) {
360 let mut last_head = 0;
361 loop {
362 fasync::Timer::new(sleep_time).await;
364 let head = ring_buffer.wait(last_head).await;
365 let Some(this) = this.upgrade() else { return };
366 let mut inner = InnerGuard::new(&this);
367 inner.check_space(head);
368 last_head = head;
369 }
370 }
371}
372
373impl Inner {
374 fn ingest(&mut self, msg: &[u8], container_id: ContainerId) {
376 if msg.len() < std::mem::size_of::<Header>() {
377 debug!("message too short ({})", msg.len());
378 if let Some(container) = self.containers.get(container_id) {
379 container.stats.increment_invalid(msg.len());
380 }
381 return;
382 }
383
384 let header = Header::read_from_bytes(&msg[..std::mem::size_of::<Header>()]).unwrap();
385
386 let msg_len = header.size_words() as usize * 8;
389
390 if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg.len() < msg_len {
392 debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, msg.len());
393 if let Some(container) = self.containers.get(container_id) {
394 container.stats.increment_invalid(msg.len());
395 }
396 return;
397 }
398
399 let Some(container) = self.containers.get_mut(container_id) else {
400 return;
401 };
402
403 let mut data;
404 let msg = if container.dropped_count > 0 {
405 data = msg.to_vec();
406 if !add_dropped_count(&mut data, container.dropped_count) {
407 debug!("unable to add dropped count to invalid message");
408 container.stats.increment_invalid(data.len());
409 return;
410 }
411 &data
412 } else {
413 msg
414 };
415
416 if container.iob.write(Default::default(), 0, msg).is_err() {
417 container.dropped_count += 1
421 } else {
422 container.dropped_count = 0;
423 }
424 }
425
426 unsafe fn parse_message(
438 &self,
439 range: Range<u64>,
440 ) -> (ContainerId, &[u8], Option<zx::BootInstant>) {
441 let (tag, msg) = unsafe { self.ring_buffer.first_message_in(range) }
442 .expect("Unable to read message from ring buffer");
443 (
444 ContainerId(tag as u32),
445 msg,
446 (msg.len() >= 16)
447 .then(|| zx::BootInstant::from_nanos(i64::read_from_bytes(&msg[8..16]).unwrap())),
448 )
449 }
450}
451
452impl<'a> InnerGuard<'a> {
453 fn new(buffer: &'a SharedBuffer) -> Self {
454 Self {
455 buffer,
456 guard: ManuallyDrop::new(buffer.inner.lock()),
457 on_inactive: Vec::new(),
458 wake: false,
459 }
460 }
461
462 fn pop(&mut self, head: u64) -> Option<usize> {
470 if head == self.tail {
471 return None;
472 }
473
474 let record_len = {
477 let (container_id, message, timestamp) = unsafe { self.parse_message(self.tail..head) };
478 let record_len = ring_buffer_record_len(message.len());
479
480 let container = self.containers.get_mut(container_id).unwrap();
481
482 container.stats.increment_rolled_out(record_len);
483 container.msg_ids.start += 1;
484 if let Some(timestamp) = timestamp {
485 container.last_rolled_out_timestamp = timestamp;
486 }
487 if !container.is_active() {
488 if container.should_free() {
489 self.containers.free(container_id);
490 } else {
491 let identity = Arc::clone(&container.identity);
492 self.on_inactive.push(identity);
493 }
494 }
495
496 record_len
497 };
498
499 self.ring_buffer.increment_tail(record_len);
501 self.tail += record_len as u64;
502
503 assert!(self.last_scanned >= self.tail);
505
506 Some(record_len)
507 }
508
509 fn read_socket(
511 &mut self,
512 sockets: &mut Slab<Socket>,
513 socket_id: SocketId,
514 rearm: impl FnOnce(&mut Socket),
515 ) {
516 let Some(socket) = sockets.get_mut(socket_id.0) else { return };
517 let container_id = socket.container_id;
518
519 loop {
520 self.check_space(self.ring_buffer.head());
521
522 let mut data = Vec::with_capacity(MAX_DATAGRAM_LEN_BYTES as usize);
523
524 let len = match socket.socket.read_uninit(data.spare_capacity_mut()) {
526 Ok(d) => d.len(),
527 Err(zx::Status::SHOULD_WAIT) => {
528 rearm(socket);
530 return;
531 }
532 Err(_) => break,
533 };
534
535 unsafe {
537 data.set_len(len);
538 }
539
540 let container = self.containers.get_mut(container_id).unwrap();
541 if data.len() < 16 {
542 container.stats.increment_invalid(data.len());
543 continue;
544 }
545
546 let header = Header::read_from_bytes(&data[..std::mem::size_of::<Header>()]).unwrap();
547 let msg_len = header.size_words() as usize * 8;
548 if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg_len != data.len() {
549 debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, data.len());
550 container.stats.increment_invalid(data.len());
551 continue;
552 }
553
554 if container.dropped_count > 0 && !add_dropped_count(&mut data, container.dropped_count)
555 {
556 debug!("unable to add dropped count to invalid message");
557 container.stats.increment_invalid(data.len());
558 continue;
559 }
560
561 if container.iob.write(Default::default(), 0, &data).is_err() {
562 container.dropped_count += 1
566 } else {
567 container.dropped_count = 0;
568 }
569 }
570
571 self.update_message_ids(self.ring_buffer.head());
575
576 let container = self.containers.get_mut(container_id).unwrap();
577 container.remove_socket(socket_id, sockets);
578 if !container.is_active() {
579 if container.should_free() {
580 self.containers.free(container_id);
581 } else {
582 let identity = Arc::clone(&container.identity);
583 self.on_inactive.push(identity);
584 }
585 }
586 }
587
588 fn update_message_ids(&mut self, head: u64) {
590 while self.last_scanned < head {
591 let (container_id, message, _) = unsafe { self.parse_message(self.last_scanned..head) };
594 let msg_len = message.len();
595 let severity = (msg_len >= 8)
596 .then(|| Header::read_from_bytes(&message[0..8]).unwrap().severity().into());
597 let container = self.containers.get_mut(container_id).unwrap();
598 container.msg_ids.end += 1;
599 if let Some(severity) = severity {
600 container.stats.ingest_message(msg_len, severity);
601 }
602 self.last_scanned += ring_buffer_record_len(msg_len) as u64;
603 self.wake = true;
604 }
605 }
606
607 fn check_space(&mut self, head: u64) {
609 self.update_message_ids(head);
610 let capacity = self.ring_buffer.capacity();
611 let mut space = capacity
612 .checked_sub((head - self.tail) as usize)
613 .unwrap_or_else(|| panic!("bad range: {:?}", self.tail..head));
614 let required_space = capacity * SPACE_THRESHOLD_NUMERATOR / SPACE_THRESHOLD_DENOMINATOR;
615 while space < required_space {
616 let Some(amount) = self.pop(head) else { break };
617 space += amount;
618 }
619 }
620}
621
622#[derive(Default)]
623struct Containers {
624 slab: Slab<ContainerInfo>,
625}
626
627#[derive(Clone, Copy, Debug)]
628struct ContainerId(u32);
629
630impl Containers {
631 #[cfg(test)]
632 fn len(&self) -> usize {
633 self.slab.len()
634 }
635
636 fn get(&self, id: ContainerId) -> Option<&ContainerInfo> {
637 self.slab.get(id.0)
638 }
639
640 fn get_mut(&mut self, id: ContainerId) -> Option<&mut ContainerInfo> {
641 self.slab.get_mut(id.0)
642 }
643
644 fn new_container(
645 &mut self,
646 buffer: &RingBuffer,
647 identity: Arc<ComponentIdentity>,
648 stats: Arc<LogStreamStats>,
649 ) -> ContainerId {
650 ContainerId(self.slab.insert(|id| {
651 let (iob, _) = buffer.new_iob_writer(id as u64).unwrap();
652 ContainerInfo::new(identity, stats, iob)
653 }))
654 }
655
656 fn free(&mut self, id: ContainerId) {
657 self.slab.free(id.0);
658 }
659}
660
661#[derive(Derivative)]
662#[derivative(Debug)]
663struct ContainerInfo {
664 refs: usize,
666
667 identity: Arc<ComponentIdentity>,
669
670 first_index: u64,
674
675 msg_ids: Range<u64>,
678
679 terminated: bool,
681
682 #[derivative(Debug = "ignore")]
684 stats: Arc<LogStreamStats>,
685
686 last_rolled_out_timestamp: zx::BootInstant,
688
689 first_socket_id: SocketId,
691
692 iob: zx::Iob,
694
695 iob_count: usize,
697
698 dropped_count: u64,
700}
701
702impl ContainerInfo {
703 fn new(identity: Arc<ComponentIdentity>, stats: Arc<LogStreamStats>, iob: zx::Iob) -> Self {
704 Self {
705 refs: 0,
706 identity,
707 first_index: 0,
708 msg_ids: 0..0,
709 terminated: false,
710 stats,
711 last_rolled_out_timestamp: zx::BootInstant::ZERO,
712 first_socket_id: SocketId::NULL,
713 iob,
714 iob_count: 0,
715 dropped_count: 0,
716 }
717 }
718
719 fn should_free(&self) -> bool {
720 self.terminated && self.refs == 0 && !self.is_active()
721 }
722
723 fn is_active(&self) -> bool {
729 self.first_socket_id != SocketId::NULL
730 || self.iob_count > 0
731 || self.msg_ids.end != self.msg_ids.start
732 || ARCHIVIST_MONIKER.get().is_some_and(|m| *self.identity == *m)
733 }
734
735 fn remove_socket(&mut self, socket_id: SocketId, sockets: &mut Slab<Socket>) {
739 let Socket { prev, next, .. } = *sockets.get(socket_id.0).unwrap();
740 if prev == SocketId::NULL {
741 self.first_socket_id = next;
742 } else {
743 sockets.get_mut(prev.0).unwrap().next = next;
744 }
745 if next != SocketId::NULL {
746 sockets
747 .get_mut(next.0)
748 .unwrap_or_else(|| panic!("next {next:?} has been freed!"))
749 .prev = prev;
750 }
751 sockets.free(socket_id.0);
752 self.stats.close_socket();
753 debug!(identity:% = self.identity; "Socket closed.");
754 }
755}
756
757pub struct ContainerBuffer {
758 shared_buffer: Arc<SharedBuffer>,
759 container_id: ContainerId,
760}
761
762impl ContainerBuffer {
763 pub fn iob_tag(&self) -> u64 {
765 self.container_id.0 as u64
766 }
767
768 pub fn push_back(&self, msg: &[u8]) {
772 self.shared_buffer.inner.lock().ingest(msg, self.container_id);
773 }
774
775 pub fn iob(&self) -> zx::Iob {
777 let mut inner = self.shared_buffer.inner.lock();
778
779 inner.containers.get_mut(self.container_id).unwrap().iob_count += 1;
780
781 let (ep0, ep1) = inner.ring_buffer.new_iob_writer(self.container_id.0 as u64).unwrap();
782
783 inner.iob_peers.insert(|idx| {
784 ep1.wait_async_handle(
785 &self.shared_buffer.port,
786 idx as u64 | IOB_PEER_CLOSED_KEY_BASE,
787 zx::Signals::IOB_PEER_CLOSED,
788 zx::WaitAsyncOpts::empty(),
789 )
790 .unwrap();
791
792 (self.container_id, ep1)
793 });
794
795 ep0
796 }
797
798 pub fn cursor(&self, mode: StreamMode) -> Option<Cursor> {
800 let mut inner = InnerGuard::new(&self.shared_buffer);
804 let Some(mut container) = inner.containers.get_mut(self.container_id) else {
805 return None;
807 };
808
809 container.refs += 1;
810 let stats = Arc::clone(&container.stats);
811
812 let (index, next_id, end) = match mode {
813 StreamMode::Snapshot => {
814 (container.first_index, container.msg_ids.start, CursorEnd::Snapshot(None))
815 }
816 StreamMode::Subscribe => {
817 let head = inner.ring_buffer.head();
819 inner.update_message_ids(head);
820 container = inner.containers.get_mut(self.container_id).unwrap();
821 (head, container.msg_ids.end, CursorEnd::Stream)
822 }
823 StreamMode::SnapshotThenSubscribe => {
824 (container.first_index, container.msg_ids.start, CursorEnd::Stream)
825 }
826 };
827
828 Some(Cursor {
829 index,
830 container_id: self.container_id,
831 next_id,
832 end,
833 buffer: Arc::clone(&self.shared_buffer),
834 waker_entry: self.shared_buffer.inner.waker_entry(),
835 stats,
836 })
837 }
838
839 pub fn terminate(&self) {
843 let mut inner = InnerGuard::new(&self.shared_buffer);
844
845 inner.update_message_ids(inner.ring_buffer.head());
847
848 if let Some(container) = inner.containers.get_mut(self.container_id) {
849 container.terminated = true;
850 if container.first_socket_id != SocketId::NULL {
851 let mut sockets = self.shared_buffer.sockets.lock();
852 loop {
853 container.remove_socket(container.first_socket_id, &mut sockets);
854 if container.first_socket_id == SocketId::NULL {
855 break;
856 }
857 }
858 }
859 if container.should_free() {
860 inner.containers.free(self.container_id);
861 }
862 inner.wake = true;
863 }
864 }
865
866 pub fn is_active(&self) -> bool {
868 self.shared_buffer
869 .inner
870 .lock()
871 .containers
872 .get(self.container_id)
873 .is_some_and(|c| c.is_active())
874 }
875
876 pub fn add_socket(&self, socket: zx::Socket) {
878 let mut inner = self.shared_buffer.inner.lock();
879 let Some(container) = inner.containers.get_mut(self.container_id) else { return };
880 container.stats.open_socket();
881 let next = container.first_socket_id;
882 let mut sockets = self.shared_buffer.sockets.lock();
883 let socket_id = SocketId(sockets.insert(|socket_id| {
884 socket
885 .wait_async_handle(
886 &self.shared_buffer.port,
887 socket_id as u64,
888 zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
889 zx::WaitAsyncOpts::empty(),
890 )
891 .unwrap();
892 Socket { socket, container_id: self.container_id, prev: SocketId::NULL, next }
893 }));
894 if next != SocketId::NULL {
895 sockets.get_mut(next.0).unwrap().prev = socket_id;
896 }
897 container.first_socket_id = socket_id;
898 }
899}
900
901impl Drop for ContainerBuffer {
902 fn drop(&mut self) {
903 self.terminate();
904 }
905}
906
907#[pin_project(PinnedDrop)]
908#[derive(Derivative)]
909#[derivative(Debug)]
910pub struct Cursor {
911 index: u64,
913
914 container_id: ContainerId,
915
916 next_id: u64,
918
919 end: CursorEnd,
921
922 #[derivative(Debug = "ignore")]
923 buffer: Arc<SharedBuffer>,
924
925 #[pin]
927 #[derivative(Debug = "ignore")]
928 waker_entry: WakerEntry<Inner>,
929
930 #[derivative(Debug = "ignore")]
931 stats: Arc<LogStreamStats>,
932}
933
934#[derive(Debug, PartialEq)]
936pub enum LazyItem<T> {
937 Next(Arc<T>),
939 ItemsRolledOut(u64, zx::BootInstant),
941}
942
943impl Stream for Cursor {
944 type Item = LazyItem<StoredMessage>;
945
946 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
947 let mut this = self.project();
948 let mut inner = InnerGuard::new(this.buffer);
949
950 let mut head = inner.ring_buffer.head();
951 inner.check_space(head);
952
953 let mut container = match inner.containers.get(*this.container_id) {
954 None => return Poll::Ready(None),
955 Some(container) => container,
956 };
957
958 let end_id = match &mut this.end {
959 CursorEnd::Snapshot(None) => {
960 let mut sockets = this.buffer.sockets.lock();
961 let mut socket_id = container.first_socket_id;
964 while socket_id != SocketId::NULL {
965 let socket = sockets.get_mut(socket_id.0).unwrap();
966 let next = socket.next;
967 inner.read_socket(&mut sockets, socket_id, |_| {});
969 socket_id = next;
970 }
971
972 head = inner.ring_buffer.head();
974 inner.update_message_ids(head);
975 container = inner.containers.get(*this.container_id).unwrap();
976 *this.end = CursorEnd::Snapshot(Some(container.msg_ids.end));
977 container.msg_ids.end
978 }
979 CursorEnd::Snapshot(Some(end)) => *end,
980 CursorEnd::Stream => u64::MAX,
981 };
982
983 if *this.next_id == end_id {
984 return Poll::Ready(None);
985 }
986
987 if container.msg_ids.start > *this.next_id {
989 let mut next_id = container.msg_ids.start;
990 if end_id < next_id {
991 next_id = end_id;
992 }
993 let rolled_out = next_id - *this.next_id;
994 *this.next_id = next_id;
995 return Poll::Ready(Some(LazyItem::ItemsRolledOut(
996 rolled_out,
997 container.last_rolled_out_timestamp,
998 )));
999 }
1000
1001 if inner.tail > *this.index {
1002 *this.index = inner.tail;
1003 }
1004
1005 if container.first_index > *this.index {
1007 *this.index = container.first_index;
1008 }
1009
1010 if *this.next_id == container.msg_ids.end && *this.index < inner.last_scanned {
1011 *this.index = inner.last_scanned;
1012 }
1013
1014 while *this.index < head {
1016 let (container_id, message, _timestamp) =
1018 unsafe { inner.parse_message(*this.index..head) };
1019
1020 *this.index += ring_buffer_record_len(message.len()) as u64;
1022 assert!(*this.index <= head);
1023
1024 if container_id.0 == this.container_id.0 {
1025 *this.next_id += 1;
1026 if let Some(msg) = StoredMessage::new(message.into(), this.stats) {
1027 return Poll::Ready(Some(LazyItem::Next(Arc::new(msg))));
1028 } else {
1029 }
1031 }
1032 }
1033
1034 if container.terminated {
1035 Poll::Ready(None)
1036 } else {
1037 inner.guard.add_waker(this.waker_entry, cx.waker().clone());
1038 Poll::Pending
1039 }
1040 }
1041}
1042
1043#[pinned_drop]
1044impl PinnedDrop for Cursor {
1045 fn drop(self: Pin<&mut Self>) {
1046 let mut inner = self.buffer.inner.lock();
1047 if let Some(container) = inner.containers.get_mut(self.container_id) {
1048 container.refs -= 1;
1049 if container.should_free() {
1050 inner.containers.free(self.container_id);
1051 }
1052 }
1053 }
1054}
1055
1056#[derive(Debug)]
1057enum CursorEnd {
1058 Snapshot(Option<u64>),
1061 Stream,
1062}
1063
1064struct Slab<T> {
1066 slab: Vec<Slot<T>>,
1067 free_index: usize,
1068}
1069
1070impl<T> Default for Slab<T> {
1071 fn default() -> Self {
1072 Self { slab: Vec::new(), free_index: usize::MAX }
1073 }
1074}
1075
1076enum Slot<T> {
1077 Used(T),
1078 Free(usize),
1079}
1080
1081impl<T> Slab<T> {
1082 #[cfg(test)]
1084 fn len(&self) -> usize {
1085 self.slab.iter().filter(|c| matches!(c, Slot::Used(_))).count()
1086 }
1087
1088 fn free(&mut self, index: u32) -> T {
1089 let index = index as usize;
1090 let value = match std::mem::replace(&mut self.slab[index], Slot::Free(self.free_index)) {
1091 Slot::Free(_) => panic!("Slot already free"),
1092 Slot::Used(value) => value,
1093 };
1094 self.free_index = index;
1095 value
1096 }
1097
1098 fn get(&self, id: u32) -> Option<&T> {
1099 self.slab.get(id as usize).and_then(|s| match s {
1100 Slot::Used(s) => Some(s),
1101 _ => None,
1102 })
1103 }
1104
1105 fn get_mut(&mut self, id: u32) -> Option<&mut T> {
1106 self.slab.get_mut(id as usize).and_then(|s| match s {
1107 Slot::Used(s) => Some(s),
1108 _ => None,
1109 })
1110 }
1111
1112 fn insert(&mut self, value: impl FnOnce(u32) -> T) -> u32 {
1113 let free_index = self.free_index;
1114 if free_index != usize::MAX {
1115 self.free_index = match std::mem::replace(
1116 &mut self.slab[free_index],
1117 Slot::Used(value(free_index as u32)),
1118 ) {
1119 Slot::Free(next) => next,
1120 _ => unreachable!(),
1121 };
1122 free_index as u32
1123 } else {
1124 assert!(self.slab.len() < u32::MAX as usize);
1127 self.slab.push(Slot::Used(value(self.slab.len() as u32)));
1128 (self.slab.len() - 1) as u32
1129 }
1130 }
1131}
1132
1133#[derive(Clone, Copy, Debug, Eq, PartialEq)]
1134struct SocketId(u32);
1135
1136impl SocketId {
1137 const NULL: Self = SocketId(0xffff_ffff);
1138}
1139
1140struct Socket {
1141 socket: zx::Socket,
1142 container_id: ContainerId,
1143 prev: SocketId,
1145 next: SocketId,
1146}
1147
1148#[cfg(test)]
1149mod tests {
1150 use super::{SharedBuffer, SharedBufferOptions, create_ring_buffer};
1151 use crate::logs::shared_buffer::LazyItem;
1152 use crate::logs::stats::LogStreamStats;
1153 use crate::logs::testing::make_message;
1154 use assert_matches::assert_matches;
1155 use diagnostics_assertions::{AnyProperty, assert_data_tree};
1156 use fidl_fuchsia_diagnostics::StreamMode;
1157 use fuchsia_async as fasync;
1158 use fuchsia_async::TimeoutExt;
1159 use fuchsia_inspect::{Inspector, InspectorConfig};
1160 use fuchsia_inspect_derive::WithInspect;
1161 use futures::channel::mpsc;
1162 use futures::future::OptionFuture;
1163 use futures::stream::{FuturesUnordered, StreamExt as _};
1164 use futures::{FutureExt, poll};
1165 use ring_buffer::MAX_MESSAGE_SIZE;
1166 use std::future::poll_fn;
1167 use std::pin::pin;
1168 use std::sync::Arc;
1169 use std::sync::atomic::{AtomicU64, Ordering};
1170 use std::task::Poll;
1171 use std::time::Duration;
1172
1173 async fn yield_to_executor() {
1174 let mut first_time = true;
1175 poll_fn(|cx| {
1176 if first_time {
1177 cx.waker().wake_by_ref();
1178 first_time = false;
1179 Poll::Pending
1180 } else {
1181 Poll::Ready(())
1182 }
1183 })
1184 .await;
1185 }
1186
1187 #[fuchsia::test]
1188 async fn push_one_message() {
1189 let buffer = SharedBuffer::new(
1190 create_ring_buffer(MAX_MESSAGE_SIZE),
1191 Box::new(|_| {}),
1192 Default::default(),
1193 );
1194 let container_buffer =
1195 buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1196 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1197 container_buffer.push_back(msg.bytes());
1198
1199 let cursor = container_buffer.cursor(StreamMode::Snapshot).unwrap();
1201 assert_eq!(
1202 cursor
1203 .map(|item| {
1204 match item {
1205 LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
1206 _ => panic!("Unexpected item {item:?}"),
1207 }
1208 })
1209 .count()
1210 .await,
1211 1
1212 );
1213 }
1214
1215 #[fuchsia::test]
1216 async fn message_too_short() {
1217 let buffer = SharedBuffer::new(
1218 create_ring_buffer(MAX_MESSAGE_SIZE),
1219 Box::new(|_| {}),
1220 Default::default(),
1221 );
1222
1223 let container_buffer =
1224 buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1225 container_buffer.push_back(&[0]);
1226
1227 assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
1228 }
1229
1230 #[fuchsia::test]
1231 async fn bad_type() {
1232 let buffer = SharedBuffer::new(
1233 create_ring_buffer(MAX_MESSAGE_SIZE),
1234 Box::new(|_| {}),
1235 Default::default(),
1236 );
1237 let container_buffer =
1238 buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1239 container_buffer.push_back(&[0x77; 16]);
1240
1241 assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
1242 }
1243
1244 #[fuchsia::test]
1245 async fn message_truncated() {
1246 let buffer = SharedBuffer::new(
1247 create_ring_buffer(MAX_MESSAGE_SIZE),
1248 Box::new(|_| {}),
1249 Default::default(),
1250 );
1251 let container_buffer =
1252 buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1253 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1254 container_buffer.push_back(&msg.bytes()[..msg.bytes().len() - 1]);
1255
1256 assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
1257 }
1258
1259 #[fuchsia::test]
1260 async fn buffer_wrapping() {
1261 let buffer = SharedBuffer::new(
1262 create_ring_buffer(MAX_MESSAGE_SIZE),
1263 Box::new(|_| {}),
1264 SharedBufferOptions { sleep_time: Duration::ZERO },
1265 );
1266 let container_buffer =
1267 buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1268
1269 let mut i = 0;
1271 loop {
1272 let msg = make_message(&format!("{i}"), None, zx::BootInstant::from_nanos(i));
1273 container_buffer.push_back(msg.bytes());
1274 i += 1;
1275
1276 yield_to_executor().await;
1278
1279 let inner = buffer.inner.lock();
1280 if inner.ring_buffer.head() > inner.ring_buffer.capacity() as u64 {
1281 break;
1282 }
1283 }
1284
1285 let mut cursor = pin!(container_buffer.cursor(StreamMode::Snapshot).unwrap());
1287
1288 let mut j;
1289 let mut item = cursor.next().await;
1290 if let Some(LazyItem::ItemsRolledOut(_, _)) = item {
1293 item = cursor.next().await;
1294 }
1295 assert_matches!(
1296 item,
1297 Some(LazyItem::Next(item)) => {
1298 j = item.timestamp().into_nanos();
1299 let msg = make_message(&format!("{j}"),
1300 None,
1301 item.timestamp());
1302 assert_eq!(&*item, &msg);
1303 }
1304 );
1305
1306 j += 1;
1307 while j != i {
1308 assert_matches!(
1309 cursor.next().await,
1310 Some(LazyItem::Next(item)) => {
1311 assert_eq!(&*item, &make_message(&format!("{j}"),
1312 None,
1313 item.timestamp()));
1314 }
1315 );
1316 j += 1;
1317 }
1318
1319 assert_eq!(cursor.next().await, None);
1320 }
1321
1322 #[fuchsia::test]
1323 async fn on_inactive() {
1324 let identity = Arc::new(vec!["a"].into());
1325 let on_inactive = Arc::new(AtomicU64::new(0));
1326 let buffer = {
1327 let on_inactive = Arc::clone(&on_inactive);
1328 let identity = Arc::clone(&identity);
1329 Arc::new(SharedBuffer::new(
1330 create_ring_buffer(MAX_MESSAGE_SIZE),
1331 Box::new(move |i| {
1332 assert_eq!(i, identity);
1333 on_inactive.fetch_add(1, Ordering::Relaxed);
1334 }),
1335 SharedBufferOptions { sleep_time: Duration::ZERO },
1336 ))
1337 };
1338 let container_a = buffer.new_container_buffer(identity, Arc::default());
1339 let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
1340
1341 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1342 container_a.push_back(msg.bytes());
1343
1344 while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
1346 container_b.push_back(msg.bytes());
1347
1348 yield_to_executor().await;
1350 }
1351
1352 assert_eq!(on_inactive.load(Ordering::Relaxed), 1);
1353 }
1354
1355 #[fuchsia::test]
1356 async fn terminate_drops_container() {
1357 async {}.await;
1359
1360 let buffer = SharedBuffer::new(
1361 create_ring_buffer(MAX_MESSAGE_SIZE),
1362 Box::new(|_| {}),
1363 SharedBufferOptions { sleep_time: Duration::ZERO },
1364 );
1365
1366 let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1368 assert_eq!(buffer.container_count(), 1);
1369 container_a.terminate();
1370
1371 assert_eq!(buffer.container_count(), 0);
1372
1373 let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
1375 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1376 container_a.push_back(msg.bytes());
1377 assert_eq!(buffer.container_count(), 1);
1378 container_a.terminate();
1379
1380 assert_eq!(buffer.container_count(), 1);
1382
1383 let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
1385 assert_eq!(buffer.container_count(), 2);
1386
1387 while buffer.container_count() != 1 {
1390 container_b.push_back(msg.bytes());
1391
1392 yield_to_executor().await;
1394 }
1395
1396 assert!(container_a.cursor(StreamMode::Subscribe).is_none());
1397 }
1398
1399 #[fuchsia::test]
1400 async fn cursor_subscribe() {
1401 for mode in [StreamMode::Subscribe, StreamMode::SnapshotThenSubscribe] {
1402 let buffer = SharedBuffer::new(
1403 create_ring_buffer(MAX_MESSAGE_SIZE),
1404 Box::new(|_| {}),
1405 Default::default(),
1406 );
1407 let container =
1408 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1409 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1410 container.push_back(msg.bytes());
1411
1412 let (sender, mut receiver) = mpsc::unbounded();
1413
1414 {
1416 let container = Arc::clone(&container);
1417 fasync::Task::spawn(async move {
1418 let mut cursor = pin!(container.cursor(mode).unwrap());
1419 while let Some(item) = cursor.next().await {
1420 sender.unbounded_send(item).unwrap();
1421 }
1422 })
1423 .detach();
1424 }
1425
1426 if mode == StreamMode::SnapshotThenSubscribe {
1428 assert_matches!(
1429 receiver.next().await,
1430 Some(LazyItem::Next(item)) if item.bytes() == msg.bytes()
1431 );
1432 }
1433
1434 assert_eq!(
1436 OptionFuture::from(Some(receiver.next()))
1437 .on_timeout(Duration::from_millis(500), || None)
1438 .await,
1439 None
1440 );
1441
1442 container.push_back(msg.bytes());
1443
1444 assert_matches!(
1446 receiver.next().await,
1447 Some(LazyItem::Next(item)) if item.bytes() == msg.bytes()
1448 );
1449
1450 container.terminate();
1451
1452 assert!(receiver.next().await.is_none());
1454 }
1455 }
1456
1457 #[fuchsia::test]
1458 async fn cursor_rolled_out() {
1459 for pass in 0..2 {
1462 let buffer = SharedBuffer::new(
1463 create_ring_buffer(MAX_MESSAGE_SIZE),
1464 Box::new(|_| {}),
1465 SharedBufferOptions { sleep_time: Duration::ZERO },
1466 );
1467 let container_a =
1468 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1469 let container_b =
1470 Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
1471 let container_c =
1472 Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), Arc::default()));
1473 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1474
1475 container_a.push_back(msg.bytes());
1476 container_a.push_back(msg.bytes());
1477 container_b.push_back(msg.bytes());
1478
1479 const A_MESSAGE_COUNT: usize = 50;
1480 for _ in 0..A_MESSAGE_COUNT - 2 {
1481 container_a.push_back(msg.bytes());
1482 }
1483
1484 let mut cursor = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1485
1486 let mut expected = A_MESSAGE_COUNT;
1487
1488 if pass == 0 {
1490 assert!(cursor.next().await.is_some());
1491 expected -= 1;
1492 }
1493
1494 while container_b.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
1496 container_c.push_back(msg.bytes());
1497
1498 yield_to_executor().await;
1500 }
1501
1502 assert_matches!(
1504 cursor.next().await,
1505 Some(LazyItem::ItemsRolledOut(rolled_out, t))
1506 if t == zx::BootInstant::from_nanos(1) && rolled_out > 0
1507 => expected -= rolled_out as usize
1508 );
1509
1510 assert_eq!(cursor.count().await, expected);
1512 }
1513 }
1514
1515 #[fuchsia::test]
1516 async fn drained_post_termination_cursors() {
1517 let buffer = SharedBuffer::new(
1518 create_ring_buffer(MAX_MESSAGE_SIZE),
1519 Box::new(|_| {}),
1520 Default::default(),
1521 );
1522 let container =
1523 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1524 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1525
1526 let mut cursor_a = pin!(container.cursor(StreamMode::Subscribe).unwrap());
1527 let mut cursor_b = pin!(container.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
1528
1529 container.push_back(msg.bytes());
1530 container.push_back(msg.bytes());
1531 container.push_back(msg.bytes());
1532 container.push_back(msg.bytes());
1533 container.push_back(msg.bytes());
1534
1535 let mut cursor_c = pin!(container.cursor(StreamMode::Snapshot).unwrap());
1536 assert!(cursor_a.next().await.is_some());
1537 assert!(cursor_b.next().await.is_some());
1538 assert!(cursor_c.next().await.is_some());
1539
1540 container.terminate();
1541
1542 assert_eq!(cursor_a.count().await, 4);
1544 assert_eq!(cursor_b.count().await, 4);
1545 assert_eq!(cursor_c.count().await, 4);
1546 }
1547
1548 #[fuchsia::test]
1549 async fn empty_post_termination_cursors() {
1550 let buffer = SharedBuffer::new(
1551 create_ring_buffer(MAX_MESSAGE_SIZE),
1552 Box::new(|_| {}),
1553 Default::default(),
1554 );
1555 let container =
1556 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1557
1558 let cursor_a = container.cursor(StreamMode::Subscribe).unwrap();
1559 let cursor_b = container.cursor(StreamMode::SnapshotThenSubscribe).unwrap();
1560 let cursor_c = container.cursor(StreamMode::Snapshot).unwrap();
1561
1562 container.terminate();
1563
1564 assert_eq!(cursor_a.count().await, 0);
1565 assert_eq!(cursor_b.count().await, 0);
1566 assert_eq!(cursor_c.count().await, 0);
1567 }
1568
1569 #[fuchsia::test]
1570 async fn snapshot_then_subscribe_works_when_only_dropped_notifications_are_returned() {
1571 let buffer = SharedBuffer::new(
1572 create_ring_buffer(MAX_MESSAGE_SIZE),
1573 Box::new(|_| {}),
1574 SharedBufferOptions { sleep_time: Duration::ZERO },
1575 );
1576 let container_a =
1577 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1578 let container_b =
1579 Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
1580 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1581 container_a.push_back(msg.bytes());
1582 container_a.push_back(msg.bytes());
1583 container_a.push_back(msg.bytes());
1584 let mut cursor = pin!(container_a.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
1585
1586 while container_a.cursor(StreamMode::Snapshot).unwrap().count().await > 0 {
1588 container_b.push_back(msg.bytes());
1589
1590 yield_to_executor().await;
1592 }
1593
1594 assert_matches!(cursor.next().await, Some(LazyItem::ItemsRolledOut(3, _)));
1595
1596 assert!(poll!(cursor.next()).is_pending());
1597
1598 container_a.terminate();
1599 assert_eq!(cursor.count().await, 0);
1600 }
1601
1602 #[fuchsia::test]
1603 async fn recycled_container_slot() {
1604 let buffer = Arc::new(SharedBuffer::new(
1605 create_ring_buffer(MAX_MESSAGE_SIZE),
1606 Box::new(|_| {}),
1607 SharedBufferOptions { sleep_time: Duration::ZERO },
1608 ));
1609 let container_a =
1610 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1611 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1612 container_a.push_back(msg.bytes());
1613
1614 let mut cursor = pin!(container_a.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
1615 assert_matches!(cursor.next().await, Some(LazyItem::Next(_)));
1616
1617 let container_b =
1619 Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
1620 while container_a.cursor(StreamMode::Snapshot).unwrap().count().await > 0 {
1621 container_b.push_back(msg.bytes());
1622
1623 yield_to_executor().await;
1625 }
1626
1627 container_a.terminate();
1628
1629 let container_c =
1632 Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), Arc::default()));
1633 container_c.push_back(msg.bytes());
1634 container_c.push_back(msg.bytes());
1635
1636 assert_matches!(cursor.next().await, None);
1638 }
1639
1640 #[fuchsia::test]
1641 async fn socket_increments_logstats() {
1642 let inspector = Inspector::new(InspectorConfig::default());
1643 let stats: Arc<LogStreamStats> =
1644 Arc::new(LogStreamStats::default().with_inspect(inspector.root(), "test").unwrap());
1645 let buffer = Arc::new(SharedBuffer::new(
1646 create_ring_buffer(65536),
1647 Box::new(|_| {}),
1648 Default::default(),
1649 ));
1650 let container_a = Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats));
1651 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1652
1653 let (local, remote) = zx::Socket::create_datagram();
1654 container_a.add_socket(remote);
1655
1656 let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();
1657
1658 let mut futures = FuturesUnordered::new();
1661 futures.push(async move {
1662 let mut cursor_a = pin!(cursor_a);
1663 cursor_a.next().await
1664 });
1665 let mut next = futures.next();
1666 assert!(futures::poll!(&mut next).is_pending());
1667
1668 local.write(msg.bytes()).unwrap();
1669
1670 let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1671
1672 assert_eq!(
1673 cursor_b
1674 .map(|item| {
1675 match item {
1676 LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
1677 _ => panic!("Unexpected item {item:?}"),
1678 }
1679 })
1680 .count()
1681 .await,
1682 1
1683 );
1684
1685 next.await;
1687 assert_data_tree!(
1689 inspector,
1690 root: contains {
1691 test: {
1692 url: "",
1693 last_timestamp: AnyProperty,
1694 sockets_closed: 0u64,
1695 sockets_opened: 1u64,
1696 invalid: {
1697 number: 0u64,
1698 bytes: 0u64,
1699 },
1700 total: {
1701 number: 1u64,
1702 bytes: 88u64,
1703 },
1704 rolled_out: {
1705 number: 0u64,
1706 bytes: 0u64,
1707 },
1708 trace: {
1709 number: 0u64,
1710 bytes: 0u64,
1711 },
1712 debug: {
1713 number: 1u64,
1714 bytes: 88u64,
1715 },
1716 info: {
1717 number: 0u64,
1718 bytes: 0u64,
1719 },
1720 warn: {
1721 number: 0u64,
1722 bytes: 0u64,
1723 },
1724 error: {
1725 number: 0u64,
1726 bytes: 0u64,
1727 },
1728 fatal: {
1729 number: 0u64,
1730 bytes: 0u64,
1731 },
1732 }
1733 }
1734 );
1735 }
1736
1737 #[fuchsia::test]
1738 async fn socket() {
1739 let buffer = Arc::new(SharedBuffer::new(
1740 create_ring_buffer(MAX_MESSAGE_SIZE),
1741 Box::new(|_| {}),
1742 Default::default(),
1743 ));
1744 let container_a =
1745 Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
1746 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1747
1748 let (local, remote) = zx::Socket::create_datagram();
1749 container_a.add_socket(remote);
1750
1751 let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();
1752
1753 let mut futures = FuturesUnordered::new();
1756 futures.push(async move {
1757 let mut cursor_a = pin!(cursor_a);
1758 cursor_a.next().await
1759 });
1760 let mut next = futures.next();
1761 assert!(futures::poll!(&mut next).is_pending());
1762
1763 local.write(msg.bytes()).unwrap();
1764
1765 let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1766
1767 assert_eq!(
1768 cursor_b
1769 .map(|item| {
1770 match item {
1771 LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
1772 _ => panic!("Unexpected item {item:?}"),
1773 }
1774 })
1775 .count()
1776 .await,
1777 1
1778 );
1779
1780 next.await;
1782 }
1783
1784 #[fuchsia::test]
1785 async fn socket_on_inactive() {
1786 let on_inactive = Arc::new(AtomicU64::new(0));
1787 let a_identity = Arc::new(vec!["a"].into());
1788 let buffer = Arc::new(SharedBuffer::new(
1789 create_ring_buffer(MAX_MESSAGE_SIZE),
1790 {
1791 let on_inactive = Arc::clone(&on_inactive);
1792 let a_identity = Arc::clone(&a_identity);
1793 Box::new(move |id| {
1794 assert_eq!(id, a_identity);
1795 on_inactive.fetch_add(1, Ordering::Relaxed);
1796 })
1797 },
1798 SharedBufferOptions { sleep_time: Duration::ZERO },
1799 ));
1800 let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
1801 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1802
1803 let (local, remote) = zx::Socket::create_datagram();
1804 container_a.add_socket(remote);
1805
1806 local.write(msg.bytes()).unwrap();
1807
1808 let cursor = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());
1809
1810 assert_eq!(
1811 cursor
1812 .map(|item| {
1813 match item {
1814 LazyItem::Next(item) => assert_eq!(item.bytes(), msg.bytes()),
1815 _ => panic!("Unexpected item {item:?}"),
1816 }
1817 })
1818 .count()
1819 .await,
1820 1
1821 );
1822
1823 let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
1825 while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
1826 container_b.push_back(msg.bytes());
1827
1828 yield_to_executor().await;
1830 }
1831
1832 assert_eq!(on_inactive.load(Ordering::Relaxed), 0);
1833
1834 std::mem::drop(local);
1836
1837 while on_inactive.load(Ordering::Relaxed) != 1 {
1839 fasync::Timer::new(Duration::from_millis(50)).await;
1840 }
1841 }
1842
1843 #[fuchsia::test]
1844 async fn flush() {
1845 let a_identity = Arc::new(vec!["a"].into());
1846 let buffer = Arc::new(SharedBuffer::new(
1847 create_ring_buffer(1024 * 1024),
1848 Box::new(|_| {}),
1849 Default::default(),
1850 ));
1851 let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
1852 let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
1853
1854 let (local, remote) = zx::Socket::create_datagram();
1855 container_a.add_socket(remote);
1856
1857 let cursor = pin!(container_a.cursor(StreamMode::Subscribe).unwrap());
1858
1859 const COUNT: usize = 1000;
1860 for _ in 0..COUNT {
1861 local.write(msg.bytes()).unwrap();
1862 }
1863
1864 let mut flush_futures = FuturesUnordered::from_iter([buffer.flush(), buffer.flush()]);
1866 flush_futures.next().await;
1867
1868 let messages: Option<Vec<_>> = cursor.take(COUNT).collect().now_or_never();
1869 assert!(messages.is_some());
1870
1871 flush_futures.next().await;
1873
1874 buffer.terminate().await;
1876 }
1877}