netstack3_tcp/
buffer.rs

// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Defines the buffer traits needed by the TCP implementation. The traits
//! in this module provide a common interface for platform-specific buffers
//! used by TCP.

use netstack3_base::{Payload, SackBlock, SackBlocks, SeqNum};

use arrayvec::ArrayVec;
use core::fmt::Debug;
use core::ops::Range;
use packet::InnerPacketBuilder;

use crate::internal::base::BufferSizes;
use crate::internal::seq_ranges::{SeqRange, SeqRanges};

/// Common super trait for both sending and receiving buffer.
pub trait Buffer: Debug + Sized {
    /// Returns the capacity range `(min, max)` for this buffer type.
    fn capacity_range() -> (usize, usize);

    /// Returns information about the number of bytes in the buffer.
    ///
    /// Returns a [`BufferLimits`] instance with information about the number of
    /// bytes in the buffer.
    fn limits(&self) -> BufferLimits;

    /// Gets the target size of the buffer, in bytes.
    ///
    /// The target capacity of the buffer is distinct from the actual capacity
    /// (reported by [`Buffer::limits`]) in that the target capacity should
    /// remain fixed unless requested otherwise, while the actual capacity can
    /// vary with usage.
    ///
    /// For fixed-size buffers this should return the same value as the
    /// capacity reported by `self.limits()`. For buffer types that support
    /// resizing, the returned value can be different but should not change
    /// unless a resize was requested.
    fn target_capacity(&self) -> usize;

    /// Requests that the buffer be resized to hold the given number of bytes.
    ///
    /// Calling this method suggests to the buffer that it should alter its size.
    /// Implementations are free to impose constraints or ignore requests
    /// entirely.
    fn request_capacity(&mut self, size: usize);
}
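
// Illustrative sketch (not part of the netstack3 API): a caller that wants a
// particular buffer size would typically clamp the request into the range
// supported by the buffer type before asking it to resize. The clamping
// policy below is an assumption for illustration only.
#[cfg(test)]
#[allow(dead_code)]
fn request_clamped_capacity<B: Buffer>(buffer: &mut B, requested: usize) {
    let (min, max) = B::capacity_range();
    // `request_capacity` is advisory; the buffer may still adjust or ignore
    // the request.
    buffer.request_capacity(requested.clamp(min, max));
}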

/// A buffer supporting TCP receiving operations.
pub trait ReceiveBuffer: Buffer {
    /// Writes `data` into the buffer at `offset`.
    ///
    /// Returns the number of bytes written.
    fn write_at<P: Payload>(&mut self, offset: usize, data: &P) -> usize;

    /// Marks `count` bytes available for the application to read.
    ///
    /// `has_outstanding` informs the buffer if any bytes past `count` may have
    /// been populated by out of order segments.
    ///
    /// # Panics
    ///
    /// Panics if the caller attempts to make more bytes readable than the
    /// buffer has capacity for. That is, this method panics if `self.len() +
    /// count > self.cap()`
    fn make_readable(&mut self, count: usize, has_outstanding: bool);
}
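
// Illustrative sketch (not part of the netstack3 API): how a receive path
// might drive a `ReceiveBuffer`. Offsets are relative to the data already
// assembled in the buffer (conceptually RCV.NXT); the concrete offsets and
// the panic-free sizing here are assumptions for illustration only.
#[cfg(test)]
#[allow(dead_code)]
fn receive_buffer_usage_sketch<R: ReceiveBuffer>(
    rb: &mut R,
    in_order: &[u8],
    out_of_order: &[u8],
) {
    // An out-of-order segment is written past the hole left by the missing
    // in-order bytes, but is not yet made readable.
    let _ = rb.write_at(in_order.len(), &out_of_order);
    // The in-order segment fills the hole at offset 0...
    let _ = rb.write_at(0, &in_order);
    // ...so the whole contiguous run can be handed to the application.
    rb.make_readable(in_order.len() + out_of_order.len(), false);
}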

/// A buffer supporting TCP sending operations.
pub trait SendBuffer: Buffer {
    /// The payload type given to `peek_with`.
    type Payload<'a>: InnerPacketBuilder + Payload + Debug + 'a;

    /// Removes `count` bytes from the beginning of the buffer, marking them
    /// as already read.
    ///
    /// # Panics
    ///
    /// Panics if more bytes are marked as read than are available, i.e.,
    /// `count > self.len`.
    fn mark_read(&mut self, count: usize);

    /// Calls `f` with contiguous sequences of readable bytes in the buffer
    /// without advancing the reading pointer.
    ///
    /// # Panics
    ///
    /// Panics if more bytes are peeked than are available, i.e.,
    /// `offset > self.len`
    fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
    where
        F: FnOnce(Self::Payload<'a>) -> R;
}
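
// Illustrative sketch (not part of the netstack3 API): how a send path might
// drive a `SendBuffer`. Readable bytes are peeked (without being consumed) to
// build outgoing segments, and are only removed once the peer acknowledges
// them. The offset/ack bookkeeping here is an assumption for illustration
// only.
#[cfg(test)]
#[allow(dead_code)]
fn send_buffer_usage_sketch<S: SendBuffer>(
    sb: &mut S,
    unsent_offset: usize,
    acked: usize,
) -> usize {
    // Peek at the not-yet-sent bytes without advancing the buffer; a real
    // sender would serialize this payload into a segment.
    let peeked = sb.peek_with(unsent_offset, |payload| payload.len());
    // Once `acked` bytes are acknowledged, drop them from the front.
    sb.mark_read(acked);
    peeked
}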

/// Information about the number of bytes in a [`Buffer`].
#[derive(Eq, PartialEq, Debug, Copy, Clone)]
pub struct BufferLimits {
    /// The total number of bytes that the buffer can hold.
    pub capacity: usize,

    /// The number of readable bytes that the buffer currently holds.
    pub len: usize,
}

/// Assembler for out-of-order segment data.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub(super) struct Assembler {
    // `nxt` is the next sequence number expected to be received. It should be
    // before all of the out-of-order sequence numbers we keep track of below.
    nxt: SeqNum,
    // Keeps track of the "age" of segments in the outstanding queue. Every time
    // a segment is inserted, the generation increases. This allows
    // RFC-compliant ordering of selective ACK blocks.
    generation: usize,
    // Holds all the sequence number ranges which we have already received.
    // These ranges are sorted and should have a gap of at least 1 byte
    // between any consecutive two. These ranges should only be after `nxt`.
    // Each range is tagged with the generation that last modified it.
    outstanding: SeqRanges<usize>,
}

impl Assembler {
    /// Creates a new assembler.
    pub(super) fn new(nxt: SeqNum) -> Self {
        Self { outstanding: SeqRanges::default(), generation: 0, nxt }
    }

    /// Returns the next sequence number expected to be received.
    pub(super) fn nxt(&self) -> SeqNum {
        self.nxt
    }

    /// Returns whether there are out-of-order segments waiting to be
    /// acknowledged.
    pub(super) fn has_out_of_order(&self) -> bool {
        !self.outstanding.is_empty()
    }

    /// Inserts a received segment.
    ///
    /// The newly added segment will be merged with as many existing ones as
    /// possible and `nxt` will be advanced to the highest ACK number possible.
    ///
    /// Returns the number of bytes that should be available for the
    /// application to consume.
    ///
    /// # Panics
    ///
    /// Panics if `start` is after `end` or if `start` is before `self.nxt`.
    pub(super) fn insert(&mut self, Range { start, end }: Range<SeqNum>) -> usize {
        assert!(!start.after(end));
        assert!(!start.before(self.nxt));
        if start == end {
            return 0;
        }

        let Self { outstanding, nxt, generation } = self;
        *generation = *generation + 1;
        outstanding.insert(start..end, *generation);

        if let Some(advanced) = outstanding.pop_front_if(|r| r.range.start == *nxt) {
            *nxt = advanced.range.end;
            // The following unwrap is safe because it is invalid to have a
            // range where `end` is before `start`.
            usize::try_from(advanced.range.end - advanced.range.start).unwrap()
        } else {
            0
        }
    }

    pub(super) fn has_outstanding(&self) -> bool {
        let Self { outstanding, nxt: _, generation: _ } = self;
        !outstanding.is_empty()
    }

    /// Returns the current outstanding selective ack blocks in the assembler.
    ///
    /// The returned blocks are sorted according to [RFC 2018 section 4]:
    ///
    /// * The first SACK block (i.e., the one immediately following the kind and
    ///   length fields in the option) MUST specify the contiguous block of data
    ///   containing the segment which triggered this ACK. [...]
    /// * The SACK option SHOULD be filled out by repeating the most recently
    ///   reported SACK blocks [...]
    ///
    /// This is achieved by always returning the blocks that were most recently
    /// changed by incoming segments.
    ///
    /// [RFC 2018 section 4]:
    ///     https://datatracker.ietf.org/doc/html/rfc2018#section-4
    pub(crate) fn sack_blocks(&self) -> SackBlocks {
        let Self { nxt: _, generation: _, outstanding } = self;
        // Fast exit, no outstanding blocks.
        if outstanding.is_empty() {
            return SackBlocks::default();
        }

        let mut heap = ArrayVec::<&SeqRange<_>, { SackBlocks::MAX_BLOCKS }>::new();
        for block in outstanding.iter() {
            if heap.is_full() {
                if heap.last().is_some_and(|l| l.meta < block.meta) {
                    // The new block is more recent than the oldest block in
                    // the heap; evict the oldest to make room.
                    let _: Option<_> = heap.pop();
                } else {
                    // The new block is older than everything already in the
                    // heap, skip it.
                    continue;
                }
            }

            heap.push(block);
            // Keep the heap sorted from largest (most recent) generation to
            // smallest.
            heap.sort_by(|a, b| b.meta.cmp(&a.meta))
        }

        SackBlocks::from_iter(
            heap.into_iter().map(|block| SackBlock(block.range.start, block.range.end)),
        )
    }
}
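
// Illustrative sketch of the assembler's merging behavior: out-of-order
// ranges are held as outstanding until the gap at `nxt` is filled, at which
// point `insert` reports how many contiguous bytes became available. The
// concrete sequence numbers are assumptions for illustration; the unit tests
// below exercise the same behavior more thoroughly.
#[cfg(test)]
#[allow(dead_code)]
fn assembler_usage_sketch() {
    let mut assembler = Assembler::new(SeqNum::new(0));
    // Bytes [10, 20) arrive out of order: nothing becomes available yet.
    assert_eq!(assembler.insert(SeqNum::new(10)..SeqNum::new(20)), 0);
    assert!(assembler.has_out_of_order());
    // The missing [0, 10) arrives; the ranges merge and 20 bytes are ready.
    assert_eq!(assembler.insert(SeqNum::new(0)..SeqNum::new(10)), 20);
    assert_eq!(assembler.nxt(), SeqNum::new(20));
    assert!(!assembler.has_out_of_order());
}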

/// A conversion trait that converts the object that Bindings give us into a
/// pair of receive and send buffers.
pub trait IntoBuffers<R: ReceiveBuffer, S: SendBuffer> {
    /// Converts to receive and send buffers.
    fn into_buffers(self, buffer_sizes: BufferSizes) -> (R, S);
}

#[cfg(any(test, feature = "testutils"))]
impl<R: Default + ReceiveBuffer, S: Default + SendBuffer> IntoBuffers<R, S> for () {
    fn into_buffers(self, buffer_sizes: BufferSizes) -> (R, S) {
        // Ignore buffer sizes since this is a test-only impl.
        let BufferSizes { send: _, receive: _ } = buffer_sizes;
        Default::default()
    }
}

#[cfg(any(test, feature = "testutils"))]
pub(crate) mod testutil {
    use super::*;

    use alloc::sync::Arc;
    use alloc::vec;
    use alloc::vec::Vec;
    use core::cmp;

    use either::Either;
    use netstack3_base::sync::Mutex;
    use netstack3_base::{FragmentedPayload, WindowSize};

    use crate::internal::socket::accept_queue::ListenerNotifier;

    /// A circular buffer implementation.
    ///
    /// A [`RingBuffer`] holds a logically contiguous ring of memory in three
    /// regions:
    ///
    /// - *readable*: memory that is available for reading and not for writing,
    /// - *writable*: memory that is available for writing and not for reading,
    /// - *reserved*: memory that was read from and is no longer available
    ///   for reading or for writing.
    ///
    /// Zero or more of these regions can be empty, and a region of memory can
    /// transition from one to another in a few different ways:
    ///
    /// *Readable* memory, once read, becomes writable.
    ///
    /// *Writable* memory, once marked as such, becomes readable.
    #[cfg_attr(any(test, feature = "testutils"), derive(Clone, PartialEq, Eq))]
    pub struct RingBuffer {
        pub(super) storage: Vec<u8>,
        /// The index where the reader starts to read.
        ///
        /// Maintains the invariant that `head < storage.len()` by wrapping
        /// around to 0 as needed.
        pub(super) head: usize,
        /// The amount of readable data in `storage`.
        ///
        /// Anything between [head, head+len) is readable. This will never exceed
        /// `storage.len()`.
        pub(super) len: usize,
    }

    impl Debug for RingBuffer {
        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
            let Self { storage, head, len } = self;
            f.debug_struct("RingBuffer")
                .field("storage (len, cap)", &(storage.len(), storage.capacity()))
                .field("head", head)
                .field("len", len)
                .finish()
        }
    }

    impl Default for RingBuffer {
        fn default() -> Self {
            Self::new(WindowSize::DEFAULT.into())
        }
    }

    impl RingBuffer {
        /// Creates a new `RingBuffer`.
        pub fn new(capacity: usize) -> Self {
            Self { storage: vec![0; capacity], head: 0, len: 0 }
        }

        /// Resets the buffer to be entirely unwritten.
        pub fn reset(&mut self) {
            let Self { storage: _, head, len } = self;
            *head = 0;
            *len = 0;
        }

        /// Calls `f` on the contiguous sequences from `start` up to `len` bytes.
        fn with_readable<'a, F, R>(storage: &'a Vec<u8>, start: usize, len: usize, f: F) -> R
        where
            F: for<'b> FnOnce(&'b [&'a [u8]]) -> R,
        {
            // Don't read past the end of storage.
            let end = start + len;
            if end > storage.len() {
                let first_part = &storage[start..storage.len()];
                let second_part = &storage[0..len - first_part.len()];
                f(&[first_part, second_part][..])
            } else {
                let all_bytes = &storage[start..end];
                f(&[all_bytes][..])
            }
        }

        /// Calls `f` with contiguous sequences of readable bytes in the buffer and
        /// discards the amount of bytes returned by `f`.
        ///
        /// # Panics
        ///
        /// Panics if the closure wants to discard more bytes than possible, i.e.,
        /// the value returned by `f` is greater than `self.len()`.
        pub fn read_with<F>(&mut self, f: F) -> usize
        where
            F: for<'a, 'b> FnOnce(&'b [&'a [u8]]) -> usize,
        {
            let Self { storage, head, len } = self;
            if storage.len() == 0 {
                return f(&[&[]]);
            }
            let nread = RingBuffer::with_readable(storage, *head, *len, f);
            assert!(nread <= *len);
            *len -= nread;
            *head = (*head + nread) % storage.len();
            nread
        }

        /// Returns the writable regions of the [`RingBuffer`].
        pub fn writable_regions(&mut self) -> impl IntoIterator<Item = &mut [u8]> {
            let BufferLimits { capacity, len } = self.limits();
            let available = capacity - len;
            let Self { storage, head, len } = self;

            let mut write_start = *head + *len;
            if write_start >= storage.len() {
                write_start -= storage.len()
            }
            let write_end = write_start + available;
            if write_end <= storage.len() {
                Either::Left([&mut self.storage[write_start..write_end]].into_iter())
            } else {
                let (b1, b2) = self.storage[..].split_at_mut(write_start);
                let b2_len = b2.len();
                Either::Right([b2, &mut b1[..(available - b2_len)]].into_iter())
            }
        }
    }
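
    // Illustrative sketch (test-only, not part of the public surface): how the
    // `head`/`len` bookkeeping maps onto the readable regions described above.
    // The capacity and byte values are assumptions chosen to force the read
    // position to wrap around the end of `storage`.
    #[cfg(test)]
    #[allow(dead_code)]
    fn ring_buffer_wrap_sketch() {
        let mut rb = RingBuffer::new(8);
        // Fill and consume 6 bytes so that `head` moves into the middle of
        // storage.
        assert_eq!(rb.enqueue_data(&[1, 2, 3, 4, 5, 6]), 6);
        rb.mark_read(6);
        // Writing 4 more bytes wraps: 2 land at the tail of storage and 2 at
        // the front, so the readable data is returned in two chunks.
        assert_eq!(rb.enqueue_data(&[7, 8, 9, 10]), 4);
        rb.peek_with(0, |payload| {
            assert_eq!(payload, FragmentedPayload::new([&[7, 8][..], &[9, 10][..]]));
        });
    }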

    impl Buffer for RingBuffer {
        fn capacity_range() -> (usize, usize) {
            // Arbitrarily chosen to satisfy tests so we have some semblance of
            // clamping capacity in tests.
            (16, 16 << 20)
        }

        fn limits(&self) -> BufferLimits {
            let Self { storage, len, head: _ } = self;
            let capacity = storage.len();
            BufferLimits { len: *len, capacity }
        }

        fn target_capacity(&self) -> usize {
            let Self { storage, len: _, head: _ } = self;
            storage.len()
        }

        fn request_capacity(&mut self, size: usize) {
            unimplemented!("capacity request for {size} not supported")
        }
    }

    impl ReceiveBuffer for RingBuffer {
        fn write_at<P: Payload>(&mut self, offset: usize, data: &P) -> usize {
            let BufferLimits { capacity, len } = self.limits();
            let available = capacity - len;
            let Self { storage, head, len } = self;
            if storage.len() == 0 {
                return 0;
            }

            if offset > available {
                return 0;
            }
            let start_at = (*head + *len + offset) % storage.len();
            let to_write = cmp::min(data.len(), available);
            // Write the first part of the payload.
            let first_len = cmp::min(to_write, storage.len() - start_at);
            data.partial_copy(0, &mut storage[start_at..start_at + first_len]);
            // If we have more to write, wrap around and start from the beginning
            // of the storage.
            if to_write > first_len {
                data.partial_copy(first_len, &mut storage[0..to_write - first_len]);
            }
            to_write
        }

        fn make_readable(&mut self, count: usize, _has_outstanding: bool) {
            let BufferLimits { capacity, len } = self.limits();
            debug_assert!(count <= capacity - len);
            self.len += count;
        }
    }

    impl SendBuffer for RingBuffer {
        type Payload<'a> = FragmentedPayload<'a, 2>;

        fn mark_read(&mut self, count: usize) {
            let Self { storage, head, len } = self;
            assert!(count <= *len);
            *len -= count;
            *head = (*head + count) % storage.len();
        }

        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
        where
            F: FnOnce(Self::Payload<'a>) -> R,
        {
            let Self { storage, head, len } = self;
            if storage.len() == 0 {
                return f(FragmentedPayload::new_empty());
            }
            assert!(offset <= *len);
            RingBuffer::with_readable(
                storage,
                (*head + offset) % storage.len(),
                *len - offset,
                |readable| f(readable.into_iter().map(|x| *x).collect()),
            )
        }
    }

    impl RingBuffer {
        /// Enqueues as much of `data` as possible to the end of the buffer.
        ///
        /// Returns the number of bytes actually queued.
        pub(crate) fn enqueue_data(&mut self, data: &[u8]) -> usize {
            let nwritten = self.write_at(0, &data);
            self.make_readable(nwritten, false);
            nwritten
        }
    }

    impl Buffer for Arc<Mutex<RingBuffer>> {
        fn capacity_range() -> (usize, usize) {
            RingBuffer::capacity_range()
        }

        fn limits(&self) -> BufferLimits {
            self.lock().limits()
        }

        fn target_capacity(&self) -> usize {
            self.lock().target_capacity()
        }

        fn request_capacity(&mut self, size: usize) {
            self.lock().request_capacity(size)
        }
    }

    impl ReceiveBuffer for Arc<Mutex<RingBuffer>> {
        fn write_at<P: Payload>(&mut self, offset: usize, data: &P) -> usize {
            self.lock().write_at(offset, data)
        }

        fn make_readable(&mut self, count: usize, has_outstanding: bool) {
            self.lock().make_readable(count, has_outstanding)
        }
    }

    /// An implementation of [`SendBuffer`] for tests.
    #[derive(Debug, Default)]
    pub struct TestSendBuffer {
        fake_stream: Arc<Mutex<Vec<u8>>>,
        ring: RingBuffer,
    }

    impl TestSendBuffer {
        /// Creates a new `TestSendBuffer` with a backing shared vec and a
        /// helper ring buffer.
        pub fn new(fake_stream: Arc<Mutex<Vec<u8>>>, ring: RingBuffer) -> TestSendBuffer {
            Self { fake_stream, ring }
        }
    }

    impl Buffer for TestSendBuffer {
        fn capacity_range() -> (usize, usize) {
            let (min, max) = RingBuffer::capacity_range();
            (min * 2, max * 2)
        }

        fn limits(&self) -> BufferLimits {
            let Self { fake_stream, ring } = self;
            let BufferLimits { capacity: ring_capacity, len: ring_len } = ring.limits();
            let guard = fake_stream.lock();
            let len = ring_len + guard.len();
            let capacity = ring_capacity + guard.capacity();
            BufferLimits { len, capacity }
        }

        fn target_capacity(&self) -> usize {
            let Self { fake_stream: _, ring } = self;
            ring.target_capacity()
        }

        fn request_capacity(&mut self, size: usize) {
            let Self { fake_stream: _, ring } = self;
            ring.request_capacity(size)
        }
    }

    impl SendBuffer for TestSendBuffer {
        type Payload<'a> = FragmentedPayload<'a, 2>;

        fn mark_read(&mut self, count: usize) {
            let Self { fake_stream: _, ring } = self;
            ring.mark_read(count)
        }

        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
        where
            F: FnOnce(Self::Payload<'a>) -> R,
        {
            let Self { fake_stream, ring } = self;
            let mut guard = fake_stream.lock();
            if !guard.is_empty() {
                // Pull from the fake stream into the ring if there is capacity.
                let BufferLimits { capacity, len } = ring.limits();
                let len = (capacity - len).min(guard.len());
                let rest = guard.split_off(len);
                let first = core::mem::replace(&mut *guard, rest);
                assert_eq!(ring.enqueue_data(&first[..]), len);
            }
            ring.peek_with(offset, f)
        }
    }
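
    // Illustrative sketch (test-only): bytes pushed into the shared
    // `fake_stream` vec are only moved into the backing ring buffer lazily, on
    // the next call to `peek_with`. The sizes below are assumptions for
    // illustration.
    #[cfg(test)]
    #[allow(dead_code)]
    fn test_send_buffer_sketch() {
        let fake_stream = Arc::new(Mutex::new(Vec::new()));
        let mut sb = TestSendBuffer::new(Arc::clone(&fake_stream), RingBuffer::new(16));
        fake_stream.lock().extend_from_slice(b"hello");
        // Peeking pulls the pending bytes into the ring and exposes them as
        // the sendable payload.
        assert_eq!(sb.peek_with(0, |payload| payload.len()), 5);
        assert_eq!(sb.limits().len, 5);
    }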

    fn arc_mutex_eq<T: PartialEq>(a: &Arc<Mutex<T>>, b: &Arc<Mutex<T>>) -> bool {
        if Arc::ptr_eq(a, b) {
            return true;
        }
        (&*a.lock()) == (&*b.lock())
    }

    /// A fake implementation of client-side TCP buffers.
    #[derive(Clone, Debug, Default)]
    pub struct ClientBuffers {
        /// Receive buffer shared with core TCP implementation.
        pub receive: Arc<Mutex<RingBuffer>>,
        /// Send buffer shared with core TCP implementation.
        pub send: Arc<Mutex<Vec<u8>>>,
    }

    impl PartialEq for ClientBuffers {
        fn eq(&self, ClientBuffers { receive: other_receive, send: other_send }: &Self) -> bool {
            let Self { receive, send } = self;
            arc_mutex_eq(receive, other_receive) && arc_mutex_eq(send, other_send)
        }
    }

    impl Eq for ClientBuffers {}

    impl ClientBuffers {
        /// Creates a new `ClientBuffers` with `buffer_sizes`.
        pub fn new(buffer_sizes: BufferSizes) -> Self {
            let BufferSizes { send, receive } = buffer_sizes;
            Self {
                receive: Arc::new(Mutex::new(RingBuffer::new(receive))),
                send: Arc::new(Mutex::new(Vec::with_capacity(send))),
            }
        }
    }

    /// A fake implementation of bindings buffers for TCP.
    #[derive(Debug, Clone, Eq, PartialEq)]
    #[allow(missing_docs)]
    pub enum ProvidedBuffers {
        Buffers(WriteBackClientBuffers),
        NoBuffers,
    }

    impl Default for ProvidedBuffers {
        fn default() -> Self {
            Self::NoBuffers
        }
    }

    impl From<WriteBackClientBuffers> for ProvidedBuffers {
        fn from(buffers: WriteBackClientBuffers) -> Self {
            ProvidedBuffers::Buffers(buffers)
        }
    }

    impl From<ProvidedBuffers> for WriteBackClientBuffers {
        fn from(extra: ProvidedBuffers) -> Self {
            match extra {
                ProvidedBuffers::Buffers(buffers) => buffers,
                ProvidedBuffers::NoBuffers => Default::default(),
            }
        }
    }

    impl From<ProvidedBuffers> for () {
        fn from(_: ProvidedBuffers) -> Self {
            ()
        }
    }

    impl From<()> for ProvidedBuffers {
        fn from(_: ()) -> Self {
            Default::default()
        }
    }

    /// The variant of [`ProvidedBuffers`] that allows observing the data
    /// sent/received on TCP sockets.
    #[derive(Debug, Default, Clone)]
    pub struct WriteBackClientBuffers(pub Arc<Mutex<Option<ClientBuffers>>>);

    impl PartialEq for WriteBackClientBuffers {
        fn eq(&self, Self(other): &Self) -> bool {
            let Self(this) = self;
            arc_mutex_eq(this, other)
        }
    }

    impl Eq for WriteBackClientBuffers {}

    impl IntoBuffers<Arc<Mutex<RingBuffer>>, TestSendBuffer> for ProvidedBuffers {
        fn into_buffers(
            self,
            buffer_sizes: BufferSizes,
        ) -> (Arc<Mutex<RingBuffer>>, TestSendBuffer) {
            let buffers = ClientBuffers::new(buffer_sizes);
            if let ProvidedBuffers::Buffers(b) = self {
                *b.0.as_ref().lock() = Some(buffers.clone());
            }
            let ClientBuffers { receive, send } = buffers;
            (receive, TestSendBuffer::new(send, Default::default()))
        }
    }
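
    // Illustrative sketch (test-only): `WriteBackClientBuffers` lets a test
    // observe the client-side buffer halves created by `into_buffers` after
    // the fact. The buffer sizes are assumptions for illustration.
    #[cfg(test)]
    #[allow(dead_code)]
    fn provided_buffers_sketch() {
        let write_back = WriteBackClientBuffers::default();
        let provided = ProvidedBuffers::from(write_back.clone());
        let (_receive, _send): (Arc<Mutex<RingBuffer>>, TestSendBuffer) =
            provided.into_buffers(BufferSizes { send: 16, receive: 16 });
        // After conversion, the shared slot holds the client-side buffers.
        assert!(write_back.0.lock().is_some());
    }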

    impl ListenerNotifier for ProvidedBuffers {
        fn new_incoming_connections(&mut self, _: usize) {}
    }
}

#[cfg(test)]
mod test {
    use alloc::vec::Vec;
    use alloc::{format, vec};

    use netstack3_base::FragmentedPayload;
    use proptest::strategy::{Just, Strategy};
    use proptest::test_runner::Config;
    use proptest::{prop_assert, prop_assert_eq, proptest};
    use proptest_support::failed_seeds_no_std;
    use test_case::test_case;
    use testutil::RingBuffer;

    use super::*;
    proptest! {
        #![proptest_config(Config {
            // Add all failed seeds here.
            failure_persistence: failed_seeds_no_std!(
                "cc f621ca7d3a2b108e0dc41f7169ad028f4329b79e90e73d5f68042519a9f63999",
                "cc c449aebed201b4ec4f137f3c224f20325f4cfee0b7fd596d9285176b6d811aa9"
            ),
            ..Config::default()
        })]

        #[test]
        fn ring_buffer_make_readable((mut rb, avail) in ring_buffer::with_written()) {
            let old_storage = rb.storage.clone();
            let old_head = rb.head;
            let old_len = rb.limits().len;
            rb.make_readable(avail, false);
            // Assert that length is updated but everything else is unchanged.
            let RingBuffer { storage, head, len } = rb;
            prop_assert_eq!(len, old_len + avail);
            prop_assert_eq!(head, old_head);
            prop_assert_eq!(storage, old_storage);
        }

        #[test]
        fn ring_buffer_write_at((mut rb, offset, data) in ring_buffer::with_offset_data()) {
            let old_head = rb.head;
            let old_len = rb.limits().len;
            prop_assert_eq!(rb.write_at(offset, &&data[..]), data.len());
            prop_assert_eq!(rb.head, old_head);
            prop_assert_eq!(rb.limits().len, old_len);
            for i in 0..data.len() {
                let masked = (rb.head + rb.len + offset + i) % rb.storage.len();
                // Make sure that data are written.
                prop_assert_eq!(rb.storage[masked], data[i]);
                rb.storage[masked] = 0;
            }
            // And the other parts of the storage are untouched.
            prop_assert_eq!(&rb.storage, &vec![0; rb.storage.len()]);
        }

        #[test]
        fn ring_buffer_read_with((mut rb, expected, consume) in ring_buffer::with_read_data()) {
            prop_assert_eq!(rb.limits().len, expected.len());
            let nread = rb.read_with(|readable| {
                assert!(readable.len() == 1 || readable.len() == 2);
                let got = readable.concat();
                assert_eq!(got, expected);
                consume
            });
            prop_assert_eq!(nread, consume);
            prop_assert_eq!(rb.limits().len, expected.len() - consume);
        }

        #[test]
        fn ring_buffer_mark_read((mut rb, readable) in ring_buffer::with_readable()) {
            const BYTE_TO_WRITE: u8 = 0x42;
            let written = rb.writable_regions().into_iter().fold(0, |acc, slice| {
                slice.fill(BYTE_TO_WRITE);
                acc + slice.len()
            });
            let old_storage = rb.storage.clone();
            let old_head = rb.head;
            let old_len = rb.limits().len;

            rb.mark_read(readable);
            let new_writable = rb.writable_regions().into_iter().fold(Vec::new(), |mut acc, slice| {
                acc.extend_from_slice(slice);
                acc
            });
            for (i, x) in new_writable.iter().enumerate().take(written) {
                prop_assert_eq!(*x, BYTE_TO_WRITE, "i={}, rb={:?}", i, rb);
            }
            prop_assert!(new_writable.len() >= written);

            let RingBuffer { storage, head, len } = rb;
            prop_assert_eq!(len, old_len - readable);
            prop_assert_eq!(head, (old_head + readable) % old_storage.len());
            prop_assert_eq!(storage, old_storage);
        }

        #[test]
        fn ring_buffer_peek_with((mut rb, expected, offset) in ring_buffer::with_read_data()) {
            prop_assert_eq!(rb.limits().len, expected.len());
            rb.peek_with(offset, |readable| {
                prop_assert_eq!(readable.to_vec(), &expected[offset..]);
                Ok(())
            })?;
            prop_assert_eq!(rb.limits().len, expected.len());
        }

        #[test]
        fn ring_buffer_writable_regions(mut rb in ring_buffer::arb_ring_buffer()) {
            const BYTE_TO_WRITE: u8 = 0x42;
            let writable_len = rb.writable_regions().into_iter().fold(0, |acc, slice| {
                slice.fill(BYTE_TO_WRITE);
                acc + slice.len()
            });
            let BufferLimits {len, capacity} = rb.limits();
            prop_assert_eq!(writable_len + len, capacity);
            for i in 0..capacity {
                let expected = if i < len {
                    0
                } else {
                    BYTE_TO_WRITE
                };
                let idx = (rb.head + i) % rb.storage.len();
                prop_assert_eq!(rb.storage[idx], expected);
            }
        }
    }

    #[test_case([Range { start: 0, end: 0 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(0), generation: 0 })]
    #[test_case([Range { start: 0, end: 10 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(10), generation: 1 })]
    #[test_case([Range{ start: 10, end: 15 }, Range { start: 5, end: 10 }]
        => Assembler {
            outstanding: [
                SeqRange{ range: Range { start: SeqNum::new(5), end: SeqNum::new(15) }, meta: 2 },
            ].into_iter().collect(),
            nxt: SeqNum::new(0),
            generation: 2,
        })
    ]
    #[test_case([Range{ start: 10, end: 15 }, Range { start: 0, end: 5 }, Range { start: 5, end: 10 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(15), generation: 3 })]
    #[test_case([Range{ start: 10, end: 15 }, Range { start: 5, end: 10 }, Range { start: 0, end: 5 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(15), generation: 3 })]
    #[test_case([Range{ start: 10, end: 15 }, Range { start: 10, end: 15 }, Range { start: 11, end: 12 }]
        => Assembler {
            outstanding: [
                SeqRange{ range: Range { start: SeqNum::new(10), end: SeqNum::new(15) }, meta: 3 }
            ].into_iter().collect(),
            nxt: SeqNum::new(0), generation: 3 })]
    fn assembler_examples(ops: impl IntoIterator<Item = Range<u32>>) -> Assembler {
        let mut assembler = Assembler::new(SeqNum::new(0));
        for Range { start, end } in ops.into_iter() {
            let _advanced = assembler.insert(SeqNum::new(start)..SeqNum::new(end));
        }
        assembler
    }

    #[test_case(&[] => Vec::<Range<u32>>::new(); "empty")]
    #[test_case(&[1..2] => vec![1..2]; "single")]
    #[test_case(&[1..2, 3..4] => vec![3..4, 1..2]; "latest first")]
    #[test_case(&[1..2, 3..4, 5..6, 7..8, 9..10]
        => vec![9..10, 7..8, 5..6, 3..4]; "max len")]
    #[test_case(&[1..2, 3..4, 5..6, 7..8, 9..10, 6..7]
        => vec![5..8, 9..10, 3..4, 1..2]; "gap fill")]
    #[test_case(&[1..2, 3..4, 5..6, 7..8, 9..10, 1..8]
        => vec![1..8, 9..10]; "large gap fill")]
    fn assembler_sack_blocks(ops: &[Range<u32>]) -> Vec<Range<u32>> {
        let mut assembler = Assembler::new(SeqNum::new(0));
        for Range { start, end } in ops {
            let _: usize = assembler.insert(SeqNum::new(*start)..SeqNum::new(*end));
        }
        assembler
            .sack_blocks()
            .iter()
            .map(|SackBlock(start, end)| Range { start: start.into(), end: end.into() })
            .collect()
    }

    #[test]
    // Regression test for https://fxbug.dev/42061342.
    fn ring_buffer_wrap_around() {
        const CAPACITY: usize = 16;
        let mut rb = RingBuffer::new(CAPACITY);

        // Write more than half the buffer.
        const BUF_SIZE: usize = 10;
        assert_eq!(rb.enqueue_data(&[0xAA; BUF_SIZE]), BUF_SIZE);
        rb.peek_with(0, |payload| {
            assert_eq!(payload, FragmentedPayload::new_contiguous(&[0xAA; BUF_SIZE]))
        });
        rb.mark_read(BUF_SIZE);

        // Write around the end of the buffer.
        assert_eq!(rb.enqueue_data(&[0xBB; BUF_SIZE]), BUF_SIZE);
        rb.peek_with(0, |payload| {
            assert_eq!(
                payload,
                FragmentedPayload::new([
                    &[0xBB; (CAPACITY - BUF_SIZE)],
                    &[0xBB; (BUF_SIZE * 2 - CAPACITY)]
                ])
            )
        });
        // Mark everything read, which should advance `head` around to the
        // beginning of the buffer.
        rb.mark_read(BUF_SIZE);

        // Now make a contiguous sequence of bytes readable.
        assert_eq!(rb.enqueue_data(&[0xCC; BUF_SIZE]), BUF_SIZE);
        rb.peek_with(0, |payload| {
            assert_eq!(payload, FragmentedPayload::new_contiguous(&[0xCC; BUF_SIZE]))
        });

        // Check that the unwritten bytes are left untouched. If `head` was
        // advanced improperly, this will crash.
        let read = rb.read_with(|segments| {
            assert_eq!(segments, [[0xCC; BUF_SIZE]]);
            BUF_SIZE
        });
        assert_eq!(read, BUF_SIZE);
    }

    #[test]
    fn ring_buffer_example() {
        let mut rb = RingBuffer::new(16);
        assert_eq!(rb.write_at(5, &"World".as_bytes()), 5);
        assert_eq!(rb.write_at(0, &"Hello".as_bytes()), 5);
        rb.make_readable(10, false);
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["HelloWorld".as_bytes()]);
                5
            }),
            5
        );
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["World".as_bytes()]);
                readable[0].len()
            }),
            5
        );
        assert_eq!(rb.write_at(0, &"HelloWorld".as_bytes()), 10);
        rb.make_readable(10, false);
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["HelloW".as_bytes(), "orld".as_bytes()]);
                6
            }),
            6
        );
        assert_eq!(rb.limits().len, 4);
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["orld".as_bytes()]);
                4
            }),
            4
        );
        assert_eq!(rb.limits().len, 0);

        assert_eq!(rb.enqueue_data("Hello".as_bytes()), 5);
        assert_eq!(rb.limits().len, 5);

        let () = rb.peek_with(3, |readable| {
            assert_eq!(readable.to_vec(), "lo".as_bytes());
        });

        rb.mark_read(2);

        let () = rb.peek_with(0, |readable| {
            assert_eq!(readable.to_vec(), "llo".as_bytes());
        });
    }

    mod ring_buffer {
        use super::*;
        // Use a small capacity so that we have a higher chance to exercise
        // wrapping around logic.
        const MAX_CAP: usize = 32;

        fn arb_ring_buffer_args() -> impl Strategy<Value = (usize, usize, usize)> {
            // Use a small capacity so that we have a higher chance to exercise
            // wrapping around logic.
            (1..=MAX_CAP).prop_flat_map(|cap| {
                let max_len = cap;
                //  cap      head     len
                (Just(cap), 0..cap, 0..=max_len)
            })
        }

        pub(super) fn arb_ring_buffer() -> impl Strategy<Value = RingBuffer> {
            arb_ring_buffer_args().prop_map(|(cap, head, len)| RingBuffer {
                storage: vec![0; cap],
                head,
                len,
            })
        }

        /// A strategy for a [`RingBuffer`] and a valid length to mark read.
        pub(super) fn with_readable() -> impl Strategy<Value = (RingBuffer, usize)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                (Just(RingBuffer { storage: vec![0; cap], head, len }), 0..=len)
            })
        }

        /// A strategy for a [`RingBuffer`] and a valid length to make readable.
        pub(super) fn with_written() -> impl Strategy<Value = (RingBuffer, usize)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                let rb = RingBuffer { storage: vec![0; cap], head, len };
                let max_written = cap - len;
                (Just(rb), 0..=max_written)
            })
        }

        /// A strategy for a [`RingBuffer`], a valid offset and data to write.
        pub(super) fn with_offset_data() -> impl Strategy<Value = (RingBuffer, usize, Vec<u8>)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                let writable_len = cap - len;
                (0..=writable_len).prop_flat_map(move |offset| {
                    (0..=writable_len - offset).prop_flat_map(move |data_len| {
                        (
                            Just(RingBuffer { storage: vec![0; cap], head, len }),
                            Just(offset),
                            proptest::collection::vec(1..=u8::MAX, data_len),
                        )
                    })
                })
            })
        }

        /// A strategy for a [`RingBuffer`], its readable data, and how many
        /// bytes to consume.
        pub(super) fn with_read_data() -> impl Strategy<Value = (RingBuffer, Vec<u8>, usize)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                proptest::collection::vec(1..=u8::MAX, len).prop_flat_map(move |data| {
                    // Fill the RingBuffer with the data.
                    let mut rb = RingBuffer { storage: vec![0; cap], head, len: 0 };
                    assert_eq!(rb.write_at(0, &&data[..]), len);
                    rb.make_readable(len, false);
                    (Just(rb), Just(data), 0..=len)
                })
            })
        }
    }
}