use netstack3_base::{Payload, SackBlocks, SeqNum};

use arrayvec::ArrayVec;
use core::fmt::Debug;
use core::ops::Range;
use packet::InnerPacketBuilder;

use crate::internal::base::BufferSizes;
use crate::internal::seq_ranges::{SeqRange, SeqRanges};

/// Common behavior shared by both send and receive buffers.
pub trait Buffer: Debug + Sized {
    /// Returns the minimum and maximum capacity this buffer type supports.
    fn capacity_range() -> (usize, usize);

    /// Returns the current length and capacity of the buffer.
    fn limits(&self) -> BufferLimits;

    /// Returns the capacity the buffer is aiming for, which may differ from
    /// the current capacity while a requested resize is pending.
    fn target_capacity(&self) -> usize;

    /// Requests that the buffer be resized to hold `size` bytes.
    fn request_capacity(&mut self, size: usize);
}

/// A buffer for data received from the peer.
pub trait ReceiveBuffer: Buffer {
    /// Writes `data` starting `offset` bytes past the beginning of the
    /// writable region, returning the number of bytes actually written.
    fn write_at<P: Payload>(&mut self, offset: usize, data: &P) -> usize;

    /// Marks `count` previously written bytes as readable by the application.
    /// `has_outstanding` indicates whether out-of-order data is still pending
    /// assembly.
    fn make_readable(&mut self, count: usize, has_outstanding: bool);
}

/// A buffer for data to be sent to the peer.
pub trait SendBuffer: Buffer {
    /// The payload type handed out when peeking into the buffer.
    type Payload<'a>: InnerPacketBuilder + Payload + Debug + 'a;

    /// Removes `count` bytes from the front of the buffer once they have been
    /// acknowledged by the peer.
    fn mark_read(&mut self, count: usize);

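    /// Calls `f` with a view of the readable data starting `offset` bytes
    /// into the buffer, without consuming it.
    ///
    /// A minimal usage sketch (illustrative only; assumes some concrete
    /// implementor bound to `buf`):
    ///
    /// ```ignore
    /// // Peek at everything that is queued without consuming it.
    /// let queued = buf.peek_with(0, |payload| payload.len());
    /// ```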
    fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
    where
        F: FnOnce(Self::Payload<'a>) -> R;
}

#[derive(Eq, PartialEq, Debug, Copy, Clone)]
pub struct BufferLimits {
    /// The total number of bytes the buffer can hold.
    pub capacity: usize,

    /// The number of bytes currently held in the buffer.
    pub len: usize,
}

/// Assembles out-of-order segments into a contiguous in-order stream.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub(super) struct Assembler {
    /// The next in-order sequence number expected.
    nxt: SeqNum,
    /// A monotonic counter bumped on every insertion; it records how recently
    /// each outstanding range was updated, which is used when picking SACK
    /// blocks.
    generation: usize,
    /// The out-of-order ranges received so far, tagged with the generation at
    /// which they were last updated.
    outstanding: SeqRanges<usize>,
}

impl Assembler {
    /// Creates a new assembler expecting `nxt` as the next in-order sequence
    /// number.
    pub(super) fn new(nxt: SeqNum) -> Self {
        Self { outstanding: SeqRanges::default(), generation: 0, nxt }
    }

    /// Returns the next expected in-order sequence number.
    pub(super) fn nxt(&self) -> SeqNum {
        self.nxt
    }

    /// Returns whether any out-of-order ranges are currently held.
    pub(super) fn has_out_of_order(&self) -> bool {
        !self.outstanding.is_empty()
    }

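    /// Inserts a newly received range of sequence numbers, merging it with
    /// any adjacent or overlapping out-of-order ranges, and returns how many
    /// bytes of contiguous in-order data became available (how far `nxt`
    /// advanced).
    ///
    /// A minimal sketch of the expected behavior (values are illustrative,
    /// mirroring the cases exercised in the tests below):
    ///
    /// ```ignore
    /// let mut assembler = Assembler::new(SeqNum::new(0));
    /// assert_eq!(assembler.insert(SeqNum::new(5)..SeqNum::new(10)), 0); // out of order
    /// assert_eq!(assembler.insert(SeqNum::new(0)..SeqNum::new(5)), 10); // gap filled
    /// assert_eq!(assembler.nxt(), SeqNum::new(10));
    /// ```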
    pub(super) fn insert(&mut self, Range { start, end }: Range<SeqNum>) -> usize {
        assert!(!start.after(end));
        assert!(!start.before(self.nxt));
        if start == end {
            return 0;
        }

        let Self { outstanding, nxt, generation } = self;
        *generation += 1;
        let _: bool = outstanding.insert(start..end, *generation);

        if let Some(advanced) = outstanding.pop_front_if(|r| r.start() == *nxt) {
            *nxt = advanced.end();
            usize::try_from(advanced.len()).unwrap()
        } else {
            0
        }
    }

    /// Returns whether there are out-of-order ranges waiting to be assembled.
    pub(super) fn has_outstanding(&self) -> bool {
        let Self { outstanding, nxt: _, generation: _ } = self;
        !outstanding.is_empty()
    }

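    /// Returns the SACK blocks to advertise to the peer, preferring the most
    /// recently updated out-of-order ranges and capped at
    /// [`SackBlocks::MAX_BLOCKS`].
    ///
    /// A minimal sketch of the ordering (illustrative; matches the
    /// `assembler_sack_blocks` test cases below):
    ///
    /// ```ignore
    /// let mut assembler = Assembler::new(SeqNum::new(0));
    /// let _ = assembler.insert(SeqNum::new(1)..SeqNum::new(2));
    /// let _ = assembler.insert(SeqNum::new(3)..SeqNum::new(4));
    /// // The most recently received block is reported first.
    /// let blocks = assembler.sack_blocks();
    /// ```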
    pub(crate) fn sack_blocks(&self) -> SackBlocks {
        let Self { nxt: _, generation: _, outstanding } = self;
        if outstanding.is_empty() {
            return SackBlocks::default();
        }

        // Keep up to `MAX_BLOCKS` blocks, preferring the most recently
        // updated ranges, ordered most recent first.
        let mut heap = ArrayVec::<&SeqRange<_>, { SackBlocks::MAX_BLOCKS }>::new();
        for block in outstanding.iter() {
            if heap.is_full() {
                if heap.last().is_some_and(|l| l.meta() < block.meta()) {
                    // The oldest retained block is older than this one; evict
                    // it to make room.
                    let _: Option<_> = heap.pop();
                } else {
                    // This block is not newer than any retained block; skip it.
                    continue;
                }
            }

            heap.push(block);
            heap.sort_by(|a, b| b.meta().cmp(&a.meta()));
        }

        SackBlocks::from_iter(heap.into_iter().map(|block| block.to_sack_block()))
    }
}

/// Converts a value into the receive and send buffers for a new connection.
pub trait IntoBuffers<R: ReceiveBuffer, S: SendBuffer> {
    /// Converts `self` into buffers sized according to `buffer_sizes`.
    fn into_buffers(self, buffer_sizes: BufferSizes) -> (R, S);
}

#[cfg(any(test, feature = "testutils"))]
impl<R: Default + ReceiveBuffer, S: Default + SendBuffer> IntoBuffers<R, S> for () {
    fn into_buffers(self, buffer_sizes: BufferSizes) -> (R, S) {
        // Requested sizes are ignored; tests use default-sized buffers.
        let BufferSizes { send: _, receive: _ } = buffer_sizes;
        Default::default()
    }
}

#[cfg(any(test, feature = "testutils"))]
pub(crate) mod testutil {
    use super::*;

    use alloc::sync::Arc;
    use alloc::vec;
    use alloc::vec::Vec;
    use core::cmp;

    use either::Either;
    use netstack3_base::sync::Mutex;
    use netstack3_base::{FragmentedPayload, PayloadLen, WindowSize};

    use crate::internal::socket::accept_queue::ListenerNotifier;

    /// A circular buffer backed by a `Vec<u8>`, usable as both a receive and
    /// a send buffer in tests.
    #[derive(Clone, PartialEq, Eq)]
    pub struct RingBuffer {
        pub(super) storage: Vec<u8>,
        /// The index of the first readable byte.
        pub(super) head: usize,
        /// The number of readable bytes in the buffer.
        pub(super) len: usize,
    }

    impl Debug for RingBuffer {
        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
            let Self { storage, head, len } = self;
            f.debug_struct("RingBuffer")
                .field("storage (len, cap)", &(storage.len(), storage.capacity()))
                .field("head", head)
                .field("len", len)
                .finish()
        }
    }

    impl Default for RingBuffer {
        fn default() -> Self {
            Self::new(WindowSize::DEFAULT.into())
        }
    }

    impl RingBuffer {
        /// Creates a new `RingBuffer` with the given `capacity`.
        pub fn new(capacity: usize) -> Self {
            Self { storage: vec![0; capacity], head: 0, len: 0 }
        }

        /// Resets the buffer to its empty state, keeping the allocation.
        pub fn reset(&mut self) {
            let Self { storage: _, head, len } = self;
            *head = 0;
            *len = 0;
        }

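        /// Calls `f` with the readable portion of `storage`, split into at
        /// most two slices: when the readable region wraps past the end of
        /// the backing storage, the first slice covers `start..storage.len()`
        /// and the second covers the remainder starting at index 0.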
        fn with_readable<'a, F, R>(storage: &'a Vec<u8>, start: usize, len: usize, f: F) -> R
        where
            F: for<'b> FnOnce(&'b [&'a [u8]]) -> R,
        {
            let end = start + len;
            if end > storage.len() {
                // The readable region wraps around; hand out two slices.
                let first_part = &storage[start..storage.len()];
                let second_part = &storage[0..len - first_part.len()];
                f(&[first_part, second_part][..])
            } else {
                let all_bytes = &storage[start..end];
                f(&[all_bytes][..])
            }
        }

        /// Calls `f` with the currently readable bytes and consumes however
        /// many bytes `f` reports having read.
        pub fn read_with<F>(&mut self, f: F) -> usize
        where
            F: for<'a, 'b> FnOnce(&'b [&'a [u8]]) -> usize,
        {
            let Self { storage, head, len } = self;
            if storage.len() == 0 {
                return f(&[&[]]);
            }
            let nread = RingBuffer::with_readable(storage, *head, *len, f);
            assert!(nread <= *len);
            *len -= nread;
            *head = (*head + nread) % storage.len();
            nread
        }

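        /// Returns the writable portion of the buffer as one or two mutable
        /// slices (two when the free space wraps around the end of storage).
        ///
        /// A minimal sketch (illustrative values only):
        ///
        /// ```ignore
        /// let mut rb = RingBuffer::new(16);
        /// let writable: usize = rb.writable_regions().into_iter().map(|s| s.len()).sum();
        /// assert_eq!(writable, 16); // an empty buffer is entirely writable
        /// ```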
        pub fn writable_regions(&mut self) -> impl IntoIterator<Item = &mut [u8]> {
            let BufferLimits { capacity, len } = self.limits();
            let available = capacity - len;
            let Self { storage, head, len } = self;

            let mut write_start = *head + *len;
            if write_start >= storage.len() {
                write_start -= storage.len()
            }
            let write_end = write_start + available;
            if write_end <= storage.len() {
                Either::Left([&mut self.storage[write_start..write_end]].into_iter())
            } else {
                // The writable region wraps around: return the tail of
                // storage followed by the prefix that completes `available`
                // bytes.
                let (b1, b2) = self.storage[..].split_at_mut(write_start);
                let b2_len = b2.len();
                Either::Right([b2, &mut b1[..(available - b2_len)]].into_iter())
            }
        }
    }

    impl Buffer for RingBuffer {
        fn capacity_range() -> (usize, usize) {
            // Allow capacities from 16 bytes up to 16 MiB.
            (16, 16 << 20)
        }

        fn limits(&self) -> BufferLimits {
            let Self { storage, len, head: _ } = self;
            let capacity = storage.len();
            BufferLimits { len: *len, capacity }
        }

        fn target_capacity(&self) -> usize {
            let Self { storage, len: _, head: _ } = self;
            storage.len()
        }

        fn request_capacity(&mut self, size: usize) {
            unimplemented!("capacity request for {size} not supported")
        }
    }

    impl ReceiveBuffer for RingBuffer {
        fn write_at<P: Payload>(&mut self, offset: usize, data: &P) -> usize {
            let BufferLimits { capacity, len } = self.limits();
            let available = capacity - len;
            let Self { storage, head, len } = self;
            if storage.len() == 0 {
                return 0;
            }

            if offset > available {
                return 0;
            }
            let start_at = (*head + *len + offset) % storage.len();
            let to_write = cmp::min(data.len(), available);
            // Write the first chunk, up to the end of the backing storage.
            let first_len = cmp::min(to_write, storage.len() - start_at);
            data.partial_copy(0, &mut storage[start_at..start_at + first_len]);
            if to_write > first_len {
                // The write wraps around; copy the rest to the front.
                data.partial_copy(first_len, &mut storage[0..to_write - first_len]);
            }
            to_write
        }

        fn make_readable(&mut self, count: usize, _has_outstanding: bool) {
            let BufferLimits { capacity, len } = self.limits();
            debug_assert!(count <= capacity - len);
            self.len += count;
        }
    }

    impl SendBuffer for RingBuffer {
        type Payload<'a> = FragmentedPayload<'a, 2>;

        fn mark_read(&mut self, count: usize) {
            let Self { storage, head, len } = self;
            assert!(count <= *len);
            *len -= count;
            *head = (*head + count) % storage.len();
        }

        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
        where
            F: FnOnce(Self::Payload<'a>) -> R,
        {
            let Self { storage, head, len } = self;
            if storage.len() == 0 {
                return f(FragmentedPayload::new_empty());
            }
            assert!(offset <= *len);
            RingBuffer::with_readable(
                storage,
                (*head + offset) % storage.len(),
                *len - offset,
                |readable| f(readable.into_iter().map(|x| *x).collect()),
            )
        }
    }

    impl RingBuffer {
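        /// Writes `data` at the end of the buffer and immediately makes it
        /// readable, returning the number of bytes enqueued.
        ///
        /// A minimal usage sketch (illustrative only):
        ///
        /// ```ignore
        /// let mut rb = RingBuffer::new(16);
        /// assert_eq!(rb.enqueue_data(b"hello"), 5);
        /// assert_eq!(rb.limits().len, 5);
        /// ```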
        pub(crate) fn enqueue_data(&mut self, data: &[u8]) -> usize {
            let nwritten = self.write_at(0, &data);
            self.make_readable(nwritten, false);
            nwritten
        }
    }

    impl Buffer for Arc<Mutex<RingBuffer>> {
        fn capacity_range() -> (usize, usize) {
            RingBuffer::capacity_range()
        }

        fn limits(&self) -> BufferLimits {
            self.lock().limits()
        }

        fn target_capacity(&self) -> usize {
            self.lock().target_capacity()
        }

        fn request_capacity(&mut self, size: usize) {
            self.lock().request_capacity(size)
        }
    }

    impl ReceiveBuffer for Arc<Mutex<RingBuffer>> {
        fn write_at<P: Payload>(&mut self, offset: usize, data: &P) -> usize {
            self.lock().write_at(offset, data)
        }

        fn make_readable(&mut self, count: usize, has_outstanding: bool) {
            self.lock().make_readable(count, has_outstanding)
        }
    }

    /// A send buffer that pulls data from a shared `Vec<u8>` "stream" into an
    /// inner [`RingBuffer`] on demand, for use in tests.
    #[derive(Debug, Default)]
    pub struct TestSendBuffer {
        fake_stream: Arc<Mutex<Vec<u8>>>,
        ring: RingBuffer,
    }

    impl TestSendBuffer {
        /// Creates a new `TestSendBuffer` wrapping `fake_stream` and `ring`.
        pub fn new(fake_stream: Arc<Mutex<Vec<u8>>>, ring: RingBuffer) -> TestSendBuffer {
            Self { fake_stream, ring }
        }
    }

    impl Buffer for TestSendBuffer {
        fn capacity_range() -> (usize, usize) {
            // The effective capacity is the ring buffer plus the fake stream,
            // so double the underlying range.
            let (min, max) = RingBuffer::capacity_range();
            (min * 2, max * 2)
        }

        fn limits(&self) -> BufferLimits {
            let Self { fake_stream, ring } = self;
            let BufferLimits { capacity: ring_capacity, len: ring_len } = ring.limits();
            let guard = fake_stream.lock();
            let len = ring_len + guard.len();
            let capacity = ring_capacity + guard.capacity();
            BufferLimits { len, capacity }
        }

        fn target_capacity(&self) -> usize {
            let Self { fake_stream: _, ring } = self;
            ring.target_capacity()
        }

        fn request_capacity(&mut self, size: usize) {
            let Self { fake_stream: _, ring } = self;
            ring.request_capacity(size)
        }
    }

    impl SendBuffer for TestSendBuffer {
        type Payload<'a> = FragmentedPayload<'a, 2>;

        fn mark_read(&mut self, count: usize) {
            let Self { fake_stream: _, ring } = self;
            ring.mark_read(count)
        }

        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
        where
            F: FnOnce(Self::Payload<'a>) -> R,
        {
            let Self { fake_stream, ring } = self;
            let mut guard = fake_stream.lock();
            if !guard.is_empty() {
                // Move as much data as possible from the fake stream into the
                // ring buffer before peeking.
                let BufferLimits { capacity, len } = ring.limits();
                let len = (capacity - len).min(guard.len());
                let rest = guard.split_off(len);
                let first = core::mem::replace(&mut *guard, rest);
                assert_eq!(ring.enqueue_data(&first[..]), len);
            }
            ring.peek_with(offset, f)
        }
    }

    fn arc_mutex_eq<T: PartialEq>(a: &Arc<Mutex<T>>, b: &Arc<Mutex<T>>) -> bool {
        if Arc::ptr_eq(a, b) {
            return true;
        }
        (&*a.lock()) == (&*b.lock())
    }

    /// The buffers handed back to a test client when a connection is
    /// established.
    #[derive(Clone, Debug, Default)]
    pub struct ClientBuffers {
        /// The receive buffer shared with the protocol state machine.
        pub receive: Arc<Mutex<RingBuffer>>,
        /// The send buffer, written by the client and drained through
        /// [`TestSendBuffer`].
        pub send: Arc<Mutex<Vec<u8>>>,
    }

    impl PartialEq for ClientBuffers {
        fn eq(&self, ClientBuffers { receive: other_receive, send: other_send }: &Self) -> bool {
            let Self { receive, send } = self;
            arc_mutex_eq(receive, other_receive) && arc_mutex_eq(send, other_send)
        }
    }

    impl Eq for ClientBuffers {}

    impl ClientBuffers {
        /// Creates new empty buffers with capacities taken from `buffer_sizes`.
        pub fn new(buffer_sizes: BufferSizes) -> Self {
            let BufferSizes { send, receive } = buffer_sizes;
            Self {
                receive: Arc::new(Mutex::new(RingBuffer::new(receive))),
                send: Arc::new(Mutex::new(Vec::with_capacity(send))),
            }
        }
    }

    /// Extra information a test client can provide when creating a
    /// connection, used to wire up the test buffers.
    #[derive(Debug, Clone, Eq, PartialEq)]
    #[allow(missing_docs)]
    pub enum ProvidedBuffers {
        Buffers(WriteBackClientBuffers),
        NoBuffers,
    }

    impl Default for ProvidedBuffers {
        fn default() -> Self {
            Self::NoBuffers
        }
    }

    impl From<WriteBackClientBuffers> for ProvidedBuffers {
        fn from(buffers: WriteBackClientBuffers) -> Self {
            ProvidedBuffers::Buffers(buffers)
        }
    }

    impl From<ProvidedBuffers> for WriteBackClientBuffers {
        fn from(extra: ProvidedBuffers) -> Self {
            match extra {
                ProvidedBuffers::Buffers(buffers) => buffers,
                ProvidedBuffers::NoBuffers => Default::default(),
            }
        }
    }

    impl From<ProvidedBuffers> for () {
        fn from(_: ProvidedBuffers) -> Self {
            ()
        }
    }

    impl From<()> for ProvidedBuffers {
        fn from(_: ()) -> Self {
            Default::default()
        }
    }

    /// A shared slot that is populated with the new connection's
    /// [`ClientBuffers`] once they are created.
    #[derive(Debug, Default, Clone)]
    pub struct WriteBackClientBuffers(pub Arc<Mutex<Option<ClientBuffers>>>);

    impl PartialEq for WriteBackClientBuffers {
        fn eq(&self, Self(other): &Self) -> bool {
            let Self(this) = self;
            arc_mutex_eq(this, other)
        }
    }

    impl Eq for WriteBackClientBuffers {}

    impl IntoBuffers<Arc<Mutex<RingBuffer>>, TestSendBuffer> for ProvidedBuffers {
        fn into_buffers(
            self,
            buffer_sizes: BufferSizes,
        ) -> (Arc<Mutex<RingBuffer>>, TestSendBuffer) {
            let buffers = ClientBuffers::new(buffer_sizes);
            if let ProvidedBuffers::Buffers(b) = self {
                // Write the newly created buffers back to the client.
                *b.0.as_ref().lock() = Some(buffers.clone());
            }
            let ClientBuffers { receive, send } = buffers;
            (receive, TestSendBuffer::new(send, Default::default()))
        }
    }

    impl ListenerNotifier for ProvidedBuffers {
        fn new_incoming_connections(&mut self, _: usize) {}
    }

    /// A payload consisting of `len` copies of
    /// [`RepeatingPayload::REPEATING_BYTE`].
    #[derive(Debug)]
    pub struct RepeatingPayload {
        len: usize,
    }

    impl RepeatingPayload {
        const REPEATING_BYTE: u8 = 0xAA;
    }

    impl PayloadLen for RepeatingPayload {
        fn len(&self) -> usize {
            self.len
        }
    }

    impl Payload for RepeatingPayload {
        fn slice(self, range: Range<u32>) -> Self {
            Self { len: usize::try_from(range.end - range.start).unwrap() }
        }

        fn partial_copy(&self, offset: usize, dst: &mut [u8]) {
            assert!(offset < self.len);
            assert_eq!(dst.len() - offset, self.len);
            dst.fill(Self::REPEATING_BYTE);
        }

        fn partial_copy_uninit(&self, offset: usize, dst: &mut [core::mem::MaybeUninit<u8>]) {
            assert!(offset < self.len);
            assert_eq!(dst.len() - offset, self.len);
            dst.fill(core::mem::MaybeUninit::new(Self::REPEATING_BYTE));
        }

        fn new_empty() -> Self {
            Self { len: 0 }
        }
    }

    impl InnerPacketBuilder for RepeatingPayload {
        fn bytes_len(&self) -> usize {
            self.len
        }

        fn serialize(&self, buffer: &mut [u8]) {
            buffer.fill(Self::REPEATING_BYTE)
        }
    }

    /// A [`SendBuffer`] that always reports an effectively unbounded amount
    /// of readable data, for tests where the sender should never run dry.
    #[derive(Default, Debug, Eq, PartialEq)]
    pub struct InfiniteSendBuffer;

    impl InfiniteSendBuffer {
        const LEN: usize = usize::MAX;
    }

    impl Buffer for InfiniteSendBuffer {
        fn capacity_range() -> (usize, usize) {
            (0, Self::LEN)
        }

        fn limits(&self) -> BufferLimits {
            BufferLimits { capacity: Self::LEN, len: Self::LEN }
        }

        fn target_capacity(&self) -> usize {
            Self::LEN
        }

        fn request_capacity(&mut self, size: usize) {
            unimplemented!("can't change capacity of infinite send buffer to {size}")
        }
    }

    impl SendBuffer for InfiniteSendBuffer {
        type Payload<'a> = RepeatingPayload;

        fn mark_read(&mut self, _count: usize) {}

        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
        where
            F: FnOnce(Self::Payload<'a>) -> R,
        {
            f(RepeatingPayload { len: Self::LEN - offset })
        }
    }

    /// A [`SendBuffer`] holding a fixed number of repeating bytes that
    /// shrinks as data is marked read.
    #[derive(Default, Debug, Eq, PartialEq)]
    pub struct RepeatingSendBuffer(usize);

    impl RepeatingSendBuffer {
        /// Creates a new buffer with `length` readable bytes.
        pub fn new(length: usize) -> Self {
            Self(length)
        }
    }

    impl Buffer for RepeatingSendBuffer {
        fn capacity_range() -> (usize, usize) {
            (0, usize::MAX)
        }

        fn limits(&self) -> BufferLimits {
            let Self(len) = self;
            BufferLimits { capacity: usize::MAX, len: *len }
        }

        fn target_capacity(&self) -> usize {
            usize::MAX
        }

        fn request_capacity(&mut self, size: usize) {
            unimplemented!("can't change capacity of repeating send buffer to {size}")
        }
    }

    impl SendBuffer for RepeatingSendBuffer {
        type Payload<'a> = RepeatingPayload;

        fn mark_read(&mut self, count: usize) {
            let Self(len) = self;
            *len -= count;
        }

        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
        where
            F: FnOnce(Self::Payload<'a>) -> R,
        {
            let Self(len) = self;
            f(RepeatingPayload { len: *len - offset })
        }
    }
}

#[cfg(test)]
mod test {
    use alloc::vec::Vec;
    use alloc::{format, vec};

    use netstack3_base::FragmentedPayload;
    use proptest::strategy::{Just, Strategy};
    use proptest::test_runner::Config;
    use proptest::{prop_assert, prop_assert_eq, proptest};
    use proptest_support::failed_seeds_no_std;
    use test_case::test_case;
    use testutil::RingBuffer;

    use super::*;
    proptest! {
        #![proptest_config(Config {
            // Add failed seeds here to deterministically replay past failures.
            failure_persistence: failed_seeds_no_std!(
                "cc f621ca7d3a2b108e0dc41f7169ad028f4329b79e90e73d5f68042519a9f63999",
                "cc c449aebed201b4ec4f137f3c224f20325f4cfee0b7fd596d9285176b6d811aa9"
            ),
            ..Config::default()
        })]

        #[test]
        fn ring_buffer_make_readable((mut rb, avail) in ring_buffer::with_written()) {
            let old_storage = rb.storage.clone();
            let old_head = rb.head;
            let old_len = rb.limits().len;
            rb.make_readable(avail, false);
            let RingBuffer { storage, head, len } = rb;
            prop_assert_eq!(len, old_len + avail);
            prop_assert_eq!(head, old_head);
            prop_assert_eq!(storage, old_storage);
        }

        #[test]
        fn ring_buffer_write_at((mut rb, offset, data) in ring_buffer::with_offset_data()) {
            let old_head = rb.head;
            let old_len = rb.limits().len;
            prop_assert_eq!(rb.write_at(offset, &&data[..]), data.len());
            prop_assert_eq!(rb.head, old_head);
            prop_assert_eq!(rb.limits().len, old_len);
            for i in 0..data.len() {
                let masked = (rb.head + rb.len + offset + i) % rb.storage.len();
                // Check the byte landed in the right position, then zero it so
                // the final assertion can verify nothing else was written.
                prop_assert_eq!(rb.storage[masked], data[i]);
                rb.storage[masked] = 0;
            }
            prop_assert_eq!(&rb.storage, &vec![0; rb.storage.len()]);
        }

        #[test]
        fn ring_buffer_read_with((mut rb, expected, consume) in ring_buffer::with_read_data()) {
            prop_assert_eq!(rb.limits().len, expected.len());
            let nread = rb.read_with(|readable| {
                assert!(readable.len() == 1 || readable.len() == 2);
                let got = readable.concat();
                assert_eq!(got, expected);
                consume
            });
            prop_assert_eq!(nread, consume);
            prop_assert_eq!(rb.limits().len, expected.len() - consume);
        }

        #[test]
        fn ring_buffer_mark_read((mut rb, readable) in ring_buffer::with_readable()) {
            const BYTE_TO_WRITE: u8 = 0x42;
            let written = rb.writable_regions().into_iter().fold(0, |acc, slice| {
                slice.fill(BYTE_TO_WRITE);
                acc + slice.len()
            });
            let old_storage = rb.storage.clone();
            let old_head = rb.head;
            let old_len = rb.limits().len;

            rb.mark_read(readable);
            let new_writable = rb.writable_regions().into_iter().fold(Vec::new(), |mut acc, slice| {
                acc.extend_from_slice(slice);
                acc
            });
            for (i, x) in new_writable.iter().enumerate().take(written) {
                prop_assert_eq!(*x, BYTE_TO_WRITE, "i={}, rb={:?}", i, rb);
            }
            prop_assert!(new_writable.len() >= written);

            let RingBuffer { storage, head, len } = rb;
            prop_assert_eq!(len, old_len - readable);
            prop_assert_eq!(head, (old_head + readable) % old_storage.len());
            prop_assert_eq!(storage, old_storage);
        }

        #[test]
        fn ring_buffer_peek_with((mut rb, expected, offset) in ring_buffer::with_read_data()) {
            prop_assert_eq!(rb.limits().len, expected.len());
            rb.peek_with(offset, |readable| {
                prop_assert_eq!(readable.to_vec(), &expected[offset..]);
                Ok(())
            })?;
            prop_assert_eq!(rb.limits().len, expected.len());
        }

        #[test]
        fn ring_buffer_writable_regions(mut rb in ring_buffer::arb_ring_buffer()) {
            const BYTE_TO_WRITE: u8 = 0x42;
            let writable_len = rb.writable_regions().into_iter().fold(0, |acc, slice| {
                slice.fill(BYTE_TO_WRITE);
                acc + slice.len()
            });
            let BufferLimits { len, capacity } = rb.limits();
            prop_assert_eq!(writable_len + len, capacity);
            for i in 0..capacity {
                let expected = if i < len { 0 } else { BYTE_TO_WRITE };
                let idx = (rb.head + i) % rb.storage.len();
                prop_assert_eq!(rb.storage[idx], expected);
            }
        }
    }

    #[test_case([Range { start: 0, end: 0 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(0), generation: 0 })]
    #[test_case([Range { start: 0, end: 10 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(10), generation: 1 })]
    #[test_case([Range { start: 10, end: 15 }, Range { start: 5, end: 10 }]
        => Assembler {
            outstanding: [
                SeqRange::new(SeqNum::new(5)..SeqNum::new(15), 2).unwrap()
            ].into_iter().collect(),
            nxt: SeqNum::new(0),
            generation: 2,
        })]
    #[test_case([Range { start: 10, end: 15 }, Range { start: 0, end: 5 }, Range { start: 5, end: 10 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(15), generation: 3 })]
    #[test_case([Range { start: 10, end: 15 }, Range { start: 5, end: 10 }, Range { start: 0, end: 5 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(15), generation: 3 })]
    #[test_case([Range { start: 10, end: 15 }, Range { start: 10, end: 15 }, Range { start: 11, end: 12 }]
        => Assembler {
            outstanding: [
                SeqRange::new(SeqNum::new(10)..SeqNum::new(15), 3).unwrap()
            ].into_iter().collect(),
            nxt: SeqNum::new(0),
            generation: 3,
        })]
    fn assembler_examples(ops: impl IntoIterator<Item = Range<u32>>) -> Assembler {
        let mut assembler = Assembler::new(SeqNum::new(0));
        for Range { start, end } in ops.into_iter() {
            let _advanced = assembler.insert(SeqNum::new(start)..SeqNum::new(end));
        }
        assembler
    }

    #[test_case(&[] => Vec::<Range<u32>>::new(); "empty")]
    #[test_case(&[1..2] => vec![1..2]; "single")]
    #[test_case(&[1..2, 3..4] => vec![3..4, 1..2]; "latest first")]
    #[test_case(&[1..2, 3..4, 5..6, 7..8, 9..10]
        => vec![9..10, 7..8, 5..6, 3..4]; "max len")]
    #[test_case(&[1..2, 3..4, 5..6, 7..8, 9..10, 6..7]
        => vec![5..8, 9..10, 3..4, 1..2]; "gap fill")]
    #[test_case(&[1..2, 3..4, 5..6, 7..8, 9..10, 1..8]
        => vec![1..8, 9..10]; "large gap fill")]
    fn assembler_sack_blocks(ops: &[Range<u32>]) -> Vec<Range<u32>> {
        let mut assembler = Assembler::new(SeqNum::new(0));
        for Range { start, end } in ops {
            let _: usize = assembler.insert(SeqNum::new(*start)..SeqNum::new(*end));
        }
        assembler
            .sack_blocks()
            .try_iter()
            .map(|r| r.expect("invalid block").into_range_u32())
            .collect()
    }

    #[test]
    fn ring_buffer_wrap_around() {
        const CAPACITY: usize = 16;
        let mut rb = RingBuffer::new(CAPACITY);

        // Use a buffer larger than half the capacity so the second write
        // wraps around the end of storage.
        const BUF_SIZE: usize = 10;
        assert_eq!(rb.enqueue_data(&[0xAA; BUF_SIZE]), BUF_SIZE);
        rb.peek_with(0, |payload| {
            assert_eq!(payload, FragmentedPayload::new_contiguous(&[0xAA; BUF_SIZE]))
        });
        rb.mark_read(BUF_SIZE);

        assert_eq!(rb.enqueue_data(&[0xBB; BUF_SIZE]), BUF_SIZE);
        rb.peek_with(0, |payload| {
            assert_eq!(
                payload,
                FragmentedPayload::new([
                    &[0xBB; (CAPACITY - BUF_SIZE)],
                    &[0xBB; (BUF_SIZE * 2 - CAPACITY)]
                ])
            )
        });
        rb.mark_read(BUF_SIZE);

        assert_eq!(rb.enqueue_data(&[0xCC; BUF_SIZE]), BUF_SIZE);
        rb.peek_with(0, |payload| {
            assert_eq!(payload, FragmentedPayload::new_contiguous(&[0xCC; BUF_SIZE]))
        });

        // The final read should see the data as a single contiguous segment.
        let read = rb.read_with(|segments| {
            assert_eq!(segments, [[0xCC; BUF_SIZE]]);
            BUF_SIZE
        });
        assert_eq!(read, BUF_SIZE);
    }

    #[test]
    fn ring_buffer_example() {
        let mut rb = RingBuffer::new(16);
        assert_eq!(rb.write_at(5, &"World".as_bytes()), 5);
        assert_eq!(rb.write_at(0, &"Hello".as_bytes()), 5);
        rb.make_readable(10, false);
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["HelloWorld".as_bytes()]);
                5
            }),
            5
        );
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["World".as_bytes()]);
                readable[0].len()
            }),
            5
        );
        assert_eq!(rb.write_at(0, &"HelloWorld".as_bytes()), 10);
        rb.make_readable(10, false);
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["HelloW".as_bytes(), "orld".as_bytes()]);
                6
            }),
            6
        );
        assert_eq!(rb.limits().len, 4);
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["orld".as_bytes()]);
                4
            }),
            4
        );
        assert_eq!(rb.limits().len, 0);

        assert_eq!(rb.enqueue_data("Hello".as_bytes()), 5);
        assert_eq!(rb.limits().len, 5);

        let () = rb.peek_with(3, |readable| {
            assert_eq!(readable.to_vec(), "lo".as_bytes());
        });

        rb.mark_read(2);

        let () = rb.peek_with(0, |readable| {
            assert_eq!(readable.to_vec(), "llo".as_bytes());
        });
    }

    mod ring_buffer {
        use super::*;

        const MAX_CAP: usize = 32;

        /// Generates `(capacity, head, len)` triples describing a valid
        /// [`RingBuffer`].
        fn arb_ring_buffer_args() -> impl Strategy<Value = (usize, usize, usize)> {
            (1..=MAX_CAP).prop_flat_map(|cap| {
                let max_len = cap;
                (Just(cap), 0..cap, 0..=max_len)
            })
        }

        pub(super) fn arb_ring_buffer() -> impl Strategy<Value = RingBuffer> {
            arb_ring_buffer_args().prop_map(|(cap, head, len)| RingBuffer {
                storage: vec![0; cap],
                head,
                len,
            })
        }

        /// A [`RingBuffer`] together with a number of readable bytes that may
        /// be marked read.
        pub(super) fn with_readable() -> impl Strategy<Value = (RingBuffer, usize)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                (Just(RingBuffer { storage: vec![0; cap], head, len }), 0..=len)
            })
        }

        /// A [`RingBuffer`] together with a number of bytes that may be made
        /// readable (at most the free capacity).
        pub(super) fn with_written() -> impl Strategy<Value = (RingBuffer, usize)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                let rb = RingBuffer { storage: vec![0; cap], head, len };
                let max_written = cap - len;
                (Just(rb), 0..=max_written)
            })
        }

        /// A [`RingBuffer`] together with a write offset and data to write at
        /// that offset, both fitting within the free capacity.
        pub(super) fn with_offset_data() -> impl Strategy<Value = (RingBuffer, usize, Vec<u8>)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                let writable_len = cap - len;
                (0..=writable_len).prop_flat_map(move |offset| {
                    (0..=writable_len - offset).prop_flat_map(move |data_len| {
                        (
                            Just(RingBuffer { storage: vec![0; cap], head, len }),
                            Just(offset),
                            proptest::collection::vec(1..=u8::MAX, data_len),
                        )
                    })
                })
            })
        }

        /// A [`RingBuffer`] pre-filled with random readable data, the
        /// expected readable bytes, and a number of bytes to consume.
        pub(super) fn with_read_data() -> impl Strategy<Value = (RingBuffer, Vec<u8>, usize)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                proptest::collection::vec(1..=u8::MAX, len).prop_flat_map(move |data| {
                    // Fill the buffer with the generated data so reads see it.
                    let mut rb = RingBuffer { storage: vec![0; cap], head, len: 0 };
                    assert_eq!(rb.write_at(0, &&data[..]), len);
                    rb.make_readable(len, false);
                    (Just(rb), Just(data), 0..=len)
                })
            })
        }
    }
}