mod cursor;

pub use cursor::{FilterCursor, FilterCursorStream, FxtMessage};

use crate::identity::ComponentIdentity;
use crate::logs::repository::ARCHIVIST_MONIKER;
use crate::logs::stats::LogStreamStats;
use derivative::Derivative;
use diagnostics_log_encoding::encode::add_dropped_count;
use diagnostics_log_encoding::{Header, TRACING_FORMAT_LOG_RECORD_TYPE};
use fidl_fuchsia_diagnostics::{ComponentSelector, StreamMode};
use fidl_fuchsia_logger::MAX_DATAGRAM_LEN_BYTES;
use fuchsia_async as fasync;
use fuchsia_async::condition::{Condition, ConditionGuard};
use fuchsia_sync::Mutex;
use futures::channel::oneshot;
use log::debug;
use ring_buffer::{self, RingBuffer, ring_buffer_record_len};
use std::collections::VecDeque;
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut, Range};
use std::sync::{Arc, Weak};
use std::time::Duration;
use zerocopy::FromBytes;

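// Fraction of the ring buffer that is kept free: when free space falls below
// SPACE_THRESHOLD_NUMERATOR / SPACE_THRESHOLD_DENOMINATOR of the capacity, the oldest messages
// are rolled out.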
const SPACE_THRESHOLD_NUMERATOR: usize = 1;
const SPACE_THRESHOLD_DENOMINATOR: usize = 4;

// Default interval at which the socket thread and the buffer monitor wake up to scan for work.
const DEFAULT_SLEEP_TIME: Duration = Duration::from_millis(200);

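/// Creates the ring buffer used to back a `SharedBuffer`. The buffer is sized so that roughly
/// `capacity` bytes remain usable once the free-space threshold has been reserved.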
pub fn create_ring_buffer(capacity: usize) -> ring_buffer::Reader {
    RingBuffer::create(calculate_real_size_given_desired_capacity(capacity))
}

fn calculate_real_size_given_desired_capacity(capacity: usize) -> usize {
    let page_size = zx::system_get_page_size() as usize;
    // Scale the capacity up so that `capacity` bytes remain once the free-space threshold is
    // reserved, then round up to a whole number of pages.
    (capacity * SPACE_THRESHOLD_DENOMINATOR
        / (SPACE_THRESHOLD_DENOMINATOR - SPACE_THRESHOLD_NUMERATOR))
        .next_multiple_of(page_size)
}

// Port packet keys with this bit set indicate an IOB peer-closed notification (the low bits hold
// the index in `Inner::iob_peers`); keys without it identify sockets.
const IOB_PEER_CLOSED_KEY_BASE: u64 = 0x8000_0000_0000_0000;

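/// Callback invoked, after the buffer lock has been released, when a container stops being active.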
pub type OnInactive = Box<dyn Fn(Arc<ComponentIdentity>) + Send + Sync>;

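/// The log buffer shared by all components. Messages are stored in a single ring buffer and are
/// tagged with the ID of the container (component) that wrote them.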
pub struct SharedBuffer {
    inner: Condition<Inner>,

    // The registered sockets that the socket thread drains into the ring buffer.
    sockets: Mutex<Slab<Socket>>,

    // Callback for when a container becomes inactive.
    on_inactive: OnInactive,

    // Port used by the socket thread to wait for interrupt, socket and IOB peer-closed signals.
    port: zx::Port,

    // Event used to interrupt the socket thread; USER_0 is raised when a thread message is queued.
    event: zx::Event,

    // Join handle for the socket thread; taken when the buffer is terminated.
    socket_thread: Mutex<Option<std::thread::JoinHandle<()>>>,

    // Task that monitors the ring buffer and rolls out old messages when space runs low.
    _buffer_monitor_task: fasync::Task<()>,
}

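/// Wrapper around the lock guard for `Inner` that, when dropped, wakes waiting cursors (if
/// requested) and then, once the lock has been released, reports newly inactive containers via
/// the `on_inactive` callback.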
struct InnerGuard<'a> {
    buffer: &'a Arc<SharedBuffer>,

    // The underlying lock guard; dropped manually so the lock is released before the
    // `on_inactive` callbacks run.
    guard: ManuallyDrop<ConditionGuard<'a, Inner>>,

    // Identities of containers that became inactive whilst the guard was held.
    on_inactive: Vec<Arc<ComponentIdentity>>,

    // Set to true to wake waiting cursors when the guard is dropped.
    wake: bool,
}

impl Drop for InnerGuard<'_> {
    fn drop(&mut self) {
        if self.wake {
            for waker in self.guard.drain_wakers() {
                waker.wake();
            }
        }
        // SAFETY: `guard` is not used again after this point.
        unsafe {
            ManuallyDrop::drop(&mut self.guard);
        }
        for identity in self.on_inactive.drain(..) {
            (*self.buffer.on_inactive)(identity);
        }
    }
}

impl Deref for InnerGuard<'_> {
    type Target = Inner;

    fn deref(&self) -> &Self::Target {
        &self.guard
    }
}

impl DerefMut for InnerGuard<'_> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.guard
    }
}

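/// State protected by the `SharedBuffer::inner` lock.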
struct Inner {
    ring_buffer: Arc<RingBuffer>,

    // Per-component state, indexed by `ContainerId`.
    containers: Containers,

    // Messages for the socket thread (flush and terminate requests).
    thread_msg_queue: VecDeque<ThreadMessage>,

    // Ring buffer offset up to which messages have been scanned and attributed to containers.
    last_scanned: u64,

    // The ID that will be assigned to the next message scanned.
    last_scanned_message_id: u64,

    // Ring buffer offset of the oldest message that has not been rolled out.
    tail: u64,

    // The ID of the oldest message that has not been rolled out.
    tail_message_id: u64,

    // Peer ends of the IOBs handed out to containers, retained so that peer-closed signals can be
    // mapped back to their containers.
    iob_peers: Slab<(ContainerId, zx::Iob)>,

    // Set once the buffer has been terminated.
    terminated: bool,
}

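/// Requests handled by the socket thread.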
enum ThreadMessage {
    // Tells the socket thread to finish up and exit.
    Terminate,

    // Asks the socket thread to process outstanding work and then acknowledge on the channel.
    Flush(oneshot::Sender<()>),
}

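/// Options used to configure a `SharedBuffer`.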
pub struct SharedBufferOptions {
    // How long the socket thread and the buffer monitor sleep between scans.
    pub sleep_time: Duration,
}

impl Default for SharedBufferOptions {
    fn default() -> Self {
        Self { sleep_time: DEFAULT_SLEEP_TIME }
    }
}

impl SharedBuffer {
    pub fn new(
        ring_buffer: ring_buffer::Reader,
        on_inactive: OnInactive,
        options: SharedBufferOptions,
    ) -> Arc<Self> {
        let this = Arc::new_cyclic(|weak: &Weak<Self>| Self {
            inner: Condition::new(Inner {
                ring_buffer: Arc::clone(&ring_buffer),
                containers: Containers::default(),
                thread_msg_queue: VecDeque::default(),
                last_scanned: 0,
                last_scanned_message_id: 0,
                tail: 0,
                tail_message_id: 0,
                iob_peers: Slab::default(),
                terminated: false,
            }),
            sockets: Mutex::new(Slab::default()),
            on_inactive,
            port: zx::Port::create(),
            event: zx::Event::create(),
            socket_thread: Mutex::default(),
            _buffer_monitor_task: fasync::Task::spawn(Self::buffer_monitor_task(
                Weak::clone(weak),
                ring_buffer,
                options.sleep_time,
            )),
        });

        *this.socket_thread.lock() = Some({
            let this = Arc::clone(&this);
            std::thread::spawn(move || this.socket_thread(options.sleep_time))
        });
        this
    }

    pub fn new_container_buffer(
        self: &Arc<Self>,
        identity: Arc<ComponentIdentity>,
        stats: Arc<LogStreamStats>,
    ) -> ContainerBuffer {
        let mut inner = self.inner.lock();
        let Inner { containers, ring_buffer, .. } = &mut *inner;
        ContainerBuffer {
            shared_buffer: Arc::clone(self),
            container_id: containers.new_container(ring_buffer, Arc::clone(&identity), stats),
        }
    }

    pub async fn flush(&self) {
        let (sender, receiver) = oneshot::channel();
        self.inner.lock().thread_msg_queue.push_back(ThreadMessage::Flush(sender));
        self.event.signal(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
        let _ = receiver.await;
    }

    #[cfg(test)]
    pub fn container_count(&self) -> usize {
        self.inner.lock().containers.len()
    }

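    /// Terminates the buffer: drains the registered sockets, tells the socket thread to exit and
    /// returns a future that resolves once the thread has been joined.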
    pub fn terminate(self: &Arc<Self>) -> impl Future<Output = ()> {
        {
            let mut inner = InnerGuard::new(self);
            self.flush_sockets(&mut inner);
            inner.thread_msg_queue.push_back(ThreadMessage::Terminate);
        }
        self.event.signal(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
        let join_handle = self.socket_thread.lock().take().unwrap();
        fasync::unblock(|| {
            let _ = join_handle.join();
        })
    }

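    /// Returns a cursor over messages from the components matched by `selectors`.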
    pub fn cursor(
        self: &Arc<Self>,
        mode: StreamMode,
        selectors: Vec<ComponentSelector>,
    ) -> FilterCursor {
        InnerGuard::new(self).cursor(mode, selectors)
    }

    fn socket_thread(self: Arc<Self>, sleep_time: Duration) {
        const INTERRUPT_KEY: u64 = u64::MAX;
        let mut sockets_ready = Vec::new();
        let mut iob_peer_closed = Vec::new();
        let mut interrupt_needs_arming = true;
        let mut msg = None;

        loop {
            let mut deadline = if msg.is_some() {
                // A thread message is pending; just poll the port without blocking.
                zx::MonotonicInstant::INFINITE_PAST
            } else {
                if interrupt_needs_arming {
                    self.event
                        .wait_async(
                            &self.port,
                            INTERRUPT_KEY,
                            zx::Signals::USER_0,
                            zx::WaitAsyncOpts::empty(),
                        )
                        .unwrap();
                    interrupt_needs_arming = false;
                }

                // Sleep for up to `sleep_time`, waking early if the interrupt event is signalled.
                let _ = self
                    .event
                    .wait_one(zx::Signals::USER_0, zx::MonotonicInstant::after(sleep_time.into()));
                zx::MonotonicInstant::INFINITE
            };

            // Drain all pending port packets.
            loop {
                match self.port.wait(deadline) {
                    Ok(packet) => {
                        if packet.key() == INTERRUPT_KEY {
                            interrupt_needs_arming = true;
                            if msg.is_none() {
                                msg = self.inner.lock().thread_msg_queue.pop_front();
                            }
                        } else if packet.key() & IOB_PEER_CLOSED_KEY_BASE != 0 {
                            iob_peer_closed.push(packet.key() as u32);
                        } else {
                            sockets_ready.push(SocketId(packet.key() as u32))
                        }
                    }
                    Err(zx::Status::TIMED_OUT) => break,
                    Err(status) => panic!("port wait error: {status:?}"),
                }
                deadline = zx::MonotonicInstant::INFINITE_PAST;
            }

            let mut inner = InnerGuard::new(&self);

            if !iob_peer_closed.is_empty() {
                inner.update_message_ids(inner.ring_buffer.head());

                for iob_peer_closed in iob_peer_closed.drain(..) {
                    let container_id = inner.iob_peers.free(iob_peer_closed).0;
                    if let Some(container) = inner.containers.get_mut(container_id) {
                        container.iob_count -= 1;
                        if container.iob_count == 0 && !container.is_active() {
                            if container.should_free() {
                                inner.containers.free(container_id);
                            } else {
                                let identity = Arc::clone(&container.identity);
                                inner.on_inactive.push(identity);
                            }
                        }
                    }
                }
            }

            {
                let mut sockets = self.sockets.lock();
                for socket_id in sockets_ready.drain(..) {
                    inner.read_socket(&mut sockets, socket_id, |socket| {
                        socket
                            .socket
                            .wait_async(
                                &self.port,
                                socket_id.0 as u64,
                                zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
                                zx::WaitAsyncOpts::empty(),
                            )
                            .unwrap();
                    });
                }
            }

            if let Some(m) = msg.take() {
                match m {
                    ThreadMessage::Terminate => {
                        inner.terminated = true;
                        inner.wake = true;
                        return;
                    }
                    ThreadMessage::Flush(sender) => {
                        let _ = sender.send(());
                    }
                }

                msg = inner.thread_msg_queue.pop_front();
                // If there are no more thread messages, clear the interrupt signal.
                if msg.is_none() {
                    self.event.signal(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
                }
            }
        }
    }

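    /// Background task that watches the ring buffer head and rolls out old messages whenever the
    /// amount of free space drops below the threshold.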
    async fn buffer_monitor_task(
        this: Weak<Self>,
        mut ring_buffer: ring_buffer::Reader,
        sleep_time: Duration,
    ) {
        let mut last_head = 0;
        loop {
            fasync::Timer::new(sleep_time).await;
            let head = ring_buffer.wait(last_head).await;
            let Some(this) = this.upgrade() else { return };
            let mut inner = InnerGuard::new(&this);
            inner.check_space(head);
            last_head = head;
        }
    }

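    /// Drains every registered socket into the ring buffer and returns the resulting head offset.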
    fn flush_sockets(&self, inner: &mut InnerGuard<'_>) -> u64 {
        let mut sockets = self.sockets.lock();
        let mut socket_id = 0;
        while let Some(next_id) = sockets.next_used(socket_id) {
            inner.read_socket(&mut sockets, SocketId(next_id), |_| {});
            socket_id = next_id + 1;
        }
        let head = inner.ring_buffer.head();
        inner.update_message_ids(head);
        head
    }
}

impl Inner {
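    /// Validates `msg` and writes it to the container's IOB, folding in any previously dropped
    /// message count.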
    fn ingest(&mut self, msg: &[u8], container_id: ContainerId) {
        if msg.len() < std::mem::size_of::<Header>() {
            debug!("message too short ({})", msg.len());
            if let Some(container) = self.containers.get(container_id) {
                container.stats.increment_invalid(msg.len());
            }
            return;
        }

        let header = Header::read_from_bytes(&msg[..std::mem::size_of::<Header>()]).unwrap();

        let msg_len = header.size_words() as usize * 8;

        if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg.len() < msg_len {
            debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, msg.len());
            if let Some(container) = self.containers.get(container_id) {
                container.stats.increment_invalid(msg.len());
            }
            return;
        }

        let Some(container) = self.containers.get_mut(container_id) else {
            return;
        };

        let mut data;
        let msg = if container.dropped_count > 0 {
            data = msg.to_vec();
            if !add_dropped_count(&mut data, container.dropped_count) {
                debug!("unable to add dropped count to invalid message");
                container.stats.increment_invalid(data.len());
                return;
            }
            &data
        } else {
            msg
        };

        if container.iob.write(Default::default(), 0, msg).is_err() {
            container.dropped_count += 1
        } else {
            container.dropped_count = 0;
        }
    }

    /// Reads the first message in `range` from the ring buffer, returning the ID of the container
    /// that wrote it, the message bytes and, when present, its timestamp.
    unsafe fn parse_message(
        &self,
        range: Range<u64>,
    ) -> (ContainerId, &[u8], Option<zx::BootInstant>) {
        let (tag, msg) = unsafe { self.ring_buffer.first_message_in(range) }
            .expect("Unable to read message from ring buffer");
        (
            ContainerId(tag as u32),
            msg,
            (msg.len() >= 16)
                .then(|| zx::BootInstant::from_nanos(i64::read_from_bytes(&msg[8..16]).unwrap())),
        )
    }
}

impl<'a> InnerGuard<'a> {
    fn new(buffer: &'a Arc<SharedBuffer>) -> Self {
        Self {
            buffer,
            guard: ManuallyDrop::new(buffer.inner.lock()),
            on_inactive: Vec::new(),
            wake: false,
        }
    }

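    /// Rolls out the oldest message in the buffer, returning the length of the freed record, or
    /// `None` if the buffer is empty.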
    fn pop(&mut self, head: u64) -> Option<usize> {
        if head == self.tail {
            return None;
        }

        let record_len = {
            let (container_id, message, _) = unsafe { self.parse_message(self.tail..head) };
            let record_len = ring_buffer_record_len(message.len());

            let container = self.containers.get_mut(container_id).unwrap();

            container.stats.increment_rolled_out(record_len);
            container.msg_ids.start += 1;
            if !container.is_active() {
                if container.should_free() {
                    self.containers.free(container_id);
                } else {
                    let identity = Arc::clone(&container.identity);
                    self.on_inactive.push(identity);
                }
            }

            record_len
        };

        self.ring_buffer.increment_tail(record_len);
        self.tail += record_len as u64;
        self.tail_message_id += 1;

        assert!(self.last_scanned >= self.tail);

        Some(record_len)
    }

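    /// Drains `socket_id`, writing each datagram into the owning container's IOB. `rearm` is
    /// called if the socket would block; if the socket is closed it is unregistered instead.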
    fn read_socket(
        &mut self,
        sockets: &mut Slab<Socket>,
        socket_id: SocketId,
        rearm: impl FnOnce(&mut Socket),
    ) {
        let Some(socket) = sockets.get_mut(socket_id.0) else { return };
        let container_id = socket.container_id;

        loop {
            self.check_space(self.ring_buffer.head());

            let mut data = Vec::with_capacity(MAX_DATAGRAM_LEN_BYTES as usize);

            let len = match socket.socket.read_uninit(data.spare_capacity_mut()) {
                Ok(d) => d.len(),
                Err(zx::Status::SHOULD_WAIT) => {
                    rearm(socket);
                    return;
                }
                // Peer closed (or another error): fall through and unregister the socket.
                Err(_) => break,
            };

            // SAFETY: `read_uninit` initialised the first `len` bytes.
            unsafe {
                data.set_len(len);
            }

            let container = self.containers.get_mut(container_id).unwrap();
            if data.len() < 16 {
                container.stats.increment_invalid(data.len());
                continue;
            }

            let header = Header::read_from_bytes(&data[..std::mem::size_of::<Header>()]).unwrap();
            let msg_len = header.size_words() as usize * 8;
            if header.raw_type() != TRACING_FORMAT_LOG_RECORD_TYPE || msg_len != data.len() {
                debug!("bad type or size ({}, {}, {})", header.raw_type(), msg_len, data.len());
                container.stats.increment_invalid(data.len());
                continue;
            }

            if container.dropped_count > 0 && !add_dropped_count(&mut data, container.dropped_count)
            {
                debug!("unable to add dropped count to invalid message");
                container.stats.increment_invalid(data.len());
                continue;
            }

            if container.iob.write(Default::default(), 0, &data).is_err() {
                container.dropped_count += 1
            } else {
                container.dropped_count = 0;
            }
        }

        // Only reached when the socket has closed: unregister it and, if the container is now
        // inactive, free it or report it.
        self.update_message_ids(self.ring_buffer.head());

        let container = self.containers.get_mut(container_id).unwrap();
        container.remove_socket(socket_id, sockets);
        if !container.is_active() {
            if container.should_free() {
                self.containers.free(container_id);
            } else {
                let identity = Arc::clone(&container.identity);
                self.on_inactive.push(identity);
            }
        }
    }

    /// Scans newly written messages up to `head`, assigning them message IDs and updating
    /// per-container stats.
    fn update_message_ids(&mut self, head: u64) {
        while self.last_scanned < head {
            let (container_id, message, _) = unsafe { self.parse_message(self.last_scanned..head) };
            let msg_len = message.len();
            let severity = (msg_len >= 8)
                .then(|| Header::read_from_bytes(&message[0..8]).unwrap().severity().into());
            let container = self.containers.get_mut(container_id).unwrap();
            container.msg_ids.end += 1;
            if let Some(severity) = severity {
                container.stats.ingest_message(msg_len, severity);
            }
            self.last_scanned += ring_buffer_record_len(msg_len) as u64;
            self.last_scanned_message_id += 1;
            self.wake = true;
        }
    }

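    /// Rolls out the oldest messages until at least the threshold fraction of the buffer is free.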
    fn check_space(&mut self, head: u64) {
        self.update_message_ids(head);
        let capacity = self.ring_buffer.capacity();
        let mut space = capacity
            .checked_sub((head - self.tail) as usize)
            .unwrap_or_else(|| panic!("bad range: {:?}", self.tail..head));
        let required_space = capacity * SPACE_THRESHOLD_NUMERATOR / SPACE_THRESHOLD_DENOMINATOR;
        while space < required_space {
            let Some(amount) = self.pop(head) else { break };
            space += amount;
        }
    }

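    /// Creates a cursor. Snapshot and snapshot-then-subscribe cursors start at the oldest retained
    /// message, whilst subscribe cursors start at the current head.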
    fn cursor(&mut self, mode: StreamMode, selectors: Vec<ComponentSelector>) -> FilterCursor {
        let (index, message_id) = match mode {
            StreamMode::Snapshot => (self.tail, self.tail_message_id),
            StreamMode::Subscribe => {
                let head = self.ring_buffer.head();
                self.update_message_ids(head);
                (self.last_scanned, self.last_scanned_message_id)
            }
            StreamMode::SnapshotThenSubscribe => (self.tail, self.tail_message_id),
        };
        FilterCursor::new(
            Arc::clone(self.buffer),
            index,
            message_id,
            matches!(mode, StreamMode::Snapshot),
            selectors,
        )
    }
}

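/// The set of per-component containers, indexed by `ContainerId`.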
#[derive(Default)]
struct Containers {
    slab: Slab<ContainerInfo>,
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct ContainerId(u32);

impl Containers {
    #[cfg(test)]
    fn len(&self) -> usize {
        self.slab.len()
    }

    fn get(&self, id: ContainerId) -> Option<&ContainerInfo> {
        self.slab.get(id.0)
    }

    fn get_mut(&mut self, id: ContainerId) -> Option<&mut ContainerInfo> {
        self.slab.get_mut(id.0)
    }

    fn new_container(
        &mut self,
        buffer: &RingBuffer,
        identity: Arc<ComponentIdentity>,
        stats: Arc<LogStreamStats>,
    ) -> ContainerId {
        // The slab index doubles as the IOB tag, which is how messages in the ring buffer are
        // attributed back to their container.
        ContainerId(self.slab.insert(|id| {
            let (iob, _) = buffer.new_iob_writer(id as u64).unwrap();
            ContainerInfo::new(identity, stats, iob)
        }))
    }

    fn free(&mut self, id: ContainerId) {
        self.slab.free(id.0);
    }
}

#[derive(Derivative)]
#[derivative(Debug)]
struct ContainerInfo {
    identity: Arc<ComponentIdentity>,

    // The range of message IDs currently stored in the ring buffer for this container.
    msg_ids: Range<u64>,

    // Set once the container has been terminated; it is freed when it is also inactive.
    terminated: bool,

    #[derivative(Debug = "ignore")]
    stats: Arc<LogStreamStats>,

    // Head of this container's list of registered sockets.
    first_socket_id: SocketId,

    // The IOB that pushed and socket-delivered messages are written to.
    iob: zx::Iob,

    // The number of IOBs handed out via `ContainerBuffer::iob`.
    iob_count: usize,

    // The number of messages dropped since the last successful write; folded into the next
    // message that succeeds.
    dropped_count: u64,
}

impl ContainerInfo {
    fn new(identity: Arc<ComponentIdentity>, stats: Arc<LogStreamStats>, iob: zx::Iob) -> Self {
        Self {
            identity,
            msg_ids: 0..0,
            terminated: false,
            stats,
            first_socket_id: SocketId::NULL,
            iob,
            iob_count: 0,
            dropped_count: 0,
        }
    }

    fn should_free(&self) -> bool {
        self.terminated && !self.is_active()
    }

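    /// Returns true if the container still has registered sockets or IOBs, still has messages in
    /// the buffer, or is Archivist's own container.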
    fn is_active(&self) -> bool {
        self.first_socket_id != SocketId::NULL
            || self.iob_count > 0
            || self.msg_ids.end != self.msg_ids.start
            || ARCHIVIST_MONIKER.get().is_some_and(|m| *self.identity == *m)
    }

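    /// Unlinks `socket_id` from this container's socket list and frees it from the socket slab.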
    fn remove_socket(&mut self, socket_id: SocketId, sockets: &mut Slab<Socket>) {
        let Socket { prev, next, .. } = *sockets.get(socket_id.0).unwrap();
        if prev == SocketId::NULL {
            self.first_socket_id = next;
        } else {
            sockets.get_mut(prev.0).unwrap().next = next;
        }
        if next != SocketId::NULL {
            sockets
                .get_mut(next.0)
                .unwrap_or_else(|| panic!("next {next:?} has been freed!"))
                .prev = prev;
        }
        sockets.free(socket_id.0);
        self.stats.close_socket();
        debug!(identity:% = self.identity; "Socket closed.");
    }
}

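/// A per-component handle to the shared buffer, created via `SharedBuffer::new_container_buffer`.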
pub struct ContainerBuffer {
    shared_buffer: Arc<SharedBuffer>,
    container_id: ContainerId,
}

impl ContainerBuffer {
    pub fn iob_tag(&self) -> u64 {
        self.container_id.0 as u64
    }

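    /// Writes a single message into the shared buffer on behalf of this component.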
    pub fn push_back(&self, msg: &[u8]) {
        self.shared_buffer.inner.lock().ingest(msg, self.container_id);
    }

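    /// Returns a new IOB that the component can use to write logs directly into the ring buffer.
    /// The peer end is retained so that the socket thread is notified when it is closed.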
    pub fn iob(&self) -> zx::Iob {
        let mut inner = self.shared_buffer.inner.lock();

        inner.containers.get_mut(self.container_id).unwrap().iob_count += 1;

        let (ep0, ep1) = inner.ring_buffer.new_iob_writer(self.container_id.0 as u64).unwrap();

        inner.iob_peers.insert(|idx| {
            ep1.wait_async(
                &self.shared_buffer.port,
                idx as u64 | IOB_PEER_CLOSED_KEY_BASE,
                zx::Signals::IOB_PEER_CLOSED,
                zx::WaitAsyncOpts::empty(),
            )
            .unwrap();

            (self.container_id, ep1)
        });

        ep0
    }

    #[cfg(test)]
    fn cursor(&self, mode: StreamMode) -> Option<FilterCursorStream<FxtMessage>> {
        use selectors::SelectorExt;

        let mut inner = InnerGuard::new(&self.shared_buffer);
        let Some(container) = inner.containers.get_mut(self.container_id) else {
            return None;
        };
        let selectors = vec![container.identity.moniker.clone().into_component_selector()];
        Some(inner.cursor(mode, selectors).into())
    }

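    /// Marks the container as terminated, removing any registered sockets. The container is freed
    /// once it has no remaining messages or IOBs.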
    pub fn terminate(&self) {
        let mut inner = InnerGuard::new(&self.shared_buffer);

        inner.update_message_ids(inner.ring_buffer.head());

        if let Some(container) = inner.containers.get_mut(self.container_id) {
            container.terminated = true;
            if container.first_socket_id != SocketId::NULL {
                let mut sockets = self.shared_buffer.sockets.lock();
                loop {
                    container.remove_socket(container.first_socket_id, &mut sockets);
                    if container.first_socket_id == SocketId::NULL {
                        break;
                    }
                }
            }
            if container.should_free() {
                inner.containers.free(self.container_id);
            }
            inner.wake = true;
        }
    }

    pub fn is_active(&self) -> bool {
        self.shared_buffer
            .inner
            .lock()
            .containers
            .get(self.container_id)
            .is_some_and(|c| c.is_active())
    }

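    /// Registers a datagram socket whose messages will be drained into the shared buffer.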
    pub fn add_socket(&self, socket: zx::Socket) {
        let mut inner = self.shared_buffer.inner.lock();
        let Some(container) = inner.containers.get_mut(self.container_id) else { return };
        container.stats.open_socket();
        let next = container.first_socket_id;
        let mut sockets = self.shared_buffer.sockets.lock();
        let socket_id = SocketId(sockets.insert(|socket_id| {
            socket
                .wait_async(
                    &self.shared_buffer.port,
                    socket_id as u64,
                    zx::Signals::OBJECT_READABLE | zx::Signals::OBJECT_PEER_CLOSED,
                    zx::WaitAsyncOpts::empty(),
                )
                .unwrap();
            Socket { socket, container_id: self.container_id, prev: SocketId::NULL, next }
        }));
        if next != SocketId::NULL {
            sockets.get_mut(next.0).unwrap().prev = socket_id;
        }
        container.first_socket_id = socket_id;
    }
}

impl Drop for ContainerBuffer {
    fn drop(&mut self) {
        self.terminate();
    }
}

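/// A simple slab allocator; free slots are linked into an intrusive free list.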
struct Slab<T> {
    slab: Vec<Slot<T>>,
    // Index of the first free slot, or `usize::MAX` if there are none.
    free_index: usize,
}

impl<T> Default for Slab<T> {
    fn default() -> Self {
        Self { slab: Vec::new(), free_index: usize::MAX }
    }
}

enum Slot<T> {
    Used(T),
    // A free slot storing the index of the next free slot.
    Free(usize),
}

impl<T> Slab<T> {
    #[cfg(test)]
    fn len(&self) -> usize {
        self.slab.iter().filter(|c| matches!(c, Slot::Used(_))).count()
    }

    fn free(&mut self, index: u32) -> T {
        let index = index as usize;
        let value = match std::mem::replace(&mut self.slab[index], Slot::Free(self.free_index)) {
            Slot::Free(_) => panic!("Slot already free"),
            Slot::Used(value) => value,
        };
        self.free_index = index;
        value
    }

    fn get(&self, id: u32) -> Option<&T> {
        self.slab.get(id as usize).and_then(|s| match s {
            Slot::Used(s) => Some(s),
            _ => None,
        })
    }

    fn get_mut(&mut self, id: u32) -> Option<&mut T> {
        self.slab.get_mut(id as usize).and_then(|s| match s {
            Slot::Used(s) => Some(s),
            _ => None,
        })
    }

    fn insert(&mut self, value: impl FnOnce(u32) -> T) -> u32 {
        let free_index = self.free_index;
        if free_index != usize::MAX {
            self.free_index = match std::mem::replace(
                &mut self.slab[free_index],
                Slot::Used(value(free_index as u32)),
            ) {
                Slot::Free(next) => next,
                _ => unreachable!(),
            };
            free_index as u32
        } else {
            assert!(self.slab.len() < u32::MAX as usize);
            self.slab.push(Slot::Used(value(self.slab.len() as u32)));
            (self.slab.len() - 1) as u32
        }
    }

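    /// Returns the index of the first used slot at or after `id`, if any.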
    fn next_used(&self, id: u32) -> Option<u32> {
        let mut id = id as usize;
        while id < self.slab.len() {
            if matches!(self.slab[id], Slot::Used(_)) {
                return Some(id as u32);
            }
            id += 1;
        }
        None
    }
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct SocketId(u32);

impl SocketId {
    const NULL: Self = SocketId(0xffff_ffff);
}

struct Socket {
    socket: zx::Socket,
    container_id: ContainerId,
    // Intrusive doubly-linked list of the sockets registered for a container.
    prev: SocketId,
    next: SocketId,
}

#[cfg(test)]
mod tests {
    use super::{SharedBuffer, SharedBufferOptions, Slab, create_ring_buffer};
    use crate::logs::stats::LogStreamStats;
    use crate::logs::testing::make_message;
    use assert_matches::assert_matches;
    use diagnostics_assertions::{AnyProperty, assert_data_tree};
    use fidl_fuchsia_diagnostics::StreamMode;
    use fuchsia_async as fasync;
    use fuchsia_async::TimeoutExt;
    use fuchsia_inspect::Inspector;
    use fuchsia_inspect_derive::WithInspect;
    use futures::FutureExt;
    use futures::channel::mpsc;
    use futures::future::OptionFuture;
    use futures::stream::{FuturesUnordered, StreamExt as _};
    use ring_buffer::MAX_MESSAGE_SIZE;
    use std::future::poll_fn;
    use std::iter::repeat_with;
    use std::pin::pin;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::task::Poll;
    use std::time::Duration;

    async fn yield_to_executor() {
        let mut first_time = true;
        poll_fn(|cx| {
            if first_time {
                cx.waker().wake_by_ref();
                first_time = false;
                Poll::Pending
            } else {
                Poll::Ready(())
            }
        })
        .await;
    }

    #[fuchsia::test]
    async fn push_one_message() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_buffer.push_back(msg.bytes());

        let cursor = container_buffer.cursor(StreamMode::Snapshot).unwrap();
        assert_eq!(cursor.map(|item| assert_eq!(item.data(), msg.bytes())).count().await, 1);
    }

    #[fuchsia::test]
    async fn message_too_short() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );

        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        container_buffer.push_back(&[0]);

        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
    }

    #[fuchsia::test]
    async fn bad_type() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        container_buffer.push_back(&[0x77; 16]);

        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
    }

    #[fuchsia::test]
    async fn message_truncated() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_buffer.push_back(&msg.bytes()[..msg.bytes().len() - 1]);

        assert_eq!(container_buffer.cursor(StreamMode::Snapshot).unwrap().count().await, 0);
    }

    #[fuchsia::test]
    async fn buffer_wrapping() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO },
        );
        let container_buffer =
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());

        let mut i = 0;
        loop {
            let msg = make_message(&format!("{i}"), None, zx::BootInstant::from_nanos(i));
            container_buffer.push_back(msg.bytes());
            i += 1;

            yield_to_executor().await;

            let inner = buffer.inner.lock();
            if inner.ring_buffer.head() > inner.ring_buffer.capacity() as u64 {
                break;
            }
        }

        let mut cursor = pin!(container_buffer.cursor(StreamMode::Snapshot).unwrap());

        let mut j;
        let item = cursor.next().await;
        assert_matches!(
            item,
            Some(item) => {
                j = item.timestamp().into_nanos();
                let msg = make_message(&format!("{j}"), None, item.timestamp());
                assert_eq!(item.data(), msg.bytes());
            }
        );

        j += 1;
        while j != i {
            assert_matches!(
                cursor.next().await,
                Some(item) => {
                    assert_eq!(
                        item.data(),
                        make_message(&format!("{j}"), None, item.timestamp()).bytes()
                    );
                }
            );
            j += 1;
        }

        assert!(cursor.next().await.is_none());
    }

    #[fuchsia::test]
    async fn on_inactive() {
        let identity = Arc::new(vec!["a"].into());
        let on_inactive = Arc::new(AtomicU64::new(0));
        let buffer = {
            let on_inactive = Arc::clone(&on_inactive);
            let identity = Arc::clone(&identity);
            Arc::new(SharedBuffer::new(
                create_ring_buffer(MAX_MESSAGE_SIZE),
                Box::new(move |i| {
                    assert_eq!(i, identity);
                    on_inactive.fetch_add(1, Ordering::Relaxed);
                }),
                SharedBufferOptions { sleep_time: Duration::ZERO },
            ))
        };
        let container_a = buffer.new_container_buffer(identity, Arc::default());
        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());

        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());

        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
            container_b.push_back(msg.bytes());

            yield_to_executor().await;
        }

        assert_eq!(on_inactive.load(Ordering::Relaxed), 1);
    }

    #[fuchsia::test]
    async fn terminate_drops_container() {
        async {}.await;

        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO },
        );

        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        assert_eq!(buffer.container_count(), 1);
        container_a.terminate();

        assert_eq!(buffer.container_count(), 0);

        let container_a = buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default());
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());
        assert_eq!(buffer.container_count(), 1);
        container_a.terminate();

        assert_eq!(buffer.container_count(), 1);

        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
        assert_eq!(buffer.container_count(), 2);

        while buffer.container_count() != 1 {
            container_b.push_back(msg.bytes());

            yield_to_executor().await;
        }

        assert!(container_a.cursor(StreamMode::Subscribe).is_none());
    }

    #[fuchsia::test]
    async fn cursor_subscribe() {
        for mode in [StreamMode::Subscribe, StreamMode::SnapshotThenSubscribe] {
            let buffer = SharedBuffer::new(
                create_ring_buffer(MAX_MESSAGE_SIZE),
                Box::new(|_| {}),
                Default::default(),
            );
            let container =
                Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
            let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
            container.push_back(msg.bytes());

            let (sender, mut receiver) = mpsc::unbounded();

            {
                let container = Arc::clone(&container);
                fasync::Task::spawn(async move {
                    let mut cursor = pin!(container.cursor(mode).unwrap());
                    while let Some(item) = cursor.next().await {
                        sender.unbounded_send(item).unwrap();
                    }
                })
                .detach();
            }

            if mode == StreamMode::SnapshotThenSubscribe {
                assert_matches!(
                    receiver.next().await,
                    Some(item) if item.data() == msg.bytes()
                );
            }

            assert!(
                OptionFuture::from(Some(receiver.next()))
                    .on_timeout(Duration::from_millis(500), || None)
                    .await
                    .is_none()
            );

            container.push_back(msg.bytes());

            assert_matches!(
                receiver.next().await,
                Some(item) if item.data() == msg.bytes()
            );
        }
    }

    #[fuchsia::test]
    async fn drained_post_termination_cursors() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );
        let container =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let mut cursor_a = pin!(container.cursor(StreamMode::Subscribe).unwrap());
        let mut cursor_b = pin!(container.cursor(StreamMode::SnapshotThenSubscribe).unwrap());

        container.push_back(msg.bytes());
        container.push_back(msg.bytes());
        container.push_back(msg.bytes());
        container.push_back(msg.bytes());
        container.push_back(msg.bytes());

        let mut cursor_c = pin!(container.cursor(StreamMode::Snapshot).unwrap());
        assert!(cursor_a.next().await.is_some());
        assert!(cursor_b.next().await.is_some());
        assert!(cursor_c.next().await.is_some());

        drop(buffer.terminate());

        assert_eq!(cursor_a.count().await, 4);
        assert_eq!(cursor_b.count().await, 4);
        assert_eq!(cursor_c.count().await, 4);
    }

    #[fuchsia::test]
    async fn empty_post_termination_cursors() {
        let buffer = SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        );
        let container =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));

        let cursor_a = container.cursor(StreamMode::Subscribe).unwrap();
        let cursor_b = container.cursor(StreamMode::SnapshotThenSubscribe).unwrap();
        let cursor_c = container.cursor(StreamMode::Snapshot).unwrap();

        drop(buffer.terminate());

        assert_eq!(cursor_a.count().await, 0);
        assert_eq!(cursor_b.count().await, 0);
        assert_eq!(cursor_c.count().await, 0);
    }

    #[fuchsia::test]
    async fn recycled_container_slot() {
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            SharedBufferOptions { sleep_time: Duration::ZERO },
        ));
        let container_a =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));
        container_a.push_back(msg.bytes());

        let mut cursor = pin!(container_a.cursor(StreamMode::SnapshotThenSubscribe).unwrap());
        assert_matches!(cursor.next().await, Some(_));

        let container_b =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default()));
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await > 0 {
            container_b.push_back(msg.bytes());

            yield_to_executor().await;
        }

        container_a.terminate();

        let container_c =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["c"].into()), Arc::default()));
        container_c.push_back(msg.bytes());
        container_c.push_back(msg.bytes());
    }

    #[fuchsia::test]
    async fn socket_increments_logstats() {
        let inspector = Inspector::default();
        let stats: Arc<LogStreamStats> =
            Arc::new(LogStreamStats::default().with_inspect(inspector.root(), "test").unwrap());
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(65536),
            Box::new(|_| {}),
            Default::default(),
        ));
        let container_a = Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();

        let mut futures = FuturesUnordered::new();
        futures.push(async move {
            let mut cursor_a = pin!(cursor_a);
            cursor_a.next().await
        });
        let mut next = futures.next();
        assert!(futures::poll!(&mut next).is_pending());

        local.write(msg.bytes()).unwrap();

        let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

        assert_eq!(cursor_b.map(|item| assert_eq!(item.data(), msg.bytes())).count().await, 1);

        next.await;

        assert_data_tree!(
            inspector,
            root: contains {
                test: {
                    url: "",
                    last_timestamp: AnyProperty,
                    sockets_closed: 0u64,
                    sockets_opened: 1u64,
                    invalid: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    total: {
                        number: 1u64,
                        bytes: 88u64,
                    },
                    rolled_out: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    trace: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    debug: {
                        number: 1u64,
                        bytes: 88u64,
                    },
                    info: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    warn: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    error: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                    fatal: {
                        number: 0u64,
                        bytes: 0u64,
                    },
                }
            }
        );
    }

    #[fuchsia::test]
    async fn socket() {
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            Box::new(|_| {}),
            Default::default(),
        ));
        let container_a =
            Arc::new(buffer.new_container_buffer(Arc::new(vec!["a"].into()), Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        let cursor_a = container_a.cursor(StreamMode::Subscribe).unwrap();

        let mut futures = FuturesUnordered::new();
        futures.push(async move {
            let mut cursor_a = pin!(cursor_a);
            cursor_a.next().await
        });
        let mut next = futures.next();
        assert!(futures::poll!(&mut next).is_pending());

        local.write(msg.bytes()).unwrap();

        let cursor_b = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

        assert_eq!(cursor_b.map(|item| assert_eq!(item.data(), msg.bytes())).count().await, 1);

        next.await;
    }

    #[fuchsia::test]
    async fn socket_on_inactive() {
        let on_inactive = Arc::new(AtomicU64::new(0));
        let a_identity = Arc::new(vec!["a"].into());
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(MAX_MESSAGE_SIZE),
            {
                let on_inactive = Arc::clone(&on_inactive);
                let a_identity = Arc::clone(&a_identity);
                Box::new(move |id| {
                    assert_eq!(id, a_identity);
                    on_inactive.fetch_add(1, Ordering::Relaxed);
                })
            },
            SharedBufferOptions { sleep_time: Duration::ZERO },
        ));
        let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        local.write(msg.bytes()).unwrap();

        let cursor = pin!(container_a.cursor(StreamMode::Snapshot).unwrap());

        assert_eq!(cursor.map(|item| assert_eq!(item.data(), msg.bytes())).count().await, 1);

        let container_b = buffer.new_container_buffer(Arc::new(vec!["b"].into()), Arc::default());
        while container_a.cursor(StreamMode::Snapshot).unwrap().count().await == 1 {
            container_b.push_back(msg.bytes());

            yield_to_executor().await;
        }

        assert_eq!(on_inactive.load(Ordering::Relaxed), 0);

        std::mem::drop(local);

        while on_inactive.load(Ordering::Relaxed) != 1 {
            fasync::Timer::new(Duration::from_millis(50)).await;
        }
    }

    #[fuchsia::test]
    async fn flush() {
        let a_identity = Arc::new(vec!["a"].into());
        let buffer = Arc::new(SharedBuffer::new(
            create_ring_buffer(1024 * 1024),
            Box::new(|_| {}),
            Default::default(),
        ));
        let container_a = Arc::new(buffer.new_container_buffer(a_identity, Arc::default()));
        let msg = make_message("a", None, zx::BootInstant::from_nanos(1));

        let (local, remote) = zx::Socket::create_datagram();
        container_a.add_socket(remote);

        let cursor = pin!(container_a.cursor(StreamMode::Subscribe).unwrap());

        const COUNT: usize = 1000;
        for _ in 0..COUNT {
            local.write(msg.bytes()).unwrap();
        }

        let mut flush_futures = FuturesUnordered::from_iter([buffer.flush(), buffer.flush()]);
        flush_futures.next().await;

        let messages: Option<Vec<_>> = cursor.take(COUNT).collect().now_or_never();
        assert!(messages.is_some());

        flush_futures.next().await;

        buffer.terminate().await;
    }

    #[fuchsia::test]
    fn test_slab_next_used() {
        let mut slab = Slab::default();
        let mut ids: Vec<_> = repeat_with(|| slab.insert(|_| ())).take(4).collect();
        ids.sort();
        slab.free(ids[0]);
        slab.free(ids[2]);
        assert_eq!(slab.next_used(0), Some(ids[1]));
        assert_eq!(slab.next_used(ids[1]), Some(ids[1]));
        assert_eq!(slab.next_used(ids[1] + 1), Some(ids[3]));
        assert_eq!(slab.next_used(ids[3] + 1), None);
    }
}