netstack3_tcp/
buffer.rs

// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Defines the buffer traits needed by the TCP implementation. The traits
//! in this module provide a common interface for platform-specific buffers
//! used by TCP.

use netstack3_base::{Payload, SackBlocks, SeqNum};

use arrayvec::ArrayVec;
use core::fmt::Debug;
use core::ops::Range;
use packet::InnerPacketBuilder;

use crate::internal::base::BufferSizes;
use crate::internal::seq_ranges::{SeqRange, SeqRanges};

/// Common super trait for both sending and receiving buffers.
pub trait Buffer: Debug + Sized {
    /// Returns information about the number of bytes in the buffer.
    ///
    /// Returns a [`BufferLimits`] instance with information about the number of
    /// bytes in the buffer.
    fn limits(&self) -> BufferLimits;

    /// Gets the target size of the buffer, in bytes.
    ///
    /// The target capacity of the buffer is distinct from the actual capacity
    /// (the `capacity` reported by [`Buffer::limits`]) in that the target
    /// capacity should remain fixed unless requested otherwise, while the
    /// actual capacity can vary with usage.
    ///
    /// For fixed-size buffers this should return the same result as
    /// `self.limits().capacity`. For buffer types that support resizing, the
    /// returned value can be different but should not change unless a resize
    /// was requested.
    fn target_capacity(&self) -> usize;

    /// Requests that the buffer be resized to hold the given number of bytes.
    ///
    /// Calling this method suggests to the buffer that it should alter its size.
    /// Implementations are free to impose constraints or ignore requests
    /// entirely.
    fn request_capacity(&mut self, size: usize);
}
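
// Hypothetical convenience helper (a minimal sketch, not part of the original
// API): shows how the quantities reported by `Buffer::limits` relate to the
// space a buffer can still accept. The helper name is an assumption made for
// illustration only.
#[cfg(test)]
#[allow(dead_code)]
fn available_space<B: Buffer>(buffer: &B) -> usize {
    // `capacity` is the total size of the buffer and `len` the bytes it
    // currently holds, so the difference is the room left for new data.
    let BufferLimits { capacity, len } = buffer.limits();
    capacity - len
}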

/// A buffer supporting TCP receiving operations.
pub trait ReceiveBuffer: Buffer {
    /// Writes `data` into the buffer at `offset`.
    ///
    /// Returns the number of bytes written.
    fn write_at<P: Payload>(&mut self, offset: usize, data: &P) -> usize;

    /// Marks `count` bytes available for the application to read.
    ///
    /// `has_outstanding` informs the buffer if any bytes past `count` may have
    /// been populated by out of order segments.
    ///
    /// # Panics
    ///
    /// Panics if the caller attempts to make more bytes readable than the
    /// buffer has capacity for. That is, this method panics if `self.len() +
    /// count > self.cap()`
    fn make_readable(&mut self, count: usize, has_outstanding: bool);
}

/// A buffer supporting TCP sending operations.
pub trait SendBuffer: Buffer {
    /// The payload type given to `peek_with`.
    type Payload<'a>: InnerPacketBuilder + Payload + Debug + 'a;

    /// Removes `count` bytes from the beginning of the buffer, marking them
    /// as already read.
    ///
    /// # Panics
    ///
    /// Panics if more bytes are marked as read than are available, i.e.,
    /// `count > self.len`.
    fn mark_read(&mut self, count: usize);

    /// Calls `f` with contiguous sequences of readable bytes in the buffer
    /// without advancing the reading pointer.
    ///
    /// # Panics
    ///
    /// Panics if more bytes are peeked than are available, i.e.,
    /// `offset > self.len`
    fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
    where
        F: FnOnce(Self::Payload<'a>) -> R;
}
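
// Illustrative sketch (an assumption for clarity, not part of the original
// file): a hypothetical caller that peeks at the readable bytes of a
// `SendBuffer` without consuming them and then marks a prefix as read.
#[cfg(test)]
#[allow(dead_code)]
fn peek_then_mark_read<S: SendBuffer>(buffer: &mut S, count: usize) -> usize {
    // The closure only observes the payload handed out by `peek_with`; here it
    // reports how many readable bytes were seen (via `InnerPacketBuilder`).
    let readable = buffer.peek_with(0, |payload| payload.bytes_len());
    // Consume at most `count` of those bytes; `mark_read` panics if asked to
    // consume more than is available.
    let consumed = readable.min(count);
    buffer.mark_read(consumed);
    consumed
}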

/// Information about the number of bytes in a [`Buffer`].
#[derive(Eq, PartialEq, Debug, Copy, Clone)]
pub struct BufferLimits {
    /// The total number of bytes that the buffer can hold.
    pub capacity: usize,

    /// The number of readable bytes that the buffer currently holds.
    pub len: usize,
}

/// Assembler for out-of-order segment data.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub(super) struct Assembler {
    // `nxt` is the next sequence number to be expected. It should be before
    // any of the out-of-order sequence numbers we keep track of below.
    nxt: SeqNum,
    // Keeps track of the "age" of segments in the outstanding queue. Every time
    // a segment is inserted, the generation increases. This allows
    // RFC-compliant ordering of selective ACK blocks.
    generation: usize,
    // Holds all the sequence number ranges which we have already received.
    // These ranges are sorted and should have a gap of at least 1 byte
    // between any consecutive two. These ranges should only be after `nxt`.
    // Each range is tagged with the generation that last modified it.
    outstanding: SeqRanges<usize>,
}

impl Assembler {
    /// Creates a new assembler.
    pub(super) fn new(nxt: SeqNum) -> Self {
        Self { outstanding: SeqRanges::default(), generation: 0, nxt }
    }

    /// Returns the next sequence number expected to be received.
    pub(super) fn nxt(&self) -> SeqNum {
        self.nxt
    }

    /// Returns whether there are out-of-order segments waiting to be
    /// acknowledged.
    pub(super) fn has_out_of_order(&self) -> bool {
        !self.outstanding.is_empty()
    }

    /// Inserts a received segment.
    ///
    /// The newly added segment will be merged with as many existing ones as
    /// possible and `nxt` will be advanced to the highest ACK number possible.
    ///
    /// Returns the number of bytes that should be available for the application
    /// to consume.
    ///
    /// # Panics
    ///
    /// Panics if `start` is after `end` or if `start` is before `self.nxt`.
    pub(super) fn insert(&mut self, Range { start, end }: Range<SeqNum>) -> usize {
        assert!(!start.after(end));
        assert!(!start.before(self.nxt));
        if start == end {
            return 0;
        }

        let Self { outstanding, nxt, generation } = self;
        *generation = *generation + 1;
        let _: bool = outstanding.insert(start..end, *generation);

        if let Some(advanced) = outstanding.pop_front_if(|r| r.start() == *nxt) {
            *nxt = advanced.end();
            usize::try_from(advanced.len()).unwrap()
        } else {
            0
        }
    }

    pub(super) fn has_outstanding(&self) -> bool {
        let Self { outstanding, nxt: _, generation: _ } = self;
        !outstanding.is_empty()
    }

    /// Returns the current outstanding selective ack blocks in the assembler.
    ///
    /// The returned blocks are sorted according to [RFC 2018 section 4]:
    ///
    /// * The first SACK block (i.e., the one immediately following the kind and
    ///   length fields in the option) MUST specify the contiguous block of data
    ///   containing the segment which triggered this ACK. [...]
    /// * The SACK option SHOULD be filled out by repeating the most recently
    ///   reported SACK blocks [...]
    ///
    /// This is achieved by always returning the blocks that were most recently
    /// changed by incoming segments.
    ///
    /// [RFC 2018 section 4]:
    ///     https://datatracker.ietf.org/doc/html/rfc2018#section-4
    pub(crate) fn sack_blocks(&self, size_limits: SackBlockSizeLimiters) -> SackBlocks {
        let Self { nxt: _, generation: _, outstanding } = self;
        // Fast exit, no outstanding blocks.
        if outstanding.is_empty() {
            return SackBlocks::default();
        }

        // Create a heap with storage to hold the maximum allowed number of
        // blocks, but the number of blocks allowed for this connection may be
        // lower based on the other TCP options in use.
        let mut heap = ArrayVec::<&SeqRange<_>, { SackBlocks::MAX_BLOCKS }>::new();
        let num_blocks_allowed = size_limits.num_blocks_allowed();

        for block in outstanding.iter() {
            if heap.len() >= num_blocks_allowed {
                if heap.last().is_some_and(|l| l.meta() < block.meta()) {
                    // The new block is newer (has a higher generation) than
                    // the oldest block in the heap; evict the oldest.
                    let _: Option<_> = heap.pop();
                } else {
                    // The new block is older than the oldest block in the
                    // heap; skip it.
                    continue;
                }
            }

            heap.push(block);
            // Keep the heap sorted from larger generation to lower.
            heap.sort_by(|a, b| b.meta().cmp(&a.meta()))
        }

        SackBlocks::from_iter(heap.into_iter().map(|block| block.to_sack_block()))
    }
}

pub(crate) struct SackBlockSizeLimiters {
    pub(crate) timestamp_enabled: bool,
}

impl SackBlockSizeLimiters {
    fn num_blocks_allowed(&self) -> usize {
        let Self { timestamp_enabled } = self;
        if *timestamp_enabled {
            SackBlocks::MAX_BLOCKS_WITH_TIMESTAMP
        } else {
            SackBlocks::MAX_BLOCKS
        }
    }
}
237
238/// A conversion trait that converts the object that Bindings give us into a
239/// pair of receive and send buffers.
240pub trait IntoBuffers<R: ReceiveBuffer, S: SendBuffer> {
241    /// Converts to receive and send buffers.
242    fn into_buffers(self, buffer_sizes: BufferSizes) -> (R, S);
243}
244
245#[cfg(any(test, feature = "testutils"))]
246impl<R: Default + ReceiveBuffer, S: Default + SendBuffer> IntoBuffers<R, S> for () {
247    fn into_buffers(self, buffer_sizes: BufferSizes) -> (R, S) {
248        // Ignore buffer sizes since this is a test-only impl.
249        let BufferSizes { send: _, receive: _ } = buffer_sizes;
250        Default::default()
251    }
252}
253
254#[cfg(any(test, feature = "testutils"))]
255pub(crate) mod testutil {
256    use super::*;
257
258    use alloc::sync::Arc;
259    use alloc::vec;
260    use alloc::vec::Vec;
261    use core::cmp;
262
263    use either::Either;
264    use netstack3_base::sync::Mutex;
265    use netstack3_base::{FragmentedPayload, PayloadLen, WindowSize};
266
267    use crate::internal::socket::accept_queue::ListenerNotifier;
268
269    /// A circular buffer implementation.
270    ///
271    /// A [`RingBuffer`] holds a logically contiguous ring of memory in three
272    /// regions:
273    ///
274    /// - *readable*: memory is available for reading and not for writing,
275    /// - *writable*: memory that is available for writing and not for reading,
276    /// - *reserved*: memory that was read from and is no longer available
277    ///   for reading or for writing.
278    ///
279    /// Zero or more of these regions can be empty, and a region of memory can
280    /// transition from one to another in a few different ways:
281    ///
282    /// *Readable* memory, once read, becomes writable.
283    ///
284    /// *Writable* memory, once marked as such, becomes readable.
285    #[derive(Clone, PartialEq, Eq)]
286    pub struct RingBuffer {
287        pub(super) storage: Vec<u8>,
288        /// The index where the reader starts to read.
289        ///
290        /// Maintains the invariant that `head < storage.len()` by wrapping
291        /// around to 0 as needed.
292        pub(super) head: usize,
293        /// The amount of readable data in `storage`.
294        ///
295        /// Anything between [head, head+len) is readable. This will never exceed
296        /// `storage.len()`.
297        pub(super) len: usize,
298    }
299
300    impl Debug for RingBuffer {
301        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
302            let Self { storage, head, len } = self;
303            f.debug_struct("RingBuffer")
304                .field("storage (len, cap)", &(storage.len(), storage.capacity()))
305                .field("head", head)
306                .field("len", len)
307                .finish()
308        }
309    }
310
311    impl Default for RingBuffer {
312        fn default() -> Self {
313            Self::new(WindowSize::DEFAULT.into())
314        }
315    }
316
317    impl RingBuffer {
318        /// Creates a new `RingBuffer`.
319        pub fn new(capacity: usize) -> Self {
320            Self { storage: vec![0; capacity], head: 0, len: 0 }
321        }
322
323        /// Resets the buffer to be entirely unwritten.
324        pub fn reset(&mut self) {
325            let Self { storage: _, head, len } = self;
326            *head = 0;
327            *len = 0;
328        }
329
330        /// Calls `f` on the contiguous sequences from `start` up to `len` bytes.
331        fn with_readable<'a, F, R>(storage: &'a Vec<u8>, start: usize, len: usize, f: F) -> R
332        where
333            F: for<'b> FnOnce(&'b [&'a [u8]]) -> R,
334        {
335            // Don't read past the end of storage.
336            let end = start + len;
337            if end > storage.len() {
338                let first_part = &storage[start..storage.len()];
339                let second_part = &storage[0..len - first_part.len()];
340                f(&[first_part, second_part][..])
341            } else {
342                let all_bytes = &storage[start..end];
343                f(&[all_bytes][..])
344            }
345        }
346
347        /// Calls `f` with contiguous sequences of readable bytes in the buffer and
348        /// discards the amount of bytes returned by `f`.
349        ///
350        /// # Panics
351        ///
352        /// Panics if the closure wants to discard more bytes than possible, i.e.,
353        /// the value returned by `f` is greater than `self.len()`.
354        pub fn read_with<F>(&mut self, f: F) -> usize
355        where
356            F: for<'a, 'b> FnOnce(&'b [&'a [u8]]) -> usize,
357        {
358            let Self { storage, head, len } = self;
359            if storage.len() == 0 {
360                return f(&[&[]]);
361            }
362            let nread = RingBuffer::with_readable(storage, *head, *len, f);
363            assert!(nread <= *len);
364            *len -= nread;
365            *head = (*head + nread) % storage.len();
366            nread
367        }
368
369        /// Returns the writable regions of the [`RingBuffer`].
370        pub fn writable_regions(&mut self) -> impl IntoIterator<Item = &mut [u8]> {
371            let BufferLimits { capacity, len } = self.limits();
372            let available = capacity - len;
373            let Self { storage, head, len } = self;
374
375            let mut write_start = *head + *len;
376            if write_start >= storage.len() {
377                write_start -= storage.len()
378            }
379            let write_end = write_start + available;
380            if write_end <= storage.len() {
381                Either::Left([&mut self.storage[write_start..write_end]].into_iter())
382            } else {
383                let (b1, b2) = self.storage[..].split_at_mut(write_start);
384                let b2_len = b2.len();
385                Either::Right([b2, &mut b1[..(available - b2_len)]].into_iter())
386            }
387        }
388    }
389
390    impl Buffer for RingBuffer {
391        fn limits(&self) -> BufferLimits {
392            let Self { storage, len, head: _ } = self;
393            let capacity = storage.len();
394            BufferLimits { len: *len, capacity }
395        }
396
397        fn target_capacity(&self) -> usize {
398            let Self { storage, len: _, head: _ } = self;
399            storage.len()
400        }
401
402        fn request_capacity(&mut self, size: usize) {
403            unimplemented!("capacity request for {size} not supported")
404        }
405    }
406
407    impl ReceiveBuffer for RingBuffer {
408        fn write_at<P: Payload>(&mut self, offset: usize, data: &P) -> usize {
409            let BufferLimits { capacity, len } = self.limits();
410            let available = capacity - len;
411            let Self { storage, head, len } = self;
412            if storage.len() == 0 {
413                return 0;
414            }
415
416            if offset > available {
417                return 0;
418            }
419            let start_at = (*head + *len + offset) % storage.len();
420            let to_write = cmp::min(data.len(), available);
421            // Write the first part of the payload.
422            let first_len = cmp::min(to_write, storage.len() - start_at);
423            data.partial_copy(0, &mut storage[start_at..start_at + first_len]);
424            // If we have more to write, wrap around and start from the beginning
425            // of the storage.
426            if to_write > first_len {
427                data.partial_copy(first_len, &mut storage[0..to_write - first_len]);
428            }
429            to_write
430        }
431
432        fn make_readable(&mut self, count: usize, _has_outstanding: bool) {
433            let BufferLimits { capacity, len } = self.limits();
434            debug_assert!(count <= capacity - len);
435            self.len += count;
436        }
437    }
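
    // A minimal sketch (assumption: this example module is not part of the
    // original file) of the producer side of `RingBuffer`: fill the writable
    // regions, then publish the bytes to readers with `make_readable`.
    #[cfg(test)]
    mod ring_buffer_fill_example {
        use super::*;

        #[test]
        fn fill_writable_then_make_readable() {
            let mut rb = RingBuffer::new(4);
            // A fresh buffer is entirely writable; fill it with a marker byte.
            let written = rb.writable_regions().into_iter().fold(0, |acc, slice| {
                slice.fill(0xAB);
                acc + slice.len()
            });
            assert_eq!(written, 4);
            // Publishing the bytes makes them readable and shrinks the
            // writable region to zero.
            rb.make_readable(written, false);
            assert_eq!(rb.limits().len, 4);
        }
    }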

    impl SendBuffer for RingBuffer {
        type Payload<'a> = FragmentedPayload<'a, 2>;

        fn mark_read(&mut self, count: usize) {
            let Self { storage, head, len } = self;
            assert!(count <= *len);
            *len -= count;
            *head = (*head + count) % storage.len();
        }

        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
        where
            F: FnOnce(Self::Payload<'a>) -> R,
        {
            let Self { storage, head, len } = self;
            if storage.len() == 0 {
                return f(FragmentedPayload::new_empty());
            }
            assert!(offset <= *len);
            RingBuffer::with_readable(
                storage,
                (*head + offset) % storage.len(),
                *len - offset,
                |readable| f(readable.into_iter().map(|x| *x).collect()),
            )
        }
    }

    impl RingBuffer {
        /// Enqueues as much of `data` as possible to the end of the buffer.
        ///
        /// Returns the number of bytes actually queued.
        pub(crate) fn enqueue_data(&mut self, data: &[u8]) -> usize {
            let nwritten = self.write_at(0, &data);
            self.make_readable(nwritten, false);
            nwritten
        }
    }

    impl Buffer for Arc<Mutex<RingBuffer>> {
        fn limits(&self) -> BufferLimits {
            self.lock().limits()
        }

        fn target_capacity(&self) -> usize {
            self.lock().target_capacity()
        }

        fn request_capacity(&mut self, size: usize) {
            self.lock().request_capacity(size)
        }
    }

    impl ReceiveBuffer for Arc<Mutex<RingBuffer>> {
        fn write_at<P: Payload>(&mut self, offset: usize, data: &P) -> usize {
            self.lock().write_at(offset, data)
        }

        fn make_readable(&mut self, count: usize, has_outstanding: bool) {
            self.lock().make_readable(count, has_outstanding)
        }
    }

    /// An implementation of [`SendBuffer`] for tests.
    #[derive(Debug, Default)]
    pub struct TestSendBuffer {
        fake_stream: Arc<Mutex<Vec<u8>>>,
        ring: RingBuffer,
    }

    impl TestSendBuffer {
        /// Creates a new `TestSendBuffer` with a backing shared vec and a
        /// helper ring buffer.
        pub fn new(fake_stream: Arc<Mutex<Vec<u8>>>, ring: RingBuffer) -> TestSendBuffer {
            Self { fake_stream, ring }
        }

        /// Enqueues data into the buffer.
        pub fn enqueue_data(&mut self, data: &[u8]) {
            self.fake_stream.lock().extend_from_slice(data);
        }
    }

    impl Buffer for TestSendBuffer {
        fn limits(&self) -> BufferLimits {
            let Self { fake_stream, ring } = self;
            let BufferLimits { capacity: ring_capacity, len: ring_len } = ring.limits();
            let guard = fake_stream.lock();
            let len = ring_len + guard.len();
            let capacity = ring_capacity + guard.capacity();
            BufferLimits { len, capacity }
        }

        fn target_capacity(&self) -> usize {
            let Self { fake_stream: _, ring } = self;
            ring.target_capacity()
        }

        fn request_capacity(&mut self, size: usize) {
            let Self { fake_stream: _, ring } = self;
            ring.request_capacity(size)
        }
    }

    impl SendBuffer for TestSendBuffer {
        type Payload<'a> = FragmentedPayload<'a, 2>;

        fn mark_read(&mut self, count: usize) {
            let Self { fake_stream: _, ring } = self;
            ring.mark_read(count)
        }

        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
        where
            F: FnOnce(Self::Payload<'a>) -> R,
        {
            let Self { fake_stream, ring } = self;
            let mut guard = fake_stream.lock();
            if !guard.is_empty() {
                // Pull from the fake stream into the ring if there is capacity.
                let BufferLimits { capacity, len } = ring.limits();
                let len = (capacity - len).min(guard.len());
                let rest = guard.split_off(len);
                let first = core::mem::replace(&mut *guard, rest);
                assert_eq!(ring.enqueue_data(&first[..]), len);
            }
            ring.peek_with(offset, f)
        }
    }
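
    // A minimal sketch (assumption: this example module is not part of the
    // original file) demonstrating how `TestSendBuffer` lazily pulls bytes
    // from the shared fake stream into its ring buffer when peeked.
    #[cfg(test)]
    mod test_send_buffer_example {
        use super::*;

        #[test]
        fn peek_pulls_from_fake_stream() {
            let fake_stream: Arc<Mutex<Vec<u8>>> = Arc::new(Mutex::new(Vec::new()));
            let mut buffer = TestSendBuffer::new(Arc::clone(&fake_stream), RingBuffer::new(16));
            buffer.enqueue_data(b"hello");
            // The bytes stay in the fake stream until a peek moves them into
            // the ring buffer.
            let observed = buffer.peek_with(0, |payload| payload.len());
            assert_eq!(observed, 5);
            assert!(fake_stream.lock().is_empty());
        }
    }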

    fn arc_mutex_eq<T: PartialEq>(a: &Arc<Mutex<T>>, b: &Arc<Mutex<T>>) -> bool {
        if Arc::ptr_eq(a, b) {
            return true;
        }
        (&*a.lock()) == (&*b.lock())
    }

    /// A fake implementation of client-side TCP buffers.
    #[derive(Clone, Debug, Default)]
    pub struct ClientBuffers {
        /// Receive buffer shared with core TCP implementation.
        pub receive: Arc<Mutex<RingBuffer>>,
        /// Send buffer shared with core TCP implementation.
        pub send: Arc<Mutex<Vec<u8>>>,
    }

    impl PartialEq for ClientBuffers {
        fn eq(&self, ClientBuffers { receive: other_receive, send: other_send }: &Self) -> bool {
            let Self { receive, send } = self;
            arc_mutex_eq(receive, other_receive) && arc_mutex_eq(send, other_send)
        }
    }

    impl Eq for ClientBuffers {}

    impl ClientBuffers {
        /// Creates a new `ClientBuffers` with `buffer_sizes`.
        pub fn new(buffer_sizes: BufferSizes) -> Self {
            let BufferSizes { send, receive } = buffer_sizes;
            Self {
                receive: Arc::new(Mutex::new(RingBuffer::new(receive))),
                send: Arc::new(Mutex::new(Vec::with_capacity(send))),
            }
        }
    }

    /// A fake implementation of bindings buffers for TCP.
    #[derive(Debug, Clone, Eq, PartialEq)]
    #[allow(missing_docs)]
    pub enum ProvidedBuffers {
        Buffers(WriteBackClientBuffers),
        NoBuffers,
    }

    impl Default for ProvidedBuffers {
        fn default() -> Self {
            Self::NoBuffers
        }
    }

    impl From<WriteBackClientBuffers> for ProvidedBuffers {
        fn from(buffers: WriteBackClientBuffers) -> Self {
            ProvidedBuffers::Buffers(buffers)
        }
    }

    impl From<ProvidedBuffers> for WriteBackClientBuffers {
        fn from(extra: ProvidedBuffers) -> Self {
            match extra {
                ProvidedBuffers::Buffers(buffers) => buffers,
                ProvidedBuffers::NoBuffers => Default::default(),
            }
        }
    }

    impl From<ProvidedBuffers> for () {
        fn from(_: ProvidedBuffers) -> Self {
            ()
        }
    }

    impl From<()> for ProvidedBuffers {
        fn from(_: ()) -> Self {
            Default::default()
        }
    }

    /// The variant of [`ProvidedBuffers`] that allows observing the data
    /// sent to and received from TCP sockets.
    #[derive(Debug, Default, Clone)]
    pub struct WriteBackClientBuffers(pub Arc<Mutex<Option<ClientBuffers>>>);

    impl PartialEq for WriteBackClientBuffers {
        fn eq(&self, Self(other): &Self) -> bool {
            let Self(this) = self;
            arc_mutex_eq(this, other)
        }
    }

    impl Eq for WriteBackClientBuffers {}

    impl IntoBuffers<Arc<Mutex<RingBuffer>>, TestSendBuffer> for ProvidedBuffers {
        fn into_buffers(
            self,
            buffer_sizes: BufferSizes,
        ) -> (Arc<Mutex<RingBuffer>>, TestSendBuffer) {
            let buffers = ClientBuffers::new(buffer_sizes);
            if let ProvidedBuffers::Buffers(b) = self {
                *b.0.as_ref().lock() = Some(buffers.clone());
            }
            let ClientBuffers { receive, send } = buffers;
            (receive, TestSendBuffer::new(send, Default::default()))
        }
    }

    impl ListenerNotifier for ProvidedBuffers {
        fn new_incoming_connections(&mut self, _: usize) {}
    }

    #[derive(Debug)]
    pub struct RepeatingPayload {
        len: usize,
    }

    impl RepeatingPayload {
        const REPEATING_BYTE: u8 = 0xAA;
    }

    impl PayloadLen for RepeatingPayload {
        fn len(&self) -> usize {
            self.len
        }
    }

    impl Payload for RepeatingPayload {
        fn slice(self, range: Range<u32>) -> Self {
            Self { len: usize::try_from(range.end - range.start).unwrap() }
        }

        fn partial_copy(&self, offset: usize, dst: &mut [u8]) {
            assert!(offset < self.len);
            assert_eq!(dst.len() - offset, self.len);
            dst.fill(Self::REPEATING_BYTE);
        }

        fn partial_copy_uninit(&self, offset: usize, dst: &mut [core::mem::MaybeUninit<u8>]) {
            assert!(offset < self.len);
            assert_eq!(dst.len() - offset, self.len);
            dst.fill(core::mem::MaybeUninit::new(Self::REPEATING_BYTE));
        }

        fn new_empty() -> Self {
            Self { len: 0 }
        }
    }

    impl InnerPacketBuilder for RepeatingPayload {
        fn bytes_len(&self) -> usize {
            self.len
        }

        fn serialize(&self, buffer: &mut [u8]) {
            buffer.fill(Self::REPEATING_BYTE)
        }
    }

    /// A buffer that always has [`usize::MAX`] bytes available to write.
    #[derive(Default, Debug, Eq, PartialEq)]
    pub struct InfiniteSendBuffer;

    impl InfiniteSendBuffer {
        const LEN: usize = usize::MAX as usize;
    }

    impl Buffer for InfiniteSendBuffer {
        fn limits(&self) -> BufferLimits {
            BufferLimits { capacity: Self::LEN, len: Self::LEN }
        }

        fn target_capacity(&self) -> usize {
            Self::LEN
        }

        fn request_capacity(&mut self, size: usize) {
            unimplemented!("can't change capacity of infinite send buffer to {size}")
        }
    }

    impl SendBuffer for InfiniteSendBuffer {
        type Payload<'a> = RepeatingPayload;

        fn mark_read(&mut self, _count: usize) {}

        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
        where
            F: FnOnce(Self::Payload<'a>) -> R,
        {
            f(RepeatingPayload { len: Self::LEN - offset })
        }
    }

    /// A buffer that has a controllable amount of [`RepeatingPayload`] bytes
    /// available to read.
    #[derive(Default, Debug, Eq, PartialEq)]
    pub struct RepeatingSendBuffer(usize);

    impl RepeatingSendBuffer {
        /// Creates a new buffer with the provided `length`.
        pub fn new(length: usize) -> Self {
            Self(length)
        }
    }

    impl Buffer for RepeatingSendBuffer {
        fn limits(&self) -> BufferLimits {
            let Self(len) = self;
            BufferLimits { capacity: usize::MAX, len: *len }
        }

        fn target_capacity(&self) -> usize {
            usize::MAX
        }

        fn request_capacity(&mut self, size: usize) {
            unimplemented!("can't change capacity of repeatable send buffer to {size}")
        }
    }

    impl SendBuffer for RepeatingSendBuffer {
        type Payload<'a> = RepeatingPayload;

        fn mark_read(&mut self, count: usize) {
            let Self(len) = self;
            *len = *len - count;
        }

        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
        where
            F: FnOnce(Self::Payload<'a>) -> R,
        {
            let Self(len) = self;
            f(RepeatingPayload { len: *len - offset })
        }
    }
}

#[cfg(test)]
mod test {
    use alloc::vec::Vec;
    use alloc::{format, vec};

    use netstack3_base::FragmentedPayload;
    use proptest::strategy::{Just, Strategy};
    use proptest::test_runner::Config;
    use proptest::{prop_assert, prop_assert_eq, proptest};
    use proptest_support::failed_seeds_no_std;
    use test_case::test_case;
    use testutil::RingBuffer;

    use super::*;
    proptest! {
        #![proptest_config(Config {
            // Add all failed seeds here.
            failure_persistence: failed_seeds_no_std!(
                "cc f621ca7d3a2b108e0dc41f7169ad028f4329b79e90e73d5f68042519a9f63999",
                "cc c449aebed201b4ec4f137f3c224f20325f4cfee0b7fd596d9285176b6d811aa9"
            ),
            ..Config::default()
        })]

        #[test]
        fn ring_buffer_make_readable((mut rb, avail) in ring_buffer::with_written()) {
            let old_storage = rb.storage.clone();
            let old_head = rb.head;
            let old_len = rb.limits().len;
            rb.make_readable(avail, false);
            // Assert that length is updated but everything else is unchanged.
            let RingBuffer { storage, head, len } = rb;
            prop_assert_eq!(len, old_len + avail);
            prop_assert_eq!(head, old_head);
            prop_assert_eq!(storage, old_storage);
        }

        #[test]
        fn ring_buffer_write_at((mut rb, offset, data) in ring_buffer::with_offset_data()) {
            let old_head = rb.head;
            let old_len = rb.limits().len;
            prop_assert_eq!(rb.write_at(offset, &&data[..]), data.len());
            prop_assert_eq!(rb.head, old_head);
            prop_assert_eq!(rb.limits().len, old_len);
            for i in 0..data.len() {
                let masked = (rb.head + rb.len + offset + i) % rb.storage.len();
                // Make sure that data are written.
                prop_assert_eq!(rb.storage[masked], data[i]);
                rb.storage[masked] = 0;
            }
            // And the other parts of the storage are untouched.
            prop_assert_eq!(&rb.storage, &vec![0; rb.storage.len()]);
        }

        #[test]
        fn ring_buffer_read_with((mut rb, expected, consume) in ring_buffer::with_read_data()) {
            prop_assert_eq!(rb.limits().len, expected.len());
            let nread = rb.read_with(|readable| {
                assert!(readable.len() == 1 || readable.len() == 2);
                let got = readable.concat();
                assert_eq!(got, expected);
                consume
            });
            prop_assert_eq!(nread, consume);
            prop_assert_eq!(rb.limits().len, expected.len() - consume);
        }

        #[test]
        fn ring_buffer_mark_read((mut rb, readable) in ring_buffer::with_readable()) {
            const BYTE_TO_WRITE: u8 = 0x42;
            let written = rb.writable_regions().into_iter().fold(0, |acc, slice| {
                slice.fill(BYTE_TO_WRITE);
                acc + slice.len()
            });
            let old_storage = rb.storage.clone();
            let old_head = rb.head;
            let old_len = rb.limits().len;

            rb.mark_read(readable);
            let new_writable = rb.writable_regions().into_iter().fold(Vec::new(), |mut acc, slice| {
                acc.extend_from_slice(slice);
                acc
            });
            for (i, x) in new_writable.iter().enumerate().take(written) {
                prop_assert_eq!(*x, BYTE_TO_WRITE, "i={}, rb={:?}", i, rb);
            }
            prop_assert!(new_writable.len() >= written);

            let RingBuffer { storage, head, len } = rb;
            prop_assert_eq!(len, old_len - readable);
            prop_assert_eq!(head, (old_head + readable) % old_storage.len());
            prop_assert_eq!(storage, old_storage);
        }

        #[test]
        fn ring_buffer_peek_with((mut rb, expected, offset) in ring_buffer::with_read_data()) {
            prop_assert_eq!(rb.limits().len, expected.len());
            rb.peek_with(offset, |readable| {
                prop_assert_eq!(readable.to_vec(), &expected[offset..]);
                Ok(())
            })?;
            prop_assert_eq!(rb.limits().len, expected.len());
        }

        #[test]
        fn ring_buffer_writable_regions(mut rb in ring_buffer::arb_ring_buffer()) {
            const BYTE_TO_WRITE: u8 = 0x42;
            let writable_len = rb.writable_regions().into_iter().fold(0, |acc, slice| {
                slice.fill(BYTE_TO_WRITE);
                acc + slice.len()
            });
            let BufferLimits {len, capacity} = rb.limits();
            prop_assert_eq!(writable_len + len, capacity);
            for i in 0..capacity {
                let expected = if i < len {
                    0
                } else {
                    BYTE_TO_WRITE
                };
                let idx = (rb.head + i) % rb.storage.len();
                prop_assert_eq!(rb.storage[idx], expected);
            }
        }
    }

    #[test_case([Range { start: 0, end: 0 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(0), generation: 0 })]
    #[test_case([Range { start: 0, end: 10 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(10), generation: 1 })]
    #[test_case([Range{ start: 10, end: 15 }, Range { start: 5, end: 10 }]
        => Assembler {
            outstanding: [
                SeqRange::new(SeqNum::new(5)..SeqNum::new(15), 2).unwrap()
            ].into_iter().collect(),
            nxt: SeqNum::new(0),
            generation: 2,
        })
    ]
    #[test_case([Range{ start: 10, end: 15 }, Range { start: 0, end: 5 }, Range { start: 5, end: 10 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(15), generation: 3 })]
    #[test_case([Range{ start: 10, end: 15 }, Range { start: 5, end: 10 }, Range { start: 0, end: 5 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(15), generation: 3 })]
    #[test_case([Range{ start: 10, end: 15 }, Range { start: 10, end: 15 }, Range { start: 11, end: 12 }]
        => Assembler {
             outstanding: [
                SeqRange::new(SeqNum::new(10)..SeqNum::new(15), 3).unwrap()
            ].into_iter().collect(),
            nxt: SeqNum::new(0), generation: 3 })]
    fn assembler_examples(ops: impl IntoIterator<Item = Range<u32>>) -> Assembler {
        let mut assembler = Assembler::new(SeqNum::new(0));
        for Range { start, end } in ops.into_iter() {
            let _advanced = assembler.insert(SeqNum::new(start)..SeqNum::new(end));
        }
        assembler
    }

    #[test_case(&[] => Vec::<Range<u32>>::new(); "empty")]
    #[test_case(&[1..2] => vec![1..2]; "single")]
    #[test_case(&[1..2, 3..4] => vec![3..4, 1..2]; "latest first")]
    #[test_case(&[1..2, 3..4, 5..6, 7..8, 9..10]
        => vec![9..10, 7..8, 5..6, 3..4]; "max len")]
    #[test_case(&[1..2, 3..4, 5..6, 7..8, 9..10, 6..7]
        => vec![5..8, 9..10, 3..4, 1..2]; "gap fill")]
    #[test_case(&[1..2, 3..4, 5..6, 7..8, 9..10, 1..8]
        => vec![1..8, 9..10]; "large gap fill")]
    fn assembler_sack_blocks(ops: &[Range<u32>]) -> Vec<Range<u32>> {
        let mut assembler = Assembler::new(SeqNum::new(0));
        for Range { start, end } in ops {
            let _: usize = assembler.insert(SeqNum::new(*start)..SeqNum::new(*end));
        }
        assembler
            .sack_blocks(SackBlockSizeLimiters { timestamp_enabled: false })
            .try_iter()
            .map(|r| r.expect("invalid block").into_range_u32())
            .collect()
    }

    #[test_case(false => vec![10..11, 7..8, 4..5, 1..2]; "4_blocks_with_timestamp_disabled")]
    #[test_case(true => vec![10..11, 7..8, 4..5]; "3_blocks_with_timestamp_enabled")]
    fn assembler_sack_blocks_with_timestamp(timestamp_enabled: bool) -> Vec<Range<u32>> {
        let mut assembler = Assembler::new(SeqNum::new(0));
        for Range { start, end } in [1..2, 4..5, 7..8, 10..11] {
            let _: usize = assembler.insert(SeqNum::new(start)..SeqNum::new(end));
        }
        assembler
            .sack_blocks(SackBlockSizeLimiters { timestamp_enabled })
            .try_iter()
            .map(|r| r.expect("invalid_block").into_range_u32())
            .collect()
    }

    #[test]
    // Regression test for https://fxbug.dev/42061342.
    fn ring_buffer_wrap_around() {
        const CAPACITY: usize = 16;
        let mut rb = RingBuffer::new(CAPACITY);

        // Write more than half the buffer.
        const BUF_SIZE: usize = 10;
        assert_eq!(rb.enqueue_data(&[0xAA; BUF_SIZE]), BUF_SIZE);
        rb.peek_with(0, |payload| {
            assert_eq!(payload, FragmentedPayload::new_contiguous(&[0xAA; BUF_SIZE]))
        });
        rb.mark_read(BUF_SIZE);

        // Write around the end of the buffer.
        assert_eq!(rb.enqueue_data(&[0xBB; BUF_SIZE]), BUF_SIZE);
        rb.peek_with(0, |payload| {
            assert_eq!(
                payload,
                FragmentedPayload::new([
                    &[0xBB; (CAPACITY - BUF_SIZE)],
                    &[0xBB; (BUF_SIZE * 2 - CAPACITY)]
                ])
            )
        });
        // Mark everything read, which should advance `head` around to the
        // beginning of the buffer.
        rb.mark_read(BUF_SIZE);

        // Now make a contiguous sequence of bytes readable.
        assert_eq!(rb.enqueue_data(&[0xCC; BUF_SIZE]), BUF_SIZE);
        rb.peek_with(0, |payload| {
            assert_eq!(payload, FragmentedPayload::new_contiguous(&[0xCC; BUF_SIZE]))
        });

        // Check that the unwritten bytes are left untouched. If `head` was
        // advanced improperly, this will crash.
        let read = rb.read_with(|segments| {
            assert_eq!(segments, [[0xCC; BUF_SIZE]]);
            BUF_SIZE
        });
        assert_eq!(read, BUF_SIZE);
    }

    #[test]
    fn ring_buffer_example() {
        let mut rb = RingBuffer::new(16);
        assert_eq!(rb.write_at(5, &"World".as_bytes()), 5);
        assert_eq!(rb.write_at(0, &"Hello".as_bytes()), 5);
        rb.make_readable(10, false);
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["HelloWorld".as_bytes()]);
                5
            }),
            5
        );
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["World".as_bytes()]);
                readable[0].len()
            }),
            5
        );
        assert_eq!(rb.write_at(0, &"HelloWorld".as_bytes()), 10);
        rb.make_readable(10, false);
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["HelloW".as_bytes(), "orld".as_bytes()]);
                6
            }),
            6
        );
        assert_eq!(rb.limits().len, 4);
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["orld".as_bytes()]);
                4
            }),
            4
        );
        assert_eq!(rb.limits().len, 0);

        assert_eq!(rb.enqueue_data("Hello".as_bytes()), 5);
        assert_eq!(rb.limits().len, 5);

        rb.peek_with(3, |readable| {
            assert_eq!(readable.to_vec(), "lo".as_bytes());
        });

        rb.mark_read(2);

        rb.peek_with(0, |readable| {
            assert_eq!(readable.to_vec(), "llo".as_bytes());
        });
    }

    mod ring_buffer {
        use super::*;
        // Use a small capacity so that we have a higher chance to exercise
        // wrapping around logic.
        const MAX_CAP: usize = 32;

        fn arb_ring_buffer_args() -> impl Strategy<Value = (usize, usize, usize)> {
            // Use a small capacity so that we have a higher chance to exercise
            // wrapping around logic.
            (1..=MAX_CAP).prop_flat_map(|cap| {
                let max_len = cap;
                //  cap      head     len
                (Just(cap), 0..cap, 0..=max_len)
            })
        }

        pub(super) fn arb_ring_buffer() -> impl Strategy<Value = RingBuffer> {
            arb_ring_buffer_args().prop_map(|(cap, head, len)| RingBuffer {
                storage: vec![0; cap],
                head,
                len,
            })
        }

        /// A strategy for a [`RingBuffer`] and a valid length to mark read.
        pub(super) fn with_readable() -> impl Strategy<Value = (RingBuffer, usize)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                (Just(RingBuffer { storage: vec![0; cap], head, len }), 0..=len)
            })
        }

        /// A strategy for a [`RingBuffer`] and a valid length to make readable.
        pub(super) fn with_written() -> impl Strategy<Value = (RingBuffer, usize)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                let rb = RingBuffer { storage: vec![0; cap], head, len };
                let max_written = cap - len;
                (Just(rb), 0..=max_written)
            })
        }

        /// A strategy for a [`RingBuffer`], a valid offset and data to write.
        pub(super) fn with_offset_data() -> impl Strategy<Value = (RingBuffer, usize, Vec<u8>)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                let writable_len = cap - len;
                (0..=writable_len).prop_flat_map(move |offset| {
                    (0..=writable_len - offset).prop_flat_map(move |data_len| {
                        (
                            Just(RingBuffer { storage: vec![0; cap], head, len }),
                            Just(offset),
                            proptest::collection::vec(1..=u8::MAX, data_len),
                        )
                    })
                })
            })
        }

        /// A strategy for a [`RingBuffer`], its readable data, and how many
        /// bytes to consume.
        pub(super) fn with_read_data() -> impl Strategy<Value = (RingBuffer, Vec<u8>, usize)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                proptest::collection::vec(1..=u8::MAX, len).prop_flat_map(move |data| {
                    // Fill the RingBuffer with the data.
                    let mut rb = RingBuffer { storage: vec![0; cap], head, len: 0 };
                    assert_eq!(rb.write_at(0, &&data[..]), len);
                    rb.make_readable(len, false);
                    (Just(rb), Just(data), 0..=len)
                })
            })
        }
    }
}