use netstack3_base::{Payload, SackBlocks, SeqNum};

use arrayvec::ArrayVec;
use core::fmt::Debug;
use core::ops::Range;
use packet::InnerPacketBuilder;

use crate::internal::base::BufferSizes;
use crate::internal::seq_ranges::{SeqRange, SeqRanges};

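/// Operations common to TCP send and receive buffers.
///
/// `limits` reports the buffer's current occupancy and capacity,
/// `target_capacity` reports the capacity the buffer is aiming for, and
/// `request_capacity` asks the buffer to resize itself.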
pub trait Buffer: Debug + Sized {
    fn limits(&self) -> BufferLimits;

    fn target_capacity(&self) -> usize;

    fn request_capacity(&mut self, size: usize);
}

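/// A buffer that accepts possibly out-of-order writes from the network and
/// exposes the contiguous prefix to the application once it is made readable.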
pub trait ReceiveBuffer: Buffer {
    fn write_at<P: Payload>(&mut self, offset: usize, data: &P) -> usize;

    fn make_readable(&mut self, count: usize, has_outstanding: bool);
}

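/// A buffer of bytes queued by the application and awaiting transmission.
///
/// `mark_read` discards bytes that no longer need to be kept, while
/// `peek_with` lends out a view starting at `offset` without consuming
/// anything.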
pub trait SendBuffer: Buffer {
    type Payload<'a>: InnerPacketBuilder + Payload + Debug + 'a;

    fn mark_read(&mut self, count: usize);

    fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
    where
        F: FnOnce(Self::Payload<'a>) -> R;
}

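/// A snapshot of a buffer's occupancy: `len` bytes currently held out of
/// `capacity` bytes total.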
#[derive(Eq, PartialEq, Debug, Copy, Clone)]
pub struct BufferLimits {
    pub capacity: usize,

    pub len: usize,
}

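/// Tracks out-of-order segments received ahead of `nxt`.
///
/// `nxt` is the next expected sequence number and `outstanding` holds the
/// received ranges above `nxt` that cannot be delivered yet. Each range is
/// tagged with the `generation` at which it was last touched, so the most
/// recently updated ranges can be reported first in SACK blocks.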
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub(super) struct Assembler {
    nxt: SeqNum,
    generation: usize,
    outstanding: SeqRanges<usize>,
}

impl Assembler {
    pub(super) fn new(nxt: SeqNum) -> Self {
        Self { outstanding: SeqRanges::default(), generation: 0, nxt }
    }

    pub(super) fn nxt(&self) -> SeqNum {
        self.nxt
    }

    pub(super) fn has_out_of_order(&self) -> bool {
        !self.outstanding.is_empty()
    }

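    /// Inserts a received range, merging it with any overlapping or adjacent
    /// ranges. Returns the number of bytes by which `nxt` advanced, i.e. how
    /// many bytes became contiguous, or 0 if the range only added
    /// out-of-order data.
    ///
    /// A minimal usage sketch (illustrative only, mirroring the test cases
    /// below):
    ///
    /// ```ignore
    /// let mut assembler = Assembler::new(SeqNum::new(0));
    /// // Out of order: nothing becomes contiguous yet.
    /// assert_eq!(assembler.insert(SeqNum::new(5)..SeqNum::new(10)), 0);
    /// // The gap is filled, so the merged range 0..10 advances `nxt` by 10.
    /// assert_eq!(assembler.insert(SeqNum::new(0)..SeqNum::new(5)), 10);
    /// ```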
    pub(super) fn insert(&mut self, Range { start, end }: Range<SeqNum>) -> usize {
        assert!(!start.after(end));
        assert!(!start.before(self.nxt));
        if start == end {
            return 0;
        }

        let Self { outstanding, nxt, generation } = self;
        *generation += 1;
        let _: bool = outstanding.insert(start..end, *generation);

        if let Some(advanced) = outstanding.pop_front_if(|r| r.start() == *nxt) {
            *nxt = advanced.end();
            usize::try_from(advanced.len()).unwrap()
        } else {
            0
        }
    }

    pub(super) fn has_outstanding(&self) -> bool {
        let Self { outstanding, nxt: _, generation: _ } = self;
        !outstanding.is_empty()
    }

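    /// Returns the SACK blocks to advertise for the currently outstanding
    /// out-of-order ranges, limited to the number of blocks that fit in the
    /// TCP options (fewer when the timestamp option is also in use). The most
    /// recently updated ranges are reported first.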
    pub(crate) fn sack_blocks(&self, size_limits: SackBlockSizeLimiters) -> SackBlocks {
        let Self { nxt: _, generation: _, outstanding } = self;
        if outstanding.is_empty() {
            return SackBlocks::default();
        }

        let mut heap = ArrayVec::<&SeqRange<_>, { SackBlocks::MAX_BLOCKS }>::new();
        let num_blocks_allowed = size_limits.num_blocks_allowed();

        for block in outstanding.iter() {
            if heap.len() >= num_blocks_allowed {
                if heap.last().is_some_and(|l| l.meta() < block.meta()) {
                    // The oldest block we kept is older than this one; evict it.
                    let _: Option<_> = heap.pop();
                } else {
                    continue;
                }
            }

            heap.push(block);
            // Keep the blocks sorted by descending generation so the most
            // recently updated ranges come first.
            heap.sort_by(|a, b| b.meta().cmp(&a.meta()));
        }

        SackBlocks::from_iter(heap.into_iter().map(|block| block.to_sack_block()))
    }
}

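/// Options that constrain how many SACK blocks fit in a segment's TCP options.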
pub(crate) struct SackBlockSizeLimiters {
    pub(crate) timestamp_enabled: bool,
}

impl SackBlockSizeLimiters {
    fn num_blocks_allowed(&self) -> usize {
        let Self { timestamp_enabled } = self;
        if *timestamp_enabled {
            SackBlocks::MAX_BLOCKS_WITH_TIMESTAMP
        } else {
            SackBlocks::MAX_BLOCKS
        }
    }
}

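/// Conversion from socket-creation state into a pair of receive and send
/// buffers sized according to `BufferSizes`.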
pub trait IntoBuffers<R: ReceiveBuffer, S: SendBuffer> {
    fn into_buffers(self, buffer_sizes: BufferSizes) -> (R, S);
}

#[cfg(any(test, feature = "testutils"))]
impl<R: Default + ReceiveBuffer, S: Default + SendBuffer> IntoBuffers<R, S> for () {
    fn into_buffers(self, buffer_sizes: BufferSizes) -> (R, S) {
        let BufferSizes { send: _, receive: _ } = buffer_sizes;
        Default::default()
    }
}

#[cfg(any(test, feature = "testutils"))]
pub(crate) mod testutil {
    use super::*;

    use alloc::sync::Arc;
    use alloc::vec;
    use alloc::vec::Vec;
    use core::cmp;

    use either::Either;
    use netstack3_base::sync::Mutex;
    use netstack3_base::{FragmentedPayload, PayloadLen, WindowSize};

    use crate::internal::socket::accept_queue::ListenerNotifier;

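    /// A fixed-capacity circular byte buffer used as both a send and a
    /// receive buffer in tests.
    ///
    /// `head` is the index of the first readable byte and `len` is the number
    /// of readable bytes; the writable region is the remaining
    /// `storage.len() - len` bytes that follow the readable ones, wrapping
    /// around the end of `storage`.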
    #[derive(Clone, PartialEq, Eq)]
    pub struct RingBuffer {
        pub(super) storage: Vec<u8>,
        pub(super) head: usize,
        pub(super) len: usize,
    }

    impl Debug for RingBuffer {
        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
            let Self { storage, head, len } = self;
            f.debug_struct("RingBuffer")
                .field("storage (len, cap)", &(storage.len(), storage.capacity()))
                .field("head", head)
                .field("len", len)
                .finish()
        }
    }

    impl Default for RingBuffer {
        fn default() -> Self {
            Self::new(WindowSize::DEFAULT.into())
        }
    }

    impl RingBuffer {
        pub fn new(capacity: usize) -> Self {
            Self { storage: vec![0; capacity], head: 0, len: 0 }
        }

        pub fn reset(&mut self) {
            let Self { storage: _, head, len } = self;
            *head = 0;
            *len = 0;
        }

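        /// Calls `f` with the `len` readable bytes starting at `start`,
        /// presented as one slice, or as two slices when the readable range
        /// wraps around the end of `storage`.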
        fn with_readable<'a, F, R>(storage: &'a Vec<u8>, start: usize, len: usize, f: F) -> R
        where
            F: for<'b> FnOnce(&'b [&'a [u8]]) -> R,
        {
            let end = start + len;
            if end > storage.len() {
                let first_part = &storage[start..storage.len()];
                let second_part = &storage[0..len - first_part.len()];
                f(&[first_part, second_part][..])
            } else {
                let all_bytes = &storage[start..end];
                f(&[all_bytes][..])
            }
        }

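        /// Calls `f` with the readable portion of the buffer, consumes the
        /// number of bytes `f` returns by advancing `head` past them, and
        /// returns that count. Panics if `f` claims to have read more bytes
        /// than were readable.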
        pub fn read_with<F>(&mut self, f: F) -> usize
        where
            F: for<'a, 'b> FnOnce(&'b [&'a [u8]]) -> usize,
        {
            let Self { storage, head, len } = self;
            if storage.len() == 0 {
                return f(&[&[]]);
            }
            let nread = RingBuffer::with_readable(storage, *head, *len, f);
            assert!(nread <= *len);
            *len -= nread;
            *head = (*head + nread) % storage.len();
            nread
        }

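        /// Returns the writable regions of the buffer as one or two mutable
        /// slices, depending on whether the free space wraps around the end
        /// of `storage`.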
        pub fn writable_regions(&mut self) -> impl IntoIterator<Item = &mut [u8]> {
            let BufferLimits { capacity, len } = self.limits();
            let available = capacity - len;
            let Self { storage, head, len } = self;

            let mut write_start = *head + *len;
            if write_start >= storage.len() {
                write_start -= storage.len()
            }
            let write_end = write_start + available;
            if write_end <= storage.len() {
                Either::Left([&mut self.storage[write_start..write_end]].into_iter())
            } else {
                let (b1, b2) = self.storage[..].split_at_mut(write_start);
                let b2_len = b2.len();
                Either::Right([b2, &mut b1[..(available - b2_len)]].into_iter())
            }
        }
    }

    impl Buffer for RingBuffer {
        fn limits(&self) -> BufferLimits {
            let Self { storage, len, head: _ } = self;
            let capacity = storage.len();
            BufferLimits { len: *len, capacity }
        }

        fn target_capacity(&self) -> usize {
            let Self { storage, len: _, head: _ } = self;
            storage.len()
        }

        fn request_capacity(&mut self, size: usize) {
            unimplemented!("capacity request for {size} not supported")
        }
    }

    impl ReceiveBuffer for RingBuffer {
        fn write_at<P: Payload>(&mut self, offset: usize, data: &P) -> usize {
            let BufferLimits { capacity, len } = self.limits();
            let available = capacity - len;
            let Self { storage, head, len } = self;
            if storage.len() == 0 {
                return 0;
            }

            if offset > available {
                return 0;
            }
            let start_at = (*head + *len + offset) % storage.len();
            let to_write = cmp::min(data.len(), available);
            // Copy the first chunk, up to the end of `storage`; wrap around to
            // the front for whatever remains.
            let first_len = cmp::min(to_write, storage.len() - start_at);
            data.partial_copy(0, &mut storage[start_at..start_at + first_len]);
            if to_write > first_len {
                data.partial_copy(first_len, &mut storage[0..to_write - first_len]);
            }
            to_write
        }

        fn make_readable(&mut self, count: usize, _has_outstanding: bool) {
            let BufferLimits { capacity, len } = self.limits();
            debug_assert!(count <= capacity - len);
            self.len += count;
        }
    }

    impl SendBuffer for RingBuffer {
        type Payload<'a> = FragmentedPayload<'a, 2>;

        fn mark_read(&mut self, count: usize) {
            let Self { storage, head, len } = self;
            assert!(count <= *len);
            *len -= count;
            *head = (*head + count) % storage.len();
        }

        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
        where
            F: FnOnce(Self::Payload<'a>) -> R,
        {
            let Self { storage, head, len } = self;
            if storage.len() == 0 {
                return f(FragmentedPayload::new_empty());
            }
            assert!(offset <= *len);
            RingBuffer::with_readable(
                storage,
                (*head + offset) % storage.len(),
                *len - offset,
                |readable| f(readable.into_iter().map(|x| *x).collect()),
            )
        }
    }

    impl RingBuffer {
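        /// Writes `data` at the end of the readable region and immediately
        /// makes it readable, returning the number of bytes actually
        /// enqueued.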
        pub(crate) fn enqueue_data(&mut self, data: &[u8]) -> usize {
            let nwritten = self.write_at(0, &data);
            self.make_readable(nwritten, false);
            nwritten
        }
    }

    impl Buffer for Arc<Mutex<RingBuffer>> {
        fn limits(&self) -> BufferLimits {
            self.lock().limits()
        }

        fn target_capacity(&self) -> usize {
            self.lock().target_capacity()
        }

        fn request_capacity(&mut self, size: usize) {
            self.lock().request_capacity(size)
        }
    }

    impl ReceiveBuffer for Arc<Mutex<RingBuffer>> {
        fn write_at<P: Payload>(&mut self, offset: usize, data: &P) -> usize {
            self.lock().write_at(offset, data)
        }

        fn make_readable(&mut self, count: usize, has_outstanding: bool) {
            self.lock().make_readable(count, has_outstanding)
        }
    }

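    /// A send buffer that drains an unbounded "application stream" (a shared
    /// `Vec<u8>`) into a fixed-size `RingBuffer` on demand, so tests can
    /// queue arbitrary amounts of data without worrying about capacity.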
    #[derive(Debug, Default)]
    pub struct TestSendBuffer {
        fake_stream: Arc<Mutex<Vec<u8>>>,
        ring: RingBuffer,
    }

    impl TestSendBuffer {
        pub fn new(fake_stream: Arc<Mutex<Vec<u8>>>, ring: RingBuffer) -> TestSendBuffer {
            Self { fake_stream, ring }
        }

        pub fn enqueue_data(&mut self, data: &[u8]) {
            self.fake_stream.lock().extend_from_slice(data);
        }
    }

    impl Buffer for TestSendBuffer {
        fn limits(&self) -> BufferLimits {
            let Self { fake_stream, ring } = self;
            let BufferLimits { capacity: ring_capacity, len: ring_len } = ring.limits();
            let guard = fake_stream.lock();
            let len = ring_len + guard.len();
            let capacity = ring_capacity + guard.capacity();
            BufferLimits { len, capacity }
        }

        fn target_capacity(&self) -> usize {
            let Self { fake_stream: _, ring } = self;
            ring.target_capacity()
        }

        fn request_capacity(&mut self, size: usize) {
            let Self { fake_stream: _, ring } = self;
            ring.request_capacity(size)
        }
    }

    impl SendBuffer for TestSendBuffer {
        type Payload<'a> = FragmentedPayload<'a, 2>;

        fn mark_read(&mut self, count: usize) {
            let Self { fake_stream: _, ring } = self;
            ring.mark_read(count)
        }

        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
        where
            F: FnOnce(Self::Payload<'a>) -> R,
        {
            let Self { fake_stream, ring } = self;
            let mut guard = fake_stream.lock();
            if !guard.is_empty() {
                // Move as much data as will fit from the fake stream into the
                // ring buffer before peeking.
                let BufferLimits { capacity, len } = ring.limits();
                let len = (capacity - len).min(guard.len());
                let rest = guard.split_off(len);
                let first = core::mem::replace(&mut *guard, rest);
                assert_eq!(ring.enqueue_data(&first[..]), len);
            }
            ring.peek_with(offset, f)
        }
    }

    fn arc_mutex_eq<T: PartialEq>(a: &Arc<Mutex<T>>, b: &Arc<Mutex<T>>) -> bool {
        if Arc::ptr_eq(a, b) {
            return true;
        }
        (&*a.lock()) == (&*b.lock())
    }

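    /// The buffer pair handed to a test client: a shared receive ring buffer
    /// and a growable send-side byte stream.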
    #[derive(Clone, Debug, Default)]
    pub struct ClientBuffers {
        pub receive: Arc<Mutex<RingBuffer>>,
        pub send: Arc<Mutex<Vec<u8>>>,
    }

    impl PartialEq for ClientBuffers {
        fn eq(&self, ClientBuffers { receive: other_receive, send: other_send }: &Self) -> bool {
            let Self { receive, send } = self;
            arc_mutex_eq(receive, other_receive) && arc_mutex_eq(send, other_send)
        }
    }

    impl Eq for ClientBuffers {}

    impl ClientBuffers {
        pub fn new(buffer_sizes: BufferSizes) -> Self {
            let BufferSizes { send, receive } = buffer_sizes;
            Self {
                receive: Arc::new(Mutex::new(RingBuffer::new(receive))),
                send: Arc::new(Mutex::new(Vec::with_capacity(send))),
            }
        }
    }

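    /// What a test client can provide when creating a socket: either a
    /// write-back slot that will receive the allocated `ClientBuffers`, or
    /// nothing at all.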
    #[derive(Debug, Clone, Eq, PartialEq)]
    #[allow(missing_docs)]
    pub enum ProvidedBuffers {
        Buffers(WriteBackClientBuffers),
        NoBuffers,
    }

    impl Default for ProvidedBuffers {
        fn default() -> Self {
            Self::NoBuffers
        }
    }

    impl From<WriteBackClientBuffers> for ProvidedBuffers {
        fn from(buffers: WriteBackClientBuffers) -> Self {
            ProvidedBuffers::Buffers(buffers)
        }
    }

    impl From<ProvidedBuffers> for WriteBackClientBuffers {
        fn from(extra: ProvidedBuffers) -> Self {
            match extra {
                ProvidedBuffers::Buffers(buffers) => buffers,
                ProvidedBuffers::NoBuffers => Default::default(),
            }
        }
    }

    impl From<ProvidedBuffers> for () {
        fn from(_: ProvidedBuffers) -> Self {
            ()
        }
    }

    impl From<()> for ProvidedBuffers {
        fn from(_: ()) -> Self {
            Default::default()
        }
    }

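    /// A shared, initially empty slot that the stack fills in with the
    /// `ClientBuffers` it allocates for a connection, so the test client can
    /// retrieve them afterwards.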
    #[derive(Debug, Default, Clone)]
    pub struct WriteBackClientBuffers(pub Arc<Mutex<Option<ClientBuffers>>>);

    impl PartialEq for WriteBackClientBuffers {
        fn eq(&self, Self(other): &Self) -> bool {
            let Self(this) = self;
            arc_mutex_eq(this, other)
        }
    }

    impl Eq for WriteBackClientBuffers {}

    impl IntoBuffers<Arc<Mutex<RingBuffer>>, TestSendBuffer> for ProvidedBuffers {
        fn into_buffers(
            self,
            buffer_sizes: BufferSizes,
        ) -> (Arc<Mutex<RingBuffer>>, TestSendBuffer) {
            let buffers = ClientBuffers::new(buffer_sizes);
            if let ProvidedBuffers::Buffers(b) = self {
                *b.0.as_ref().lock() = Some(buffers.clone());
            }
            let ClientBuffers { receive, send } = buffers;
            (receive, TestSendBuffer::new(send, Default::default()))
        }
    }

    impl ListenerNotifier for ProvidedBuffers {
        fn new_incoming_connections(&mut self, _: usize) {}
    }

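    /// A payload of `len` bytes that are all `REPEATING_BYTE`, useful for
    /// exercising send paths without allocating real data.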
    #[derive(Debug)]
    pub struct RepeatingPayload {
        len: usize,
    }

    impl RepeatingPayload {
        const REPEATING_BYTE: u8 = 0xAA;
    }

    impl PayloadLen for RepeatingPayload {
        fn len(&self) -> usize {
            self.len
        }
    }

    impl Payload for RepeatingPayload {
        fn slice(self, range: Range<u32>) -> Self {
            Self { len: usize::try_from(range.end - range.start).unwrap() }
        }

        fn partial_copy(&self, offset: usize, dst: &mut [u8]) {
            assert!(offset < self.len);
            assert_eq!(dst.len() - offset, self.len);
            dst.fill(Self::REPEATING_BYTE);
        }

        fn partial_copy_uninit(&self, offset: usize, dst: &mut [core::mem::MaybeUninit<u8>]) {
            assert!(offset < self.len);
            assert_eq!(dst.len() - offset, self.len);
            dst.fill(core::mem::MaybeUninit::new(Self::REPEATING_BYTE));
        }

        fn new_empty() -> Self {
            Self { len: 0 }
        }
    }

    impl InnerPacketBuilder for RepeatingPayload {
        fn bytes_len(&self) -> usize {
            self.len
        }

        fn serialize(&self, buffer: &mut [u8]) {
            buffer.fill(Self::REPEATING_BYTE)
        }
    }

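    /// A send buffer that always reports itself as full of `RepeatingPayload`
    /// bytes, effectively providing an endless stream of data to send.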
    #[derive(Default, Debug, Eq, PartialEq)]
    pub struct InfiniteSendBuffer;

    impl InfiniteSendBuffer {
        const LEN: usize = usize::MAX as usize;
    }

    impl Buffer for InfiniteSendBuffer {
        fn limits(&self) -> BufferLimits {
            BufferLimits { capacity: Self::LEN, len: Self::LEN }
        }

        fn target_capacity(&self) -> usize {
            Self::LEN
        }

        fn request_capacity(&mut self, size: usize) {
            unimplemented!("can't change capacity of infinite send buffer to {size}")
        }
    }

    impl SendBuffer for InfiniteSendBuffer {
        type Payload<'a> = RepeatingPayload;

        fn mark_read(&mut self, _count: usize) {}

        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
        where
            F: FnOnce(Self::Payload<'a>) -> R,
        {
            f(RepeatingPayload { len: Self::LEN - offset })
        }
    }

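    /// A send buffer holding a fixed number of `RepeatingPayload` bytes that
    /// shrinks as data is marked read.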
    #[derive(Default, Debug, Eq, PartialEq)]
    pub struct RepeatingSendBuffer(usize);

    impl RepeatingSendBuffer {
        pub fn new(length: usize) -> Self {
            Self(length)
        }
    }

    impl Buffer for RepeatingSendBuffer {
        fn limits(&self) -> BufferLimits {
            let Self(len) = self;
            BufferLimits { capacity: usize::MAX, len: *len }
        }

        fn target_capacity(&self) -> usize {
            usize::MAX
        }

        fn request_capacity(&mut self, size: usize) {
            unimplemented!("can't change capacity of repeatable send buffer to {size}")
        }
    }

    impl SendBuffer for RepeatingSendBuffer {
        type Payload<'a> = RepeatingPayload;

        fn mark_read(&mut self, count: usize) {
            let Self(len) = self;
            *len -= count;
        }

        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
        where
            F: FnOnce(Self::Payload<'a>) -> R,
        {
            let Self(len) = self;
            f(RepeatingPayload { len: *len - offset })
        }
    }
}

#[cfg(test)]
mod test {
    use alloc::vec::Vec;
    use alloc::{format, vec};

    use netstack3_base::FragmentedPayload;
    use proptest::strategy::{Just, Strategy};
    use proptest::test_runner::Config;
    use proptest::{prop_assert, prop_assert_eq, proptest};
    use proptest_support::failed_seeds_no_std;
    use test_case::test_case;
    use super::testutil::RingBuffer;

    use super::*;

    proptest! {
        #![proptest_config(Config {
            failure_persistence: failed_seeds_no_std!(
                "cc f621ca7d3a2b108e0dc41f7169ad028f4329b79e90e73d5f68042519a9f63999",
                "cc c449aebed201b4ec4f137f3c224f20325f4cfee0b7fd596d9285176b6d811aa9"
            ),
            ..Config::default()
        })]

        #[test]
        fn ring_buffer_make_readable((mut rb, avail) in ring_buffer::with_written()) {
            let old_storage = rb.storage.clone();
            let old_head = rb.head;
            let old_len = rb.limits().len;
            rb.make_readable(avail, false);
            let RingBuffer { storage, head, len } = rb;
            prop_assert_eq!(len, old_len + avail);
            prop_assert_eq!(head, old_head);
            prop_assert_eq!(storage, old_storage);
        }

        #[test]
        fn ring_buffer_write_at((mut rb, offset, data) in ring_buffer::with_offset_data()) {
            let old_head = rb.head;
            let old_len = rb.limits().len;
            prop_assert_eq!(rb.write_at(offset, &&data[..]), data.len());
            prop_assert_eq!(rb.head, old_head);
            prop_assert_eq!(rb.limits().len, old_len);
            for i in 0..data.len() {
                let masked = (rb.head + rb.len + offset + i) % rb.storage.len();
                prop_assert_eq!(rb.storage[masked], data[i]);
                rb.storage[masked] = 0;
            }
            prop_assert_eq!(&rb.storage, &vec![0; rb.storage.len()]);
        }

        #[test]
        fn ring_buffer_read_with((mut rb, expected, consume) in ring_buffer::with_read_data()) {
            prop_assert_eq!(rb.limits().len, expected.len());
            let nread = rb.read_with(|readable| {
                assert!(readable.len() == 1 || readable.len() == 2);
                let got = readable.concat();
                assert_eq!(got, expected);
                consume
            });
            prop_assert_eq!(nread, consume);
            prop_assert_eq!(rb.limits().len, expected.len() - consume);
        }

        #[test]
        fn ring_buffer_mark_read((mut rb, readable) in ring_buffer::with_readable()) {
            const BYTE_TO_WRITE: u8 = 0x42;
            let written = rb.writable_regions().into_iter().fold(0, |acc, slice| {
                slice.fill(BYTE_TO_WRITE);
                acc + slice.len()
            });
            let old_storage = rb.storage.clone();
            let old_head = rb.head;
            let old_len = rb.limits().len;

            rb.mark_read(readable);
            let new_writable = rb.writable_regions().into_iter().fold(Vec::new(), |mut acc, slice| {
                acc.extend_from_slice(slice);
                acc
            });
            for (i, x) in new_writable.iter().enumerate().take(written) {
                prop_assert_eq!(*x, BYTE_TO_WRITE, "i={}, rb={:?}", i, rb);
            }
            prop_assert!(new_writable.len() >= written);

            let RingBuffer { storage, head, len } = rb;
            prop_assert_eq!(len, old_len - readable);
            prop_assert_eq!(head, (old_head + readable) % old_storage.len());
            prop_assert_eq!(storage, old_storage);
        }

        #[test]
        fn ring_buffer_peek_with((mut rb, expected, offset) in ring_buffer::with_read_data()) {
            prop_assert_eq!(rb.limits().len, expected.len());
            rb.peek_with(offset, |readable| {
                prop_assert_eq!(readable.to_vec(), &expected[offset..]);
                Ok(())
            })?;
            prop_assert_eq!(rb.limits().len, expected.len());
        }

        #[test]
        fn ring_buffer_writable_regions(mut rb in ring_buffer::arb_ring_buffer()) {
            const BYTE_TO_WRITE: u8 = 0x42;
            let writable_len = rb.writable_regions().into_iter().fold(0, |acc, slice| {
                slice.fill(BYTE_TO_WRITE);
                acc + slice.len()
            });
            let BufferLimits { len, capacity } = rb.limits();
            prop_assert_eq!(writable_len + len, capacity);
            for i in 0..capacity {
                let expected = if i < len {
                    0
                } else {
                    BYTE_TO_WRITE
                };
                let idx = (rb.head + i) % rb.storage.len();
                prop_assert_eq!(rb.storage[idx], expected);
            }
        }
    }

    #[test_case([Range { start: 0, end: 0 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(0), generation: 0 })]
    #[test_case([Range { start: 0, end: 10 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(10), generation: 1 })]
    #[test_case([Range { start: 10, end: 15 }, Range { start: 5, end: 10 }]
        => Assembler {
            outstanding: [
                SeqRange::new(SeqNum::new(5)..SeqNum::new(15), 2).unwrap()
            ].into_iter().collect(),
            nxt: SeqNum::new(0),
            generation: 2,
        })
    ]
    #[test_case([Range { start: 10, end: 15 }, Range { start: 0, end: 5 }, Range { start: 5, end: 10 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(15), generation: 3 })]
    #[test_case([Range { start: 10, end: 15 }, Range { start: 5, end: 10 }, Range { start: 0, end: 5 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(15), generation: 3 })]
    #[test_case([Range { start: 10, end: 15 }, Range { start: 10, end: 15 }, Range { start: 11, end: 12 }]
        => Assembler {
            outstanding: [
                SeqRange::new(SeqNum::new(10)..SeqNum::new(15), 3).unwrap()
            ].into_iter().collect(),
            nxt: SeqNum::new(0), generation: 3 })]
    fn assembler_examples(ops: impl IntoIterator<Item = Range<u32>>) -> Assembler {
        let mut assembler = Assembler::new(SeqNum::new(0));
        for Range { start, end } in ops.into_iter() {
            let _advanced = assembler.insert(SeqNum::new(start)..SeqNum::new(end));
        }
        assembler
    }

    #[test_case(&[] => Vec::<Range<u32>>::new(); "empty")]
    #[test_case(&[1..2] => vec![1..2]; "single")]
    #[test_case(&[1..2, 3..4] => vec![3..4, 1..2]; "latest first")]
    #[test_case(&[1..2, 3..4, 5..6, 7..8, 9..10]
        => vec![9..10, 7..8, 5..6, 3..4]; "max len")]
    #[test_case(&[1..2, 3..4, 5..6, 7..8, 9..10, 6..7]
        => vec![5..8, 9..10, 3..4, 1..2]; "gap fill")]
    #[test_case(&[1..2, 3..4, 5..6, 7..8, 9..10, 1..8]
        => vec![1..8, 9..10]; "large gap fill")]
    fn assembler_sack_blocks(ops: &[Range<u32>]) -> Vec<Range<u32>> {
        let mut assembler = Assembler::new(SeqNum::new(0));
        for Range { start, end } in ops {
            let _: usize = assembler.insert(SeqNum::new(*start)..SeqNum::new(*end));
        }
        assembler
            .sack_blocks(SackBlockSizeLimiters { timestamp_enabled: false })
            .try_iter()
            .map(|r| r.expect("invalid block").into_range_u32())
            .collect()
    }

    #[test_case(false => vec![10..11, 7..8, 4..5, 1..2]; "4_blocks_with_timestamp_disabled")]
    #[test_case(true => vec![10..11, 7..8, 4..5]; "3_blocks_with_timestamp_enabled")]
    fn assembler_sack_blocks_with_timestamp(timestamp_enabled: bool) -> Vec<Range<u32>> {
        let mut assembler = Assembler::new(SeqNum::new(0));
        for Range { start, end } in [1..2, 4..5, 7..8, 10..11] {
            let _: usize = assembler.insert(SeqNum::new(start)..SeqNum::new(end));
        }
        assembler
            .sack_blocks(SackBlockSizeLimiters { timestamp_enabled })
            .try_iter()
            .map(|r| r.expect("invalid block").into_range_u32())
            .collect()
    }

    #[test]
    fn ring_buffer_wrap_around() {
        const CAPACITY: usize = 16;
        let mut rb = RingBuffer::new(CAPACITY);

        // Fill the first 10 bytes; the readable region is still contiguous.
        const BUF_SIZE: usize = 10;
        assert_eq!(rb.enqueue_data(&[0xAA; BUF_SIZE]), BUF_SIZE);
        rb.peek_with(0, |payload| {
            assert_eq!(payload, FragmentedPayload::new_contiguous(&[0xAA; BUF_SIZE]))
        });
        rb.mark_read(BUF_SIZE);

        // The next write wraps around the end of the storage, so the payload
        // is observed as two fragments.
        assert_eq!(rb.enqueue_data(&[0xBB; BUF_SIZE]), BUF_SIZE);
        rb.peek_with(0, |payload| {
            assert_eq!(
                payload,
                FragmentedPayload::new([
                    &[0xBB; (CAPACITY - BUF_SIZE)],
                    &[0xBB; (BUF_SIZE * 2 - CAPACITY)]
                ])
            )
        });
        rb.mark_read(BUF_SIZE);

        // After reading past the wrap point, the next write is contiguous again.
        assert_eq!(rb.enqueue_data(&[0xCC; BUF_SIZE]), BUF_SIZE);
        rb.peek_with(0, |payload| {
            assert_eq!(payload, FragmentedPayload::new_contiguous(&[0xCC; BUF_SIZE]))
        });

        let read = rb.read_with(|segments| {
            assert_eq!(segments, [[0xCC; BUF_SIZE]]);
            BUF_SIZE
        });
        assert_eq!(read, BUF_SIZE);
    }

    #[test]
    fn ring_buffer_example() {
        let mut rb = RingBuffer::new(16);
        assert_eq!(rb.write_at(5, &"World".as_bytes()), 5);
        assert_eq!(rb.write_at(0, &"Hello".as_bytes()), 5);
        rb.make_readable(10, false);
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["HelloWorld".as_bytes()]);
                5
            }),
            5
        );
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["World".as_bytes()]);
                readable[0].len()
            }),
            5
        );
        assert_eq!(rb.write_at(0, &"HelloWorld".as_bytes()), 10);
        rb.make_readable(10, false);
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["HelloW".as_bytes(), "orld".as_bytes()]);
                6
            }),
            6
        );
        assert_eq!(rb.limits().len, 4);
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["orld".as_bytes()]);
                4
            }),
            4
        );
        assert_eq!(rb.limits().len, 0);

        assert_eq!(rb.enqueue_data("Hello".as_bytes()), 5);
        assert_eq!(rb.limits().len, 5);

        rb.peek_with(3, |readable| {
            assert_eq!(readable.to_vec(), "lo".as_bytes());
        });

        rb.mark_read(2);

        rb.peek_with(0, |readable| {
            assert_eq!(readable.to_vec(), "llo".as_bytes());
        });
    }

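    /// Proptest strategies for generating `RingBuffer`s in arbitrary states,
    /// together with the auxiliary values the tests above need.
    /// `arb_ring_buffer_args` picks a `(capacity, head, len)` triple, and the
    /// other strategies wrap that triple into a `RingBuffer` plus an extra
    /// value such as a write offset or pre-written data.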
    mod ring_buffer {
        use super::*;

        const MAX_CAP: usize = 32;

        fn arb_ring_buffer_args() -> impl Strategy<Value = (usize, usize, usize)> {
            // Generates (capacity, head, len) with 1 <= capacity <= MAX_CAP,
            // head < capacity, and len <= capacity.
            (1..=MAX_CAP).prop_flat_map(|cap| {
                let max_len = cap;
                (Just(cap), 0..cap, 0..=max_len)
            })
        }

        pub(super) fn arb_ring_buffer() -> impl Strategy<Value = RingBuffer> {
            arb_ring_buffer_args().prop_map(|(cap, head, len)| RingBuffer {
                storage: vec![0; cap],
                head,
                len,
            })
        }

        pub(super) fn with_readable() -> impl Strategy<Value = (RingBuffer, usize)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                (Just(RingBuffer { storage: vec![0; cap], head, len }), 0..=len)
            })
        }

        pub(super) fn with_written() -> impl Strategy<Value = (RingBuffer, usize)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                let rb = RingBuffer { storage: vec![0; cap], head, len };
                let max_written = cap - len;
                (Just(rb), 0..=max_written)
            })
        }

        pub(super) fn with_offset_data() -> impl Strategy<Value = (RingBuffer, usize, Vec<u8>)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                let writable_len = cap - len;
                (0..=writable_len).prop_flat_map(move |offset| {
                    (0..=writable_len - offset).prop_flat_map(move |data_len| {
                        (
                            Just(RingBuffer { storage: vec![0; cap], head, len }),
                            Just(offset),
                            proptest::collection::vec(1..=u8::MAX, data_len),
                        )
                    })
                })
            })
        }

        pub(super) fn with_read_data() -> impl Strategy<Value = (RingBuffer, Vec<u8>, usize)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                proptest::collection::vec(1..=u8::MAX, len).prop_flat_map(move |data| {
                    let mut rb = RingBuffer { storage: vec![0; cap], head, len: 0 };
                    assert_eq!(rb.write_at(0, &&data[..]), len);
                    rb.make_readable(len, false);
                    (Just(rb), Just(data), 0..=len)
                })
            })
        }
    }
}