netstack3_tcp/
buffer.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Defines the buffer traits needed by the TCP implementation. The traits
6//! in this module provide a common interface for platform-specific buffers
7//! used by TCP.
8
9use netstack3_base::{Payload, SackBlocks, SeqNum};
10
11use arrayvec::ArrayVec;
12use core::fmt::Debug;
13use core::ops::Range;
14use packet::InnerPacketBuilder;
15
16use crate::internal::base::BufferSizes;
17use crate::internal::seq_ranges::{SeqRange, SeqRanges};
18
/// Common super trait for both sending and receiving buffer.
///
/// Implementors report their occupancy through [`Buffer::limits`] and may
/// honor or ignore resize requests made through
/// [`Buffer::request_capacity`].
pub trait Buffer: Debug + Sized {
    /// Returns the capacity range `(min, max)` for this buffer type.
    ///
    /// This is an associated function: the range is a property of the buffer
    /// type, not of a particular instance.
    fn capacity_range() -> (usize, usize);

    /// Returns information about the number of bytes in the buffer.
    ///
    /// Returns a [`BufferLimits`] instance with the buffer's total capacity
    /// and the number of readable bytes it currently holds.
    fn limits(&self) -> BufferLimits;

    /// Gets the target size of the buffer, in bytes.
    ///
    /// The target capacity of the buffer is distinct from the actual capacity
    /// (reported as [`BufferLimits::capacity`] by [`Buffer::limits`]) in that
    /// the target capacity should remain fixed unless requested otherwise,
    /// while the actual capacity can vary with usage.
    ///
    /// For fixed-size buffers this should return the same result as
    /// `self.limits().capacity`. For buffer types that support resizing, the
    /// returned value can be different but should not change unless a resize
    /// was requested.
    fn target_capacity(&self) -> usize;

    /// Requests that the buffer be resized to hold the given number of bytes.
    ///
    /// Calling this method suggests to the buffer that it should alter its size.
    /// Implementations are free to impose constraints or ignore requests
    /// entirely.
    fn request_capacity(&mut self, size: usize);
}
50
/// A buffer supporting TCP receiving operations.
///
/// Receiving is a two-step protocol: bytes are first placed with
/// [`ReceiveBuffer::write_at`] and only become visible to the reader once
/// [`ReceiveBuffer::make_readable`] is called for them.
pub trait ReceiveBuffer: Buffer {
    /// Writes `data` into the buffer at `offset`.
    ///
    /// Returns the number of bytes written.
    ///
    /// NOTE(review): the `RingBuffer` test implementation in this file
    /// interprets `offset` as relative to the end of the readable region;
    /// confirm that other implementations follow the same convention.
    fn write_at<P: Payload>(&mut self, offset: usize, data: &P) -> usize;

    /// Marks `count` bytes available for the application to read.
    ///
    /// `has_outstanding` informs the buffer if any bytes past `count` may have
    /// been populated by out of order segments.
    ///
    /// # Panics
    ///
    /// Panics if the caller attempts to make more bytes readable than the
    /// buffer has capacity for. That is, with `limits = self.limits()`, this
    /// method panics if `limits.len + count > limits.capacity`.
    fn make_readable(&mut self, count: usize, has_outstanding: bool);
}
70
/// A buffer supporting TCP sending operations.
pub trait SendBuffer: Buffer {
    /// The payload type given to `peek_with`.
    ///
    /// The lifetime parameter lets the payload borrow from the buffer for the
    /// duration of the `peek_with` call.
    type Payload<'a>: InnerPacketBuilder + Payload + Debug + 'a;

    /// Removes `count` bytes from the beginning of the buffer as already read.
    ///
    /// # Panics
    ///
    /// Panics if more bytes are marked as read than are available, i.e.,
    /// `count > self.limits().len`.
    fn mark_read(&mut self, count: usize);

    /// Calls `f` with contiguous sequences of readable bytes in the buffer
    /// without advancing the reading pointer.
    ///
    /// `offset` is the number of readable bytes to skip before the payload
    /// handed to `f` begins. The value returned by `f` is returned to the
    /// caller unchanged.
    ///
    /// # Panics
    ///
    /// Panics if more bytes are peeked than are available, i.e.,
    /// `offset > self.limits().len`.
    fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
    where
        F: FnOnce(Self::Payload<'a>) -> R;
}
95
/// Occupancy snapshot of a [`Buffer`]: how much it can hold in total and how
/// much of that is currently readable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BufferLimits {
    /// Total number of bytes the buffer is able to hold.
    pub capacity: usize,

    /// Number of readable bytes the buffer holds right now.
    pub len: usize,
}
105
/// Assembler for out-of-order segment data.
///
/// Tracks the next in-order sequence number together with the sequence
/// ranges that were received ahead of it.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub(super) struct Assembler {
    // `nxt` is the next sequence number to be expected. It should be before
    // any sequence number of the out-of-order sequence numbers we keep track
    // of below.
    nxt: SeqNum,
    // Keeps track of the "age" of segments in the outstanding queue. Every time
    // a segment is inserted, the generation increases. This allows
    // RFC-compliant ordering of selective ACK blocks.
    generation: usize,
    // Holds all the sequence number ranges which we have already received.
    // These ranges are sorted and should have a gap of at least 1 byte
    // between any consecutive two. These ranges should only be after `nxt`.
    // Each range is tagged with the generation that last modified it.
    outstanding: SeqRanges<usize>,
}
124
125impl Assembler {
126    /// Creates a new assembler.
127    pub(super) fn new(nxt: SeqNum) -> Self {
128        Self { outstanding: SeqRanges::default(), generation: 0, nxt }
129    }
130
131    /// Returns the next sequence number expected to be received.
132    pub(super) fn nxt(&self) -> SeqNum {
133        self.nxt
134    }
135
136    /// Returns whether there are out-of-order segments waiting to be
137    /// acknowledged.
138    pub(super) fn has_out_of_order(&self) -> bool {
139        !self.outstanding.is_empty()
140    }
141
142    /// Inserts a received segment.
143    ///
144    /// The newly added segment will be merged with as many existing ones as
145    /// possible and `nxt` will be advanced to the highest ACK number possible.
146    ///
147    /// Returns number of bytes that should be available for the application
148    /// to consume.
149    ///
150    /// # Panics
151    ///
152    /// Panics if `start` is after `end` or if `start` is before `self.nxt`.
153    pub(super) fn insert(&mut self, Range { start, end }: Range<SeqNum>) -> usize {
154        assert!(!start.after(end));
155        assert!(!start.before(self.nxt));
156        if start == end {
157            return 0;
158        }
159
160        let Self { outstanding, nxt, generation } = self;
161        *generation = *generation + 1;
162        let _: bool = outstanding.insert(start..end, *generation);
163
164        if let Some(advanced) = outstanding.pop_front_if(|r| r.start() == *nxt) {
165            *nxt = advanced.end();
166            usize::try_from(advanced.len()).unwrap()
167        } else {
168            0
169        }
170    }
171
172    pub(super) fn has_outstanding(&self) -> bool {
173        let Self { outstanding, nxt: _, generation: _ } = self;
174        !outstanding.is_empty()
175    }
176
177    /// Returns the current outstanding selective ack blocks in the assembler.
178    ///
179    /// The returned blocks are sorted according to [RFC 2018 section 4]:
180    ///
181    /// * The first SACK block (i.e., the one immediately following the kind and
182    ///   length fields in the option) MUST specify the contiguous block of data
183    ///   containing the segment which triggered this ACK. [...]
184    /// * The SACK option SHOULD be filled out by repeating the most recently
185    ///   reported SACK blocks [...]
186    ///
187    /// This is achieved by always returning the blocks that were most recently
188    /// changed by incoming segments.
189    ///
190    /// [RFC 2018 section 4]:
191    ///     https://datatracker.ietf.org/doc/html/rfc2018#section-4
192    pub(crate) fn sack_blocks(&self) -> SackBlocks {
193        let Self { nxt: _, generation: _, outstanding } = self;
194        // Fast exit, no outstanding blocks.
195        if outstanding.is_empty() {
196            return SackBlocks::default();
197        }
198
199        let mut heap = ArrayVec::<&SeqRange<_>, { SackBlocks::MAX_BLOCKS }>::new();
200        for block in outstanding.iter() {
201            if heap.is_full() {
202                if heap.last().is_some_and(|l| l.meta() < block.meta()) {
203                    // New block is later than the earliest block in the heap.
204                    let _: Option<_> = heap.pop();
205                } else {
206                    // New block is earlier than the earliest block in the heap,
207                    // pass.
208                    continue;
209                }
210            }
211
212            heap.push(block);
213            // Sort heap larger generation to lower.
214            heap.sort_by(|a, b| b.meta().cmp(&a.meta()))
215        }
216
217        SackBlocks::from_iter(heap.into_iter().map(|block| block.to_sack_block()))
218    }
219}
220
/// A conversion trait that converts the object that Bindings give us into a
/// pair of receive and send buffers.
///
/// `R` is the receive buffer type and `S` is the send buffer type produced
/// by the conversion.
pub trait IntoBuffers<R: ReceiveBuffer, S: SendBuffer> {
    /// Converts to receive and send buffers.
    ///
    /// Consumes `self` and returns the `(receive, send)` pair;
    /// `buffer_sizes` carries the requested sizes for the two buffers.
    fn into_buffers(self, buffer_sizes: BufferSizes) -> (R, S);
}
227
#[cfg(any(test, feature = "testutils"))]
impl<R, S> IntoBuffers<R, S> for ()
where
    R: Default + ReceiveBuffer,
    S: Default + SendBuffer,
{
    /// Produces default-constructed buffers, disregarding `buffer_sizes`.
    fn into_buffers(self, buffer_sizes: BufferSizes) -> (R, S) {
        // Test-only impl: the requested sizes are intentionally unused. The
        // exhaustive destructuring keeps that decision visible if fields are
        // ever added to `BufferSizes`.
        let BufferSizes { send: _, receive: _ } = buffer_sizes;
        (R::default(), S::default())
    }
}
236
237#[cfg(any(test, feature = "testutils"))]
238pub(crate) mod testutil {
239    use super::*;
240
241    use alloc::sync::Arc;
242    use alloc::vec;
243    use alloc::vec::Vec;
244    use core::cmp;
245
246    use either::Either;
247    use netstack3_base::sync::Mutex;
248    use netstack3_base::{FragmentedPayload, PayloadLen, WindowSize};
249
250    use crate::internal::socket::accept_queue::ListenerNotifier;
251
    /// A circular buffer implementation.
    ///
    /// A [`RingBuffer`] holds a logically contiguous ring of memory in three
    /// regions:
    ///
    /// - *readable*: memory is available for reading and not for writing,
    /// - *writable*: memory that is available for writing and not for reading,
    /// - *reserved*: memory that was read from and is no longer available
    ///   for reading or for writing.
    ///
    /// Zero or more of these regions can be empty, and a region of memory can
    /// transition from one to another in a few different ways:
    ///
    /// *Readable* memory, once read, becomes writable.
    ///
    /// *Writable* memory, once marked as such, becomes readable.
    ///
    /// NOTE(review): the fields below only describe a readable region
    /// (`head`, `len`); no field tracks a separate *reserved* region, so
    /// confirm whether that part of the description still applies.
    #[derive(Clone, PartialEq, Eq)]
    pub struct RingBuffer {
        /// Backing storage for the ring. Its length is what the buffer
        /// reports as its capacity.
        pub(super) storage: Vec<u8>,
        /// The index where the reader starts to read.
        ///
        /// Maintains the invariant that `head < storage.len()` by wrapping
        /// around to 0 as needed.
        pub(super) head: usize,
        /// The amount of readable data in `storage`.
        ///
        /// Anything between [head, head+len) is readable. This will never exceed
        /// `storage.len()`.
        pub(super) len: usize,
    }
282
283    impl Debug for RingBuffer {
284        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
285            let Self { storage, head, len } = self;
286            f.debug_struct("RingBuffer")
287                .field("storage (len, cap)", &(storage.len(), storage.capacity()))
288                .field("head", head)
289                .field("len", len)
290                .finish()
291        }
292    }
293
294    impl Default for RingBuffer {
295        fn default() -> Self {
296            Self::new(WindowSize::DEFAULT.into())
297        }
298    }
299
300    impl RingBuffer {
301        /// Creates a new `RingBuffer`.
302        pub fn new(capacity: usize) -> Self {
303            Self { storage: vec![0; capacity], head: 0, len: 0 }
304        }
305
306        /// Resets the buffer to be entirely unwritten.
307        pub fn reset(&mut self) {
308            let Self { storage: _, head, len } = self;
309            *head = 0;
310            *len = 0;
311        }
312
313        /// Calls `f` on the contiguous sequences from `start` up to `len` bytes.
314        fn with_readable<'a, F, R>(storage: &'a Vec<u8>, start: usize, len: usize, f: F) -> R
315        where
316            F: for<'b> FnOnce(&'b [&'a [u8]]) -> R,
317        {
318            // Don't read past the end of storage.
319            let end = start + len;
320            if end > storage.len() {
321                let first_part = &storage[start..storage.len()];
322                let second_part = &storage[0..len - first_part.len()];
323                f(&[first_part, second_part][..])
324            } else {
325                let all_bytes = &storage[start..end];
326                f(&[all_bytes][..])
327            }
328        }
329
330        /// Calls `f` with contiguous sequences of readable bytes in the buffer and
331        /// discards the amount of bytes returned by `f`.
332        ///
333        /// # Panics
334        ///
335        /// Panics if the closure wants to discard more bytes than possible, i.e.,
336        /// the value returned by `f` is greater than `self.len()`.
337        pub fn read_with<F>(&mut self, f: F) -> usize
338        where
339            F: for<'a, 'b> FnOnce(&'b [&'a [u8]]) -> usize,
340        {
341            let Self { storage, head, len } = self;
342            if storage.len() == 0 {
343                return f(&[&[]]);
344            }
345            let nread = RingBuffer::with_readable(storage, *head, *len, f);
346            assert!(nread <= *len);
347            *len -= nread;
348            *head = (*head + nread) % storage.len();
349            nread
350        }
351
352        /// Returns the writable regions of the [`RingBuffer`].
353        pub fn writable_regions(&mut self) -> impl IntoIterator<Item = &mut [u8]> {
354            let BufferLimits { capacity, len } = self.limits();
355            let available = capacity - len;
356            let Self { storage, head, len } = self;
357
358            let mut write_start = *head + *len;
359            if write_start >= storage.len() {
360                write_start -= storage.len()
361            }
362            let write_end = write_start + available;
363            if write_end <= storage.len() {
364                Either::Left([&mut self.storage[write_start..write_end]].into_iter())
365            } else {
366                let (b1, b2) = self.storage[..].split_at_mut(write_start);
367                let b2_len = b2.len();
368                Either::Right([b2, &mut b1[..(available - b2_len)]].into_iter())
369            }
370        }
371    }
372
373    impl Buffer for RingBuffer {
374        fn capacity_range() -> (usize, usize) {
375            // Arbitrarily chosen to satisfy tests so we have some semblance of
376            // clamping capacity in tests.
377            (16, 16 << 20)
378        }
379
380        fn limits(&self) -> BufferLimits {
381            let Self { storage, len, head: _ } = self;
382            let capacity = storage.len();
383            BufferLimits { len: *len, capacity }
384        }
385
386        fn target_capacity(&self) -> usize {
387            let Self { storage, len: _, head: _ } = self;
388            storage.len()
389        }
390
391        fn request_capacity(&mut self, size: usize) {
392            unimplemented!("capacity request for {size} not supported")
393        }
394    }
395
396    impl ReceiveBuffer for RingBuffer {
397        fn write_at<P: Payload>(&mut self, offset: usize, data: &P) -> usize {
398            let BufferLimits { capacity, len } = self.limits();
399            let available = capacity - len;
400            let Self { storage, head, len } = self;
401            if storage.len() == 0 {
402                return 0;
403            }
404
405            if offset > available {
406                return 0;
407            }
408            let start_at = (*head + *len + offset) % storage.len();
409            let to_write = cmp::min(data.len(), available);
410            // Write the first part of the payload.
411            let first_len = cmp::min(to_write, storage.len() - start_at);
412            data.partial_copy(0, &mut storage[start_at..start_at + first_len]);
413            // If we have more to write, wrap around and start from the beginning
414            // of the storage.
415            if to_write > first_len {
416                data.partial_copy(first_len, &mut storage[0..to_write - first_len]);
417            }
418            to_write
419        }
420
421        fn make_readable(&mut self, count: usize, _has_outstanding: bool) {
422            let BufferLimits { capacity, len } = self.limits();
423            debug_assert!(count <= capacity - len);
424            self.len += count;
425        }
426    }
427
428    impl SendBuffer for RingBuffer {
429        type Payload<'a> = FragmentedPayload<'a, 2>;
430
431        fn mark_read(&mut self, count: usize) {
432            let Self { storage, head, len } = self;
433            assert!(count <= *len);
434            *len -= count;
435            *head = (*head + count) % storage.len();
436        }
437
438        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
439        where
440            F: FnOnce(Self::Payload<'a>) -> R,
441        {
442            let Self { storage, head, len } = self;
443            if storage.len() == 0 {
444                return f(FragmentedPayload::new_empty());
445            }
446            assert!(offset <= *len);
447            RingBuffer::with_readable(
448                storage,
449                (*head + offset) % storage.len(),
450                *len - offset,
451                |readable| f(readable.into_iter().map(|x| *x).collect()),
452            )
453        }
454    }
455
456    impl RingBuffer {
457        /// Enqueues as much of `data` as possible to the end of the buffer.
458        ///
459        /// Returns the number of bytes actually queued.
460        pub(crate) fn enqueue_data(&mut self, data: &[u8]) -> usize {
461            let nwritten = self.write_at(0, &data);
462            self.make_readable(nwritten, false);
463            nwritten
464        }
465    }
466
467    impl Buffer for Arc<Mutex<RingBuffer>> {
468        fn capacity_range() -> (usize, usize) {
469            RingBuffer::capacity_range()
470        }
471
472        fn limits(&self) -> BufferLimits {
473            self.lock().limits()
474        }
475
476        fn target_capacity(&self) -> usize {
477            self.lock().target_capacity()
478        }
479
480        fn request_capacity(&mut self, size: usize) {
481            self.lock().request_capacity(size)
482        }
483    }
484
485    impl ReceiveBuffer for Arc<Mutex<RingBuffer>> {
486        fn write_at<P: Payload>(&mut self, offset: usize, data: &P) -> usize {
487            self.lock().write_at(offset, data)
488        }
489
490        fn make_readable(&mut self, count: usize, has_outstanding: bool) {
491            self.lock().make_readable(count, has_outstanding)
492        }
493    }
494
495    /// An implementation of [`SendBuffer`] for tests.
496    #[derive(Debug, Default)]
497    pub struct TestSendBuffer {
498        fake_stream: Arc<Mutex<Vec<u8>>>,
499        ring: RingBuffer,
500    }
501
502    impl TestSendBuffer {
503        /// Creates a new `TestSendBuffer` with a backing shared vec and a
504        /// helper ring buffer.
505        pub fn new(fake_stream: Arc<Mutex<Vec<u8>>>, ring: RingBuffer) -> TestSendBuffer {
506            Self { fake_stream, ring }
507        }
508    }
509
510    impl Buffer for TestSendBuffer {
511        fn capacity_range() -> (usize, usize) {
512            let (min, max) = RingBuffer::capacity_range();
513            (min * 2, max * 2)
514        }
515
516        fn limits(&self) -> BufferLimits {
517            let Self { fake_stream, ring } = self;
518            let BufferLimits { capacity: ring_capacity, len: ring_len } = ring.limits();
519            let guard = fake_stream.lock();
520            let len = ring_len + guard.len();
521            let capacity = ring_capacity + guard.capacity();
522            BufferLimits { len, capacity }
523        }
524
525        fn target_capacity(&self) -> usize {
526            let Self { fake_stream: _, ring } = self;
527            ring.target_capacity()
528        }
529
530        fn request_capacity(&mut self, size: usize) {
531            let Self { fake_stream: _, ring } = self;
532            ring.request_capacity(size)
533        }
534    }
535
536    impl SendBuffer for TestSendBuffer {
537        type Payload<'a> = FragmentedPayload<'a, 2>;
538
539        fn mark_read(&mut self, count: usize) {
540            let Self { fake_stream: _, ring } = self;
541            ring.mark_read(count)
542        }
543
544        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
545        where
546            F: FnOnce(Self::Payload<'a>) -> R,
547        {
548            let Self { fake_stream, ring } = self;
549            let mut guard = fake_stream.lock();
550            if !guard.is_empty() {
551                // Pull from the fake stream into the ring if there is capacity.
552                let BufferLimits { capacity, len } = ring.limits();
553                let len = (capacity - len).min(guard.len());
554                let rest = guard.split_off(len);
555                let first = core::mem::replace(&mut *guard, rest);
556                assert_eq!(ring.enqueue_data(&first[..]), len);
557            }
558            ring.peek_with(offset, f)
559        }
560    }
561
562    fn arc_mutex_eq<T: PartialEq>(a: &Arc<Mutex<T>>, b: &Arc<Mutex<T>>) -> bool {
563        if Arc::ptr_eq(a, b) {
564            return true;
565        }
566        (&*a.lock()) == (&*b.lock())
567    }
568
569    /// A fake implementation of client-side TCP buffers.
570    #[derive(Clone, Debug, Default)]
571    pub struct ClientBuffers {
572        /// Receive buffer shared with core TCP implementation.
573        pub receive: Arc<Mutex<RingBuffer>>,
574        /// Send buffer shared with core TCP implementation.
575        pub send: Arc<Mutex<Vec<u8>>>,
576    }
577
578    impl PartialEq for ClientBuffers {
579        fn eq(&self, ClientBuffers { receive: other_receive, send: other_send }: &Self) -> bool {
580            let Self { receive, send } = self;
581            arc_mutex_eq(receive, other_receive) && arc_mutex_eq(send, other_send)
582        }
583    }
584
585    impl Eq for ClientBuffers {}
586
587    impl ClientBuffers {
588        /// Creates new a `ClientBuffers` with `buffer_sizes`.
589        pub fn new(buffer_sizes: BufferSizes) -> Self {
590            let BufferSizes { send, receive } = buffer_sizes;
591            Self {
592                receive: Arc::new(Mutex::new(RingBuffer::new(receive))),
593                send: Arc::new(Mutex::new(Vec::with_capacity(send))),
594            }
595        }
596    }
597
598    /// A fake implementation of bindings buffers for TCP.
599    #[derive(Debug, Clone, Eq, PartialEq)]
600    #[allow(missing_docs)]
601    pub enum ProvidedBuffers {
602        Buffers(WriteBackClientBuffers),
603        NoBuffers,
604    }
605
606    impl Default for ProvidedBuffers {
607        fn default() -> Self {
608            Self::NoBuffers
609        }
610    }
611
612    impl From<WriteBackClientBuffers> for ProvidedBuffers {
613        fn from(buffers: WriteBackClientBuffers) -> Self {
614            ProvidedBuffers::Buffers(buffers)
615        }
616    }
617
618    impl From<ProvidedBuffers> for WriteBackClientBuffers {
619        fn from(extra: ProvidedBuffers) -> Self {
620            match extra {
621                ProvidedBuffers::Buffers(buffers) => buffers,
622                ProvidedBuffers::NoBuffers => Default::default(),
623            }
624        }
625    }
626
627    impl From<ProvidedBuffers> for () {
628        fn from(_: ProvidedBuffers) -> Self {
629            ()
630        }
631    }
632
633    impl From<()> for ProvidedBuffers {
634        fn from(_: ()) -> Self {
635            Default::default()
636        }
637    }
638
639    /// The variant of [`ProvidedBuffers`] that provides observing the data
640    /// sent/received to TCP sockets.
641    #[derive(Debug, Default, Clone)]
642    pub struct WriteBackClientBuffers(pub Arc<Mutex<Option<ClientBuffers>>>);
643
644    impl PartialEq for WriteBackClientBuffers {
645        fn eq(&self, Self(other): &Self) -> bool {
646            let Self(this) = self;
647            arc_mutex_eq(this, other)
648        }
649    }
650
651    impl Eq for WriteBackClientBuffers {}
652
653    impl IntoBuffers<Arc<Mutex<RingBuffer>>, TestSendBuffer> for ProvidedBuffers {
654        fn into_buffers(
655            self,
656            buffer_sizes: BufferSizes,
657        ) -> (Arc<Mutex<RingBuffer>>, TestSendBuffer) {
658            let buffers = ClientBuffers::new(buffer_sizes);
659            if let ProvidedBuffers::Buffers(b) = self {
660                *b.0.as_ref().lock() = Some(buffers.clone());
661            }
662            let ClientBuffers { receive, send } = buffers;
663            (receive, TestSendBuffer::new(send, Default::default()))
664        }
665    }
666
    impl ListenerNotifier for ProvidedBuffers {
        // Test fake: the notification and its count argument are ignored.
        fn new_incoming_connections(&mut self, _: usize) {}
    }
670
671    #[derive(Debug)]
672    pub struct RepeatingPayload {
673        len: usize,
674    }
675
676    impl RepeatingPayload {
677        const REPEATING_BYTE: u8 = 0xAA;
678    }
679
680    impl PayloadLen for RepeatingPayload {
681        fn len(&self) -> usize {
682            self.len
683        }
684    }
685
686    impl Payload for RepeatingPayload {
687        fn slice(self, range: Range<u32>) -> Self {
688            Self { len: usize::try_from(range.end - range.start).unwrap() }
689        }
690
691        fn partial_copy(&self, offset: usize, dst: &mut [u8]) {
692            assert!(offset < self.len);
693            assert_eq!(dst.len() - offset, self.len);
694            dst.fill(Self::REPEATING_BYTE);
695        }
696
697        fn partial_copy_uninit(&self, offset: usize, dst: &mut [core::mem::MaybeUninit<u8>]) {
698            assert!(offset < self.len);
699            assert_eq!(dst.len() - offset, self.len);
700            dst.fill(core::mem::MaybeUninit::new(Self::REPEATING_BYTE));
701        }
702
703        fn new_empty() -> Self {
704            Self { len: 0 }
705        }
706    }
707
708    impl InnerPacketBuilder for RepeatingPayload {
709        fn bytes_len(&self) -> usize {
710            self.len
711        }
712
713        fn serialize(&self, buffer: &mut [u8]) {
714            buffer.fill(Self::REPEATING_BYTE)
715        }
716    }
717
718    /// A buffer that always has [`usize::MAX`] bytes available to write.
719    #[derive(Default, Debug, Eq, PartialEq)]
720    pub struct InfiniteSendBuffer;
721
722    impl InfiniteSendBuffer {
723        const LEN: usize = usize::MAX as usize;
724    }
725
726    impl Buffer for InfiniteSendBuffer {
727        fn capacity_range() -> (usize, usize) {
728            (0, Self::LEN)
729        }
730
731        fn limits(&self) -> BufferLimits {
732            BufferLimits { capacity: Self::LEN, len: Self::LEN }
733        }
734
735        fn target_capacity(&self) -> usize {
736            Self::LEN
737        }
738
739        fn request_capacity(&mut self, size: usize) {
740            unimplemented!("can't change capacity of infinite send buffer to {size}")
741        }
742    }
743
744    impl SendBuffer for InfiniteSendBuffer {
745        type Payload<'a> = RepeatingPayload;
746
747        fn mark_read(&mut self, _count: usize) {}
748
749        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
750        where
751            F: FnOnce(Self::Payload<'a>) -> R,
752        {
753            f(RepeatingPayload { len: Self::LEN - offset })
754        }
755    }
756
757    /// A buffer that has a controllable amount of [`RepeatingPayload`] bytes
758    /// available to read.
759    #[derive(Default, Debug, Eq, PartialEq)]
760    pub struct RepeatingSendBuffer(usize);
761
762    impl RepeatingSendBuffer {
763        /// Creates a new buffer with the provided `length`.
764        pub fn new(length: usize) -> Self {
765            Self(length)
766        }
767    }
768
769    impl Buffer for RepeatingSendBuffer {
770        fn capacity_range() -> (usize, usize) {
771            (0, usize::MAX)
772        }
773
774        fn limits(&self) -> BufferLimits {
775            let Self(len) = self;
776            BufferLimits { capacity: usize::MAX, len: *len }
777        }
778
779        fn target_capacity(&self) -> usize {
780            usize::MAX
781        }
782
783        fn request_capacity(&mut self, size: usize) {
784            unimplemented!("can't change capacity of repeatable send buffer to {size}")
785        }
786    }
787
788    impl SendBuffer for RepeatingSendBuffer {
789        type Payload<'a> = RepeatingPayload;
790
791        fn mark_read(&mut self, count: usize) {
792            let Self(len) = self;
793            *len = *len - count;
794        }
795
796        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
797        where
798            F: FnOnce(Self::Payload<'a>) -> R,
799        {
800            let Self(len) = self;
801            f(RepeatingPayload { len: *len - offset })
802        }
803    }
804}
805
806#[cfg(test)]
807mod test {
808    use alloc::vec::Vec;
809    use alloc::{format, vec};
810
811    use netstack3_base::FragmentedPayload;
812    use proptest::strategy::{Just, Strategy};
813    use proptest::test_runner::Config;
814    use proptest::{prop_assert, prop_assert_eq, proptest};
815    use proptest_support::failed_seeds_no_std;
816    use test_case::test_case;
817    use testutil::RingBuffer;
818
819    use super::*;
820    proptest! {
821        #![proptest_config(Config {
822            // Add all failed seeds here.
823            failure_persistence: failed_seeds_no_std!(
824                "cc f621ca7d3a2b108e0dc41f7169ad028f4329b79e90e73d5f68042519a9f63999",
825                "cc c449aebed201b4ec4f137f3c224f20325f4cfee0b7fd596d9285176b6d811aa9"
826            ),
827            ..Config::default()
828        })]
829
830        #[test]
831        fn ring_buffer_make_readable((mut rb, avail) in ring_buffer::with_written()) {
832            let old_storage = rb.storage.clone();
833            let old_head = rb.head;
834            let old_len = rb.limits().len;
835            rb.make_readable(avail, false);
836            // Assert that length is updated but everything else is unchanged.
837            let RingBuffer { storage, head, len } = rb;
838            prop_assert_eq!(len, old_len + avail);
839            prop_assert_eq!(head, old_head);
840            prop_assert_eq!(storage, old_storage);
841        }
842
843        #[test]
844        fn ring_buffer_write_at((mut rb, offset, data) in ring_buffer::with_offset_data()) {
845            let old_head = rb.head;
846            let old_len = rb.limits().len;
847            prop_assert_eq!(rb.write_at(offset, &&data[..]), data.len());
848            prop_assert_eq!(rb.head, old_head);
849            prop_assert_eq!(rb.limits().len, old_len);
850            for i in 0..data.len() {
851                let masked = (rb.head + rb.len + offset + i) % rb.storage.len();
852                // Make sure that data are written.
853                prop_assert_eq!(rb.storage[masked], data[i]);
854                rb.storage[masked] = 0;
855            }
856            // And the other parts of the storage are untouched.
857            prop_assert_eq!(&rb.storage, &vec![0; rb.storage.len()]);
858        }
859
860        #[test]
861        fn ring_buffer_read_with((mut rb, expected, consume) in ring_buffer::with_read_data()) {
862            prop_assert_eq!(rb.limits().len, expected.len());
863            let nread = rb.read_with(|readable| {
864                assert!(readable.len() == 1 || readable.len() == 2);
865                let got = readable.concat();
866                assert_eq!(got, expected);
867                consume
868            });
869            prop_assert_eq!(nread, consume);
870            prop_assert_eq!(rb.limits().len, expected.len() - consume);
871        }
872
873        #[test]
874        fn ring_buffer_mark_read((mut rb, readable) in ring_buffer::with_readable()) {
875            const BYTE_TO_WRITE: u8 = 0x42;
876            let written = rb.writable_regions().into_iter().fold(0, |acc, slice| {
877                slice.fill(BYTE_TO_WRITE);
878                acc + slice.len()
879            });
880            let old_storage = rb.storage.clone();
881            let old_head = rb.head;
882            let old_len = rb.limits().len;
883
884            rb.mark_read(readable);
885            let new_writable = rb.writable_regions().into_iter().fold(Vec::new(), |mut acc, slice| {
886                acc.extend_from_slice(slice);
887                acc
888            });
889            for (i, x) in new_writable.iter().enumerate().take(written) {
890                prop_assert_eq!(*x, BYTE_TO_WRITE, "i={}, rb={:?}", i, rb);
891            }
892            prop_assert!(new_writable.len() >= written);
893
894            let RingBuffer { storage, head, len } = rb;
895            prop_assert_eq!(len, old_len - readable);
896            prop_assert_eq!(head, (old_head + readable) % old_storage.len());
897            prop_assert_eq!(storage, old_storage);
898        }
899
900        #[test]
901        fn ring_buffer_peek_with((mut rb, expected, offset) in ring_buffer::with_read_data()) {
902            prop_assert_eq!(rb.limits().len, expected.len());
903            rb.peek_with(offset, |readable| {
904                prop_assert_eq!(readable.to_vec(), &expected[offset..]);
905                Ok(())
906            })?;
907            prop_assert_eq!(rb.limits().len, expected.len());
908        }
909
910        #[test]
911        fn ring_buffer_writable_regions(mut rb in ring_buffer::arb_ring_buffer()) {
912            const BYTE_TO_WRITE: u8 = 0x42;
913            let writable_len = rb.writable_regions().into_iter().fold(0, |acc, slice| {
914                slice.fill(BYTE_TO_WRITE);
915                acc + slice.len()
916            });
917            let BufferLimits {len, capacity} = rb.limits();
918            prop_assert_eq!(writable_len + len, capacity);
919            for i in 0..capacity {
920                let expected = if i < len {
921                    0
922                } else {
923                    BYTE_TO_WRITE
924                };
925                let idx = (rb.head + i) % rb.storage.len();
926                prop_assert_eq!(rb.storage[idx], expected);
927            }
928        }
929    }
930
931    #[test_case([Range { start: 0, end: 0 }]
932        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(0), generation: 0 })]
933    #[test_case([Range { start: 0, end: 10 }]
934        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(10), generation: 1 })]
935    #[test_case([Range{ start: 10, end: 15 }, Range { start: 5, end: 10 }]
936        => Assembler {
937            outstanding: [
938                SeqRange::new(SeqNum::new(5)..SeqNum::new(15), 2).unwrap()
939            ].into_iter().collect(),
940            nxt: SeqNum::new(0),
941            generation: 2,
942        })
943    ]
944    #[test_case([Range{ start: 10, end: 15 }, Range { start: 0, end: 5 }, Range { start: 5, end: 10 }]
945        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(15), generation: 3 })]
946    #[test_case([Range{ start: 10, end: 15 }, Range { start: 5, end: 10 }, Range { start: 0, end: 5 }]
947        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(15), generation: 3 })]
948    #[test_case([Range{ start: 10, end: 15 }, Range { start: 10, end: 15 }, Range { start: 11, end: 12 }]
949        => Assembler {
950             outstanding: [
951                SeqRange::new(SeqNum::new(10)..SeqNum::new(15), 3).unwrap()
952            ].into_iter().collect(),
953            nxt: SeqNum::new(0), generation: 3 })]
954    fn assembler_examples(ops: impl IntoIterator<Item = Range<u32>>) -> Assembler {
955        let mut assembler = Assembler::new(SeqNum::new(0));
956        for Range { start, end } in ops.into_iter() {
957            let _advanced = assembler.insert(SeqNum::new(start)..SeqNum::new(end));
958        }
959        assembler
960    }
961
962    #[test_case(&[] => Vec::<Range<u32>>::new(); "empty")]
963    #[test_case(&[1..2] => vec![1..2]; "single")]
964    #[test_case(&[1..2, 3..4] => vec![3..4, 1..2]; "latest first")]
965    #[test_case(&[1..2, 3..4, 5..6, 7..8, 9..10]
966        => vec![9..10, 7..8, 5..6, 3..4]; "max len")]
967    #[test_case(&[1..2, 3..4, 5..6, 7..8, 9..10, 6..7]
968        => vec![5..8, 9..10, 3..4, 1..2]; "gap fill")]
969    #[test_case(&[1..2, 3..4, 5..6, 7..8, 9..10, 1..8]
970        => vec![1..8, 9..10]; "large gap fill")]
971    fn assembler_sack_blocks(ops: &[Range<u32>]) -> Vec<Range<u32>> {
972        let mut assembler = Assembler::new(SeqNum::new(0));
973        for Range { start, end } in ops {
974            let _: usize = assembler.insert(SeqNum::new(*start)..SeqNum::new(*end));
975        }
976        assembler
977            .sack_blocks()
978            .try_iter()
979            .map(|r| r.expect("invalid block").into_range_u32())
980            .collect()
981    }
982
983    #[test]
984    // Regression test for https://fxbug.dev/42061342.
985    fn ring_buffer_wrap_around() {
986        const CAPACITY: usize = 16;
987        let mut rb = RingBuffer::new(CAPACITY);
988
989        // Write more than half the buffer.
990        const BUF_SIZE: usize = 10;
991        assert_eq!(rb.enqueue_data(&[0xAA; BUF_SIZE]), BUF_SIZE);
992        rb.peek_with(0, |payload| {
993            assert_eq!(payload, FragmentedPayload::new_contiguous(&[0xAA; BUF_SIZE]))
994        });
995        rb.mark_read(BUF_SIZE);
996
997        // Write around the end of the buffer.
998        assert_eq!(rb.enqueue_data(&[0xBB; BUF_SIZE]), BUF_SIZE);
999        rb.peek_with(0, |payload| {
1000            assert_eq!(
1001                payload,
1002                FragmentedPayload::new([
1003                    &[0xBB; (CAPACITY - BUF_SIZE)],
1004                    &[0xBB; (BUF_SIZE * 2 - CAPACITY)]
1005                ])
1006            )
1007        });
1008        // Mark everything read, which should advance `head` around to the
1009        // beginning of the buffer.
1010        rb.mark_read(BUF_SIZE);
1011
1012        // Now make a contiguous sequence of bytes readable.
1013        assert_eq!(rb.enqueue_data(&[0xCC; BUF_SIZE]), BUF_SIZE);
1014        rb.peek_with(0, |payload| {
1015            assert_eq!(payload, FragmentedPayload::new_contiguous(&[0xCC; BUF_SIZE]))
1016        });
1017
1018        // Check that the unwritten bytes are left untouched. If `head` was
1019        // advanced improperly, this will crash.
1020        let read = rb.read_with(|segments| {
1021            assert_eq!(segments, [[0xCC; BUF_SIZE]]);
1022            BUF_SIZE
1023        });
1024        assert_eq!(read, BUF_SIZE);
1025    }
1026
1027    #[test]
1028    fn ring_buffer_example() {
1029        let mut rb = RingBuffer::new(16);
1030        assert_eq!(rb.write_at(5, &"World".as_bytes()), 5);
1031        assert_eq!(rb.write_at(0, &"Hello".as_bytes()), 5);
1032        rb.make_readable(10, false);
1033        assert_eq!(
1034            rb.read_with(|readable| {
1035                assert_eq!(readable, &["HelloWorld".as_bytes()]);
1036                5
1037            }),
1038            5
1039        );
1040        assert_eq!(
1041            rb.read_with(|readable| {
1042                assert_eq!(readable, &["World".as_bytes()]);
1043                readable[0].len()
1044            }),
1045            5
1046        );
1047        assert_eq!(rb.write_at(0, &"HelloWorld".as_bytes()), 10);
1048        rb.make_readable(10, false);
1049        assert_eq!(
1050            rb.read_with(|readable| {
1051                assert_eq!(readable, &["HelloW".as_bytes(), "orld".as_bytes()]);
1052                6
1053            }),
1054            6
1055        );
1056        assert_eq!(rb.limits().len, 4);
1057        assert_eq!(
1058            rb.read_with(|readable| {
1059                assert_eq!(readable, &["orld".as_bytes()]);
1060                4
1061            }),
1062            4
1063        );
1064        assert_eq!(rb.limits().len, 0);
1065
1066        assert_eq!(rb.enqueue_data("Hello".as_bytes()), 5);
1067        assert_eq!(rb.limits().len, 5);
1068
1069        let () = rb.peek_with(3, |readable| {
1070            assert_eq!(readable.to_vec(), "lo".as_bytes());
1071        });
1072
1073        rb.mark_read(2);
1074
1075        let () = rb.peek_with(0, |readable| {
1076            assert_eq!(readable.to_vec(), "llo".as_bytes());
1077        });
1078    }
1079
1080    mod ring_buffer {
1081        use super::*;
1082        // Use a small capacity so that we have a higher chance to exercise
1083        // wrapping around logic.
1084        const MAX_CAP: usize = 32;
1085
1086        fn arb_ring_buffer_args() -> impl Strategy<Value = (usize, usize, usize)> {
1087            // Use a small capacity so that we have a higher chance to exercise
1088            // wrapping around logic.
1089            (1..=MAX_CAP).prop_flat_map(|cap| {
1090                let max_len = cap;
1091                //  cap      head     len
1092                (Just(cap), 0..cap, 0..=max_len)
1093            })
1094        }
1095
1096        pub(super) fn arb_ring_buffer() -> impl Strategy<Value = RingBuffer> {
1097            arb_ring_buffer_args().prop_map(|(cap, head, len)| RingBuffer {
1098                storage: vec![0; cap],
1099                head,
1100                len,
1101            })
1102        }
1103
1104        /// A strategy for a [`RingBuffer`] and a valid length to mark read.
1105        pub(super) fn with_readable() -> impl Strategy<Value = (RingBuffer, usize)> {
1106            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
1107                (Just(RingBuffer { storage: vec![0; cap], head, len }), 0..=len)
1108            })
1109        }
1110
1111        /// A strategy for a [`RingBuffer`] and a valid length to make readable.
1112        pub(super) fn with_written() -> impl Strategy<Value = (RingBuffer, usize)> {
1113            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
1114                let rb = RingBuffer { storage: vec![0; cap], head, len };
1115                let max_written = cap - len;
1116                (Just(rb), 0..=max_written)
1117            })
1118        }
1119
1120        /// A strategy for a [`RingBuffer`], a valid offset and data to write.
1121        pub(super) fn with_offset_data() -> impl Strategy<Value = (RingBuffer, usize, Vec<u8>)> {
1122            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
1123                let writable_len = cap - len;
1124                (0..=writable_len).prop_flat_map(move |offset| {
1125                    (0..=writable_len - offset).prop_flat_map(move |data_len| {
1126                        (
1127                            Just(RingBuffer { storage: vec![0; cap], head, len }),
1128                            Just(offset),
1129                            proptest::collection::vec(1..=u8::MAX, data_len),
1130                        )
1131                    })
1132                })
1133            })
1134        }
1135
1136        /// A strategy for a [`RingBuffer`], its readable data, and how many
1137        /// bytes to consume.
1138        pub(super) fn with_read_data() -> impl Strategy<Value = (RingBuffer, Vec<u8>, usize)> {
1139            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
1140                proptest::collection::vec(1..=u8::MAX, len).prop_flat_map(move |data| {
1141                    // Fill the RingBuffer with the data.
1142                    let mut rb = RingBuffer { storage: vec![0; cap], head, len: 0 };
1143                    assert_eq!(rb.write_at(0, &&data[..]), len);
1144                    rb.make_readable(len, false);
1145                    (Just(rb), Just(data), 0..=len)
1146                })
1147            })
1148        }
1149    }
1150}