use netstack3_base::{Payload, SackBlock, SackBlocks, SeqNum};

use arrayvec::ArrayVec;
use core::fmt::Debug;
use core::ops::Range;
use packet::InnerPacketBuilder;

use crate::internal::base::BufferSizes;
use crate::internal::seq_ranges::{SeqRange, SeqRanges};

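/// Common operations shared by TCP send and receive buffers: querying the
/// supported and current capacity limits and requesting capacity changes.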
pub trait Buffer: Debug + Sized {
    fn capacity_range() -> (usize, usize);

    fn limits(&self) -> BufferLimits;

    fn target_capacity(&self) -> usize;

    fn request_capacity(&mut self, size: usize);
}

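/// A buffer for data received from the network: `write_at` stores bytes at an
/// offset past the readable region and `make_readable` marks the next `count`
/// bytes as readable.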
pub trait ReceiveBuffer: Buffer {
    fn write_at<P: Payload>(&mut self, offset: usize, data: &P) -> usize;

    fn make_readable(&mut self, count: usize, has_outstanding: bool);
}

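/// A buffer of bytes queued for transmission: `mark_read` discards bytes from
/// the front of the buffer and `peek_with` exposes a view of the queued
/// payload starting at a given offset.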
pub trait SendBuffer: Buffer {
    type Payload<'a>: InnerPacketBuilder + Payload + Debug + 'a;

    fn mark_read(&mut self, count: usize);

    fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
    where
        F: FnOnce(Self::Payload<'a>) -> R;
}

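/// A snapshot of a buffer's occupancy: its total `capacity` and the number of
/// bytes (`len`) it currently holds.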
#[derive(Eq, PartialEq, Debug, Copy, Clone)]
pub struct BufferLimits {
    pub capacity: usize,

    pub len: usize,
}

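/// Tracks the sequence number ranges received out of order so the receive
/// stream can be reassembled and SACK blocks can be generated. `nxt` is the
/// next expected in-order sequence number and `generation` is a counter,
/// bumped on every insertion, used to prefer the most recently updated ranges
/// when reporting SACK blocks.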
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub(super) struct Assembler {
    nxt: SeqNum,
    generation: usize,
    outstanding: SeqRanges<usize>,
}

impl Assembler {
    pub(super) fn new(nxt: SeqNum) -> Self {
        Self { outstanding: SeqRanges::default(), generation: 0, nxt }
    }

    pub(super) fn nxt(&self) -> SeqNum {
        self.nxt
    }

    pub(super) fn has_out_of_order(&self) -> bool {
        !self.outstanding.is_empty()
    }

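    /// Records receipt of the sequence range `start..end`. If, after merging
    /// with previously recorded ranges, the front of the out-of-order queue is
    /// contiguous with `nxt`, advances `nxt` and returns the number of bytes
    /// it advanced by; otherwise returns 0.
    ///
    /// Panics if the range ends before it starts or starts before `nxt`.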
    pub(super) fn insert(&mut self, Range { start, end }: Range<SeqNum>) -> usize {
        assert!(!start.after(end));
        assert!(!start.before(self.nxt));
        if start == end {
            return 0;
        }

        let Self { outstanding, nxt, generation } = self;
        *generation += 1;
        outstanding.insert(start..end, *generation);

        if let Some(advanced) = outstanding.pop_front_if(|r| r.range.start == *nxt) {
            *nxt = advanced.range.end;
            usize::try_from(advanced.range.end - advanced.range.start).unwrap()
        } else {
            0
        }
    }

    pub(super) fn has_outstanding(&self) -> bool {
        let Self { outstanding, nxt: _, generation: _ } = self;
        !outstanding.is_empty()
    }

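    /// Returns the SACK blocks to advertise for the currently outstanding
    /// out-of-order ranges, reporting at most `SackBlocks::MAX_BLOCKS` and
    /// preferring the most recently updated ranges (highest generation).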
    pub(crate) fn sack_blocks(&self) -> SackBlocks {
        let Self { nxt: _, generation: _, outstanding } = self;
        if outstanding.is_empty() {
            return SackBlocks::default();
        }

        let mut heap = ArrayVec::<&SeqRange<_>, { SackBlocks::MAX_BLOCKS }>::new();
        for block in outstanding.iter() {
            if heap.is_full() {
                if heap.last().is_some_and(|l| l.meta < block.meta) {
                    let _: Option<_> = heap.pop();
                } else {
                    continue;
                }
            }

            heap.push(block);
            heap.sort_by(|a, b| b.meta.cmp(&a.meta));
        }

        SackBlocks::from_iter(
            heap.into_iter().map(|block| SackBlock(block.range.start, block.range.end)),
        )
    }
}

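/// Conversion from client-provided state into a pair of receive and send
/// buffers sized according to `BufferSizes`.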
pub trait IntoBuffers<R: ReceiveBuffer, S: SendBuffer> {
    fn into_buffers(self, buffer_sizes: BufferSizes) -> (R, S);
}

#[cfg(any(test, feature = "testutils"))]
impl<R: Default + ReceiveBuffer, S: Default + SendBuffer> IntoBuffers<R, S> for () {
    fn into_buffers(self, buffer_sizes: BufferSizes) -> (R, S) {
        let BufferSizes { send: _, receive: _ } = buffer_sizes;
        Default::default()
    }
}

#[cfg(any(test, feature = "testutils"))]
pub(crate) mod testutil {
    use super::*;

    use alloc::sync::Arc;
    use alloc::vec;
    use alloc::vec::Vec;
    use core::cmp;

    use either::Either;
    use netstack3_base::sync::Mutex;
    use netstack3_base::{FragmentedPayload, WindowSize};

    use crate::internal::socket::accept_queue::ListenerNotifier;

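    /// A test-only ring buffer backed by a `Vec<u8>` that implements both
    /// `ReceiveBuffer` and `SendBuffer`. `head` is the index of the first
    /// readable byte and `len` is the number of readable bytes.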
    #[cfg_attr(any(test, feature = "testutils"), derive(Clone, PartialEq, Eq))]
    pub struct RingBuffer {
        pub(super) storage: Vec<u8>,
        pub(super) head: usize,
        pub(super) len: usize,
    }

    impl Debug for RingBuffer {
        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
            let Self { storage, head, len } = self;
            f.debug_struct("RingBuffer")
                .field("storage (len, cap)", &(storage.len(), storage.capacity()))
                .field("head", head)
                .field("len", len)
                .finish()
        }
    }

    impl Default for RingBuffer {
        fn default() -> Self {
            Self::new(WindowSize::DEFAULT.into())
        }
    }

    impl RingBuffer {
        pub fn new(capacity: usize) -> Self {
            Self { storage: vec![0; capacity], head: 0, len: 0 }
        }

        pub fn reset(&mut self) {
            let Self { storage: _, head, len } = self;
            *head = 0;
            *len = 0;
        }

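        /// Calls `f` with slices covering the `len` readable bytes starting at
        /// `start`, splitting into two slices when the region wraps around the
        /// end of `storage`.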
        fn with_readable<'a, F, R>(storage: &'a Vec<u8>, start: usize, len: usize, f: F) -> R
        where
            F: for<'b> FnOnce(&'b [&'a [u8]]) -> R,
        {
            let end = start + len;
            if end > storage.len() {
                let first_part = &storage[start..storage.len()];
                let second_part = &storage[0..len - first_part.len()];
                f(&[first_part, second_part][..])
            } else {
                let all_bytes = &storage[start..end];
                f(&[all_bytes][..])
            }
        }

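        /// Calls `f` with the currently readable regions; `f` returns how many
        /// bytes it consumed, which are then dropped from the front of the
        /// buffer. Returns the number of bytes consumed.
        ///
        /// Panics if `f` reports consuming more bytes than were readable.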
        pub fn read_with<F>(&mut self, f: F) -> usize
        where
            F: for<'a, 'b> FnOnce(&'b [&'a [u8]]) -> usize,
        {
            let Self { storage, head, len } = self;
            if storage.len() == 0 {
                return f(&[&[]]);
            }
            let nread = RingBuffer::with_readable(storage, *head, *len, f);
            assert!(nread <= *len);
            *len -= nread;
            *head = (*head + nread) % storage.len();
            nread
        }

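        /// Returns mutable slices covering the unwritten portion of the buffer
        /// (at most two when the free region wraps around the end of storage).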
        pub fn writable_regions(&mut self) -> impl IntoIterator<Item = &mut [u8]> {
            let BufferLimits { capacity, len } = self.limits();
            let available = capacity - len;
            let Self { storage, head, len } = self;

            let mut write_start = *head + *len;
            if write_start >= storage.len() {
                write_start -= storage.len();
            }
            let write_end = write_start + available;
            if write_end <= storage.len() {
                Either::Left([&mut self.storage[write_start..write_end]].into_iter())
            } else {
                let (b1, b2) = self.storage[..].split_at_mut(write_start);
                let b2_len = b2.len();
                Either::Right([b2, &mut b1[..(available - b2_len)]].into_iter())
            }
        }
    }

    impl Buffer for RingBuffer {
        fn capacity_range() -> (usize, usize) {
            (16, 16 << 20)
        }

        fn limits(&self) -> BufferLimits {
            let Self { storage, len, head: _ } = self;
            let capacity = storage.len();
            BufferLimits { len: *len, capacity }
        }

        fn target_capacity(&self) -> usize {
            let Self { storage, len: _, head: _ } = self;
            storage.len()
        }

        fn request_capacity(&mut self, size: usize) {
            unimplemented!("capacity request for {size} not supported")
        }
    }

    impl ReceiveBuffer for RingBuffer {
        fn write_at<P: Payload>(&mut self, offset: usize, data: &P) -> usize {
            let BufferLimits { capacity, len } = self.limits();
            let available = capacity - len;
            let Self { storage, head, len } = self;
            if storage.len() == 0 {
                return 0;
            }

            if offset > available {
                return 0;
            }
            let start_at = (*head + *len + offset) % storage.len();
            let to_write = cmp::min(data.len(), available);
            let first_len = cmp::min(to_write, storage.len() - start_at);
            data.partial_copy(0, &mut storage[start_at..start_at + first_len]);
            if to_write > first_len {
                data.partial_copy(first_len, &mut storage[0..to_write - first_len]);
            }
            to_write
        }

        fn make_readable(&mut self, count: usize, _has_outstanding: bool) {
            let BufferLimits { capacity, len } = self.limits();
            debug_assert!(count <= capacity - len);
            self.len += count;
        }
    }

    impl SendBuffer for RingBuffer {
        type Payload<'a> = FragmentedPayload<'a, 2>;

        fn mark_read(&mut self, count: usize) {
            let Self { storage, head, len } = self;
            assert!(count <= *len);
            *len -= count;
            *head = (*head + count) % storage.len();
        }

        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
        where
            F: FnOnce(Self::Payload<'a>) -> R,
        {
            let Self { storage, head, len } = self;
            if storage.len() == 0 {
                return f(FragmentedPayload::new_empty());
            }
            assert!(offset <= *len);
            RingBuffer::with_readable(
                storage,
                (*head + offset) % storage.len(),
                *len - offset,
                |readable| f(readable.into_iter().map(|x| *x).collect()),
            )
        }
    }

    impl RingBuffer {
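        /// Test helper: writes `data` at offset zero and immediately marks it
        /// readable, returning the number of bytes written.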
        pub(crate) fn enqueue_data(&mut self, data: &[u8]) -> usize {
            let nwritten = self.write_at(0, &data);
            self.make_readable(nwritten, false);
            nwritten
        }
    }

    impl Buffer for Arc<Mutex<RingBuffer>> {
        fn capacity_range() -> (usize, usize) {
            RingBuffer::capacity_range()
        }

        fn limits(&self) -> BufferLimits {
            self.lock().limits()
        }

        fn target_capacity(&self) -> usize {
            self.lock().target_capacity()
        }

        fn request_capacity(&mut self, size: usize) {
            self.lock().request_capacity(size)
        }
    }

    impl ReceiveBuffer for Arc<Mutex<RingBuffer>> {
        fn write_at<P: Payload>(&mut self, offset: usize, data: &P) -> usize {
            self.lock().write_at(offset, data)
        }

        fn make_readable(&mut self, count: usize, has_outstanding: bool) {
            self.lock().make_readable(count, has_outstanding)
        }
    }

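    /// A test send buffer that chains a shared `Vec<u8>` in front of a
    /// `RingBuffer`: bytes pushed into `fake_stream` by the test are pulled
    /// into the ring buffer as capacity becomes available when peeking.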
    #[derive(Debug, Default)]
    pub struct TestSendBuffer {
        fake_stream: Arc<Mutex<Vec<u8>>>,
        ring: RingBuffer,
    }

    impl TestSendBuffer {
        pub fn new(fake_stream: Arc<Mutex<Vec<u8>>>, ring: RingBuffer) -> TestSendBuffer {
            Self { fake_stream, ring }
        }
    }

    impl Buffer for TestSendBuffer {
        fn capacity_range() -> (usize, usize) {
            let (min, max) = RingBuffer::capacity_range();
            (min * 2, max * 2)
        }

        fn limits(&self) -> BufferLimits {
            let Self { fake_stream, ring } = self;
            let BufferLimits { capacity: ring_capacity, len: ring_len } = ring.limits();
            let guard = fake_stream.lock();
            let len = ring_len + guard.len();
            let capacity = ring_capacity + guard.capacity();
            BufferLimits { len, capacity }
        }

        fn target_capacity(&self) -> usize {
            let Self { fake_stream: _, ring } = self;
            ring.target_capacity()
        }

        fn request_capacity(&mut self, size: usize) {
            let Self { fake_stream: _, ring } = self;
            ring.request_capacity(size)
        }
    }

    impl SendBuffer for TestSendBuffer {
        type Payload<'a> = FragmentedPayload<'a, 2>;

        fn mark_read(&mut self, count: usize) {
            let Self { fake_stream: _, ring } = self;
            ring.mark_read(count)
        }

        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
        where
            F: FnOnce(Self::Payload<'a>) -> R,
        {
            let Self { fake_stream, ring } = self;
            let mut guard = fake_stream.lock();
            if !guard.is_empty() {
                let BufferLimits { capacity, len } = ring.limits();
                let len = (capacity - len).min(guard.len());
                let rest = guard.split_off(len);
                let first = core::mem::replace(&mut *guard, rest);
                assert_eq!(ring.enqueue_data(&first[..]), len);
            }
            ring.peek_with(offset, f)
        }
    }

    fn arc_mutex_eq<T: PartialEq>(a: &Arc<Mutex<T>>, b: &Arc<Mutex<T>>) -> bool {
        if Arc::ptr_eq(a, b) {
            return true;
        }
        (&*a.lock()) == (&*b.lock())
    }

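    /// The pair of buffer endpoints handed back to a test client: a shared
    /// receive `RingBuffer` and a plain `Vec<u8>` standing in for the send
    /// stream.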
    #[derive(Clone, Debug, Default)]
    pub struct ClientBuffers {
        pub receive: Arc<Mutex<RingBuffer>>,
        pub send: Arc<Mutex<Vec<u8>>>,
    }

    impl PartialEq for ClientBuffers {
        fn eq(&self, ClientBuffers { receive: other_receive, send: other_send }: &Self) -> bool {
            let Self { receive, send } = self;
            arc_mutex_eq(receive, other_receive) && arc_mutex_eq(send, other_send)
        }
    }

    impl Eq for ClientBuffers {}

    impl ClientBuffers {
        pub fn new(buffer_sizes: BufferSizes) -> Self {
            let BufferSizes { send, receive } = buffer_sizes;
            Self {
                receive: Arc::new(Mutex::new(RingBuffer::new(receive))),
                send: Arc::new(Mutex::new(Vec::with_capacity(send))),
            }
        }
    }

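    /// Buffers optionally provided by a test client when creating a socket;
    /// when present, the allocated `ClientBuffers` are written back through
    /// them in `into_buffers`.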
    #[derive(Debug, Clone, Eq, PartialEq)]
    #[allow(missing_docs)]
    pub enum ProvidedBuffers {
        Buffers(WriteBackClientBuffers),
        NoBuffers,
    }

    impl Default for ProvidedBuffers {
        fn default() -> Self {
            Self::NoBuffers
        }
    }

    impl From<WriteBackClientBuffers> for ProvidedBuffers {
        fn from(buffers: WriteBackClientBuffers) -> Self {
            ProvidedBuffers::Buffers(buffers)
        }
    }

    impl From<ProvidedBuffers> for WriteBackClientBuffers {
        fn from(extra: ProvidedBuffers) -> Self {
            match extra {
                ProvidedBuffers::Buffers(buffers) => buffers,
                ProvidedBuffers::NoBuffers => Default::default(),
            }
        }
    }

    impl From<ProvidedBuffers> for () {
        fn from(_: ProvidedBuffers) -> Self {
            ()
        }
    }

    impl From<()> for ProvidedBuffers {
        fn from(_: ()) -> Self {
            Default::default()
        }
    }

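    /// A shared, initially empty slot that receives the `ClientBuffers`
    /// allocated for a connection.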
    #[derive(Debug, Default, Clone)]
    pub struct WriteBackClientBuffers(pub Arc<Mutex<Option<ClientBuffers>>>);

    impl PartialEq for WriteBackClientBuffers {
        fn eq(&self, Self(other): &Self) -> bool {
            let Self(this) = self;
            arc_mutex_eq(this, other)
        }
    }

    impl Eq for WriteBackClientBuffers {}

    impl IntoBuffers<Arc<Mutex<RingBuffer>>, TestSendBuffer> for ProvidedBuffers {
        fn into_buffers(
            self,
            buffer_sizes: BufferSizes,
        ) -> (Arc<Mutex<RingBuffer>>, TestSendBuffer) {
            let buffers = ClientBuffers::new(buffer_sizes);
            if let ProvidedBuffers::Buffers(b) = self {
                *b.0.as_ref().lock() = Some(buffers.clone());
            }
            let ClientBuffers { receive, send } = buffers;
            (receive, TestSendBuffer::new(send, Default::default()))
        }
    }

    impl ListenerNotifier for ProvidedBuffers {
        fn new_incoming_connections(&mut self, _: usize) {}
    }
}

#[cfg(test)]
mod test {
    use alloc::vec::Vec;
    use alloc::{format, vec};

    use netstack3_base::FragmentedPayload;
    use proptest::strategy::{Just, Strategy};
    use proptest::test_runner::Config;
    use proptest::{prop_assert, prop_assert_eq, proptest};
    use proptest_support::failed_seeds_no_std;
    use test_case::test_case;
    use testutil::RingBuffer;

    use super::*;

    proptest! {
        #![proptest_config(Config {
            failure_persistence: failed_seeds_no_std!(
                "cc f621ca7d3a2b108e0dc41f7169ad028f4329b79e90e73d5f68042519a9f63999",
                "cc c449aebed201b4ec4f137f3c224f20325f4cfee0b7fd596d9285176b6d811aa9"
            ),
            ..Config::default()
        })]

        #[test]
        fn ring_buffer_make_readable((mut rb, avail) in ring_buffer::with_written()) {
            let old_storage = rb.storage.clone();
            let old_head = rb.head;
            let old_len = rb.limits().len;
            rb.make_readable(avail, false);
            let RingBuffer { storage, head, len } = rb;
            prop_assert_eq!(len, old_len + avail);
            prop_assert_eq!(head, old_head);
            prop_assert_eq!(storage, old_storage);
        }

        #[test]
        fn ring_buffer_write_at((mut rb, offset, data) in ring_buffer::with_offset_data()) {
            let old_head = rb.head;
            let old_len = rb.limits().len;
            prop_assert_eq!(rb.write_at(offset, &&data[..]), data.len());
            prop_assert_eq!(rb.head, old_head);
            prop_assert_eq!(rb.limits().len, old_len);
            for i in 0..data.len() {
                let masked = (rb.head + rb.len + offset + i) % rb.storage.len();
                prop_assert_eq!(rb.storage[masked], data[i]);
                rb.storage[masked] = 0;
            }
            prop_assert_eq!(&rb.storage, &vec![0; rb.storage.len()]);
        }

        #[test]
        fn ring_buffer_read_with((mut rb, expected, consume) in ring_buffer::with_read_data()) {
            prop_assert_eq!(rb.limits().len, expected.len());
            let nread = rb.read_with(|readable| {
                assert!(readable.len() == 1 || readable.len() == 2);
                let got = readable.concat();
                assert_eq!(got, expected);
                consume
            });
            prop_assert_eq!(nread, consume);
            prop_assert_eq!(rb.limits().len, expected.len() - consume);
        }

        #[test]
        fn ring_buffer_mark_read((mut rb, readable) in ring_buffer::with_readable()) {
            const BYTE_TO_WRITE: u8 = 0x42;
            let written = rb.writable_regions().into_iter().fold(0, |acc, slice| {
                slice.fill(BYTE_TO_WRITE);
                acc + slice.len()
            });
            let old_storage = rb.storage.clone();
            let old_head = rb.head;
            let old_len = rb.limits().len;

            rb.mark_read(readable);
            let new_writable = rb.writable_regions().into_iter().fold(Vec::new(), |mut acc, slice| {
                acc.extend_from_slice(slice);
                acc
            });
            for (i, x) in new_writable.iter().enumerate().take(written) {
                prop_assert_eq!(*x, BYTE_TO_WRITE, "i={}, rb={:?}", i, rb);
            }
            prop_assert!(new_writable.len() >= written);

            let RingBuffer { storage, head, len } = rb;
            prop_assert_eq!(len, old_len - readable);
            prop_assert_eq!(head, (old_head + readable) % old_storage.len());
            prop_assert_eq!(storage, old_storage);
        }

        #[test]
        fn ring_buffer_peek_with((mut rb, expected, offset) in ring_buffer::with_read_data()) {
            prop_assert_eq!(rb.limits().len, expected.len());
            rb.peek_with(offset, |readable| {
                prop_assert_eq!(readable.to_vec(), &expected[offset..]);
                Ok(())
            })?;
            prop_assert_eq!(rb.limits().len, expected.len());
        }

        #[test]
        fn ring_buffer_writable_regions(mut rb in ring_buffer::arb_ring_buffer()) {
            const BYTE_TO_WRITE: u8 = 0x42;
            let writable_len = rb.writable_regions().into_iter().fold(0, |acc, slice| {
                slice.fill(BYTE_TO_WRITE);
                acc + slice.len()
            });
            let BufferLimits { len, capacity } = rb.limits();
            prop_assert_eq!(writable_len + len, capacity);
            for i in 0..capacity {
                let expected = if i < len { 0 } else { BYTE_TO_WRITE };
                let idx = (rb.head + i) % rb.storage.len();
                prop_assert_eq!(rb.storage[idx], expected);
            }
        }
    }

    #[test_case([Range { start: 0, end: 0 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(0), generation: 0 })]
    #[test_case([Range { start: 0, end: 10 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(10), generation: 1 })]
    #[test_case([Range { start: 10, end: 15 }, Range { start: 5, end: 10 }]
        => Assembler {
            outstanding: [
                SeqRange { range: Range { start: SeqNum::new(5), end: SeqNum::new(15) }, meta: 2 },
            ].into_iter().collect(),
            nxt: SeqNum::new(0),
            generation: 2,
        })
    ]
    #[test_case([Range { start: 10, end: 15 }, Range { start: 0, end: 5 }, Range { start: 5, end: 10 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(15), generation: 3 })]
    #[test_case([Range { start: 10, end: 15 }, Range { start: 5, end: 10 }, Range { start: 0, end: 5 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(15), generation: 3 })]
    #[test_case([Range { start: 10, end: 15 }, Range { start: 10, end: 15 }, Range { start: 11, end: 12 }]
        => Assembler {
            outstanding: [
                SeqRange { range: Range { start: SeqNum::new(10), end: SeqNum::new(15) }, meta: 3 }
            ].into_iter().collect(),
            nxt: SeqNum::new(0), generation: 3 })]
    fn assembler_examples(ops: impl IntoIterator<Item = Range<u32>>) -> Assembler {
        let mut assembler = Assembler::new(SeqNum::new(0));
        for Range { start, end } in ops.into_iter() {
            let _advanced = assembler.insert(SeqNum::new(start)..SeqNum::new(end));
        }
        assembler
    }

    #[test_case(&[] => Vec::<Range<u32>>::new(); "empty")]
    #[test_case(&[1..2] => vec![1..2]; "single")]
    #[test_case(&[1..2, 3..4] => vec![3..4, 1..2]; "latest first")]
    #[test_case(&[1..2, 3..4, 5..6, 7..8, 9..10]
        => vec![9..10, 7..8, 5..6, 3..4]; "max len")]
    #[test_case(&[1..2, 3..4, 5..6, 7..8, 9..10, 6..7]
        => vec![5..8, 9..10, 3..4, 1..2]; "gap fill")]
    #[test_case(&[1..2, 3..4, 5..6, 7..8, 9..10, 1..8]
        => vec![1..8, 9..10]; "large gap fill")]
    fn assembler_sack_blocks(ops: &[Range<u32>]) -> Vec<Range<u32>> {
        let mut assembler = Assembler::new(SeqNum::new(0));
        for Range { start, end } in ops {
            let _: usize = assembler.insert(SeqNum::new(*start)..SeqNum::new(*end));
        }
        assembler
            .sack_blocks()
            .iter()
            .map(|SackBlock(start, end)| Range { start: start.into(), end: end.into() })
            .collect()
    }

    #[test]
    fn ring_buffer_wrap_around() {
        const CAPACITY: usize = 16;
        let mut rb = RingBuffer::new(CAPACITY);

        const BUF_SIZE: usize = 10;
        assert_eq!(rb.enqueue_data(&[0xAA; BUF_SIZE]), BUF_SIZE);
        rb.peek_with(0, |payload| {
            assert_eq!(payload, FragmentedPayload::new_contiguous(&[0xAA; BUF_SIZE]))
        });
        rb.mark_read(BUF_SIZE);

        assert_eq!(rb.enqueue_data(&[0xBB; BUF_SIZE]), BUF_SIZE);
        rb.peek_with(0, |payload| {
            assert_eq!(
                payload,
                FragmentedPayload::new([
                    &[0xBB; (CAPACITY - BUF_SIZE)],
                    &[0xBB; (BUF_SIZE * 2 - CAPACITY)]
                ])
            )
        });
        rb.mark_read(BUF_SIZE);

        assert_eq!(rb.enqueue_data(&[0xCC; BUF_SIZE]), BUF_SIZE);
        rb.peek_with(0, |payload| {
            assert_eq!(payload, FragmentedPayload::new_contiguous(&[0xCC; BUF_SIZE]))
        });

        let read = rb.read_with(|segments| {
            assert_eq!(segments, [[0xCC; BUF_SIZE]]);
            BUF_SIZE
        });
        assert_eq!(read, BUF_SIZE);
    }

    #[test]
    fn ring_buffer_example() {
        let mut rb = RingBuffer::new(16);
        assert_eq!(rb.write_at(5, &"World".as_bytes()), 5);
        assert_eq!(rb.write_at(0, &"Hello".as_bytes()), 5);
        rb.make_readable(10, false);
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["HelloWorld".as_bytes()]);
                5
            }),
            5
        );
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["World".as_bytes()]);
                readable[0].len()
            }),
            5
        );
        assert_eq!(rb.write_at(0, &"HelloWorld".as_bytes()), 10);
        rb.make_readable(10, false);
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["HelloW".as_bytes(), "orld".as_bytes()]);
                6
            }),
            6
        );
        assert_eq!(rb.limits().len, 4);
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["orld".as_bytes()]);
                4
            }),
            4
        );
        assert_eq!(rb.limits().len, 0);

        assert_eq!(rb.enqueue_data("Hello".as_bytes()), 5);
        assert_eq!(rb.limits().len, 5);

        let () = rb.peek_with(3, |readable| {
            assert_eq!(readable.to_vec(), "lo".as_bytes());
        });

        rb.mark_read(2);

        let () = rb.peek_with(0, |readable| {
            assert_eq!(readable.to_vec(), "llo".as_bytes());
        });
    }

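    /// Proptest strategies for building `RingBuffer`s in assorted states.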
    mod ring_buffer {
        use super::*;

        const MAX_CAP: usize = 32;

        fn arb_ring_buffer_args() -> impl Strategy<Value = (usize, usize, usize)> {
            (1..=MAX_CAP).prop_flat_map(|cap| {
                let max_len = cap;
                (Just(cap), 0..cap, 0..=max_len)
            })
        }

        pub(super) fn arb_ring_buffer() -> impl Strategy<Value = RingBuffer> {
            arb_ring_buffer_args().prop_map(|(cap, head, len)| RingBuffer {
                storage: vec![0; cap],
                head,
                len,
            })
        }

        pub(super) fn with_readable() -> impl Strategy<Value = (RingBuffer, usize)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                (Just(RingBuffer { storage: vec![0; cap], head, len }), 0..=len)
            })
        }

        pub(super) fn with_written() -> impl Strategy<Value = (RingBuffer, usize)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                let rb = RingBuffer { storage: vec![0; cap], head, len };
                let max_written = cap - len;
                (Just(rb), 0..=max_written)
            })
        }

        pub(super) fn with_offset_data() -> impl Strategy<Value = (RingBuffer, usize, Vec<u8>)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                let writable_len = cap - len;
                (0..=writable_len).prop_flat_map(move |offset| {
                    (0..=writable_len - offset).prop_flat_map(move |data_len| {
                        (
                            Just(RingBuffer { storage: vec![0; cap], head, len }),
                            Just(offset),
                            proptest::collection::vec(1..=u8::MAX, data_len),
                        )
                    })
                })
            })
        }

        pub(super) fn with_read_data() -> impl Strategy<Value = (RingBuffer, Vec<u8>, usize)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                proptest::collection::vec(1..=u8::MAX, len).prop_flat_map(move |data| {
                    let mut rb = RingBuffer { storage: vec![0; cap], head, len: 0 };
                    assert_eq!(rb.write_at(0, &&data[..]), len);
                    rb.make_readable(len, false);
                    (Just(rb), Just(data), 0..=len)
                })
            })
        }
    }
}