1use core::convert::{Infallible, TryFrom as _};
11use core::fmt::Debug;
12use core::num::{NonZeroU32, NonZeroU8, NonZeroUsize, TryFromIntError};
13use core::ops::{Deref, DerefMut};
14use core::time::Duration;
15
16use assert_matches::assert_matches;
17use derivative::Derivative;
18use explicit::ResultExt as _;
19use netstack3_base::{
20 Control, HandshakeOptions, IcmpErrorCode, Instant, Mss, Options, Payload, PayloadLen as _,
21 SackBlocks, Segment, SegmentHeader, SegmentOptions, SeqNum, UnscaledWindowSize, WindowScale,
22 WindowSize,
23};
24use netstack3_trace::{trace_instant, TraceResourceId};
25use packet_formats::utils::NonZeroDuration;
26use replace_with::{replace_with, replace_with_and};
27
28use crate::internal::base::{
29 BufferSizes, BuffersRefMut, ConnectionError, IcmpErrorResult, KeepAlive, SocketOptions,
30};
31use crate::internal::buffer::{Assembler, BufferLimits, IntoBuffers, ReceiveBuffer, SendBuffer};
32use crate::internal::congestion::{
33 CongestionControl, CongestionControlSendOutcome, LossRecoveryMode, LossRecoverySegment,
34};
35use crate::internal::counters::TcpCountersRefs;
36use crate::internal::rtt::{Estimator, Rto, RttSampler};
37
38pub(super) const MSL: Duration = Duration::from_secs(2 * 60);
43
44const DEFAULT_MAX_RETRIES: NonZeroU8 = NonZeroU8::new(15).unwrap();
51
52pub(super) const DEFAULT_MAX_SYN_RETRIES: NonZeroU8 = NonZeroU8::new(6).unwrap();
55const DEFAULT_MAX_SYNACK_RETRIES: NonZeroU8 = NonZeroU8::new(5).unwrap();
56
57const ACK_DELAY_THRESHOLD: Duration = Duration::from_millis(40);
65const SWS_PROBE_TIMEOUT: Duration = Duration::from_millis(100);
72const SWS_BUFFER_FACTOR: u32 = 2;
76
77const SACK_PERMITTED: bool = true;
79
80pub(crate) trait StateMachineDebugId: Debug {
85 fn trace_id(&self) -> TraceResourceId<'_>;
86}
87
88#[derive(Debug)]
101#[cfg_attr(test, derive(PartialEq, Eq))]
102pub struct Closed<Error> {
103 pub(crate) reason: Error,
105}
106
107pub(crate) enum Initial {}
110
111impl Closed<Initial> {
112 pub(crate) fn connect<I: Instant, ActiveOpen>(
118 iss: SeqNum,
119 now: I,
120 active_open: ActiveOpen,
121 buffer_sizes: BufferSizes,
122 device_mss: Mss,
123 default_mss: Mss,
124 SocketOptions {
125 keep_alive: _,
126 nagle_enabled: _,
127 user_timeout,
128 delayed_ack: _,
129 fin_wait2_timeout: _,
130 max_syn_retries,
131 ip_options: _,
132 }: &SocketOptions,
133 ) -> (SynSent<I, ActiveOpen>, Segment<()>) {
134 let rcv_wnd_scale = buffer_sizes.rwnd().scale();
135 let rwnd = buffer_sizes.rwnd_unscaled();
139 (
140 SynSent {
141 iss,
142 timestamp: Some(now),
143 retrans_timer: RetransTimer::new(
144 now,
145 Rto::DEFAULT,
146 *user_timeout,
147 *max_syn_retries,
148 ),
149 active_open,
150 buffer_sizes,
151 device_mss,
152 default_mss,
153 rcv_wnd_scale,
154 },
155 Segment::syn(
156 iss,
157 rwnd,
158 HandshakeOptions {
159 mss: Some(device_mss),
160 window_scale: Some(rcv_wnd_scale),
161 sack_permitted: SACK_PERMITTED,
162 }
163 .into(),
164 ),
165 )
166 }
167
168 pub(crate) fn listen(
169 iss: SeqNum,
170 buffer_sizes: BufferSizes,
171 device_mss: Mss,
172 default_mss: Mss,
173 user_timeout: Option<NonZeroDuration>,
174 ) -> Listen {
175 Listen { iss, buffer_sizes, device_mss, default_mss, user_timeout }
176 }
177}
178
179impl<Error> Closed<Error> {
180 pub(crate) fn on_segment(&self, segment: &Segment<impl Payload>) -> Option<Segment<()>> {
184 let segment_len = segment.len();
185 let SegmentHeader { seq: seg_seq, ack: seg_ack, wnd: _, control, options: _, push: _ } =
186 segment.header();
187
188 if *control == Some(Control::RST) {
202 return None;
203 }
204 Some(match seg_ack {
205 Some(seg_ack) => Segment::rst(*seg_ack),
206 None => Segment::rst_ack(SeqNum::from(0), *seg_seq + segment_len),
207 })
208 }
209}
210
211#[derive(Debug)]
225#[cfg_attr(test, derive(PartialEq, Eq))]
226pub struct Listen {
227 iss: SeqNum,
228 buffer_sizes: BufferSizes,
229 device_mss: Mss,
230 default_mss: Mss,
231 user_timeout: Option<NonZeroDuration>,
232}
233
234#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
236enum ListenOnSegmentDisposition<I: Instant> {
237 SendSynAckAndEnterSynRcvd(Segment<()>, SynRcvd<I, Infallible>),
238 SendRst(Segment<()>),
239 Ignore,
240}
241
242impl Listen {
243 fn on_segment<I: Instant>(
244 &self,
245 seg: Segment<impl Payload>,
246 now: I,
247 ) -> ListenOnSegmentDisposition<I> {
248 let (header, _data) = seg.into_parts();
249 let SegmentHeader { seq, ack, wnd: _, control, options, push: _ } = header;
250 let Listen { iss, buffer_sizes, device_mss, default_mss, user_timeout } = *self;
251 let smss = options.mss().unwrap_or(default_mss).min(device_mss);
252 if control == Some(Control::RST) {
256 return ListenOnSegmentDisposition::Ignore;
257 }
258 if let Some(ack) = ack {
259 return ListenOnSegmentDisposition::SendRst(Segment::rst(ack));
268 }
269 if control == Some(Control::SYN) {
270 let sack_permitted = options.sack_permitted();
271 let rcv_wnd_scale = buffer_sizes.rwnd().scale();
285 let rwnd = buffer_sizes.rwnd_unscaled();
289 return ListenOnSegmentDisposition::SendSynAckAndEnterSynRcvd(
290 Segment::syn_ack(
291 iss,
292 seq + 1,
293 rwnd,
294 HandshakeOptions {
295 mss: Some(smss),
296 window_scale: options.window_scale().map(|_| rcv_wnd_scale),
301 sack_permitted: SACK_PERMITTED,
302 }
303 .into(),
304 ),
305 SynRcvd {
306 iss,
307 irs: seq,
308 timestamp: Some(now),
309 retrans_timer: RetransTimer::new(
310 now,
311 Rto::DEFAULT,
312 user_timeout,
313 DEFAULT_MAX_SYNACK_RETRIES,
314 ),
315 simultaneous_open: None,
316 buffer_sizes,
317 smss,
318 rcv_wnd_scale,
319 snd_wnd_scale: options.window_scale(),
320 sack_permitted,
321 },
322 );
323 }
324 ListenOnSegmentDisposition::Ignore
325 }
326}
327
328#[derive(Debug)]
342#[cfg_attr(test, derive(PartialEq, Eq))]
343pub struct SynSent<I, ActiveOpen> {
344 iss: SeqNum,
345 timestamp: Option<I>,
349 retrans_timer: RetransTimer<I>,
350 active_open: ActiveOpen,
351 buffer_sizes: BufferSizes,
352 device_mss: Mss,
353 default_mss: Mss,
354 rcv_wnd_scale: WindowScale,
355}
356
357#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
359enum SynSentOnSegmentDisposition<I: Instant, ActiveOpen> {
360 SendAckAndEnterEstablished(Established<I, (), ()>),
361 SendSynAckAndEnterSynRcvd(Segment<()>, SynRcvd<I, ActiveOpen>),
362 SendRst(Segment<()>),
363 EnterClosed(Closed<Option<ConnectionError>>),
364 Ignore,
365}
366
367impl<I: Instant + 'static, ActiveOpen> SynSent<I, ActiveOpen> {
368 fn on_segment(
374 &self,
375 seg: Segment<impl Payload>,
376 now: I,
377 ) -> SynSentOnSegmentDisposition<I, ActiveOpen> {
378 let (header, _data) = seg.into_parts();
379 let SegmentHeader { seq: seg_seq, ack: seg_ack, wnd: seg_wnd, control, options, push: _ } =
380 header;
381 let SynSent {
382 iss,
383 timestamp: syn_sent_ts,
384 retrans_timer: RetransTimer { user_timeout_until, remaining_retries: _, at: _, rto: _ },
385 active_open: _,
386 buffer_sizes,
387 device_mss,
388 default_mss,
389 rcv_wnd_scale,
390 } = *self;
391 let has_ack = match seg_ack {
400 Some(ack) => {
401 if ack.before(iss) || ack.after(iss + 1) {
404 return if control == Some(Control::RST) {
405 SynSentOnSegmentDisposition::Ignore
406 } else {
407 SynSentOnSegmentDisposition::SendRst(Segment::rst(ack))
408 };
409 }
410 true
411 }
412 None => false,
413 };
414
415 match control {
416 Some(Control::RST) => {
417 if has_ack {
425 SynSentOnSegmentDisposition::EnterClosed(Closed {
426 reason: Some(ConnectionError::ConnectionRefused),
427 })
428 } else {
429 SynSentOnSegmentDisposition::Ignore
430 }
431 }
432 Some(Control::SYN) => {
433 let smss = options.mss().unwrap_or(default_mss).min(device_mss);
434 let sack_permitted = options.sack_permitted();
435 match seg_ack {
440 Some(seg_ack) => {
441 if seg_ack.after(iss) {
459 let irs = seg_seq;
460 let mut rtt_estimator = Estimator::default();
461 if let Some(syn_sent_ts) = syn_sent_ts {
462 rtt_estimator.sample(now.saturating_duration_since(syn_sent_ts));
463 }
464 let (rcv_wnd_scale, snd_wnd_scale) = options
465 .window_scale()
466 .map(|snd_wnd_scale| (rcv_wnd_scale, snd_wnd_scale))
467 .unwrap_or_default();
468 let next = iss + 1;
469 let established = Established {
470 snd: Send {
471 nxt: next,
472 max: next,
473 una: seg_ack,
474 wnd: seg_wnd << WindowScale::default(),
476 wl1: seg_seq,
477 wl2: seg_ack,
478 last_push: next,
479 buffer: (),
480 rtt_sampler: RttSampler::default(),
481 rtt_estimator,
482 timer: None,
483 congestion_control: CongestionControl::cubic_with_mss(smss),
484 wnd_scale: snd_wnd_scale,
485 wnd_max: seg_wnd << WindowScale::default(),
486 }
487 .into(),
488 rcv: Recv {
489 buffer: RecvBufferState::Open {
490 buffer: (),
491 assembler: Assembler::new(irs + 1),
492 },
493 remaining_quickacks: quickack_counter(
494 buffer_sizes.rcv_limits(),
495 smss,
496 ),
497 last_segment_at: None,
498 timer: None,
499 mss: smss,
500 wnd_scale: rcv_wnd_scale,
501 last_window_update: (irs + 1, buffer_sizes.rwnd()),
502 sack_permitted,
503 }
504 .into(),
505 };
506 SynSentOnSegmentDisposition::SendAckAndEnterEstablished(established)
507 } else {
508 SynSentOnSegmentDisposition::Ignore
509 }
510 }
511 None => {
512 if user_timeout_until.is_none_or(|t| now < t) {
513 let rcv_wnd_scale = buffer_sizes.rwnd().scale();
521 let rwnd = buffer_sizes.rwnd_unscaled();
526 SynSentOnSegmentDisposition::SendSynAckAndEnterSynRcvd(
527 Segment::syn_ack(
528 iss,
529 seg_seq + 1,
530 rwnd,
531 HandshakeOptions {
532 mss: Some(smss),
533 window_scale: options.window_scale().map(|_| rcv_wnd_scale),
534 sack_permitted: SACK_PERMITTED,
535 }
536 .into(),
537 ),
538 SynRcvd {
539 iss,
540 irs: seg_seq,
541 timestamp: Some(now),
542 retrans_timer: RetransTimer::new_with_user_deadline(
543 now,
544 Rto::DEFAULT,
545 user_timeout_until,
546 DEFAULT_MAX_SYNACK_RETRIES,
547 ),
548 simultaneous_open: None,
550 buffer_sizes,
551 smss,
552 rcv_wnd_scale,
553 snd_wnd_scale: options.window_scale(),
554 sack_permitted,
555 },
556 )
557 } else {
558 SynSentOnSegmentDisposition::EnterClosed(Closed { reason: None })
559 }
560 }
561 }
562 }
563 Some(Control::FIN) | None => SynSentOnSegmentDisposition::Ignore,
567 }
568 }
569}
570
571#[derive(Debug)]
586#[cfg_attr(test, derive(PartialEq, Eq))]
587pub struct SynRcvd<I, ActiveOpen> {
588 iss: SeqNum,
589 irs: SeqNum,
590 timestamp: Option<I>,
594 retrans_timer: RetransTimer<I>,
595 simultaneous_open: Option<ActiveOpen>,
599 buffer_sizes: BufferSizes,
600 smss: Mss,
604 rcv_wnd_scale: WindowScale,
605 snd_wnd_scale: Option<WindowScale>,
606 sack_permitted: bool,
607}
608
609impl<I: Instant, R: ReceiveBuffer, S: SendBuffer, ActiveOpen> From<SynRcvd<I, Infallible>>
610 for State<I, R, S, ActiveOpen>
611{
612 fn from(
613 SynRcvd {
614 iss,
615 irs,
616 timestamp,
617 retrans_timer,
618 simultaneous_open,
619 buffer_sizes,
620 smss,
621 rcv_wnd_scale,
622 snd_wnd_scale,
623 sack_permitted,
624 }: SynRcvd<I, Infallible>,
625 ) -> Self {
626 match simultaneous_open {
627 None => State::SynRcvd(SynRcvd {
628 iss,
629 irs,
630 timestamp,
631 retrans_timer,
632 simultaneous_open: None,
633 buffer_sizes,
634 smss,
635 rcv_wnd_scale,
636 snd_wnd_scale,
637 sack_permitted,
638 }),
639 }
640 }
641}
642enum FinQueued {}
643
644impl FinQueued {
645 const YES: bool = true;
649 const NO: bool = false;
650}
651
652#[derive(Derivative)]
654#[derivative(Debug)]
655#[cfg_attr(test, derivative(PartialEq, Eq))]
656pub(crate) struct Send<I, S, const FIN_QUEUED: bool> {
657 nxt: SeqNum,
658 pub(crate) max: SeqNum,
659 una: SeqNum,
660 wnd: WindowSize,
661 wnd_scale: WindowScale,
662 wnd_max: WindowSize,
663 wl1: SeqNum,
664 wl2: SeqNum,
665 last_push: SeqNum,
666 rtt_sampler: RttSampler<I>,
667 rtt_estimator: Estimator,
668 timer: Option<SendTimer<I>>,
669 #[derivative(PartialEq = "ignore")]
670 congestion_control: CongestionControl<I>,
671 buffer: S,
672}
673
674impl<I> Send<I, (), false> {
675 fn with_buffer<S>(self, buffer: S) -> Send<I, S, false> {
676 let Self {
677 nxt,
678 max,
679 una,
680 wnd,
681 wnd_scale,
682 wnd_max,
683 wl1,
684 wl2,
685 last_push,
686 rtt_sampler,
687 rtt_estimator,
688 timer,
689 congestion_control,
690 buffer: _,
691 } = self;
692 Send {
693 nxt,
694 max,
695 una,
696 wnd,
697 wnd_scale,
698 wnd_max,
699 wl1,
700 wl2,
701 last_push,
702 rtt_sampler,
703 rtt_estimator,
704 timer,
705 congestion_control,
706 buffer,
707 }
708 }
709}
710
711#[derive(Debug, Clone, Copy)]
712#[cfg_attr(test, derive(PartialEq, Eq))]
713struct RetransTimer<I> {
714 user_timeout_until: Option<I>,
715 remaining_retries: Option<NonZeroU8>,
716 at: I,
717 rto: Rto,
718}
719
720impl<I: Instant> RetransTimer<I> {
721 fn new(
722 now: I,
723 rto: Rto,
724 user_timeout: Option<NonZeroDuration>,
725 max_retries: NonZeroU8,
726 ) -> Self {
727 let user_timeout_until = user_timeout.map(|t| now.saturating_add(t.get()));
728 Self::new_with_user_deadline(now, rto, user_timeout_until, max_retries)
729 }
730
731 fn new_with_user_deadline(
732 now: I,
733 rto: Rto,
734 user_timeout_until: Option<I>,
735 max_retries: NonZeroU8,
736 ) -> Self {
737 let rto_at = now.panicking_add(rto.get());
738 let at = user_timeout_until.map(|i| i.min(rto_at)).unwrap_or(rto_at);
739 Self { at, rto, user_timeout_until, remaining_retries: Some(max_retries) }
740 }
741
742 fn backoff(&mut self, now: I) {
743 let Self { at, rto, user_timeout_until, remaining_retries } = self;
744 *remaining_retries = remaining_retries.and_then(|r| NonZeroU8::new(r.get() - 1));
745 *rto = rto.double();
746 let rto_at = now.panicking_add(rto.get());
747 *at = user_timeout_until.map(|i| i.min(rto_at)).unwrap_or(rto_at);
748 }
749
750 fn timed_out(&self, now: I) -> bool {
751 let RetransTimer { user_timeout_until, remaining_retries, at, rto: _ } = self;
752 (remaining_retries.is_none() && now >= *at) || user_timeout_until.is_some_and(|t| now >= t)
753 }
754}
755
756#[derive(Debug, Clone, Copy)]
758#[cfg_attr(test, derive(PartialEq, Eq))]
759enum SendTimer<I> {
760 Retrans(RetransTimer<I>),
763 KeepAlive(KeepAliveTimer<I>),
766 ZeroWindowProbe(RetransTimer<I>),
774 SWSProbe { at: I },
782}
783
784#[derive(Debug, Clone, Copy)]
785#[cfg_attr(test, derive(PartialEq, Eq))]
786enum ReceiveTimer<I> {
787 DelayedAck { at: I },
788}
789
790#[derive(Debug, Clone, Copy)]
791#[cfg_attr(test, derive(PartialEq, Eq))]
792struct KeepAliveTimer<I> {
793 at: I,
794 already_sent: u8,
795}
796
797impl<I: Instant> KeepAliveTimer<I> {
798 fn idle(now: I, keep_alive: &KeepAlive) -> Self {
799 let at = now.saturating_add(keep_alive.idle.into());
800 Self { at, already_sent: 0 }
801 }
802}
803
804impl<I: Instant> SendTimer<I> {
805 fn expiry(&self) -> I {
806 match self {
807 SendTimer::Retrans(RetransTimer {
808 at,
809 rto: _,
810 user_timeout_until: _,
811 remaining_retries: _,
812 })
813 | SendTimer::KeepAlive(KeepAliveTimer { at, already_sent: _ })
814 | SendTimer::ZeroWindowProbe(RetransTimer {
815 at,
816 rto: _,
817 user_timeout_until: _,
818 remaining_retries: _,
819 }) => *at,
820 SendTimer::SWSProbe { at } => *at,
821 }
822 }
823}
824
825impl<I: Instant> ReceiveTimer<I> {
826 fn expiry(&self) -> I {
827 match self {
828 ReceiveTimer::DelayedAck { at } => *at,
829 }
830 }
831}
832
833#[derive(Debug)]
835#[cfg_attr(test, derive(PartialEq, Eq))]
836enum RecvBufferState<R> {
837 Open { buffer: R, assembler: Assembler },
838 Closed { buffer_size: usize, nxt: SeqNum },
839}
840
841impl<R: ReceiveBuffer> RecvBufferState<R> {
842 fn is_closed(&self) -> bool {
843 matches!(self, Self::Closed { .. })
844 }
845
846 fn has_out_of_order(&self) -> bool {
847 match self {
848 Self::Open { assembler, .. } => assembler.has_out_of_order(),
849 Self::Closed { .. } => false,
850 }
851 }
852
853 fn close(&mut self) {
854 let new_state = match self {
855 Self::Open { buffer, assembler } => {
856 Self::Closed { nxt: assembler.nxt(), buffer_size: buffer.limits().capacity }
857 }
858 Self::Closed { .. } => return,
859 };
860 *self = new_state;
861 }
862
863 fn limits(&self) -> BufferLimits {
864 match self {
865 RecvBufferState::Open { buffer, .. } => buffer.limits(),
866 RecvBufferState::Closed { buffer_size, .. } => {
867 BufferLimits { capacity: *buffer_size, len: 0 }
868 }
869 }
870 }
871}
872
873fn quickack_counter(rcv_limits: BufferLimits, mss: Mss) -> usize {
875 const MIN_QUICKACK: usize = 2;
877 const MAX_QUICKACK: usize = 32;
881
882 let BufferLimits { capacity, len } = rcv_limits;
883 let window = capacity - len;
884 (window / (2 * usize::from(mss))).clamp(MIN_QUICKACK, MAX_QUICKACK)
895}
896
897#[derive(Debug)]
899#[cfg_attr(test, derive(PartialEq, Eq))]
900pub(crate) struct Recv<I, R> {
901 timer: Option<ReceiveTimer<I>>,
902 mss: Mss,
903 wnd_scale: WindowScale,
904 last_window_update: (SeqNum, WindowSize),
905 remaining_quickacks: usize,
906 last_segment_at: Option<I>,
907 sack_permitted: bool,
910
911 buffer: RecvBufferState<R>,
913}
914
915impl<I> Recv<I, ()> {
916 fn with_buffer<R>(self, buffer: R) -> Recv<I, R> {
917 let Self {
918 timer,
919 mss,
920 wnd_scale,
921 last_window_update,
922 buffer: old_buffer,
923 remaining_quickacks,
924 last_segment_at,
925 sack_permitted,
926 } = self;
927 let nxt = match old_buffer {
928 RecvBufferState::Open { assembler, .. } => assembler.nxt(),
929 RecvBufferState::Closed { .. } => unreachable!(),
930 };
931 Recv {
932 timer,
933 mss,
934 wnd_scale,
935 last_window_update,
936 remaining_quickacks,
937 last_segment_at,
938 buffer: RecvBufferState::Open { buffer, assembler: Assembler::new(nxt) },
939 sack_permitted,
940 }
941 }
942}
943
944impl<I, R> Recv<I, R> {
945 fn sack_blocks(&self) -> SackBlocks {
946 if self.sack_permitted {
947 match &self.buffer {
948 RecvBufferState::Open { buffer: _, assembler } => assembler.sack_blocks(),
949 RecvBufferState::Closed { buffer_size: _, nxt: _ } => SackBlocks::default(),
950 }
951 } else {
952 SackBlocks::default()
954 }
955 }
956}
957
958struct WindowSizeCalculation {
960 rcv_nxt: SeqNum,
963 window_size: WindowSize,
965 threshold: usize,
971}
972
973impl<I: Instant, R: ReceiveBuffer> Recv<I, R> {
974 fn calculate_window_size(&self) -> WindowSizeCalculation {
976 let rcv_nxt = self.nxt();
977 let Self {
978 buffer,
979 timer: _,
980 mss,
981 wnd_scale: _,
982 last_window_update: (rcv_wup, last_wnd),
983 remaining_quickacks: _,
984 last_segment_at: _,
985 sack_permitted: _,
986 } = self;
987
988 let BufferLimits { capacity, len } = buffer.limits();
1000
1001 let unused_window = u32::try_from(*rcv_wup + *last_wnd - rcv_nxt).unwrap_or(0);
1007 let unused_window = WindowSize::from_u32(unused_window).unwrap_or(WindowSize::MAX);
1009
1010 let reduction = capacity.saturating_sub(len.saturating_add(usize::from(unused_window)));
1014 let threshold = usize::min(capacity / 2, usize::from(mss.get().get()));
1015 let window_size = if reduction >= threshold {
1016 WindowSize::new(capacity - len).unwrap_or(WindowSize::MAX)
1018 } else {
1019 unused_window
1022 };
1023 WindowSizeCalculation { rcv_nxt, window_size, threshold }
1024 }
1025
1026 fn poll_receive_data_dequeued(&mut self, snd_max: SeqNum) -> Option<Segment<()>> {
1029 let WindowSizeCalculation { rcv_nxt, window_size: calculated_window_size, threshold } =
1030 self.calculate_window_size();
1031 let (rcv_wup, last_window_size) = self.last_window_update;
1032
1033 let rcv_diff = rcv_nxt - rcv_wup;
1039 debug_assert!(rcv_diff >= 0, "invalid RCV.NXT change: {rcv_nxt:?} < {rcv_wup:?}");
1040 let last_window_size =
1041 last_window_size.saturating_sub(usize::try_from(rcv_diff).unwrap_or(0));
1042
1043 let effective_window_size = (calculated_window_size >> self.wnd_scale) << self.wnd_scale;
1048 let effective_window_size_usize: usize = effective_window_size.into();
1051 let last_window_size_usize: usize = last_window_size.into();
1052
1053 if last_window_size_usize < threshold && effective_window_size_usize >= threshold {
1059 self.last_window_update = (rcv_nxt, calculated_window_size);
1060 self.timer = None;
1063 Some(Segment::ack(snd_max, self.nxt(), calculated_window_size >> self.wnd_scale))
1064 } else {
1065 None
1066 }
1067 }
1068
1069 pub(crate) fn nxt(&self) -> SeqNum {
1070 match &self.buffer {
1071 RecvBufferState::Open { assembler, .. } => assembler.nxt(),
1072 RecvBufferState::Closed { nxt, .. } => *nxt,
1073 }
1074 }
1075
1076 fn poll_send(&mut self, snd_max: SeqNum, now: I) -> Option<Segment<()>> {
1077 match self.timer {
1078 Some(ReceiveTimer::DelayedAck { at }) => (at <= now).then(|| {
1079 self.timer = None;
1080 self.make_ack(snd_max)
1081 }),
1082 None => None,
1083 }
1084 }
1085
1086 fn handle_fin(&self) -> RecvParams {
1088 let WindowSizeCalculation { rcv_nxt, window_size, threshold: _ } =
1089 self.calculate_window_size();
1090 RecvParams {
1091 ack: rcv_nxt + 1,
1092 wnd: window_size.checked_sub(1).unwrap_or(WindowSize::ZERO),
1093 wnd_scale: self.wnd_scale,
1094 }
1095 }
1096
1097 fn reset_quickacks(&mut self) {
1098 let Self {
1099 timer: _,
1100 mss,
1101 wnd_scale: _,
1102 last_window_update: _,
1103 remaining_quickacks,
1104 buffer,
1105 last_segment_at: _,
1106 sack_permitted: _,
1107 } = self;
1108 let new_remaining = quickack_counter(buffer.limits(), *mss);
1109 *remaining_quickacks = new_remaining.max(*remaining_quickacks);
1111 }
1112}
1113
1114impl<'a, I: Instant, R: ReceiveBuffer> RecvSegmentArgumentsProvider for &'a mut Recv<I, R> {
1115 fn take_rcv_segment_args(self) -> (SeqNum, UnscaledWindowSize, SackBlocks) {
1116 let WindowSizeCalculation { rcv_nxt, window_size, threshold: _ } =
1117 self.calculate_window_size();
1118 self.last_window_update = (rcv_nxt, window_size);
1119 (rcv_nxt, window_size >> self.wnd_scale, self.sack_blocks())
1120 }
1121}
1122
1123#[derive(Debug, Clone)]
1126#[cfg_attr(test, derive(PartialEq, Eq))]
1127pub(super) struct RecvParams {
1128 pub(super) ack: SeqNum,
1129 pub(super) wnd_scale: WindowScale,
1130 pub(super) wnd: WindowSize,
1131}
1132
1133impl<'a> RecvSegmentArgumentsProvider for &'a RecvParams {
1134 fn take_rcv_segment_args(self) -> (SeqNum, UnscaledWindowSize, SackBlocks) {
1135 (self.ack, self.wnd >> self.wnd_scale, SackBlocks::default())
1136 }
1137}
1138
1139struct CalculatedRecvParams<'a, I, R> {
1142 params: RecvParams,
1143 recv: Option<&'a mut Recv<I, R>>,
1144}
1145
1146impl<'a, I, R> CalculatedRecvParams<'a, I, R> {
1147 fn from_params(params: RecvParams) -> Self {
1152 Self { params, recv: None }
1153 }
1154
1155 fn nxt(&self) -> SeqNum {
1156 self.params.ack
1157 }
1158
1159 fn wnd(&self) -> WindowSize {
1160 self.params.wnd
1161 }
1162}
1163
1164impl<'a, I, R> RecvSegmentArgumentsProvider for CalculatedRecvParams<'a, I, R> {
1165 fn take_rcv_segment_args(self) -> (SeqNum, UnscaledWindowSize, SackBlocks) {
1166 let Self { params, recv } = self;
1167 let RecvParams { ack, wnd_scale, wnd } = params;
1168 let sack_blocks = if let Some(recv) = recv {
1169 recv.last_window_update = (ack, wnd);
1171 recv.sack_blocks()
1172 } else {
1173 SackBlocks::default()
1174 };
1175 (ack, wnd >> wnd_scale, sack_blocks)
1176 }
1177}
1178
1179impl<'a, I: Instant, R: ReceiveBuffer> CalculatedRecvParams<'a, I, R> {
1180 fn from_recv(recv: &'a mut Recv<I, R>) -> Self {
1181 let WindowSizeCalculation { rcv_nxt: ack, window_size: wnd, threshold: _ } =
1182 recv.calculate_window_size();
1183 let wnd_scale = recv.wnd_scale;
1184 Self { params: RecvParams { ack, wnd_scale, wnd }, recv: Some(recv) }
1185 }
1186}
1187
1188trait RecvSegmentArgumentsProvider: Sized {
1189 fn take_rcv_segment_args(self) -> (SeqNum, UnscaledWindowSize, SackBlocks);
1195
1196 fn make_segment<P, F: FnOnce(SeqNum, UnscaledWindowSize, SackBlocks) -> Segment<P>>(
1199 self,
1200 f: F,
1201 ) -> Segment<P> {
1202 let (ack, wnd, sack) = self.take_rcv_segment_args();
1203 f(ack, wnd, sack)
1204 }
1205
1206 fn make_ack<P: Payload>(self, seq: SeqNum) -> Segment<P> {
1209 let (ack, wnd, sack_blocks) = self.take_rcv_segment_args();
1210 Segment::ack_with_options(seq, ack, wnd, Options::Segment(SegmentOptions { sack_blocks }))
1211 }
1212}
1213
1214#[derive(Debug)]
1229#[cfg_attr(test, derive(PartialEq, Eq))]
1230pub struct Established<I, R, S> {
1231 pub(crate) snd: Takeable<Send<I, S, { FinQueued::NO }>>,
1232 pub(crate) rcv: Takeable<Recv<I, R>>,
1233}
1234
1235#[derive(Debug, Clone, Copy, PartialEq)]
1238pub(crate) enum DataAcked {
1239 Yes,
1240 No,
1241}
1242
1243impl<I: Instant, S: SendBuffer, const FIN_QUEUED: bool> Send<I, S, FIN_QUEUED> {
1244 fn timed_out(&self, now: I, keep_alive: &KeepAlive) -> bool {
1246 match self.timer {
1247 Some(SendTimer::KeepAlive(keep_alive_timer)) => {
1248 keep_alive.enabled && keep_alive_timer.already_sent >= keep_alive.count.get()
1249 }
1250 Some(SendTimer::Retrans(timer)) | Some(SendTimer::ZeroWindowProbe(timer)) => {
1251 timer.timed_out(now)
1252 }
1253 Some(SendTimer::SWSProbe { at: _ }) | None => false,
1254 }
1255 }
1256
1257 fn poll_send(
1263 &mut self,
1264 id: &impl StateMachineDebugId,
1265 counters: &TcpCountersRefs<'_>,
1266 rcv: impl RecvSegmentArgumentsProvider,
1267 limit: u32,
1268 now: I,
1269 SocketOptions {
1270 keep_alive,
1271 nagle_enabled,
1272 user_timeout,
1273 delayed_ack: _,
1274 fin_wait2_timeout: _,
1275 max_syn_retries: _,
1276 ip_options: _,
1277 }: &SocketOptions,
1278 ) -> Option<Segment<S::Payload<'_>>> {
1279 let Self {
1280 nxt: snd_nxt,
1281 max: snd_max,
1282 una: snd_una,
1283 wnd: snd_wnd,
1284 buffer,
1285 wl1: _,
1286 wl2: _,
1287 last_push,
1288 rtt_sampler,
1289 rtt_estimator,
1290 timer,
1291 congestion_control,
1292 wnd_scale: _,
1293 wnd_max: snd_wnd_max,
1294 } = self;
1295 let BufferLimits { capacity: _, len: readable_bytes } = buffer.limits();
1296 let mss = u32::from(congestion_control.mss());
1297 let mut zero_window_probe = false;
1298 let mut override_sws = false;
1299
1300 match timer {
1301 Some(SendTimer::Retrans(retrans_timer)) => {
1302 if retrans_timer.at <= now {
1303 congestion_control.on_retransmission_timeout(*snd_nxt);
1318 *snd_nxt = *snd_una;
1319 retrans_timer.backoff(now);
1320 counters.increment(|c| &c.timeouts);
1321 }
1322 }
1323 Some(SendTimer::ZeroWindowProbe(retrans_timer)) => {
1324 debug_assert!(readable_bytes > 0 || FIN_QUEUED);
1325 if retrans_timer.at <= now {
1326 zero_window_probe = true;
1327 *snd_nxt = *snd_una;
1328 retrans_timer.backoff(now);
1332 }
1333 }
1334 Some(SendTimer::KeepAlive(KeepAliveTimer { at, already_sent })) => {
1335 if keep_alive.enabled && !FIN_QUEUED && readable_bytes == 0 {
1340 if *at <= now {
1341 *at = now.saturating_add(keep_alive.interval.into());
1342 *already_sent = already_sent.saturating_add(1);
1343 return Some(rcv.make_ack(*snd_max - 1));
1346 }
1347 } else {
1348 *timer = None;
1349 }
1350 }
1351 Some(SendTimer::SWSProbe { at }) => {
1352 if *at <= now {
1353 override_sws = true;
1354 *timer = None;
1355 }
1356 }
1357 None => {}
1358 };
1359
1360 if *snd_wnd == WindowSize::ZERO && readable_bytes > 0 {
1363 match timer {
1364 Some(SendTimer::ZeroWindowProbe(_)) => {}
1365 _ => {
1366 *timer = Some(SendTimer::ZeroWindowProbe(RetransTimer::new(
1367 now,
1368 rtt_estimator.rto(),
1369 *user_timeout,
1370 DEFAULT_MAX_RETRIES,
1371 )));
1372
1373 return None;
1382 }
1383 }
1384 }
1385
1386 let CongestionControlSendOutcome {
1391 next_seg,
1392 congestion_limit,
1393 congestion_window,
1394 loss_recovery,
1395 } = congestion_control.poll_send(*snd_una, *snd_nxt, *snd_wnd, readable_bytes)?;
1396
1397 let snd_limit = *snd_una + *snd_wnd;
1401 let unused_window = u32::try_from(snd_limit - next_seg).ok_checked::<TryFromIntError>()?;
1402 let offset =
1403 usize::try_from(next_seg - *snd_una).unwrap_or_else(|TryFromIntError { .. }| {
1404 panic!("next_seg({:?}) should never fall behind snd.una({:?})", next_seg, *snd_una);
1405 });
1406 let available = u32::try_from(readable_bytes + usize::from(FIN_QUEUED) - offset)
1407 .unwrap_or_else(|_| WindowSize::MAX.into());
1408 let can_send = unused_window
1412 .min(congestion_limit)
1413 .min(available)
1414 .min(limit)
1415 .max(u32::from(zero_window_probe));
1416
1417 if can_send == 0 {
1418 if available == 0 && offset == 0 && timer.is_none() && keep_alive.enabled {
1419 *timer = Some(SendTimer::KeepAlive(KeepAliveTimer::idle(now, keep_alive)));
1420 }
1421 return None;
1422 }
1423
1424 let has_fin = FIN_QUEUED && can_send == available;
1425 let seg = buffer.peek_with(offset, |readable| {
1426 let bytes_to_send = u32::min(
1427 can_send - u32::from(has_fin),
1428 u32::try_from(readable.len()).unwrap_or(u32::MAX),
1429 );
1430 let has_fin = has_fin && bytes_to_send == can_send - u32::from(has_fin);
1431
1432 let loss_recovery_allow_delay = match loss_recovery {
1440 LossRecoverySegment::Yes { rearm_retransmit: _, mode: _ } => false,
1441 LossRecoverySegment::No => true,
1442 };
1443 if bytes_to_send < mss && !has_fin && loss_recovery_allow_delay {
1444 if bytes_to_send == 0 {
1445 return None;
1446 }
1447 if *nagle_enabled && snd_nxt.after(*snd_una) {
1455 return None;
1456 }
1457 if available > unused_window
1486 && unused_window < u32::min(mss, u32::from(*snd_wnd_max) / SWS_BUFFER_FACTOR)
1487 && !override_sws
1488 && !zero_window_probe
1489 {
1490 if timer.is_none() {
1491 *timer =
1492 Some(SendTimer::SWSProbe { at: now.panicking_add(SWS_PROBE_TIMEOUT) })
1493 }
1494 return None;
1495 }
1496 }
1497
1498 let seg = rcv.make_segment(|ack, wnd, mut sack_blocks| {
1499 let bytes_to_send = match sack_blocks.as_option() {
1500 Some(option) => {
1502 let options_len = u32::try_from(
1506 packet_formats::tcp::aligned_options_length(core::iter::once(option)),
1507 )
1508 .unwrap();
1509 if options_len < mss {
1510 bytes_to_send.min(mss - options_len)
1511 } else {
1512 sack_blocks.clear();
1520 bytes_to_send
1521 }
1522 }
1523 None => bytes_to_send,
1525 };
1526
1527 let no_more_data_to_send = u32::try_from(readable_bytes - offset)
1542 .is_ok_and(|avail| avail == bytes_to_send);
1543
1544 let periodic_push =
1545 next_seg.after_or_eq(*last_push + snd_wnd_max.halved().max(WindowSize::ONE));
1546 let push = no_more_data_to_send || periodic_push;
1547 let (seg, discarded) = Segment::new(
1548 SegmentHeader {
1549 seq: next_seg,
1550 ack: Some(ack),
1551 control: has_fin.then_some(Control::FIN),
1552 wnd,
1553 options: Options::Segment(SegmentOptions { sack_blocks }),
1554 push,
1555 },
1556 readable.slice(0..bytes_to_send),
1557 );
1558 debug_assert_eq!(discarded, 0);
1559 seg
1560 });
1561 Some(seg)
1562 })?;
1563 trace_instant!(c"tcp::Send::poll_send/segment",
1564 "id" => id.trace_id(),
1565 "seq" => u32::from(next_seg),
1566 "len" => seg.len(),
1567 "can_send" => can_send,
1568 "snd_wnd" => u32::from(*snd_wnd),
1569 "cwnd" => congestion_window,
1570 "unused_window" => unused_window,
1571 "available" => available,
1572 );
1573 let seq_max = next_seg + seg.len();
1574 rtt_sampler.on_will_send_segment(now, next_seg..seq_max, *snd_max);
1575 congestion_control.on_will_send_segment(seg.len());
1576
1577 if seq_max.after(*snd_nxt) {
1578 *snd_nxt = seq_max;
1579 } else {
1580 match loss_recovery {
1583 LossRecoverySegment::Yes { rearm_retransmit: _, ref mode } => match mode {
1584 LossRecoveryMode::FastRecovery => counters.increment(|c| &c.fast_retransmits),
1585 LossRecoveryMode::SackRecovery => counters.increment(|c| &c.sack_retransmits),
1586 },
1587 LossRecoverySegment::No => (),
1588 }
1589 }
1590 if seq_max.after(*snd_max) {
1591 *snd_max = seq_max;
1592 } else {
1593 counters.increment(|c| &c.retransmits);
1595 if congestion_control.in_slow_start() {
1596 counters.increment(|c| &c.slow_start_retransmits);
1597 }
1598 }
1599
1600 if seg.header().push {
1602 *last_push = seg.header().seq;
1603 }
1604
1605 let update_rto = match timer {
1611 Some(SendTimer::Retrans(_)) | Some(SendTimer::ZeroWindowProbe(_)) => {
1612 match loss_recovery {
1615 LossRecoverySegment::Yes { rearm_retransmit, mode: _ } => rearm_retransmit,
1616 LossRecoverySegment::No => false,
1617 }
1618 }
1619 Some(SendTimer::KeepAlive(_)) | Some(SendTimer::SWSProbe { at: _ }) | None => true,
1620 };
1621 if update_rto {
1622 *timer = Some(SendTimer::Retrans(RetransTimer::new(
1623 now,
1624 rtt_estimator.rto(),
1625 *user_timeout,
1626 DEFAULT_MAX_RETRIES,
1627 )))
1628 }
1629 Some(seg)
1630 }
1631
1632 fn process_ack<R: RecvSegmentArgumentsProvider>(
1635 &mut self,
1636 id: &impl StateMachineDebugId,
1637 counters: &TcpCountersRefs<'_>,
1638 seg_seq: SeqNum,
1639 seg_ack: SeqNum,
1640 seg_wnd: UnscaledWindowSize,
1641 seg_sack_blocks: &SackBlocks,
1642 pure_ack: bool,
1643 rcv: R,
1644 now: I,
1645 SocketOptions {
1646 keep_alive,
1647 nagle_enabled: _,
1648 user_timeout,
1649 delayed_ack: _,
1650 fin_wait2_timeout: _,
1651 max_syn_retries: _,
1652 ip_options: _,
1653 }: &SocketOptions,
1654 ) -> (Option<Segment<()>>, DataAcked) {
1655 let Self {
1656 nxt: snd_nxt,
1657 max: snd_max,
1658 una: snd_una,
1659 wnd: snd_wnd,
1660 wl1: snd_wl1,
1661 wl2: snd_wl2,
1662 last_push: _,
1663 wnd_max,
1664 buffer,
1665 rtt_sampler,
1666 rtt_estimator,
1667 timer,
1668 congestion_control,
1669 wnd_scale,
1670 } = self;
1671 let seg_wnd = seg_wnd << *wnd_scale;
1672 match timer {
1673 Some(SendTimer::KeepAlive(_)) | None => {
1674 if keep_alive.enabled {
1675 *timer = Some(SendTimer::KeepAlive(KeepAliveTimer::idle(now, keep_alive)));
1676 }
1677 }
1678 Some(SendTimer::Retrans(retrans_timer)) => {
1679 if seg_ack == *snd_max {
1687 *timer = None;
1688 } else if seg_ack.before(*snd_max) && seg_ack.after(*snd_una) {
1689 *retrans_timer = RetransTimer::new(
1690 now,
1691 rtt_estimator.rto(),
1692 *user_timeout,
1693 DEFAULT_MAX_RETRIES,
1694 );
1695 }
1696 }
1697 Some(SendTimer::ZeroWindowProbe(_)) | Some(SendTimer::SWSProbe { at: _ }) => {}
1698 }
1699 if seg_ack.after(*snd_max) {
1703 return (Some(rcv.make_ack(*snd_max)), DataAcked::No);
1708 }
1709
1710 let bytes_acked = match u32::try_from(seg_ack - *snd_una) {
1711 Ok(acked) => NonZeroU32::new(acked),
1712 Err(TryFromIntError { .. }) => {
1713 return (None, DataAcked::No);
1716 }
1717 };
1718
1719 let is_dup_ack_by_sack =
1720 congestion_control.preprocess_ack(seg_ack, *snd_nxt, seg_sack_blocks);
1721 let (is_dup_ack, data_acked) = if let Some(acked) = bytes_acked {
1722 let BufferLimits { len, capacity: _ } = buffer.limits();
1723 let fin_acked = FIN_QUEUED && seg_ack == *snd_una + len + 1;
1724 buffer.mark_read(
1729 NonZeroUsize::try_from(acked)
1730 .unwrap_or_else(|TryFromIntError { .. }| {
1731 panic!(
1735 "acked({:?}) must be smaller than isize::MAX({:?})",
1736 acked,
1737 isize::MAX
1738 )
1739 })
1740 .get()
1741 - usize::from(fin_acked),
1742 );
1743 *snd_una = seg_ack;
1744 if seg_ack.after(*snd_nxt) {
1748 *snd_nxt = seg_ack;
1749 }
1750 if let Some(rtt) = rtt_sampler.on_ack(now, seg_ack) {
1753 rtt_estimator.sample(rtt);
1754 }
1755
1756 let recovered = congestion_control.on_ack(seg_ack, acked, now, rtt_estimator.srtt());
1759 if recovered {
1760 counters.increment(|c| &c.loss_recovered);
1761 }
1762
1763 let is_dup_ack = is_dup_ack_by_sack.unwrap_or(false);
1767
1768 (is_dup_ack, DataAcked::Yes)
1770 } else {
1771 let is_dup_ack = is_dup_ack_by_sack.unwrap_or_else(|| {
1774 snd_nxt.after(*snd_una) && pure_ack && seg_ack == *snd_una && seg_wnd == *snd_wnd });
1790
1791 (is_dup_ack, DataAcked::No)
1795 };
1796
1797 if is_dup_ack {
1798 counters.increment(|c| &c.dup_acks);
1799 let new_loss_recovery = congestion_control.on_dup_ack(seg_ack, *snd_nxt);
1800 match new_loss_recovery {
1801 Some(LossRecoveryMode::FastRecovery) => counters.increment(|c| &c.fast_recovery),
1802 Some(LossRecoveryMode::SackRecovery) => counters.increment(|c| &c.sack_recovery),
1803 None => (),
1804 }
1805 }
1806
1807 if !snd_una.after(seg_ack)
1814 && (snd_wl1.before(seg_seq) || (seg_seq == *snd_wl1 && !snd_wl2.after(seg_ack)))
1815 {
1816 *snd_wnd = seg_wnd;
1817 *snd_wl1 = seg_seq;
1818 *snd_wl2 = seg_ack;
1819 *wnd_max = seg_wnd.max(*wnd_max);
1820 if seg_wnd != WindowSize::ZERO && matches!(timer, Some(SendTimer::ZeroWindowProbe(_))) {
1821 *timer = None;
1822 *snd_nxt = *snd_una;
1826 }
1827 }
1828
1829 if data_acked == DataAcked::Yes || is_dup_ack {
1831 trace_instant!(c"tcp::Send::process_ack",
1832 "id" => id.trace_id(),
1833 "seg_ack" => u32::from(seg_ack),
1834 "snd_nxt" => u32::from(*snd_nxt),
1835 "snd_wnd" => u32::from(*snd_wnd),
1836 "rtt_ms" => u32::try_from(
1837 rtt_estimator.srtt().unwrap_or(Duration::ZERO).as_millis()
1840 ).unwrap_or(u32::MAX),
1841 "cwnd" => congestion_control.inspect_cwnd().cwnd(),
1842 "ssthresh" => congestion_control.slow_start_threshold(),
1843 "loss_recovery" => congestion_control.inspect_loss_recovery_mode().is_some(),
1844 "acked" => data_acked == DataAcked::Yes,
1845 );
1846 }
1847
1848 (None, data_acked)
1849 }
1850
1851 fn update_mss(&mut self, mss: Mss, seq: SeqNum) -> ShouldRetransmit {
1852 if mss >= self.congestion_control.mss() {
1864 return ShouldRetransmit::No;
1865 }
1866
1867 self.nxt = seq;
1887
1888 self.congestion_control.update_mss(mss, self.una, self.nxt);
1891
1892 ShouldRetransmit::Yes(mss)
1901 }
1902
1903 #[cfg(test)]
1904 pub(crate) fn congestion_control(&self) -> &CongestionControl<I> {
1905 &self.congestion_control
1906 }
1907}
1908
1909impl<I: Instant, S: SendBuffer> Send<I, S, { FinQueued::NO }> {
1910 fn queue_fin(self) -> Send<I, S, { FinQueued::YES }> {
1911 let Self {
1912 nxt,
1913 max,
1914 una,
1915 wnd,
1916 wl1,
1917 wl2,
1918 last_push,
1919 buffer,
1920 rtt_sampler,
1921 rtt_estimator,
1922 timer,
1923 congestion_control,
1924 wnd_scale,
1925 wnd_max,
1926 } = self;
1927 Send {
1928 nxt,
1929 max,
1930 una,
1931 wnd,
1932 wl1,
1933 wl2,
1934 last_push,
1935 buffer,
1936 rtt_sampler,
1937 rtt_estimator,
1938 timer,
1939 congestion_control,
1940 wnd_scale,
1941 wnd_max,
1942 }
1943 }
1944}
1945
1946#[derive(Debug)]
1960#[cfg_attr(test, derive(PartialEq, Eq))]
1961pub struct CloseWait<I, S> {
1962 snd: Takeable<Send<I, S, { FinQueued::NO }>>,
1963 closed_rcv: RecvParams,
1964}
1965
1966#[derive(Debug)]
1982#[cfg_attr(test, derive(PartialEq, Eq))]
1983pub struct LastAck<I, S> {
1984 snd: Send<I, S, { FinQueued::YES }>,
1985 closed_rcv: RecvParams,
1986}
1987
1988#[derive(Debug)]
2003#[cfg_attr(test, derive(PartialEq, Eq))]
2004pub struct FinWait1<I, R, S> {
2005 snd: Takeable<Send<I, S, { FinQueued::YES }>>,
2006 rcv: Takeable<Recv<I, R>>,
2007}
2008
2009#[derive(Debug)]
2010#[cfg_attr(test, derive(PartialEq, Eq))]
2011pub struct FinWait2<I, R> {
2025 last_seq: SeqNum,
2026 rcv: Recv<I, R>,
2027 timeout_at: Option<I>,
2028}
2029
2030#[derive(Debug)]
2031#[cfg_attr(test, derive(PartialEq, Eq))]
2032pub struct Closing<I, S> {
2046 snd: Send<I, S, { FinQueued::YES }>,
2047 closed_rcv: RecvParams,
2048}
2049
2050#[derive(Debug)]
2065#[cfg_attr(test, derive(PartialEq, Eq))]
2066pub struct TimeWait<I> {
2067 pub(super) last_seq: SeqNum,
2068 pub(super) expiry: I,
2069 pub(super) closed_rcv: RecvParams,
2070}
2071
2072fn new_time_wait_expiry<I: Instant>(now: I) -> I {
2073 now.panicking_add(MSL * 2)
2074}
2075
2076#[derive(Debug)]
2077#[cfg_attr(test, derive(PartialEq, Eq))]
2078pub enum State<I, R, S, ActiveOpen> {
2079 Closed(Closed<Option<ConnectionError>>),
2080 Listen(Listen),
2081 SynRcvd(SynRcvd<I, ActiveOpen>),
2082 SynSent(SynSent<I, ActiveOpen>),
2083 Established(Established<I, R, S>),
2084 CloseWait(CloseWait<I, S>),
2085 LastAck(LastAck<I, S>),
2086 FinWait1(FinWait1<I, R, S>),
2087 FinWait2(FinWait2<I, R>),
2088 Closing(Closing<I, S>),
2089 TimeWait(TimeWait<I>),
2090}
2091
2092impl<I, R, S, ActiveOpen> core::fmt::Display for State<I, R, S, ActiveOpen> {
2093 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
2094 let name = match self {
2095 State::Closed(_) => "Closed",
2096 State::Listen(_) => "Listen",
2097 State::SynRcvd(_) => "SynRcvd",
2098 State::SynSent(_) => "SynSent",
2099 State::Established(_) => "Established",
2100 State::CloseWait(_) => "CloseWait",
2101 State::LastAck(_) => "LastAck",
2102 State::FinWait1(_) => "FinWait1",
2103 State::FinWait2(_) => "FinWait2",
2104 State::Closing(_) => "Closing",
2105 State::TimeWait(_) => "TimeWait",
2106 };
2107 write!(f, "{name}")
2108 }
2109}
2110
2111#[derive(Debug, PartialEq, Eq)]
2112pub(super) enum CloseError {
2114 Closing,
2116 NoConnection,
2118}
2119
2120pub(crate) trait BufferProvider<R: ReceiveBuffer, S: SendBuffer> {
2123 type PassiveOpen;
2126
2127 type ActiveOpen: IntoBuffers<R, S>;
2130
2131 fn new_passive_open_buffers(buffer_sizes: BufferSizes) -> (R, S, Self::PassiveOpen);
2134}
2135
2136#[derive(Copy, Clone, Debug, Eq, PartialEq)]
2148pub(crate) struct Takeable<T>(Option<T>);
2149
2150impl<T> From<T> for Takeable<T> {
2151 fn from(value: T) -> Self {
2152 Self(Some(value))
2153 }
2154}
2155
2156impl<T> Takeable<T> {
2157 pub(crate) fn get(&self) -> &T {
2158 let Self(i) = self;
2159 i.as_ref().expect("accessed taken takeable")
2160 }
2161
2162 pub(crate) fn get_mut(&mut self) -> &mut T {
2163 let Self(i) = self;
2164 i.as_mut().expect("accessed taken takeable")
2165 }
2166
2167 pub(crate) fn new(v: T) -> Self {
2168 Self(Some(v))
2169 }
2170
2171 pub(crate) fn to_ref(&mut self) -> TakeableRef<'_, T> {
2172 TakeableRef(self)
2173 }
2174
2175 pub(crate) fn from_ref(t: TakeableRef<'_, T>) -> Self {
2176 let TakeableRef(Self(t)) = t;
2177 Self(Some(t.take().expect("accessed taken takeable")))
2178 }
2179
2180 pub(crate) fn into_inner(self) -> T {
2181 let Self(i) = self;
2182 i.expect("accessed taken takeable")
2183 }
2184
2185 pub(crate) fn map<R, F: FnOnce(T) -> R>(self, f: F) -> Takeable<R> {
2186 Takeable(Some(f(self.into_inner())))
2187 }
2188}
2189
2190impl<T> Deref for Takeable<T> {
2191 type Target = T;
2192
2193 fn deref(&self) -> &Self::Target {
2194 self.get()
2195 }
2196}
2197
2198impl<T> DerefMut for Takeable<T> {
2199 fn deref_mut(&mut self) -> &mut Self::Target {
2200 self.get_mut()
2201 }
2202}
2203
2204pub(crate) struct TakeableRef<'a, T>(&'a mut Takeable<T>);
2206
2207impl<'a, T> TakeableRef<'a, T> {
2208 pub(crate) fn take(self) -> T {
2209 let Self(Takeable(t)) = self;
2210 t.take().expect("accessed taken takeable")
2211 }
2212
2213 pub(crate) fn to_takeable(self) -> Takeable<T> {
2214 Takeable::new(self.take())
2215 }
2216}
2217
2218#[must_use = "must check to determine if the socket needs to be removed from the demux state"]
2219#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2220pub(crate) enum NewlyClosed {
2221 No,
2222 Yes,
2223}
2224
2225pub(crate) enum ShouldRetransmit {
2226 No,
2227 Yes(Mss),
2228}
2229
2230impl<I: Instant + 'static, R: ReceiveBuffer, S: SendBuffer, ActiveOpen: Debug>
2231 State<I, R, S, ActiveOpen>
2232{
2233 fn transition_to_state(
2235 &mut self,
2236 counters: &TcpCountersRefs<'_>,
2237 new_state: State<I, R, S, ActiveOpen>,
2238 ) -> NewlyClosed {
2239 log::debug!("transition to state {} => {}", self, new_state);
2240 let newly_closed = if let State::Closed(Closed { reason }) = &new_state {
2241 let (was_established, was_closed) = match self {
2242 State::Closed(_) => (false, true),
2243 State::Listen(_) | State::SynRcvd(_) | State::SynSent(_) => (false, false),
2244 State::Established(_)
2245 | State::CloseWait(_)
2246 | State::LastAck(_)
2247 | State::FinWait1(_)
2248 | State::FinWait2(_)
2249 | State::Closing(_)
2250 | State::TimeWait(_) => (true, false),
2251 };
2252 if was_established {
2253 counters.increment(|c| &c.established_closed);
2254 match reason {
2255 Some(ConnectionError::ConnectionRefused)
2256 | Some(ConnectionError::ConnectionReset) => {
2257 counters.increment(|c| &c.established_resets);
2258 }
2259 Some(ConnectionError::TimedOut) => {
2260 counters.increment(|c| &c.established_timedout);
2261 }
2262 _ => {}
2263 }
2264 }
2265 (!was_closed).then_some(NewlyClosed::Yes).unwrap_or(NewlyClosed::No)
2266 } else {
2267 NewlyClosed::No
2268 };
2269 *self = new_state;
2270 newly_closed
2271 }
2272 pub(crate) fn on_segment<P: Payload, BP: BufferProvider<R, S, ActiveOpen = ActiveOpen>>(
2279 &mut self,
2280 id: &impl StateMachineDebugId,
2281 counters: &TcpCountersRefs<'_>,
2282 incoming: Segment<P>,
2283 now: I,
2284 options @ SocketOptions {
2285 keep_alive: _,
2286 nagle_enabled: _,
2287 user_timeout: _,
2288 delayed_ack,
2289 fin_wait2_timeout,
2290 max_syn_retries: _,
2291 ip_options: _,
2292 }: &SocketOptions,
2293 defunct: bool,
2294 ) -> (Option<Segment<()>>, Option<BP::PassiveOpen>, DataAcked, NewlyClosed)
2295 where
2296 BP::PassiveOpen: Debug,
2297 ActiveOpen: IntoBuffers<R, S>,
2298 {
2299 let mut passive_open = None;
2300 let mut data_acked = DataAcked::No;
2301 let (seg, newly_closed) = (|| {
2302 let (mut rcv, snd_max, rst_on_new_data) = match self {
2303 State::Closed(closed) => return (closed.on_segment(&incoming), NewlyClosed::No),
2304 State::Listen(listen) => {
2305 return (
2306 match listen.on_segment(incoming, now) {
2307 ListenOnSegmentDisposition::SendSynAckAndEnterSynRcvd(
2308 syn_ack,
2309 SynRcvd {
2310 iss,
2311 irs,
2312 timestamp,
2313 retrans_timer,
2314 simultaneous_open,
2315 buffer_sizes,
2316 smss,
2317 rcv_wnd_scale,
2318 snd_wnd_scale,
2319 sack_permitted,
2320 },
2321 ) => {
2322 match simultaneous_open {
2323 None => {
2324 assert_eq!(
2325 self.transition_to_state(
2326 counters,
2327 State::SynRcvd(SynRcvd {
2328 iss,
2329 irs,
2330 timestamp,
2331 retrans_timer,
2332 simultaneous_open: None,
2333 buffer_sizes,
2334 smss,
2335 rcv_wnd_scale,
2336 snd_wnd_scale,
2337 sack_permitted,
2338 }),
2339 ),
2340 NewlyClosed::No
2341 )
2342 }
2343 }
2344 Some(syn_ack)
2345 }
2346 ListenOnSegmentDisposition::SendRst(rst) => Some(rst),
2347 ListenOnSegmentDisposition::Ignore => None,
2348 },
2349 NewlyClosed::No,
2350 );
2351 }
2352 State::SynSent(synsent) => {
2353 return match synsent.on_segment(incoming, now) {
2354 SynSentOnSegmentDisposition::SendAckAndEnterEstablished(established) => (
2355 replace_with_and(self, |this| {
2356 assert_matches!(this, State::SynSent(SynSent {
2357 active_open,
2358 buffer_sizes,
2359 ..
2360 }) => {
2361 let Established {snd, rcv} = established;
2362 let (rcv_buffer, snd_buffer) =
2363 active_open.into_buffers(buffer_sizes);
2364 let mut established = Established {
2365 snd: snd.map(|s| s.with_buffer(snd_buffer)),
2366 rcv: rcv.map(|s| s.with_buffer(rcv_buffer)),
2367 };
2368 let ack = Some(established.rcv.make_ack(established.snd.max));
2369 (State::Established(established), ack)
2370 })
2371 }),
2372 NewlyClosed::No,
2373 ),
2374 SynSentOnSegmentDisposition::SendSynAckAndEnterSynRcvd(
2375 syn_ack,
2376 mut syn_rcvd,
2377 ) => {
2378 replace_with(self, |this| {
2379 assert_matches!(this, State::SynSent(SynSent {
2380 active_open,
2381 ..
2382 }) => {
2383 assert_matches!(syn_rcvd.simultaneous_open.replace(active_open), None);
2384 State::SynRcvd(syn_rcvd)
2385 })
2386 });
2387 (Some(syn_ack), NewlyClosed::No)
2388 }
2389 SynSentOnSegmentDisposition::SendRst(rst) => (Some(rst), NewlyClosed::No),
2390 SynSentOnSegmentDisposition::EnterClosed(closed) => {
2391 assert_eq!(
2392 self.transition_to_state(counters, State::Closed(closed)),
2393 NewlyClosed::Yes,
2394 );
2395 (None, NewlyClosed::Yes)
2396 }
2397 SynSentOnSegmentDisposition::Ignore => (None, NewlyClosed::No),
2398 }
2399 }
2400 State::SynRcvd(SynRcvd {
2401 iss,
2402 irs,
2403 timestamp: _,
2404 retrans_timer: _,
2405 simultaneous_open: _,
2406 buffer_sizes,
2407 smss: _,
2408 rcv_wnd_scale: _,
2409 snd_wnd_scale: _,
2410 sack_permitted: _,
2411 }) => {
2412 let advertised = buffer_sizes.rwnd_unscaled();
2416 (
2417 CalculatedRecvParams::from_params(RecvParams {
2418 ack: *irs + 1,
2419 wnd: advertised << WindowScale::default(),
2420 wnd_scale: WindowScale::default(),
2421 }),
2422 *iss + 1,
2423 false,
2424 )
2425 }
2426 State::Established(Established { rcv, snd }) => {
2427 (CalculatedRecvParams::from_recv(rcv.get_mut()), snd.max, false)
2428 }
2429 State::CloseWait(CloseWait { snd, closed_rcv }) => {
2430 (CalculatedRecvParams::from_params(closed_rcv.clone()), snd.max, true)
2431 }
2432 State::LastAck(LastAck { snd, closed_rcv })
2433 | State::Closing(Closing { snd, closed_rcv }) => {
2434 (CalculatedRecvParams::from_params(closed_rcv.clone()), snd.max, true)
2435 }
2436 State::FinWait1(FinWait1 { rcv, snd }) => {
2437 let closed = rcv.buffer.is_closed();
2438 (CalculatedRecvParams::from_recv(rcv.get_mut()), snd.max, closed)
2439 }
2440 State::FinWait2(FinWait2 { last_seq, rcv, timeout_at: _ }) => {
2441 let closed = rcv.buffer.is_closed();
2442 (CalculatedRecvParams::from_recv(rcv), *last_seq, closed)
2443 }
2444 State::TimeWait(TimeWait { last_seq, expiry: _, closed_rcv }) => {
2445 (CalculatedRecvParams::from_params(closed_rcv.clone()), *last_seq, true)
2446 }
2447 };
2448
2449 if rst_on_new_data && (incoming.header().seq + incoming.data().len()).after(rcv.nxt()) {
2452 return (
2453 Some(Segment::rst(snd_max)),
2454 self.transition_to_state(
2455 counters,
2456 State::Closed(Closed { reason: Some(ConnectionError::ConnectionReset) }),
2457 ),
2458 );
2459 }
2460
2461 let is_rst = incoming.header().control == Some(Control::RST);
2467 let pure_ack = incoming.len() == 0;
2469 let needs_ack = !pure_ack;
2470 let segment = match incoming.overlap(rcv.nxt(), rcv.wnd()) {
2471 Some(incoming) => incoming,
2472 None => {
2473 let segment = if is_rst {
2481 None
2482 } else {
2483 if let Some(recv) = rcv.recv.as_mut() {
2484 recv.reset_quickacks();
2487 }
2488 Some(rcv.make_ack(snd_max))
2489 };
2490
2491 return (segment, NewlyClosed::No);
2492 }
2493 };
2494 let (
2495 SegmentHeader {
2496 seq: seg_seq,
2497 ack: seg_ack,
2498 wnd: seg_wnd,
2499 control,
2500 options: seg_options,
2501 push: _,
2502 },
2503 data,
2504 ) = segment.into_parts();
2505 if control == Some(Control::RST) {
2513 return (
2514 None,
2515 self.transition_to_state(
2516 counters,
2517 State::Closed(Closed { reason: Some(ConnectionError::ConnectionReset) }),
2518 ),
2519 );
2520 }
2521 if control == Some(Control::SYN) {
2532 return (
2533 Some(Segment::rst(snd_max)),
2534 self.transition_to_state(
2535 counters,
2536 State::Closed(Closed { reason: Some(ConnectionError::ConnectionReset) }),
2537 ),
2538 );
2539 }
2540 match seg_ack {
2543 Some(seg_ack) => match self {
2544 State::Closed(_) | State::Listen(_) | State::SynSent(_) => {
2545 unreachable!("encountered an already-handled state: {:?}", self)
2547 }
2548 State::SynRcvd(SynRcvd {
2549 iss,
2550 irs,
2551 timestamp: syn_rcvd_ts,
2552 retrans_timer: _,
2553 simultaneous_open,
2554 buffer_sizes,
2555 smss,
2556 rcv_wnd_scale,
2557 snd_wnd_scale,
2558 sack_permitted,
2559 }) => {
2560 let next = *iss + 1;
2573 if seg_ack != next {
2574 return (Some(Segment::rst(seg_ack)), NewlyClosed::No);
2575 } else {
2576 let mut rtt_estimator = Estimator::default();
2577 if let Some(syn_rcvd_ts) = syn_rcvd_ts {
2578 rtt_estimator.sample(now.saturating_duration_since(*syn_rcvd_ts));
2579 }
2580 let (rcv_buffer, snd_buffer) = match simultaneous_open.take() {
2581 None => {
2582 let (rcv_buffer, snd_buffer, client) =
2583 BP::new_passive_open_buffers(*buffer_sizes);
2584 assert_matches!(passive_open.replace(client), None);
2585 (rcv_buffer, snd_buffer)
2586 }
2587 Some(active_open) => active_open.into_buffers(*buffer_sizes),
2588 };
2589 let (snd_wnd_scale, rcv_wnd_scale) = snd_wnd_scale
2590 .map(|snd_wnd_scale| (snd_wnd_scale, *rcv_wnd_scale))
2591 .unwrap_or_default();
2592 let established = Established {
2593 snd: Send {
2594 nxt: next,
2595 max: next,
2596 una: seg_ack,
2597 wnd: seg_wnd << snd_wnd_scale,
2598 wl1: seg_seq,
2599 wl2: seg_ack,
2600 last_push: next,
2601 buffer: snd_buffer,
2602 rtt_sampler: RttSampler::default(),
2603 rtt_estimator,
2604 timer: None,
2605 congestion_control: CongestionControl::cubic_with_mss(*smss),
2606 wnd_scale: snd_wnd_scale,
2607 wnd_max: seg_wnd << snd_wnd_scale,
2608 }
2609 .into(),
2610 rcv: Recv {
2611 buffer: RecvBufferState::Open {
2612 buffer: rcv_buffer,
2613 assembler: Assembler::new(*irs + 1),
2614 },
2615 timer: None,
2616 mss: *smss,
2617 wnd_scale: rcv_wnd_scale,
2618 last_segment_at: None,
2619 remaining_quickacks: quickack_counter(
2620 buffer_sizes.rcv_limits(),
2621 *smss,
2622 ),
2623 last_window_update: (*irs + 1, buffer_sizes.rwnd()),
2624 sack_permitted: *sack_permitted,
2625 }
2626 .into(),
2627 };
2628 assert_eq!(
2629 self.transition_to_state(counters, State::Established(established)),
2630 NewlyClosed::No
2631 );
2632 }
2633 }
2637 State::Established(Established { snd, rcv }) => {
2638 let (ack, segment_acked_data) = snd.process_ack(
2639 id,
2640 counters,
2641 seg_seq,
2642 seg_ack,
2643 seg_wnd,
2644 seg_options.sack_blocks(),
2645 pure_ack,
2646 rcv.get_mut(),
2647 now,
2648 options,
2649 );
2650 data_acked = segment_acked_data;
2651 if let Some(ack) = ack {
2652 return (Some(ack), NewlyClosed::No);
2653 }
2654 }
2655 State::CloseWait(CloseWait { snd, closed_rcv }) => {
2656 let (ack, segment_acked_data) = snd.process_ack(
2657 id,
2658 counters,
2659 seg_seq,
2660 seg_ack,
2661 seg_wnd,
2662 seg_options.sack_blocks(),
2663 pure_ack,
2664 &*closed_rcv,
2665 now,
2666 options,
2667 );
2668 data_acked = segment_acked_data;
2669 if let Some(ack) = ack {
2670 return (Some(ack), NewlyClosed::No);
2671 }
2672 }
2673 State::LastAck(LastAck { snd, closed_rcv }) => {
2674 let BufferLimits { len, capacity: _ } = snd.buffer.limits();
2675 let fin_seq = snd.una + len + 1;
2676 let (ack, segment_acked_data) = snd.process_ack(
2677 id,
2678 counters,
2679 seg_seq,
2680 seg_ack,
2681 seg_wnd,
2682 seg_options.sack_blocks(),
2683 pure_ack,
2684 &*closed_rcv,
2685 now,
2686 options,
2687 );
2688 data_acked = segment_acked_data;
2689 if let Some(ack) = ack {
2690 return (Some(ack), NewlyClosed::No);
2691 } else if seg_ack == fin_seq {
2692 return (
2693 None,
2694 self.transition_to_state(
2695 counters,
2696 State::Closed(Closed { reason: None }),
2697 ),
2698 );
2699 }
2700 }
2701 State::FinWait1(FinWait1 { snd, rcv }) => {
2702 let BufferLimits { len, capacity: _ } = snd.buffer.limits();
2703 let fin_seq = snd.una + len + 1;
2704 let (ack, segment_acked_data) = snd.process_ack(
2705 id,
2706 counters,
2707 seg_seq,
2708 seg_ack,
2709 seg_wnd,
2710 seg_options.sack_blocks(),
2711 pure_ack,
2712 rcv.get_mut(),
2713 now,
2714 options,
2715 );
2716 data_acked = segment_acked_data;
2717 if let Some(ack) = ack {
2718 return (Some(ack), NewlyClosed::No);
2719 } else if seg_ack == fin_seq {
2720 let last_seq = snd.nxt;
2726 let finwait2 = FinWait2 {
2727 last_seq,
2728 rcv: rcv.to_ref().take(),
2729 timeout_at: fin_wait2_timeout.and_then(|timeout| {
2733 defunct.then_some(now.saturating_add(timeout))
2734 }),
2735 };
2736 assert_eq!(
2737 self.transition_to_state(counters, State::FinWait2(finwait2)),
2738 NewlyClosed::No
2739 );
2740 }
2741 }
2742 State::Closing(Closing { snd, closed_rcv }) => {
2743 let BufferLimits { len, capacity: _ } = snd.buffer.limits();
2744 let fin_seq = snd.una + len + 1;
2745 let (ack, segment_acked_data) = snd.process_ack(
2746 id,
2747 counters,
2748 seg_seq,
2749 seg_ack,
2750 seg_wnd,
2751 seg_options.sack_blocks(),
2752 pure_ack,
2753 &*closed_rcv,
2754 now,
2755 options,
2756 );
2757 data_acked = segment_acked_data;
2758 if let Some(ack) = ack {
2759 data_acked = segment_acked_data;
2760 return (Some(ack), NewlyClosed::No);
2761 } else if seg_ack == fin_seq {
2762 let timewait = TimeWait {
2767 last_seq: snd.nxt,
2768 expiry: new_time_wait_expiry(now),
2769 closed_rcv: closed_rcv.clone(),
2770 };
2771 assert_eq!(
2772 self.transition_to_state(counters, State::TimeWait(timewait)),
2773 NewlyClosed::No
2774 );
2775 }
2776 }
2777 State::FinWait2(_) | State::TimeWait(_) => {}
2778 },
2779 None => return (None, NewlyClosed::No),
2782 }
2783 let maybe_ack_to_text = |rcv: &mut Recv<I, R>, rto: Rto| {
2805 if !needs_ack {
2806 return (None, rcv.nxt());
2807 }
2808
2809 if let Some(last) = rcv.last_segment_at.replace(now) {
2813 if now.saturating_duration_since(last) >= rto.get() {
2814 rcv.reset_quickacks();
2815 }
2816 }
2817
2818 let had_out_of_order = rcv.buffer.has_out_of_order();
2821 if data.len() > 0 {
2822 let offset = usize::try_from(seg_seq - rcv.nxt()).unwrap_or_else(|TryFromIntError {..}| {
2823 panic!("The segment was trimmed to fit the window, thus seg.seq({:?}) must not come before rcv.nxt({:?})", seg_seq, rcv.nxt());
2824 });
2825 match &mut rcv.buffer {
2826 RecvBufferState::Open { buffer, assembler } => {
2827 let nwritten = buffer.write_at(offset, &data);
2828 let readable = assembler.insert(seg_seq..seg_seq + nwritten);
2829 buffer.make_readable(readable, assembler.has_outstanding());
2830 }
2831 RecvBufferState::Closed { nxt, .. } => *nxt = seg_seq + data.len(),
2832 }
2833 }
2834 let immediate_ack = !*delayed_ack
2840 || had_out_of_order
2841 || rcv.buffer.has_out_of_order()
2842 || rcv.remaining_quickacks != 0
2844 || rcv.timer.is_some();
2869
2870 if immediate_ack {
2871 rcv.timer = None;
2872 } else {
2873 rcv.timer = Some(ReceiveTimer::DelayedAck {
2874 at: now.panicking_add(ACK_DELAY_THRESHOLD),
2875 });
2876 }
2877 let segment =
2878 (!matches!(rcv.timer, Some(ReceiveTimer::DelayedAck { .. }))).then(|| {
2879 rcv.remaining_quickacks = rcv.remaining_quickacks.saturating_sub(1);
2880 rcv.make_ack(snd_max)
2881 });
2882 (segment, rcv.nxt())
2883 };
2884
2885 let (ack_to_text, rcv_nxt) = match self {
2886 State::Closed(_) | State::Listen(_) | State::SynRcvd(_) | State::SynSent(_) => {
2887 unreachable!("encountered an already-handled state: {:?}", self)
2889 }
2890 State::Established(Established { snd, rcv }) => {
2891 maybe_ack_to_text(rcv.get_mut(), snd.rtt_estimator.rto())
2892 }
2893 State::FinWait1(FinWait1 { snd, rcv }) => {
2894 maybe_ack_to_text(rcv.get_mut(), snd.rtt_estimator.rto())
2895 }
2896 State::FinWait2(FinWait2 { last_seq: _, rcv, timeout_at: _ }) => {
2897 maybe_ack_to_text(rcv, Rto::DEFAULT)
2898 }
2899 State::CloseWait(CloseWait { closed_rcv, .. })
2900 | State::LastAck(LastAck { closed_rcv, .. })
2901 | State::Closing(Closing { closed_rcv, .. })
2902 | State::TimeWait(TimeWait { closed_rcv, .. }) => {
2903 (None, closed_rcv.ack)
2907 }
2908 };
2909 let ack_to_fin = if control == Some(Control::FIN) && rcv_nxt == seg_seq + data.len() {
2912 match self {
2917 State::Closed(_) | State::Listen(_) | State::SynRcvd(_) | State::SynSent(_) => {
2918 unreachable!("encountered an already-handled state: {:?}", self)
2920 }
2921 State::Established(Established { snd, rcv }) => {
2922 let params = rcv.handle_fin();
2925 let segment = params.make_ack(snd_max);
2926 let closewait =
2927 CloseWait { snd: snd.to_ref().to_takeable(), closed_rcv: params };
2928 assert_eq!(
2929 self.transition_to_state(counters, State::CloseWait(closewait)),
2930 NewlyClosed::No
2931 );
2932 Some(segment)
2933 }
2934 State::CloseWait(_) | State::LastAck(_) | State::Closing(_) => {
2935 None
2943 }
2944 State::FinWait1(FinWait1 { snd, rcv }) => {
2945 let params = rcv.handle_fin();
2946 let segment = params.make_ack(snd_max);
2947 let closing = Closing { snd: snd.to_ref().take(), closed_rcv: params };
2948 assert_eq!(
2949 self.transition_to_state(counters, State::Closing(closing)),
2950 NewlyClosed::No
2951 );
2952 Some(segment)
2953 }
2954 State::FinWait2(FinWait2 { last_seq, rcv, timeout_at: _ }) => {
2955 let params = rcv.handle_fin();
2956 let segment = params.make_ack(snd_max);
2957 let timewait = TimeWait {
2958 last_seq: *last_seq,
2959 expiry: new_time_wait_expiry(now),
2960 closed_rcv: params,
2961 };
2962 assert_eq!(
2963 self.transition_to_state(counters, State::TimeWait(timewait)),
2964 NewlyClosed::No,
2965 );
2966 Some(segment)
2967 }
2968 State::TimeWait(TimeWait { last_seq, expiry, closed_rcv }) => {
2969 *expiry = new_time_wait_expiry(now);
2974 Some(closed_rcv.make_ack(*last_seq))
2975 }
2976 }
2977 } else {
2978 None
2979 };
2980 (ack_to_fin.or(ack_to_text), NewlyClosed::No)
2983 })();
2984 (seg, passive_open, data_acked, newly_closed)
2985 }
2986
2987 pub(crate) fn poll_receive_data_dequeued(&mut self) -> Option<Segment<()>> {
2991 let (rcv, snd_max) = match self {
2992 State::Closed(_)
2993 | State::Listen(_)
2994 | State::SynRcvd(_)
2995 | State::SynSent(_)
2996 | State::CloseWait(_)
2997 | State::LastAck(_)
2998 | State::Closing(_)
2999 | State::TimeWait(_) => return None,
3000 State::Established(Established { snd, rcv }) => (rcv.get_mut(), snd.max),
3001 State::FinWait1(FinWait1 { snd, rcv }) => (rcv.get_mut(), snd.max),
3002 State::FinWait2(FinWait2 { last_seq, rcv, timeout_at: _ }) => (rcv, *last_seq),
3003 };
3004
3005 rcv.poll_receive_data_dequeued(snd_max)
3006 }
3007
3008 pub(crate) fn poll_send(
3016 &mut self,
3017 id: &impl StateMachineDebugId,
3018 counters: &TcpCountersRefs<'_>,
3019 limit: u32,
3020 now: I,
3021 socket_options: &SocketOptions,
3022 ) -> Result<Segment<S::Payload<'_>>, NewlyClosed> {
3023 let newly_closed = self.poll_close(counters, now, socket_options);
3024 if matches!(self, State::Closed(_)) {
3025 return Err(newly_closed);
3026 }
3027 fn poll_rcv_then_snd<
3028 'a,
3029 I: Instant,
3030 R: ReceiveBuffer,
3031 S: SendBuffer,
3032 M: StateMachineDebugId,
3033 const FIN_QUEUED: bool,
3034 >(
3035 id: &M,
3036 counters: &TcpCountersRefs<'_>,
3037 snd: &'a mut Send<I, S, FIN_QUEUED>,
3038 rcv: &'a mut Recv<I, R>,
3039 limit: u32,
3040 now: I,
3041 socket_options: &SocketOptions,
3042 ) -> Option<Segment<S::Payload<'a>>> {
3043 let seg = rcv
3049 .poll_send(snd.max, now)
3050 .map(|seg| seg.into_empty())
3051 .or_else(|| snd.poll_send(id, counters, &mut *rcv, limit, now, socket_options));
3052 if seg.is_some() && matches!(rcv.timer, Some(ReceiveTimer::DelayedAck { .. })) {
3054 rcv.timer = None;
3055 }
3056 seg
3057 }
3058 let seg = match self {
3059 State::SynSent(SynSent {
3060 iss,
3061 timestamp,
3062 retrans_timer,
3063 active_open: _,
3064 buffer_sizes: _,
3065 device_mss,
3066 default_mss: _,
3067 rcv_wnd_scale,
3068 }) => (retrans_timer.at <= now).then(|| {
3069 *timestamp = None;
3070 retrans_timer.backoff(now);
3071 Segment::syn(
3072 *iss,
3073 UnscaledWindowSize::from(u16::MAX),
3074 HandshakeOptions {
3075 mss: Some(*device_mss),
3076 window_scale: Some(*rcv_wnd_scale),
3077 sack_permitted: SACK_PERMITTED,
3078 }
3079 .into(),
3080 )
3081 }),
3082 State::SynRcvd(SynRcvd {
3083 iss,
3084 irs,
3085 timestamp,
3086 retrans_timer,
3087 simultaneous_open: _,
3088 buffer_sizes: _,
3089 smss,
3090 rcv_wnd_scale,
3091 snd_wnd_scale,
3092 sack_permitted: _,
3093 }) => (retrans_timer.at <= now).then(|| {
3094 *timestamp = None;
3095 retrans_timer.backoff(now);
3096 Segment::syn_ack(
3097 *iss,
3098 *irs + 1,
3099 UnscaledWindowSize::from(u16::MAX),
3100 HandshakeOptions {
3101 mss: Some(*smss),
3102 window_scale: snd_wnd_scale.map(|_| *rcv_wnd_scale),
3103 sack_permitted: SACK_PERMITTED,
3104 }
3105 .into(),
3106 )
3107 }),
3108 State::Established(Established { snd, rcv }) => {
3109 poll_rcv_then_snd(id, counters, snd, rcv, limit, now, socket_options)
3110 }
3111 State::CloseWait(CloseWait { snd, closed_rcv }) => {
3112 snd.poll_send(id, counters, &*closed_rcv, limit, now, socket_options)
3113 }
3114 State::LastAck(LastAck { snd, closed_rcv })
3115 | State::Closing(Closing { snd, closed_rcv }) => {
3116 snd.poll_send(id, counters, &*closed_rcv, limit, now, socket_options)
3117 }
3118 State::FinWait1(FinWait1 { snd, rcv }) => {
3119 poll_rcv_then_snd(id, counters, snd, rcv, limit, now, socket_options)
3120 }
3121 State::FinWait2(FinWait2 { last_seq, rcv, timeout_at: _ }) => {
3122 rcv.poll_send(*last_seq, now).map(|seg| seg.into_empty())
3123 }
3124 State::Closed(_) | State::Listen(_) | State::TimeWait(_) => None,
3125 };
3126 seg.ok_or(NewlyClosed::No)
3127 }
3128
3129 fn poll_close(
3133 &mut self,
3134 counters: &TcpCountersRefs<'_>,
3135 now: I,
3136 SocketOptions {
3137 keep_alive,
3138 nagle_enabled: _,
3139 user_timeout: _,
3140 delayed_ack: _,
3141 fin_wait2_timeout: _,
3142 max_syn_retries: _,
3143 ip_options: _,
3144 }: &SocketOptions,
3145 ) -> NewlyClosed {
3146 let timed_out = match self {
3147 State::Established(Established { snd, rcv: _ }) => snd.timed_out(now, keep_alive),
3148 State::CloseWait(CloseWait { snd, closed_rcv: _ }) => snd.timed_out(now, keep_alive),
3149 State::LastAck(LastAck { snd, closed_rcv: _ })
3150 | State::Closing(Closing { snd, closed_rcv: _ }) => snd.timed_out(now, keep_alive),
3151 State::FinWait1(FinWait1 { snd, rcv: _ }) => snd.timed_out(now, keep_alive),
3152 State::SynSent(SynSent {
3153 iss: _,
3154 timestamp: _,
3155 retrans_timer,
3156 active_open: _,
3157 buffer_sizes: _,
3158 device_mss: _,
3159 default_mss: _,
3160 rcv_wnd_scale: _,
3161 })
3162 | State::SynRcvd(SynRcvd {
3163 iss: _,
3164 irs: _,
3165 timestamp: _,
3166 retrans_timer,
3167 simultaneous_open: _,
3168 buffer_sizes: _,
3169 smss: _,
3170 rcv_wnd_scale: _,
3171 snd_wnd_scale: _,
3172 sack_permitted: _,
3173 }) => retrans_timer.timed_out(now),
3174
3175 State::Closed(_) | State::Listen(_) | State::TimeWait(_) => false,
3176 State::FinWait2(FinWait2 { last_seq: _, rcv: _, timeout_at }) => {
3177 timeout_at.map(|at| now >= at).unwrap_or(false)
3178 }
3179 };
3180 if timed_out {
3181 return self.transition_to_state(
3182 counters,
3183 State::Closed(Closed { reason: Some(ConnectionError::TimedOut) }),
3184 );
3185 } else if let State::TimeWait(tw) = self {
3186 if tw.expiry <= now {
3187 return self.transition_to_state(counters, State::Closed(Closed { reason: None }));
3188 }
3189 }
3190 NewlyClosed::No
3191 }
3192
3193 pub(crate) fn poll_send_at(&self) -> Option<I> {
3212 let combine_expiry = |e1: Option<I>, e2: Option<I>| match (e1, e2) {
3213 (None, None) => None,
3214 (None, Some(e2)) => Some(e2),
3215 (Some(e1), None) => Some(e1),
3216 (Some(e1), Some(e2)) => Some(e1.min(e2)),
3217 };
3218 match self {
3219 State::Established(Established { snd, rcv }) => combine_expiry(
3220 snd.timer.as_ref().map(SendTimer::expiry),
3221 rcv.timer.as_ref().map(ReceiveTimer::expiry),
3222 ),
3223 State::CloseWait(CloseWait { snd, closed_rcv: _ }) => Some(snd.timer?.expiry()),
3224 State::LastAck(LastAck { snd, closed_rcv: _ })
3225 | State::Closing(Closing { snd, closed_rcv: _ }) => Some(snd.timer?.expiry()),
3226 State::FinWait1(FinWait1 { snd, rcv }) => combine_expiry(
3227 snd.timer.as_ref().map(SendTimer::expiry),
3228 rcv.timer.as_ref().map(ReceiveTimer::expiry),
3229 ),
3230 State::FinWait2(FinWait2 { last_seq: _, rcv, timeout_at }) => {
3231 combine_expiry(*timeout_at, rcv.timer.as_ref().map(ReceiveTimer::expiry))
3232 }
3233 State::SynRcvd(syn_rcvd) => Some(syn_rcvd.retrans_timer.at),
3234 State::SynSent(syn_sent) => Some(syn_sent.retrans_timer.at),
3235 State::Closed(_) | State::Listen(_) => None,
3236 State::TimeWait(TimeWait { last_seq: _, expiry, closed_rcv: _ }) => Some(*expiry),
3237 }
3238 }
3239
3240 pub(super) fn close(
3247 &mut self,
3248 counters: &TcpCountersRefs<'_>,
3249 close_reason: CloseReason<I>,
3250 socket_options: &SocketOptions,
3251 ) -> Result<NewlyClosed, CloseError>
3252 where
3253 ActiveOpen: IntoBuffers<R, S>,
3254 {
3255 match self {
3256 State::Closed(_) => Err(CloseError::NoConnection),
3257 State::Listen(_) | State::SynSent(_) => {
3258 Ok(self.transition_to_state(counters, State::Closed(Closed { reason: None })))
3259 }
3260 State::SynRcvd(SynRcvd {
3261 iss,
3262 irs,
3263 timestamp: _,
3264 retrans_timer: _,
3265 simultaneous_open,
3266 buffer_sizes,
3267 smss,
3268 rcv_wnd_scale,
3269 snd_wnd_scale,
3270 sack_permitted,
3271 }) => {
3272 let (rcv_buffer, snd_buffer) = simultaneous_open
3292 .take()
3293 .expect(
3294 "a SYN-RCVD state that is in the pending queue \
3295 should call abort instead of close",
3296 )
3297 .into_buffers(*buffer_sizes);
3298 let (snd_wnd_scale, rcv_wnd_scale) = snd_wnd_scale
3302 .map(|snd_wnd_scale| (snd_wnd_scale, *rcv_wnd_scale))
3303 .unwrap_or_default();
3304 let next = *iss + 1;
3305 let finwait1 = FinWait1 {
3306 snd: Send {
3307 nxt: next,
3308 max: next,
3309 una: next,
3310 wnd: WindowSize::DEFAULT,
3311 wl1: *iss,
3312 wl2: *irs,
3313 last_push: next,
3314 buffer: snd_buffer,
3315 rtt_sampler: RttSampler::default(),
3316 rtt_estimator: Estimator::NoSample,
3317 timer: None,
3318 congestion_control: CongestionControl::cubic_with_mss(*smss),
3319 wnd_scale: snd_wnd_scale,
3320 wnd_max: WindowSize::DEFAULT,
3321 }
3322 .into(),
3323 rcv: Recv {
3324 buffer: RecvBufferState::Open {
3325 buffer: rcv_buffer,
3326 assembler: Assembler::new(*irs + 1),
3327 },
3328 timer: None,
3329 mss: *smss,
3330 remaining_quickacks: quickack_counter(buffer_sizes.rcv_limits(), *smss),
3331 last_segment_at: None,
3332 wnd_scale: rcv_wnd_scale,
3333 last_window_update: (*irs + 1, buffer_sizes.rwnd()),
3334 sack_permitted: *sack_permitted,
3335 }
3336 .into(),
3337 };
3338 Ok(self.transition_to_state(counters, State::FinWait1(finwait1)))
3339 }
3340 State::Established(Established { snd, rcv }) => {
3341 let finwait1 = FinWait1 {
3347 snd: snd.to_ref().take().queue_fin().into(),
3348 rcv: rcv.to_ref().to_takeable(),
3349 };
3350 Ok(self.transition_to_state(counters, State::FinWait1(finwait1)))
3351 }
3352 State::CloseWait(CloseWait { snd, closed_rcv }) => {
3353 let lastack = LastAck {
3354 snd: snd.to_ref().take().queue_fin(),
3355 closed_rcv: closed_rcv.clone(),
3356 };
3357 Ok(self.transition_to_state(counters, State::LastAck(lastack)))
3358 }
3359 State::LastAck(_) | State::FinWait1(_) | State::Closing(_) | State::TimeWait(_) => {
3360 Err(CloseError::Closing)
3361 }
3362 State::FinWait2(FinWait2 { last_seq: _, rcv: _, timeout_at }) => {
3363 if let (CloseReason::Close { now }, Some(fin_wait2_timeout)) =
3364 (close_reason, socket_options.fin_wait2_timeout)
3365 {
3366 assert_eq!(timeout_at.replace(now.saturating_add(fin_wait2_timeout)), None);
3367 }
3368 Err(CloseError::Closing)
3369 }
3370 }
3371 }
3372
3373 pub(super) fn shutdown_recv(&mut self) -> Result<(), CloseError> {
3374 match self {
3375 State::Closed(_) => Err(CloseError::NoConnection),
3376
3377 State::Listen(_)
3378 | State::SynSent(_)
3379 | State::SynRcvd(_)
3380 | State::CloseWait(_)
3381 | State::LastAck(_)
3382 | State::Closing(_)
3383 | State::TimeWait(_) => Ok(()),
3384
3385 State::Established(Established { rcv, .. }) | State::FinWait1(FinWait1 { rcv, .. }) => {
3387 rcv.buffer.close();
3388 Ok(())
3389 }
3390 State::FinWait2(FinWait2 { rcv, .. }) => {
3391 rcv.buffer.close();
3392 Ok(())
3393 }
3394 }
3395 }
3396
3397 pub(crate) fn abort(
3400 &mut self,
3401 counters: &TcpCountersRefs<'_>,
3402 ) -> (Option<Segment<()>>, NewlyClosed) {
3403 let reply = match self {
3404 State::Closed(_)
3417 | State::Listen(_)
3418 | State::SynSent(_)
3419 | State::Closing(_)
3420 | State::LastAck(_)
3421 | State::TimeWait(_) => None,
3422 State::SynRcvd(SynRcvd {
3434 iss,
3435 irs,
3436 timestamp: _,
3437 retrans_timer: _,
3438 simultaneous_open: _,
3439 buffer_sizes: _,
3440 smss: _,
3441 rcv_wnd_scale: _,
3442 snd_wnd_scale: _,
3443 sack_permitted: _,
3444 }) => {
3445 Some(Segment::rst_ack(*iss + 1, *irs + 1))
3448 }
3449 State::Established(Established { snd, rcv }) => {
3450 Some(Segment::rst_ack(snd.nxt, rcv.nxt()))
3451 }
3452 State::FinWait1(FinWait1 { snd, rcv }) => Some(Segment::rst_ack(snd.nxt, rcv.nxt())),
3453 State::FinWait2(FinWait2 { rcv, last_seq, timeout_at: _ }) => {
3454 Some(Segment::rst_ack(*last_seq, rcv.nxt()))
3455 }
3456 State::CloseWait(CloseWait { snd, closed_rcv }) => {
3457 Some(Segment::rst_ack(snd.nxt, closed_rcv.ack))
3458 }
3459 };
3460 (
3461 reply,
3462 self.transition_to_state(
3463 counters,
3464 State::Closed(Closed { reason: Some(ConnectionError::ConnectionReset) }),
3465 ),
3466 )
3467 }
3468
3469 pub(crate) fn buffers_mut(&mut self) -> BuffersRefMut<'_, R, S> {
3470 match self {
3471 State::TimeWait(_) | State::Closed(_) => BuffersRefMut::NoBuffers,
3472 State::Listen(Listen { buffer_sizes, .. })
3473 | State::SynRcvd(SynRcvd { buffer_sizes, .. })
3474 | State::SynSent(SynSent { buffer_sizes, .. }) => BuffersRefMut::Sizes(buffer_sizes),
3475 State::Established(Established { snd, rcv }) => match &mut rcv.buffer {
3476 RecvBufferState::Open { buffer: ref mut recv_buf, .. } => {
3477 BuffersRefMut::Both { send: &mut snd.buffer, recv: recv_buf }
3478 }
3479 RecvBufferState::Closed { .. } => BuffersRefMut::SendOnly(&mut snd.buffer),
3480 },
3481 State::FinWait1(FinWait1 { snd, rcv }) => match &mut rcv.buffer {
3482 RecvBufferState::Open { buffer: ref mut recv_buf, .. } => {
3483 BuffersRefMut::Both { send: &mut snd.buffer, recv: recv_buf }
3484 }
3485 RecvBufferState::Closed { .. } => BuffersRefMut::SendOnly(&mut snd.buffer),
3486 },
3487 State::FinWait2(FinWait2::<I, R> { rcv, .. }) => match &mut rcv.buffer {
3488 RecvBufferState::Open { buffer: ref mut recv_buf, .. } => {
3489 BuffersRefMut::RecvOnly(recv_buf)
3490 }
3491 RecvBufferState::Closed { .. } => BuffersRefMut::NoBuffers,
3492 },
3493 State::Closing(Closing::<I, S> { snd, .. })
3494 | State::LastAck(LastAck::<I, S> { snd, .. }) => {
3495 BuffersRefMut::SendOnly(&mut snd.buffer)
3496 }
3497 State::CloseWait(CloseWait::<I, S> { snd, .. }) => {
3498 BuffersRefMut::SendOnly(&mut snd.buffer)
3499 }
3500 }
3501 }
3502
3503 pub(super) fn on_icmp_error(
3506 &mut self,
3507 counters: &TcpCountersRefs<'_>,
3508 err: IcmpErrorCode,
3509 seq: SeqNum,
3510 ) -> (Option<ConnectionError>, NewlyClosed, ShouldRetransmit) {
3511 let Some(result) = IcmpErrorResult::try_from_icmp_error(err) else {
3512 return (None, NewlyClosed::No, ShouldRetransmit::No);
3513 };
3514 let err = match result {
3515 IcmpErrorResult::ConnectionError(err) => err,
3516 IcmpErrorResult::PmtuUpdate(mms) => {
3517 let should_send = if let Some(mss) = Mss::from_mms(mms) {
3518 self.on_pmtu_update(mss, seq)
3519 } else {
3520 ShouldRetransmit::No
3521 };
3522 return (None, NewlyClosed::No, should_send);
3523 }
3524 };
3525 let connect_error = match self {
3546 State::Closed(_) => None,
3547 State::Listen(listen) => unreachable!(
3548 "ICMP errors should not be delivered on a listener, received code {:?} on {:?}",
3549 err, listen
3550 ),
3551 State::SynRcvd(SynRcvd {
3552 iss,
3553 irs: _,
3554 timestamp: _,
3555 retrans_timer: _,
3556 simultaneous_open: _,
3557 buffer_sizes: _,
3558 smss: _,
3559 rcv_wnd_scale: _,
3560 snd_wnd_scale: _,
3561 sack_permitted: _,
3562 })
3563 | State::SynSent(SynSent {
3564 iss,
3565 timestamp: _,
3566 retrans_timer: _,
3567 active_open: _,
3568 buffer_sizes: _,
3569 device_mss: _,
3570 default_mss: _,
3571 rcv_wnd_scale: _,
3572 }) => {
3573 if *iss == seq {
3574 return (
3575 None,
3576 self.transition_to_state(
3577 counters,
3578 State::Closed(Closed { reason: Some(err) }),
3579 ),
3580 ShouldRetransmit::No,
3581 );
3582 }
3583 None
3584 }
3585 State::Established(Established { snd, rcv: _ })
3586 | State::CloseWait(CloseWait { snd, closed_rcv: _ }) => {
3587 (!snd.una.after(seq) && seq.before(snd.nxt)).then_some(err)
3588 }
3589 State::LastAck(LastAck { snd, closed_rcv: _ })
3590 | State::Closing(Closing { snd, closed_rcv: _ }) => {
3591 (!snd.una.after(seq) && seq.before(snd.nxt)).then_some(err)
3592 }
3593 State::FinWait1(FinWait1 { snd, rcv: _ }) => {
3594 (!snd.una.after(seq) && seq.before(snd.nxt)).then_some(err)
3595 }
3596 State::FinWait2(_) | State::TimeWait(_) => None,
3599 };
3600 (connect_error, NewlyClosed::No, ShouldRetransmit::No)
3601 }
3602
3603 fn on_pmtu_update(&mut self, mss: Mss, seq: SeqNum) -> ShouldRetransmit {
3604 match self {
3607 State::Listen(listen) => unreachable!(
3608 "PMTU updates should not be delivered to a listener, received {mss:?} on {listen:?}"
3609 ),
3610 State::Closed(_)
3611 | State::SynRcvd(_)
3612 | State::SynSent(_)
3613 | State::FinWait2(_)
3614 | State::TimeWait(_) => {}
3615 State::Established(Established { snd, .. })
3616 | State::CloseWait(CloseWait { snd, .. }) => {
3617 if !snd.una.after(seq) && seq.before(snd.nxt) {
3618 return snd.update_mss(mss, seq);
3619 }
3620 }
3621 State::LastAck(LastAck { snd, .. }) | State::Closing(Closing { snd, .. }) => {
3622 if !snd.una.after(seq) && seq.before(snd.nxt) {
3623 return snd.update_mss(mss, seq);
3624 }
3625 }
3626 State::FinWait1(FinWait1 { snd, .. }) => {
3627 if !snd.una.after(seq) && seq.before(snd.nxt) {
3628 return snd.update_mss(mss, seq);
3629 }
3630 }
3631 }
3632 ShouldRetransmit::No
3633 }
3634}
3635
3636pub(super) enum CloseReason<I: Instant> {
3640 Shutdown,
3641 Close { now: I },
3642}
3643
3644#[cfg(test)]
3645mod test {
3646 use alloc::vec;
3647 use alloc::vec::Vec;
3648 use core::fmt::Debug;
3649 use core::num::NonZeroU16;
3650 use core::time::Duration;
3651
3652 use assert_matches::assert_matches;
3653 use net_types::ip::Ipv4;
3654 use netstack3_base::testutil::{FakeInstant, FakeInstantCtx};
3655 use netstack3_base::{FragmentedPayload, InstantContext as _, Options, SackBlock};
3656 use test_case::{test_case, test_matrix};
3657
3658 use super::*;
3659 use crate::internal::base::DEFAULT_FIN_WAIT2_TIMEOUT;
3660 use crate::internal::buffer::testutil::{InfiniteSendBuffer, RepeatingSendBuffer, RingBuffer};
3661 use crate::internal::buffer::Buffer;
3662 use crate::internal::congestion::DUP_ACK_THRESHOLD;
3663 use crate::internal::counters::testutil::CounterExpectations;
3664 use crate::internal::counters::TcpCountersWithSocketInner;
3665 use crate::internal::testutil::{
3666 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE_USIZE,
3667 };
3668
3669 const TEST_IRS: SeqNum = SeqNum::new(100);
3670 const TEST_ISS: SeqNum = SeqNum::new(300);
3671
3672 const ISS_1: SeqNum = SeqNum::new(500);
3673 const ISS_2: SeqNum = SeqNum::new(700);
3674
3675 const RTT: Duration = Duration::from_millis(500);
3676
3677 const DEVICE_MAXIMUM_SEGMENT_SIZE: Mss = Mss(NonZeroU16::new(1400 as u16).unwrap());
3678
3679 fn default_quickack_counter() -> usize {
3680 quickack_counter(
3681 BufferLimits { capacity: WindowSize::DEFAULT.into(), len: 0 },
3682 DEVICE_MAXIMUM_SEGMENT_SIZE,
3683 )
3684 }
3685
3686 impl SocketOptions {
3687 fn default_for_state_tests() -> Self {
3688 Self { delayed_ack: false, nagle_enabled: false, ..Default::default() }
3691 }
3692 }
3693
3694 enum ClientlessBufferProvider {}
3697
3698 impl<R: ReceiveBuffer + Default, S: SendBuffer + Default> BufferProvider<R, S>
3699 for ClientlessBufferProvider
3700 {
3701 type PassiveOpen = ();
3702 type ActiveOpen = ();
3703
3704 fn new_passive_open_buffers(_buffer_sizes: BufferSizes) -> (R, S, Self::PassiveOpen) {
3705 (R::default(), S::default(), ())
3706 }
3707 }
3708
3709 impl RingBuffer {
3710 fn with_data<'a>(cap: usize, data: &'a [u8]) -> Self {
3711 let mut buffer = RingBuffer::new(cap);
3712 let nwritten = buffer.write_at(0, &data);
3713 assert_eq!(nwritten, data.len());
3714 buffer.make_readable(nwritten, false);
3715 buffer
3716 }
3717 }
3718
3719 #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
3721 struct NullBuffer;
3722
3723 impl Buffer for NullBuffer {
3724 fn capacity_range() -> (usize, usize) {
3725 (usize::MIN, usize::MAX)
3726 }
3727
3728 fn limits(&self) -> BufferLimits {
3729 BufferLimits { len: 0, capacity: 0 }
3730 }
3731
3732 fn target_capacity(&self) -> usize {
3733 0
3734 }
3735
3736 fn request_capacity(&mut self, _size: usize) {}
3737 }
3738
3739 impl ReceiveBuffer for NullBuffer {
3740 fn write_at<P: Payload>(&mut self, _offset: usize, _data: &P) -> usize {
3741 0
3742 }
3743
3744 fn make_readable(&mut self, count: usize, has_outstanding: bool) {
3745 assert_eq!(count, 0);
3746 assert_eq!(has_outstanding, false);
3747 }
3748 }
3749
3750 impl SendBuffer for NullBuffer {
3751 type Payload<'a> = &'a [u8];
3752
3753 fn mark_read(&mut self, count: usize) {
3754 assert_eq!(count, 0);
3755 }
3756
3757 fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
3758 where
3759 F: FnOnce(Self::Payload<'a>) -> R,
3760 {
3761 assert_eq!(offset, 0);
3762 f(&[])
3763 }
3764 }
3765
3766 #[derive(Debug)]
3767 struct FakeStateMachineDebugId;
3768
3769 impl StateMachineDebugId for FakeStateMachineDebugId {
3770 fn trace_id(&self) -> TraceResourceId<'_> {
3771 TraceResourceId::new(0)
3772 }
3773 }
3774
3775 impl<R: ReceiveBuffer, S: SendBuffer> State<FakeInstant, R, S, ()> {
3776 fn poll_send_with_default_options(
3777 &mut self,
3778 mss: u32,
3779 now: FakeInstant,
3780 counters: &TcpCountersRefs<'_>,
3781 ) -> Option<Segment<S::Payload<'_>>> {
3782 self.poll_send(
3783 &FakeStateMachineDebugId,
3784 counters,
3785 mss,
3786 now,
3787 &SocketOptions::default_for_state_tests(),
3788 )
3789 .ok()
3790 }
3791
3792 fn on_segment_with_default_options<P: Payload, BP: BufferProvider<R, S, ActiveOpen = ()>>(
3793 &mut self,
3794 incoming: Segment<P>,
3795 now: FakeInstant,
3796 counters: &TcpCountersRefs<'_>,
3797 ) -> (Option<Segment<()>>, Option<BP::PassiveOpen>)
3798 where
3799 BP::PassiveOpen: Debug,
3800 R: Default,
3801 S: Default,
3802 {
3803 self.on_segment_with_options::<_, BP>(
3804 incoming,
3805 now,
3806 counters,
3807 &SocketOptions::default_for_state_tests(),
3808 )
3809 }
3810
3811 fn on_segment_with_options<P: Payload, BP: BufferProvider<R, S, ActiveOpen = ()>>(
3812 &mut self,
3813 incoming: Segment<P>,
3814 now: FakeInstant,
3815 counters: &TcpCountersRefs<'_>,
3816 options: &SocketOptions,
3817 ) -> (Option<Segment<()>>, Option<BP::PassiveOpen>)
3818 where
3819 BP::PassiveOpen: Debug,
3820 R: Default,
3821 S: Default,
3822 {
3823 let (segment, passive_open, _data_acked, _newly_closed) = self.on_segment::<P, BP>(
3824 &FakeStateMachineDebugId,
3825 counters,
3826 incoming,
3827 now,
3828 options,
3829 false, );
3831 (segment, passive_open)
3832 }
3833
3834 fn recv_mut(&mut self) -> Option<&mut Recv<FakeInstant, R>> {
3835 match self {
3836 State::Closed(_)
3837 | State::Listen(_)
3838 | State::SynRcvd(_)
3839 | State::SynSent(_)
3840 | State::CloseWait(_)
3841 | State::LastAck(_)
3842 | State::Closing(_)
3843 | State::TimeWait(_) => None,
3844 State::Established(Established { rcv, .. })
3845 | State::FinWait1(FinWait1 { rcv, .. }) => Some(rcv.get_mut()),
3846 State::FinWait2(FinWait2 { rcv, .. }) => Some(rcv),
3847 }
3848 }
3849
3850 #[track_caller]
3851 fn assert_established(&mut self) -> &mut Established<FakeInstant, R, S> {
3852 assert_matches!(self, State::Established(e) => e)
3853 }
3854 }
3855
3856 impl<S: SendBuffer + Debug> State<FakeInstant, RingBuffer, S, ()> {
3857 fn read_with(&mut self, f: impl for<'b> FnOnce(&'b [&'_ [u8]]) -> usize) -> usize {
3858 match self {
3859 State::Closed(_)
3860 | State::Listen(_)
3861 | State::SynRcvd(_)
3862 | State::SynSent(_)
3863 | State::CloseWait(_)
3864 | State::LastAck(_)
3865 | State::Closing(_)
3866 | State::TimeWait(_) => {
3867 panic!("No receive state in {:?}", self);
3868 }
3869 State::Established(Established { snd: _, rcv })
3870 | State::FinWait1(FinWait1 { snd: _, rcv }) => {
3871 assert_matches!(&mut rcv.buffer, RecvBufferState::Open{ buffer, .. } => buffer.read_with(f))
3872 }
3873 State::FinWait2(FinWait2 { last_seq: _, rcv, timeout_at: _ }) => {
3874 assert_matches!(&mut rcv.buffer, RecvBufferState::Open{ buffer, .. } => buffer.read_with(f))
3875 }
3876 }
3877 }
3878 }
3879
3880 impl State<FakeInstant, RingBuffer, NullBuffer, ()> {
3881 fn new_syn_rcvd(instant: FakeInstant) -> Self {
3882 State::SynRcvd(SynRcvd {
3883 iss: TEST_ISS,
3884 irs: TEST_IRS,
3885 timestamp: Some(instant),
3886 retrans_timer: RetransTimer::new(instant, Rto::DEFAULT, None, DEFAULT_MAX_RETRIES),
3887 simultaneous_open: Some(()),
3888 buffer_sizes: Default::default(),
3889 smss: DEVICE_MAXIMUM_SEGMENT_SIZE,
3890 rcv_wnd_scale: WindowScale::default(),
3891 snd_wnd_scale: Some(WindowScale::default()),
3892 sack_permitted: SACK_PERMITTED,
3893 })
3894 }
3895 }
3896
3897 impl<S, const FIN_QUEUED: bool> Send<FakeInstant, S, FIN_QUEUED> {
3898 fn default_for_test_at(seq: SeqNum, buffer: S) -> Self {
3899 Self {
3900 nxt: seq,
3901 max: seq,
3902 una: seq,
3903 wnd: WindowSize::DEFAULT,
3904 wnd_max: WindowSize::DEFAULT,
3905 buffer,
3906 wl1: TEST_IRS + 1,
3907 wl2: seq,
3908 last_push: seq,
3909 rtt_estimator: Estimator::default(),
3910 rtt_sampler: RttSampler::default(),
3911 timer: None,
3912 congestion_control: CongestionControl::cubic_with_mss(DEVICE_MAXIMUM_SEGMENT_SIZE),
3913 wnd_scale: WindowScale::default(),
3914 }
3915 }
3916
3917 fn default_for_test(buffer: S) -> Self {
3918 Self::default_for_test_at(TEST_ISS + 1, buffer)
3919 }
3920 }
3921
3922 impl<R: ReceiveBuffer> Recv<FakeInstant, R> {
3923 fn default_for_test_at(seq: SeqNum, buffer: R) -> Self {
3924 let BufferLimits { capacity, len } = buffer.limits();
3925 let avail_buffer = capacity - len;
3926 Self {
3927 buffer: RecvBufferState::Open { buffer, assembler: Assembler::new(seq) },
3928 timer: None,
3929 mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
3930 remaining_quickacks: 0,
3931 last_segment_at: None,
3932 wnd_scale: WindowScale::default(),
3933 last_window_update: (
3934 seq,
3935 WindowSize::from_u32(avail_buffer.try_into().unwrap()).unwrap(),
3936 ),
3937 sack_permitted: SACK_PERMITTED,
3938 }
3939 }
3940
3941 fn default_for_test(buffer: R) -> Self {
3942 Self::default_for_test_at(TEST_IRS + 1, buffer)
3943 }
3944 }
3945
3946 #[derive(Default)]
3947 struct FakeTcpCounters {
3948 stack_wide: TcpCountersWithSocketInner,
3949 per_socket: TcpCountersWithSocketInner,
3950 }
3951
3952 impl FakeTcpCounters {
3953 fn refs<'a>(&'a self) -> TcpCountersRefs<'a> {
3954 let Self { stack_wide, per_socket } = self;
3955 TcpCountersRefs { stack_wide, per_socket }
3956 }
3957 }
3958
3959 impl CounterExpectations {
3960 #[track_caller]
3961 fn assert_counters(&self, FakeTcpCounters { stack_wide, per_socket }: &FakeTcpCounters) {
3962 assert_eq!(self, &CounterExpectations::from(stack_wide), "stack-wide counter mismatch");
3963 assert_eq!(self, &CounterExpectations::from(per_socket), "per-socket counter mismatch");
3964 }
3965 }
3966
3967 #[test_case(Segment::rst(TEST_IRS) => None; "drop RST")]
3968 #[test_case(Segment::rst_ack(TEST_IRS, TEST_ISS) => None; "drop RST|ACK")]
3969 #[test_case(Segment::syn(TEST_IRS, UnscaledWindowSize::from(0), HandshakeOptions::default().into()) => Some(Segment::rst_ack(SeqNum::new(0), TEST_IRS + 1)); "reset SYN")]
3970 #[test_case(Segment::syn_ack(TEST_IRS, TEST_ISS, UnscaledWindowSize::from(0), HandshakeOptions::default().into()) => Some(Segment::rst(TEST_ISS)); "reset SYN|ACK")]
3971 #[test_case(Segment::with_data(TEST_IRS, TEST_ISS, UnscaledWindowSize::from(0), &[0, 1, 2][..]) => Some(Segment::rst(TEST_ISS)); "reset data segment")]
3972 fn segment_arrives_when_closed(
3973 incoming: impl Into<Segment<&'static [u8]>>,
3974 ) -> Option<Segment<()>> {
3975 let closed = Closed { reason: () };
3976 closed.on_segment(&incoming.into())
3977 }
3978
3979 #[test_case(
3980 Segment::rst_ack(TEST_ISS, TEST_IRS - 1), RTT
3981 => SynSentOnSegmentDisposition::Ignore; "unacceptable ACK with RST")]
3982 #[test_case(
3983 Segment::ack(TEST_ISS, TEST_IRS - 1, UnscaledWindowSize::from(u16::MAX)), RTT
3984 => SynSentOnSegmentDisposition::SendRst(
3985 Segment::rst(TEST_IRS-1),
3986 ); "unacceptable ACK without RST")]
3987 #[test_case(
3988 Segment::rst_ack(TEST_ISS, TEST_IRS), RTT
3989 => SynSentOnSegmentDisposition::EnterClosed(
3990 Closed { reason: Some(ConnectionError::ConnectionRefused) },
3991 ); "acceptable ACK(ISS) with RST")]
3992 #[test_case(
3993 Segment::rst_ack(TEST_ISS, TEST_IRS + 1), RTT
3994 => SynSentOnSegmentDisposition::EnterClosed(
3995 Closed { reason: Some(ConnectionError::ConnectionRefused) },
3996 ); "acceptable ACK(ISS+1) with RST")]
3997 #[test_case(
3998 Segment::rst(TEST_ISS), RTT
3999 => SynSentOnSegmentDisposition::Ignore; "RST without ack")]
4000 #[test_case(
4001 Segment::syn(
4002 TEST_ISS,
4003 UnscaledWindowSize::from(u16::MAX),
4004 HandshakeOptions {
4005 window_scale: Some(WindowScale::default()),
4006 ..Default::default() }.into()
4007 ), RTT
4008 => SynSentOnSegmentDisposition::SendSynAckAndEnterSynRcvd(
4009 Segment::syn_ack(
4010 TEST_IRS,
4011 TEST_ISS + 1,
4012 UnscaledWindowSize::from(u16::MAX),
4013 HandshakeOptions {
4014 mss: Some(Mss::default::<Ipv4>()),
4015 window_scale: Some(WindowScale::default()),
4016 sack_permitted: SACK_PERMITTED,
4017 ..Default::default()
4018 }.into()),
4019 SynRcvd {
4020 iss: TEST_IRS,
4021 irs: TEST_ISS,
4022 timestamp: Some(FakeInstant::from(RTT)),
4023 retrans_timer: RetransTimer::new(
4024 FakeInstant::from(RTT),
4025 Rto::DEFAULT,
4026 NonZeroDuration::new(TEST_USER_TIMEOUT.get() - RTT),
4027 DEFAULT_MAX_SYNACK_RETRIES
4028 ),
4029 simultaneous_open: None,
4030 buffer_sizes: BufferSizes::default(),
4031 smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
4032 rcv_wnd_scale: WindowScale::default(),
4033 snd_wnd_scale: Some(WindowScale::default()),
4034 sack_permitted: false,
4035 }
4036 ); "SYN only")]
4037 #[test_case(
4038 Segment::fin(TEST_ISS, TEST_IRS + 1, UnscaledWindowSize::from(u16::MAX)), RTT
4039 => SynSentOnSegmentDisposition::Ignore; "acceptable ACK with FIN")]
4040 #[test_case(
4041 Segment::ack(TEST_ISS, TEST_IRS + 1, UnscaledWindowSize::from(u16::MAX)), RTT
4042 => SynSentOnSegmentDisposition::Ignore; "acceptable ACK(ISS+1) with nothing")]
4043 #[test_case(
4044 Segment::ack(TEST_ISS, TEST_IRS, UnscaledWindowSize::from(u16::MAX)), RTT
4045 => SynSentOnSegmentDisposition::Ignore; "acceptable ACK(ISS) without RST")]
4046 #[test_case(
4047 Segment::syn(TEST_ISS, UnscaledWindowSize::from(u16::MAX), HandshakeOptions::default().into()),
4048 TEST_USER_TIMEOUT.get()
4049 => SynSentOnSegmentDisposition::EnterClosed(Closed {
4050 reason: None
4051 }); "syn but timed out")]
4052 fn segment_arrives_when_syn_sent(
4053 incoming: Segment<()>,
4054 delay: Duration,
4055 ) -> SynSentOnSegmentDisposition<FakeInstant, ()> {
4056 let syn_sent = SynSent {
4057 iss: TEST_IRS,
4058 timestamp: Some(FakeInstant::default()),
4059 retrans_timer: RetransTimer::new(
4060 FakeInstant::default(),
4061 Rto::DEFAULT,
4062 Some(TEST_USER_TIMEOUT),
4063 DEFAULT_MAX_RETRIES,
4064 ),
4065 active_open: (),
4066 buffer_sizes: BufferSizes::default(),
4067 default_mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
4068 device_mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
4069 rcv_wnd_scale: WindowScale::default(),
4070 };
4071 syn_sent.on_segment(incoming, FakeInstant::from(delay))
4072 }
4073
4074 #[test_case(Segment::rst(TEST_ISS) => ListenOnSegmentDisposition::Ignore; "ignore RST")]
4075 #[test_case(Segment::ack(TEST_ISS, TEST_IRS, UnscaledWindowSize::from(u16::MAX)) =>
4076 ListenOnSegmentDisposition::SendRst(Segment::rst(TEST_IRS)); "reject ACK")]
4077 #[test_case(Segment::syn(TEST_ISS, UnscaledWindowSize::from(u16::MAX),
4078 HandshakeOptions {
4079 window_scale: Some(WindowScale::default()),
4080 ..Default::default()
4081 }.into()) =>
4082 ListenOnSegmentDisposition::SendSynAckAndEnterSynRcvd(
4083 Segment::syn_ack(
4084 TEST_IRS,
4085 TEST_ISS + 1,
4086 UnscaledWindowSize::from(u16::MAX),
4087 HandshakeOptions {
4088 mss: Some(Mss::default::<Ipv4>()),
4089 window_scale: Some(WindowScale::default()),
4090 sack_permitted: SACK_PERMITTED,
4091 ..Default::default()
4092 }.into()),
4093 SynRcvd {
4094 iss: TEST_IRS,
4095 irs: TEST_ISS,
4096 timestamp: Some(FakeInstant::default()),
4097 retrans_timer: RetransTimer::new(
4098 FakeInstant::default(),
4099 Rto::DEFAULT,
4100 None,
4101 DEFAULT_MAX_SYNACK_RETRIES,
4102 ),
4103 simultaneous_open: None,
4104 buffer_sizes: BufferSizes::default(),
4105 smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
4106 sack_permitted: false,
4107 rcv_wnd_scale: WindowScale::default(),
4108 snd_wnd_scale: Some(WindowScale::default()),
4109 }); "accept syn")]
4110 fn segment_arrives_when_listen(
4111 incoming: Segment<()>,
4112 ) -> ListenOnSegmentDisposition<FakeInstant> {
4113 let listen = Closed::<Initial>::listen(
4114 TEST_IRS,
4115 Default::default(),
4116 DEVICE_MAXIMUM_SEGMENT_SIZE,
4117 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
4118 None,
4119 );
4120 listen.on_segment(incoming, FakeInstant::default())
4121 }
4122
4123 #[test_case(
4124 Segment::ack(TEST_IRS, TEST_ISS, UnscaledWindowSize::from(u16::MAX)),
4125 None
4126 => Some(
4127 Segment::ack(TEST_ISS + 1, TEST_IRS + 1, UnscaledWindowSize::from(u16::MAX))
4128 ); "OTW segment")]
4129 #[test_case(
4130 Segment::rst_ack(TEST_IRS, TEST_ISS),
4131 None
4132 => None; "OTW RST")]
4133 #[test_case(
4134 Segment::rst_ack(TEST_IRS + 1, TEST_ISS),
4135 Some(State::Closed(Closed { reason: Some(ConnectionError::ConnectionReset) }))
4136 => None; "acceptable RST")]
4137 #[test_case(
4138 Segment::syn(TEST_IRS + 1, UnscaledWindowSize::from(u16::MAX),
4139 HandshakeOptions { window_scale: Some(WindowScale::default()), ..Default::default() }.into()),
4140 Some(State::Closed(Closed { reason: Some(ConnectionError::ConnectionReset) }))
4141 => Some(
4142 Segment::rst(TEST_ISS + 1)
4143 ); "duplicate syn")]
4144 #[test_case(
4145 Segment::ack(TEST_IRS + 1, TEST_ISS, UnscaledWindowSize::from(u16::MAX)),
4146 None
4147 => Some(
4148 Segment::rst(TEST_ISS)
4149 ); "unacceptable ack (ISS)")]
4150 #[test_case(
4151 Segment::ack(TEST_IRS + 1, TEST_ISS + 1, UnscaledWindowSize::from(u16::MAX)),
4152 Some(State::Established(
4153 Established {
4154 snd: Send {
4155 rtt_estimator: Estimator::Measured {
4156 srtt: RTT,
4157 rtt_var: RTT / 2,
4158 },
4159 ..Send::default_for_test(NullBuffer)
4160 }.into(),
4161 rcv: Recv {
4162 remaining_quickacks: quickack_counter(BufferLimits {
4163 capacity: WindowSize::DEFAULT.into(),
4164 len: 0,
4165 }, DEVICE_MAXIMUM_SEGMENT_SIZE),
4166 ..Recv::default_for_test(RingBuffer::default())
4167 }.into(),
4168 }
4169 ))
4170 => None; "acceptable ack (ISS + 1)")]
4171 #[test_case(
4172 Segment::ack(TEST_IRS + 1, TEST_ISS + 2, UnscaledWindowSize::from(u16::MAX)),
4173 None
4174 => Some(
4175 Segment::rst(TEST_ISS + 2)
4176 ); "unacceptable ack (ISS + 2)")]
4177 #[test_case(
4178 Segment::ack(TEST_IRS + 1, TEST_ISS - 1, UnscaledWindowSize::from(u16::MAX)),
4179 None
4180 => Some(
4181 Segment::rst(TEST_ISS - 1)
4182 ); "unacceptable ack (ISS - 1)")]
4183 #[test_case(
4184 Segment::new_empty(
4185 SegmentHeader {
4186 seq: TEST_IRS + 1,
4187 wnd: UnscaledWindowSize::from(u16::MAX),
4188 ..Default::default()
4189 }
4190 ),
4191 None
4192 => None; "no ack")]
4193 #[test_case(
4194 Segment::fin(TEST_IRS + 1, TEST_ISS + 1, UnscaledWindowSize::from(u16::MAX)),
4195 Some(State::CloseWait(CloseWait {
4196 snd: Send {
4197 rtt_estimator: Estimator::Measured {
4198 srtt: RTT,
4199 rtt_var: RTT / 2,
4200 },
4201 ..Send::default_for_test(NullBuffer)
4202 }.into(),
4203 closed_rcv: RecvParams {
4204 ack: TEST_IRS + 2,
4205 wnd: WindowSize::from_u32(u32::from(u16::MAX - 1)).unwrap(),
4206 wnd_scale: WindowScale::ZERO,
4207 }
4208 }))
4209 => Some(
4210 Segment::ack(TEST_ISS + 1, TEST_IRS + 2, UnscaledWindowSize::from(u16::MAX - 1))
4211 ); "fin")]
4212 fn segment_arrives_when_syn_rcvd(
4213 incoming: Segment<()>,
4214 expected: Option<State<FakeInstant, RingBuffer, NullBuffer, ()>>,
4215 ) -> Option<Segment<()>> {
4216 let mut clock = FakeInstantCtx::default();
4217 let counters = FakeTcpCounters::default();
4218 let mut state = State::new_syn_rcvd(clock.now());
4219 clock.sleep(RTT);
4220 let (seg, _passive_open) = state
4221 .on_segment_with_default_options::<_, ClientlessBufferProvider>(
4222 incoming,
4223 clock.now(),
4224 &counters.refs(),
4225 );
4226 match expected {
4227 Some(new_state) => assert_eq!(state, new_state),
4228 None => assert_matches!(state, State::SynRcvd(_)),
4229 };
4230 seg
4231 }
4232
4233 #[test]
4234 fn abort_when_syn_rcvd() {
4235 let clock = FakeInstantCtx::default();
4236 let counters = FakeTcpCounters::default();
4237 let mut state = State::new_syn_rcvd(clock.now());
4238 let segment = assert_matches!(
4239 state.abort(&counters.refs()),
4240 (Some(seg), NewlyClosed::Yes) => seg
4241 );
4242 assert_eq!(segment.header().control, Some(Control::RST));
4243 assert_eq!(segment.header().seq, TEST_ISS + 1);
4244 assert_eq!(segment.header().ack, Some(TEST_IRS + 1));
4245 }
4246
4247 #[test_case(
4248 Segment::syn(TEST_IRS + 1, UnscaledWindowSize::from(u16::MAX), HandshakeOptions::default().into()),
4249 Some(State::Closed (
4250 Closed { reason: Some(ConnectionError::ConnectionReset) },
4251 ))
4252 => Some(Segment::rst(TEST_ISS + 1)); "duplicate syn")]
4253 #[test_case(
4254 Segment::rst(TEST_IRS + 1),
4255 Some(State::Closed (
4256 Closed { reason: Some(ConnectionError::ConnectionReset) },
4257 ))
4258 => None; "accepatable rst")]
4259 #[test_case(
4260 Segment::ack(TEST_ISS + 1, TEST_IRS + 2, UnscaledWindowSize::from(u16::MAX)),
4261 None
4262 => Some(
4263 Segment::ack(TEST_ISS + 1, TEST_IRS + 1, UnscaledWindowSize::from(2))
4264 ); "unacceptable ack")]
4265 #[test_case(
4266 Segment::ack(TEST_IRS + 1, TEST_ISS + 1, UnscaledWindowSize::from(u16::MAX)),
4267 None
4268 => None; "pure ack")]
4269 #[test_case(
4270 Segment::fin(TEST_IRS + 1, TEST_ISS + 1, UnscaledWindowSize::from(u16::MAX)),
4271 Some(State::CloseWait(CloseWait {
4272 snd: Send::default_for_test(NullBuffer).into(),
4273 closed_rcv: RecvParams {
4274 ack: TEST_IRS + 2,
4275 wnd: WindowSize::new(1).unwrap(),
4276 wnd_scale: WindowScale::ZERO,
4277 }
4278 }))
4279 => Some(
4280 Segment::ack(TEST_ISS + 1, TEST_IRS + 2, UnscaledWindowSize::from(1))
4281 ); "pure fin")]
4282 #[test_case(
4283 Segment::piggybacked_fin(TEST_IRS + 1, TEST_ISS + 1, UnscaledWindowSize::from(u16::MAX), "A".as_bytes()),
4284 Some(State::CloseWait(CloseWait {
4285 snd: Send::default_for_test(NullBuffer).into(),
4286 closed_rcv: RecvParams {
4287 ack: TEST_IRS + 3,
4288 wnd: WindowSize::ZERO,
4289 wnd_scale: WindowScale::ZERO,
4290 }
4291 }))
4292 => Some(
4293 Segment::ack(TEST_ISS + 1, TEST_IRS + 3, UnscaledWindowSize::from(0))
4294 ); "fin with 1 byte")]
4295 #[test_case(
4296 Segment::piggybacked_fin(TEST_IRS + 1, TEST_ISS + 1, UnscaledWindowSize::from(u16::MAX), "AB".as_bytes()),
4297 None
4298 => Some(
4299 Segment::ack(TEST_ISS + 1, TEST_IRS + 3, UnscaledWindowSize::from(0))
4300 ); "fin with 2 bytes")]
4301 fn segment_arrives_when_established(
4302 incoming: Segment<&[u8]>,
4303 expected: Option<State<FakeInstant, RingBuffer, NullBuffer, ()>>,
4304 ) -> Option<Segment<()>> {
4305 let counters = FakeTcpCounters::default();
4306 let mut state = State::Established(Established {
4307 snd: Send::default_for_test(NullBuffer).into(),
4308 rcv: Recv::default_for_test(RingBuffer::new(2)).into(),
4309 });
4310 let (seg, passive_open) = state
4311 .on_segment_with_default_options::<_, ClientlessBufferProvider>(
4312 incoming,
4313 FakeInstant::default(),
4314 &counters.refs(),
4315 );
4316 assert_eq!(passive_open, None);
4317 match expected {
4318 Some(new_state) => assert_eq!(new_state, state),
4319 None => assert_matches!(state, State::Established(_)),
4320 };
4321 seg
4322 }
4323
4324 #[test]
4325 fn common_rcv_data_segment_arrives() {
4326 let counters = FakeTcpCounters::default();
4327 let new_snd = || Send::default_for_test(NullBuffer);
4330 let new_rcv = || Recv::default_for_test(RingBuffer::new(TEST_BYTES.len()));
4331 for mut state in [
4332 State::Established(Established { snd: new_snd().into(), rcv: new_rcv().into() }),
4333 State::FinWait1(FinWait1 { snd: new_snd().queue_fin().into(), rcv: new_rcv().into() }),
4334 State::FinWait2(FinWait2 { last_seq: TEST_ISS + 1, rcv: new_rcv(), timeout_at: None }),
4335 ] {
4336 assert_eq!(
4337 state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
4338 Segment::with_data(
4339 TEST_IRS + 1,
4340 TEST_ISS + 1,
4341 UnscaledWindowSize::from(u16::MAX),
4342 TEST_BYTES
4343 ),
4344 FakeInstant::default(),
4345 &counters.refs(),
4346 ),
4347 (
4348 Some(Segment::ack(
4349 TEST_ISS + 1,
4350 TEST_IRS + 1 + TEST_BYTES.len(),
4351 UnscaledWindowSize::from(0)
4352 )),
4353 None
4354 )
4355 );
4356 assert_eq!(
4357 state.read_with(|bytes| {
4358 assert_eq!(bytes.concat(), TEST_BYTES);
4359 TEST_BYTES.len()
4360 }),
4361 TEST_BYTES.len()
4362 );
4363 }
4364 }
4365
4366 #[test]
4367 fn common_snd_ack_segment_arrives() {
4368 let counters = FakeTcpCounters::default();
4369 let new_snd =
4372 || Send::default_for_test(RingBuffer::with_data(TEST_BYTES.len(), TEST_BYTES));
4373 let new_rcv = || Recv::default_for_test(NullBuffer);
4374 for mut state in [
4375 State::Established(Established { snd: new_snd().into(), rcv: new_rcv().into() }),
4376 State::FinWait1(FinWait1 { snd: new_snd().queue_fin().into(), rcv: new_rcv().into() }),
4377 State::Closing(Closing {
4378 snd: new_snd().queue_fin(),
4379 closed_rcv: RecvParams {
4380 ack: TEST_IRS + 1,
4381 wnd: WindowSize::ZERO,
4382 wnd_scale: WindowScale::default(),
4383 },
4384 }),
4385 State::CloseWait(CloseWait {
4386 snd: new_snd().into(),
4387 closed_rcv: RecvParams {
4388 ack: TEST_IRS + 1,
4389 wnd: WindowSize::ZERO,
4390 wnd_scale: WindowScale::default(),
4391 },
4392 }),
4393 State::LastAck(LastAck {
4394 snd: new_snd().queue_fin(),
4395 closed_rcv: RecvParams {
4396 ack: TEST_IRS + 1,
4397 wnd: WindowSize::ZERO,
4398 wnd_scale: WindowScale::default(),
4399 },
4400 }),
4401 ] {
4402 assert_eq!(
4403 state.poll_send_with_default_options(
4404 u32::try_from(TEST_BYTES.len()).unwrap(),
4405 FakeInstant::default(),
4406 &counters.refs(),
4407 ),
4408 Some(Segment::new_assert_no_discard(
4409 SegmentHeader {
4410 seq: TEST_ISS + 1,
4411 ack: Some(TEST_IRS + 1),
4412 wnd: UnscaledWindowSize::from(0),
4413 push: true,
4414 ..Default::default()
4415 },
4416 FragmentedPayload::new_contiguous(TEST_BYTES)
4417 ))
4418 );
4419 assert_eq!(
4420 state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
4421 Segment::<()>::ack(
4422 TEST_IRS + 1,
4423 TEST_ISS + 1 + TEST_BYTES.len(),
4424 UnscaledWindowSize::from(u16::MAX)
4425 ),
4426 FakeInstant::default(),
4427 &counters.refs(),
4428 ),
4429 (None, None),
4430 );
4431 assert_eq!(state.poll_send_at(), None);
4432 let snd = match state {
4433 State::Closed(_)
4434 | State::Listen(_)
4435 | State::SynRcvd(_)
4436 | State::SynSent(_)
4437 | State::FinWait2(_)
4438 | State::TimeWait(_) => unreachable!("Unexpected state {:?}", state),
4439 State::Established(e) => e.snd.into_inner().queue_fin(),
4440 State::CloseWait(c) => c.snd.into_inner().queue_fin(),
4441 State::LastAck(l) => l.snd,
4442 State::FinWait1(f) => f.snd.into_inner(),
4443 State::Closing(c) => c.snd,
4444 };
4445 assert_eq!(snd.nxt, TEST_ISS + 1 + TEST_BYTES.len());
4446 assert_eq!(snd.max, TEST_ISS + 1 + TEST_BYTES.len());
4447 assert_eq!(snd.una, TEST_ISS + 1 + TEST_BYTES.len());
4448 assert_eq!(snd.buffer.limits().len, 0);
4449 }
4450 }
4451
4452 #[test_case(
4453 Segment::syn(TEST_IRS + 2, UnscaledWindowSize::from(u16::MAX),
4454 HandshakeOptions::default().into()),
4455 Some(State::Closed (
4456 Closed { reason: Some(ConnectionError::ConnectionReset) },
4457 ))
4458 => Some(Segment::rst(TEST_ISS + 1)); "syn")]
4459 #[test_case(
4460 Segment::rst(TEST_IRS + 2),
4461 Some(State::Closed (
4462 Closed { reason: Some(ConnectionError::ConnectionReset) },
4463 ))
4464 => None; "rst")]
4465 #[test_case(
4466 Segment::fin(TEST_IRS + 2, TEST_ISS + 1, UnscaledWindowSize::from(u16::MAX)),
4467 None
4468 => None; "ignore fin")]
4469 #[test_case(
4470 Segment::with_data(TEST_IRS, TEST_ISS + 1, UnscaledWindowSize::from(u16::MAX), "a".as_bytes()),
4471 None => Some(Segment::ack(TEST_ISS + 1, TEST_IRS + 2, UnscaledWindowSize::from(u16::MAX)));
4472 "ack old data")]
4473 #[test_case(
4474 Segment::with_data(TEST_IRS + 2, TEST_ISS + 1, UnscaledWindowSize::from(u16::MAX), "Hello".as_bytes()),
4475 Some(State::Closed (
4476 Closed { reason: Some(ConnectionError::ConnectionReset) },
4477 ))
4478 => Some(Segment::rst(TEST_ISS + 1)); "reset on new data")]
4479 fn segment_arrives_when_close_wait(
4480 incoming: Segment<&[u8]>,
4481 expected: Option<State<FakeInstant, RingBuffer, NullBuffer, ()>>,
4482 ) -> Option<Segment<()>> {
4483 let counters = FakeTcpCounters::default();
4484 let mut state = State::CloseWait(CloseWait {
4485 snd: Send::default_for_test(NullBuffer).into(),
4486 closed_rcv: RecvParams {
4487 ack: TEST_IRS + 2,
4488 wnd: WindowSize::DEFAULT,
4489 wnd_scale: WindowScale::ZERO,
4490 },
4491 });
4492 let (seg, _passive_open) = state
4493 .on_segment_with_default_options::<_, ClientlessBufferProvider>(
4494 incoming,
4495 FakeInstant::default(),
4496 &counters.refs(),
4497 );
4498 match expected {
4499 Some(new_state) => assert_eq!(new_state, state),
4500 None => assert_matches!(state, State::CloseWait(_)),
4501 };
4502 seg
4503 }
4504
4505 #[test_case(true; "sack")]
4506 #[test_case(false; "no sack")]
4507 fn active_passive_open(sack_permitted: bool) {
4508 let mut clock = FakeInstantCtx::default();
4509 let counters = FakeTcpCounters::default();
4510 let passive_iss = ISS_2;
4511 let active_iss = ISS_1;
4512 let (syn_sent, syn_seg) = Closed::<Initial>::connect(
4513 active_iss,
4514 clock.now(),
4515 (),
4516 Default::default(),
4517 DEVICE_MAXIMUM_SEGMENT_SIZE,
4518 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
4519 &SocketOptions::default_for_state_tests(),
4520 );
4521 assert_eq!(
4522 syn_seg,
4523 Segment::syn(
4524 active_iss,
4525 UnscaledWindowSize::from(u16::MAX),
4526 HandshakeOptions {
4527 mss: Some(DEVICE_MAXIMUM_SEGMENT_SIZE),
4528 window_scale: Some(WindowScale::default()),
4529 sack_permitted: SACK_PERMITTED,
4531 }
4532 .into(),
4533 )
4534 );
4535 assert_eq!(
4536 syn_sent,
4537 SynSent {
4538 iss: active_iss,
4539 timestamp: Some(clock.now()),
4540 retrans_timer: RetransTimer::new(
4541 clock.now(),
4542 Rto::DEFAULT,
4543 None,
4544 DEFAULT_MAX_SYN_RETRIES,
4545 ),
4546 active_open: (),
4547 buffer_sizes: BufferSizes::default(),
4548 default_mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
4549 device_mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
4550 rcv_wnd_scale: WindowScale::default(),
4551 }
4552 );
4553 let mut active = State::SynSent(syn_sent);
4554 let mut passive = State::Listen(Closed::<Initial>::listen(
4555 passive_iss,
4556 Default::default(),
4557 DEVICE_MAXIMUM_SEGMENT_SIZE,
4558 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
4559 None,
4560 ));
4561 clock.sleep(RTT / 2);
4562
4563 let syn_seg = {
4565 let (mut header, data) = syn_seg.into_parts();
4566 let opt = assert_matches!(&mut header.options, Options::Handshake(o) => o);
4567 opt.sack_permitted = sack_permitted;
4568 Segment::new_assert_no_discard(header, data)
4569 };
4570
4571 let (seg, passive_open) = passive
4572 .on_segment_with_default_options::<_, ClientlessBufferProvider>(
4573 syn_seg,
4574 clock.now(),
4575 &counters.refs(),
4576 );
4577 let syn_ack = seg.expect("failed to generate a syn-ack segment");
4578 assert_eq!(passive_open, None);
4579 assert_eq!(
4580 syn_ack,
4581 Segment::syn_ack(
4582 passive_iss,
4583 active_iss + 1,
4584 UnscaledWindowSize::from(u16::MAX),
4585 HandshakeOptions {
4586 mss: Some(DEVICE_MAXIMUM_SEGMENT_SIZE),
4587 window_scale: Some(WindowScale::default()),
4588 sack_permitted: SACK_PERMITTED,
4590 }
4591 .into(),
4592 )
4593 );
4594 assert_matches!(passive, State::SynRcvd(ref syn_rcvd) if syn_rcvd == &SynRcvd {
4595 iss: passive_iss,
4596 irs: active_iss,
4597 timestamp: Some(clock.now()),
4598 retrans_timer: RetransTimer::new(
4599 clock.now(),
4600 Rto::DEFAULT,
4601 None,
4602 DEFAULT_MAX_SYNACK_RETRIES,
4603 ),
4604 simultaneous_open: None,
4605 buffer_sizes: Default::default(),
4606 smss: DEVICE_MAXIMUM_SEGMENT_SIZE,
4607 rcv_wnd_scale: WindowScale::default(),
4608 snd_wnd_scale: Some(WindowScale::default()),
4609 sack_permitted,
4610 });
4611 clock.sleep(RTT / 2);
4612
4613 let syn_ack = {
4615 let (mut header, data) = syn_ack.into_parts();
4616 let opt = assert_matches!(&mut header.options, Options::Handshake(o) => o);
4617 opt.sack_permitted = sack_permitted;
4618 Segment::new_assert_no_discard(header, data)
4619 };
4620
4621 let (seg, passive_open) = active
4622 .on_segment_with_default_options::<_, ClientlessBufferProvider>(
4623 syn_ack,
4624 clock.now(),
4625 &counters.refs(),
4626 );
4627 let ack_seg = seg.expect("failed to generate a ack segment");
4628 assert_eq!(passive_open, None);
4629 assert_eq!(
4630 ack_seg,
4631 Segment::ack(active_iss + 1, passive_iss + 1, UnscaledWindowSize::from(u16::MAX))
4632 );
4633 let established = assert_matches!(&active, State::Established(e) => e);
4634 assert_eq!(
4635 established,
4636 &Established {
4637 snd: Send {
4638 wl1: passive_iss,
4639 rtt_estimator: Estimator::Measured { srtt: RTT, rtt_var: RTT / 2 },
4640 ..Send::default_for_test_at(active_iss + 1, RingBuffer::default())
4641 }
4642 .into(),
4643 rcv: Recv {
4644 remaining_quickacks: default_quickack_counter(),
4645 sack_permitted,
4646 ..Recv::default_for_test_at(passive_iss + 1, RingBuffer::default())
4647 }
4648 .into()
4649 }
4650 );
4651 clock.sleep(RTT / 2);
4652 assert_eq!(
4653 passive.on_segment_with_default_options::<_, ClientlessBufferProvider>(
4654 ack_seg,
4655 clock.now(),
4656 &counters.refs(),
4657 ),
4658 (None, Some(())),
4659 );
4660 let established = assert_matches!(&passive, State::Established(e) => e);
4661 assert_eq!(
4662 established,
4663 &Established {
4664 snd: Send {
4665 wl1: active_iss + 1,
4666 rtt_estimator: Estimator::Measured { srtt: RTT, rtt_var: RTT / 2 },
4667 ..Send::default_for_test_at(passive_iss + 1, RingBuffer::default())
4668 }
4669 .into(),
4670 rcv: Recv {
4671 remaining_quickacks: default_quickack_counter(),
4672 sack_permitted,
4673 ..Recv::default_for_test_at(active_iss + 1, RingBuffer::default())
4674 }
4675 .into()
4676 }
4677 )
4678 }
4679
4680 #[test]
4681 fn simultaneous_open() {
4682 let mut clock = FakeInstantCtx::default();
4683 let counters = FakeTcpCounters::default();
4684 let (syn_sent1, syn1) = Closed::<Initial>::connect(
4685 ISS_1,
4686 clock.now(),
4687 (),
4688 Default::default(),
4689 DEVICE_MAXIMUM_SEGMENT_SIZE,
4690 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
4691 &SocketOptions::default_for_state_tests(),
4692 );
4693 let (syn_sent2, syn2) = Closed::<Initial>::connect(
4694 ISS_2,
4695 clock.now(),
4696 (),
4697 Default::default(),
4698 DEVICE_MAXIMUM_SEGMENT_SIZE,
4699 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
4700 &SocketOptions::default_for_state_tests(),
4701 );
4702
4703 assert_eq!(
4704 syn1,
4705 Segment::syn(
4706 ISS_1,
4707 UnscaledWindowSize::from(u16::MAX),
4708 HandshakeOptions {
4709 mss: Some(DEVICE_MAXIMUM_SEGMENT_SIZE),
4710 window_scale: Some(WindowScale::default()),
4711 sack_permitted: SACK_PERMITTED,
4712 }
4713 .into(),
4714 )
4715 );
4716 assert_eq!(
4717 syn2,
4718 Segment::syn(
4719 ISS_2,
4720 UnscaledWindowSize::from(u16::MAX),
4721 HandshakeOptions {
4722 mss: Some(DEVICE_MAXIMUM_SEGMENT_SIZE),
4723 window_scale: Some(WindowScale::default()),
4724 sack_permitted: SACK_PERMITTED,
4725 }
4726 .into(),
4727 )
4728 );
4729
4730 let mut state1 = State::SynSent(syn_sent1);
4731 let mut state2 = State::SynSent(syn_sent2);
4732
4733 clock.sleep(RTT);
4734 let (seg, passive_open) = state1
4735 .on_segment_with_default_options::<_, ClientlessBufferProvider>(
4736 syn2,
4737 clock.now(),
4738 &counters.refs(),
4739 );
4740 let syn_ack1 = seg.expect("failed to generate syn ack");
4741 assert_eq!(passive_open, None);
4742 let (seg, passive_open) = state2
4743 .on_segment_with_default_options::<_, ClientlessBufferProvider>(
4744 syn1,
4745 clock.now(),
4746 &counters.refs(),
4747 );
4748 let syn_ack2 = seg.expect("failed to generate syn ack");
4749 assert_eq!(passive_open, None);
4750
4751 assert_eq!(
4752 syn_ack1,
4753 Segment::syn_ack(
4754 ISS_1,
4755 ISS_2 + 1,
4756 UnscaledWindowSize::from(u16::MAX),
4757 HandshakeOptions {
4758 mss: Some(DEVICE_MAXIMUM_SEGMENT_SIZE),
4759 window_scale: Some(WindowScale::default()),
4760 sack_permitted: SACK_PERMITTED,
4761 }
4762 .into()
4763 )
4764 );
4765 assert_eq!(
4766 syn_ack2,
4767 Segment::syn_ack(
4768 ISS_2,
4769 ISS_1 + 1,
4770 UnscaledWindowSize::from(u16::MAX),
4771 HandshakeOptions {
4772 mss: Some(DEVICE_MAXIMUM_SEGMENT_SIZE),
4773 window_scale: Some(WindowScale::default()),
4774 sack_permitted: SACK_PERMITTED,
4775 }
4776 .into()
4777 )
4778 );
4779
4780 assert_matches!(state1, State::SynRcvd(ref syn_rcvd) if syn_rcvd == &SynRcvd {
4781 iss: ISS_1,
4782 irs: ISS_2,
4783 timestamp: Some(clock.now()),
4784 retrans_timer: RetransTimer::new(
4785 clock.now(),
4786 Rto::DEFAULT,
4787 None,
4788 DEFAULT_MAX_SYNACK_RETRIES,
4789 ),
4790 simultaneous_open: Some(()),
4791 buffer_sizes: BufferSizes::default(),
4792 smss: DEVICE_MAXIMUM_SEGMENT_SIZE,
4793 rcv_wnd_scale: WindowScale::default(),
4794 snd_wnd_scale: Some(WindowScale::default()),
4795 sack_permitted: SACK_PERMITTED,
4796 });
4797 assert_matches!(state2, State::SynRcvd(ref syn_rcvd) if syn_rcvd == &SynRcvd {
4798 iss: ISS_2,
4799 irs: ISS_1,
4800 timestamp: Some(clock.now()),
4801 retrans_timer: RetransTimer::new(
4802 clock.now(),
4803 Rto::DEFAULT,
4804 None,
4805 DEFAULT_MAX_SYNACK_RETRIES,
4806 ),
4807 simultaneous_open: Some(()),
4808 buffer_sizes: BufferSizes::default(),
4809 smss: DEVICE_MAXIMUM_SEGMENT_SIZE,
4810 rcv_wnd_scale: WindowScale::default(),
4811 snd_wnd_scale: Some(WindowScale::default()),
4812 sack_permitted: SACK_PERMITTED,
4813 });
4814
4815 clock.sleep(RTT);
4816 assert_eq!(
4817 state1.on_segment_with_default_options::<_, ClientlessBufferProvider>(
4818 syn_ack2,
4819 clock.now(),
4820 &counters.refs(),
4821 ),
4822 (Some(Segment::ack(ISS_1 + 1, ISS_2 + 1, UnscaledWindowSize::from(u16::MAX))), None)
4823 );
4824 assert_eq!(
4825 state2.on_segment_with_default_options::<_, ClientlessBufferProvider>(
4826 syn_ack1,
4827 clock.now(),
4828 &counters.refs(),
4829 ),
4830 (Some(Segment::ack(ISS_2 + 1, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX))), None)
4831 );
4832
4833 let established = assert_matches!(state1, State::Established(e) => e);
4834 assert_eq!(
4835 established,
4836 Established {
4837 snd: Send {
4838 wl1: ISS_2 + 1,
4839 rtt_estimator: Estimator::Measured { srtt: RTT, rtt_var: RTT / 2 },
4840 congestion_control: CongestionControl::cubic_with_mss(
4841 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE
4842 ),
4843 ..Send::default_for_test_at(ISS_1 + 1, RingBuffer::default())
4844 }
4845 .into(),
4846 rcv: Recv {
4847 remaining_quickacks: default_quickack_counter() - 1,
4848 last_segment_at: Some(clock.now()),
4849 ..Recv::default_for_test_at(ISS_2 + 1, RingBuffer::default())
4850 }
4851 .into()
4852 }
4853 );
4854
4855 let established = assert_matches!(state2, State::Established(e) => e);
4856 assert_eq!(
4857 established,
4858 Established {
4859 snd: Send {
4860 wl1: ISS_1 + 1,
4861 rtt_estimator: Estimator::Measured { srtt: RTT, rtt_var: RTT / 2 },
4862 congestion_control: CongestionControl::cubic_with_mss(
4863 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE
4864 ),
4865 ..Send::default_for_test_at(ISS_2 + 1, RingBuffer::default())
4866 }
4867 .into(),
4868 rcv: Recv {
4869 remaining_quickacks: default_quickack_counter() - 1,
4870 last_segment_at: Some(clock.now()),
4871 ..Recv::default_for_test_at(ISS_1 + 1, RingBuffer::default())
4872 }
4873 .into()
4874 }
4875 );
4876 }
4877
4878 const BUFFER_SIZE: usize = 16;
4879 const TEST_BYTES: &[u8] = "Hello".as_bytes();
4880
4881 #[test_case(true; "sack permitted")]
4882 #[test_case(false; "sack not permitted")]
4883 fn established_receive(sack_permitted: bool) {
4884 let clock = FakeInstantCtx::default();
4885 let counters = FakeTcpCounters::default();
4886 let mut established = State::Established(Established {
4887 snd: Send {
4888 wnd: WindowSize::ZERO,
4889 wnd_max: WindowSize::ZERO,
4890 buffer: NullBuffer,
4891 congestion_control: CongestionControl::cubic_with_mss(Mss(
4892 NonZeroU16::new(5).unwrap()
4893 )),
4894 ..Send::default_for_test(NullBuffer)
4895 }
4896 .into(),
4897 rcv: Recv {
4898 mss: Mss(NonZeroU16::new(5).unwrap()),
4899 sack_permitted,
4900 ..Recv::default_for_test(RingBuffer::new(BUFFER_SIZE))
4901 }
4902 .into(),
4903 });
4904
4905 assert_eq!(
4907 established.on_segment_with_default_options::<_, ClientlessBufferProvider>(
4908 Segment::with_data(
4909 TEST_IRS + 1,
4910 TEST_ISS + 1,
4911 UnscaledWindowSize::from(0),
4912 TEST_BYTES,
4913 ),
4914 clock.now(),
4915 &counters.refs(),
4916 ),
4917 (
4918 Some(Segment::ack(
4919 TEST_ISS + 1,
4920 TEST_IRS + 1 + TEST_BYTES.len(),
4921 UnscaledWindowSize::from((BUFFER_SIZE - TEST_BYTES.len()) as u16),
4922 )),
4923 None
4924 ),
4925 );
4926 assert_eq!(
4927 established.read_with(|available| {
4928 assert_eq!(available, &[TEST_BYTES]);
4929 available[0].len()
4930 }),
4931 TEST_BYTES.len()
4932 );
4933
4934 let segment_start = TEST_IRS + 1 + TEST_BYTES.len() * 2;
4936 assert_eq!(
4937 established.on_segment_with_default_options::<_, ClientlessBufferProvider>(
4938 Segment::with_data(
4939 segment_start,
4940 TEST_ISS + 1,
4941 UnscaledWindowSize::from(0),
4942 TEST_BYTES,
4943 ),
4944 clock.now(),
4945 &counters.refs()
4946 ),
4947 (
4948 Some(Segment::ack_with_options(
4949 TEST_ISS + 1,
4950 TEST_IRS + 1 + TEST_BYTES.len(),
4951 UnscaledWindowSize::from(u16::try_from(BUFFER_SIZE).unwrap()),
4952 SegmentOptions {
4953 sack_blocks: if sack_permitted {
4954 [SackBlock::try_new(
4955 segment_start,
4956 segment_start + u32::try_from(TEST_BYTES.len()).unwrap(),
4957 )
4958 .unwrap()]
4959 .into_iter()
4960 .collect()
4961 } else {
4962 SackBlocks::default()
4963 }
4964 }
4965 .into()
4966 )),
4967 None
4968 ),
4969 );
4970 assert_eq!(
4971 established.read_with(|available| {
4972 let empty: &[u8] = &[];
4973 assert_eq!(available, &[empty]);
4974 0
4975 }),
4976 0
4977 );
4978
4979 assert_eq!(
4981 established.on_segment_with_default_options::<_, ClientlessBufferProvider>(
4982 Segment::with_data(
4983 TEST_IRS + 1 + TEST_BYTES.len(),
4984 TEST_ISS + 1,
4985 UnscaledWindowSize::from(0),
4986 TEST_BYTES,
4987 ),
4988 clock.now(),
4989 &counters.refs()
4990 ),
4991 (
4992 Some(Segment::ack(
4993 TEST_ISS + 1,
4994 TEST_IRS + 1 + 3 * TEST_BYTES.len(),
4995 UnscaledWindowSize::from_usize(BUFFER_SIZE - 2 * TEST_BYTES.len()),
4996 )),
4997 None
4998 ),
4999 );
5000 assert_eq!(
5001 established.read_with(|available| {
5002 assert_eq!(available, &[[TEST_BYTES, TEST_BYTES].concat()]);
5003 available[0].len()
5004 }),
5005 10
5006 );
5007 }
5008
5009 #[test]
5010 fn established_send() {
5011 let clock = FakeInstantCtx::default();
5012 let counters = FakeTcpCounters::default();
5013 let mut send_buffer = RingBuffer::new(BUFFER_SIZE);
5014 assert_eq!(send_buffer.enqueue_data(TEST_BYTES), 5);
5015 let mut established = State::Established(Established {
5016 snd: Send {
5017 una: TEST_ISS,
5018 wl2: TEST_ISS,
5019 wnd: WindowSize::ZERO,
5020 wnd_max: WindowSize::ZERO,
5021 ..Send::default_for_test_at(TEST_ISS + 1, send_buffer)
5022 }
5023 .into(),
5024 rcv: Recv::default_for_test(RingBuffer::new(BUFFER_SIZE)).into(),
5025 });
5026 assert_eq!(
5028 established.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
5029 None
5030 );
5031 let open_window = |established: &mut State<FakeInstant, RingBuffer, RingBuffer, ()>,
5032 ack: SeqNum,
5033 win: usize,
5034 now: FakeInstant,
5035 counters: &TcpCountersRefs<'_>| {
5036 assert_eq!(
5037 established.on_segment_with_default_options::<(), ClientlessBufferProvider>(
5038 Segment::ack(TEST_IRS + 1, ack, UnscaledWindowSize::from_usize(win)),
5039 now,
5040 counters
5041 ),
5042 (None, None),
5043 );
5044 };
5045 open_window(&mut established, TEST_ISS + 1, 1, clock.now(), &counters.refs());
5047 assert_eq!(
5048 established.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
5049 Some(Segment::with_data(
5050 TEST_ISS + 1,
5051 TEST_IRS + 1,
5052 UnscaledWindowSize::from_usize(BUFFER_SIZE),
5053 FragmentedPayload::new_contiguous(&TEST_BYTES[1..2]),
5054 ))
5055 );
5056
5057 open_window(&mut established, TEST_ISS + 2, 10, clock.now(), &counters.refs());
5059 assert_eq!(
5060 established.poll_send_with_default_options(2, clock.now(), &counters.refs()),
5061 Some(Segment::with_data(
5062 TEST_ISS + 2,
5063 TEST_IRS + 1,
5064 UnscaledWindowSize::from_usize(BUFFER_SIZE),
5065 FragmentedPayload::new_contiguous(&TEST_BYTES[2..4]),
5066 ))
5067 );
5068
5069 assert_eq!(
5070 established.poll_send(
5071 &FakeStateMachineDebugId,
5072 &counters.refs(),
5073 1,
5074 clock.now(),
5075 &SocketOptions { nagle_enabled: false, ..SocketOptions::default_for_state_tests() }
5076 ),
5077 Ok(Segment::new_assert_no_discard(
5078 SegmentHeader {
5079 seq: TEST_ISS + 4,
5080 ack: Some(TEST_IRS + 1),
5081 wnd: UnscaledWindowSize::from_usize(BUFFER_SIZE),
5082 push: true,
5083 ..Default::default()
5084 },
5085 FragmentedPayload::new_contiguous(&TEST_BYTES[4..]),
5086 ))
5087 );
5088
5089 assert_eq!(
5091 established.poll_send_with_default_options(1, clock.now(), &counters.refs()),
5092 None
5093 );
5094 }
5095
5096 #[test]
5097 fn self_connect_retransmission() {
5098 let mut clock = FakeInstantCtx::default();
5099 let counters = FakeTcpCounters::default();
5100 let (syn_sent, syn) = Closed::<Initial>::connect(
5101 ISS_1,
5102 clock.now(),
5103 (),
5104 Default::default(),
5105 DEVICE_MAXIMUM_SEGMENT_SIZE,
5106 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
5107 &SocketOptions::default_for_state_tests(),
5108 );
5109 let mut state = State::<_, RingBuffer, RingBuffer, ()>::SynSent(syn_sent);
5110 assert_eq!(state.poll_send_at(), Some(FakeInstant::from(Rto::DEFAULT.get())));
5112 clock.sleep(Rto::DEFAULT.get());
5113 assert_eq!(
5115 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
5116 Some(syn.clone().into_empty())
5117 );
5118
5119 let (seg, passive_open) = state
5121 .on_segment_with_default_options::<_, ClientlessBufferProvider>(
5122 syn,
5123 clock.now(),
5124 &counters.refs(),
5125 );
5126 let syn_ack = seg.expect("expected SYN-ACK");
5127 assert_eq!(passive_open, None);
5128 assert_eq!(state.poll_send_at(), Some(clock.now() + Rto::DEFAULT.get()));
5130 clock.sleep(Rto::DEFAULT.get());
5131 assert_eq!(
5133 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
5134 Some(syn_ack.clone().into_empty())
5135 );
5136
5137 assert_eq!(
5139 state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
5140 syn_ack,
5141 clock.now(),
5142 &counters.refs(),
5143 ),
5144 (Some(Segment::ack(ISS_1 + 1, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX))), None)
5145 );
5146 match state {
5147 State::Closed(_)
5148 | State::Listen(_)
5149 | State::SynRcvd(_)
5150 | State::SynSent(_)
5151 | State::LastAck(_)
5152 | State::FinWait1(_)
5153 | State::FinWait2(_)
5154 | State::Closing(_)
5155 | State::TimeWait(_) => {
5156 panic!("expected that we have entered established state, but got {:?}", state)
5157 }
5158 State::Established(Established { ref mut snd, rcv: _ })
5159 | State::CloseWait(CloseWait { ref mut snd, closed_rcv: _ }) => {
5160 assert_eq!(snd.buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());
5161 }
5162 }
5163 assert_eq!(state.poll_send_at(), None);
5165 for i in 0..3 {
5167 assert_eq!(
5168 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
5169 Some(Segment::new_assert_no_discard(
5170 SegmentHeader {
5171 seq: ISS_1 + 1,
5172 ack: Some(ISS_1 + 1),
5173 wnd: UnscaledWindowSize::from(u16::MAX),
5174 push: true,
5175 ..Default::default()
5176 },
5177 FragmentedPayload::new_contiguous(TEST_BYTES),
5178 ))
5179 );
5180 assert_eq!(state.poll_send_at(), Some(clock.now() + (1 << i) * Rto::DEFAULT.get()));
5181 clock.sleep((1 << i) * Rto::DEFAULT.get());
5182 CounterExpectations {
5183 retransmits: i,
5184 slow_start_retransmits: i,
5185 timeouts: i,
5186 ..Default::default()
5187 }
5188 .assert_counters(&counters);
5189 }
5190 assert_eq!(
5192 state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
5193 Segment::ack(
5194 ISS_1 + 1 + TEST_BYTES.len(),
5195 ISS_1 + 1 + 1,
5196 UnscaledWindowSize::from(u16::MAX)
5197 ),
5198 clock.now(),
5199 &counters.refs(),
5200 ),
5201 (None, None),
5202 );
5203 assert_eq!(state.poll_send_at(), Some(clock.now() + Rto::DEFAULT.get()));
5206 clock.sleep(Rto::DEFAULT.get());
5207 assert_eq!(
5208 state.poll_send_with_default_options(1, clock.now(), &counters.refs(),),
5209 Some(Segment::with_data(
5210 ISS_1 + 1 + 1,
5211 ISS_1 + 1,
5212 UnscaledWindowSize::from(u16::MAX),
5213 FragmentedPayload::new_contiguous(&TEST_BYTES[1..2]),
5214 ))
5215 );
5216 assert_eq!(
5219 state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
5220 Segment::ack(
5221 ISS_1 + 1 + TEST_BYTES.len(),
5222 ISS_1 + 1 + 3,
5223 UnscaledWindowSize::from(u16::MAX)
5224 ),
5225 clock.now(),
5226 &counters.refs(),
5227 ),
5228 (None, None)
5229 );
5230 CounterExpectations {
5233 retransmits: 3,
5234 slow_start_retransmits: 3,
5235 timeouts: 3,
5236 ..Default::default()
5237 }
5238 .assert_counters(&counters);
5239 assert_eq!(state.poll_send_at(), Some(clock.now() + Rto::DEFAULT.get()));
5240 assert_eq!(
5241 state.poll_send_with_default_options(1, clock.now(), &counters.refs()),
5242 Some(Segment::with_data(
5243 ISS_1 + 1 + 3,
5244 ISS_1 + 1,
5245 UnscaledWindowSize::from(u16::MAX),
5246 FragmentedPayload::new_contiguous(&TEST_BYTES[3..4]),
5247 ))
5248 );
5249 assert_eq!(
5251 state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
5252 Segment::ack(
5253 ISS_1 + 1 + TEST_BYTES.len(),
5254 ISS_1 + 1 + TEST_BYTES.len(),
5255 UnscaledWindowSize::from(u16::MAX)
5256 ),
5257 clock.now(),
5258 &counters.refs()
5259 ),
5260 (None, None)
5261 );
5262 assert_eq!(state.poll_send_at(), None);
5264 }
5265
5266 #[test]
5267 fn passive_close() {
5268 let mut clock = FakeInstantCtx::default();
5269 let counters = FakeTcpCounters::default();
5270 let mut send_buffer = RingBuffer::new(BUFFER_SIZE);
5271 assert_eq!(send_buffer.enqueue_data(TEST_BYTES), 5);
5272 let mut state = State::Established(Established {
5274 snd: Send::default_for_test(send_buffer.clone()).into(),
5275 rcv: Recv::default_for_test(RingBuffer::new(BUFFER_SIZE)).into(),
5276 });
5277 let last_wnd = WindowSize::new(BUFFER_SIZE - 1).unwrap();
5278 let last_wnd_scale = WindowScale::default();
5279 assert_eq!(
5281 state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
5282 Segment::fin(TEST_IRS + 1, TEST_ISS + 1, UnscaledWindowSize::from(u16::MAX)),
5283 clock.now(),
5284 &counters.refs(),
5285 ),
5286 (
5287 Some(Segment::ack(
5288 TEST_ISS + 1,
5289 TEST_IRS + 2,
5290 UnscaledWindowSize::from_usize(BUFFER_SIZE - 1)
5291 )),
5292 None
5293 )
5294 );
5295 assert_eq!(
5297 state.close(
5298 &counters.refs(),
5299 CloseReason::Shutdown,
5300 &SocketOptions::default_for_state_tests()
5301 ),
5302 Ok(NewlyClosed::No)
5303 );
5304 assert_eq!(
5305 state,
5306 State::LastAck(LastAck {
5307 snd: Send::default_for_test(send_buffer),
5308 closed_rcv: RecvParams {
5309 ack: TEST_IRS + 2,
5310 wnd: last_wnd,
5311 wnd_scale: last_wnd_scale
5312 }
5313 })
5314 );
5315 assert_eq!(
5317 state.poll_send_with_default_options(2, clock.now(), &counters.refs()),
5318 Some(Segment::with_data(
5319 TEST_ISS + 1,
5320 TEST_IRS + 2,
5321 last_wnd >> WindowScale::default(),
5322 FragmentedPayload::new_contiguous(&TEST_BYTES[..2]),
5323 ))
5324 );
5325 assert_eq!(
5327 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
5328 Some(Segment::new_assert_no_discard(
5329 SegmentHeader {
5330 seq: TEST_ISS + 3,
5331 ack: Some(TEST_IRS + 2),
5332 control: Some(Control::FIN),
5333 wnd: last_wnd >> WindowScale::default(),
5334 push: true,
5335 ..Default::default()
5336 },
5337 FragmentedPayload::new_contiguous(&TEST_BYTES[2..]),
5338 ))
5339 );
5340 clock.sleep(RTT);
5342 assert_eq!(
5343 state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
5344 Segment::ack(
5345 TEST_IRS + 2,
5346 TEST_ISS + 1 + TEST_BYTES.len(),
5347 UnscaledWindowSize::from(u16::MAX)
5348 ),
5349 clock.now(),
5350 &counters.refs(),
5351 ),
5352 (None, None)
5353 );
5354 assert_eq!(state.poll_send_at(), Some(clock.now() + Rto::DEFAULT.get()));
5355 clock.sleep(Rto::DEFAULT.get());
5356 assert_eq!(
5358 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
5359 Some(Segment::fin(
5360 TEST_ISS + 1 + TEST_BYTES.len(),
5361 TEST_IRS + 2,
5362 last_wnd >> WindowScale::default()
5363 ))
5364 );
5365
5366 assert_eq!(
5368 state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
5369 Segment::ack(
5370 TEST_IRS + 2,
5371 TEST_ISS + 1 + TEST_BYTES.len() + 1,
5372 UnscaledWindowSize::from(u16::MAX),
5373 ),
5374 clock.now(),
5375 &counters.refs(),
5376 ),
5377 (None, None)
5378 );
5379 assert_eq!(state, State::Closed(Closed { reason: None }));
5381 CounterExpectations {
5382 retransmits: 1,
5383 slow_start_retransmits: 1,
5384 timeouts: 1,
5385 established_closed: 1,
5386 ..Default::default()
5387 }
5388 .assert_counters(&counters);
5389 }
5390
5391 #[test]
5392 fn syn_rcvd_active_close() {
5393 let counters = FakeTcpCounters::default();
5394 let mut state: State<_, RingBuffer, NullBuffer, ()> = State::SynRcvd(SynRcvd {
5395 iss: TEST_ISS,
5396 irs: TEST_IRS,
5397 timestamp: None,
5398 retrans_timer: RetransTimer {
5399 at: FakeInstant::default(),
5400 rto: Rto::MIN,
5401 user_timeout_until: None,
5402 remaining_retries: Some(DEFAULT_MAX_RETRIES),
5403 },
5404 simultaneous_open: Some(()),
5405 buffer_sizes: Default::default(),
5406 smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
5407 rcv_wnd_scale: WindowScale::default(),
5408 snd_wnd_scale: Some(WindowScale::default()),
5409 sack_permitted: SACK_PERMITTED,
5410 });
5411 assert_eq!(
5412 state.close(
5413 &counters.refs(),
5414 CloseReason::Shutdown,
5415 &SocketOptions::default_for_state_tests()
5416 ),
5417 Ok(NewlyClosed::No)
5418 );
5419 assert_matches!(state, State::FinWait1(_));
5420 assert_eq!(
5421 state.poll_send_with_default_options(
5422 u32::MAX,
5423 FakeInstant::default(),
5424 &counters.refs()
5425 ),
5426 Some(Segment::fin(TEST_ISS + 1, TEST_IRS + 1, UnscaledWindowSize::from(u16::MAX)))
5427 );
5428 }
5429
5430 #[test]
5431 fn established_active_close() {
5432 let mut clock = FakeInstantCtx::default();
5433 let counters = FakeTcpCounters::default();
5434 let mut send_buffer = RingBuffer::new(BUFFER_SIZE);
5435 assert_eq!(send_buffer.enqueue_data(TEST_BYTES), 5);
5436 let mut state = State::Established(Established {
5438 snd: Send {
5439 congestion_control: CongestionControl::cubic_with_mss(Mss(
5440 NonZeroU16::new(5).unwrap()
5441 )),
5442 ..Send::default_for_test(send_buffer.clone())
5443 }
5444 .into(),
5445 rcv: Recv {
5446 mss: Mss(NonZeroU16::new(5).unwrap()),
5447 ..Recv::default_for_test(RingBuffer::new(BUFFER_SIZE))
5448 }
5449 .into(),
5450 });
5451 assert_eq!(
5452 state.close(
5453 &counters.refs(),
5454 CloseReason::Shutdown,
5455 &SocketOptions::default_for_state_tests()
5456 ),
5457 Ok(NewlyClosed::No)
5458 );
5459 assert_matches!(state, State::FinWait1(_));
5460 assert_eq!(
5461 state.close(
5462 &counters.refs(),
5463 CloseReason::Shutdown,
5464 &SocketOptions::default_for_state_tests()
5465 ),
5466 Err(CloseError::Closing)
5467 );
5468
5469 assert_eq!(
5471 state.poll_send_with_default_options(2, clock.now(), &counters.refs()),
5472 Some(Segment::with_data(
5473 TEST_ISS + 1,
5474 TEST_IRS + 1,
5475 UnscaledWindowSize::from_usize(BUFFER_SIZE),
5476 FragmentedPayload::new_contiguous(&TEST_BYTES[..2])
5477 ))
5478 );
5479
5480 assert_eq!(
5482 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
5483 Some(Segment::new_assert_no_discard(
5484 SegmentHeader {
5485 seq: TEST_ISS + 3,
5486 ack: Some(TEST_IRS + 1),
5487 control: Some(Control::FIN),
5488 wnd: UnscaledWindowSize::from_usize(BUFFER_SIZE),
5489 push: true,
5490 ..Default::default()
5491 },
5492 FragmentedPayload::new_contiguous(&TEST_BYTES[2..])
5493 ))
5494 );
5495
5496 assert_eq!(
5498 state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
5499 Segment::with_data(
5500 TEST_IRS + 1,
5501 TEST_ISS + 1 + 1,
5502 UnscaledWindowSize::from(u16::MAX),
5503 TEST_BYTES
5504 ),
5505 clock.now(),
5506 &counters.refs(),
5507 ),
5508 (
5509 Some(Segment::ack(
5510 TEST_ISS + TEST_BYTES.len() + 2,
5511 TEST_IRS + TEST_BYTES.len() + 1,
5512 UnscaledWindowSize::from_usize(BUFFER_SIZE - TEST_BYTES.len()),
5513 )),
5514 None
5515 )
5516 );
5517
5518 assert_eq!(
5519 state.read_with(|avail| {
5520 let got = avail.concat();
5521 assert_eq!(got, TEST_BYTES);
5522 got.len()
5523 }),
5524 TEST_BYTES.len()
5525 );
5526
5527 assert_eq!(state.poll_send_at(), Some(clock.now() + Rto::DEFAULT.get()));
5529
5530 clock.sleep(Rto::DEFAULT.get());
5532 assert_eq!(
5533 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
5534 Some(Segment::new_assert_no_discard(
5535 SegmentHeader {
5536 seq: TEST_ISS + 2,
5537 ack: Some(TEST_IRS + TEST_BYTES.len() + 1),
5538 control: Some(Control::FIN),
5539 wnd: UnscaledWindowSize::from_usize(BUFFER_SIZE),
5540 push: true,
5541 ..Default::default()
5542 },
5543 FragmentedPayload::new_contiguous(&TEST_BYTES[1..]),
5544 ))
5545 );
5546
5547 assert_eq!(
5549 state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
5550 Segment::ack(
5551 TEST_IRS + TEST_BYTES.len() + 1,
5552 TEST_ISS + TEST_BYTES.len() + 2,
5553 UnscaledWindowSize::from(u16::MAX)
5554 ),
5555 clock.now(),
5556 &counters.refs(),
5557 ),
5558 (None, None)
5559 );
5560 assert_matches!(state, State::FinWait2(_));
5561
5562 assert_eq!(
5564 state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
5565 Segment::with_data(
5566 TEST_IRS + 1 + TEST_BYTES.len(),
5567 TEST_ISS + TEST_BYTES.len() + 2,
5568 UnscaledWindowSize::from(u16::MAX),
5569 TEST_BYTES
5570 ),
5571 clock.now(),
5572 &counters.refs(),
5573 ),
5574 (
5575 Some(Segment::ack(
5576 TEST_ISS + TEST_BYTES.len() + 2,
5577 TEST_IRS + 2 * TEST_BYTES.len() + 1,
5578 UnscaledWindowSize::from_usize(BUFFER_SIZE - TEST_BYTES.len()),
5579 )),
5580 None
5581 )
5582 );
5583
5584 assert_eq!(
5585 state.read_with(|avail| {
5586 let got = avail.concat();
5587 assert_eq!(got, TEST_BYTES);
5588 got.len()
5589 }),
5590 TEST_BYTES.len()
5591 );
5592
5593 assert_eq!(
5595 state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
5596 Segment::fin(
5597 TEST_IRS + 2 * TEST_BYTES.len() + 1,
5598 TEST_ISS + TEST_BYTES.len() + 2,
5599 UnscaledWindowSize::from(u16::MAX)
5600 ),
5601 clock.now(),
5602 &counters.refs(),
5603 ),
5604 (
5605 Some(Segment::ack(
5606 TEST_ISS + TEST_BYTES.len() + 2,
5607 TEST_IRS + 2 * TEST_BYTES.len() + 2,
5608 UnscaledWindowSize::from_usize(BUFFER_SIZE - 1),
5609 )),
5610 None
5611 )
5612 );
5613
5614 assert_matches!(state, State::TimeWait(_));
5615
5616 const SMALLEST_DURATION: Duration = Duration::from_secs(1);
5617 assert_eq!(state.poll_send_at(), Some(clock.now() + MSL * 2));
5618 clock.sleep(MSL * 2 - SMALLEST_DURATION);
5619 assert_eq!(
5621 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
5622 None
5623 );
5624 assert_matches!(state, State::TimeWait(_));
5625 clock.sleep(SMALLEST_DURATION);
5626 assert_eq!(
5628 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
5629 None
5630 );
5631 assert_eq!(state, State::Closed(Closed { reason: None }));
5632 CounterExpectations {
5633 retransmits: 1,
5634 slow_start_retransmits: 1,
5635 timeouts: 1,
5636 established_closed: 1,
5637 ..Default::default()
5638 }
5639 .assert_counters(&counters);
5640 }
5641
5642 #[test]
5643 fn fin_wait_1_fin_ack_to_time_wait() {
5644 let counters = FakeTcpCounters::default();
5645 let mut state = State::FinWait1(FinWait1 {
5648 snd: Send { una: TEST_ISS + 1, ..Send::default_for_test_at(TEST_ISS + 2, NullBuffer) }
5649 .into(),
5650 rcv: Recv::default_for_test(RingBuffer::new(BUFFER_SIZE)).into(),
5651 });
5652 assert_eq!(
5653 state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
5654 Segment::fin(TEST_IRS + 1, TEST_ISS + 2, UnscaledWindowSize::from(u16::MAX)),
5655 FakeInstant::default(),
5656 &counters.refs(),
5657 ),
5658 (
5659 Some(Segment::ack(
5660 TEST_ISS + 2,
5661 TEST_IRS + 2,
5662 UnscaledWindowSize::from_usize(BUFFER_SIZE - 1)
5663 )),
5664 None
5665 ),
5666 );
5667 assert_matches!(state, State::TimeWait(_));
5668 }
5669
5670 #[test]
5671 fn simultaneous_close() {
5672 let mut clock = FakeInstantCtx::default();
5673 let counters = FakeTcpCounters::default();
5674 let mut send_buffer = RingBuffer::new(BUFFER_SIZE);
5675 assert_eq!(send_buffer.enqueue_data(TEST_BYTES), 5);
5676
5677 let iss = ISS_1;
5678 let mut state = State::Established(Established {
5680 snd: Send::default_for_test_at(iss + 1, send_buffer.clone()).into(),
5681 rcv: Recv::default_for_test_at(iss + 1, RingBuffer::new(BUFFER_SIZE)).into(),
5682 });
5683 assert_eq!(
5684 state.close(
5685 &counters.refs(),
5686 CloseReason::Shutdown,
5687 &SocketOptions::default_for_state_tests()
5688 ),
5689 Ok(NewlyClosed::No)
5690 );
5691 assert_matches!(state, State::FinWait1(_));
5692 assert_eq!(
5693 state.close(
5694 &counters.refs(),
5695 CloseReason::Shutdown,
5696 &SocketOptions::default_for_state_tests()
5697 ),
5698 Err(CloseError::Closing)
5699 );
5700
5701 let fin = state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs());
5702 assert_eq!(
5703 fin,
5704 Some(Segment::new_assert_no_discard(
5705 SegmentHeader {
5706 seq: iss + 1,
5707 ack: Some(iss + 1),
5708 control: Some(Control::FIN),
5709 wnd: UnscaledWindowSize::from_usize(BUFFER_SIZE),
5710 push: true,
5711 ..Default::default()
5712 },
5713 FragmentedPayload::new_contiguous(TEST_BYTES),
5714 ))
5715 );
5716 assert_eq!(
5717 state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
5718 Segment::piggybacked_fin(
5719 iss + 1,
5720 iss + 1,
5721 UnscaledWindowSize::from_usize(BUFFER_SIZE),
5722 TEST_BYTES,
5723 ),
5724 clock.now(),
5725 &counters.refs(),
5726 ),
5727 (
5728 Some(Segment::ack(
5729 iss + TEST_BYTES.len() + 2,
5730 iss + TEST_BYTES.len() + 2,
5731 UnscaledWindowSize::from_usize(BUFFER_SIZE - TEST_BYTES.len() - 1),
5732 )),
5733 None
5734 )
5735 );
5736
5737 assert_matches!(state, State::Closing(_));
5740 assert_eq!(
5741 state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
5742 Segment::ack(
5743 iss + TEST_BYTES.len() + 2,
5744 iss + TEST_BYTES.len() + 2,
5745 UnscaledWindowSize::from_usize(BUFFER_SIZE - TEST_BYTES.len() - 1),
5746 ),
5747 clock.now(),
5748 &counters.refs(),
5749 ),
5750 (None, None)
5751 );
5752
5753 assert_matches!(state, State::TimeWait(_));
5756
5757 const SMALLEST_DURATION: Duration = Duration::from_secs(1);
5758 assert_eq!(state.poll_send_at(), Some(clock.now() + MSL * 2));
5759 clock.sleep(MSL * 2 - SMALLEST_DURATION);
5760 assert_eq!(
5762 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
5763 None
5764 );
5765 assert_matches!(state, State::TimeWait(_));
5766 clock.sleep(SMALLEST_DURATION);
5767 assert_eq!(
5769 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
5770 None
5771 );
5772 assert_eq!(state, State::Closed(Closed { reason: None }));
5773 CounterExpectations { established_closed: 1, ..Default::default() }
5774 .assert_counters(&counters);
5775 }
5776
5777 #[test]
5778 fn time_wait_restarts_timer() {
5779 let mut clock = FakeInstantCtx::default();
5780 let counters = FakeTcpCounters::default();
5781 let mut time_wait = State::<_, NullBuffer, NullBuffer, ()>::TimeWait(TimeWait {
5782 last_seq: TEST_ISS + 2,
5783 closed_rcv: RecvParams {
5784 ack: TEST_IRS + 2,
5785 wnd: WindowSize::DEFAULT,
5786 wnd_scale: WindowScale::default(),
5787 },
5788 expiry: new_time_wait_expiry(clock.now()),
5789 });
5790
5791 assert_eq!(time_wait.poll_send_at(), Some(clock.now() + MSL * 2));
5792 clock.sleep(Duration::from_secs(1));
5793 assert_eq!(
5794 time_wait.on_segment_with_default_options::<(), ClientlessBufferProvider>(
5795 Segment::fin(TEST_IRS + 2, TEST_ISS + 2, UnscaledWindowSize::from(u16::MAX)),
5796 clock.now(),
5797 &counters.refs(),
5798 ),
5799 (
5800 Some(Segment::ack(TEST_ISS + 2, TEST_IRS + 2, UnscaledWindowSize::from(u16::MAX))),
5801 None
5802 ),
5803 );
5804 assert_eq!(time_wait.poll_send_at(), Some(clock.now() + MSL * 2));
5805 }
5806
5807 #[test_case(
5808 State::Established(Established {
5809 snd: Send::default_for_test_at(TEST_ISS, NullBuffer).into(),
5810 rcv: Recv::default_for_test_at(TEST_IRS + 5, RingBuffer::default()).into(),
5811 }),
5812 Segment::with_data(TEST_IRS, TEST_ISS, UnscaledWindowSize::from(u16::MAX), TEST_BYTES) =>
5813 Some(Segment::ack(TEST_ISS, TEST_IRS + 5, UnscaledWindowSize::from(u16::MAX))); "retransmit data"
5814 )]
5815 #[test_case(
5816 State::SynRcvd(SynRcvd {
5817 iss: TEST_ISS,
5818 irs: TEST_IRS,
5819 timestamp: None,
5820 retrans_timer: RetransTimer {
5821 at: FakeInstant::default(),
5822 rto: Rto::MIN,
5823 user_timeout_until: None,
5824 remaining_retries: Some(DEFAULT_MAX_RETRIES),
5825 },
5826 simultaneous_open: None,
5827 buffer_sizes: BufferSizes::default(),
5828 smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
5829 rcv_wnd_scale: WindowScale::default(),
5830 snd_wnd_scale: Some(WindowScale::default()),
5831 sack_permitted: SACK_PERMITTED,
5832 }),
5833 Segment::syn_ack(TEST_IRS, TEST_ISS + 1, UnscaledWindowSize::from(u16::MAX),
5834 HandshakeOptions { window_scale: Some(WindowScale::default()), ..Default::default() }.into()) =>
5835 Some(Segment::ack(TEST_ISS + 1, TEST_IRS + 1, UnscaledWindowSize::from(u16::MAX))); "retransmit syn_ack"
5836 )]
5837 fn ack_to_retransmitted_segment(
5839 mut state: State<FakeInstant, RingBuffer, NullBuffer, ()>,
5840 seg: Segment<&[u8]>,
5841 ) -> Option<Segment<()>> {
5842 let counters = FakeTcpCounters::default();
5843 let (reply, _): (_, Option<()>) = state
5844 .on_segment_with_default_options::<_, ClientlessBufferProvider>(
5845 seg,
5846 FakeInstant::default(),
5847 &counters.refs(),
5848 );
5849 reply
5850 }
5851
5852 #[test]
5853 fn fast_retransmit() {
5854 let mut clock = FakeInstantCtx::default();
5855 let counters = FakeTcpCounters::default();
5856 let mut send_buffer = RingBuffer::default();
5857 let first_payload_byte = b'A';
5858 let last_payload_byte = b'D';
5859 for b in first_payload_byte..=last_payload_byte {
5860 assert_eq!(
5861 send_buffer.enqueue_data(&[b; DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE_USIZE]),
5862 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE_USIZE
5863 );
5864 }
5865 let mut state: State<_, _, _, ()> = State::Established(Established {
5866 snd: Send {
5867 congestion_control: CongestionControl::cubic_with_mss(
5868 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
5869 ),
5870 ..Send::default_for_test_at(TEST_ISS, send_buffer.clone())
5871 }
5872 .into(),
5873 rcv: Recv::default_for_test_at(TEST_IRS, RingBuffer::default()).into(),
5874 });
5875
5876 assert_eq!(
5877 state.poll_send_with_default_options(
5878 u32::from(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
5879 clock.now(),
5880 &counters.refs(),
5881 ),
5882 Some(Segment::with_data(
5883 TEST_ISS,
5884 TEST_IRS,
5885 UnscaledWindowSize::from(u16::MAX),
5886 FragmentedPayload::new_contiguous(&[b'A'; DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE_USIZE])
5887 ))
5888 );
5889
5890 let mut dup_ack = |expected_byte: u8, counters: &TcpCountersRefs<'_>| {
5891 clock.sleep(Duration::from_millis(10));
5892 assert_eq!(
5893 state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
5894 Segment::ack(TEST_IRS, TEST_ISS, UnscaledWindowSize::from(u16::MAX)),
5895 clock.now(),
5896 counters,
5897 ),
5898 (None, None)
5899 );
5900
5901 assert_eq!(
5902 state.poll_send_with_default_options(
5903 u32::from(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
5904 clock.now(),
5905 counters,
5906 ),
5907 Some(Segment::new_assert_no_discard(
5908 SegmentHeader {
5909 seq: TEST_ISS
5910 + u32::from(expected_byte - first_payload_byte)
5911 * u32::from(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
5912 ack: Some(TEST_IRS),
5913 wnd: UnscaledWindowSize::from(u16::MAX),
5914 push: expected_byte == last_payload_byte,
5915 ..Default::default()
5916 },
5917 FragmentedPayload::new_contiguous(
5918 &[expected_byte; DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE_USIZE]
5919 )
5920 ))
5921 );
5922 };
5923
5924 CounterExpectations::default().assert_counters(&counters);
5927 dup_ack(b'B', &counters.refs());
5928 CounterExpectations { fast_recovery: 0, dup_acks: 1, ..Default::default() }
5929 .assert_counters(&counters);
5930 dup_ack(b'C', &counters.refs());
5931 CounterExpectations { fast_recovery: 0, dup_acks: 2, ..Default::default() }
5932 .assert_counters(&counters);
5933 dup_ack(b'A', &counters.refs());
5936 CounterExpectations {
5937 retransmits: 1,
5938 fast_recovery: 1,
5939 fast_retransmits: 1,
5940 dup_acks: 3,
5941 ..Default::default()
5942 }
5943 .assert_counters(&counters);
5944 dup_ack(b'D', &counters.refs());
5946 CounterExpectations {
5947 retransmits: 1,
5948 fast_recovery: 1,
5949 fast_retransmits: 1,
5950 dup_acks: 4,
5951 ..Default::default()
5952 }
5953 .assert_counters(&counters);
5954
5955 clock.sleep(Duration::from_millis(10));
5957 assert_eq!(
5958 state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
5959 Segment::ack(
5960 TEST_IRS,
5961 TEST_ISS + u32::from(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
5962 UnscaledWindowSize::from(u16::MAX)
5963 ),
5964 clock.now(),
5965 &counters.refs(),
5966 ),
5967 (None, None)
5968 );
5969 let established = assert_matches!(state, State::Established(established) => established);
5970 assert_eq!(
5971 established.snd.congestion_control.inspect_cwnd().cwnd(),
5972 2 * u32::from(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE)
5973 );
5974 assert_eq!(established.snd.congestion_control.inspect_loss_recovery_mode(), None);
5975 CounterExpectations {
5976 retransmits: 1,
5977 fast_recovery: 1,
5978 fast_retransmits: 1,
5979 dup_acks: 4,
5980 loss_recovered: 1,
5981 ..Default::default()
5982 }
5983 .assert_counters(&counters);
5984 }
5985
5986 #[test]
5987 fn keep_alive() {
5988 let mut clock = FakeInstantCtx::default();
5989 let counters = FakeTcpCounters::default();
5990 let mut state: State<_, _, _, ()> = State::Established(Established {
5991 snd: Send::default_for_test_at(TEST_ISS, RingBuffer::default()).into(),
5992 rcv: Recv::default_for_test_at(TEST_IRS, RingBuffer::default()).into(),
5993 });
5994
5995 let socket_options = {
5996 let mut socket_options = SocketOptions::default_for_state_tests();
5997 socket_options.keep_alive.enabled = true;
5998 socket_options
5999 };
6000 let socket_options = &socket_options;
6001 let keep_alive = &socket_options.keep_alive;
6002
6003 assert_eq!(
6005 state.poll_send(
6006 &FakeStateMachineDebugId,
6007 &counters.refs(),
6008 u32::from(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
6009 clock.now(),
6010 socket_options,
6011 ),
6012 Err(NewlyClosed::No),
6013 );
6014 assert_eq!(state.poll_send_at(), Some(clock.now().panicking_add(keep_alive.idle.into())));
6017
6018 clock.sleep(Duration::from_secs(60 * 60));
6020 assert_eq!(
6021 state.on_segment::<&[u8], ClientlessBufferProvider>(
6022 &FakeStateMachineDebugId,
6023 &counters.refs(),
6024 Segment::ack(TEST_IRS, TEST_ISS, UnscaledWindowSize::from(u16::MAX)),
6025 clock.now(),
6026 socket_options,
6027 false, ),
6029 (None, None, DataAcked::No, NewlyClosed::No)
6030 );
6031 assert_eq!(state.poll_send_at(), Some(clock.now().panicking_add(keep_alive.idle.into())),);
6033 clock.sleep(keep_alive.idle.into());
6034
6035 for _ in 0..keep_alive.count.get() {
6038 assert_eq!(
6039 state.poll_send(
6040 &FakeStateMachineDebugId,
6041 &counters.refs(),
6042 u32::from(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
6043 clock.now(),
6044 socket_options,
6045 ),
6046 Ok(Segment::ack(TEST_ISS - 1, TEST_IRS, UnscaledWindowSize::from(u16::MAX)))
6047 );
6048 clock.sleep(keep_alive.interval.into());
6049 assert_matches!(state, State::Established(_));
6050 }
6051
6052 assert_eq!(
6055 state.poll_send(
6056 &FakeStateMachineDebugId,
6057 &counters.refs(),
6058 u32::from(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
6059 clock.now(),
6060 socket_options,
6061 ),
6062 Err(NewlyClosed::Yes),
6063 );
6064 assert_eq!(state, State::Closed(Closed { reason: Some(ConnectionError::TimedOut) }));
6065 CounterExpectations {
6066 established_closed: 1,
6067 established_timedout: 1,
6068 ..Default::default()
6069 }
6070 .assert_counters(&counters);
6071 }
6072
6073 #[derive(Debug)]
6075 struct ReservingBuffer<B> {
6076 buffer: B,
6077 reserved_bytes: usize,
6078 }
6079
6080 impl<B: Buffer> Buffer for ReservingBuffer<B> {
6081 fn capacity_range() -> (usize, usize) {
6082 B::capacity_range()
6083 }
6084
6085 fn limits(&self) -> BufferLimits {
6086 self.buffer.limits()
6087 }
6088
6089 fn target_capacity(&self) -> usize {
6090 self.buffer.target_capacity()
6091 }
6092
6093 fn request_capacity(&mut self, size: usize) {
6094 self.buffer.request_capacity(size)
6095 }
6096 }
6097
6098 impl<B: SendBuffer> SendBuffer for ReservingBuffer<B> {
6099 type Payload<'a> = B::Payload<'a>;
6100
6101 fn mark_read(&mut self, count: usize) {
6102 self.buffer.mark_read(count)
6103 }
6104
6105 fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
6106 where
6107 F: FnOnce(B::Payload<'a>) -> R,
6108 {
6109 let Self { buffer, reserved_bytes } = self;
6110 buffer.peek_with(offset, |payload| {
6111 let len = payload.len();
6112 let new_len = len.saturating_sub(*reserved_bytes);
6113 f(payload.slice(0..new_len.try_into().unwrap_or(u32::MAX)))
6114 })
6115 }
6116 }
6117
6118 #[test_case(true, 0)]
6119 #[test_case(false, 0)]
6120 #[test_case(true, 1)]
6121 #[test_case(false, 1)]
6122 fn poll_send_len(has_fin: bool, reserved_bytes: usize) {
6123 const VALUE: u8 = 0xaa;
6124
6125 fn with_poll_send_result<const HAS_FIN: bool>(
6126 f: impl FnOnce(Segment<FragmentedPayload<'_, 2>>),
6127 reserved_bytes: usize,
6128 ) {
6129 const DATA_LEN: usize = 40;
6130 let buffer = ReservingBuffer {
6131 buffer: RingBuffer::with_data(DATA_LEN, &vec![VALUE; DATA_LEN]),
6132 reserved_bytes,
6133 };
6134 assert_eq!(buffer.limits().len, DATA_LEN);
6135
6136 let mut snd = Send::<FakeInstant, _, HAS_FIN>::default_for_test_at(TEST_ISS, buffer);
6137 let counters = FakeTcpCounters::default();
6138
6139 f(snd
6140 .poll_send(
6141 &FakeStateMachineDebugId,
6142 &counters.refs(),
6143 &RecvParams {
6144 ack: TEST_ISS,
6145 wnd: WindowSize::DEFAULT,
6146 wnd_scale: WindowScale::ZERO,
6147 },
6148 u32::from(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
6149 FakeInstant::default(),
6150 &SocketOptions::default_for_state_tests(),
6151 )
6152 .expect("has data"))
6153 }
6154
6155 let f = |segment: Segment<FragmentedPayload<'_, 2>>| {
6156 let segment_len = segment.len();
6157 let (SegmentHeader { .. }, data) = segment.into_parts();
6158 let data_len = data.len();
6159
6160 if has_fin && reserved_bytes == 0 {
6161 assert_eq!(
6162 segment_len,
6163 u32::try_from(data_len + 1).unwrap(),
6164 "FIN not accounted for"
6165 );
6166 } else {
6167 assert_eq!(segment_len, u32::try_from(data_len).unwrap());
6168 }
6169
6170 let mut target = vec![0; data_len];
6171 data.partial_copy(0, target.as_mut_slice());
6172 assert_eq!(target, vec![VALUE; data_len]);
6173 };
6174 match has_fin {
6175 true => with_poll_send_result::<true>(f, reserved_bytes),
6176 false => with_poll_send_result::<false>(f, reserved_bytes),
6177 }
6178 }
6179
6180 #[test]
6181 fn zero_window_probe() {
6182 let mut clock = FakeInstantCtx::default();
6183 let counters = FakeTcpCounters::default();
6184 let mut send_buffer = RingBuffer::new(BUFFER_SIZE);
6185 assert_eq!(send_buffer.enqueue_data(TEST_BYTES), 5);
6186 let mut state = State::Established(Established {
6188 snd: Send {
6189 wnd: WindowSize::ZERO,
6190 wnd_max: WindowSize::ZERO,
6191 ..Send::default_for_test(send_buffer.clone())
6192 }
6193 .into(),
6194 rcv: Recv::default_for_test(RingBuffer::new(BUFFER_SIZE)).into(),
6195 });
6196 assert_eq!(
6197 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
6198 None
6199 );
6200 assert_eq!(state.poll_send_at(), Some(clock.now().panicking_add(Rto::DEFAULT.get())));
6201
6202 clock.sleep(Rto::DEFAULT.get());
6204 assert_eq!(
6205 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
6206 Some(Segment::with_data(
6207 TEST_ISS + 1,
6208 TEST_IRS + 1,
6209 UnscaledWindowSize::from_usize(BUFFER_SIZE),
6210 FragmentedPayload::new_contiguous(&TEST_BYTES[0..1])
6211 ))
6212 );
6213
6214 assert_eq!(
6216 state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
6217 Segment::ack(TEST_IRS + 1, TEST_ISS + 1, UnscaledWindowSize::from(0)),
6218 clock.now(),
6219 &counters.refs(),
6220 ),
6221 (None, None)
6222 );
6223 assert_eq!(
6225 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
6226 None
6227 );
6228 assert_eq!(state.poll_send_at(), Some(clock.now().panicking_add(Rto::DEFAULT.get() * 2)));
6229
6230 clock.sleep(Rto::DEFAULT.get());
6232 assert_eq!(
6233 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
6234 None
6235 );
6236
6237 clock.sleep(Rto::DEFAULT.get());
6239 assert_eq!(
6240 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
6241 Some(Segment::with_data(
6242 TEST_ISS + 1,
6243 TEST_IRS + 1,
6244 UnscaledWindowSize::from_usize(BUFFER_SIZE),
6245 FragmentedPayload::new_contiguous(&TEST_BYTES[0..1])
6246 ))
6247 );
6248
6249 assert_eq!(
6251 state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
6252 Segment::ack(TEST_IRS + 1, TEST_ISS + 2, UnscaledWindowSize::from(u16::MAX)),
6253 clock.now(),
6254 &counters.refs(),
6255 ),
6256 (None, None)
6257 );
6258 assert_eq!(state.poll_send_at(), None);
6259 assert_eq!(
6260 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
6261 Some(Segment::new_assert_no_discard(
6262 SegmentHeader {
6263 seq: TEST_ISS + 2,
6264 ack: Some(TEST_IRS + 1),
6265 wnd: UnscaledWindowSize::from_usize(BUFFER_SIZE),
6266 push: true,
6267 ..Default::default()
6268 },
6269 FragmentedPayload::new_contiguous(&TEST_BYTES[1..])
6270 ))
6271 );
6272 }
6273
6274 #[test]
6275 fn nagle() {
6276 let clock = FakeInstantCtx::default();
6277 let counters = FakeTcpCounters::default();
6278 let mut send_buffer = RingBuffer::new(BUFFER_SIZE);
6279 assert_eq!(send_buffer.enqueue_data(TEST_BYTES), 5);
6280 let mut state: State<_, _, _, ()> = State::Established(Established {
6282 snd: Send::default_for_test(send_buffer.clone()).into(),
6283 rcv: Recv::default_for_test(RingBuffer::new(BUFFER_SIZE)).into(),
6284 });
6285 let mut socket_options =
6286 SocketOptions { nagle_enabled: true, ..SocketOptions::default_for_state_tests() };
6287 assert_eq!(
6288 state.poll_send(
6289 &FakeStateMachineDebugId,
6290 &counters.refs(),
6291 3,
6292 clock.now(),
6293 &socket_options
6294 ),
6295 Ok(Segment::with_data(
6296 TEST_ISS + 1,
6297 TEST_IRS + 1,
6298 UnscaledWindowSize::from_usize(BUFFER_SIZE),
6299 FragmentedPayload::new_contiguous(&TEST_BYTES[0..3])
6300 ))
6301 );
6302 assert_eq!(
6303 state.poll_send(
6304 &FakeStateMachineDebugId,
6305 &counters.refs(),
6306 3,
6307 clock.now(),
6308 &socket_options
6309 ),
6310 Err(NewlyClosed::No)
6311 );
6312 socket_options.nagle_enabled = false;
6313 assert_eq!(
6314 state.poll_send(
6315 &FakeStateMachineDebugId,
6316 &counters.refs(),
6317 3,
6318 clock.now(),
6319 &socket_options
6320 ),
6321 Ok(Segment::new_assert_no_discard(
6322 SegmentHeader {
6323 seq: TEST_ISS + 4,
6324 ack: Some(TEST_IRS + 1),
6325 wnd: UnscaledWindowSize::from_usize(BUFFER_SIZE),
6326 push: true,
6327 ..Default::default()
6328 },
6329 FragmentedPayload::new_contiguous(&TEST_BYTES[3..])
6330 ))
6331 );
6332 }
6333
6334 #[test]
6335 fn mss_option() {
6336 let clock = FakeInstantCtx::default();
6337 let counters = FakeTcpCounters::default();
6338 let (syn_sent, syn) = Closed::<Initial>::connect(
6339 TEST_ISS,
6340 clock.now(),
6341 (),
6342 Default::default(),
6343 Mss(NonZeroU16::new(1).unwrap()),
6344 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
6345 &SocketOptions::default_for_state_tests(),
6346 );
6347 let mut state = State::<_, RingBuffer, RingBuffer, ()>::SynSent(syn_sent);
6348
6349 let (seg, passive_open) = state
6351 .on_segment_with_default_options::<_, ClientlessBufferProvider>(
6352 syn,
6353 clock.now(),
6354 &counters.refs(),
6355 );
6356 let syn_ack = seg.expect("expected SYN-ACK");
6357 assert_eq!(passive_open, None);
6358
6359 assert_eq!(
6361 state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
6362 syn_ack,
6363 clock.now(),
6364 &counters.refs(),
6365 ),
6366 (
6367 Some(Segment::ack(TEST_ISS + 1, TEST_ISS + 1, UnscaledWindowSize::from(u16::MAX))),
6368 None
6369 )
6370 );
6371 match state {
6372 State::Closed(_)
6373 | State::Listen(_)
6374 | State::SynRcvd(_)
6375 | State::SynSent(_)
6376 | State::LastAck(_)
6377 | State::FinWait1(_)
6378 | State::FinWait2(_)
6379 | State::Closing(_)
6380 | State::TimeWait(_) => {
6381 panic!("expected that we have entered established state, but got {:?}", state)
6382 }
6383 State::Established(Established { ref mut snd, rcv: _ })
6384 | State::CloseWait(CloseWait { ref mut snd, closed_rcv: _ }) => {
6385 assert_eq!(snd.buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());
6386 }
6387 }
6388 assert_eq!(
6390 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
6391 Some(Segment::with_data(
6392 TEST_ISS + 1,
6393 TEST_ISS + 1,
6394 UnscaledWindowSize::from(u16::MAX),
6395 FragmentedPayload::new_contiguous(&TEST_BYTES[..1]),
6396 ))
6397 );
6398 }
6399
6400 const TEST_USER_TIMEOUT: NonZeroDuration = NonZeroDuration::from_secs(2 * 60).unwrap();
6401
6402 #[test_case(Duration::from_millis(1), false, true; "retrans_max_retries")]
6406 #[test_case(Duration::from_secs(1), false, false; "retrans_no_max_retries")]
6407 #[test_case(Duration::from_millis(1), true, true; "zwp_max_retries")]
6408 #[test_case(Duration::from_secs(1), true, false; "zwp_no_max_retires")]
6409 fn user_timeout(rtt: Duration, zero_window_probe: bool, max_retries: bool) {
6410 let mut clock = FakeInstantCtx::default();
6411 let counters = FakeTcpCounters::default();
6412 let mut send_buffer = RingBuffer::new(BUFFER_SIZE);
6413 assert_eq!(send_buffer.enqueue_data(TEST_BYTES), 5);
6414 let mut state: State<_, _, _, ()> = State::Established(Established {
6416 snd: Send {
6417 rtt_estimator: Estimator::Measured { srtt: rtt, rtt_var: Duration::ZERO },
6418 ..Send::default_for_test(send_buffer.clone())
6419 }
6420 .into(),
6421 rcv: Recv::default_for_test(RingBuffer::new(BUFFER_SIZE)).into(),
6422 });
6423 let mut times = 0;
6424 let start = clock.now();
6425 let socket_options = SocketOptions {
6426 user_timeout: (!max_retries).then_some(TEST_USER_TIMEOUT),
6427 ..SocketOptions::default_for_state_tests()
6428 };
6429 while let Ok(seg) = state.poll_send(
6430 &FakeStateMachineDebugId,
6431 &counters.refs(),
6432 u32::MAX,
6433 clock.now(),
6434 &socket_options,
6435 ) {
6436 if zero_window_probe {
6437 let zero_window_ack = Segment::ack(
6438 seg.header().ack.unwrap(),
6439 seg.header().seq,
6440 UnscaledWindowSize::from(0),
6441 );
6442 assert_matches!(
6443 state.on_segment::<(), ClientlessBufferProvider>(
6444 &FakeStateMachineDebugId,
6445 &counters.refs(),
6446 zero_window_ack,
6447 clock.now(),
6448 &socket_options,
6449 false,
6450 ),
6451 (None, None, DataAcked::No, _newly_closed)
6452 );
6453
6454 assert_matches!(
6459 state.poll_send(
6460 &FakeStateMachineDebugId,
6461 &counters.refs(),
6462 u32::MAX,
6463 clock.now(),
6464 &socket_options,
6465 ),
6466 Err(NewlyClosed::No)
6467 );
6468 let inner_state = assert_matches!(state, State::Established(ref e) => e);
6469 assert_matches!(inner_state.snd.timer, Some(SendTimer::ZeroWindowProbe(_)));
6470 }
6471
6472 let deadline = state.poll_send_at().expect("must have a retransmission timer");
6473 clock.sleep(deadline.checked_duration_since(clock.now()).unwrap());
6474 times += 1;
6475 }
6476 let elapsed = clock.now().checked_duration_since(start).unwrap();
6477 if max_retries {
6478 assert_eq!(times, 1 + DEFAULT_MAX_RETRIES.get());
6479 } else {
6480 assert_eq!(elapsed, TEST_USER_TIMEOUT.get());
6481 assert!(times < DEFAULT_MAX_RETRIES.get());
6482 }
6483 assert_eq!(state, State::Closed(Closed { reason: Some(ConnectionError::TimedOut) }));
6484 CounterExpectations {
6485 established_closed: 1,
6486 established_timedout: 1,
6487 fast_recovery: if zero_window_probe { 1 } else { 0 },
6488 timeouts: counters.stack_wide.timeouts.get(),
6491 retransmits: counters.stack_wide.retransmits.get(),
6492 slow_start_retransmits: counters.stack_wide.slow_start_retransmits.get(),
6493 dup_acks: counters.stack_wide.dup_acks.get(),
6494 ..Default::default()
6495 }
6496 .assert_counters(&counters);
6497 }
6498
6499 #[test]
6500 fn retrans_timer_backoff() {
6501 let mut clock = FakeInstantCtx::default();
6502 let mut timer = RetransTimer::new(
6503 clock.now(),
6504 Rto::DEFAULT,
6505 Some(TEST_USER_TIMEOUT),
6506 DEFAULT_MAX_RETRIES,
6507 );
6508 assert_eq!(timer.at, FakeInstant::from(Rto::DEFAULT.get()));
6509 clock.sleep(TEST_USER_TIMEOUT.get());
6510 timer.backoff(clock.now());
6511 assert_eq!(timer.at, FakeInstant::from(TEST_USER_TIMEOUT.get()));
6512 clock.sleep(Duration::from_secs(1));
6513 timer.backoff(clock.now());
6515 assert_eq!(timer.at, FakeInstant::from(TEST_USER_TIMEOUT.get()));
6517 }
6518
6519 #[test_case(
6520 State::Established(Established {
6521 snd: Send {
6522 rtt_estimator: Estimator::Measured {
6523 srtt: Rto::DEFAULT.get(),
6524 rtt_var: Duration::ZERO,
6525 },
6526 ..Send::default_for_test(RingBuffer::new(BUFFER_SIZE))
6527 }.into(),
6528 rcv: Recv::default_for_test(RingBuffer::new(
6529 TEST_BYTES.len() + 2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE) as usize,
6530 )).into(),
6531 }); "established")]
6532 #[test_case(
6533 State::FinWait1(FinWait1 {
6534 snd: Send {
6535 rtt_estimator: Estimator::Measured {
6536 srtt: Rto::DEFAULT.get(),
6537 rtt_var: Duration::ZERO,
6538 },
6539 ..Send::default_for_test(RingBuffer::new(BUFFER_SIZE))
6540 }.into(),
6541 rcv: Recv::default_for_test(RingBuffer::new(
6542 TEST_BYTES.len() + 2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE) as usize,
6543 )).into(),
6544 }); "fin_wait_1")]
6545 #[test_case(
6546 State::FinWait2(FinWait2 {
6547 last_seq: TEST_ISS + 1,
6548 rcv: Recv::default_for_test(RingBuffer::new(
6549 TEST_BYTES.len() + 2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE) as usize,
6550 )),
6551 timeout_at: None,
6552 }); "fin_wait_2")]
6553 fn delayed_ack(mut state: State<FakeInstant, RingBuffer, RingBuffer, ()>) {
6554 let mut clock = FakeInstantCtx::default();
6555 let counters = FakeTcpCounters::default();
6556 let socket_options =
6557 SocketOptions { delayed_ack: true, ..SocketOptions::default_for_state_tests() };
6558 assert_eq!(
6559 state.on_segment::<_, ClientlessBufferProvider>(
6560 &FakeStateMachineDebugId,
6561 &counters.refs(),
6562 Segment::with_data(
6563 TEST_IRS + 1,
6564 TEST_ISS + 1,
6565 UnscaledWindowSize::from(u16::MAX),
6566 TEST_BYTES,
6567 ),
6568 clock.now(),
6569 &socket_options,
6570 false, ),
6572 (None, None, DataAcked::No, NewlyClosed::No)
6573 );
6574 assert_eq!(state.poll_send_at(), Some(clock.now().panicking_add(ACK_DELAY_THRESHOLD)));
6575 clock.sleep(ACK_DELAY_THRESHOLD);
6576 assert_eq!(
6577 state.poll_send(
6578 &FakeStateMachineDebugId,
6579 &counters.refs(),
6580 u32::MAX,
6581 clock.now(),
6582 &socket_options
6583 ),
6584 Ok(Segment::ack(
6585 TEST_ISS + 1,
6586 TEST_IRS + 1 + TEST_BYTES.len(),
6587 UnscaledWindowSize::from_u32(2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE)),
6588 ))
6589 );
6590 let full_segment_sized_payload =
6591 vec![b'0'; u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE) as usize];
6592
6593 let expect_last_window_update = (
6594 TEST_IRS + 1 + TEST_BYTES.len(),
6595 WindowSize::from_u32(2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE)).unwrap(),
6596 );
6597 assert_eq!(state.recv_mut().unwrap().last_window_update, expect_last_window_update);
6598 assert_eq!(
6600 state.on_segment::<_, ClientlessBufferProvider>(
6601 &FakeStateMachineDebugId,
6602 &counters.refs(),
6603 Segment::with_data(
6604 TEST_IRS + 1 + TEST_BYTES.len(),
6605 TEST_ISS + 1,
6606 UnscaledWindowSize::from(u16::MAX),
6607 &full_segment_sized_payload[..],
6608 ),
6609 clock.now(),
6610 &socket_options,
6611 false, ),
6613 (None, None, DataAcked::No, NewlyClosed::No)
6614 );
6615 assert_eq!(state.poll_send_at(), Some(clock.now().panicking_add(ACK_DELAY_THRESHOLD)));
6617 assert_eq!(state.recv_mut().unwrap().last_window_update, expect_last_window_update);
6620
6621 assert_eq!(
6624 state.on_segment::<_, ClientlessBufferProvider>(
6625 &FakeStateMachineDebugId,
6626 &counters.refs(),
6627 Segment::with_data(
6628 TEST_IRS + 1 + TEST_BYTES.len() + full_segment_sized_payload.len(),
6629 TEST_ISS + 1,
6630 UnscaledWindowSize::from(u16::MAX),
6631 &full_segment_sized_payload[..],
6632 ),
6633 clock.now(),
6634 &socket_options,
6635 false, ),
6637 (
6638 Some(Segment::ack(
6639 TEST_ISS + 1,
6640 TEST_IRS + 1 + TEST_BYTES.len() + 2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE),
6641 UnscaledWindowSize::from(0),
6642 )),
6643 None,
6644 DataAcked::No,
6645 NewlyClosed::No,
6646 )
6647 );
6648 assert_eq!(
6650 state.recv_mut().unwrap().last_window_update,
6651 (
6652 TEST_IRS + 1 + TEST_BYTES.len() + 2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE),
6653 WindowSize::ZERO,
6654 )
6655 );
6656 assert_eq!(state.poll_send_at(), None);
6657 }
6658
6659 #[test_case(true; "sack permitted")]
6660 #[test_case(false; "sack not permitted")]
6661 fn immediate_ack_if_out_of_order_or_fin(sack_permitted: bool) {
6662 let clock = FakeInstantCtx::default();
6663 let counters = FakeTcpCounters::default();
6664 let socket_options =
6665 SocketOptions { delayed_ack: true, ..SocketOptions::default_for_state_tests() };
6666 let mut state: State<_, _, _, ()> = State::Established(Established {
6667 snd: Send::default_for_test(RingBuffer::new(BUFFER_SIZE)).into(),
6668 rcv: Recv {
6669 sack_permitted,
6670 ..Recv::default_for_test(RingBuffer::new(TEST_BYTES.len() + 1))
6671 }
6672 .into(),
6673 });
6674 let segment_start = TEST_IRS + 2;
6677 assert_eq!(
6678 state.on_segment::<_, ClientlessBufferProvider>(
6679 &FakeStateMachineDebugId,
6680 &counters.refs(),
6681 Segment::with_data(
6682 segment_start,
6683 TEST_ISS + 1,
6684 UnscaledWindowSize::from(u16::MAX),
6685 &TEST_BYTES[1..]
6686 ),
6687 clock.now(),
6688 &socket_options,
6689 false, ),
6691 (
6692 Some(Segment::ack_with_options(
6693 TEST_ISS + 1,
6694 TEST_IRS + 1,
6695 UnscaledWindowSize::from(u16::try_from(TEST_BYTES.len() + 1).unwrap()),
6696 SegmentOptions {
6697 sack_blocks: if sack_permitted {
6698 [SackBlock::try_new(
6699 segment_start,
6700 segment_start + u32::try_from(TEST_BYTES.len()).unwrap() - 1,
6701 )
6702 .unwrap()]
6703 .into_iter()
6704 .collect()
6705 } else {
6706 SackBlocks::default()
6707 }
6708 }
6709 .into()
6710 )),
6711 None,
6712 DataAcked::No,
6713 NewlyClosed::No,
6714 )
6715 );
6716 assert_eq!(state.poll_send_at(), None);
6717 assert_eq!(
6720 state.on_segment::<_, ClientlessBufferProvider>(
6721 &FakeStateMachineDebugId,
6722 &counters.refs(),
6723 Segment::with_data(
6724 TEST_IRS + 1,
6725 TEST_ISS + 1,
6726 UnscaledWindowSize::from(u16::MAX),
6727 &TEST_BYTES[..1]
6728 ),
6729 clock.now(),
6730 &socket_options,
6731 false, ),
6733 (
6734 Some(Segment::ack(
6735 TEST_ISS + 1,
6736 TEST_IRS + 1 + TEST_BYTES.len(),
6737 UnscaledWindowSize::from(1),
6738 )),
6739 None,
6740 DataAcked::No,
6741 NewlyClosed::No
6742 )
6743 );
6744 assert_eq!(state.poll_send_at(), None);
6745 assert_eq!(
6747 state.on_segment::<(), ClientlessBufferProvider>(
6748 &FakeStateMachineDebugId,
6749 &counters.refs(),
6750 Segment::fin(
6751 TEST_IRS + 1 + TEST_BYTES.len(),
6752 TEST_ISS + 1,
6753 UnscaledWindowSize::from(u16::MAX),
6754 ),
6755 clock.now(),
6756 &socket_options,
6757 false, ),
6759 (
6760 Some(Segment::ack(
6761 TEST_ISS + 1,
6762 TEST_IRS + 1 + TEST_BYTES.len() + 1,
6763 UnscaledWindowSize::from(0),
6764 )),
6765 None,
6766 DataAcked::No,
6767 NewlyClosed::No,
6768 )
6769 );
6770 assert_eq!(state.poll_send_at(), None);
6771 }
6772
6773 #[test]
6774 fn fin_wait2_timeout() {
6775 let mut clock = FakeInstantCtx::default();
6776 let counters = FakeTcpCounters::default();
6777 let mut state: State<_, _, NullBuffer, ()> = State::FinWait2(FinWait2 {
6778 last_seq: TEST_ISS,
6779 rcv: Recv::default_for_test_at(TEST_IRS, NullBuffer),
6780 timeout_at: None,
6781 });
6782 assert_eq!(
6783 state.close(
6784 &counters.refs(),
6785 CloseReason::Close { now: clock.now() },
6786 &SocketOptions::default_for_state_tests()
6787 ),
6788 Err(CloseError::Closing)
6789 );
6790 assert_eq!(
6791 state.poll_send_at(),
6792 Some(clock.now().panicking_add(DEFAULT_FIN_WAIT2_TIMEOUT))
6793 );
6794 clock.sleep(DEFAULT_FIN_WAIT2_TIMEOUT);
6795 assert_eq!(
6796 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
6797 None
6798 );
6799 assert_eq!(state, State::Closed(Closed { reason: Some(ConnectionError::TimedOut) }));
6800 CounterExpectations {
6801 established_closed: 1,
6802 established_timedout: 1,
6803 ..Default::default()
6804 }
6805 .assert_counters(&counters);
6806 }
6807
6808 #[test_case(RetransTimer {
6809 user_timeout_until: Some(FakeInstant::from(Duration::from_secs(100))),
6810 remaining_retries: None,
6811 at: FakeInstant::from(Duration::from_secs(1)),
6812 rto: Rto::new(Duration::from_secs(1)),
6813 }, FakeInstant::from(Duration::from_secs(1)) => true)]
6814 #[test_case(RetransTimer {
6815 user_timeout_until: Some(FakeInstant::from(Duration::from_secs(100))),
6816 remaining_retries: None,
6817 at: FakeInstant::from(Duration::from_secs(2)),
6818 rto: Rto::new(Duration::from_secs(1)),
6819 }, FakeInstant::from(Duration::from_secs(1)) => false)]
6820 #[test_case(RetransTimer {
6821 user_timeout_until: Some(FakeInstant::from(Duration::from_secs(100))),
6822 remaining_retries: Some(NonZeroU8::new(1).unwrap()),
6823 at: FakeInstant::from(Duration::from_secs(2)),
6824 rto: Rto::new(Duration::from_secs(1)),
6825 }, FakeInstant::from(Duration::from_secs(1)) => false)]
6826 #[test_case(RetransTimer {
6827 user_timeout_until: Some(FakeInstant::from(Duration::from_secs(1))),
6828 remaining_retries: Some(NonZeroU8::new(1).unwrap()),
6829 at: FakeInstant::from(Duration::from_secs(1)),
6830 rto: Rto::new(Duration::from_secs(1)),
6831 }, FakeInstant::from(Duration::from_secs(1)) => true)]
6832 fn send_timed_out(timer: RetransTimer<FakeInstant>, now: FakeInstant) -> bool {
6833 timer.timed_out(now)
6834 }
6835
6836 #[test_case(
6837 State::SynSent(SynSent{
6838 iss: TEST_ISS,
6839 timestamp: Some(FakeInstant::default()),
6840 retrans_timer: RetransTimer::new(
6841 FakeInstant::default(),
6842 Rto::MIN,
6843 NonZeroDuration::from_secs(60),
6844 DEFAULT_MAX_SYN_RETRIES,
6845 ),
6846 active_open: (),
6847 buffer_sizes: Default::default(),
6848 device_mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
6849 default_mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
6850 rcv_wnd_scale: WindowScale::default(),
6851 })
6852 => DEFAULT_MAX_SYN_RETRIES.get(); "syn_sent")]
6853 #[test_case(
6854 State::SynRcvd(SynRcvd{
6855 iss: TEST_ISS,
6856 irs: TEST_IRS,
6857 timestamp: Some(FakeInstant::default()),
6858 retrans_timer: RetransTimer::new(
6859 FakeInstant::default(),
6860 Rto::MIN,
6861 NonZeroDuration::from_secs(60),
6862 DEFAULT_MAX_SYNACK_RETRIES,
6863 ),
6864 simultaneous_open: None,
6865 buffer_sizes: BufferSizes::default(),
6866 smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
6867 rcv_wnd_scale: WindowScale::default(),
6868 snd_wnd_scale: Some(WindowScale::default()),
6869 sack_permitted: SACK_PERMITTED,
6870 })
6871 => DEFAULT_MAX_SYNACK_RETRIES.get(); "syn_rcvd")]
6872 fn handshake_timeout(mut state: State<FakeInstant, RingBuffer, RingBuffer, ()>) -> u8 {
6873 let mut clock = FakeInstantCtx::default();
6874 let counters = FakeTcpCounters::default();
6875 let mut retransmissions = 0;
6876 clock.sleep_until(state.poll_send_at().expect("must have a retransmission timer"));
6877 while let Some(_seg) =
6878 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs())
6879 {
6880 let deadline = state.poll_send_at().expect("must have a retransmission timer");
6881 clock.sleep_until(deadline);
6882 retransmissions += 1;
6883 }
6884 assert_eq!(state, State::Closed(Closed { reason: Some(ConnectionError::TimedOut) }));
6885 CounterExpectations::default().assert_counters(&counters);
6886 retransmissions
6887 }
6888
6889 #[test_case(
6890 u16::MAX as usize, WindowScale::default(), Some(WindowScale::default())
6891 => (WindowScale::default(), WindowScale::default()))]
6892 #[test_case(
6893 u16::MAX as usize + 1, WindowScale::new(1).unwrap(), Some(WindowScale::default())
6894 => (WindowScale::new(1).unwrap(), WindowScale::default()))]
6895 #[test_case(
6896 u16::MAX as usize + 1, WindowScale::new(1).unwrap(), None
6897 => (WindowScale::default(), WindowScale::default()))]
6898 #[test_case(
6899 u16::MAX as usize, WindowScale::default(), Some(WindowScale::new(1).unwrap())
6900 => (WindowScale::default(), WindowScale::new(1).unwrap()))]
6901 fn window_scale(
6902 buffer_size: usize,
6903 syn_window_scale: WindowScale,
6904 syn_ack_window_scale: Option<WindowScale>,
6905 ) -> (WindowScale, WindowScale) {
6906 let mut clock = FakeInstantCtx::default();
6907 let counters = FakeTcpCounters::default();
6908 let (syn_sent, syn_seg) = Closed::<Initial>::connect(
6909 TEST_ISS,
6910 clock.now(),
6911 (),
6912 BufferSizes { receive: buffer_size, ..Default::default() },
6913 DEVICE_MAXIMUM_SEGMENT_SIZE,
6914 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
6915 &SocketOptions::default_for_state_tests(),
6916 );
6917 assert_eq!(
6918 syn_seg,
6919 Segment::syn(
6920 TEST_ISS,
6921 UnscaledWindowSize::from(u16::try_from(buffer_size).unwrap_or(u16::MAX)),
6922 HandshakeOptions {
6923 mss: Some(DEVICE_MAXIMUM_SEGMENT_SIZE),
6924 window_scale: Some(syn_window_scale),
6925 sack_permitted: SACK_PERMITTED,
6926 }
6927 .into(),
6928 )
6929 );
6930 let mut active = State::SynSent(syn_sent);
6931 clock.sleep(RTT / 2);
6932 let (seg, passive_open) = active
6933 .on_segment_with_default_options::<(), ClientlessBufferProvider>(
6934 Segment::syn_ack(
6935 TEST_IRS,
6936 TEST_ISS + 1,
6937 UnscaledWindowSize::from(u16::MAX),
6938 HandshakeOptions {
6939 mss: Some(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
6940 window_scale: syn_ack_window_scale,
6941 sack_permitted: SACK_PERMITTED,
6942 }
6943 .into(),
6944 ),
6945 clock.now(),
6946 &counters.refs(),
6947 );
6948 assert_eq!(passive_open, None);
6949 assert_matches!(seg, Some(_));
6950
6951 let established: Established<FakeInstant, RingBuffer, NullBuffer> =
6952 assert_matches!(active, State::Established(established) => established);
6953
6954 assert_eq!(established.snd.wnd, WindowSize::DEFAULT);
6955
6956 (established.rcv.wnd_scale, established.snd.wnd_scale)
6957 }
6958
6959 #[test_case(
6960 u16::MAX as usize,
6961 Segment::syn_ack(
6962 TEST_IRS + 1 + u16::MAX as usize,
6963 TEST_ISS + 1,
6964 UnscaledWindowSize::from(u16::MAX),
6965 Options::default(),
6966 )
6967 )]
6968 #[test_case(
6969 u16::MAX as usize + 1,
6970 Segment::syn_ack(
6971 TEST_IRS + 1 + u16::MAX as usize,
6972 TEST_ISS + 1,
6973 UnscaledWindowSize::from(u16::MAX),
6974 Options::default(),
6975 )
6976 )]
6977 #[test_case(
6978 u16::MAX as usize,
6979 Segment::with_data(
6980 TEST_IRS + 1 + u16::MAX as usize,
6981 TEST_ISS + 1,
6982 UnscaledWindowSize::from(u16::MAX),
6983 &TEST_BYTES[..],
6984 )
6985 )]
6986 #[test_case(
6987 u16::MAX as usize + 1,
6988 Segment::with_data(
6989 TEST_IRS + 1 + u16::MAX as usize,
6990 TEST_ISS + 1,
6991 UnscaledWindowSize::from(u16::MAX),
6992 &TEST_BYTES[..],
6993 )
6994 )]
6995 fn window_scale_otw_seq(receive_buf_size: usize, otw_seg: impl Into<Segment<&'static [u8]>>) {
6996 let counters = FakeTcpCounters::default();
6997 let buffer_sizes = BufferSizes { send: 0, receive: receive_buf_size };
6998 let rcv_wnd_scale = buffer_sizes.rwnd().scale();
6999 let mut syn_rcvd: State<_, RingBuffer, RingBuffer, ()> = State::SynRcvd(SynRcvd {
7000 iss: TEST_ISS,
7001 irs: TEST_IRS,
7002 timestamp: None,
7003 retrans_timer: RetransTimer::new(
7004 FakeInstant::default(),
7005 Rto::DEFAULT,
7006 NonZeroDuration::from_secs(10),
7007 DEFAULT_MAX_SYNACK_RETRIES,
7008 ),
7009 simultaneous_open: None,
7010 buffer_sizes: BufferSizes { send: 0, receive: receive_buf_size },
7011 smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
7012 rcv_wnd_scale,
7013 snd_wnd_scale: WindowScale::new(1),
7014 sack_permitted: SACK_PERMITTED,
7015 });
7016
7017 assert_eq!(
7018 syn_rcvd.on_segment_with_default_options::<_, ClientlessBufferProvider>(
7019 otw_seg.into(),
7020 FakeInstant::default(),
7021 &counters.refs()
7022 ),
7023 (Some(Segment::ack(TEST_ISS + 1, TEST_IRS + 1, buffer_sizes.rwnd_unscaled())), None),
7024 )
7025 }
7026
7027 #[test]
7028 fn poll_send_reserving_buffer() {
7029 const RESERVED_BYTES: usize = 3;
7030 let mut snd: Send<FakeInstant, _, false> = Send::default_for_test(ReservingBuffer {
7031 buffer: RingBuffer::with_data(TEST_BYTES.len(), TEST_BYTES),
7032 reserved_bytes: RESERVED_BYTES,
7033 });
7034
7035 let counters = FakeTcpCounters::default();
7036
7037 assert_eq!(
7038 snd.poll_send(
7039 &FakeStateMachineDebugId,
7040 &counters.refs(),
7041 &RecvParams {
7042 ack: TEST_IRS + 1,
7043 wnd: WindowSize::DEFAULT,
7044 wnd_scale: WindowScale::ZERO,
7045 },
7046 u32::MAX,
7047 FakeInstant::default(),
7048 &SocketOptions::default_for_state_tests(),
7049 ),
7050 Some(Segment::with_data(
7051 TEST_ISS + 1,
7052 TEST_IRS + 1,
7053 WindowSize::DEFAULT >> WindowScale::default(),
7054 FragmentedPayload::new_contiguous(&TEST_BYTES[..TEST_BYTES.len() - RESERVED_BYTES])
7055 ))
7056 );
7057
7058 assert_eq!(snd.nxt, TEST_ISS + 1 + (TEST_BYTES.len() - RESERVED_BYTES));
7059 }
7060
7061 #[test]
7062 fn rcv_silly_window_avoidance() {
7063 const MULTIPLE: usize = 3;
7064 const CAP: usize = TEST_BYTES.len() * MULTIPLE;
7065 let mut rcv: Recv<FakeInstant, RingBuffer> = Recv {
7066 mss: Mss(NonZeroU16::new(TEST_BYTES.len() as u16).unwrap()),
7067 ..Recv::default_for_test_at(TEST_IRS, RingBuffer::new(CAP))
7068 };
7069
7070 fn get_buffer(rcv: &mut Recv<FakeInstant, RingBuffer>) -> &mut RingBuffer {
7071 assert_matches!(
7072 &mut rcv.buffer,
7073 RecvBufferState::Open {ref mut buffer, .. } => buffer
7074 )
7075 }
7076
7077 assert_eq!(rcv.calculate_window_size().window_size, WindowSize::new(CAP).unwrap());
7079
7080 for _ in 0..MULTIPLE {
7081 assert_eq!(get_buffer(&mut rcv).enqueue_data(TEST_BYTES), TEST_BYTES.len());
7082 }
7083 let assembler = assert_matches!(&mut rcv.buffer,
7084 RecvBufferState::Open { ref mut assembler, .. } => assembler);
7085 assert_eq!(assembler.insert(TEST_IRS..TEST_IRS + CAP), CAP);
7086 assert_eq!(rcv.calculate_window_size().window_size, WindowSize::ZERO);
7088
7089 assert_eq!(get_buffer(&mut rcv).read_with(|_| 1), 1);
7092 assert_eq!(rcv.calculate_window_size().window_size, WindowSize::ZERO);
7093
7094 assert_eq!(get_buffer(&mut rcv).read_with(|_| TEST_BYTES.len()), TEST_BYTES.len());
7097 assert_eq!(
7098 rcv.calculate_window_size().window_size,
7099 WindowSize::new(TEST_BYTES.len() + 1).unwrap()
7100 );
7101 }
7102
7103 #[test]
7104 fn correct_window_scale_during_send() {
7106 let snd_wnd_scale = WindowScale::new(4).unwrap();
7107 let rcv_wnd_scale = WindowScale::new(8).unwrap();
7108 let wnd_size = WindowSize::new(1024).unwrap();
7109
7110 let counters = FakeTcpCounters::default();
7111 let new_snd = || Send {
7114 wnd: wnd_size,
7115 wnd_scale: snd_wnd_scale,
7116 ..Send::default_for_test(RingBuffer::with_data(TEST_BYTES.len(), TEST_BYTES))
7117 };
7118 let new_rcv = || Recv {
7119 wnd_scale: rcv_wnd_scale,
7120 ..Recv::default_for_test(RingBuffer::new(wnd_size.into()))
7121 };
7122 for mut state in [
7123 State::Established(Established { snd: new_snd().into(), rcv: new_rcv().into() }),
7124 State::FinWait1(FinWait1 { snd: new_snd().queue_fin().into(), rcv: new_rcv().into() }),
7125 State::Closing(Closing {
7126 snd: new_snd().queue_fin(),
7127 closed_rcv: RecvParams {
7128 ack: TEST_IRS + 1,
7129 wnd: wnd_size,
7130 wnd_scale: rcv_wnd_scale,
7131 },
7132 }),
7133 State::CloseWait(CloseWait {
7134 snd: new_snd().into(),
7135 closed_rcv: RecvParams {
7136 ack: TEST_IRS + 1,
7137 wnd: wnd_size,
7138 wnd_scale: rcv_wnd_scale,
7139 },
7140 }),
7141 State::LastAck(LastAck {
7142 snd: new_snd().queue_fin(),
7143 closed_rcv: RecvParams {
7144 ack: TEST_IRS + 1,
7145 wnd: wnd_size,
7146 wnd_scale: rcv_wnd_scale,
7147 },
7148 }),
7149 ] {
7150 assert_eq!(
7151 state.poll_send_with_default_options(
7152 u32::try_from(TEST_BYTES.len()).unwrap(),
7153 FakeInstant::default(),
7154 &counters.refs(),
7155 ),
7156 Some(Segment::new_assert_no_discard(
7157 SegmentHeader {
7158 seq: TEST_ISS + 1,
7159 ack: Some(TEST_IRS + 1),
7160 wnd: UnscaledWindowSize::from(4),
7163 push: true,
7164 ..Default::default()
7165 },
7166 FragmentedPayload::new_contiguous(TEST_BYTES)
7167 ))
7168 );
7169 }
7170 }
7171
7172 #[test_case(true; "prompted window update")]
7173 #[test_case(false; "unprompted window update")]
7174 fn snd_silly_window_avoidance(prompted_window_update: bool) {
7175 const CAP: usize = TEST_BYTES.len() * 2;
7176 let mut snd: Send<FakeInstant, RingBuffer, false> = Send {
7177 wl1: TEST_IRS,
7178 wl2: TEST_ISS,
7179 wnd: WindowSize::new(CAP).unwrap(),
7180 congestion_control: CongestionControl::cubic_with_mss(Mss(NonZeroU16::new(
7181 TEST_BYTES.len() as u16,
7182 )
7183 .unwrap())),
7184 ..Send::default_for_test(RingBuffer::new(CAP))
7185 };
7186
7187 let mut clock = FakeInstantCtx::default();
7188 let counters = FakeTcpCounters::default();
7189
7190 assert_eq!(snd.buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());
7192 assert_eq!(snd.buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());
7193
7194 assert_eq!(
7196 snd.poll_send(
7197 &FakeStateMachineDebugId,
7198 &counters.refs(),
7199 &RecvParams {
7200 ack: TEST_IRS + 1,
7201 wnd: WindowSize::DEFAULT,
7202 wnd_scale: WindowScale::ZERO,
7203 },
7204 u32::MAX,
7205 clock.now(),
7206 &SocketOptions::default_for_state_tests(),
7207 ),
7208 Some(Segment::with_data(
7209 TEST_ISS + 1,
7210 TEST_IRS + 1,
7211 UnscaledWindowSize::from(u16::MAX),
7212 FragmentedPayload::new_contiguous(TEST_BYTES),
7213 )),
7214 );
7215
7216 assert_eq!(
7217 snd.process_ack(
7218 &FakeStateMachineDebugId,
7219 &counters.refs(),
7220 TEST_IRS + 1,
7221 TEST_ISS + 1 + TEST_BYTES.len(),
7222 UnscaledWindowSize::from(0),
7223 &SackBlocks::EMPTY,
7224 true,
7225 &RecvParams {
7226 ack: TEST_IRS + 1,
7227 wnd_scale: WindowScale::default(),
7228 wnd: WindowSize::DEFAULT,
7229 },
7230 clock.now(),
7231 &SocketOptions::default(),
7232 ),
7233 (None, DataAcked::Yes)
7234 );
7235
7236 assert_eq!(
7239 snd.poll_send(
7240 &FakeStateMachineDebugId,
7241 &counters.refs(),
7242 &RecvParams {
7243 ack: TEST_IRS + 1,
7244 wnd: WindowSize::DEFAULT,
7245 wnd_scale: WindowScale::ZERO,
7246 },
7247 u32::MAX,
7248 clock.now(),
7249 &SocketOptions::default_for_state_tests(),
7250 ),
7251 None
7252 );
7253
7254 assert_eq!(
7255 snd.timer,
7256 Some(SendTimer::ZeroWindowProbe(RetransTimer::new(
7257 clock.now(),
7258 snd.rtt_estimator.rto(),
7259 None,
7260 DEFAULT_MAX_RETRIES,
7261 )))
7262 );
7263
7264 clock.sleep_until(snd.timer.as_ref().unwrap().expiry());
7265
7266 assert_eq!(
7267 snd.poll_send(
7268 &FakeStateMachineDebugId,
7269 &counters.refs(),
7270 &RecvParams {
7271 ack: TEST_IRS + 1,
7272 wnd: WindowSize::DEFAULT,
7273 wnd_scale: WindowScale::ZERO,
7274 },
7275 u32::MAX,
7276 clock.now(),
7277 &SocketOptions::default_for_state_tests(),
7278 ),
7279 Some(Segment::with_data(
7280 TEST_ISS + 1 + TEST_BYTES.len(),
7281 TEST_IRS + 1,
7282 UnscaledWindowSize::from(u16::MAX),
7283 FragmentedPayload::new_contiguous(&TEST_BYTES[..1]),
7284 ))
7285 );
7286
7287 if prompted_window_update {
7288 assert_eq!(
7291 snd.process_ack(
7292 &FakeStateMachineDebugId,
7293 &counters.refs(),
7294 TEST_IRS + 1,
7295 TEST_ISS + 1 + TEST_BYTES.len() + 1,
7296 UnscaledWindowSize::from(3),
7297 &SackBlocks::EMPTY,
7298 true,
7299 &RecvParams {
7300 ack: TEST_IRS + 1,
7301 wnd_scale: WindowScale::default(),
7302 wnd: WindowSize::DEFAULT,
7303 },
7304 clock.now(),
7305 &SocketOptions::default(),
7306 ),
7307 (None, DataAcked::Yes)
7308 );
7309 } else {
7310 assert_eq!(
7312 snd.process_ack(
7313 &FakeStateMachineDebugId,
7314 &counters.refs(),
7315 TEST_IRS + 1,
7316 TEST_ISS + 1 + TEST_BYTES.len(),
7317 UnscaledWindowSize::from(0),
7318 &SackBlocks::EMPTY,
7319 true,
7320 &RecvParams {
7321 ack: TEST_IRS + 1,
7322 wnd_scale: WindowScale::default(),
7323 wnd: WindowSize::DEFAULT,
7324 },
7325 clock.now(),
7326 &SocketOptions::default(),
7327 ),
7328 (None, DataAcked::No)
7329 );
7330
7331 assert_eq!(
7334 snd.process_ack(
7335 &FakeStateMachineDebugId,
7336 &counters.refs(),
7337 TEST_IRS + 1,
7338 TEST_ISS + 1 + TEST_BYTES.len(),
7339 UnscaledWindowSize::from(3),
7340 &SackBlocks::EMPTY,
7341 true,
7342 &RecvParams {
7343 ack: TEST_IRS + 1,
7344 wnd_scale: WindowScale::default(),
7345 wnd: WindowSize::DEFAULT,
7346 },
7347 clock.now(),
7348 &SocketOptions::default(),
7349 ),
7350 (None, DataAcked::No)
7351 );
7352 }
7353
7354 assert_eq!(
7356 snd.poll_send(
7357 &FakeStateMachineDebugId,
7358 &counters.refs(),
7359 &RecvParams {
7360 ack: TEST_IRS + 1,
7361 wnd: WindowSize::DEFAULT,
7362 wnd_scale: WindowScale::ZERO,
7363 },
7364 u32::MAX,
7365 clock.now(),
7366 &SocketOptions::default_for_state_tests(),
7367 ),
7368 None,
7369 );
7370 assert_eq!(
7371 snd.timer,
7372 Some(SendTimer::SWSProbe { at: clock.now().panicking_add(SWS_PROBE_TIMEOUT) })
7373 );
7374 clock.sleep(SWS_PROBE_TIMEOUT);
7375
7376 let seq_index = usize::from(prompted_window_update);
7379 assert_eq!(
7380 snd.poll_send(
7381 &FakeStateMachineDebugId,
7382 &counters.refs(),
7383 &RecvParams {
7384 ack: TEST_IRS + 1,
7385 wnd: WindowSize::DEFAULT,
7386 wnd_scale: WindowScale::ZERO,
7387 },
7388 u32::MAX,
7389 clock.now(),
7390 &SocketOptions::default_for_state_tests(),
7391 ),
7392 Some(Segment::with_data(
7393 TEST_ISS + 1 + TEST_BYTES.len() + seq_index,
7394 TEST_IRS + 1,
7395 UnscaledWindowSize::from(u16::MAX),
7396 FragmentedPayload::new_contiguous(&TEST_BYTES[seq_index..3 + seq_index]),
7397 ))
7398 );
7399 }
7400
7401 #[test]
7402 fn snd_enter_zwp_on_negative_window_update() {
7403 const CAP: usize = TEST_BYTES.len() * 2;
7404 let mut snd: Send<FakeInstant, RingBuffer, false> = Send {
7405 wnd: WindowSize::new(CAP).unwrap(),
7406 wl1: TEST_IRS,
7407 wl2: TEST_ISS,
7408 congestion_control: CongestionControl::cubic_with_mss(Mss(NonZeroU16::new(
7409 TEST_BYTES.len() as u16,
7410 )
7411 .unwrap())),
7412 ..Send::default_for_test(RingBuffer::new(CAP))
7413 };
7414
7415 let clock = FakeInstantCtx::default();
7416 let counters = FakeTcpCounters::default();
7417
7418 assert_eq!(snd.buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());
7420 assert_eq!(snd.buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());
7421
7422 assert_eq!(
7424 snd.poll_send(
7425 &FakeStateMachineDebugId,
7426 &counters.refs(),
7427 &RecvParams {
7428 ack: TEST_IRS + 1,
7429 wnd: WindowSize::DEFAULT,
7430 wnd_scale: WindowScale::ZERO,
7431 },
7432 u32::MAX,
7433 clock.now(),
7434 &SocketOptions::default_for_state_tests(),
7435 ),
7436 Some(Segment::with_data(
7437 TEST_ISS + 1,
7438 TEST_IRS + 1,
7439 UnscaledWindowSize::from(u16::MAX),
7440 FragmentedPayload::new_contiguous(TEST_BYTES),
7441 )),
7442 );
7443
7444 assert_matches!(snd.timer, Some(SendTimer::Retrans(_)));
7447
7448 assert_eq!(
7456 snd.process_ack(
7457 &FakeStateMachineDebugId,
7458 &counters.refs(),
7459 TEST_IRS + 1,
7460 TEST_ISS + 1 + TEST_BYTES.len(),
7461 UnscaledWindowSize::from(0),
7462 &SackBlocks::EMPTY,
7463 true,
7464 &RecvParams {
7465 ack: TEST_IRS + 1,
7466 wnd_scale: WindowScale::default(),
7467 wnd: WindowSize::DEFAULT,
7468 },
7469 clock.now(),
7470 &SocketOptions::default(),
7471 ),
7472 (None, DataAcked::Yes)
7473 );
7474
7475 assert_eq!(
7478 snd.poll_send(
7479 &FakeStateMachineDebugId,
7480 &counters.refs(),
7481 &RecvParams {
7482 ack: TEST_IRS + 1,
7483 wnd: WindowSize::DEFAULT,
7484 wnd_scale: WindowScale::ZERO,
7485 },
7486 u32::MAX,
7487 clock.now(),
7488 &SocketOptions::default_for_state_tests(),
7489 ),
7490 None,
7491 );
7492 assert_matches!(snd.timer, Some(SendTimer::ZeroWindowProbe(_)));
7493 }
7494
7495 #[test]
7496 fn ack_uses_snd_max() {
7498 let counters = FakeTcpCounters::default();
7499 let mss = Mss(NonZeroU16::new(u16::try_from(TEST_BYTES.len()).unwrap()).unwrap());
7500 let mut clock = FakeInstantCtx::default();
7501 let mut buffer = RingBuffer::new(BUFFER_SIZE);
7502 assert_eq!(buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());
7503 assert_eq!(buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());
7504
7505 let iss = ISS_1 + 1;
7507 let mut state: State<_, _, _, ()> = State::Established(Established {
7508 snd: Send {
7509 congestion_control: CongestionControl::cubic_with_mss(mss),
7510 ..Send::default_for_test_at(iss, buffer)
7511 }
7512 .into(),
7513 rcv: Recv { mss, ..Recv::default_for_test_at(iss, RingBuffer::new(BUFFER_SIZE)) }
7514 .into(),
7515 });
7516
7517 assert_eq!(
7519 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
7520 Some(Segment::with_data(
7521 iss,
7522 iss,
7523 UnscaledWindowSize::from(u16::try_from(BUFFER_SIZE).unwrap()),
7524 FragmentedPayload::new_contiguous(TEST_BYTES),
7525 )),
7526 );
7527 assert_eq!(
7528 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
7529 Some(Segment::new_assert_no_discard(
7530 SegmentHeader {
7531 seq: iss + TEST_BYTES.len(),
7532 ack: Some(iss),
7533 wnd: UnscaledWindowSize::from(u16::try_from(BUFFER_SIZE).unwrap()),
7534 push: true,
7535 ..Default::default()
7536 },
7537 FragmentedPayload::new_contiguous(TEST_BYTES),
7538 )),
7539 );
7540
7541 clock.sleep(Rto::DEFAULT.get());
7543 assert_eq!(
7544 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
7545 Some(Segment::with_data(
7546 iss,
7547 iss,
7548 UnscaledWindowSize::from(u16::try_from(BUFFER_SIZE).unwrap()),
7549 FragmentedPayload::new_contiguous(TEST_BYTES),
7550 )),
7551 );
7552
7553 assert_eq!(
7556 state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
7557 Segment::with_data(
7558 iss,
7559 iss,
7560 UnscaledWindowSize::from(u16::try_from(BUFFER_SIZE).unwrap()),
7561 TEST_BYTES,
7562 ),
7563 clock.now(),
7564 &counters.refs(),
7565 ),
7566 (
7567 Some(Segment::ack(
7568 iss + 2 * TEST_BYTES.len(),
7569 iss + TEST_BYTES.len(),
7570 UnscaledWindowSize::from(
7571 u16::try_from(BUFFER_SIZE - TEST_BYTES.len()).unwrap()
7572 ),
7573 )),
7574 None,
7575 )
7576 );
7577 }
7578
7579 #[test_case(
7580 State::Closed(Closed { reason: None }),
7581 State::Closed(Closed { reason: None }) => NewlyClosed::No; "closed to closed")]
7582 #[test_case(
7583 State::SynSent(SynSent {
7584 iss: TEST_ISS,
7585 timestamp: Some(FakeInstant::default()),
7586 retrans_timer: RetransTimer::new(
7587 FakeInstant::default(),
7588 Rto::DEFAULT,
7589 None,
7590 DEFAULT_MAX_SYN_RETRIES,
7591 ),
7592 active_open: (),
7593 buffer_sizes: BufferSizes::default(),
7594 default_mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
7595 device_mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
7596 rcv_wnd_scale: WindowScale::default(),
7597 }),
7598 State::Closed(Closed { reason: None }) => NewlyClosed::Yes; "non-closed to closed")]
7599 #[test_case(
7600 State::SynSent(SynSent {
7601 iss: TEST_ISS,
7602 timestamp: Some(FakeInstant::default()),
7603 retrans_timer: RetransTimer::new(
7604 FakeInstant::default(),
7605 Rto::DEFAULT,
7606 None,
7607 DEFAULT_MAX_SYN_RETRIES,
7608 ),
7609 active_open: (),
7610 buffer_sizes: BufferSizes::default(),
7611 default_mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
7612 device_mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
7613 rcv_wnd_scale: WindowScale::default(),
7614 },
7615 ),
7616 State::SynRcvd(SynRcvd {
7617 iss: TEST_ISS,
7618 irs: TEST_IRS,
7619 timestamp: None,
7620 retrans_timer: RetransTimer::new(
7621 FakeInstant::default(),
7622 Rto::DEFAULT,
7623 NonZeroDuration::from_secs(10),
7624 DEFAULT_MAX_SYNACK_RETRIES,
7625 ),
7626 simultaneous_open: None,
7627 buffer_sizes: BufferSizes { send: 0, receive: 0 },
7628 smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
7629 rcv_wnd_scale: WindowScale::new(0).unwrap(),
7630 snd_wnd_scale: WindowScale::new(0),
7631 sack_permitted: SACK_PERMITTED,
7632 }) => NewlyClosed::No; "non-closed to non-closed")]
7633 fn transition_to_state(
7634 mut old_state: State<FakeInstant, RingBuffer, RingBuffer, ()>,
7635 new_state: State<FakeInstant, RingBuffer, RingBuffer, ()>,
7636 ) -> NewlyClosed {
7637 let counters = FakeTcpCounters::default();
7638 old_state.transition_to_state(&counters.refs(), new_state)
7639 }
7640
7641 #[test_case(true, false; "more than mss dequeued")]
7642 #[test_case(false, false; "less than mss dequeued")]
7643 #[test_case(true, true; "more than mss dequeued and delack")]
7644 #[test_case(false, true; "less than mss dequeued and delack")]
7645 fn poll_receive_data_dequeued_state(dequeue_more_than_mss: bool, delayed_ack: bool) {
7646 const BUFFER_SIZE: usize = 5;
7649 const MSS: Mss = Mss(NonZeroU16::new(5).unwrap());
7650 const TEST_BYTES: &[u8] = "Hello".as_bytes();
7651
7652 let new_snd = || Send {
7653 congestion_control: CongestionControl::cubic_with_mss(MSS),
7654 ..Send::default_for_test(NullBuffer)
7655 };
7656 let new_rcv = || Recv { mss: MSS, ..Recv::default_for_test(RingBuffer::new(BUFFER_SIZE)) };
7657
7658 let clock = FakeInstantCtx::default();
7659 let counters = FakeTcpCounters::default();
7660 for mut state in [
7661 State::Established(Established { snd: new_snd().into(), rcv: new_rcv().into() }),
7662 State::FinWait1(FinWait1 { snd: new_snd().queue_fin().into(), rcv: new_rcv().into() }),
7663 State::FinWait2(FinWait2 { last_seq: TEST_ISS + 1, rcv: new_rcv(), timeout_at: None }),
7664 ] {
7665 assert_matches!(state.poll_receive_data_dequeued(), None);
7668 let expect_window_update = (TEST_IRS + 1, WindowSize::new(BUFFER_SIZE).unwrap());
7669 assert_eq!(state.recv_mut().unwrap().last_window_update, expect_window_update);
7670
7671 let seg = state.on_segment_with_options::<_, ClientlessBufferProvider>(
7675 Segment::with_data(
7676 TEST_IRS + 1,
7677 TEST_ISS + 1,
7678 UnscaledWindowSize::from(0),
7679 TEST_BYTES,
7680 ),
7681 clock.now(),
7682 &counters.refs(),
7683 &SocketOptions { delayed_ack, ..SocketOptions::default_for_state_tests() },
7684 );
7685
7686 let expect_ack = (!delayed_ack).then_some(Segment::ack(
7687 TEST_ISS + 1,
7688 TEST_IRS + 1 + TEST_BYTES.len(),
7689 UnscaledWindowSize::from((BUFFER_SIZE - TEST_BYTES.len()) as u16),
7690 ));
7691 assert_eq!(seg, (expect_ack, None));
7692 assert_matches!(state.poll_receive_data_dequeued(), None);
7693
7694 let expect_window_update = if delayed_ack {
7695 expect_window_update
7696 } else {
7697 (TEST_IRS + 1 + TEST_BYTES.len(), WindowSize::new(0).unwrap())
7698 };
7699 assert_eq!(state.recv_mut().unwrap().last_window_update, expect_window_update);
7700
7701 if dequeue_more_than_mss {
7702 assert_eq!(
7706 state.read_with(|available| {
7707 assert_eq!(available, &[TEST_BYTES]);
7708 available[0].len()
7709 }),
7710 TEST_BYTES.len()
7711 );
7712 assert_eq!(
7713 state.poll_receive_data_dequeued(),
7714 Some(Segment::ack(
7715 TEST_ISS + 1,
7716 TEST_IRS + 1 + TEST_BYTES.len(),
7717 UnscaledWindowSize::from(BUFFER_SIZE as u16)
7718 ))
7719 );
7720 assert_eq!(
7721 state.recv_mut().unwrap().last_window_update,
7722 (TEST_IRS + 1 + TEST_BYTES.len(), WindowSize::new(BUFFER_SIZE).unwrap())
7723 );
7724 assert!(state.recv_mut().unwrap().timer.is_none());
7726 } else {
7727 let mss: usize = MSS.get().get().into();
7731 assert_eq!(
7732 state.read_with(|available| {
7733 assert_eq!(available, &[TEST_BYTES]);
7734 mss / 2 - 1
7735 }),
7736 mss / 2 - 1
7737 );
7738 assert_eq!(state.poll_receive_data_dequeued(), None,);
7739 assert_eq!(
7740 state.recv_mut().unwrap().last_window_update,
7741 expect_window_update,
7743 );
7744 assert_eq!(state.recv_mut().unwrap().timer.is_some(), delayed_ack);
7745 }
7746 }
7747 }
7748
7749 #[test]
7750 fn poll_receive_data_dequeued_small_window() {
7751 const MSS: Mss = Mss(NonZeroU16::new(65000).unwrap());
7752 const WINDOW: WindowSize = WindowSize::from_u32(500).unwrap();
7753 let mut recv = Recv::<FakeInstant, _> {
7754 mss: MSS,
7755 last_window_update: (TEST_IRS + 1, WindowSize::from_u32(1).unwrap()),
7756 ..Recv::default_for_test(RingBuffer::new(WINDOW.into()))
7757 };
7758 let seg = recv.poll_receive_data_dequeued(TEST_ISS).expect("generates segment");
7759 assert_eq!(seg.header().ack, Some(recv.nxt()));
7760 assert_eq!(seg.header().wnd << recv.wnd_scale, WINDOW);
7761 }
7762
7763 #[test]
7764 fn quickack_period() {
7765 let mut quickack = default_quickack_counter();
7766 let mut state = State::Established(Established::<FakeInstant, _, _> {
7767 snd: Send::default_for_test(NullBuffer).into(),
7768 rcv: Recv {
7769 remaining_quickacks: quickack,
7770 ..Recv::default_for_test(RingBuffer::default())
7771 }
7772 .into(),
7773 });
7774 let socket_options = SocketOptions { delayed_ack: true, ..Default::default() };
7775 let clock = FakeInstantCtx::default();
7776 let counters = FakeTcpCounters::default();
7777 let data = vec![0u8; usize::from(DEVICE_MAXIMUM_SEGMENT_SIZE)];
7778 while quickack != 0 {
7779 let seq = state.recv_mut().unwrap().nxt();
7780 let segment = Segment::new_assert_no_discard(
7781 SegmentHeader { seq, ack: Some(TEST_ISS + 1), ..Default::default() },
7782 &data[..],
7783 );
7784 let (seg, passive_open) = state.on_segment_with_options::<_, ClientlessBufferProvider>(
7785 segment,
7786 clock.now(),
7787 &counters.refs(),
7788 &socket_options,
7789 );
7790 let recv = state.recv_mut().unwrap();
7791 assert_matches!(recv.timer, None);
7792
7793 assert_eq!(passive_open, None);
7794 let seg = seg.expect("no segment generated");
7795 assert_eq!(seg.header().ack, Some(seq + u32::try_from(data.len()).unwrap()));
7796 assert_eq!(recv.remaining_quickacks, quickack - 1);
7797 quickack -= 1;
7798 state.buffers_mut().into_receive_buffer().unwrap().reset();
7799 }
7800
7801 let segment = Segment::new_assert_no_discard(
7803 SegmentHeader {
7804 seq: state.recv_mut().unwrap().nxt(),
7805 ack: Some(TEST_ISS + 1),
7806 ..Default::default()
7807 },
7808 &data[..],
7809 );
7810 let (seg, passive_open) = state.on_segment_with_options::<_, ClientlessBufferProvider>(
7811 segment,
7812 clock.now(),
7813 &counters.refs(),
7814 &socket_options,
7815 );
7816 assert_eq!(passive_open, None);
7817 assert_eq!(seg, None);
7818 assert_matches!(state.recv_mut().unwrap().timer, Some(ReceiveTimer::DelayedAck { .. }));
7819 }
7820
7821 #[test]
7822 fn quickack_reset_out_of_window() {
7823 let mut state = State::Established(Established::<FakeInstant, _, _> {
7824 snd: Send::default_for_test(NullBuffer).into(),
7825 rcv: Recv::default_for_test(RingBuffer::default()).into(),
7826 });
7827 let clock = FakeInstantCtx::default();
7828 let counters = FakeTcpCounters::default();
7829 let data = vec![0u8; usize::from(DEVICE_MAXIMUM_SEGMENT_SIZE)];
7830
7831 let segment = Segment::new_assert_no_discard(
7832 SegmentHeader {
7833 seq: state.recv_mut().unwrap().nxt() - i32::try_from(data.len() + 1).unwrap(),
7835 ack: Some(TEST_ISS + 1),
7836 ..Default::default()
7837 },
7838 &data[..],
7839 );
7840 let (seg, passive_open) = state
7841 .on_segment_with_default_options::<_, ClientlessBufferProvider>(
7842 segment,
7843 clock.now(),
7844 &counters.refs(),
7845 );
7846 assert_eq!(passive_open, None);
7847 let recv = state.recv_mut().unwrap();
7848 let seg = seg.expect("expected segment");
7849 assert_eq!(seg.header().ack, Some(recv.nxt()));
7850 assert_eq!(recv.remaining_quickacks, default_quickack_counter());
7851 }
7852
7853 #[test]
7854 fn quickack_reset_rto() {
7855 let mut clock = FakeInstantCtx::default();
7856 let mut state = State::Established(Established::<FakeInstant, _, _> {
7857 snd: Send::default_for_test(NullBuffer).into(),
7858 rcv: Recv {
7859 last_segment_at: Some(clock.now()),
7860 ..Recv::default_for_test(RingBuffer::default())
7861 }
7862 .into(),
7863 });
7864 let counters = FakeTcpCounters::default();
7865 let data = vec![0u8; usize::from(DEVICE_MAXIMUM_SEGMENT_SIZE)];
7866
7867 let segment = Segment::new_assert_no_discard(
7868 SegmentHeader {
7869 seq: state.recv_mut().unwrap().nxt(),
7870 ack: Some(TEST_ISS + 1),
7871 ..Default::default()
7872 },
7873 &data[..],
7874 );
7875 clock.sleep(Rto::DEFAULT.get());
7876 let (seg, passive_open) = state
7877 .on_segment_with_default_options::<_, ClientlessBufferProvider>(
7878 segment,
7879 clock.now(),
7880 &counters.refs(),
7881 );
7882 assert_eq!(passive_open, None);
7883 let recv = state.recv_mut().unwrap();
7884 let seg = seg.expect("expected segment");
7885 assert_eq!(seg.header().ack, Some(recv.nxt()));
7886 assert_eq!(recv.remaining_quickacks, default_quickack_counter() - 1);
7889 assert_eq!(recv.last_segment_at, Some(clock.now()));
7890 }
7891
7892 #[test_case(true; "sack permitted")]
7893 #[test_case(false; "sack not permitted")]
7894 fn receiver_selective_acks(sack_permitted: bool) {
7895 let mut state = State::Established(Established::<FakeInstant, _, _> {
7896 snd: Send::default_for_test(RingBuffer::default()).into(),
7897 rcv: Recv { sack_permitted, ..Recv::default_for_test(RingBuffer::default()) }.into(),
7898 });
7899 let clock = FakeInstantCtx::default();
7900 let counters = FakeTcpCounters::default();
7901 let data = vec![0u8; usize::from(DEVICE_MAXIMUM_SEGMENT_SIZE)];
7902 let mss = u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE);
7903 let seg_start = TEST_IRS + 1 + mss;
7905 let segment = Segment::new_assert_no_discard(
7906 SegmentHeader {
7907 seq: seg_start,
7908 ack: Some(TEST_ISS + 1),
7909 wnd: WindowSize::DEFAULT >> WindowScale::default(),
7910 ..Default::default()
7911 },
7912 &data[..],
7913 );
7914 let (seg, passive_open) = state
7915 .on_segment_with_default_options::<_, ClientlessBufferProvider>(
7916 segment,
7917 clock.now(),
7918 &counters.refs(),
7919 );
7920 assert_eq!(passive_open, None);
7921 let seg = seg.expect("expected segment");
7922 assert_eq!(seg.header().ack, Some(TEST_IRS + 1));
7923 let expect = if sack_permitted {
7924 SackBlocks::from_iter([SackBlock::try_new(seg_start, seg_start + mss).unwrap()])
7925 } else {
7926 SackBlocks::default()
7927 };
7928 let sack_blocks =
7929 assert_matches!(&seg.header().options, Options::Segment(o) => &o.sack_blocks);
7930 assert_eq!(sack_blocks, &expect);
7931
7932 assert_eq!(
7934 state.buffers_mut().into_send_buffer().unwrap().enqueue_data(&data[..]),
7935 data.len()
7936 );
7937 let seg = state
7938 .poll_send_with_default_options(mss, clock.now(), &counters.refs())
7939 .expect("generates segment");
7940 assert_eq!(seg.header().ack, Some(TEST_IRS + 1));
7941
7942 let sack_blocks =
7944 assert_matches!(&seg.header().options, Options::Segment(o) => &o.sack_blocks);
7945 assert_eq!(sack_blocks, &expect);
7946 let expect_len = if sack_permitted {
7949 mss - 12
7951 } else {
7952 mss
7953 };
7954 assert_eq!(seg.len(), expect_len);
7955
7956 let segment = Segment::new_assert_no_discard(
7959 SegmentHeader { seq: TEST_IRS + 1, ack: Some(TEST_ISS + 1), ..Default::default() },
7960 &data[..],
7961 );
7962 let (seg, passive_open) = state
7963 .on_segment_with_default_options::<_, ClientlessBufferProvider>(
7964 segment,
7965 clock.now(),
7966 &counters.refs(),
7967 );
7968 assert_eq!(passive_open, None);
7969 let seg = seg.expect("expected segment");
7970 assert_eq!(seg.header().ack, Some(TEST_IRS + (2 * mss) + 1));
7971 let sack_blocks =
7972 assert_matches!(&seg.header().options, Options::Segment(o) => &o.sack_blocks);
7973 assert_eq!(sack_blocks, &SackBlocks::default());
7974 }
7975
7976 #[derive(Debug)]
7977 enum RttTestScenario {
7978 AckOne,
7979 AckTwo,
7980 Retransmit,
7981 AckPartial,
7982 }
7983
7984 #[test_case(RttTestScenario::AckOne)]
7986 #[test_case(RttTestScenario::AckTwo)]
7987 #[test_case(RttTestScenario::Retransmit)]
7988 #[test_case(RttTestScenario::AckPartial)]
7989 fn rtt(scenario: RttTestScenario) {
7990 let mut state = State::Established(Established::<FakeInstant, _, _> {
7991 snd: Send::default_for_test(RingBuffer::default()).into(),
7992 rcv: Recv::default_for_test(RingBuffer::default()).into(),
7993 });
7994
7995 const CLOCK_STEP: Duration = Duration::from_millis(1);
7996
7997 let data = "Hello World".as_bytes();
7998 let data_len_u32 = data.len().try_into().unwrap();
7999 let mut clock = FakeInstantCtx::default();
8000 let counters = FakeTcpCounters::default();
8001 for _ in 0..3 {
8002 assert_eq!(
8003 state.buffers_mut().into_send_buffer().unwrap().enqueue_data(data),
8004 data.len()
8005 );
8006 }
8007
8008 assert_eq!(state.assert_established().snd.rtt_sampler, RttSampler::NotTracking);
8009 let seg = state
8010 .poll_send_with_default_options(data_len_u32, clock.now(), &counters.refs())
8011 .expect("generate segment");
8012 assert_eq!(seg.header().seq, TEST_ISS + 1);
8013 assert_eq!(seg.len(), data_len_u32);
8014 let expect_sampler = RttSampler::Tracking {
8015 range: (TEST_ISS + 1)..(TEST_ISS + 1 + seg.len()),
8016 timestamp: clock.now(),
8017 };
8018 assert_eq!(state.assert_established().snd.rtt_sampler, expect_sampler);
8019 clock.sleep(CLOCK_STEP);
8021 let seg = state
8022 .poll_send_with_default_options(data_len_u32, clock.now(), &counters.refs())
8023 .expect("generate segment");
8024 assert_eq!(seg.header().seq, TEST_ISS + 1 + data.len());
8025 assert_eq!(seg.len(), data_len_u32);
8026 let established = state.assert_established();
8028 assert_eq!(established.snd.rtt_sampler, expect_sampler);
8029
8030 assert_eq!(established.snd.rtt_estimator.srtt(), None);
8032
8033 let (retransmit, ack_number) = match scenario {
8034 RttTestScenario::AckPartial => (false, TEST_ISS + 1 + 1),
8035 RttTestScenario::AckOne => (false, TEST_ISS + 1 + data.len()),
8036 RttTestScenario::AckTwo => (false, TEST_ISS + 1 + data.len() * 2),
8037 RttTestScenario::Retransmit => (true, TEST_ISS + 1 + data.len() * 2),
8038 };
8039
8040 if retransmit {
8041 let timeout = state.poll_send_at().expect("timeout should be present");
8043 clock.time = timeout;
8044 let seg = state
8045 .poll_send_with_default_options(data_len_u32, clock.now(), &counters.refs())
8046 .expect("generate segment");
8047 assert_eq!(seg.header().seq, TEST_ISS + 1);
8049 assert_eq!(state.assert_established().snd.rtt_sampler, RttSampler::NotTracking);
8051 } else {
8052 clock.sleep(CLOCK_STEP);
8053 }
8054
8055 assert_eq!(
8056 state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
8057 Segment::ack(
8058 TEST_IRS + 1,
8059 ack_number,
8060 WindowSize::DEFAULT >> WindowScale::default()
8061 ),
8062 clock.now(),
8063 &counters.refs()
8064 ),
8065 (None, None)
8066 );
8067 let established = state.assert_established();
8068 assert_eq!(established.snd.rtt_sampler, RttSampler::NotTracking);
8070 if retransmit {
8071 assert_eq!(established.snd.rtt_estimator.srtt(), None);
8073 } else {
8074 assert_eq!(established.snd.rtt_estimator.srtt(), Some(CLOCK_STEP * 2));
8077 }
8078
8079 clock.sleep(CLOCK_STEP);
8080 let seg = state
8081 .poll_send_with_default_options(data_len_u32, clock.now(), &counters.refs())
8082 .expect("generate segment");
8083 let seq = seg.header().seq;
8084 assert_eq!(seq, TEST_ISS + 1 + data.len() * 2);
8085 let expect_sampler =
8086 RttSampler::Tracking { range: seq..(seq + seg.len()), timestamp: clock.now() };
8087 assert_eq!(state.assert_established().snd.rtt_sampler, expect_sampler);
8088
8089 clock.sleep(CLOCK_STEP);
8093 assert_eq!(
8094 state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
8095 Segment::ack(
8096 TEST_IRS + 1,
8097 TEST_ISS + 1 + data.len() * 2,
8098 WindowSize::DEFAULT >> WindowScale::default()
8099 ),
8100 clock.now(),
8101 &counters.refs()
8102 ),
8103 (None, None)
8104 );
8105 assert_eq!(state.assert_established().snd.rtt_sampler, expect_sampler);
8106 }
8107
8108 #[test]
8109 fn loss_recovery_skips_nagle() {
8110 let mut buffer = RingBuffer::default();
8111 let payload = "Hello World".as_bytes();
8112 assert_eq!(buffer.enqueue_data(payload), payload.len());
8113 let mut state = State::<_, _, _, ()>::Established(Established {
8114 snd: Send::default_for_test(buffer).into(),
8115 rcv: Recv::default_for_test(RingBuffer::default()).into(),
8116 });
8117
8118 let socket_options =
8119 SocketOptions { nagle_enabled: true, ..SocketOptions::default_for_state_tests() };
8120 let clock = FakeInstantCtx::default();
8121 let counters = FakeTcpCounters::default();
8122 let seg = state
8123 .poll_send(
8124 &FakeStateMachineDebugId,
8125 &counters.refs(),
8126 u32::MAX,
8127 clock.now(),
8128 &socket_options,
8129 )
8130 .expect("should not close");
8131 assert_eq!(seg.len(), u32::try_from(payload.len()).unwrap());
8132 let ack = Segment::ack(
8135 TEST_IRS + 1,
8136 seg.header().seq,
8137 WindowSize::DEFAULT >> WindowScale::default(),
8138 );
8139
8140 let mut dup_acks = 0;
8141 let seg = loop {
8142 let (seg, passive_open, data_acked, newly_closed) = state
8143 .on_segment::<(), ClientlessBufferProvider>(
8144 &FakeStateMachineDebugId,
8145 &counters.refs(),
8146 ack.clone(),
8147 clock.now(),
8148 &socket_options,
8149 false,
8150 );
8151 assert_eq!(seg, None);
8152 assert_eq!(passive_open, None);
8153 assert_eq!(data_acked, DataAcked::No);
8154 assert_eq!(newly_closed, NewlyClosed::No);
8155 dup_acks += 1;
8156
8157 match state.poll_send(
8158 &FakeStateMachineDebugId,
8159 &counters.refs(),
8160 u32::MAX,
8161 clock.now(),
8162 &socket_options,
8163 ) {
8164 Ok(seg) => break seg,
8165 Err(newly_closed) => {
8166 assert_eq!(newly_closed, NewlyClosed::No);
8167 assert!(
8168 dup_acks < DUP_ACK_THRESHOLD,
8169 "failed to retransmit after {dup_acks} dup acks"
8170 );
8171 }
8172 }
8173 };
8174 assert_eq!(seg.len(), u32::try_from(payload.len()).unwrap());
8175 assert_eq!(seg.header().seq, ack.header().ack.unwrap());
8176 }
8177
8178 #[test]
8179 fn sack_recovery_rearms_rto() {
8180 let mss = DEVICE_MAXIMUM_SEGMENT_SIZE;
8181 let una = TEST_ISS + 1;
8182 let nxt = una + (u32::from(DUP_ACK_THRESHOLD) + 1) * u32::from(mss);
8183
8184 let mut congestion_control = CongestionControl::cubic_with_mss(mss);
8185 congestion_control.inflate_cwnd(20 * u32::from(mss));
8187 let mut state = State::<_, _, _, ()>::Established(Established {
8188 snd: Send {
8189 nxt,
8190 max: nxt,
8191 una,
8192 wl1: TEST_IRS + 1,
8193 wl2: una,
8194 congestion_control,
8195 ..Send::default_for_test(InfiniteSendBuffer::default())
8196 }
8197 .into(),
8198 rcv: Recv::default_for_test(RingBuffer::default()).into(),
8199 });
8200
8201 let socket_options = SocketOptions::default_for_state_tests();
8202 const RTT: Duration = Duration::from_millis(1);
8203 let mut clock = FakeInstantCtx::default();
8204 let counters = FakeTcpCounters::default();
8205 let seg = state
8207 .poll_send(
8208 &FakeStateMachineDebugId,
8209 &counters.refs(),
8210 u32::MAX,
8211 clock.now(),
8212 &socket_options,
8213 )
8214 .expect("should not close");
8215 assert_eq!(seg.len(), u32::from(mss));
8216 let start_rto = assert_matches!(
8217 state.assert_established().snd.timer,
8218 Some(SendTimer::Retrans(RetransTimer{at, ..})) => at
8219 );
8220 clock.sleep(RTT);
8221
8222 let ack = Segment::ack_with_options(
8224 TEST_IRS + 1,
8225 una,
8226 WindowSize::DEFAULT >> WindowScale::default(),
8227 SegmentOptions {
8228 sack_blocks: [SackBlock::try_new(
8229 nxt - u32::from(DUP_ACK_THRESHOLD) * u32::from(mss),
8230 nxt,
8231 )
8232 .unwrap()]
8233 .into_iter()
8234 .collect(),
8235 }
8236 .into(),
8237 );
8238 let (seg, passive_open, data_acked, newly_closed) = state
8239 .on_segment::<(), ClientlessBufferProvider>(
8240 &FakeStateMachineDebugId,
8241 &counters.refs(),
8242 ack,
8243 clock.now(),
8244 &socket_options,
8245 false,
8246 );
8247 assert_eq!(seg, None);
8248 assert_eq!(passive_open, None);
8249 assert_eq!(data_acked, DataAcked::No);
8250 assert_eq!(newly_closed, NewlyClosed::No);
8251 assert_eq!(
8252 state.assert_established().snd.congestion_control.inspect_loss_recovery_mode(),
8253 Some(LossRecoveryMode::SackRecovery)
8254 );
8255
8256 let seg = state
8257 .poll_send(
8258 &FakeStateMachineDebugId,
8259 &counters.refs(),
8260 u32::MAX,
8261 clock.now(),
8262 &socket_options,
8263 )
8264 .expect("should not close");
8265 assert_eq!(seg.len(), u32::from(mss));
8266 assert_eq!(seg.header().seq, una);
8268 let new_rto = assert_matches!(
8270 state.assert_established().snd.timer,
8271 Some(SendTimer::Retrans(RetransTimer { at, .. })) => at
8272 );
8273 assert!(new_rto > start_rto, "{new_rto:?} > {start_rto:?}");
8274
8275 CounterExpectations {
8276 retransmits: 1,
8277 sack_recovery: 1,
8278 sack_retransmits: 1,
8279 dup_acks: 1,
8280 ..Default::default()
8281 }
8282 .assert_counters(&counters);
8283 }
8284
8285 enum SackPermitted {
8288 Yes,
8289 No,
8290 }
8291
8292 #[test_matrix(
8307 [1, 2, 3, 5, 20, 50],
8308 [SackPermitted::Yes, SackPermitted::No]
8309 )]
8310 fn congestion_window_limiting(theoretical_window: u32, sack_permitted: SackPermitted) {
8311 netstack3_base::testutil::set_logger_for_test();
8312
8313 let mss = DEVICE_MAXIMUM_SEGMENT_SIZE;
8314 let generate_sack = match sack_permitted {
8315 SackPermitted::Yes => true,
8316 SackPermitted::No => false,
8317 };
8318
8319 let theoretical_window = theoretical_window * u32::from(mss);
8322
8323 let snd_wnd = WindowSize::MAX;
8325 let wnd_scale = snd_wnd.scale();
8326 let buffer = InfiniteSendBuffer::default();
8328
8329 let mut state = State::<FakeInstant, _, _, ()>::Established(Established {
8330 snd: Send {
8331 wnd: snd_wnd,
8332 wnd_max: snd_wnd,
8333 wnd_scale,
8334 congestion_control: CongestionControl::cubic_with_mss(mss),
8335 ..Send::default_for_test(buffer)
8336 }
8337 .into(),
8338 rcv: Recv { mss, ..Recv::default_for_test(RingBuffer::default()) }.into(),
8339 });
8340
8341 let socket_options = SocketOptions::default_for_state_tests();
8342 let mut clock = FakeInstantCtx::default();
8343 let counters = FakeTcpCounters::default();
8344
8345 assert!(state.assert_established().snd.congestion_control.in_slow_start());
8346
8347 let poll_until_empty =
8348 |state: &mut State<_, _, _, _>, segments: &mut Vec<(SeqNum, u32)>, now| loop {
8349 match state.poll_send(
8350 &FakeStateMachineDebugId,
8351 &counters.refs(),
8352 u32::MAX,
8353 now,
8354 &socket_options,
8355 ) {
8356 Ok(seg) => {
8357 assert_eq!(seg.len(), u32::from(mss));
8360 segments.push((seg.header().seq, seg.len()));
8361 }
8362 Err(closed) => {
8363 assert_eq!(closed, NewlyClosed::No);
8364 break;
8365 }
8366 }
8367 };
8368
8369 let mut pending_segments = Vec::new();
8370 let mut pending_acks = Vec::new();
8371 let mut receiver = Assembler::new(TEST_ISS + 1);
8372 let mut total_sent = 0;
8373 let mut total_sent_rounds = 0;
8374
8375 let mut loops = 500;
8377 let mut continue_running = |state: &mut State<_, _, _, _>| {
8378 loops -= 1;
8379 assert!(loops > 0, "test seems to have stalled");
8380
8381 const CONGESTION_EVENTS: u64 = 10;
8384 let event_count =
8385 counters.stack_wide.timeouts.get() + counters.stack_wide.loss_recovered.get();
8386 let congestion_control = &state.assert_established().snd.congestion_control;
8387 event_count <= CONGESTION_EVENTS
8388 || congestion_control.inspect_loss_recovery_mode().is_some() || congestion_control.in_slow_start()
8391 };
8392
8393 while continue_running(&mut state) {
8394 clock.sleep(Duration::from_millis(10));
8399 if pending_acks.is_empty() {
8400 poll_until_empty(&mut state, &mut pending_segments, clock.now());
8401 } else {
8402 for (ack, sack_blocks) in pending_acks.drain(..) {
8403 let seg: Segment<()> = Segment::ack_with_options(
8404 TEST_IRS + 1,
8405 ack,
8406 snd_wnd >> wnd_scale,
8407 SegmentOptions { sack_blocks }.into(),
8408 );
8409 let (seg, passive_open, data_acked, newly_closed) = state
8410 .on_segment::<_, ClientlessBufferProvider>(
8411 &FakeStateMachineDebugId,
8412 &counters.refs(),
8413 seg,
8414 clock.now(),
8415 &socket_options,
8416 false,
8417 );
8418
8419 assert_eq!(seg, None);
8420 assert_eq!(passive_open, None);
8421 assert_eq!(newly_closed, NewlyClosed::No);
8422 let _: DataAcked = data_acked;
8426
8427 poll_until_empty(&mut state, &mut pending_segments, clock.now());
8428 }
8429 }
8430
8431 let established = state.assert_established();
8432 let congestion_control = &established.snd.congestion_control;
8433 let ssthresh = congestion_control.slow_start_threshold();
8434 let cwnd = congestion_control.inspect_cwnd().cwnd();
8435 let in_slow_start = congestion_control.in_slow_start();
8436 let in_loss_recovery = congestion_control.inspect_loss_recovery_mode().is_some();
8437 let pipe = congestion_control.pipe();
8438 let sent = u32::try_from(pending_segments.len()).unwrap() * u32::from(mss);
8439 let recovery_counters = if generate_sack {
8440 (
8441 counters.stack_wide.sack_retransmits.get(),
8442 counters.stack_wide.sack_recovery.get(),
8443 )
8444 } else {
8445 (
8446 counters.stack_wide.fast_retransmits.get(),
8447 counters.stack_wide.fast_recovery.get(),
8448 )
8449 };
8450
8451 if !in_slow_start {
8452 total_sent += sent;
8453 total_sent_rounds += 1;
8454 }
8455
8456 log::debug!(
8459 "ssthresh={ssthresh}, \
8460 cwnd={cwnd}, \
8461 sent={sent}, \
8462 pipe={pipe}, \
8463 in_slow_start={in_slow_start}, \
8464 in_loss_recovery={in_loss_recovery}, \
8465 total_retransmits={}, \
8466 (retransmits,recovery)={:?}",
8467 counters.stack_wide.retransmits.get(),
8468 recovery_counters,
8469 );
8470
8471 if pending_segments.is_empty() {
8472 assert_matches!(established.snd.timer, Some(SendTimer::Retrans(_)));
8473 clock.sleep_until(state.poll_send_at().expect("must have timeout"));
8475 log::debug!("RTO");
8476 continue;
8477 }
8478
8479 let mut available = theoretical_window;
8480 for (seq, len) in pending_segments.drain(..) {
8481 if available < len {
8483 break;
8484 }
8485 available -= len;
8486 if seq.after_or_eq(receiver.nxt()) {
8488 let _: usize = receiver.insert(seq..(seq + len));
8489 }
8490 let sack_blocks =
8491 if generate_sack { receiver.sack_blocks() } else { SackBlocks::default() };
8492 pending_acks.push((receiver.nxt(), sack_blocks));
8493 }
8494 }
8495
8496 let avg_sent = total_sent / total_sent_rounds;
8499 let tolerance = (theoretical_window / 3).max(u32::from(mss));
8505 let low_range = theoretical_window - tolerance;
8506 let high_range = theoretical_window + tolerance;
8507 assert!(
8508 avg_sent >= low_range && avg_sent <= high_range,
8509 "{low_range} <= {avg_sent} <= {high_range}"
8510 );
8511 }
8512
8513 #[test]
8516 fn out_of_order_ack_with_sack_blocks() {
8517 let send_segments = u32::from(DUP_ACK_THRESHOLD + 2);
8518 let mss = DEVICE_MAXIMUM_SEGMENT_SIZE;
8519
8520 let send_bytes = send_segments * u32::from(mss);
8521 let snd_wnd = WindowSize::from_u32(send_bytes).unwrap();
8523 let wnd_scale = snd_wnd.scale();
8524 let mut congestion_control = CongestionControl::cubic_with_mss(mss);
8525 congestion_control.inflate_cwnd(snd_wnd.into());
8526
8527 let start = TEST_ISS + 1;
8528
8529 let mut state = State::<FakeInstant, _, _, ()>::Established(Established {
8530 snd: Send {
8531 congestion_control,
8532 wnd_scale,
8533 wnd: snd_wnd,
8534 wnd_max: snd_wnd,
8535 ..Send::default_for_test_at(
8536 start,
8537 RepeatingSendBuffer::new(usize::try_from(send_bytes).unwrap()),
8538 )
8539 }
8540 .into(),
8541 rcv: Recv::default_for_test(RingBuffer::default()).into(),
8542 });
8543 let socket_options = SocketOptions::default_for_state_tests();
8544 let clock = FakeInstantCtx::default();
8545 let counters = FakeTcpCounters::default();
8546 let sent = core::iter::from_fn(|| {
8547 match state.poll_send(
8548 &FakeStateMachineDebugId,
8549 &counters.refs(),
8550 u32::MAX,
8551 clock.now(),
8552 &socket_options,
8553 ) {
8554 Ok(seg) => Some(seg.len()),
8555 Err(newly_closed) => {
8556 assert_eq!(newly_closed, NewlyClosed::No);
8557 None
8558 }
8559 }
8560 })
8561 .sum::<u32>();
8562 assert_eq!(sent, send_segments * u32::from(mss));
8563
8564 let end = state.assert_established().snd.nxt;
8566 let seg = Segment::<()>::ack(TEST_IRS + 1, end, snd_wnd >> wnd_scale);
8567 assert_eq!(
8568 state.on_segment::<_, ClientlessBufferProvider>(
8569 &FakeStateMachineDebugId,
8570 &counters.refs(),
8571 seg,
8572 clock.now(),
8573 &socket_options,
8574 false
8575 ),
8576 (None, None, DataAcked::Yes, NewlyClosed::No)
8577 );
8578 assert_eq!(state.assert_established().snd.congestion_control.pipe(), 0);
8579 assert_matches!(
8580 state.poll_send(
8581 &FakeStateMachineDebugId,
8582 &counters.refs(),
8583 u32::MAX,
8584 clock.now(),
8585 &socket_options
8586 ),
8587 Err(NewlyClosed::No)
8588 );
8589
8590 let sack_block = SackBlock::try_new(
8593 start + u32::from(mss),
8594 start + u32::from(DUP_ACK_THRESHOLD + 1) * u32::from(mss),
8595 )
8596 .unwrap();
8597 assert!(sack_block.right().before(end));
8598 let seg = Segment::<()>::ack_with_options(
8599 TEST_IRS + 1,
8600 start,
8601 snd_wnd >> wnd_scale,
8602 SegmentOptions { sack_blocks: [sack_block].into_iter().collect() }.into(),
8603 );
8604 assert_eq!(
8605 state.on_segment::<_, ClientlessBufferProvider>(
8606 &FakeStateMachineDebugId,
8607 &counters.refs(),
8608 seg,
8609 clock.now(),
8610 &socket_options,
8611 false
8612 ),
8613 (None, None, DataAcked::No, NewlyClosed::No)
8614 );
8615 let congestion_control = &state.assert_established().snd.congestion_control;
8618 assert_eq!(congestion_control.pipe(), 0);
8619 assert_eq!(congestion_control.inspect_loss_recovery_mode(), None);
8620 assert_matches!(
8622 state.poll_send(
8623 &FakeStateMachineDebugId,
8624 &counters.refs(),
8625 u32::MAX,
8626 clock.now(),
8627 &socket_options
8628 ),
8629 Err(NewlyClosed::No)
8630 );
8631 }
8632
8633 #[test]
8634 fn push_segments() {
8635 let send_segments = 16;
8636 let mss = DEVICE_MAXIMUM_SEGMENT_SIZE;
8637 let send_bytes = send_segments * u32::from(mss);
8638 let snd_wnd = WindowSize::from_u32(4 * u32::from(mss)).unwrap();
8641 let wnd_scale = snd_wnd.scale();
8642 let mut state = State::<FakeInstant, _, _, ()>::Established(Established {
8643 snd: Send {
8644 congestion_control: CongestionControl::cubic_with_mss(mss),
8645 wnd_scale,
8646 wnd: snd_wnd,
8647 wnd_max: snd_wnd,
8648 ..Send::default_for_test(RepeatingSendBuffer::new(
8649 usize::try_from(send_bytes).unwrap(),
8650 ))
8651 }
8652 .into(),
8653 rcv: Recv::default_for_test(RingBuffer::default()).into(),
8654 });
8655 let socket_options = SocketOptions::default_for_state_tests();
8656 let clock = FakeInstantCtx::default();
8657 let counters = FakeTcpCounters::default();
8658
8659 for i in 0..send_segments {
8660 let seg = state
8661 .poll_send(
8662 &FakeStateMachineDebugId,
8663 &counters.refs(),
8664 u32::MAX,
8665 clock.now(),
8666 &socket_options,
8667 )
8668 .expect("produces segment");
8669 let is_last = i == (send_segments - 1);
8670 let is_periodic = i != 0 && i % 2 == 0;
8674 assert!(!(is_last && is_periodic));
8677 assert_eq!(seg.header().push, is_last || is_periodic, "at {i}");
8678 let ack =
8679 Segment::ack(TEST_IRS + 1, seg.header().seq + seg.len(), snd_wnd >> wnd_scale);
8680 assert_eq!(
8681 state.on_segment::<(), ClientlessBufferProvider>(
8682 &FakeStateMachineDebugId,
8683 &counters.refs(),
8684 ack,
8685 clock.now(),
8686 &socket_options,
8687 false
8688 ),
8689 (None, None, DataAcked::Yes, NewlyClosed::No)
8690 );
8691 }
8692 }
8693}