1use core::convert::{Infallible, TryFrom as _};
11use core::fmt::Debug;
12use core::num::{NonZeroU32, NonZeroU8, NonZeroUsize, TryFromIntError};
13use core::ops::{Deref, DerefMut};
14use core::time::Duration;
15
16use assert_matches::assert_matches;
17use derivative::Derivative;
18use explicit::ResultExt as _;
19use netstack3_base::{
20 Control, HandshakeOptions, IcmpErrorCode, Instant, Mss, Options, Payload, PayloadLen as _,
21 SackBlocks, Segment, SegmentHeader, SegmentOptions, SeqNum, UnscaledWindowSize, WindowScale,
22 WindowSize,
23};
24use netstack3_trace::{trace_instant, TraceResourceId};
25use packet_formats::utils::NonZeroDuration;
26use replace_with::{replace_with, replace_with_and};
27
28use crate::internal::base::{
29 BufferSizes, BuffersRefMut, ConnectionError, KeepAlive, SocketOptions,
30};
31use crate::internal::buffer::{Assembler, BufferLimits, IntoBuffers, ReceiveBuffer, SendBuffer};
32use crate::internal::congestion::CongestionControl;
33use crate::internal::counters::TcpCountersRefs;
34use crate::internal::rtt::{Estimator, Rto, RttSampler};
35
/// Two-minute duration named `MSL` — presumably the TCP maximum segment
/// lifetime; its users are outside the visible part of this file.
pub(super) const MSL: Duration = Duration::from_secs(2 * 60);

/// Retry budget given to the retransmission and zero-window-probe timers that
/// `Send` arms while a connection is carrying data.
const DEFAULT_MAX_RETRIES: NonZeroU8 = NonZeroU8::new(15).unwrap();

/// Default retry budget for SYN segments.
// NOTE(review): presumably the default for `SocketOptions::max_syn_retries`;
// confirm where the socket options are defined.
pub(super) const DEFAULT_MAX_SYN_RETRIES: NonZeroU8 = NonZeroU8::new(6).unwrap();
/// Retry budget for the retransmission timer created when entering `SynRcvd`.
const DEFAULT_MAX_SYNACK_RETRIES: NonZeroU8 = NonZeroU8::new(5).unwrap();

/// 40ms threshold related to delayed ACKs; it is not referenced in the visible
/// part of this file.
const ACK_DELAY_THRESHOLD: Duration = Duration::from_millis(40);
/// Delay before the `SendTimer::SWSProbe` timer armed by `Send::poll_send`
/// fires and lets a held-back small segment go out.
const SWS_PROBE_TIMEOUT: Duration = Duration::from_millis(100);
/// Divisor applied to the largest send window seen so far when
/// `Send::poll_send` decides whether the open window is too small to use.
const SWS_BUFFER_FACTOR: u32 = 2;

/// Value placed in the `sack_permitted` handshake option of every SYN and
/// SYN-ACK generated in this file.
const SACK_PERMITTED: bool = false;
78
/// Implemented by identifiers that label a state machine in debug output and
/// in trace events.
pub(crate) trait StateMachineDebugId: Debug {
    /// Returns the resource id attached to trace events emitted on behalf of
    /// this state machine.
    fn trace_id(&self) -> TraceResourceId<'_>;
}
86
/// State of a connection that does not exist, tagged with the reason why.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct Closed<Error> {
    /// Why the connection is closed. The concrete type is picked by whoever
    /// constructs the state (e.g. `Option<ConnectionError>` in `SynSent`).
    pub(crate) reason: Error,
}
105
/// Variant-less marker type. `Closed<Initial>` is the type on which the
/// `connect` and `listen` constructors are defined.
pub(crate) enum Initial {}
109
110impl Closed<Initial> {
111 pub(crate) fn connect<I: Instant, ActiveOpen>(
117 iss: SeqNum,
118 now: I,
119 active_open: ActiveOpen,
120 buffer_sizes: BufferSizes,
121 device_mss: Mss,
122 default_mss: Mss,
123 SocketOptions {
124 keep_alive: _,
125 nagle_enabled: _,
126 user_timeout,
127 delayed_ack: _,
128 fin_wait2_timeout: _,
129 max_syn_retries,
130 ip_options: _,
131 }: &SocketOptions,
132 ) -> (SynSent<I, ActiveOpen>, Segment<()>) {
133 let rcv_wnd_scale = buffer_sizes.rwnd().scale();
134 let rwnd = buffer_sizes.rwnd_unscaled();
138 (
139 SynSent {
140 iss,
141 timestamp: Some(now),
142 retrans_timer: RetransTimer::new(
143 now,
144 Rto::DEFAULT,
145 *user_timeout,
146 *max_syn_retries,
147 ),
148 active_open,
149 buffer_sizes,
150 device_mss,
151 default_mss,
152 rcv_wnd_scale,
153 },
154 Segment::syn(
155 iss,
156 rwnd,
157 HandshakeOptions {
158 mss: Some(device_mss),
159 window_scale: Some(rcv_wnd_scale),
160 sack_permitted: SACK_PERMITTED,
161 }
162 .into(),
163 ),
164 )
165 }
166
167 pub(crate) fn listen(
168 iss: SeqNum,
169 buffer_sizes: BufferSizes,
170 device_mss: Mss,
171 default_mss: Mss,
172 user_timeout: Option<NonZeroDuration>,
173 ) -> Listen {
174 Listen { iss, buffer_sizes, device_mss, default_mss, user_timeout }
175 }
176}
177
178impl<Error> Closed<Error> {
179 pub(crate) fn on_segment(&self, segment: &Segment<impl Payload>) -> Option<Segment<()>> {
183 let segment_len = segment.len();
184 let Segment {
185 header: SegmentHeader { seq: seg_seq, ack: seg_ack, wnd: _, control, options: _ },
186 data: _,
187 } = segment;
188
189 if *control == Some(Control::RST) {
203 return None;
204 }
205 Some(match seg_ack {
206 Some(seg_ack) => Segment::rst(*seg_ack),
207 None => Segment::rst_ack(SeqNum::from(0), *seg_seq + segment_len),
208 })
209 }
210}
211
/// State of a passively opened socket waiting for an incoming SYN.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct Listen {
    /// Sequence number used for the SYN-ACK sent in response to a SYN.
    iss: SeqNum,
    /// Buffer sizes from which the advertised receive window and its scale
    /// are derived.
    buffer_sizes: BufferSizes,
    /// Upper bound applied to the MSS negotiated with the peer.
    device_mss: Mss,
    /// MSS assumed when the peer's SYN carries no MSS option.
    default_mss: Mss,
    /// Optional user timeout handed to the SYN-ACK retransmission timer.
    user_timeout: Option<NonZeroDuration>,
}
234
/// Outcome of `Listen::on_segment`: what to send, if anything, and which state
/// to enter.
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
enum ListenOnSegmentDisposition<I: Instant> {
    /// Send the given SYN-ACK and move to the given `SynRcvd` state.
    SendSynAckAndEnterSynRcvd(Segment<()>, SynRcvd<I, Infallible>),
    /// Send the given reset segment.
    SendRst(Segment<()>),
    /// Drop the segment without replying.
    Ignore,
}
242
243impl Listen {
244 fn on_segment<I: Instant>(
245 &self,
246 Segment { header: SegmentHeader { seq, ack, wnd: _, control, options }, data: _ }: Segment<
247 impl Payload,
248 >,
249 now: I,
250 ) -> ListenOnSegmentDisposition<I> {
251 let Listen { iss, buffer_sizes, device_mss, default_mss, user_timeout } = *self;
252 let smss = options.mss().unwrap_or(default_mss).min(device_mss);
253 if control == Some(Control::RST) {
257 return ListenOnSegmentDisposition::Ignore;
258 }
259 if let Some(ack) = ack {
260 return ListenOnSegmentDisposition::SendRst(Segment::rst(ack));
269 }
270 if control == Some(Control::SYN) {
271 let sack_permitted = options.sack_permitted();
272 let rcv_wnd_scale = buffer_sizes.rwnd().scale();
286 let rwnd = buffer_sizes.rwnd_unscaled();
290 return ListenOnSegmentDisposition::SendSynAckAndEnterSynRcvd(
291 Segment::syn_ack(
292 iss,
293 seq + 1,
294 rwnd,
295 HandshakeOptions {
296 mss: Some(smss),
297 window_scale: options.window_scale().map(|_| rcv_wnd_scale),
302 sack_permitted: SACK_PERMITTED,
303 }
304 .into(),
305 ),
306 SynRcvd {
307 iss,
308 irs: seq,
309 timestamp: Some(now),
310 retrans_timer: RetransTimer::new(
311 now,
312 Rto::DEFAULT,
313 user_timeout,
314 DEFAULT_MAX_SYNACK_RETRIES,
315 ),
316 simultaneous_open: None,
317 buffer_sizes,
318 smss,
319 rcv_wnd_scale,
320 snd_wnd_scale: options.window_scale(),
321 sack_permitted,
322 },
323 );
324 }
325 ListenOnSegmentDisposition::Ignore
326 }
327}
328
/// State of an actively opened connection whose SYN has been sent and which is
/// waiting for the peer's reply.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct SynSent<I, ActiveOpen> {
    /// Sequence number carried by our SYN.
    iss: SeqNum,
    /// When the SYN was sent, if known; used to take the first RTT sample when
    /// the handshake completes.
    timestamp: Option<I>,
    /// Timer governing SYN retransmission and the user timeout deadline.
    retrans_timer: RetransTimer<I>,
    /// Caller-supplied value carried along with the state; never inspected by
    /// the code in this file.
    active_open: ActiveOpen,
    /// Buffer sizes from which receive windows are derived.
    buffer_sizes: BufferSizes,
    /// Upper bound applied to the MSS negotiated with the peer.
    device_mss: Mss,
    /// MSS assumed when the peer sends no MSS option.
    default_mss: Mss,
    /// Receive window scale that was offered in our SYN.
    rcv_wnd_scale: WindowScale,
}
357
/// Outcome of `SynSent::on_segment`: what to send, if anything, and which
/// state to enter.
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
enum SynSentOnSegmentDisposition<I: Instant, ActiveOpen> {
    /// The handshake completed; enter the given `Established` state (which
    /// still has unit placeholders where its buffers go).
    SendAckAndEnterEstablished(Established<I, (), ()>),
    /// The peer sent a SYN without an ACK; send the given SYN-ACK and enter
    /// the given `SynRcvd` state.
    SendSynAckAndEnterSynRcvd(Segment<()>, SynRcvd<I, ActiveOpen>),
    /// Send the given reset segment.
    SendRst(Segment<()>),
    /// Give up on the connection and enter the given `Closed` state.
    EnterClosed(Closed<Option<ConnectionError>>),
    /// Drop the segment without replying.
    Ignore,
}
367
impl<I: Instant + 'static, ActiveOpen> SynSent<I, ActiveOpen> {
    /// Decides how a connection that has sent its SYN reacts to an incoming
    /// segment.
    ///
    /// `self` is not modified; the decision is returned as a
    /// [`SynSentOnSegmentDisposition`] for the caller to act on:
    ///
    /// - An ACK that is before `iss` or after `iss + 1` is answered with a
    ///   reset, unless the segment itself is a RST, in which case it is
    ///   ignored.
    /// - A RST closes the connection with `ConnectionRefused` only when it
    ///   carries an (acceptable) ACK; otherwise it is ignored.
    /// - A SYN whose ACK is after `iss` completes the handshake.
    /// - A SYN without an ACK moves to `SynRcvd`, unless the user timeout
    ///   deadline has already passed, in which case the connection closes with
    ///   no error reason.
    /// - Anything else is ignored.
    fn on_segment(
        &self,
        Segment {
            header: SegmentHeader { seq: seg_seq, ack: seg_ack, wnd: seg_wnd, control, options },
            data: _,
        }: Segment<impl Payload>,
        now: I,
    ) -> SynSentOnSegmentDisposition<I, ActiveOpen> {
        let SynSent {
            iss,
            timestamp: syn_sent_ts,
            retrans_timer: RetransTimer { user_timeout_until, remaining_retries: _, at: _, rto: _ },
            active_open: _,
            buffer_sizes,
            device_mss,
            default_mss,
            rcv_wnd_scale,
        } = *self;
        // First validate the ACK, if there is one. The only acceptable values
        // are `iss` and `iss + 1`.
        let has_ack = match seg_ack {
            Some(ack) => {
                if ack.before(iss) || ack.after(iss + 1) {
                    // Never answer a reset with a reset.
                    return if control == Some(Control::RST) {
                        SynSentOnSegmentDisposition::Ignore
                    } else {
                        SynSentOnSegmentDisposition::SendRst(Segment::rst(ack))
                    };
                }
                true
            }
            None => false,
        };

        match control {
            Some(Control::RST) => {
                // A reset is only honored when it carried an acceptable ACK.
                if has_ack {
                    SynSentOnSegmentDisposition::EnterClosed(Closed {
                        reason: Some(ConnectionError::ConnectionRefused),
                    })
                } else {
                    SynSentOnSegmentDisposition::Ignore
                }
            }
            Some(Control::SYN) => {
                // Negotiated MSS: the peer's option (or the default when
                // absent), capped by what the device supports.
                let smss = options.mss().unwrap_or(default_mss).min(device_mss);
                let sack_permitted = options.sack_permitted();
                match seg_ack {
                    Some(seg_ack) => {
                        // SYN+ACK: our SYN is acknowledged only if the ACK is
                        // past `iss`.
                        if seg_ack.after(iss) {
                            let irs = seg_seq;
                            let mut rtt_estimator = Estimator::default();
                            // Seed the estimator with the SYN round trip when
                            // the send time is known.
                            if let Some(syn_sent_ts) = syn_sent_ts {
                                rtt_estimator.sample(now.saturating_duration_since(syn_sent_ts));
                            }
                            // Window scaling applies in both directions only
                            // if the peer sent the option; otherwise both
                            // scales take their default value.
                            let (rcv_wnd_scale, snd_wnd_scale) = options
                                .window_scale()
                                .map(|snd_wnd_scale| (rcv_wnd_scale, snd_wnd_scale))
                                .unwrap_or_default();
                            let established = Established {
                                snd: Send {
                                    nxt: iss + 1,
                                    max: iss + 1,
                                    una: seg_ack,
                                    // The window of this SYN segment is
                                    // converted with the default scale rather
                                    // than the negotiated one.
                                    wnd: seg_wnd << WindowScale::default(),
                                    wl1: seg_seq,
                                    wl2: seg_ack,
                                    buffer: (),
                                    rtt_sampler: RttSampler::default(),
                                    rtt_estimator,
                                    timer: None,
                                    congestion_control: CongestionControl::cubic_with_mss(smss),
                                    wnd_scale: snd_wnd_scale,
                                    wnd_max: seg_wnd << WindowScale::default(),
                                }
                                .into(),
                                rcv: Recv {
                                    buffer: RecvBufferState::Open {
                                        buffer: (),
                                        // The peer's SYN occupies `irs`, so
                                        // the next expected byte is `irs + 1`.
                                        assembler: Assembler::new(irs + 1),
                                    },
                                    remaining_quickacks: quickack_counter(
                                        buffer_sizes.rcv_limits(),
                                        smss,
                                    ),
                                    last_segment_at: None,
                                    timer: None,
                                    mss: smss,
                                    wnd_scale: rcv_wnd_scale,
                                    last_window_update: (irs + 1, buffer_sizes.rwnd()),
                                    sack_permitted,
                                }
                                .into(),
                            };
                            SynSentOnSegmentDisposition::SendAckAndEnterEstablished(established)
                        } else {
                            SynSentOnSegmentDisposition::Ignore
                        }
                    }
                    None => {
                        // SYN without ACK: the peer is opening towards us at
                        // the same time. Proceed only while the user timeout
                        // deadline (if any) has not passed.
                        if user_timeout_until.is_none_or(|t| now < t) {
                            let rcv_wnd_scale = buffer_sizes.rwnd().scale();
                            let rwnd = buffer_sizes.rwnd_unscaled();
                            SynSentOnSegmentDisposition::SendSynAckAndEnterSynRcvd(
                                Segment::syn_ack(
                                    iss,
                                    seg_seq + 1,
                                    rwnd,
                                    HandshakeOptions {
                                        mss: Some(smss),
                                        // Echo a window scale only when the
                                        // peer offered one.
                                        window_scale: options.window_scale().map(|_| rcv_wnd_scale),
                                        sack_permitted: SACK_PERMITTED,
                                    }
                                    .into(),
                                ),
                                SynRcvd {
                                    iss,
                                    irs: seg_seq,
                                    timestamp: Some(now),
                                    // Keep the user deadline that was already
                                    // running instead of starting a new one.
                                    retrans_timer: RetransTimer::new_with_user_deadline(
                                        now,
                                        Rto::DEFAULT,
                                        user_timeout_until,
                                        DEFAULT_MAX_SYNACK_RETRIES,
                                    ),
                                    // NOTE(review): left as `None` here even
                                    // though `self.active_open` exists;
                                    // presumably the caller fills it in.
                                    simultaneous_open: None,
                                    buffer_sizes,
                                    smss,
                                    rcv_wnd_scale,
                                    snd_wnd_scale: options.window_scale(),
                                    sack_permitted,
                                },
                            )
                        } else {
                            SynSentOnSegmentDisposition::EnterClosed(Closed { reason: None })
                        }
                    }
                }
            }
            // Neither SYN nor RST: nothing to do in this state.
            Some(Control::FIN) | None => SynSentOnSegmentDisposition::Ignore,
        }
    }
}
569
/// State of a connection that has received the peer's SYN, replied with a
/// SYN-ACK and is waiting for that SYN-ACK to be acknowledged.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct SynRcvd<I, ActiveOpen> {
    /// Sequence number carried by our SYN-ACK.
    iss: SeqNum,
    /// Sequence number carried by the peer's SYN.
    irs: SeqNum,
    /// When the SYN-ACK was sent, if known.
    timestamp: Option<I>,
    /// Timer governing SYN-ACK retransmission and the user timeout deadline.
    retrans_timer: RetransTimer<I>,
    /// Active-open payload for connections that got here from `SynSent`.
    /// Always `None` when the `ActiveOpen` parameter is `Infallible`.
    simultaneous_open: Option<ActiveOpen>,
    /// Buffer sizes from which receive windows are derived.
    buffer_sizes: BufferSizes,
    /// MSS negotiated with the peer.
    smss: Mss,
    /// Window scale applied to windows we advertise.
    rcv_wnd_scale: WindowScale,
    /// Window scale option received from the peer, if it sent one.
    snd_wnd_scale: Option<WindowScale>,
    /// Whether the peer's SYN carried the SACK-permitted option.
    sack_permitted: bool,
}
607
608impl<I: Instant, R: ReceiveBuffer, S: SendBuffer, ActiveOpen> From<SynRcvd<I, Infallible>>
609 for State<I, R, S, ActiveOpen>
610{
611 fn from(
612 SynRcvd {
613 iss,
614 irs,
615 timestamp,
616 retrans_timer,
617 simultaneous_open,
618 buffer_sizes,
619 smss,
620 rcv_wnd_scale,
621 snd_wnd_scale,
622 sack_permitted,
623 }: SynRcvd<I, Infallible>,
624 ) -> Self {
625 match simultaneous_open {
626 None => State::SynRcvd(SynRcvd {
627 iss,
628 irs,
629 timestamp,
630 retrans_timer,
631 simultaneous_open: None,
632 buffer_sizes,
633 smss,
634 rcv_wnd_scale,
635 snd_wnd_scale,
636 sack_permitted,
637 }),
638 }
639 }
640}
/// Variant-less namespace for the named values of the `FIN_QUEUED` const
/// parameter of `Send`.
enum FinQueued {}

impl FinQueued {
    /// A FIN has been queued for sending.
    const YES: bool = true;
    /// No FIN has been queued for sending.
    const NO: bool = false;
}
650
/// Send-side state of a connection. `FIN_QUEUED` tells whether a FIN follows
/// the data currently held in `buffer`.
#[derive(Derivative)]
#[derivative(Debug)]
#[cfg_attr(test, derivative(PartialEq, Eq))]
pub(crate) struct Send<I, S, const FIN_QUEUED: bool> {
    /// Sequence number of the next segment to send; rewound to `una` when a
    /// retransmission or zero-window-probe timer fires.
    nxt: SeqNum,
    /// Highest sequence number sent so far; only ever moves forward.
    pub(crate) max: SeqNum,
    /// Oldest sequence number not yet acknowledged.
    una: SeqNum,
    /// Send window most recently accepted from the peer.
    wnd: WindowSize,
    /// Scale applied to window values received from the peer.
    wnd_scale: WindowScale,
    /// Largest send window the peer has advertised so far.
    wnd_max: WindowSize,
    /// Sequence number of the segment used for the last window update.
    wl1: SeqNum,
    /// Acknowledgment number of the segment used for the last window update.
    wl2: SeqNum,
    /// Tracks in-flight segments to produce round-trip-time samples.
    rtt_sampler: RttSampler<I>,
    /// Aggregates RTT samples and provides the retransmission timeout.
    rtt_estimator: Estimator,
    /// The single send-side timer currently armed, if any.
    timer: Option<SendTimer<I>>,
    /// Congestion control state; excluded from the test-only equality check.
    #[derivative(PartialEq = "ignore")]
    congestion_control: CongestionControl<I>,
    /// Buffer holding data that is queued or awaiting acknowledgment.
    buffer: S,
}
671
672impl<I> Send<I, (), false> {
673 fn with_buffer<S>(self, buffer: S) -> Send<I, S, false> {
674 let Self {
675 nxt,
676 max,
677 una,
678 wnd,
679 wnd_scale,
680 wnd_max,
681 wl1,
682 wl2,
683 rtt_sampler,
684 rtt_estimator,
685 timer,
686 congestion_control,
687 buffer: _,
688 } = self;
689 Send {
690 nxt,
691 max,
692 una,
693 wnd,
694 wnd_scale,
695 wnd_max,
696 wl1,
697 wl2,
698 rtt_sampler,
699 rtt_estimator,
700 timer,
701 congestion_control,
702 buffer,
703 }
704 }
705}
706
/// Retransmission timer with exponential backoff, bounded by a retry budget
/// and an optional user timeout deadline.
#[derive(Debug, Clone, Copy)]
#[cfg_attr(test, derive(PartialEq, Eq))]
struct RetransTimer<I> {
    /// Absolute deadline after which the timer counts as timed out, if a user
    /// timeout was configured.
    user_timeout_until: Option<I>,
    /// Retries left; becomes `None` once the budget is used up.
    remaining_retries: Option<NonZeroU8>,
    /// When the timer fires next. Never later than `user_timeout_until`.
    at: I,
    /// Current retransmission timeout; doubled on every backoff.
    rto: Rto,
}
715
716impl<I: Instant> RetransTimer<I> {
717 fn new(
718 now: I,
719 rto: Rto,
720 user_timeout: Option<NonZeroDuration>,
721 max_retries: NonZeroU8,
722 ) -> Self {
723 let user_timeout_until = user_timeout.map(|t| now.saturating_add(t.get()));
724 Self::new_with_user_deadline(now, rto, user_timeout_until, max_retries)
725 }
726
727 fn new_with_user_deadline(
728 now: I,
729 rto: Rto,
730 user_timeout_until: Option<I>,
731 max_retries: NonZeroU8,
732 ) -> Self {
733 let rto_at = now.panicking_add(rto.get());
734 let at = user_timeout_until.map(|i| i.min(rto_at)).unwrap_or(rto_at);
735 Self { at, rto, user_timeout_until, remaining_retries: Some(max_retries) }
736 }
737
738 fn backoff(&mut self, now: I) {
739 let Self { at, rto, user_timeout_until, remaining_retries } = self;
740 *remaining_retries = remaining_retries.and_then(|r| NonZeroU8::new(r.get() - 1));
741 *rto = rto.double();
742 let rto_at = now.panicking_add(rto.get());
743 *at = user_timeout_until.map(|i| i.min(rto_at)).unwrap_or(rto_at);
744 }
745
746 fn timed_out(&self, now: I) -> bool {
747 let RetransTimer { user_timeout_until, remaining_retries, at, rto: _ } = self;
748 (remaining_retries.is_none() && now >= *at) || user_timeout_until.is_some_and(|t| now >= t)
749 }
750}
751
/// The timers the send side can have armed; at most one at a time.
#[derive(Debug, Clone, Copy)]
#[cfg_attr(test, derive(PartialEq, Eq))]
enum SendTimer<I> {
    /// Waiting for sent data to be acknowledged; on expiry the oldest
    /// unacknowledged data is retransmitted.
    Retrans(RetransTimer<I>),
    /// Waiting to send the next keep-alive probe on an idle connection.
    KeepAlive(KeepAliveTimer<I>),
    /// The peer advertised a zero window while data is pending; on expiry a
    /// probe is sent even though the window is closed.
    ZeroWindowProbe(RetransTimer<I>),
    /// A small segment is being held back; once `at` is reached it is sent
    /// regardless.
    SWSProbe { at: I },
}
779
/// The timers the receive side can have armed.
#[derive(Debug, Clone, Copy)]
#[cfg_attr(test, derive(PartialEq, Eq))]
enum ReceiveTimer<I> {
    /// An acknowledgment is being delayed and must be sent once `at` is
    /// reached.
    DelayedAck { at: I },
}
785
/// Timer driving keep-alive probes.
#[derive(Debug, Clone, Copy)]
#[cfg_attr(test, derive(PartialEq, Eq))]
struct KeepAliveTimer<I> {
    /// When the next probe is due.
    at: I,
    /// Number of probes sent since the timer was armed.
    already_sent: u8,
}
792
793impl<I: Instant> KeepAliveTimer<I> {
794 fn idle(now: I, keep_alive: &KeepAlive) -> Self {
795 let at = now.saturating_add(keep_alive.idle.into());
796 Self { at, already_sent: 0 }
797 }
798}
799
800impl<I: Instant> SendTimer<I> {
801 fn expiry(&self) -> I {
802 match self {
803 SendTimer::Retrans(RetransTimer {
804 at,
805 rto: _,
806 user_timeout_until: _,
807 remaining_retries: _,
808 })
809 | SendTimer::KeepAlive(KeepAliveTimer { at, already_sent: _ })
810 | SendTimer::ZeroWindowProbe(RetransTimer {
811 at,
812 rto: _,
813 user_timeout_until: _,
814 remaining_retries: _,
815 }) => *at,
816 SendTimer::SWSProbe { at } => *at,
817 }
818 }
819}
820
821impl<I: Instant> ReceiveTimer<I> {
822 fn expiry(&self) -> I {
823 match self {
824 ReceiveTimer::DelayedAck { at } => *at,
825 }
826 }
827}
828
/// Receive buffer of a connection, which may already have been closed.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
enum RecvBufferState<R> {
    /// The buffer is still in place, together with the assembler tracking
    /// which sequence ranges have been received.
    Open { buffer: R, assembler: Assembler },
    /// The buffer is gone; only its capacity and the next expected sequence
    /// number are remembered.
    Closed { buffer_size: usize, nxt: SeqNum },
}
836
837impl<R: ReceiveBuffer> RecvBufferState<R> {
838 fn is_closed(&self) -> bool {
839 matches!(self, Self::Closed { .. })
840 }
841
842 fn has_out_of_order(&self) -> bool {
843 match self {
844 Self::Open { assembler, .. } => assembler.has_out_of_order(),
845 Self::Closed { .. } => false,
846 }
847 }
848
849 fn close(&mut self) {
850 let new_state = match self {
851 Self::Open { buffer, assembler } => {
852 Self::Closed { nxt: assembler.nxt(), buffer_size: buffer.limits().capacity }
853 }
854 Self::Closed { .. } => return,
855 };
856 *self = new_state;
857 }
858
859 fn limits(&self) -> BufferLimits {
860 match self {
861 RecvBufferState::Open { buffer, .. } => buffer.limits(),
862 RecvBufferState::Closed { buffer_size, .. } => {
863 BufferLimits { capacity: *buffer_size, len: 0 }
864 }
865 }
866 }
867}
868
869fn quickack_counter(rcv_limits: BufferLimits, mss: Mss) -> usize {
871 const MIN_QUICKACK: usize = 2;
873 const MAX_QUICKACK: usize = 32;
877
878 let BufferLimits { capacity, len } = rcv_limits;
879 let window = capacity - len;
880 (window / (2 * usize::from(mss))).clamp(MIN_QUICKACK, MAX_QUICKACK)
891}
892
/// Receive-side state of a connection.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub(crate) struct Recv<I, R> {
    /// The receive-side timer currently armed, if any.
    timer: Option<ReceiveTimer<I>>,
    /// MSS negotiated for the connection.
    mss: Mss,
    /// Scale applied to the windows we advertise.
    wnd_scale: WindowScale,
    /// The next expected sequence number and window size that went into the
    /// most recently generated segment.
    last_window_update: (SeqNum, WindowSize),
    /// Number of upcoming segments that should be acknowledged immediately.
    remaining_quickacks: usize,
    /// When the last segment was received, if any; not read in the visible
    /// part of this file.
    last_segment_at: Option<I>,
    /// Whether SACK blocks may be included in outgoing segments.
    sack_permitted: bool,

    /// The receive buffer, or what is remembered of it once closed.
    buffer: RecvBufferState<R>,
}
910
911impl<I> Recv<I, ()> {
912 fn with_buffer<R>(self, buffer: R) -> Recv<I, R> {
913 let Self {
914 timer,
915 mss,
916 wnd_scale,
917 last_window_update,
918 buffer: old_buffer,
919 remaining_quickacks,
920 last_segment_at,
921 sack_permitted,
922 } = self;
923 let nxt = match old_buffer {
924 RecvBufferState::Open { assembler, .. } => assembler.nxt(),
925 RecvBufferState::Closed { .. } => unreachable!(),
926 };
927 Recv {
928 timer,
929 mss,
930 wnd_scale,
931 last_window_update,
932 remaining_quickacks,
933 last_segment_at,
934 buffer: RecvBufferState::Open { buffer, assembler: Assembler::new(nxt) },
935 sack_permitted,
936 }
937 }
938}
939
940impl<I, R> Recv<I, R> {
941 fn sack_blocks(&self) -> SackBlocks {
942 if self.sack_permitted {
943 match &self.buffer {
944 RecvBufferState::Open { buffer: _, assembler } => assembler.sack_blocks(),
945 RecvBufferState::Closed { buffer_size: _, nxt: _ } => SackBlocks::default(),
946 }
947 } else {
948 SackBlocks::default()
950 }
951 }
952}
953
/// Result of `Recv::calculate_window_size`.
struct WindowSizeCalculation {
    /// Next expected sequence number at the time of the calculation.
    rcv_nxt: SeqNum,
    /// Window size that should be advertised.
    window_size: WindowSize,
    /// Minimum amount of newly freed buffer space required before a larger
    /// window is advertised.
    threshold: usize,
}
968
969impl<I: Instant, R: ReceiveBuffer> Recv<I, R> {
970 fn calculate_window_size(&self) -> WindowSizeCalculation {
972 let rcv_nxt = self.nxt();
973 let Self {
974 buffer,
975 timer: _,
976 mss,
977 wnd_scale: _,
978 last_window_update: (rcv_wup, last_wnd),
979 remaining_quickacks: _,
980 last_segment_at: _,
981 sack_permitted: _,
982 } = self;
983
984 let BufferLimits { capacity, len } = buffer.limits();
996
997 let unused_window = u32::try_from(*rcv_wup + *last_wnd - rcv_nxt).unwrap_or(0);
1003 let unused_window = WindowSize::from_u32(unused_window).unwrap_or(WindowSize::MAX);
1005
1006 let reduction = capacity.saturating_sub(len.saturating_add(usize::from(unused_window)));
1010 let threshold = usize::min(capacity / 2, usize::from(mss.get().get()));
1011 let window_size = if reduction >= threshold {
1012 WindowSize::new(capacity - len).unwrap_or(WindowSize::MAX)
1014 } else {
1015 unused_window
1018 };
1019 WindowSizeCalculation { rcv_nxt, window_size, threshold }
1020 }
1021
1022 fn poll_receive_data_dequeued(&mut self, snd_max: SeqNum) -> Option<Segment<()>> {
1025 let WindowSizeCalculation { rcv_nxt, window_size: calculated_window_size, threshold } =
1026 self.calculate_window_size();
1027 let (rcv_wup, last_window_size) = self.last_window_update;
1028
1029 let rcv_diff = rcv_nxt - rcv_wup;
1035 debug_assert!(rcv_diff >= 0, "invalid RCV.NXT change: {rcv_nxt:?} < {rcv_wup:?}");
1036 let last_window_size =
1037 last_window_size.saturating_sub(usize::try_from(rcv_diff).unwrap_or(0));
1038
1039 let effective_window_size = (calculated_window_size >> self.wnd_scale) << self.wnd_scale;
1044 let effective_window_size_usize: usize = effective_window_size.into();
1047 let last_window_size_usize: usize = last_window_size.into();
1048
1049 if last_window_size_usize < threshold && effective_window_size_usize >= threshold {
1055 self.last_window_update = (rcv_nxt, calculated_window_size);
1056 self.timer = None;
1059 Some(Segment::ack(snd_max, self.nxt(), calculated_window_size >> self.wnd_scale))
1060 } else {
1061 None
1062 }
1063 }
1064
1065 pub(crate) fn nxt(&self) -> SeqNum {
1066 match &self.buffer {
1067 RecvBufferState::Open { assembler, .. } => assembler.nxt(),
1068 RecvBufferState::Closed { nxt, .. } => *nxt,
1069 }
1070 }
1071
1072 fn poll_send(&mut self, snd_max: SeqNum, now: I) -> Option<Segment<()>> {
1073 match self.timer {
1074 Some(ReceiveTimer::DelayedAck { at }) => (at <= now).then(|| {
1075 self.timer = None;
1076 self.make_ack(snd_max)
1077 }),
1078 None => None,
1079 }
1080 }
1081
1082 fn handle_fin(&self) -> RecvParams {
1084 let WindowSizeCalculation { rcv_nxt, window_size, threshold: _ } =
1085 self.calculate_window_size();
1086 RecvParams {
1087 ack: rcv_nxt + 1,
1088 wnd: window_size.checked_sub(1).unwrap_or(WindowSize::ZERO),
1089 wnd_scale: self.wnd_scale,
1090 }
1091 }
1092
1093 fn reset_quickacks(&mut self) {
1094 let Self {
1095 timer: _,
1096 mss,
1097 wnd_scale: _,
1098 last_window_update: _,
1099 remaining_quickacks,
1100 buffer,
1101 last_segment_at: _,
1102 sack_permitted: _,
1103 } = self;
1104 let new_remaining = quickack_counter(buffer.limits(), *mss);
1105 *remaining_quickacks = new_remaining.max(*remaining_quickacks);
1107 }
1108}
1109
1110impl<'a, I: Instant, R: ReceiveBuffer> RecvSegmentArgumentsProvider for &'a mut Recv<I, R> {
1111 fn take_rcv_segment_args(self) -> (SeqNum, UnscaledWindowSize, SackBlocks) {
1112 let WindowSizeCalculation { rcv_nxt, window_size, threshold: _ } =
1113 self.calculate_window_size();
1114 self.last_window_update = (rcv_nxt, window_size);
1115 (rcv_nxt, window_size >> self.wnd_scale, self.sack_blocks())
1116 }
1117}
1118
/// Fixed receive-side values used to fill in outgoing segments when no live
/// `Recv` state is consulted (for instance the values produced by
/// `Recv::handle_fin`).
#[derive(Debug, Clone)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub(super) struct RecvParams {
    /// Acknowledgment number to send.
    pub(super) ack: SeqNum,
    /// Scale applied to `wnd` before it is put in a segment.
    pub(super) wnd_scale: WindowScale,
    /// Receive window to advertise, before scaling.
    pub(super) wnd: WindowSize,
}
1128
1129impl<'a> RecvSegmentArgumentsProvider for &'a RecvParams {
1130 fn take_rcv_segment_args(self) -> (SeqNum, UnscaledWindowSize, SackBlocks) {
1131 (self.ack, self.wnd >> self.wnd_scale, SackBlocks::default())
1132 }
1133}
1134
/// Receive parameters computed ahead of time, optionally tied to the `Recv`
/// state they were computed from so that state can be updated once the
/// parameters are actually used in a segment.
struct CalculatedRecvParams<'a, I, R> {
    /// The precomputed values.
    params: RecvParams,
    /// The receive state to update when the values are taken, if any.
    recv: Option<&'a mut Recv<I, R>>,
}
1141
1142impl<'a, I, R> CalculatedRecvParams<'a, I, R> {
1143 fn from_params(params: RecvParams) -> Self {
1148 Self { params, recv: None }
1149 }
1150
1151 fn nxt(&self) -> SeqNum {
1152 self.params.ack
1153 }
1154
1155 fn wnd(&self) -> WindowSize {
1156 self.params.wnd
1157 }
1158}
1159
1160impl<'a, I, R> RecvSegmentArgumentsProvider for CalculatedRecvParams<'a, I, R> {
1161 fn take_rcv_segment_args(self) -> (SeqNum, UnscaledWindowSize, SackBlocks) {
1162 let Self { params, recv } = self;
1163 let RecvParams { ack, wnd_scale, wnd } = params;
1164 let sack_blocks = if let Some(recv) = recv {
1165 recv.last_window_update = (ack, wnd);
1167 recv.sack_blocks()
1168 } else {
1169 SackBlocks::default()
1170 };
1171 (ack, wnd >> wnd_scale, sack_blocks)
1172 }
1173}
1174
1175impl<'a, I: Instant, R: ReceiveBuffer> CalculatedRecvParams<'a, I, R> {
1176 fn from_recv(recv: &'a mut Recv<I, R>) -> Self {
1177 let WindowSizeCalculation { rcv_nxt: ack, window_size: wnd, threshold: _ } =
1178 recv.calculate_window_size();
1179 let wnd_scale = recv.wnd_scale;
1180 Self { params: RecvParams { ack, wnd_scale, wnd }, recv: Some(recv) }
1181 }
1182}
1183
1184trait RecvSegmentArgumentsProvider: Sized {
1185 fn take_rcv_segment_args(self) -> (SeqNum, UnscaledWindowSize, SackBlocks);
1191
1192 fn make_segment<P, F: FnOnce(SeqNum, UnscaledWindowSize, SackBlocks) -> Segment<P>>(
1195 self,
1196 f: F,
1197 ) -> Segment<P> {
1198 let (ack, wnd, sack) = self.take_rcv_segment_args();
1199 f(ack, wnd, sack)
1200 }
1201
1202 fn make_ack<P: Payload>(self, seq: SeqNum) -> Segment<P> {
1205 let (ack, wnd, sack_blocks) = self.take_rcv_segment_args();
1206 Segment::ack_with_options(seq, ack, wnd, Options::Segment(SegmentOptions { sack_blocks }))
1207 }
1208}
1209
/// State of a connection whose handshake has completed and on which no FIN
/// has been queued for sending.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct Established<I, R, S> {
    /// Send-side state.
    pub(crate) snd: Takeable<Send<I, S, { FinQueued::NO }>>,
    /// Receive-side state.
    pub(crate) rcv: Takeable<Recv<I, R>>,
}
1230
/// Whether processing an acknowledgment advanced the oldest unacknowledged
/// sequence number.
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) enum DataAcked {
    /// The acknowledgment covered previously unacknowledged data.
    Yes,
    /// The acknowledgment covered nothing new.
    No,
}
1238
1239impl<I: Instant, S: SendBuffer, const FIN_QUEUED: bool> Send<I, S, FIN_QUEUED> {
1240 fn timed_out(&self, now: I, keep_alive: &KeepAlive) -> bool {
1242 match self.timer {
1243 Some(SendTimer::KeepAlive(keep_alive_timer)) => {
1244 keep_alive.enabled && keep_alive_timer.already_sent >= keep_alive.count.get()
1245 }
1246 Some(SendTimer::Retrans(timer)) | Some(SendTimer::ZeroWindowProbe(timer)) => {
1247 timer.timed_out(now)
1248 }
1249 Some(SendTimer::SWSProbe { at: _ }) | None => false,
1250 }
1251 }
1252
1253 fn poll_send(
1259 &mut self,
1260 id: &impl StateMachineDebugId,
1261 counters: &TcpCountersRefs<'_>,
1262 rcv: impl RecvSegmentArgumentsProvider,
1263 limit: u32,
1264 now: I,
1265 SocketOptions {
1266 keep_alive,
1267 nagle_enabled,
1268 user_timeout,
1269 delayed_ack: _,
1270 fin_wait2_timeout: _,
1271 max_syn_retries: _,
1272 ip_options: _,
1273 }: &SocketOptions,
1274 ) -> Option<Segment<S::Payload<'_>>> {
1275 let Self {
1276 nxt: snd_nxt,
1277 max: snd_max,
1278 una: snd_una,
1279 wnd: snd_wnd,
1280 buffer,
1281 wl1: _,
1282 wl2: _,
1283 rtt_sampler,
1284 rtt_estimator,
1285 timer,
1286 congestion_control,
1287 wnd_scale: _,
1288 wnd_max: snd_wnd_max,
1289 } = self;
1290 let BufferLimits { capacity: _, len: readable_bytes } = buffer.limits();
1291 let mss = u32::from(congestion_control.mss());
1292 let mut zero_window_probe = false;
1293 let mut override_sws = false;
1294
1295 let increment_retransmit_counters = |congestion_control: &CongestionControl<I>| {
1296 counters.increment(|c| &c.retransmits);
1297 if congestion_control.in_fast_recovery() {
1298 counters.increment(|c| &c.fast_retransmits);
1299 }
1300 if congestion_control.in_slow_start() {
1301 counters.increment(|c| &c.slow_start_retransmits);
1302 }
1303 };
1304
1305 match timer {
1306 Some(SendTimer::Retrans(retrans_timer)) => {
1307 if retrans_timer.at <= now {
1308 *snd_nxt = *snd_una;
1320 retrans_timer.backoff(now);
1321 congestion_control.on_retransmission_timeout();
1322 counters.increment(|c| &c.timeouts);
1323 increment_retransmit_counters(congestion_control);
1324 }
1325 }
1326 Some(SendTimer::ZeroWindowProbe(retrans_timer)) => {
1327 debug_assert!(readable_bytes > 0 || FIN_QUEUED);
1328 if retrans_timer.at <= now {
1329 zero_window_probe = true;
1330 *snd_nxt = *snd_una;
1331 retrans_timer.backoff(now);
1335 }
1336 }
1337 Some(SendTimer::KeepAlive(KeepAliveTimer { at, already_sent })) => {
1338 if keep_alive.enabled && !FIN_QUEUED && readable_bytes == 0 {
1343 if *at <= now {
1344 *at = now.saturating_add(keep_alive.interval.into());
1345 *already_sent = already_sent.saturating_add(1);
1346 return Some(rcv.make_ack(*snd_max - 1));
1349 }
1350 } else {
1351 *timer = None;
1352 }
1353 }
1354 Some(SendTimer::SWSProbe { at }) => {
1355 if *at <= now {
1356 override_sws = true;
1357 *timer = None;
1358 }
1359 }
1360 None => {}
1361 };
1362
1363 if *snd_wnd == WindowSize::ZERO && readable_bytes > 0 {
1366 match timer {
1367 Some(SendTimer::ZeroWindowProbe(_)) => {}
1368 _ => {
1369 *timer = Some(SendTimer::ZeroWindowProbe(RetransTimer::new(
1370 now,
1371 rtt_estimator.rto(),
1372 *user_timeout,
1373 DEFAULT_MAX_RETRIES,
1374 )));
1375
1376 return None;
1385 }
1386 }
1387 }
1388
1389 let next_seg = match congestion_control.fast_retransmit() {
1392 None => *snd_nxt,
1393 Some(seg) => {
1394 increment_retransmit_counters(congestion_control);
1395 seg
1396 }
1397 };
1398 let cwnd = congestion_control.cwnd();
1402 let swnd = WindowSize::min(*snd_wnd, cwnd);
1403 let open_window =
1404 u32::try_from(*snd_una + swnd - next_seg).ok_checked::<TryFromIntError>()?;
1405 let offset =
1406 usize::try_from(next_seg - *snd_una).unwrap_or_else(|TryFromIntError { .. }| {
1407 panic!("next_seg({:?}) should never fall behind snd.una({:?})", *snd_nxt, *snd_una);
1408 });
1409 let available = u32::try_from(readable_bytes + usize::from(FIN_QUEUED) - offset)
1410 .unwrap_or_else(|_| WindowSize::MAX.into());
1411 let can_send =
1415 open_window.min(available).min(mss).min(limit).max(u32::from(zero_window_probe));
1416 if can_send == 0 {
1417 if available == 0 && offset == 0 && timer.is_none() && keep_alive.enabled {
1418 *timer = Some(SendTimer::KeepAlive(KeepAliveTimer::idle(now, keep_alive)));
1419 }
1420 return None;
1421 }
1422 let has_fin = FIN_QUEUED && can_send == available;
1423 let seg = buffer.peek_with(offset, |readable| {
1424 let bytes_to_send = u32::min(
1425 can_send - u32::from(has_fin),
1426 u32::try_from(readable.len()).unwrap_or(u32::MAX),
1427 );
1428 let has_fin = has_fin && bytes_to_send == can_send - u32::from(has_fin);
1429 if bytes_to_send < mss && !has_fin {
1432 if bytes_to_send == 0 {
1433 return None;
1434 }
1435 if *nagle_enabled && snd_nxt.after(*snd_una) {
1443 return None;
1444 }
1445 if available > open_window
1474 && open_window < u32::min(mss, u32::from(*snd_wnd_max) / SWS_BUFFER_FACTOR)
1475 && !override_sws
1476 && !zero_window_probe
1477 {
1478 if timer.is_none() {
1479 *timer =
1480 Some(SendTimer::SWSProbe { at: now.panicking_add(SWS_PROBE_TIMEOUT) })
1481 }
1482 return None;
1483 }
1484 }
1485
1486 let seg = rcv.make_segment(|ack, wnd, mut sack_blocks| {
1487 let bytes_to_send = match sack_blocks.as_option() {
1488 Some(option) => {
1490 let options_len = u32::try_from(
1494 packet_formats::tcp::aligned_options_length(core::iter::once(option)),
1495 )
1496 .unwrap();
1497 if options_len < mss {
1498 bytes_to_send.min(mss - options_len)
1499 } else {
1500 sack_blocks.clear();
1508 bytes_to_send
1509 }
1510 }
1511 None => bytes_to_send,
1513 };
1514 let (seg, discarded) = Segment::with_data_options(
1515 next_seg,
1516 Some(ack),
1517 has_fin.then_some(Control::FIN),
1518 wnd,
1519 readable.slice(0..bytes_to_send),
1520 Options::Segment(SegmentOptions { sack_blocks }),
1521 );
1522 debug_assert_eq!(discarded, 0);
1523 seg
1524 });
1525 Some(seg)
1526 })?;
1527 trace_instant!(c"tcp::Send::poll_send/segment",
1528 "id" => id.trace_id(),
1529 "seq" => u32::from(next_seg),
1530 "len" => seg.len(),
1531 "can_send" => can_send,
1532 "snd_wnd" => u32::from(*snd_wnd),
1533 "cwnd" => u32::from(cwnd),
1534 "open_window" => open_window,
1535 "available" => available,
1536 );
1537 let seq_max = next_seg + seg.len();
1538 rtt_sampler.on_will_send_segment(now, next_seg..seq_max, *snd_max);
1539
1540 if seq_max.after(*snd_nxt) {
1541 *snd_nxt = seq_max;
1542 }
1543 if seq_max.after(*snd_max) {
1544 *snd_max = seq_max;
1545 }
1546 match timer {
1552 Some(SendTimer::Retrans(_)) | Some(SendTimer::ZeroWindowProbe(_)) => {}
1553 Some(SendTimer::KeepAlive(_)) | Some(SendTimer::SWSProbe { at: _ }) | None => {
1554 *timer = Some(SendTimer::Retrans(RetransTimer::new(
1555 now,
1556 rtt_estimator.rto(),
1557 *user_timeout,
1558 DEFAULT_MAX_RETRIES,
1559 )))
1560 }
1561 }
1562 Some(seg)
1563 }
1564
    /// Processes the acknowledgment carried by an incoming segment.
    ///
    /// Updates the send-side timer, `snd_una`/`snd_nxt`, the send buffer, the
    /// RTT estimator, congestion control and the send window according to
    /// `seg_ack`, `seg_seq` and `seg_wnd`.
    ///
    /// Returns a pair of:
    /// - `Some(ack)` when `seg_ack` is after `snd_max`; in that case an ACK
    ///   built from `rcv` is returned and no other send state besides the
    ///   timer is touched. `None` otherwise.
    /// - [`DataAcked::Yes`] iff `seg_ack` advanced `snd_una`.
    ///
    /// # Panics
    ///
    /// Panics if the number of newly acknowledged bytes cannot be represented
    /// as a non-zero `u32`/`usize`.
    fn process_ack<R: RecvSegmentArgumentsProvider>(
        &mut self,
        id: &impl StateMachineDebugId,
        counters: &TcpCountersRefs<'_>,
        seg_seq: SeqNum,
        seg_ack: SeqNum,
        seg_wnd: UnscaledWindowSize,
        pure_ack: bool,
        rcv: R,
        now: I,
        SocketOptions {
            keep_alive,
            nagle_enabled: _,
            user_timeout,
            delayed_ack: _,
            fin_wait2_timeout: _,
            max_syn_retries: _,
            ip_options: _,
        }: &SocketOptions,
    ) -> (Option<Segment<()>>, DataAcked) {
        let Self {
            nxt: snd_nxt,
            max: snd_max,
            una: snd_una,
            wnd: snd_wnd,
            wl1: snd_wl1,
            wl2: snd_wl2,
            wnd_max,
            buffer,
            rtt_sampler,
            rtt_estimator,
            timer,
            congestion_control,
            wnd_scale,
        } = self;
        // Apply the negotiated send window scale to the advertised window.
        let seg_wnd = seg_wnd << *wnd_scale;
        match timer {
            // No timer (or only keep-alive) is running: (re)arm the keep-alive
            // idle timer if keep-alive is enabled.
            Some(SendTimer::KeepAlive(_)) | None => {
                if keep_alive.enabled {
                    *timer = Some(SendTimer::KeepAlive(KeepAliveTimer::idle(now, keep_alive)));
                }
            }
            Some(SendTimer::Retrans(retrans_timer)) => {
                if seg_ack == *snd_max {
                    // Everything sent so far has been acknowledged; stop the
                    // retransmission timer.
                    *timer = None;
                } else if seg_ack.before(*snd_max) && seg_ack.after(*snd_una) {
                    // New data was acknowledged but some is still outstanding;
                    // restart the retransmission timer with the current RTO.
                    *retrans_timer = RetransTimer::new(
                        now,
                        rtt_estimator.rto(),
                        *user_timeout,
                        DEFAULT_MAX_RETRIES,
                    );
                }
            }
            // Probe timers are not affected here.
            Some(SendTimer::ZeroWindowProbe(_)) | Some(SendTimer::SWSProbe { at: _ }) => {}
        }
        if seg_ack.after(*snd_max) {
            // The segment acknowledges something beyond the highest sequence
            // number tracked in `snd_max`; reply with an ACK and do nothing
            // else.
            (Some(rcv.make_ack(*snd_max)), DataAcked::No)
        } else {
            let (trace, data_acked) = if seg_ack.after(*snd_una) {
                // The ACK advances `snd_una`: compute how many sequence
                // numbers were newly acknowledged.
                let acked = u32::try_from(seg_ack - *snd_una)
                    .ok_checked::<TryFromIntError>()
                    .and_then(NonZeroU32::new)
                    .unwrap_or_else(|| {
                        panic!("seg_ack({:?}) - snd_una({:?}) must be positive", seg_ack, snd_una);
                    });
                let BufferLimits { len, capacity: _ } = buffer.limits();
                // With a FIN queued, the FIN occupies the sequence number right
                // after the `len` buffered bytes. If it is acknowledged, one of
                // the acked sequence numbers is not a byte in the buffer.
                let fin_acked = FIN_QUEUED && seg_ack == *snd_una + len + 1;
                // Release the acknowledged bytes (excluding the FIN, if any)
                // from the send buffer.
                buffer.mark_read(
                    NonZeroUsize::try_from(acked)
                        .unwrap_or_else(|TryFromIntError { .. }| {
                            panic!(
                                "acked({:?}) must be smaller than isize::MAX({:?})",
                                acked,
                                isize::MAX
                            )
                        })
                        .get()
                        - usize::from(fin_acked),
                );
                *snd_una = seg_ack;
                // `snd_nxt` must never trail `snd_una`.
                if seg_ack.after(*snd_nxt) {
                    *snd_nxt = seg_ack;
                }
                // Feed a new RTT sample to the estimator if the sampler
                // produced one for this ACK.
                if let Some(rtt) = rtt_sampler.on_ack(now, seg_ack) {
                    rtt_estimator.sample(rtt);
                }

                // Congestion control is only notified once a smoothed RTT is
                // available.
                if let Some(rtt) = rtt_estimator.srtt() {
                    congestion_control.on_ack(acked, now, rtt);
                }

                (true, DataAcked::Yes)
            } else {
                // A duplicate ACK here means: data is outstanding, the segment
                // carries nothing but the ACK, it acknowledges exactly
                // `snd_una`, and the (scaled) window is unchanged.
                let is_dup_ack = {
                    snd_nxt.after(*snd_una)
                        && pure_ack
                        && seg_ack == *snd_una
                        && seg_wnd == *snd_wnd
                };
                if is_dup_ack {
                    let fast_recovery_initiated = congestion_control.on_dup_ack(seg_ack);
                    if fast_recovery_initiated {
                        counters.increment(|c| &c.fast_recovery);
                    }
                }
                (is_dup_ack, DataAcked::No)
            };

            // Update the send window only if the ACK is not older than
            // `snd_una` and the segment is newer than the one that last
            // updated the window (`snd_wl1`/`snd_wl2`).
            if !snd_una.after(seg_ack)
                && (snd_wl1.before(seg_seq) || (seg_seq == *snd_wl1 && !snd_wl2.after(seg_ack)))
            {
                *snd_wnd = seg_wnd;
                *snd_wl1 = seg_seq;
                *snd_wl2 = seg_ack;
                *wnd_max = seg_wnd.max(*wnd_max);
                // The peer reopened its window while we were probing a zero
                // window: stop probing and rewind `snd_nxt` to `snd_una`.
                if seg_wnd != WindowSize::ZERO
                    && matches!(timer, Some(SendTimer::ZeroWindowProbe(_)))
                {
                    *timer = None;
                    *snd_nxt = *snd_una;
                }
            }

            // Emit a trace event only for ACKs that advanced `snd_una` or were
            // classified as duplicates.
            if trace {
                trace_instant!(c"tcp::Send::process_ack",
                    "id" => id.trace_id(),
                    "seg_ack" => u32::from(seg_ack),
                    "snd_nxt" => u32::from(*snd_nxt),
                    "snd_wnd" => u32::from(*snd_wnd),
                    "rtt_ms" => u32::try_from(
                        rtt_estimator.srtt().unwrap_or(Duration::ZERO).as_millis()
                    ).unwrap_or(u32::MAX),
                    "cwnd" => u32::from(congestion_control.cwnd()),
                    "ssthresh" => congestion_control.slow_start_threshold(),
                    "fast_recovery" => congestion_control.in_fast_recovery(),
                    "acked" => data_acked == DataAcked::Yes,
                );
            }

            (None, data_acked)
        }
    }
1767}
1768
1769impl<I: Instant, S: SendBuffer> Send<I, S, { FinQueued::NO }> {
1770 fn queue_fin(self) -> Send<I, S, { FinQueued::YES }> {
1771 let Self {
1772 nxt,
1773 max,
1774 una,
1775 wnd,
1776 wl1,
1777 wl2,
1778 buffer,
1779 rtt_sampler,
1780 rtt_estimator,
1781 timer,
1782 congestion_control,
1783 wnd_scale,
1784 wnd_max,
1785 } = self;
1786 Send {
1787 nxt,
1788 max,
1789 una,
1790 wnd,
1791 wl1,
1792 wl2,
1793 buffer,
1794 rtt_sampler,
1795 rtt_estimator,
1796 timer,
1797 congestion_control,
1798 wnd_scale,
1799 wnd_max,
1800 }
1801 }
1802}
1803
/// Data held while the connection is in [`State::CloseWait`].
///
/// `on_segment` builds this when a FIN is processed in
/// [`State::Established`]: the send half is carried over (no FIN queued yet)
/// and the receive half is replaced by the parameters returned from
/// `handle_fin`.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct CloseWait<I, S> {
    /// The send half; still able to send data, no FIN queued.
    snd: Takeable<Send<I, S, { FinQueued::NO }>>,
    /// Receive parameters captured when the receive half was closed.
    closed_rcv: RecvParams,
}
1823
/// Data held while the connection is in [`State::LastAck`].
///
/// The send half has a FIN queued; `on_segment` moves to
/// [`State::Closed`] once an incoming ACK covers that FIN.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct LastAck<I, S> {
    /// The send half, with a FIN queued.
    snd: Send<I, S, { FinQueued::YES }>,
    /// Receive parameters of the already-closed receive half.
    closed_rcv: RecvParams,
}
1845
/// Data held while the connection is in [`State::FinWait1`].
///
/// The send half has a FIN queued while the receive half is still live.
/// `on_segment` moves to [`State::FinWait2`] when the FIN is acknowledged, or
/// to [`State::Closing`] when a FIN from the peer is processed first.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct FinWait1<I, R, S> {
    /// The send half, with a FIN queued.
    snd: Takeable<Send<I, S, { FinQueued::YES }>>,
    /// The receive half.
    rcv: Takeable<Recv<I, R>>,
}
1866
/// Data held while the connection is in [`State::FinWait2`].
///
/// Entered from [`State::FinWait1`] once our FIN has been acknowledged; only
/// the receive half remains.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct FinWait2<I, R> {
    /// The send-side sequence number used for outgoing ACKs; taken from
    /// `snd.nxt` when leaving [`State::FinWait1`].
    last_seq: SeqNum,
    /// The receive half.
    rcv: Recv<I, R>,
    /// When set, the instant at or after which `poll_close` closes the
    /// connection with [`ConnectionError::TimedOut`]. `on_segment` only sets
    /// it when a FIN-WAIT-2 timeout is configured and the socket is defunct.
    timeout_at: Option<I>,
}
1887
/// Data held while the connection is in [`State::Closing`].
///
/// Entered from [`State::FinWait1`] when the peer's FIN is processed before
/// our own FIN has been acknowledged. `on_segment` moves to
/// [`State::TimeWait`] once an incoming ACK covers our FIN.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct Closing<I, S> {
    /// The send half, with a FIN queued.
    snd: Send<I, S, { FinQueued::YES }>,
    /// Receive parameters of the already-closed receive half.
    closed_rcv: RecvParams,
}
1907
/// Data held while the connection is in [`State::TimeWait`].
///
/// `poll_close` moves to [`State::Closed`] once `expiry` has been reached;
/// processing another FIN in this state restarts the expiry.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct TimeWait<I> {
    /// The send-side sequence number used when ACKing in this state.
    pub(super) last_seq: SeqNum,
    /// The instant at which this state expires.
    pub(super) expiry: I,
    /// Receive parameters of the closed receive half.
    pub(super) closed_rcv: RecvParams,
}
1929
1930fn new_time_wait_expiry<I: Instant>(now: I) -> I {
1931 now.panicking_add(MSL * 2)
1932}
1933
/// The TCP state machine, one variant per connection state.
///
/// Type parameters:
/// - `I`: the instant type used for timers.
/// - `R`: the receive buffer type.
/// - `S`: the send buffer type.
/// - `ActiveOpen`: data carried by actively-opened connections until they
///   become established, at which point it is converted into buffers.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub enum State<I, R, S, ActiveOpen> {
    /// The connection is closed, optionally recording why.
    Closed(Closed<Option<ConnectionError>>),
    /// Waiting for an incoming connection request.
    Listen(Listen),
    /// A SYN has been received and a SYN-ACK sent; waiting for the final ACK.
    SynRcvd(SynRcvd<I, ActiveOpen>),
    /// A SYN has been sent; waiting for the peer's response.
    SynSent(SynSent<I, ActiveOpen>),
    /// The connection is established; both halves are open.
    Established(Established<I, R, S>),
    /// The peer's FIN was processed; the send half is still open.
    CloseWait(CloseWait<I, S>),
    /// Our FIN is queued and the receive half is closed; waiting for the ACK
    /// of our FIN.
    LastAck(LastAck<I, S>),
    /// Our FIN is queued; the receive half is still open.
    FinWait1(FinWait1<I, R, S>),
    /// Our FIN was acknowledged; waiting for the peer's FIN.
    FinWait2(FinWait2<I, R>),
    /// Both sides sent a FIN; waiting for the ACK of ours.
    Closing(Closing<I, S>),
    /// Both directions are closed; waiting for the expiry before closing.
    TimeWait(TimeWait<I>),
}
1949
1950impl<I, R, S, ActiveOpen> core::fmt::Display for State<I, R, S, ActiveOpen> {
1951 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
1952 let name = match self {
1953 State::Closed(_) => "Closed",
1954 State::Listen(_) => "Listen",
1955 State::SynRcvd(_) => "SynRcvd",
1956 State::SynSent(_) => "SynSent",
1957 State::Established(_) => "Established",
1958 State::CloseWait(_) => "CloseWait",
1959 State::LastAck(_) => "LastAck",
1960 State::FinWait1(_) => "FinWait1",
1961 State::FinWait2(_) => "FinWait2",
1962 State::Closing(_) => "Closing",
1963 State::TimeWait(_) => "TimeWait",
1964 };
1965 write!(f, "{name}")
1966 }
1967}
1968
/// Errors that can be returned when closing a connection.
#[derive(Debug, PartialEq, Eq)]
pub(super) enum CloseError {
    // NOTE(review): presumably returned when the connection is already in
    // the process of closing — confirm against `State::close`.
    Closing,
    /// There is no connection to close; `State::close` returns this when the
    /// state is already [`State::Closed`].
    NoConnection,
}
1977
/// Supplies the receive (`R`) and send (`S`) buffers a connection needs once
/// it becomes established.
pub(crate) trait BufferProvider<R: ReceiveBuffer, S: SendBuffer> {
    /// The value handed back alongside the buffers created for a passively
    /// opened connection.
    type PassiveOpen;

    /// The value carried by an actively opened connection; it can be
    /// converted into the connection's buffers.
    type ActiveOpen: IntoBuffers<R, S>;

    /// Creates the buffers for a passively opened connection, together with
    /// the associated [`Self::PassiveOpen`] value.
    fn new_passive_open_buffers(buffer_sizes: BufferSizes) -> (R, S, Self::PassiveOpen);
}
1993
/// A slot that owns a `T` until the value is explicitly taken out of it.
///
/// Any access performed after the value has been taken panics with
/// `"accessed taken takeable"`.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub(crate) struct Takeable<T>(Option<T>);

impl<T> From<T> for Takeable<T> {
    fn from(value: T) -> Self {
        Takeable::new(value)
    }
}

impl<T> Takeable<T> {
    /// Borrows the held value.
    ///
    /// # Panics
    ///
    /// Panics if the value has already been taken.
    pub(crate) fn get(&self) -> &T {
        self.0.as_ref().expect("accessed taken takeable")
    }

    /// Mutably borrows the held value.
    ///
    /// # Panics
    ///
    /// Panics if the value has already been taken.
    pub(crate) fn get_mut(&mut self) -> &mut T {
        self.0.as_mut().expect("accessed taken takeable")
    }

    /// Creates a slot holding `v`.
    pub(crate) fn new(v: T) -> Self {
        Takeable(Some(v))
    }

    /// Returns a handle through which the held value can be taken.
    pub(crate) fn to_ref(&mut self) -> TakeableRef<'_, T> {
        TakeableRef(self)
    }

    /// Moves the value out of the slot behind `t` into a fresh slot.
    ///
    /// # Panics
    ///
    /// Panics if the value has already been taken.
    pub(crate) fn from_ref(t: TakeableRef<'_, T>) -> Self {
        t.to_takeable()
    }

    /// Consumes the slot and returns the held value.
    ///
    /// # Panics
    ///
    /// Panics if the value has already been taken.
    pub(crate) fn into_inner(self) -> T {
        self.0.expect("accessed taken takeable")
    }

    /// Consumes the slot and returns a new one holding `f` applied to the
    /// held value.
    ///
    /// # Panics
    ///
    /// Panics if the value has already been taken.
    pub(crate) fn map<R, F: FnOnce(T) -> R>(self, f: F) -> Takeable<R> {
        let inner = self.into_inner();
        Takeable::new(f(inner))
    }
}

impl<T> Deref for Takeable<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        Takeable::get(self)
    }
}

impl<T> DerefMut for Takeable<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        Takeable::get_mut(self)
    }
}

/// A handle to a [`Takeable`] that allows moving the held value out, leaving
/// the slot empty.
pub(crate) struct TakeableRef<'a, T>(&'a mut Takeable<T>);

impl<'a, T> TakeableRef<'a, T> {
    /// Moves the value out of the referenced slot.
    ///
    /// # Panics
    ///
    /// Panics if the value has already been taken.
    pub(crate) fn take(self) -> T {
        let TakeableRef(slot) = self;
        slot.0.take().expect("accessed taken takeable")
    }

    /// Moves the value out of the referenced slot into a fresh [`Takeable`].
    ///
    /// # Panics
    ///
    /// Panics if the value has already been taken.
    pub(crate) fn to_takeable(self) -> Takeable<T> {
        Takeable(Some(self.take()))
    }
}
2075
/// Reports whether an operation caused the state machine to enter
/// [`State::Closed`] from a state other than `Closed`.
#[must_use = "must check to determine if the socket needs to be removed from the demux state"]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum NewlyClosed {
    /// The state machine did not newly enter the closed state.
    No,
    /// The state machine just entered the closed state.
    Yes,
}
2082
2083impl<I: Instant + 'static, R: ReceiveBuffer, S: SendBuffer, ActiveOpen: Debug>
2084 State<I, R, S, ActiveOpen>
2085{
2086 fn transition_to_state(
2088 &mut self,
2089 counters: &TcpCountersRefs<'_>,
2090 new_state: State<I, R, S, ActiveOpen>,
2091 ) -> NewlyClosed {
2092 log::debug!("transition to state {} => {}", self, new_state);
2093 let newly_closed = if let State::Closed(Closed { reason }) = &new_state {
2094 let (was_established, was_closed) = match self {
2095 State::Closed(_) => (false, true),
2096 State::Listen(_) | State::SynRcvd(_) | State::SynSent(_) => (false, false),
2097 State::Established(_)
2098 | State::CloseWait(_)
2099 | State::LastAck(_)
2100 | State::FinWait1(_)
2101 | State::FinWait2(_)
2102 | State::Closing(_)
2103 | State::TimeWait(_) => (true, false),
2104 };
2105 if was_established {
2106 counters.increment(|c| &c.established_closed);
2107 match reason {
2108 Some(ConnectionError::ConnectionRefused)
2109 | Some(ConnectionError::ConnectionReset) => {
2110 counters.increment(|c| &c.established_resets);
2111 }
2112 Some(ConnectionError::TimedOut) => {
2113 counters.increment(|c| &c.established_timedout);
2114 }
2115 _ => {}
2116 }
2117 }
2118 (!was_closed).then_some(NewlyClosed::Yes).unwrap_or(NewlyClosed::No)
2119 } else {
2120 NewlyClosed::No
2121 };
2122 *self = new_state;
2123 newly_closed
2124 }
2125 pub(crate) fn on_segment<P: Payload, BP: BufferProvider<R, S, ActiveOpen = ActiveOpen>>(
2132 &mut self,
2133 id: &impl StateMachineDebugId,
2134 counters: &TcpCountersRefs<'_>,
2135 incoming: Segment<P>,
2136 now: I,
2137 options @ SocketOptions {
2138 keep_alive: _,
2139 nagle_enabled: _,
2140 user_timeout: _,
2141 delayed_ack,
2142 fin_wait2_timeout,
2143 max_syn_retries: _,
2144 ip_options: _,
2145 }: &SocketOptions,
2146 defunct: bool,
2147 ) -> (Option<Segment<()>>, Option<BP::PassiveOpen>, DataAcked, NewlyClosed)
2148 where
2149 BP::PassiveOpen: Debug,
2150 ActiveOpen: IntoBuffers<R, S>,
2151 {
2152 let mut passive_open = None;
2153 let mut data_acked = DataAcked::No;
2154 let (seg, newly_closed) = (|| {
2155 let (mut rcv, snd_max, rst_on_new_data) = match self {
2156 State::Closed(closed) => return (closed.on_segment(&incoming), NewlyClosed::No),
2157 State::Listen(listen) => {
2158 return (
2159 match listen.on_segment(incoming, now) {
2160 ListenOnSegmentDisposition::SendSynAckAndEnterSynRcvd(
2161 syn_ack,
2162 SynRcvd {
2163 iss,
2164 irs,
2165 timestamp,
2166 retrans_timer,
2167 simultaneous_open,
2168 buffer_sizes,
2169 smss,
2170 rcv_wnd_scale,
2171 snd_wnd_scale,
2172 sack_permitted,
2173 },
2174 ) => {
2175 match simultaneous_open {
2176 None => {
2177 assert_eq!(
2178 self.transition_to_state(
2179 counters,
2180 State::SynRcvd(SynRcvd {
2181 iss,
2182 irs,
2183 timestamp,
2184 retrans_timer,
2185 simultaneous_open: None,
2186 buffer_sizes,
2187 smss,
2188 rcv_wnd_scale,
2189 snd_wnd_scale,
2190 sack_permitted,
2191 }),
2192 ),
2193 NewlyClosed::No
2194 )
2195 }
2196 }
2197 Some(syn_ack)
2198 }
2199 ListenOnSegmentDisposition::SendRst(rst) => Some(rst),
2200 ListenOnSegmentDisposition::Ignore => None,
2201 },
2202 NewlyClosed::No,
2203 );
2204 }
2205 State::SynSent(synsent) => {
2206 return match synsent.on_segment(incoming, now) {
2207 SynSentOnSegmentDisposition::SendAckAndEnterEstablished(established) => (
2208 replace_with_and(self, |this| {
2209 assert_matches!(this, State::SynSent(SynSent {
2210 active_open,
2211 buffer_sizes,
2212 ..
2213 }) => {
2214 let Established {snd, rcv} = established;
2215 let (rcv_buffer, snd_buffer) =
2216 active_open.into_buffers(buffer_sizes);
2217 let mut established = Established {
2218 snd: snd.map(|s| s.with_buffer(snd_buffer)),
2219 rcv: rcv.map(|s| s.with_buffer(rcv_buffer)),
2220 };
2221 let ack = Some(established.rcv.make_ack(established.snd.max));
2222 (State::Established(established), ack)
2223 })
2224 }),
2225 NewlyClosed::No,
2226 ),
2227 SynSentOnSegmentDisposition::SendSynAckAndEnterSynRcvd(
2228 syn_ack,
2229 mut syn_rcvd,
2230 ) => {
2231 replace_with(self, |this| {
2232 assert_matches!(this, State::SynSent(SynSent {
2233 active_open,
2234 ..
2235 }) => {
2236 assert_matches!(syn_rcvd.simultaneous_open.replace(active_open), None);
2237 State::SynRcvd(syn_rcvd)
2238 })
2239 });
2240 (Some(syn_ack), NewlyClosed::No)
2241 }
2242 SynSentOnSegmentDisposition::SendRst(rst) => (Some(rst), NewlyClosed::No),
2243 SynSentOnSegmentDisposition::EnterClosed(closed) => {
2244 assert_eq!(
2245 self.transition_to_state(counters, State::Closed(closed)),
2246 NewlyClosed::Yes,
2247 );
2248 (None, NewlyClosed::Yes)
2249 }
2250 SynSentOnSegmentDisposition::Ignore => (None, NewlyClosed::No),
2251 }
2252 }
2253 State::SynRcvd(SynRcvd {
2254 iss,
2255 irs,
2256 timestamp: _,
2257 retrans_timer: _,
2258 simultaneous_open: _,
2259 buffer_sizes,
2260 smss: _,
2261 rcv_wnd_scale: _,
2262 snd_wnd_scale: _,
2263 sack_permitted: _,
2264 }) => {
2265 let advertised = buffer_sizes.rwnd_unscaled();
2269 (
2270 CalculatedRecvParams::from_params(RecvParams {
2271 ack: *irs + 1,
2272 wnd: advertised << WindowScale::default(),
2273 wnd_scale: WindowScale::default(),
2274 }),
2275 *iss + 1,
2276 false,
2277 )
2278 }
2279 State::Established(Established { rcv, snd }) => {
2280 (CalculatedRecvParams::from_recv(rcv.get_mut()), snd.max, false)
2281 }
2282 State::CloseWait(CloseWait { snd, closed_rcv }) => {
2283 (CalculatedRecvParams::from_params(closed_rcv.clone()), snd.max, true)
2284 }
2285 State::LastAck(LastAck { snd, closed_rcv })
2286 | State::Closing(Closing { snd, closed_rcv }) => {
2287 (CalculatedRecvParams::from_params(closed_rcv.clone()), snd.max, true)
2288 }
2289 State::FinWait1(FinWait1 { rcv, snd }) => {
2290 let closed = rcv.buffer.is_closed();
2291 (CalculatedRecvParams::from_recv(rcv.get_mut()), snd.max, closed)
2292 }
2293 State::FinWait2(FinWait2 { last_seq, rcv, timeout_at: _ }) => {
2294 let closed = rcv.buffer.is_closed();
2295 (CalculatedRecvParams::from_recv(rcv), *last_seq, closed)
2296 }
2297 State::TimeWait(TimeWait { last_seq, expiry: _, closed_rcv }) => {
2298 (CalculatedRecvParams::from_params(closed_rcv.clone()), *last_seq, true)
2299 }
2300 };
2301
2302 if rst_on_new_data && (incoming.header.seq + incoming.data.len()).after(rcv.nxt()) {
2305 return (
2306 Some(Segment::rst(snd_max)),
2307 self.transition_to_state(
2308 counters,
2309 State::Closed(Closed { reason: Some(ConnectionError::ConnectionReset) }),
2310 ),
2311 );
2312 }
2313
2314 let is_rst = incoming.header.control == Some(Control::RST);
2320 let pure_ack = incoming.len() == 0;
2322 let needs_ack = !pure_ack;
2323 let Segment {
2324 header:
2325 SegmentHeader { seq: seg_seq, ack: seg_ack, wnd: seg_wnd, control, options: _ },
2326 data,
2327 } = match incoming.overlap(rcv.nxt(), rcv.wnd()) {
2328 Some(incoming) => incoming,
2329 None => {
2330 let segment = if is_rst {
2338 None
2339 } else {
2340 if let Some(recv) = rcv.recv.as_mut() {
2341 recv.reset_quickacks();
2344 }
2345 Some(rcv.make_ack(snd_max))
2346 };
2347
2348 return (segment, NewlyClosed::No);
2349 }
2350 };
2351 if control == Some(Control::RST) {
2359 return (
2360 None,
2361 self.transition_to_state(
2362 counters,
2363 State::Closed(Closed { reason: Some(ConnectionError::ConnectionReset) }),
2364 ),
2365 );
2366 }
2367 if control == Some(Control::SYN) {
2378 return (
2379 Some(Segment::rst(snd_max)),
2380 self.transition_to_state(
2381 counters,
2382 State::Closed(Closed { reason: Some(ConnectionError::ConnectionReset) }),
2383 ),
2384 );
2385 }
2386 match seg_ack {
2389 Some(seg_ack) => match self {
2390 State::Closed(_) | State::Listen(_) | State::SynSent(_) => {
2391 unreachable!("encountered an already-handled state: {:?}", self)
2393 }
2394 State::SynRcvd(SynRcvd {
2395 iss,
2396 irs,
2397 timestamp: syn_rcvd_ts,
2398 retrans_timer: _,
2399 simultaneous_open,
2400 buffer_sizes,
2401 smss,
2402 rcv_wnd_scale,
2403 snd_wnd_scale,
2404 sack_permitted,
2405 }) => {
2406 if seg_ack != *iss + 1 {
2419 return (Some(Segment::rst(seg_ack)), NewlyClosed::No);
2420 } else {
2421 let mut rtt_estimator = Estimator::default();
2422 if let Some(syn_rcvd_ts) = syn_rcvd_ts {
2423 rtt_estimator.sample(now.saturating_duration_since(*syn_rcvd_ts));
2424 }
2425 let (rcv_buffer, snd_buffer) = match simultaneous_open.take() {
2426 None => {
2427 let (rcv_buffer, snd_buffer, client) =
2428 BP::new_passive_open_buffers(*buffer_sizes);
2429 assert_matches!(passive_open.replace(client), None);
2430 (rcv_buffer, snd_buffer)
2431 }
2432 Some(active_open) => active_open.into_buffers(*buffer_sizes),
2433 };
2434 let (snd_wnd_scale, rcv_wnd_scale) = snd_wnd_scale
2435 .map(|snd_wnd_scale| (snd_wnd_scale, *rcv_wnd_scale))
2436 .unwrap_or_default();
2437 let established = Established {
2438 snd: Send {
2439 nxt: *iss + 1,
2440 max: *iss + 1,
2441 una: seg_ack,
2442 wnd: seg_wnd << snd_wnd_scale,
2443 wl1: seg_seq,
2444 wl2: seg_ack,
2445 buffer: snd_buffer,
2446 rtt_sampler: RttSampler::default(),
2447 rtt_estimator,
2448 timer: None,
2449 congestion_control: CongestionControl::cubic_with_mss(*smss),
2450 wnd_scale: snd_wnd_scale,
2451 wnd_max: seg_wnd << snd_wnd_scale,
2452 }
2453 .into(),
2454 rcv: Recv {
2455 buffer: RecvBufferState::Open {
2456 buffer: rcv_buffer,
2457 assembler: Assembler::new(*irs + 1),
2458 },
2459 timer: None,
2460 mss: *smss,
2461 wnd_scale: rcv_wnd_scale,
2462 last_segment_at: None,
2463 remaining_quickacks: quickack_counter(
2464 buffer_sizes.rcv_limits(),
2465 *smss,
2466 ),
2467 last_window_update: (*irs + 1, buffer_sizes.rwnd()),
2468 sack_permitted: *sack_permitted,
2469 }
2470 .into(),
2471 };
2472 assert_eq!(
2473 self.transition_to_state(counters, State::Established(established)),
2474 NewlyClosed::No
2475 );
2476 }
2477 }
2481 State::Established(Established { snd, rcv }) => {
2482 let (ack, segment_acked_data) = snd.process_ack(
2483 id,
2484 counters,
2485 seg_seq,
2486 seg_ack,
2487 seg_wnd,
2488 pure_ack,
2489 rcv.get_mut(),
2490 now,
2491 options,
2492 );
2493 data_acked = segment_acked_data;
2494 if let Some(ack) = ack {
2495 return (Some(ack), NewlyClosed::No);
2496 }
2497 }
2498 State::CloseWait(CloseWait { snd, closed_rcv }) => {
2499 let (ack, segment_acked_data) = snd.process_ack(
2500 id,
2501 counters,
2502 seg_seq,
2503 seg_ack,
2504 seg_wnd,
2505 pure_ack,
2506 &*closed_rcv,
2507 now,
2508 options,
2509 );
2510 data_acked = segment_acked_data;
2511 if let Some(ack) = ack {
2512 return (Some(ack), NewlyClosed::No);
2513 }
2514 }
2515 State::LastAck(LastAck { snd, closed_rcv }) => {
2516 let BufferLimits { len, capacity: _ } = snd.buffer.limits();
2517 let fin_seq = snd.una + len + 1;
2518 let (ack, segment_acked_data) = snd.process_ack(
2519 id,
2520 counters,
2521 seg_seq,
2522 seg_ack,
2523 seg_wnd,
2524 pure_ack,
2525 &*closed_rcv,
2526 now,
2527 options,
2528 );
2529 data_acked = segment_acked_data;
2530 if let Some(ack) = ack {
2531 return (Some(ack), NewlyClosed::No);
2532 } else if seg_ack == fin_seq {
2533 return (
2534 None,
2535 self.transition_to_state(
2536 counters,
2537 State::Closed(Closed { reason: None }),
2538 ),
2539 );
2540 }
2541 }
2542 State::FinWait1(FinWait1 { snd, rcv }) => {
2543 let BufferLimits { len, capacity: _ } = snd.buffer.limits();
2544 let fin_seq = snd.una + len + 1;
2545 let (ack, segment_acked_data) = snd.process_ack(
2546 id,
2547 counters,
2548 seg_seq,
2549 seg_ack,
2550 seg_wnd,
2551 pure_ack,
2552 rcv.get_mut(),
2553 now,
2554 options,
2555 );
2556 data_acked = segment_acked_data;
2557 if let Some(ack) = ack {
2558 return (Some(ack), NewlyClosed::No);
2559 } else if seg_ack == fin_seq {
2560 let last_seq = snd.nxt;
2566 let finwait2 = FinWait2 {
2567 last_seq,
2568 rcv: rcv.to_ref().take(),
2569 timeout_at: fin_wait2_timeout.and_then(|timeout| {
2573 defunct.then_some(now.saturating_add(timeout))
2574 }),
2575 };
2576 assert_eq!(
2577 self.transition_to_state(counters, State::FinWait2(finwait2)),
2578 NewlyClosed::No
2579 );
2580 }
2581 }
2582 State::Closing(Closing { snd, closed_rcv }) => {
2583 let BufferLimits { len, capacity: _ } = snd.buffer.limits();
2584 let fin_seq = snd.una + len + 1;
2585 let (ack, segment_acked_data) = snd.process_ack(
2586 id,
2587 counters,
2588 seg_seq,
2589 seg_ack,
2590 seg_wnd,
2591 pure_ack,
2592 &*closed_rcv,
2593 now,
2594 options,
2595 );
2596 data_acked = segment_acked_data;
2597 if let Some(ack) = ack {
2598 data_acked = segment_acked_data;
2599 return (Some(ack), NewlyClosed::No);
2600 } else if seg_ack == fin_seq {
2601 let timewait = TimeWait {
2606 last_seq: snd.nxt,
2607 expiry: new_time_wait_expiry(now),
2608 closed_rcv: closed_rcv.clone(),
2609 };
2610 assert_eq!(
2611 self.transition_to_state(counters, State::TimeWait(timewait)),
2612 NewlyClosed::No
2613 );
2614 }
2615 }
2616 State::FinWait2(_) | State::TimeWait(_) => {}
2617 },
2618 None => return (None, NewlyClosed::No),
2621 }
2622 let maybe_ack_to_text = |rcv: &mut Recv<I, R>, rto: Rto| {
2644 if !needs_ack {
2645 return (None, rcv.nxt());
2646 }
2647
2648 if let Some(last) = rcv.last_segment_at.replace(now) {
2652 if now.saturating_duration_since(last) >= rto.get() {
2653 rcv.reset_quickacks();
2654 }
2655 }
2656
2657 let had_out_of_order = rcv.buffer.has_out_of_order();
2660 if data.len() > 0 {
2661 let offset = usize::try_from(seg_seq - rcv.nxt()).unwrap_or_else(|TryFromIntError {..}| {
2662 panic!("The segment was trimmed to fit the window, thus seg.seq({:?}) must not come before rcv.nxt({:?})", seg_seq, rcv.nxt());
2663 });
2664 match &mut rcv.buffer {
2665 RecvBufferState::Open { buffer, assembler } => {
2666 let nwritten = buffer.write_at(offset, &data);
2667 let readable = assembler.insert(seg_seq..seg_seq + nwritten);
2668 buffer.make_readable(readable, assembler.has_outstanding());
2669 }
2670 RecvBufferState::Closed { nxt, .. } => *nxt = seg_seq + data.len(),
2671 }
2672 }
2673 let immediate_ack = !*delayed_ack
2679 || had_out_of_order
2680 || rcv.buffer.has_out_of_order()
2681 || rcv.remaining_quickacks != 0
2683 || rcv.timer.is_some();
2708
2709 if immediate_ack {
2710 rcv.timer = None;
2711 } else {
2712 rcv.timer = Some(ReceiveTimer::DelayedAck {
2713 at: now.panicking_add(ACK_DELAY_THRESHOLD),
2714 });
2715 }
2716 let segment =
2717 (!matches!(rcv.timer, Some(ReceiveTimer::DelayedAck { .. }))).then(|| {
2718 rcv.remaining_quickacks = rcv.remaining_quickacks.saturating_sub(1);
2719 rcv.make_ack(snd_max)
2720 });
2721 (segment, rcv.nxt())
2722 };
2723
2724 let (ack_to_text, rcv_nxt) = match self {
2725 State::Closed(_) | State::Listen(_) | State::SynRcvd(_) | State::SynSent(_) => {
2726 unreachable!("encountered an already-handled state: {:?}", self)
2728 }
2729 State::Established(Established { snd, rcv }) => {
2730 maybe_ack_to_text(rcv.get_mut(), snd.rtt_estimator.rto())
2731 }
2732 State::FinWait1(FinWait1 { snd, rcv }) => {
2733 maybe_ack_to_text(rcv.get_mut(), snd.rtt_estimator.rto())
2734 }
2735 State::FinWait2(FinWait2 { last_seq: _, rcv, timeout_at: _ }) => {
2736 maybe_ack_to_text(rcv, Rto::DEFAULT)
2737 }
2738 State::CloseWait(CloseWait { closed_rcv, .. })
2739 | State::LastAck(LastAck { closed_rcv, .. })
2740 | State::Closing(Closing { closed_rcv, .. })
2741 | State::TimeWait(TimeWait { closed_rcv, .. }) => {
2742 (None, closed_rcv.ack)
2746 }
2747 };
2748 let ack_to_fin = if control == Some(Control::FIN) && rcv_nxt == seg_seq + data.len() {
2751 match self {
2756 State::Closed(_) | State::Listen(_) | State::SynRcvd(_) | State::SynSent(_) => {
2757 unreachable!("encountered an already-handled state: {:?}", self)
2759 }
2760 State::Established(Established { snd, rcv }) => {
2761 let params = rcv.handle_fin();
2764 let segment = params.make_ack(snd_max);
2765 let closewait =
2766 CloseWait { snd: snd.to_ref().to_takeable(), closed_rcv: params };
2767 assert_eq!(
2768 self.transition_to_state(counters, State::CloseWait(closewait)),
2769 NewlyClosed::No
2770 );
2771 Some(segment)
2772 }
2773 State::CloseWait(_) | State::LastAck(_) | State::Closing(_) => {
2774 None
2782 }
2783 State::FinWait1(FinWait1 { snd, rcv }) => {
2784 let params = rcv.handle_fin();
2785 let segment = params.make_ack(snd_max);
2786 let closing = Closing { snd: snd.to_ref().take(), closed_rcv: params };
2787 assert_eq!(
2788 self.transition_to_state(counters, State::Closing(closing)),
2789 NewlyClosed::No
2790 );
2791 Some(segment)
2792 }
2793 State::FinWait2(FinWait2 { last_seq, rcv, timeout_at: _ }) => {
2794 let params = rcv.handle_fin();
2795 let segment = params.make_ack(snd_max);
2796 let timewait = TimeWait {
2797 last_seq: *last_seq,
2798 expiry: new_time_wait_expiry(now),
2799 closed_rcv: params,
2800 };
2801 assert_eq!(
2802 self.transition_to_state(counters, State::TimeWait(timewait)),
2803 NewlyClosed::No,
2804 );
2805 Some(segment)
2806 }
2807 State::TimeWait(TimeWait { last_seq, expiry, closed_rcv }) => {
2808 *expiry = new_time_wait_expiry(now);
2813 Some(closed_rcv.make_ack(*last_seq))
2814 }
2815 }
2816 } else {
2817 None
2818 };
2819 (ack_to_fin.or(ack_to_text), NewlyClosed::No)
2822 })();
2823 (seg, passive_open, data_acked, newly_closed)
2824 }
2825
2826 pub(crate) fn poll_receive_data_dequeued(&mut self) -> Option<Segment<()>> {
2830 let (rcv, snd_max) = match self {
2831 State::Closed(_)
2832 | State::Listen(_)
2833 | State::SynRcvd(_)
2834 | State::SynSent(_)
2835 | State::CloseWait(_)
2836 | State::LastAck(_)
2837 | State::Closing(_)
2838 | State::TimeWait(_) => return None,
2839 State::Established(Established { snd, rcv }) => (rcv.get_mut(), snd.max),
2840 State::FinWait1(FinWait1 { snd, rcv }) => (rcv.get_mut(), snd.max),
2841 State::FinWait2(FinWait2 { last_seq, rcv, timeout_at: _ }) => (rcv, *last_seq),
2842 };
2843
2844 rcv.poll_receive_data_dequeued(snd_max)
2845 }
2846
2847 pub(crate) fn poll_send(
2855 &mut self,
2856 id: &impl StateMachineDebugId,
2857 counters: &TcpCountersRefs<'_>,
2858 limit: u32,
2859 now: I,
2860 socket_options: &SocketOptions,
2861 ) -> Result<Segment<S::Payload<'_>>, NewlyClosed> {
2862 let newly_closed = self.poll_close(counters, now, socket_options);
2863 if matches!(self, State::Closed(_)) {
2864 return Err(newly_closed);
2865 }
2866 fn poll_rcv_then_snd<
2867 'a,
2868 I: Instant,
2869 R: ReceiveBuffer,
2870 S: SendBuffer,
2871 M: StateMachineDebugId,
2872 const FIN_QUEUED: bool,
2873 >(
2874 id: &M,
2875 counters: &TcpCountersRefs<'_>,
2876 snd: &'a mut Send<I, S, FIN_QUEUED>,
2877 rcv: &'a mut Recv<I, R>,
2878 limit: u32,
2879 now: I,
2880 socket_options: &SocketOptions,
2881 ) -> Option<Segment<S::Payload<'a>>> {
2882 let seg = rcv
2888 .poll_send(snd.max, now)
2889 .map(|seg| seg.into_empty())
2890 .or_else(|| snd.poll_send(id, counters, &mut *rcv, limit, now, socket_options));
2891 if seg.is_some() && matches!(rcv.timer, Some(ReceiveTimer::DelayedAck { .. })) {
2893 rcv.timer = None;
2894 }
2895 seg
2896 }
2897 let seg = match self {
2898 State::SynSent(SynSent {
2899 iss,
2900 timestamp,
2901 retrans_timer,
2902 active_open: _,
2903 buffer_sizes: _,
2904 device_mss,
2905 default_mss: _,
2906 rcv_wnd_scale,
2907 }) => (retrans_timer.at <= now).then(|| {
2908 *timestamp = None;
2909 retrans_timer.backoff(now);
2910 Segment::syn(
2911 *iss,
2912 UnscaledWindowSize::from(u16::MAX),
2913 HandshakeOptions {
2914 mss: Some(*device_mss),
2915 window_scale: Some(*rcv_wnd_scale),
2916 sack_permitted: SACK_PERMITTED,
2917 }
2918 .into(),
2919 )
2920 }),
2921 State::SynRcvd(SynRcvd {
2922 iss,
2923 irs,
2924 timestamp,
2925 retrans_timer,
2926 simultaneous_open: _,
2927 buffer_sizes: _,
2928 smss,
2929 rcv_wnd_scale,
2930 snd_wnd_scale,
2931 sack_permitted: _,
2932 }) => (retrans_timer.at <= now).then(|| {
2933 *timestamp = None;
2934 retrans_timer.backoff(now);
2935 Segment::syn_ack(
2936 *iss,
2937 *irs + 1,
2938 UnscaledWindowSize::from(u16::MAX),
2939 HandshakeOptions {
2940 mss: Some(*smss),
2941 window_scale: snd_wnd_scale.map(|_| *rcv_wnd_scale),
2942 sack_permitted: SACK_PERMITTED,
2943 }
2944 .into(),
2945 )
2946 }),
2947 State::Established(Established { snd, rcv }) => {
2948 poll_rcv_then_snd(id, counters, snd, rcv, limit, now, socket_options)
2949 }
2950 State::CloseWait(CloseWait { snd, closed_rcv }) => {
2951 snd.poll_send(id, counters, &*closed_rcv, limit, now, socket_options)
2952 }
2953 State::LastAck(LastAck { snd, closed_rcv })
2954 | State::Closing(Closing { snd, closed_rcv }) => {
2955 snd.poll_send(id, counters, &*closed_rcv, limit, now, socket_options)
2956 }
2957 State::FinWait1(FinWait1 { snd, rcv }) => {
2958 poll_rcv_then_snd(id, counters, snd, rcv, limit, now, socket_options)
2959 }
2960 State::FinWait2(FinWait2 { last_seq, rcv, timeout_at: _ }) => {
2961 rcv.poll_send(*last_seq, now).map(|seg| seg.into_empty())
2962 }
2963 State::Closed(_) | State::Listen(_) | State::TimeWait(_) => None,
2964 };
2965 seg.ok_or(NewlyClosed::No)
2966 }
2967
2968 fn poll_close(
2972 &mut self,
2973 counters: &TcpCountersRefs<'_>,
2974 now: I,
2975 SocketOptions {
2976 keep_alive,
2977 nagle_enabled: _,
2978 user_timeout: _,
2979 delayed_ack: _,
2980 fin_wait2_timeout: _,
2981 max_syn_retries: _,
2982 ip_options: _,
2983 }: &SocketOptions,
2984 ) -> NewlyClosed {
2985 let timed_out = match self {
2986 State::Established(Established { snd, rcv: _ }) => snd.timed_out(now, keep_alive),
2987 State::CloseWait(CloseWait { snd, closed_rcv: _ }) => snd.timed_out(now, keep_alive),
2988 State::LastAck(LastAck { snd, closed_rcv: _ })
2989 | State::Closing(Closing { snd, closed_rcv: _ }) => snd.timed_out(now, keep_alive),
2990 State::FinWait1(FinWait1 { snd, rcv: _ }) => snd.timed_out(now, keep_alive),
2991 State::SynSent(SynSent {
2992 iss: _,
2993 timestamp: _,
2994 retrans_timer,
2995 active_open: _,
2996 buffer_sizes: _,
2997 device_mss: _,
2998 default_mss: _,
2999 rcv_wnd_scale: _,
3000 })
3001 | State::SynRcvd(SynRcvd {
3002 iss: _,
3003 irs: _,
3004 timestamp: _,
3005 retrans_timer,
3006 simultaneous_open: _,
3007 buffer_sizes: _,
3008 smss: _,
3009 rcv_wnd_scale: _,
3010 snd_wnd_scale: _,
3011 sack_permitted: _,
3012 }) => retrans_timer.timed_out(now),
3013
3014 State::Closed(_) | State::Listen(_) | State::TimeWait(_) => false,
3015 State::FinWait2(FinWait2 { last_seq: _, rcv: _, timeout_at }) => {
3016 timeout_at.map(|at| now >= at).unwrap_or(false)
3017 }
3018 };
3019 if timed_out {
3020 return self.transition_to_state(
3021 counters,
3022 State::Closed(Closed { reason: Some(ConnectionError::TimedOut) }),
3023 );
3024 } else if let State::TimeWait(tw) = self {
3025 if tw.expiry <= now {
3026 return self.transition_to_state(counters, State::Closed(Closed { reason: None }));
3027 }
3028 }
3029 NewlyClosed::No
3030 }
3031
3032 pub(crate) fn poll_send_at(&self) -> Option<I> {
3051 let combine_expiry = |e1: Option<I>, e2: Option<I>| match (e1, e2) {
3052 (None, None) => None,
3053 (None, Some(e2)) => Some(e2),
3054 (Some(e1), None) => Some(e1),
3055 (Some(e1), Some(e2)) => Some(e1.min(e2)),
3056 };
3057 match self {
3058 State::Established(Established { snd, rcv }) => combine_expiry(
3059 snd.timer.as_ref().map(SendTimer::expiry),
3060 rcv.timer.as_ref().map(ReceiveTimer::expiry),
3061 ),
3062 State::CloseWait(CloseWait { snd, closed_rcv: _ }) => Some(snd.timer?.expiry()),
3063 State::LastAck(LastAck { snd, closed_rcv: _ })
3064 | State::Closing(Closing { snd, closed_rcv: _ }) => Some(snd.timer?.expiry()),
3065 State::FinWait1(FinWait1 { snd, rcv }) => combine_expiry(
3066 snd.timer.as_ref().map(SendTimer::expiry),
3067 rcv.timer.as_ref().map(ReceiveTimer::expiry),
3068 ),
3069 State::FinWait2(FinWait2 { last_seq: _, rcv, timeout_at }) => {
3070 combine_expiry(*timeout_at, rcv.timer.as_ref().map(ReceiveTimer::expiry))
3071 }
3072 State::SynRcvd(syn_rcvd) => Some(syn_rcvd.retrans_timer.at),
3073 State::SynSent(syn_sent) => Some(syn_sent.retrans_timer.at),
3074 State::Closed(_) | State::Listen(_) => None,
3075 State::TimeWait(TimeWait { last_seq: _, expiry, closed_rcv: _ }) => Some(*expiry),
3076 }
3077 }
3078
3079 pub(super) fn close(
3086 &mut self,
3087 counters: &TcpCountersRefs<'_>,
3088 close_reason: CloseReason<I>,
3089 socket_options: &SocketOptions,
3090 ) -> Result<NewlyClosed, CloseError>
3091 where
3092 ActiveOpen: IntoBuffers<R, S>,
3093 {
3094 match self {
3095 State::Closed(_) => Err(CloseError::NoConnection),
3096 State::Listen(_) | State::SynSent(_) => {
3097 Ok(self.transition_to_state(counters, State::Closed(Closed { reason: None })))
3098 }
3099 State::SynRcvd(SynRcvd {
3100 iss,
3101 irs,
3102 timestamp: _,
3103 retrans_timer: _,
3104 simultaneous_open,
3105 buffer_sizes,
3106 smss,
3107 rcv_wnd_scale,
3108 snd_wnd_scale,
3109 sack_permitted,
3110 }) => {
3111 let (rcv_buffer, snd_buffer) = simultaneous_open
3131 .take()
3132 .expect(
3133 "a SYN-RCVD state that is in the pending queue \
3134 should call abort instead of close",
3135 )
3136 .into_buffers(*buffer_sizes);
3137 let (snd_wnd_scale, rcv_wnd_scale) = snd_wnd_scale
3141 .map(|snd_wnd_scale| (snd_wnd_scale, *rcv_wnd_scale))
3142 .unwrap_or_default();
3143 let finwait1 = FinWait1 {
3144 snd: Send {
3145 nxt: *iss + 1,
3146 max: *iss + 1,
3147 una: *iss + 1,
3148 wnd: WindowSize::DEFAULT,
3149 wl1: *iss,
3150 wl2: *irs,
3151 buffer: snd_buffer,
3152 rtt_sampler: RttSampler::default(),
3153 rtt_estimator: Estimator::NoSample,
3154 timer: None,
3155 congestion_control: CongestionControl::cubic_with_mss(*smss),
3156 wnd_scale: snd_wnd_scale,
3157 wnd_max: WindowSize::DEFAULT,
3158 }
3159 .into(),
3160 rcv: Recv {
3161 buffer: RecvBufferState::Open {
3162 buffer: rcv_buffer,
3163 assembler: Assembler::new(*irs + 1),
3164 },
3165 timer: None,
3166 mss: *smss,
3167 remaining_quickacks: quickack_counter(buffer_sizes.rcv_limits(), *smss),
3168 last_segment_at: None,
3169 wnd_scale: rcv_wnd_scale,
3170 last_window_update: (*irs + 1, buffer_sizes.rwnd()),
3171 sack_permitted: *sack_permitted,
3172 }
3173 .into(),
3174 };
3175 Ok(self.transition_to_state(counters, State::FinWait1(finwait1)))
3176 }
3177 State::Established(Established { snd, rcv }) => {
3178 let finwait1 = FinWait1 {
3184 snd: snd.to_ref().take().queue_fin().into(),
3185 rcv: rcv.to_ref().to_takeable(),
3186 };
3187 Ok(self.transition_to_state(counters, State::FinWait1(finwait1)))
3188 }
3189 State::CloseWait(CloseWait { snd, closed_rcv }) => {
3190 let lastack = LastAck {
3191 snd: snd.to_ref().take().queue_fin(),
3192 closed_rcv: closed_rcv.clone(),
3193 };
3194 Ok(self.transition_to_state(counters, State::LastAck(lastack)))
3195 }
3196 State::LastAck(_) | State::FinWait1(_) | State::Closing(_) | State::TimeWait(_) => {
3197 Err(CloseError::Closing)
3198 }
3199 State::FinWait2(FinWait2 { last_seq: _, rcv: _, timeout_at }) => {
3200 if let (CloseReason::Close { now }, Some(fin_wait2_timeout)) =
3201 (close_reason, socket_options.fin_wait2_timeout)
3202 {
3203 assert_eq!(timeout_at.replace(now.saturating_add(fin_wait2_timeout)), None);
3204 }
3205 Err(CloseError::Closing)
3206 }
3207 }
3208 }
3209
3210 pub(super) fn shutdown_recv(&mut self) -> Result<(), CloseError> {
3211 match self {
3212 State::Closed(_) => Err(CloseError::NoConnection),
3213
3214 State::Listen(_)
3215 | State::SynSent(_)
3216 | State::SynRcvd(_)
3217 | State::CloseWait(_)
3218 | State::LastAck(_)
3219 | State::Closing(_)
3220 | State::TimeWait(_) => Ok(()),
3221
3222 State::Established(Established { rcv, .. }) | State::FinWait1(FinWait1 { rcv, .. }) => {
3224 rcv.buffer.close();
3225 Ok(())
3226 }
3227 State::FinWait2(FinWait2 { rcv, .. }) => {
3228 rcv.buffer.close();
3229 Ok(())
3230 }
3231 }
3232 }
3233
3234 pub(crate) fn abort(
3237 &mut self,
3238 counters: &TcpCountersRefs<'_>,
3239 ) -> (Option<Segment<()>>, NewlyClosed) {
3240 let reply = match self {
3241 State::Closed(_)
3254 | State::Listen(_)
3255 | State::SynSent(_)
3256 | State::Closing(_)
3257 | State::LastAck(_)
3258 | State::TimeWait(_) => None,
3259 State::SynRcvd(SynRcvd {
3271 iss,
3272 irs,
3273 timestamp: _,
3274 retrans_timer: _,
3275 simultaneous_open: _,
3276 buffer_sizes: _,
3277 smss: _,
3278 rcv_wnd_scale: _,
3279 snd_wnd_scale: _,
3280 sack_permitted: _,
3281 }) => {
3282 Some(Segment::rst_ack(*iss + 1, *irs + 1))
3285 }
3286 State::Established(Established { snd, rcv }) => {
3287 Some(Segment::rst_ack(snd.nxt, rcv.nxt()))
3288 }
3289 State::FinWait1(FinWait1 { snd, rcv }) => Some(Segment::rst_ack(snd.nxt, rcv.nxt())),
3290 State::FinWait2(FinWait2 { rcv, last_seq, timeout_at: _ }) => {
3291 Some(Segment::rst_ack(*last_seq, rcv.nxt()))
3292 }
3293 State::CloseWait(CloseWait { snd, closed_rcv }) => {
3294 Some(Segment::rst_ack(snd.nxt, closed_rcv.ack))
3295 }
3296 };
3297 (
3298 reply,
3299 self.transition_to_state(
3300 counters,
3301 State::Closed(Closed { reason: Some(ConnectionError::ConnectionReset) }),
3302 ),
3303 )
3304 }
3305
3306 pub(crate) fn buffers_mut(&mut self) -> BuffersRefMut<'_, R, S> {
3307 match self {
3308 State::TimeWait(_) | State::Closed(_) => BuffersRefMut::NoBuffers,
3309 State::Listen(Listen { buffer_sizes, .. })
3310 | State::SynRcvd(SynRcvd { buffer_sizes, .. })
3311 | State::SynSent(SynSent { buffer_sizes, .. }) => BuffersRefMut::Sizes(buffer_sizes),
3312 State::Established(Established { snd, rcv }) => match &mut rcv.buffer {
3313 RecvBufferState::Open { buffer: ref mut recv_buf, .. } => {
3314 BuffersRefMut::Both { send: &mut snd.buffer, recv: recv_buf }
3315 }
3316 RecvBufferState::Closed { .. } => BuffersRefMut::SendOnly(&mut snd.buffer),
3317 },
3318 State::FinWait1(FinWait1 { snd, rcv }) => match &mut rcv.buffer {
3319 RecvBufferState::Open { buffer: ref mut recv_buf, .. } => {
3320 BuffersRefMut::Both { send: &mut snd.buffer, recv: recv_buf }
3321 }
3322 RecvBufferState::Closed { .. } => BuffersRefMut::SendOnly(&mut snd.buffer),
3323 },
3324 State::FinWait2(FinWait2::<I, R> { rcv, .. }) => match &mut rcv.buffer {
3325 RecvBufferState::Open { buffer: ref mut recv_buf, .. } => {
3326 BuffersRefMut::RecvOnly(recv_buf)
3327 }
3328 RecvBufferState::Closed { .. } => BuffersRefMut::NoBuffers,
3329 },
3330 State::Closing(Closing::<I, S> { snd, .. })
3331 | State::LastAck(LastAck::<I, S> { snd, .. }) => {
3332 BuffersRefMut::SendOnly(&mut snd.buffer)
3333 }
3334 State::CloseWait(CloseWait::<I, S> { snd, .. }) => {
3335 BuffersRefMut::SendOnly(&mut snd.buffer)
3336 }
3337 }
3338 }
3339
3340 pub(super) fn on_icmp_error(
3343 &mut self,
3344 counters: &TcpCountersRefs<'_>,
3345 err: IcmpErrorCode,
3346 seq: SeqNum,
3347 ) -> (Option<ConnectionError>, NewlyClosed) {
3348 let Some(err) = ConnectionError::try_from_icmp_error(err) else {
3349 return (None, NewlyClosed::No);
3350 };
3351 let connect_error = match self {
3372 State::Closed(_) => None,
3373 State::Listen(listen) => unreachable!(
3374 "ICMP errors should not be delivered on a listener, received code {:?} on {:?}",
3375 err, listen
3376 ),
3377 State::SynRcvd(SynRcvd {
3378 iss,
3379 irs: _,
3380 timestamp: _,
3381 retrans_timer: _,
3382 simultaneous_open: _,
3383 buffer_sizes: _,
3384 smss: _,
3385 rcv_wnd_scale: _,
3386 snd_wnd_scale: _,
3387 sack_permitted: _,
3388 })
3389 | State::SynSent(SynSent {
3390 iss,
3391 timestamp: _,
3392 retrans_timer: _,
3393 active_open: _,
3394 buffer_sizes: _,
3395 device_mss: _,
3396 default_mss: _,
3397 rcv_wnd_scale: _,
3398 }) => {
3399 if *iss == seq {
3400 return (
3401 None,
3402 self.transition_to_state(
3403 counters,
3404 State::Closed(Closed { reason: Some(err) }),
3405 ),
3406 );
3407 }
3408 None
3409 }
3410 State::Established(Established { snd, rcv: _ })
3411 | State::CloseWait(CloseWait { snd, closed_rcv: _ }) => {
3412 (!snd.una.after(seq) && seq.before(snd.nxt)).then_some(err)
3413 }
3414 State::LastAck(LastAck { snd, closed_rcv: _ })
3415 | State::Closing(Closing { snd, closed_rcv: _ }) => {
3416 (!snd.una.after(seq) && seq.before(snd.nxt)).then_some(err)
3417 }
3418 State::FinWait1(FinWait1 { snd, rcv: _ }) => {
3419 (!snd.una.after(seq) && seq.before(snd.nxt)).then_some(err)
3420 }
3421 State::FinWait2(_) | State::TimeWait(_) => None,
3424 };
3425 (connect_error, NewlyClosed::No)
3426 }
3427}
3428
/// Why `State::close` is being invoked.
pub(super) enum CloseReason<I: Instant> {
    /// NOTE(review): presumably a shutdown of the send half only; confirm
    /// against callers. `State::close` never arms the FIN-WAIT-2 timeout for
    /// this variant.
    Shutdown,
    /// A close request carrying the instant at which it was made.
    /// `State::close` uses `now` to compute the FIN-WAIT-2 timeout deadline.
    Close { now: I },
}
3436
3437#[cfg(test)]
3438mod test {
3439 use alloc::vec;
3440 use core::fmt::Debug;
3441 use core::num::NonZeroU16;
3442 use core::time::Duration;
3443
3444 use assert_matches::assert_matches;
3445 use net_types::ip::Ipv4;
3446 use netstack3_base::testutil::{FakeInstant, FakeInstantCtx};
3447 use netstack3_base::{FragmentedPayload, InstantContext as _, Options, SackBlock};
3448 use test_case::test_case;
3449
3450 use super::*;
3451 use crate::internal::base::testutil::{
3452 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE, DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE_USIZE,
3453 };
3454 use crate::internal::base::DEFAULT_FIN_WAIT2_TIMEOUT;
3455 use crate::internal::buffer::testutil::RingBuffer;
3456 use crate::internal::buffer::Buffer;
3457 use crate::internal::counters::testutil::CounterExpectations;
3458 use crate::internal::counters::TcpCountersWithSocketInner;
3459
3460 const ISS_1: SeqNum = SeqNum::new(100);
3461 const ISS_2: SeqNum = SeqNum::new(300);
3462
3463 const RTT: Duration = Duration::from_millis(500);
3464
3465 const DEVICE_MAXIMUM_SEGMENT_SIZE: Mss = Mss(NonZeroU16::new(1400 as u16).unwrap());
3466
3467 fn default_quickack_counter() -> usize {
3468 quickack_counter(
3469 BufferLimits { capacity: WindowSize::DEFAULT.into(), len: 0 },
3470 DEVICE_MAXIMUM_SEGMENT_SIZE,
3471 )
3472 }
3473
3474 impl SocketOptions {
3475 fn default_for_state_tests() -> Self {
3476 Self { delayed_ack: false, nagle_enabled: false, ..Default::default() }
3479 }
3480 }
3481
3482 enum ClientlessBufferProvider {}
3485
3486 impl<R: ReceiveBuffer + Default, S: SendBuffer + Default> BufferProvider<R, S>
3487 for ClientlessBufferProvider
3488 {
3489 type PassiveOpen = ();
3490 type ActiveOpen = ();
3491
3492 fn new_passive_open_buffers(_buffer_sizes: BufferSizes) -> (R, S, Self::PassiveOpen) {
3493 (R::default(), S::default(), ())
3494 }
3495 }
3496
3497 impl RingBuffer {
3498 fn with_data<'a>(cap: usize, data: &'a [u8]) -> Self {
3499 let mut buffer = RingBuffer::new(cap);
3500 let nwritten = buffer.write_at(0, &data);
3501 assert_eq!(nwritten, data.len());
3502 buffer.make_readable(nwritten, false);
3503 buffer
3504 }
3505 }
3506
    /// A zero-capacity buffer usable as either the send or the receive buffer
    /// in tests that do not move data in that direction.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
    struct NullBuffer;
3510
    impl Buffer for NullBuffer {
        /// Reports the full `usize` range as the permitted capacity range.
        fn capacity_range() -> (usize, usize) {
            (usize::MIN, usize::MAX)
        }

        /// Always empty with zero capacity.
        fn limits(&self) -> BufferLimits {
            BufferLimits { len: 0, capacity: 0 }
        }

        /// Always zero.
        fn target_capacity(&self) -> usize {
            0
        }

        /// Capacity requests are ignored.
        fn request_capacity(&mut self, _size: usize) {}
    }
3526
    impl ReceiveBuffer for NullBuffer {
        /// Never stores anything; always reports zero bytes written.
        fn write_at<P: Payload>(&mut self, _offset: usize, _data: &P) -> usize {
            0
        }

        /// Panics unless asked to expose zero bytes with nothing outstanding,
        /// since no data can ever have been written.
        fn make_readable(&mut self, count: usize, has_outstanding: bool) {
            assert_eq!(count, 0);
            assert_eq!(has_outstanding, false);
        }
    }
3537
    impl SendBuffer for NullBuffer {
        type Payload<'a> = &'a [u8];

        /// Panics unless `count` is zero; there is never data to consume.
        fn mark_read(&mut self, count: usize) {
            assert_eq!(count, 0);
        }

        /// Calls `f` with an empty payload. Panics unless `offset` is zero.
        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
        where
            F: FnOnce(Self::Payload<'a>) -> R,
        {
            assert_eq!(offset, 0);
            f(&[])
        }
    }
3553
    /// Stand-in [`StateMachineDebugId`] for tests.
    #[derive(Debug)]
    struct FakeStateMachineDebugId;

    impl StateMachineDebugId for FakeStateMachineDebugId {
        /// Always reports a trace resource id built from `0`.
        fn trace_id(&self) -> TraceResourceId<'_> {
            TraceResourceId::new(0)
        }
    }
3562
3563 impl<R: ReceiveBuffer, S: SendBuffer> State<FakeInstant, R, S, ()> {
3564 fn poll_send_with_default_options(
3565 &mut self,
3566 mss: u32,
3567 now: FakeInstant,
3568 counters: &TcpCountersRefs<'_>,
3569 ) -> Option<Segment<S::Payload<'_>>> {
3570 self.poll_send(
3571 &FakeStateMachineDebugId,
3572 counters,
3573 mss,
3574 now,
3575 &SocketOptions::default_for_state_tests(),
3576 )
3577 .ok()
3578 }
3579
3580 fn on_segment_with_default_options<P: Payload, BP: BufferProvider<R, S, ActiveOpen = ()>>(
3581 &mut self,
3582 incoming: Segment<P>,
3583 now: FakeInstant,
3584 counters: &TcpCountersRefs<'_>,
3585 ) -> (Option<Segment<()>>, Option<BP::PassiveOpen>)
3586 where
3587 BP::PassiveOpen: Debug,
3588 R: Default,
3589 S: Default,
3590 {
3591 self.on_segment_with_options::<_, BP>(
3592 incoming,
3593 now,
3594 counters,
3595 &SocketOptions::default_for_state_tests(),
3596 )
3597 }
3598
3599 fn on_segment_with_options<P: Payload, BP: BufferProvider<R, S, ActiveOpen = ()>>(
3600 &mut self,
3601 incoming: Segment<P>,
3602 now: FakeInstant,
3603 counters: &TcpCountersRefs<'_>,
3604 options: &SocketOptions,
3605 ) -> (Option<Segment<()>>, Option<BP::PassiveOpen>)
3606 where
3607 BP::PassiveOpen: Debug,
3608 R: Default,
3609 S: Default,
3610 {
3611 let (segment, passive_open, _data_acked, _newly_closed) = self.on_segment::<P, BP>(
3612 &FakeStateMachineDebugId,
3613 counters,
3614 incoming,
3615 now,
3616 options,
3617 false, );
3619 (segment, passive_open)
3620 }
3621
3622 fn recv_mut(&mut self) -> Option<&mut Recv<FakeInstant, R>> {
3623 match self {
3624 State::Closed(_)
3625 | State::Listen(_)
3626 | State::SynRcvd(_)
3627 | State::SynSent(_)
3628 | State::CloseWait(_)
3629 | State::LastAck(_)
3630 | State::Closing(_)
3631 | State::TimeWait(_) => None,
3632 State::Established(Established { rcv, .. })
3633 | State::FinWait1(FinWait1 { rcv, .. }) => Some(rcv.get_mut()),
3634 State::FinWait2(FinWait2 { rcv, .. }) => Some(rcv),
3635 }
3636 }
3637
3638 #[track_caller]
3639 fn assert_established(&mut self) -> &mut Established<FakeInstant, R, S> {
3640 assert_matches!(self, State::Established(e) => e)
3641 }
3642 }
3643
3644 impl<S: SendBuffer + Debug> State<FakeInstant, RingBuffer, S, ()> {
3645 fn read_with(&mut self, f: impl for<'b> FnOnce(&'b [&'_ [u8]]) -> usize) -> usize {
3646 match self {
3647 State::Closed(_)
3648 | State::Listen(_)
3649 | State::SynRcvd(_)
3650 | State::SynSent(_)
3651 | State::CloseWait(_)
3652 | State::LastAck(_)
3653 | State::Closing(_)
3654 | State::TimeWait(_) => {
3655 panic!("No receive state in {:?}", self);
3656 }
3657 State::Established(Established { snd: _, rcv })
3658 | State::FinWait1(FinWait1 { snd: _, rcv }) => {
3659 assert_matches!(&mut rcv.buffer, RecvBufferState::Open{ buffer, .. } => buffer.read_with(f))
3660 }
3661 State::FinWait2(FinWait2 { last_seq: _, rcv, timeout_at: _ }) => {
3662 assert_matches!(&mut rcv.buffer, RecvBufferState::Open{ buffer, .. } => buffer.read_with(f))
3663 }
3664 }
3665 }
3666 }
3667
3668 impl State<FakeInstant, RingBuffer, NullBuffer, ()> {
3669 fn new_syn_rcvd(instant: FakeInstant) -> Self {
3670 State::SynRcvd(SynRcvd {
3671 iss: ISS_2,
3672 irs: ISS_1,
3673 timestamp: Some(instant),
3674 retrans_timer: RetransTimer::new(instant, Rto::DEFAULT, None, DEFAULT_MAX_RETRIES),
3675 simultaneous_open: Some(()),
3676 buffer_sizes: Default::default(),
3677 smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
3678 rcv_wnd_scale: WindowScale::default(),
3679 snd_wnd_scale: Some(WindowScale::default()),
3680 sack_permitted: SACK_PERMITTED,
3681 })
3682 }
3683 }
3684
3685 #[derive(Default)]
3686 struct FakeTcpCounters {
3687 stack_wide: TcpCountersWithSocketInner,
3688 per_socket: TcpCountersWithSocketInner,
3689 }
3690
3691 impl FakeTcpCounters {
3692 fn refs<'a>(&'a self) -> TcpCountersRefs<'a> {
3693 let Self { stack_wide, per_socket } = self;
3694 TcpCountersRefs { stack_wide, per_socket }
3695 }
3696 }
3697
3698 impl CounterExpectations {
3699 #[track_caller]
3700 fn assert_counters(&self, FakeTcpCounters { stack_wide, per_socket }: &FakeTcpCounters) {
3701 assert_eq!(self, &CounterExpectations::from(stack_wide), "stack-wide counter mismatch");
3702 assert_eq!(self, &CounterExpectations::from(per_socket), "per-socket counter mismatch");
3703 }
3704 }
3705
3706 #[test_case(Segment::rst(ISS_1) => None; "drop RST")]
3707 #[test_case(Segment::rst_ack(ISS_1, ISS_2) => None; "drop RST|ACK")]
3708 #[test_case(Segment::syn(ISS_1, UnscaledWindowSize::from(0), HandshakeOptions::default().into()) => Some(Segment::rst_ack(SeqNum::new(0), ISS_1 + 1)); "reset SYN")]
3709 #[test_case(Segment::syn_ack(ISS_1, ISS_2, UnscaledWindowSize::from(0), HandshakeOptions::default().into()) => Some(Segment::rst(ISS_2)); "reset SYN|ACK")]
3710 #[test_case(Segment::data(ISS_1, ISS_2, UnscaledWindowSize::from(0), &[0, 1, 2][..]) => Some(Segment::rst(ISS_2)); "reset data segment")]
3711 fn segment_arrives_when_closed(
3712 incoming: impl Into<Segment<&'static [u8]>>,
3713 ) -> Option<Segment<()>> {
3714 let closed = Closed { reason: () };
3715 closed.on_segment(&incoming.into())
3716 }
3717
3718 #[test_case(
3719 Segment::rst_ack(ISS_2, ISS_1 - 1), RTT
3720 => SynSentOnSegmentDisposition::Ignore; "unacceptable ACK with RST")]
3721 #[test_case(
3722 Segment::ack(ISS_2, ISS_1 - 1, UnscaledWindowSize::from(u16::MAX)), RTT
3723 => SynSentOnSegmentDisposition::SendRst(
3724 Segment::rst(ISS_1-1),
3725 ); "unacceptable ACK without RST")]
3726 #[test_case(
3727 Segment::rst_ack(ISS_2, ISS_1), RTT
3728 => SynSentOnSegmentDisposition::EnterClosed(
3729 Closed { reason: Some(ConnectionError::ConnectionRefused) },
3730 ); "acceptable ACK(ISS) with RST")]
3731 #[test_case(
3732 Segment::rst_ack(ISS_2, ISS_1 + 1), RTT
3733 => SynSentOnSegmentDisposition::EnterClosed(
3734 Closed { reason: Some(ConnectionError::ConnectionRefused) },
3735 ); "acceptable ACK(ISS+1) with RST")]
3736 #[test_case(
3737 Segment::rst(ISS_2), RTT
3738 => SynSentOnSegmentDisposition::Ignore; "RST without ack")]
3739 #[test_case(
3740 Segment::syn(
3741 ISS_2,
3742 UnscaledWindowSize::from(u16::MAX),
3743 HandshakeOptions {
3744 window_scale: Some(WindowScale::default()),
3745 ..Default::default() }.into()
3746 ), RTT
3747 => SynSentOnSegmentDisposition::SendSynAckAndEnterSynRcvd(
3748 Segment::syn_ack(
3749 ISS_1,
3750 ISS_2 + 1,
3751 UnscaledWindowSize::from(u16::MAX),
3752 HandshakeOptions {
3753 mss: Some(Mss::default::<Ipv4>()),
3754 window_scale: Some(WindowScale::default()),
3755 ..Default::default()
3756 }.into()),
3757 SynRcvd {
3758 iss: ISS_1,
3759 irs: ISS_2,
3760 timestamp: Some(FakeInstant::from(RTT)),
3761 retrans_timer: RetransTimer::new(
3762 FakeInstant::from(RTT),
3763 Rto::DEFAULT,
3764 NonZeroDuration::new(TEST_USER_TIMEOUT.get() - RTT),
3765 DEFAULT_MAX_SYNACK_RETRIES
3766 ),
3767 simultaneous_open: None,
3768 buffer_sizes: BufferSizes::default(),
3769 smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
3770 rcv_wnd_scale: WindowScale::default(),
3771 snd_wnd_scale: Some(WindowScale::default()),
3772 sack_permitted: SACK_PERMITTED,
3773 }
3774 ); "SYN only")]
3775 #[test_case(
3776 Segment::fin(ISS_2, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX)), RTT
3777 => SynSentOnSegmentDisposition::Ignore; "acceptable ACK with FIN")]
3778 #[test_case(
3779 Segment::ack(ISS_2, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX)), RTT
3780 => SynSentOnSegmentDisposition::Ignore; "acceptable ACK(ISS+1) with nothing")]
3781 #[test_case(
3782 Segment::ack(ISS_2, ISS_1, UnscaledWindowSize::from(u16::MAX)), RTT
3783 => SynSentOnSegmentDisposition::Ignore; "acceptable ACK(ISS) without RST")]
3784 #[test_case(
3785 Segment::syn(ISS_2, UnscaledWindowSize::from(u16::MAX), HandshakeOptions::default().into()),
3786 TEST_USER_TIMEOUT.get()
3787 => SynSentOnSegmentDisposition::EnterClosed(Closed {
3788 reason: None
3789 }); "syn but timed out")]
3790 fn segment_arrives_when_syn_sent(
3791 incoming: Segment<()>,
3792 delay: Duration,
3793 ) -> SynSentOnSegmentDisposition<FakeInstant, ()> {
3794 let syn_sent = SynSent {
3795 iss: ISS_1,
3796 timestamp: Some(FakeInstant::default()),
3797 retrans_timer: RetransTimer::new(
3798 FakeInstant::default(),
3799 Rto::DEFAULT,
3800 Some(TEST_USER_TIMEOUT),
3801 DEFAULT_MAX_RETRIES,
3802 ),
3803 active_open: (),
3804 buffer_sizes: BufferSizes::default(),
3805 default_mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
3806 device_mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
3807 rcv_wnd_scale: WindowScale::default(),
3808 };
3809 syn_sent.on_segment(incoming, FakeInstant::from(delay))
3810 }
3811
3812 #[test_case(Segment::rst(ISS_2) => ListenOnSegmentDisposition::Ignore; "ignore RST")]
3813 #[test_case(Segment::ack(ISS_2, ISS_1, UnscaledWindowSize::from(u16::MAX)) =>
3814 ListenOnSegmentDisposition::SendRst(Segment::rst(ISS_1)); "reject ACK")]
3815 #[test_case(Segment::syn(ISS_2, UnscaledWindowSize::from(u16::MAX),
3816 HandshakeOptions {
3817 window_scale: Some(WindowScale::default()),
3818 ..Default::default()
3819 }.into()) =>
3820 ListenOnSegmentDisposition::SendSynAckAndEnterSynRcvd(
3821 Segment::syn_ack(
3822 ISS_1,
3823 ISS_2 + 1,
3824 UnscaledWindowSize::from(u16::MAX),
3825 HandshakeOptions {
3826 mss: Some(Mss::default::<Ipv4>()),
3827 window_scale: Some(WindowScale::default()),
3828 ..Default::default()
3829 }.into()),
3830 SynRcvd {
3831 iss: ISS_1,
3832 irs: ISS_2,
3833 timestamp: Some(FakeInstant::default()),
3834 retrans_timer: RetransTimer::new(
3835 FakeInstant::default(),
3836 Rto::DEFAULT,
3837 None,
3838 DEFAULT_MAX_SYNACK_RETRIES,
3839 ),
3840 simultaneous_open: None,
3841 buffer_sizes: BufferSizes::default(),
3842 smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
3843 sack_permitted: SACK_PERMITTED,
3844 rcv_wnd_scale: WindowScale::default(),
3845 snd_wnd_scale: Some(WindowScale::default()),
3846 }); "accept syn")]
3847 fn segment_arrives_when_listen(
3848 incoming: Segment<()>,
3849 ) -> ListenOnSegmentDisposition<FakeInstant> {
3850 let listen = Closed::<Initial>::listen(
3851 ISS_1,
3852 Default::default(),
3853 DEVICE_MAXIMUM_SEGMENT_SIZE,
3854 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
3855 None,
3856 );
3857 listen.on_segment(incoming, FakeInstant::default())
3858 }
3859
3860 #[test_case(
3861 Segment::ack(ISS_1, ISS_2, UnscaledWindowSize::from(u16::MAX)),
3862 None
3863 => Some(
3864 Segment::ack(ISS_2 + 1, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX))
3865 ); "OTW segment")]
3866 #[test_case(
3867 Segment::rst_ack(ISS_1, ISS_2),
3868 None
3869 => None; "OTW RST")]
3870 #[test_case(
3871 Segment::rst_ack(ISS_1 + 1, ISS_2),
3872 Some(State::Closed(Closed { reason: Some(ConnectionError::ConnectionReset) }))
3873 => None; "acceptable RST")]
3874 #[test_case(
3875 Segment::syn(ISS_1 + 1, UnscaledWindowSize::from(u16::MAX),
3876 HandshakeOptions { window_scale: Some(WindowScale::default()), ..Default::default() }.into()),
3877 Some(State::Closed(Closed { reason: Some(ConnectionError::ConnectionReset) }))
3878 => Some(
3879 Segment::rst(ISS_2 + 1)
3880 ); "duplicate syn")]
3881 #[test_case(
3882 Segment::ack(ISS_1 + 1, ISS_2, UnscaledWindowSize::from(u16::MAX)),
3883 None
3884 => Some(
3885 Segment::rst(ISS_2)
3886 ); "unacceptable ack (ISS)")]
3887 #[test_case(
3888 Segment::ack(ISS_1 + 1, ISS_2 + 1, UnscaledWindowSize::from(u16::MAX)),
3889 Some(State::Established(
3890 Established {
3891 snd: Send {
3892 nxt: ISS_2 + 1,
3893 max: ISS_2 + 1,
3894 una: ISS_2 + 1,
3895 wnd: WindowSize::DEFAULT,
3896 wnd_max: WindowSize::DEFAULT,
3897 buffer: NullBuffer,
3898 wl1: ISS_1 + 1,
3899 wl2: ISS_2 + 1,
3900 rtt_estimator: Estimator::Measured {
3901 srtt: RTT,
3902 rtt_var: RTT / 2,
3903 },
3904 rtt_sampler: RttSampler::default(),
3905 timer: None,
3906 congestion_control: CongestionControl::cubic_with_mss(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
3907 wnd_scale: WindowScale::default(),
3908 }.into(),
3909 rcv: Recv {
3910 buffer: RecvBufferState::Open {
3911 buffer: RingBuffer::default(),
3912 assembler: Assembler::new(ISS_1 + 1),
3913 },
3914 remaining_quickacks: quickack_counter(BufferLimits {
3915 capacity: WindowSize::DEFAULT.into(),
3916 len: 0,
3917 }, DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
3918 last_segment_at: None,
3919 timer: None,
3920 mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
3921 wnd_scale: WindowScale::default(),
3922 last_window_update: (ISS_1 + 1, WindowSize::DEFAULT),
3923 sack_permitted: SACK_PERMITTED,
3924 }.into(),
3925 }
3926 ))
3927 => None; "acceptable ack (ISS + 1)")]
3928 #[test_case(
3929 Segment::ack(ISS_1 + 1, ISS_2 + 2, UnscaledWindowSize::from(u16::MAX)),
3930 None
3931 => Some(
3932 Segment::rst(ISS_2 + 2)
3933 ); "unacceptable ack (ISS + 2)")]
3934 #[test_case(
3935 Segment::ack(ISS_1 + 1, ISS_2 - 1, UnscaledWindowSize::from(u16::MAX)),
3936 None
3937 => Some(
3938 Segment::rst(ISS_2 - 1)
3939 ); "unacceptable ack (ISS - 1)")]
3940 #[test_case(
3941 Segment::new(ISS_1 + 1, None, None, UnscaledWindowSize::from(u16::MAX)),
3942 None
3943 => None; "no ack")]
3944 #[test_case(
3945 Segment::fin(ISS_1 + 1, ISS_2 + 1, UnscaledWindowSize::from(u16::MAX)),
3946 Some(State::CloseWait(CloseWait {
3947 snd: Send {
3948 nxt: ISS_2 + 1,
3949 max: ISS_2 + 1,
3950 una: ISS_2 + 1,
3951 wnd: WindowSize::DEFAULT,
3952 wnd_max: WindowSize::DEFAULT,
3953 buffer: NullBuffer,
3954 wl1: ISS_1 + 1,
3955 wl2: ISS_2 + 1,
3956 rtt_estimator: Estimator::Measured{
3957 srtt: RTT,
3958 rtt_var: RTT / 2,
3959 },
3960 rtt_sampler: RttSampler::default(),
3961 timer: None,
3962 congestion_control: CongestionControl::cubic_with_mss(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
3963 wnd_scale: WindowScale::default(),
3964 }.into(),
3965 closed_rcv: RecvParams {
3966 ack: ISS_1 + 2,
3967 wnd: WindowSize::from_u32(u32::from(u16::MAX - 1)).unwrap(),
3968 wnd_scale: WindowScale::ZERO,
3969 }
3970 }))
3971 => Some(
3972 Segment::ack(ISS_2 + 1, ISS_1 + 2, UnscaledWindowSize::from(u16::MAX - 1))
3973 ); "fin")]
3974 fn segment_arrives_when_syn_rcvd(
3975 incoming: Segment<()>,
3976 expected: Option<State<FakeInstant, RingBuffer, NullBuffer, ()>>,
3977 ) -> Option<Segment<()>> {
3978 let mut clock = FakeInstantCtx::default();
3979 let counters = FakeTcpCounters::default();
3980 let mut state = State::new_syn_rcvd(clock.now());
3981 clock.sleep(RTT);
3982 let (seg, _passive_open) = state
3983 .on_segment_with_default_options::<_, ClientlessBufferProvider>(
3984 incoming,
3985 clock.now(),
3986 &counters.refs(),
3987 );
3988 match expected {
3989 Some(new_state) => assert_eq!(new_state, state),
3990 None => assert_matches!(state, State::SynRcvd(_)),
3991 };
3992 seg
3993 }
3994
3995 #[test]
3996 fn abort_when_syn_rcvd() {
3997 let clock = FakeInstantCtx::default();
3998 let counters = FakeTcpCounters::default();
3999 let mut state = State::new_syn_rcvd(clock.now());
4000 let segment = assert_matches!(
4001 state.abort(&counters.refs()),
4002 (Some(seg), NewlyClosed::Yes) => seg
4003 );
4004 assert_eq!(segment.header.control, Some(Control::RST));
4005 assert_eq!(segment.header.seq, ISS_2 + 1);
4006 assert_eq!(segment.header.ack, Some(ISS_1 + 1));
4007 }
4008
4009 #[test_case(
4010 Segment::syn(ISS_2 + 1, UnscaledWindowSize::from(u16::MAX), HandshakeOptions::default().into()),
4011 Some(State::Closed (
4012 Closed { reason: Some(ConnectionError::ConnectionReset) },
4013 ))
4014 => Some(Segment::rst(ISS_1 + 1)); "duplicate syn")]
4015 #[test_case(
4016 Segment::rst(ISS_2 + 1),
4017 Some(State::Closed (
4018 Closed { reason: Some(ConnectionError::ConnectionReset) },
4019 ))
4020 => None; "accepatable rst")]
4021 #[test_case(
4022 Segment::ack(ISS_2 + 1, ISS_1 + 2, UnscaledWindowSize::from(u16::MAX)),
4023 None
4024 => Some(
4025 Segment::ack(ISS_1 + 1, ISS_2 + 1, UnscaledWindowSize::from(2))
4026 ); "unacceptable ack")]
4027 #[test_case(
4028 Segment::ack(ISS_2 + 1, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX)),
4029 None
4030 => None; "pure ack")]
4031 #[test_case(
4032 Segment::fin(ISS_2 + 1, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX)),
4033 Some(State::CloseWait(CloseWait {
4034 snd: Send {
4035 nxt: ISS_1 + 1,
4036 max: ISS_1 + 1,
4037 una: ISS_1 + 1,
4038 wnd: WindowSize::DEFAULT,
4039 wnd_max: WindowSize::DEFAULT,
4040 buffer: NullBuffer,
4041 wl1: ISS_2 + 1,
4042 wl2: ISS_1 + 1,
4043 rtt_estimator: Estimator::default(),
4044 rtt_sampler: RttSampler::default(),
4045 timer: None,
4046 congestion_control: CongestionControl::cubic_with_mss(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
4047 wnd_scale: WindowScale::default(),
4048 }.into(),
4049 closed_rcv: RecvParams {
4050 ack: ISS_2 + 2,
4051 wnd: WindowSize::new(1).unwrap(),
4052 wnd_scale: WindowScale::ZERO,
4053 }
4054 }))
4055 => Some(
4056 Segment::ack(ISS_1 + 1, ISS_2 + 2, UnscaledWindowSize::from(1))
4057 ); "pure fin")]
4058 #[test_case(
4059 Segment::piggybacked_fin(ISS_2 + 1, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX), "A".as_bytes()),
4060 Some(State::CloseWait(CloseWait {
4061 snd: Send {
4062 nxt: ISS_1 + 1,
4063 max: ISS_1 + 1,
4064 una: ISS_1 + 1,
4065 wnd: WindowSize::DEFAULT,
4066 wnd_max: WindowSize::DEFAULT,
4067 buffer: NullBuffer,
4068 wl1: ISS_2 + 1,
4069 wl2: ISS_1 + 1,
4070 rtt_estimator: Estimator::default(),
4071 rtt_sampler: RttSampler::default(),
4072 timer: None,
4073 congestion_control: CongestionControl::cubic_with_mss(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
4074 wnd_scale: WindowScale::default(),
4075 }.into(),
4076 closed_rcv: RecvParams {
4077 ack: ISS_2 + 3,
4078 wnd: WindowSize::ZERO,
4079 wnd_scale: WindowScale::ZERO,
4080 }
4081 }))
4082 => Some(
4083 Segment::ack(ISS_1 + 1, ISS_2 + 3, UnscaledWindowSize::from(0))
4084 ); "fin with 1 byte")]
4085 #[test_case(
4086 Segment::piggybacked_fin(ISS_2 + 1, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX), "AB".as_bytes()),
4087 None
4088 => Some(
4089 Segment::ack(ISS_1 + 1, ISS_2 + 3, UnscaledWindowSize::from(0))
4090 ); "fin with 2 bytes")]
4091 fn segment_arrives_when_established(
4092 incoming: Segment<&[u8]>,
4093 expected: Option<State<FakeInstant, RingBuffer, NullBuffer, ()>>,
4094 ) -> Option<Segment<()>> {
4095 let counters = FakeTcpCounters::default();
4096 let mut state = State::Established(Established {
4097 snd: Send {
4098 nxt: ISS_1 + 1,
4099 max: ISS_1 + 1,
4100 una: ISS_1 + 1,
4101 wnd: WindowSize::DEFAULT,
4102 wnd_max: WindowSize::DEFAULT,
4103 buffer: NullBuffer,
4104 wl1: ISS_2 + 1,
4105 wl2: ISS_1 + 1,
4106 rtt_estimator: Estimator::default(),
4107 rtt_sampler: RttSampler::default(),
4108 timer: None,
4109 congestion_control: CongestionControl::cubic_with_mss(
4110 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
4111 ),
4112 wnd_scale: WindowScale::default(),
4113 }
4114 .into(),
4115 rcv: Recv {
4116 buffer: RecvBufferState::Open {
4117 buffer: RingBuffer::new(2),
4118 assembler: Assembler::new(ISS_2 + 1),
4119 },
4120 timer: None,
4121 remaining_quickacks: 0,
4122 last_segment_at: None,
4123 mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
4124 wnd_scale: WindowScale::default(),
4125 last_window_update: (ISS_2 + 1, WindowSize::new(2).unwrap()),
4126 sack_permitted: SACK_PERMITTED,
4127 }
4128 .into(),
4129 });
4130 let (seg, passive_open) = state
4131 .on_segment_with_default_options::<_, ClientlessBufferProvider>(
4132 incoming,
4133 FakeInstant::default(),
4134 &counters.refs(),
4135 );
4136 assert_eq!(passive_open, None);
4137 match expected {
4138 Some(new_state) => assert_eq!(new_state, state),
4139 None => assert_matches!(state, State::Established(_)),
4140 };
4141 seg
4142 }
4143
/// Checks the receive behavior shared by the states that still own a receive
/// half (`Established`, `FinWait1`, `FinWait2`): an in-order data segment is
/// acknowledged and its payload can then be read back.
#[test]
fn common_rcv_data_segment_arrives() {
    let counters = FakeTcpCounters::default();
    // Factory for a send half with nothing buffered (`NullBuffer`).
    let make_snd = || Send {
        nxt: ISS_1 + 1,
        max: ISS_1 + 1,
        una: ISS_1 + 1,
        wnd: WindowSize::DEFAULT,
        wnd_max: WindowSize::DEFAULT,
        buffer: NullBuffer,
        wl1: ISS_2 + 1,
        wl2: ISS_1 + 1,
        rtt_estimator: Estimator::default(),
        rtt_sampler: RttSampler::default(),
        timer: None,
        congestion_control: CongestionControl::cubic_with_mss(
            DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        ),
        wnd_scale: WindowScale::default(),
    };
    // Factory for a receive half whose buffer holds exactly `TEST_BYTES`.
    let make_rcv = || Recv {
        buffer: RecvBufferState::Open {
            buffer: RingBuffer::new(TEST_BYTES.len()),
            assembler: Assembler::new(ISS_2 + 1),
        },
        timer: None,
        remaining_quickacks: 0,
        last_segment_at: None,
        mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        wnd_scale: WindowScale::default(),
        last_window_update: (ISS_2 + 1, WindowSize::new(TEST_BYTES.len()).unwrap()),
        sack_permitted: SACK_PERMITTED,
    };
    let states_under_test = [
        State::Established(Established { snd: make_snd().into(), rcv: make_rcv().into() }),
        State::FinWait1(FinWait1 { snd: make_snd().queue_fin().into(), rcv: make_rcv().into() }),
        State::FinWait2(FinWait2 { last_seq: ISS_1 + 1, rcv: make_rcv(), timeout_at: None }),
    ];
    for mut state in states_under_test {
        let incoming = Segment::data(
            ISS_2 + 1,
            ISS_1 + 1,
            UnscaledWindowSize::from(u16::MAX),
            TEST_BYTES,
        );
        let outcome = state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
            incoming,
            FakeInstant::default(),
            &counters.refs(),
        );
        // The payload fills the receive buffer, so the ACK advertises a zero
        // window.
        let expected_ack = Segment::ack(
            ISS_1 + 1,
            ISS_2 + 1 + TEST_BYTES.len(),
            UnscaledWindowSize::from(0),
        );
        assert_eq!(outcome, (Some(expected_ack), None));

        // Everything that arrived must be readable in one go.
        let consumed = state.read_with(|readable| {
            assert_eq!(readable.concat(), TEST_BYTES);
            TEST_BYTES.len()
        });
        assert_eq!(consumed, TEST_BYTES.len());
    }
}
4213
/// Checks the send behavior shared by the states that still own a send half
/// (`Established`, `FinWait1`, `Closing`, `CloseWait`, `LastAck`): buffered
/// data is sent, and an ACK covering all of it advances `nxt`/`max`/`una` and
/// drains the send buffer.
#[test]
fn common_snd_ack_segment_arrives() {
    let counters = FakeTcpCounters::default();
    // Factory for a send half pre-loaded with `TEST_BYTES`.
    let make_snd = || Send {
        nxt: ISS_1 + 1,
        max: ISS_1 + 1,
        una: ISS_1 + 1,
        wnd: WindowSize::DEFAULT,
        wnd_max: WindowSize::DEFAULT,
        buffer: RingBuffer::with_data(TEST_BYTES.len(), TEST_BYTES),
        wl1: ISS_2 + 1,
        wl2: ISS_1 + 1,
        rtt_estimator: Estimator::default(),
        rtt_sampler: RttSampler::default(),
        timer: None,
        congestion_control: CongestionControl::cubic_with_mss(
            DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        ),
        wnd_scale: WindowScale::default(),
    };
    // Factory for a receive half backed by a `NullBuffer`, advertising a zero
    // window.
    let make_rcv = || Recv {
        buffer: RecvBufferState::Open {
            buffer: NullBuffer,
            assembler: Assembler::new(ISS_2 + 1),
        },
        timer: None,
        remaining_quickacks: 0,
        last_segment_at: None,
        mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        wnd_scale: WindowScale::default(),
        last_window_update: (ISS_2 + 1, WindowSize::ZERO),
        sack_permitted: SACK_PERMITTED,
    };
    let states_under_test = [
        State::Established(Established { snd: make_snd().into(), rcv: make_rcv().into() }),
        State::FinWait1(FinWait1 { snd: make_snd().queue_fin().into(), rcv: make_rcv().into() }),
        State::Closing(Closing {
            snd: make_snd().queue_fin(),
            closed_rcv: RecvParams {
                ack: ISS_2 + 1,
                wnd: WindowSize::ZERO,
                wnd_scale: WindowScale::default(),
            },
        }),
        State::CloseWait(CloseWait {
            snd: make_snd().into(),
            closed_rcv: RecvParams {
                ack: ISS_2 + 1,
                wnd: WindowSize::ZERO,
                wnd_scale: WindowScale::default(),
            },
        }),
        State::LastAck(LastAck {
            snd: make_snd().queue_fin(),
            closed_rcv: RecvParams {
                ack: ISS_2 + 1,
                wnd: WindowSize::ZERO,
                wnd_scale: WindowScale::default(),
            },
        }),
    ];
    for mut state in states_under_test {
        // The whole buffered payload goes out in a single data segment.
        let sent = state.poll_send_with_default_options(
            u32::try_from(TEST_BYTES.len()).unwrap(),
            FakeInstant::default(),
            &counters.refs(),
        );
        let expected_data = Segment::data(
            ISS_1 + 1,
            ISS_2 + 1,
            UnscaledWindowSize::from(0),
            FragmentedPayload::new_contiguous(TEST_BYTES),
        );
        assert_eq!(sent, Some(expected_data));

        // The peer acknowledges every byte; no reply is expected.
        let full_ack = Segment::<()>::ack(
            ISS_2 + 1,
            ISS_1 + 1 + TEST_BYTES.len(),
            UnscaledWindowSize::from(u16::MAX),
        );
        let outcome = state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
            full_ack,
            FakeInstant::default(),
            &counters.refs(),
        );
        assert_eq!(outcome, (None, None));
        assert_eq!(state.poll_send_at(), None);

        // Normalize every variant to a send half with the FIN queued so the
        // same field checks apply to all of them.
        let send_half = match state {
            State::Established(e) => e.snd.into_inner().queue_fin(),
            State::CloseWait(c) => c.snd.into_inner().queue_fin(),
            State::FinWait1(f) => f.snd.into_inner(),
            State::Closing(c) => c.snd,
            State::LastAck(l) => l.snd,
            State::Closed(_)
            | State::Listen(_)
            | State::SynRcvd(_)
            | State::SynSent(_)
            | State::FinWait2(_)
            | State::TimeWait(_) => unreachable!("Unexpected state {:?}", state),
        };
        let fully_acked = ISS_1 + 1 + TEST_BYTES.len();
        assert_eq!(send_half.nxt, fully_acked);
        assert_eq!(send_half.max, fully_acked);
        assert_eq!(send_half.una, fully_acked);
        assert_eq!(send_half.buffer.limits().len, 0);
    }
}
4322
4323 #[test_case(
4324 Segment::syn(ISS_2 + 2, UnscaledWindowSize::from(u16::MAX),
4325 HandshakeOptions::default().into()),
4326 Some(State::Closed (
4327 Closed { reason: Some(ConnectionError::ConnectionReset) },
4328 ))
4329 => Some(Segment::rst(ISS_1 + 1)); "syn")]
4330 #[test_case(
4331 Segment::rst(ISS_2 + 2),
4332 Some(State::Closed (
4333 Closed { reason: Some(ConnectionError::ConnectionReset) },
4334 ))
4335 => None; "rst")]
4336 #[test_case(
4337 Segment::fin(ISS_2 + 2, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX)),
4338 None
4339 => None; "ignore fin")]
4340 #[test_case(
4341 Segment::data(ISS_2, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX), "a".as_bytes()),
4342 None => Some(Segment::ack(ISS_1 + 1, ISS_2 + 2, UnscaledWindowSize::from(u16::MAX)));
4343 "ack old data")]
4344 #[test_case(
4345 Segment::data(ISS_2 + 2, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX), "Hello".as_bytes()),
4346 Some(State::Closed (
4347 Closed { reason: Some(ConnectionError::ConnectionReset) },
4348 ))
4349 => Some(Segment::rst(ISS_1 + 1)); "reset on new data")]
4350 fn segment_arrives_when_close_wait(
4351 incoming: Segment<&[u8]>,
4352 expected: Option<State<FakeInstant, RingBuffer, NullBuffer, ()>>,
4353 ) -> Option<Segment<()>> {
4354 let counters = FakeTcpCounters::default();
4355 let mut state = State::CloseWait(CloseWait {
4356 snd: Send {
4357 nxt: ISS_1 + 1,
4358 max: ISS_1 + 1,
4359 una: ISS_1 + 1,
4360 wnd: WindowSize::DEFAULT,
4361 wnd_max: WindowSize::DEFAULT,
4362 buffer: NullBuffer,
4363 wl1: ISS_2 + 1,
4364 wl2: ISS_1 + 1,
4365 rtt_estimator: Estimator::default(),
4366 rtt_sampler: RttSampler::default(),
4367 timer: None,
4368 congestion_control: CongestionControl::cubic_with_mss(
4369 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
4370 ),
4371 wnd_scale: WindowScale::default(),
4372 }
4373 .into(),
4374 closed_rcv: RecvParams {
4375 ack: ISS_2 + 2,
4376 wnd: WindowSize::DEFAULT,
4377 wnd_scale: WindowScale::ZERO,
4378 },
4379 });
4380 let (seg, _passive_open) = state
4381 .on_segment_with_default_options::<_, ClientlessBufferProvider>(
4382 incoming,
4383 FakeInstant::default(),
4384 &counters.refs(),
4385 );
4386 match expected {
4387 Some(new_state) => assert_eq!(new_state, state),
4388 None => assert_matches!(state, State::CloseWait(_)),
4389 };
4390 seg
4391 }
4392
/// Walks a complete three-way handshake between an active opener (`SynSent`)
/// and a passive opener (`Listen`), checking every emitted segment and both
/// resulting `Established` states.
///
/// The `sack_permitted` case parameter is written into the handshake options
/// of the SYN and the SYN-ACK before each is delivered; the test then expects
/// that value to show up in `SynRcvd` and in both receive halves.
#[test_case(true; "sack")]
#[test_case(false; "no sack")]
fn active_passive_open(sack_permitted: bool) {
    let mut clock = FakeInstantCtx::default();
    let counters = FakeTcpCounters::default();
    // Active side: `connect` yields the `SynSent` state and the SYN to send.
    let (syn_sent, mut syn_seg) = Closed::<Initial>::connect(
        ISS_1,
        clock.now(),
        (),
        Default::default(),
        DEVICE_MAXIMUM_SEGMENT_SIZE,
        DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        &SocketOptions::default_for_state_tests(),
    );
    // The generated SYN carries the `SACK_PERMITTED` constant, not the test
    // parameter.
    assert_eq!(
        syn_seg,
        Segment::syn(
            ISS_1,
            UnscaledWindowSize::from(u16::MAX),
            HandshakeOptions {
                mss: Some(DEVICE_MAXIMUM_SEGMENT_SIZE),
                window_scale: Some(WindowScale::default()),
                sack_permitted: SACK_PERMITTED,
            }
            .into(),
        )
    );
    assert_eq!(
        syn_sent,
        SynSent {
            iss: ISS_1,
            timestamp: Some(clock.now()),
            retrans_timer: RetransTimer::new(
                clock.now(),
                Rto::DEFAULT,
                None,
                DEFAULT_MAX_SYN_RETRIES,
            ),
            active_open: (),
            buffer_sizes: BufferSizes::default(),
            default_mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            device_mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
            rcv_wnd_scale: WindowScale::default(),
        }
    );
    let mut active = State::SynSent(syn_sent);
    // Passive side: a listener using ISS_2 as its initial sequence number.
    let mut passive = State::Listen(Closed::<Initial>::listen(
        ISS_2,
        Default::default(),
        DEVICE_MAXIMUM_SEGMENT_SIZE,
        DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        None,
    ));
    // Advance the clock by half an RTT before the SYN is delivered.
    clock.sleep(RTT / 2);

    {
        // Overwrite the SACK-permitted option on the SYN with the value under
        // test.
        let opt = assert_matches!(&mut syn_seg.header.options, Options::Handshake(o) => o);
        opt.sack_permitted = sack_permitted;
    }

    // Deliver the SYN to the listener; it must answer with a SYN-ACK and must
    // not report a passive open yet.
    let (seg, passive_open) = passive
        .on_segment_with_default_options::<_, ClientlessBufferProvider>(
            syn_seg,
            clock.now(),
            &counters.refs(),
        );
    let mut syn_ack = seg.expect("failed to generate a syn-ack segment");
    assert_eq!(passive_open, None);
    assert_eq!(
        syn_ack,
        Segment::syn_ack(
            ISS_2,
            ISS_1 + 1,
            UnscaledWindowSize::from(u16::MAX),
            HandshakeOptions {
                mss: Some(DEVICE_MAXIMUM_SEGMENT_SIZE),
                window_scale: Some(WindowScale::default()),
                sack_permitted: SACK_PERMITTED,
            }
            .into(),
        )
    );
    // The listener is now in `SynRcvd` and has recorded the SACK-permitted
    // value it saw on the incoming SYN.
    assert_matches!(passive, State::SynRcvd(ref syn_rcvd) if syn_rcvd == &SynRcvd {
        iss: ISS_2,
        irs: ISS_1,
        timestamp: Some(clock.now()),
        retrans_timer: RetransTimer::new(
            clock.now(),
            Rto::DEFAULT,
            None,
            DEFAULT_MAX_SYNACK_RETRIES,
        ),
        simultaneous_open: None,
        buffer_sizes: Default::default(),
        smss: DEVICE_MAXIMUM_SEGMENT_SIZE,
        rcv_wnd_scale: WindowScale::default(),
        snd_wnd_scale: Some(WindowScale::default()),
        sack_permitted,
    });
    // Another half RTT elapses before the SYN-ACK reaches the active side.
    clock.sleep(RTT / 2);

    {
        // Overwrite the SACK-permitted option on the SYN-ACK with the value
        // under test.
        let opt = assert_matches!(&mut syn_ack.header.options, Options::Handshake(o) => o);
        opt.sack_permitted = sack_permitted;
    }

    // Deliver the SYN-ACK to the active side; it must reply with the final
    // ACK and move to `Established`.
    let (seg, passive_open) = active
        .on_segment_with_default_options::<_, ClientlessBufferProvider>(
            syn_ack,
            clock.now(),
            &counters.refs(),
        );
    let ack_seg = seg.expect("failed to generate a ack segment");
    assert_eq!(passive_open, None);
    assert_eq!(ack_seg, Segment::ack(ISS_1 + 1, ISS_2 + 1, UnscaledWindowSize::from(u16::MAX)));
    let established = assert_matches!(&active, State::Established(e) => e);
    // One full RTT has elapsed since the SYN was generated, which is what the
    // expected `Estimator::Measured` value reflects.
    assert_eq!(
        established,
        &Established {
            snd: Send {
                nxt: ISS_1 + 1,
                max: ISS_1 + 1,
                una: ISS_1 + 1,
                wnd: WindowSize::DEFAULT,
                wnd_max: WindowSize::DEFAULT,
                buffer: RingBuffer::default(),
                wl1: ISS_2,
                wl2: ISS_1 + 1,
                rtt_estimator: Estimator::Measured { srtt: RTT, rtt_var: RTT / 2 },
                rtt_sampler: RttSampler::default(),
                timer: None,
                congestion_control: CongestionControl::cubic_with_mss(
                    DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE
                ),
                wnd_scale: WindowScale::default(),
            }
            .into(),
            rcv: Recv {
                buffer: RecvBufferState::Open {
                    buffer: RingBuffer::default(),
                    assembler: Assembler::new(ISS_2 + 1),
                },
                timer: None,
                remaining_quickacks: default_quickack_counter(),
                last_segment_at: None,
                mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
                wnd_scale: WindowScale::default(),
                last_window_update: (ISS_2 + 1, WindowSize::DEFAULT),
                sack_permitted,
            }
            .into()
        }
    );
    // Half an RTT later the final ACK reaches the passive side, which emits
    // no segment and reports the passive open as complete.
    clock.sleep(RTT / 2);
    assert_eq!(
        passive.on_segment_with_default_options::<_, ClientlessBufferProvider>(
            ack_seg,
            clock.now(),
            &counters.refs(),
        ),
        (None, Some(())),
    );
    let established = assert_matches!(&passive, State::Established(e) => e);
    // Mirror image of the active side's state, with the sequence-number roles
    // swapped.
    assert_eq!(
        established,
        &Established {
            snd: Send {
                nxt: ISS_2 + 1,
                max: ISS_2 + 1,
                una: ISS_2 + 1,
                wnd: WindowSize::DEFAULT,
                wnd_max: WindowSize::DEFAULT,
                buffer: RingBuffer::default(),
                wl1: ISS_1 + 1,
                wl2: ISS_2 + 1,
                rtt_estimator: Estimator::Measured { srtt: RTT, rtt_var: RTT / 2 },
                rtt_sampler: RttSampler::default(),
                timer: None,
                congestion_control: CongestionControl::cubic_with_mss(
                    DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE
                ),
                wnd_scale: WindowScale::default(),
            }
            .into(),
            rcv: Recv {
                buffer: RecvBufferState::Open {
                    buffer: RingBuffer::default(),
                    assembler: Assembler::new(ISS_1 + 1),
                },
                timer: None,
                remaining_quickacks: default_quickack_counter(),
                last_segment_at: None,
                mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
                wnd_scale: WindowScale::default(),
                last_window_update: (ISS_1 + 1, WindowSize::DEFAULT),
                sack_permitted,
            }
            .into()
        }
    )
}
4598
/// Drives two endpoints that both actively connect at the same time: each
/// starts in `SynSent`, receives the other's SYN, moves to `SynRcvd`, and then
/// reaches `Established` once the peer's SYN-ACK arrives.
#[test]
fn simultaneous_open() {
    let mut clock = FakeInstantCtx::default();
    let counters = FakeTcpCounters::default();
    // Both endpoints call `connect`, using ISS_1 and ISS_2 respectively.
    let (syn_sent1, syn1) = Closed::<Initial>::connect(
        ISS_1,
        clock.now(),
        (),
        Default::default(),
        DEVICE_MAXIMUM_SEGMENT_SIZE,
        DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        &SocketOptions::default_for_state_tests(),
    );
    let (syn_sent2, syn2) = Closed::<Initial>::connect(
        ISS_2,
        clock.now(),
        (),
        Default::default(),
        DEVICE_MAXIMUM_SEGMENT_SIZE,
        DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        &SocketOptions::default_for_state_tests(),
    );

    // Each side's SYN differs only in its initial sequence number.
    assert_eq!(
        syn1,
        Segment::syn(
            ISS_1,
            UnscaledWindowSize::from(u16::MAX),
            HandshakeOptions {
                mss: Some(DEVICE_MAXIMUM_SEGMENT_SIZE),
                window_scale: Some(WindowScale::default()),
                sack_permitted: SACK_PERMITTED,
            }
            .into(),
        )
    );
    assert_eq!(
        syn2,
        Segment::syn(
            ISS_2,
            UnscaledWindowSize::from(u16::MAX),
            HandshakeOptions {
                mss: Some(DEVICE_MAXIMUM_SEGMENT_SIZE),
                window_scale: Some(WindowScale::default()),
                sack_permitted: SACK_PERMITTED,
            }
            .into(),
        )
    );

    let mut state1 = State::SynSent(syn_sent1);
    let mut state2 = State::SynSent(syn_sent2);

    // One RTT later the SYNs cross: each endpoint receives the other's SYN
    // and must answer with a SYN-ACK.
    clock.sleep(RTT);
    let (seg, passive_open) = state1
        .on_segment_with_default_options::<_, ClientlessBufferProvider>(
            syn2,
            clock.now(),
            &counters.refs(),
        );
    let syn_ack1 = seg.expect("failed to generate syn ack");
    assert_eq!(passive_open, None);
    let (seg, passive_open) = state2
        .on_segment_with_default_options::<_, ClientlessBufferProvider>(
            syn1,
            clock.now(),
            &counters.refs(),
        );
    let syn_ack2 = seg.expect("failed to generate syn ack");
    assert_eq!(passive_open, None);

    assert_eq!(
        syn_ack1,
        Segment::syn_ack(
            ISS_1,
            ISS_2 + 1,
            UnscaledWindowSize::from(u16::MAX),
            HandshakeOptions {
                mss: Some(DEVICE_MAXIMUM_SEGMENT_SIZE),
                window_scale: Some(WindowScale::default()),
                sack_permitted: SACK_PERMITTED,
            }
            .into()
        )
    );
    assert_eq!(
        syn_ack2,
        Segment::syn_ack(
            ISS_2,
            ISS_1 + 1,
            UnscaledWindowSize::from(u16::MAX),
            HandshakeOptions {
                mss: Some(DEVICE_MAXIMUM_SEGMENT_SIZE),
                window_scale: Some(WindowScale::default()),
                sack_permitted: SACK_PERMITTED,
            }
            .into()
        )
    );

    // Both endpoints are now in `SynRcvd`; `simultaneous_open` is `Some(())`
    // here, unlike the `None` a listener-originated `SynRcvd` carries.
    assert_matches!(state1, State::SynRcvd(ref syn_rcvd) if syn_rcvd == &SynRcvd {
        iss: ISS_1,
        irs: ISS_2,
        timestamp: Some(clock.now()),
        retrans_timer: RetransTimer::new(
            clock.now(),
            Rto::DEFAULT,
            None,
            DEFAULT_MAX_SYNACK_RETRIES,
        ),
        simultaneous_open: Some(()),
        buffer_sizes: BufferSizes::default(),
        smss: DEVICE_MAXIMUM_SEGMENT_SIZE,
        rcv_wnd_scale: WindowScale::default(),
        snd_wnd_scale: Some(WindowScale::default()),
        sack_permitted: SACK_PERMITTED,
    });
    assert_matches!(state2, State::SynRcvd(ref syn_rcvd) if syn_rcvd == &SynRcvd {
        iss: ISS_2,
        irs: ISS_1,
        timestamp: Some(clock.now()),
        retrans_timer: RetransTimer::new(
            clock.now(),
            Rto::DEFAULT,
            None,
            DEFAULT_MAX_SYNACK_RETRIES,
        ),
        simultaneous_open: Some(()),
        buffer_sizes: BufferSizes::default(),
        smss: DEVICE_MAXIMUM_SEGMENT_SIZE,
        rcv_wnd_scale: WindowScale::default(),
        snd_wnd_scale: Some(WindowScale::default()),
        sack_permitted: SACK_PERMITTED,
    });

    // After another RTT each side receives the peer's SYN-ACK and replies
    // with a plain ACK; neither reports a passive open.
    clock.sleep(RTT);
    assert_eq!(
        state1.on_segment_with_default_options::<_, ClientlessBufferProvider>(
            syn_ack2,
            clock.now(),
            &counters.refs(),
        ),
        (Some(Segment::ack(ISS_1 + 1, ISS_2 + 1, UnscaledWindowSize::from(u16::MAX))), None)
    );
    assert_eq!(
        state2.on_segment_with_default_options::<_, ClientlessBufferProvider>(
            syn_ack1,
            clock.now(),
            &counters.refs(),
        ),
        (Some(Segment::ack(ISS_2 + 1, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX))), None)
    );

    // Both sides are `Established`. Note that one quick-ack has already been
    // consumed and `last_segment_at` is set to the current time.
    let established = assert_matches!(state1, State::Established(e) => e);
    assert_eq!(
        established,
        Established {
            snd: Send {
                nxt: ISS_1 + 1,
                max: ISS_1 + 1,
                una: ISS_1 + 1,
                wnd: WindowSize::DEFAULT,
                wnd_max: WindowSize::DEFAULT,
                buffer: RingBuffer::default(),
                wl1: ISS_2 + 1,
                wl2: ISS_1 + 1,
                rtt_estimator: Estimator::Measured { srtt: RTT, rtt_var: RTT / 2 },
                rtt_sampler: RttSampler::default(),
                timer: None,
                congestion_control: CongestionControl::cubic_with_mss(
                    DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE
                ),
                wnd_scale: WindowScale::default(),
            }
            .into(),
            rcv: Recv {
                buffer: RecvBufferState::Open {
                    buffer: RingBuffer::default(),
                    assembler: Assembler::new(ISS_2 + 1),
                },
                timer: None,
                remaining_quickacks: default_quickack_counter() - 1,
                last_segment_at: Some(clock.now()),
                mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
                wnd_scale: WindowScale::default(),
                last_window_update: (ISS_2 + 1, WindowSize::DEFAULT),
                sack_permitted: SACK_PERMITTED,
            }
            .into()
        }
    );

    let established = assert_matches!(state2, State::Established(e) => e);
    assert_eq!(
        established,
        Established {
            snd: Send {
                nxt: ISS_2 + 1,
                max: ISS_2 + 1,
                una: ISS_2 + 1,
                wnd: WindowSize::DEFAULT,
                wnd_max: WindowSize::DEFAULT,
                buffer: RingBuffer::default(),
                wl1: ISS_1 + 1,
                wl2: ISS_2 + 1,
                rtt_estimator: Estimator::Measured { srtt: RTT, rtt_var: RTT / 2 },
                rtt_sampler: RttSampler::default(),
                timer: None,
                congestion_control: CongestionControl::cubic_with_mss(
                    DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE
                ),
                wnd_scale: WindowScale::default(),
            }
            .into(),
            rcv: Recv {
                buffer: RecvBufferState::Open {
                    buffer: RingBuffer::default(),
                    assembler: Assembler::new(ISS_1 + 1),
                },
                timer: None,
                remaining_quickacks: default_quickack_counter() - 1,
                last_segment_at: Some(clock.now()),
                mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
                wnd_scale: WindowScale::default(),
                last_window_update: (ISS_1 + 1, WindowSize::DEFAULT),
                sack_permitted: SACK_PERMITTED,
            }
            .into()
        }
    );
}
4830
/// Capacity, in bytes, given to the ring buffers built by the tests below.
const BUFFER_SIZE: usize = 16;
/// Payload the tests send and receive.
const TEST_BYTES: &[u8] = b"Hello";
4833
/// Exercises the receive path of an `Established` connection in three steps:
/// in-order data, an out-of-order segment that leaves a gap, and finally the
/// segment that fills the gap.
///
/// Expected window sizes are all built with `UnscaledWindowSize::from_usize`
/// so that no unchecked narrowing cast is involved.
#[test]
fn established_receive() {
    let clock = FakeInstantCtx::default();
    let counters = FakeTcpCounters::default();
    let mut established = State::Established(Established {
        snd: Send {
            nxt: ISS_1 + 1,
            max: ISS_1 + 1,
            una: ISS_1 + 1,
            wnd: WindowSize::ZERO,
            wnd_max: WindowSize::ZERO,
            buffer: NullBuffer,
            wl1: ISS_2 + 1,
            wl2: ISS_1 + 1,
            rtt_estimator: Estimator::default(),
            rtt_sampler: RttSampler::default(),
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(Mss(
                NonZeroU16::new(5).unwrap()
            )),
            wnd_scale: WindowScale::default(),
        }
        .into(),
        rcv: Recv {
            buffer: RecvBufferState::Open {
                buffer: RingBuffer::new(BUFFER_SIZE),
                assembler: Assembler::new(ISS_2 + 1),
            },
            timer: None,
            remaining_quickacks: 0,
            last_segment_at: None,
            mss: Mss(NonZeroU16::new(5).unwrap()),
            wnd_scale: WindowScale::default(),
            last_window_update: (ISS_2 + 1, WindowSize::new(BUFFER_SIZE).unwrap()),
            sack_permitted: SACK_PERMITTED,
        }
        .into(),
    });

    // In-order data is acknowledged, and the advertised window shrinks by the
    // number of bytes now sitting in the buffer.
    assert_eq!(
        established.on_segment_with_default_options::<_, ClientlessBufferProvider>(
            Segment::data(ISS_2 + 1, ISS_1 + 1, UnscaledWindowSize::from(0), TEST_BYTES,),
            clock.now(),
            &counters.refs(),
        ),
        (
            Some(Segment::ack(
                ISS_1 + 1,
                ISS_2 + 1 + TEST_BYTES.len(),
                UnscaledWindowSize::from_usize(BUFFER_SIZE - TEST_BYTES.len()),
            )),
            None
        ),
    );
    assert_eq!(
        established.read_with(|available| {
            assert_eq!(available, &[TEST_BYTES]);
            available[0].len()
        }),
        TEST_BYTES.len()
    );

    // An out-of-order segment (one payload-length gap) must not advance the
    // ACK number; nothing becomes readable.
    assert_eq!(
        established.on_segment_with_default_options::<_, ClientlessBufferProvider>(
            Segment::data(
                ISS_2 + 1 + TEST_BYTES.len() * 2,
                ISS_1 + 1,
                UnscaledWindowSize::from(0),
                TEST_BYTES,
            ),
            clock.now(),
            &counters.refs()
        ),
        (
            Some(Segment::ack(
                ISS_1 + 1,
                ISS_2 + 1 + TEST_BYTES.len(),
                UnscaledWindowSize::from_usize(BUFFER_SIZE),
            )),
            None
        ),
    );
    assert_eq!(
        established.read_with(|available| {
            let empty: &[u8] = &[];
            assert_eq!(available, &[empty]);
            0
        }),
        0
    );

    // Filling the gap acknowledges both the new segment and the one that was
    // held out of order, and both payloads become readable together.
    assert_eq!(
        established.on_segment_with_default_options::<_, ClientlessBufferProvider>(
            Segment::data(
                ISS_2 + 1 + TEST_BYTES.len(),
                ISS_1 + 1,
                UnscaledWindowSize::from(0),
                TEST_BYTES,
            ),
            clock.now(),
            &counters.refs()
        ),
        (
            Some(Segment::ack(
                ISS_1 + 1,
                ISS_2 + 1 + 3 * TEST_BYTES.len(),
                UnscaledWindowSize::from_usize(BUFFER_SIZE - 2 * TEST_BYTES.len()),
            )),
            None
        ),
    );
    assert_eq!(
        established.read_with(|available| {
            assert_eq!(available, &[[TEST_BYTES, TEST_BYTES].concat()]);
            available[0].len()
        }),
        2 * TEST_BYTES.len()
    );
}
4956
/// Exercises the send path of an `Established` connection: nothing is sent
/// while the peer's window is zero, and once the peer opens its window the
/// buffered bytes go out in pieces bounded by the window and by the limit
/// passed to each poll.
#[test]
fn established_send() {
    /// Delivers an ACK from the peer that advertises a window of `win` bytes
    /// and checks that it produces no reply.
    fn advertise_window(
        conn: &mut State<FakeInstant, RingBuffer, RingBuffer, ()>,
        ack: SeqNum,
        win: usize,
        now: FakeInstant,
        counters: &TcpCountersRefs<'_>,
    ) {
        let outcome = conn.on_segment_with_default_options::<(), ClientlessBufferProvider>(
            Segment::ack(ISS_2 + 1, ack, UnscaledWindowSize::from_usize(win)),
            now,
            counters,
        );
        assert_eq!(outcome, (None, None));
    }

    let clock = FakeInstantCtx::default();
    let counters = FakeTcpCounters::default();
    let mut outgoing = RingBuffer::new(BUFFER_SIZE);
    assert_eq!(outgoing.enqueue_data(TEST_BYTES), 5);
    let mut established = State::Established(Established {
        snd: Send {
            nxt: ISS_1 + 1,
            max: ISS_1 + 1,
            una: ISS_1,
            wnd: WindowSize::ZERO,
            wnd_max: WindowSize::ZERO,
            buffer: outgoing,
            wl1: ISS_2,
            wl2: ISS_1,
            rtt_sampler: RttSampler::default(),
            rtt_estimator: Estimator::default(),
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(
                DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            ),
            wnd_scale: WindowScale::default(),
        }
        .into(),
        rcv: Recv {
            buffer: RecvBufferState::Open {
                buffer: RingBuffer::new(BUFFER_SIZE),
                assembler: Assembler::new(ISS_2 + 1),
            },
            timer: None,
            remaining_quickacks: 0,
            last_segment_at: None,
            mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            wnd_scale: WindowScale::default(),
            last_window_update: (ISS_2 + 1, WindowSize::new(BUFFER_SIZE).unwrap()),
            sack_permitted: SACK_PERMITTED,
        }
        .into(),
    });

    // The peer's window starts at zero, so nothing may be sent yet.
    let blocked =
        established.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs());
    assert_eq!(blocked, None);

    // A one-byte window lets exactly one byte through.
    advertise_window(&mut established, ISS_1 + 1, 1, clock.now(), &counters.refs());
    let one_byte =
        established.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs());
    assert_eq!(
        one_byte,
        Some(Segment::data(
            ISS_1 + 1,
            ISS_2 + 1,
            UnscaledWindowSize::from_usize(BUFFER_SIZE),
            FragmentedPayload::new_contiguous(&TEST_BYTES[1..2]),
        ))
    );

    // With a ten-byte window, the limit passed to the poll (2) caps the
    // segment size.
    advertise_window(&mut established, ISS_1 + 2, 10, clock.now(), &counters.refs());
    let two_bytes = established.poll_send_with_default_options(2, clock.now(), &counters.refs());
    assert_eq!(
        two_bytes,
        Some(Segment::data(
            ISS_1 + 2,
            ISS_2 + 1,
            UnscaledWindowSize::from_usize(BUFFER_SIZE),
            FragmentedPayload::new_contiguous(&TEST_BYTES[2..4]),
        ))
    );

    // The last byte is polled with Nagle's algorithm explicitly disabled.
    let nagle_off =
        SocketOptions { nagle_enabled: false, ..SocketOptions::default_for_state_tests() };
    let last_byte = established.poll_send(
        &FakeStateMachineDebugId,
        &counters.refs(),
        1,
        clock.now(),
        &nagle_off,
    );
    assert_eq!(
        last_byte,
        Ok(Segment::data(
            ISS_1 + 4,
            ISS_2 + 1,
            UnscaledWindowSize::from_usize(BUFFER_SIZE),
            FragmentedPayload::new_contiguous(&TEST_BYTES[4..5]),
        ))
    );

    // Everything has been sent; another poll yields nothing.
    let drained = established.poll_send_with_default_options(1, clock.now(), &counters.refs());
    assert_eq!(drained, None);
}
5062
/// Drives a single state machine that is connected to itself (every segment
/// it emits is fed straight back into it) and checks retransmission timing
/// for the SYN, the SYN-ACK, and data, including the retransmit counters.
#[test]
fn self_connect_retransmission() {
    let mut clock = FakeInstantCtx::default();
    let counters = FakeTcpCounters::default();
    let (syn_sent, syn) = Closed::<Initial>::connect(
        ISS_1,
        clock.now(),
        (),
        Default::default(),
        DEVICE_MAXIMUM_SEGMENT_SIZE,
        DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        &SocketOptions::default_for_state_tests(),
    );
    let mut state = State::<_, RingBuffer, RingBuffer, ()>::SynSent(syn_sent);
    // The SYN retransmission is scheduled one default RTO after the start.
    assert_eq!(state.poll_send_at(), Some(FakeInstant::from(Rto::DEFAULT.get())));
    clock.sleep(Rto::DEFAULT.get());
    // Once the RTO has elapsed, polling retransmits the same SYN.
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
        Some(syn.clone().into_empty())
    );

    // Feed our own SYN back in; the state machine must answer with a SYN-ACK.
    let (seg, passive_open) = state
        .on_segment_with_default_options::<_, ClientlessBufferProvider>(
            syn,
            clock.now(),
            &counters.refs(),
        );
    let syn_ack = seg.expect("expected SYN-ACK");
    assert_eq!(passive_open, None);
    // The SYN-ACK gets its own retransmission deadline, one default RTO out.
    assert_eq!(state.poll_send_at(), Some(clock.now() + Rto::DEFAULT.get()));
    clock.sleep(Rto::DEFAULT.get());
    // Once that RTO has elapsed, polling retransmits the SYN-ACK.
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
        Some(syn_ack.clone().into_empty())
    );

    // Feed the SYN-ACK back in; the reply is an ACK whose sequence and
    // acknowledgment numbers are both ISS_1 + 1, since both directions share
    // the same sequence space here.
    assert_eq!(
        state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
            syn_ack,
            clock.now(),
            &counters.refs(),
        ),
        (Some(Segment::ack(ISS_1 + 1, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX))), None)
    );
    // The connection must now have a usable send half; queue the test payload
    // on it.
    match state {
        State::Closed(_)
        | State::Listen(_)
        | State::SynRcvd(_)
        | State::SynSent(_)
        | State::LastAck(_)
        | State::FinWait1(_)
        | State::FinWait2(_)
        | State::Closing(_)
        | State::TimeWait(_) => {
            panic!("expected that we have entered established state, but got {:?}", state)
        }
        State::Established(Established { ref mut snd, rcv: _ })
        | State::CloseWait(CloseWait { ref mut snd, closed_rcv: _ }) => {
            assert_eq!(snd.buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());
        }
    }
    // Nothing has been transmitted yet, so no timer is pending.
    assert_eq!(state.poll_send_at(), None);
    // Each pass (re)sends the whole payload. The wait until the next send
    // doubles on every pass, and the retransmit/timeout counters equal the
    // number of passes already completed.
    for i in 0..3 {
        assert_eq!(
            state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
            Some(Segment::data(
                ISS_1 + 1,
                ISS_1 + 1,
                UnscaledWindowSize::from(u16::MAX),
                FragmentedPayload::new_contiguous(TEST_BYTES),
            ))
        );
        assert_eq!(state.poll_send_at(), Some(clock.now() + (1 << i) * Rto::DEFAULT.get()));
        clock.sleep((1 << i) * Rto::DEFAULT.get());
        CounterExpectations {
            retransmits: i,
            slow_start_retransmits: i,
            timeouts: i,
            ..Default::default()
        }
        .assert_counters(&counters);
    }
    // Acknowledge only the first payload byte.
    assert_eq!(
        state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
            Segment::ack(
                ISS_1 + 1 + TEST_BYTES.len(),
                ISS_1 + 1 + 1,
                UnscaledWindowSize::from(u16::MAX)
            ),
            clock.now(),
            &counters.refs(),
        ),
        (None, None),
    );
    // After that partial ACK the next deadline is one default RTO away again,
    // rather than the backed-off value.
    assert_eq!(state.poll_send_at(), Some(clock.now() + Rto::DEFAULT.get()));
    clock.sleep(Rto::DEFAULT.get());
    // With a one-byte limit, the retransmission carries the first
    // unacknowledged byte.
    assert_eq!(
        state.poll_send_with_default_options(1, clock.now(), &counters.refs(),),
        Some(Segment::data(
            ISS_1 + 1 + 1,
            ISS_1 + 1,
            UnscaledWindowSize::from(u16::MAX),
            FragmentedPayload::new_contiguous(&TEST_BYTES[1..2]),
        ))
    );
    // Acknowledge up to (but not including) the fourth payload byte.
    assert_eq!(
        state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
            Segment::ack(
                ISS_1 + 1 + TEST_BYTES.len(),
                ISS_1 + 1 + 3,
                UnscaledWindowSize::from(u16::MAX)
            ),
            clock.now(),
            &counters.refs(),
        ),
        (None, None)
    );
    // Three timeouts and three retransmissions have been recorded so far.
    CounterExpectations {
        retransmits: 3,
        slow_start_retransmits: 3,
        timeouts: 3,
        ..Default::default()
    }
    .assert_counters(&counters);
    assert_eq!(state.poll_send_at(), Some(clock.now() + Rto::DEFAULT.get()));
    // Polling right away (no sleep) sends the next unacknowledged byte.
    assert_eq!(
        state.poll_send_with_default_options(1, clock.now(), &counters.refs()),
        Some(Segment::data(
            ISS_1 + 1 + 3,
            ISS_1 + 1,
            UnscaledWindowSize::from(u16::MAX),
            FragmentedPayload::new_contiguous(&TEST_BYTES[3..4]),
        ))
    );
    // Acknowledge the entire payload.
    assert_eq!(
        state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
            Segment::ack(
                ISS_1 + 1 + TEST_BYTES.len(),
                ISS_1 + 1 + TEST_BYTES.len(),
                UnscaledWindowSize::from(u16::MAX)
            ),
            clock.now(),
            &counters.refs()
        ),
        (None, None)
    );
    // With everything acknowledged there is no pending send deadline.
    assert_eq!(state.poll_send_at(), None);
}
5228
/// Walks a connection through a close initiated by the peer: a FIN arrives in
/// `Established`, the local side then closes and moves to `LastAck`, flushes
/// its buffered data followed by a FIN, retransmits the FIN after a timeout,
/// and ends in `Closed` once the FIN is acknowledged.
#[test]
fn passive_close() {
    let mut clock = FakeInstantCtx::default();
    let counters = FakeTcpCounters::default();
    let mut pending_data = RingBuffer::new(BUFFER_SIZE);
    assert_eq!(pending_data.enqueue_data(TEST_BYTES), 5);
    let mut state = State::Established(Established {
        snd: Send {
            nxt: ISS_1 + 1,
            max: ISS_1 + 1,
            una: ISS_1 + 1,
            wnd: WindowSize::DEFAULT,
            wnd_max: WindowSize::DEFAULT,
            buffer: pending_data.clone(),
            wl1: ISS_2,
            wl2: ISS_1,
            rtt_sampler: RttSampler::default(),
            rtt_estimator: Estimator::default(),
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(
                DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            ),
            wnd_scale: WindowScale::default(),
        }
        .into(),
        rcv: Recv {
            buffer: RecvBufferState::Open {
                buffer: RingBuffer::new(BUFFER_SIZE),
                assembler: Assembler::new(ISS_2 + 1),
            },
            timer: None,
            remaining_quickacks: 0,
            last_segment_at: None,
            mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            wnd_scale: WindowScale::default(),
            last_window_update: (ISS_2 + 1, WindowSize::new(BUFFER_SIZE).unwrap()),
            sack_permitted: SACK_PERMITTED,
        }
        .into(),
    });
    // Window parameters the test expects to be remembered after the receive
    // half is closed.
    let last_wnd = WindowSize::new(BUFFER_SIZE - 1).unwrap();
    let last_wnd_scale = WindowScale::default();

    // The peer's FIN is acknowledged; the advertised window is one less than
    // the buffer size.
    let fin_outcome = state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
        Segment::fin(ISS_2 + 1, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX)),
        clock.now(),
        &counters.refs(),
    );
    let fin_ack =
        Segment::ack(ISS_1 + 1, ISS_2 + 2, UnscaledWindowSize::from_usize(BUFFER_SIZE - 1));
    assert_eq!(fin_outcome, (Some(fin_ack), None));

    // Closing locally does not close the connection outright; it moves the
    // state to `LastAck` with the buffered data still pending.
    let close_outcome = state.close(
        &counters.refs(),
        CloseReason::Shutdown,
        &SocketOptions::default_for_state_tests(),
    );
    assert_eq!(close_outcome, Ok(NewlyClosed::No));
    let expected_last_ack = State::LastAck(LastAck {
        snd: Send {
            nxt: ISS_1 + 1,
            max: ISS_1 + 1,
            una: ISS_1 + 1,
            wnd: WindowSize::DEFAULT,
            wnd_max: WindowSize::DEFAULT,
            buffer: pending_data,
            wl1: ISS_2 + 1,
            wl2: ISS_1 + 1,
            rtt_sampler: RttSampler::default(),
            rtt_estimator: Estimator::default(),
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(
                DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE
            ),
            wnd_scale: WindowScale::default(),
        },
        closed_rcv: RecvParams { ack: ISS_2 + 2, wnd: last_wnd, wnd_scale: last_wnd_scale },
    });
    assert_eq!(state, expected_last_ack);

    // With a two-byte limit only the first two payload bytes go out.
    let first_chunk = state.poll_send_with_default_options(2, clock.now(), &counters.refs());
    assert_eq!(
        first_chunk,
        Some(Segment::data(
            ISS_1 + 1,
            ISS_2 + 2,
            last_wnd >> WindowScale::default(),
            FragmentedPayload::new_contiguous(&TEST_BYTES[..2]),
        ))
    );
    // The rest of the payload is sent with the FIN piggybacked on it.
    let rest_with_fin =
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs());
    assert_eq!(
        rest_with_fin,
        Some(Segment::piggybacked_fin(
            ISS_1 + 3,
            ISS_2 + 2,
            last_wnd >> WindowScale::default(),
            FragmentedPayload::new_contiguous(&TEST_BYTES[2..]),
        ))
    );

    clock.sleep(RTT);
    // The peer acknowledges the data but not the FIN.
    let data_acked = state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
        Segment::ack(
            ISS_2 + 2,
            ISS_1 + 1 + TEST_BYTES.len(),
            UnscaledWindowSize::from(u16::MAX),
        ),
        clock.now(),
        &counters.refs(),
    );
    assert_eq!(data_acked, (None, None));
    assert_eq!(state.poll_send_at(), Some(clock.now() + Rto::DEFAULT.get()));

    // After the timeout the bare FIN is retransmitted.
    clock.sleep(Rto::DEFAULT.get());
    let fin_retransmit =
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs());
    assert_eq!(
        fin_retransmit,
        Some(Segment::fin(
            ISS_1 + 1 + TEST_BYTES.len(),
            ISS_2 + 2,
            last_wnd >> WindowScale::default()
        ))
    );

    // Acknowledging the FIN closes the connection without an error.
    let fin_acked = state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
        Segment::ack(
            ISS_2 + 2,
            ISS_1 + 1 + TEST_BYTES.len() + 1,
            UnscaledWindowSize::from(u16::MAX),
        ),
        clock.now(),
        &counters.refs(),
    );
    assert_eq!(fin_acked, (None, None));
    assert_eq!(state, State::Closed(Closed { reason: None }));

    let expected_counters = CounterExpectations {
        retransmits: 1,
        slow_start_retransmits: 1,
        timeouts: 1,
        established_closed: 1,
        ..Default::default()
    };
    expected_counters.assert_counters(&counters);
}
5390
// Closing a connection that is still in SYN-RCVD moves it to FIN-WAIT-1, and
// the next poll emits a FIN.
#[test]
fn syn_rcvd_active_close() {
    let now = FakeInstant::default();
    let counters = FakeTcpCounters::default();
    let socket_options = SocketOptions::default_for_state_tests();

    let retrans_timer = RetransTimer {
        at: now,
        rto: Rto::MIN,
        user_timeout_until: None,
        remaining_retries: Some(DEFAULT_MAX_RETRIES),
    };
    let mut state: State<_, RingBuffer, NullBuffer, ()> = State::SynRcvd(SynRcvd {
        iss: ISS_1,
        irs: ISS_2,
        timestamp: None,
        retrans_timer,
        simultaneous_open: Some(()),
        buffer_sizes: Default::default(),
        smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        rcv_wnd_scale: WindowScale::default(),
        snd_wnd_scale: Some(WindowScale::default()),
        sack_permitted: SACK_PERMITTED,
    });

    // The close request is accepted without the socket becoming closed.
    assert_eq!(
        state.close(&counters.refs(), CloseReason::Shutdown, &socket_options),
        Ok(NewlyClosed::No)
    );
    assert_matches!(state, State::FinWait1(_));

    // The FIN takes the sequence number right after our SYN and acknowledges
    // the peer's SYN.
    let expected_fin = Segment::fin(ISS_1 + 1, ISS_2 + 1, UnscaledWindowSize::from(u16::MAX));
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, now, &counters.refs()),
        Some(expected_fin)
    );
}
5429
// Walks an established connection through an active close and checks the
// states asserted below: Established -> FinWait1 -> FinWait2 -> TimeWait ->
// Closed, including one timeout-driven retransmission of the segment that
// carries the FIN.
#[test]
fn established_active_close() {
    let mut clock = FakeInstantCtx::default();
    let counters = FakeTcpCounters::default();
    let mut send_buffer = RingBuffer::new(BUFFER_SIZE);
    assert_eq!(send_buffer.enqueue_data(TEST_BYTES), 5);
    let mut state = State::Established(Established {
        snd: Send {
            nxt: ISS_1 + 1,
            max: ISS_1 + 1,
            una: ISS_1 + 1,
            wnd: WindowSize::DEFAULT,
            wnd_max: WindowSize::DEFAULT,
            // `send_buffer` is not read again in this test, so it is moved
            // into the state instead of being cloned.
            buffer: send_buffer,
            wl1: ISS_2,
            wl2: ISS_1,
            rtt_sampler: RttSampler::default(),
            rtt_estimator: Estimator::default(),
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(Mss(
                NonZeroU16::new(5).unwrap()
            )),
            wnd_scale: WindowScale::default(),
        }
        .into(),
        rcv: Recv {
            buffer: RecvBufferState::Open {
                buffer: RingBuffer::new(BUFFER_SIZE),
                assembler: Assembler::new(ISS_2 + 1),
            },
            timer: None,
            remaining_quickacks: 0,
            last_segment_at: None,
            mss: Mss(NonZeroU16::new(5).unwrap()),
            wnd_scale: WindowScale::default(),
            last_window_update: (ISS_2 + 1, WindowSize::new(BUFFER_SIZE).unwrap()),
            sack_permitted: SACK_PERMITTED,
        }
        .into(),
    });
    // The first close is accepted and moves the connection to FinWait1.
    assert_eq!(
        state.close(
            &counters.refs(),
            CloseReason::Shutdown,
            &SocketOptions::default_for_state_tests()
        ),
        Ok(NewlyClosed::No)
    );
    assert_matches!(state, State::FinWait1(_));
    // A second close while already closing is rejected.
    assert_eq!(
        state.close(
            &counters.refs(),
            CloseReason::Shutdown,
            &SocketOptions::default_for_state_tests()
        ),
        Err(CloseError::Closing)
    );

    // With a 2-byte limit only the first two queued bytes go out, without a
    // FIN.
    assert_eq!(
        state.poll_send_with_default_options(2, clock.now(), &counters.refs()),
        Some(Segment::data(
            ISS_1 + 1,
            ISS_2 + 1,
            UnscaledWindowSize::from_usize(BUFFER_SIZE),
            FragmentedPayload::new_contiguous(&TEST_BYTES[..2])
        ))
    );

    // Without a limit the remaining bytes go out with the FIN piggybacked.
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
        Some(Segment::piggybacked_fin(
            ISS_1 + 3,
            ISS_2 + 1,
            UnscaledWindowSize::from_usize(BUFFER_SIZE),
            FragmentedPayload::new_contiguous(&TEST_BYTES[2..])
        ))
    );

    // The peer sends data while acknowledging only our first byte
    // (ack = ISS_1 + 2). The data is accepted and acknowledged; the reply's
    // sequence number sits past everything we have sent, FIN included.
    assert_eq!(
        state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
            Segment::data(
                ISS_2 + 1,
                ISS_1 + 1 + 1,
                UnscaledWindowSize::from(u16::MAX),
                TEST_BYTES
            ),
            clock.now(),
            &counters.refs(),
        ),
        (
            Some(Segment::ack(
                ISS_1 + TEST_BYTES.len() + 2,
                ISS_2 + TEST_BYTES.len() + 1,
                UnscaledWindowSize::from_usize(BUFFER_SIZE - TEST_BYTES.len()),
            )),
            None
        )
    );

    // Drain the received bytes so the receive buffer is empty again.
    assert_eq!(
        state.read_with(|avail| {
            let got = avail.concat();
            assert_eq!(got, TEST_BYTES);
            got.len()
        }),
        TEST_BYTES.len()
    );

    // Unacknowledged data is still outstanding, so a timer is armed one
    // default RTO from now.
    assert_eq!(state.poll_send_at(), Some(clock.now() + Rto::DEFAULT.get()));

    clock.sleep(Rto::DEFAULT.get());
    // After the timeout everything from the first unacknowledged byte is sent
    // again, together with the FIN.
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
        Some(Segment::piggybacked_fin(
            ISS_1 + 2,
            ISS_2 + TEST_BYTES.len() + 1,
            UnscaledWindowSize::from_usize(BUFFER_SIZE),
            FragmentedPayload::new_contiguous(&TEST_BYTES[1..]),
        ))
    );

    // The peer acknowledges all of our data and the FIN: FinWait1 -> FinWait2.
    assert_eq!(
        state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
            Segment::ack(
                ISS_2 + TEST_BYTES.len() + 1,
                ISS_1 + TEST_BYTES.len() + 2,
                UnscaledWindowSize::from(u16::MAX)
            ),
            clock.now(),
            &counters.refs(),
        ),
        (None, None)
    );
    assert_matches!(state, State::FinWait2(_));

    // Data from the peer is still accepted and acknowledged in FinWait2.
    assert_eq!(
        state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
            Segment::data(
                ISS_2 + 1 + TEST_BYTES.len(),
                ISS_1 + TEST_BYTES.len() + 2,
                UnscaledWindowSize::from(u16::MAX),
                TEST_BYTES
            ),
            clock.now(),
            &counters.refs(),
        ),
        (
            Some(Segment::ack(
                ISS_1 + TEST_BYTES.len() + 2,
                ISS_2 + 2 * TEST_BYTES.len() + 1,
                UnscaledWindowSize::from_usize(BUFFER_SIZE - TEST_BYTES.len()),
            )),
            None
        )
    );

    assert_eq!(
        state.read_with(|avail| {
            let got = avail.concat();
            assert_eq!(got, TEST_BYTES);
            got.len()
        }),
        TEST_BYTES.len()
    );

    // The peer's FIN is acknowledged and the connection enters TimeWait.
    assert_eq!(
        state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
            Segment::fin(
                ISS_2 + 2 * TEST_BYTES.len() + 1,
                ISS_1 + TEST_BYTES.len() + 2,
                UnscaledWindowSize::from(u16::MAX)
            ),
            clock.now(),
            &counters.refs(),
        ),
        (
            Some(Segment::ack(
                ISS_1 + TEST_BYTES.len() + 2,
                ISS_2 + 2 * TEST_BYTES.len() + 2,
                UnscaledWindowSize::from_usize(BUFFER_SIZE - 1),
            )),
            None
        )
    );

    assert_matches!(state, State::TimeWait(_));

    // The TimeWait timer is set 2 * MSL out. Polling one second before that
    // deadline leaves the state untouched; polling at the deadline closes the
    // connection without an error.
    const SMALLEST_DURATION: Duration = Duration::from_secs(1);
    assert_eq!(state.poll_send_at(), Some(clock.now() + MSL * 2));
    clock.sleep(MSL * 2 - SMALLEST_DURATION);
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
        None
    );
    assert_matches!(state, State::TimeWait(_));
    clock.sleep(SMALLEST_DURATION);
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
        None
    );
    assert_eq!(state, State::Closed(Closed { reason: None }));
    // Exactly one timeout-driven retransmission happened on the way.
    CounterExpectations {
        retransmits: 1,
        slow_start_retransmits: 1,
        timeouts: 1,
        established_closed: 1,
        ..Default::default()
    }
    .assert_counters(&counters);
}
5651
// An incoming FIN whose ACK number covers everything we have sent
// (`snd.nxt == ISS_1 + 2`) takes FinWait1 directly to TimeWait.
#[test]
fn fin_wait_1_fin_ack_to_time_wait() {
    let counters = FakeTcpCounters::default();

    // Send half: one sequence number (ISS_1 + 1) is still unacknowledged.
    let snd = Send {
        nxt: ISS_1 + 2,
        max: ISS_1 + 2,
        una: ISS_1 + 1,
        wnd: WindowSize::DEFAULT,
        wnd_max: WindowSize::DEFAULT,
        buffer: NullBuffer,
        wl1: ISS_2,
        wl2: ISS_1,
        rtt_sampler: RttSampler::default(),
        rtt_estimator: Estimator::default(),
        timer: None,
        congestion_control: CongestionControl::cubic_with_mss(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
        wnd_scale: WindowScale::default(),
    };
    // Receive half: empty buffer, expecting ISS_2 + 1 next.
    let rcv = Recv {
        buffer: RecvBufferState::Open {
            buffer: RingBuffer::new(BUFFER_SIZE),
            assembler: Assembler::new(ISS_2 + 1),
        },
        timer: None,
        remaining_quickacks: 0,
        last_segment_at: None,
        mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        wnd_scale: WindowScale::default(),
        last_window_update: (ISS_2 + 1, WindowSize::new(BUFFER_SIZE).unwrap()),
        sack_permitted: SACK_PERMITTED,
    };
    let mut state = State::FinWait1(FinWait1 { snd: snd.into(), rcv: rcv.into() });

    let peer_fin = Segment::fin(ISS_2 + 1, ISS_1 + 2, UnscaledWindowSize::from(u16::MAX));
    // The reply acknowledges the peer's FIN; the advertised window shrinks by
    // one.
    let expected_ack =
        Segment::ack(ISS_1 + 2, ISS_2 + 2, UnscaledWindowSize::from_usize(BUFFER_SIZE - 1));
    assert_eq!(
        state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
            peer_fin,
            FakeInstant::default(),
            &counters.refs(),
        ),
        (Some(expected_ack), None),
    );
    assert_matches!(state, State::TimeWait(_));
}
5708
// Both ends close at the same time: our FIN is answered by a FIN (rather than
// an ACK of our FIN), which takes the connection through
// FinWait1 -> Closing -> TimeWait -> Closed.
#[test]
fn simultaneous_close() {
    let mut clock = FakeInstantCtx::default();
    let counters = FakeTcpCounters::default();
    let mut send_buffer = RingBuffer::new(BUFFER_SIZE);
    assert_eq!(send_buffer.enqueue_data(TEST_BYTES), 5);
    let mut state = State::Established(Established {
        snd: Send {
            nxt: ISS_1 + 1,
            max: ISS_1 + 1,
            una: ISS_1 + 1,
            wnd: WindowSize::DEFAULT,
            wnd_max: WindowSize::DEFAULT,
            // `send_buffer` is not read again in this test, so it is moved
            // into the state instead of being cloned.
            buffer: send_buffer,
            wl1: ISS_2,
            wl2: ISS_1,
            rtt_sampler: RttSampler::default(),
            rtt_estimator: Estimator::default(),
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(
                DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            ),
            wnd_scale: WindowScale::default(),
        }
        .into(),
        // The receive side starts at ISS_1 + 1 (not ISS_2 + 1) so that the
        // incoming segment below can use the same sequence numbers as the
        // segment this state sends.
        rcv: Recv {
            buffer: RecvBufferState::Open {
                buffer: RingBuffer::new(BUFFER_SIZE),
                assembler: Assembler::new(ISS_1 + 1),
            },
            timer: None,
            mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            wnd_scale: WindowScale::default(),
            last_window_update: (ISS_1 + 1, WindowSize::new(BUFFER_SIZE).unwrap()),
            remaining_quickacks: 0,
            last_segment_at: None,
            sack_permitted: SACK_PERMITTED,
        }
        .into(),
    });
    // The first close is accepted and moves the connection to FinWait1.
    assert_eq!(
        state.close(
            &counters.refs(),
            CloseReason::Shutdown,
            &SocketOptions::default_for_state_tests()
        ),
        Ok(NewlyClosed::No)
    );
    assert_matches!(state, State::FinWait1(_));
    // A second close while already closing is rejected.
    assert_eq!(
        state.close(
            &counters.refs(),
            CloseReason::Shutdown,
            &SocketOptions::default_for_state_tests()
        ),
        Err(CloseError::Closing)
    );

    // All queued data goes out in one segment with the FIN piggybacked.
    let fin = state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs());
    assert_eq!(
        fin,
        Some(Segment::piggybacked_fin(
            ISS_1 + 1,
            ISS_1 + 1,
            UnscaledWindowSize::from_usize(BUFFER_SIZE),
            FragmentedPayload::new_contiguous(TEST_BYTES),
        ))
    );
    // The peer's data + FIN arrives with the same sequence/ack numbers as the
    // segment sent above; it does not acknowledge our FIN. We acknowledge its
    // data and FIN, and the window shrinks by the buffered bytes plus one.
    assert_eq!(
        state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
            Segment::piggybacked_fin(
                ISS_1 + 1,
                ISS_1 + 1,
                UnscaledWindowSize::from_usize(BUFFER_SIZE),
                TEST_BYTES,
            ),
            clock.now(),
            &counters.refs(),
        ),
        (
            Some(Segment::ack(
                ISS_1 + TEST_BYTES.len() + 2,
                ISS_1 + TEST_BYTES.len() + 2,
                UnscaledWindowSize::from_usize(BUFFER_SIZE - TEST_BYTES.len() - 1),
            )),
            None
        )
    );

    // A FIN received before our own FIN was acknowledged: Closing.
    assert_matches!(state, State::Closing(_));
    // The peer's ACK of our FIN arrives; no reply is generated.
    assert_eq!(
        state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
            Segment::ack(
                ISS_1 + TEST_BYTES.len() + 2,
                ISS_1 + TEST_BYTES.len() + 2,
                UnscaledWindowSize::from_usize(BUFFER_SIZE - TEST_BYTES.len() - 1),
            ),
            clock.now(),
            &counters.refs(),
        ),
        (None, None)
    );

    assert_matches!(state, State::TimeWait(_));

    // The TimeWait timer is set 2 * MSL out. Polling one second before that
    // deadline leaves the state untouched; polling at the deadline closes the
    // connection without an error.
    const SMALLEST_DURATION: Duration = Duration::from_secs(1);
    assert_eq!(state.poll_send_at(), Some(clock.now() + MSL * 2));
    clock.sleep(MSL * 2 - SMALLEST_DURATION);
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
        None
    );
    assert_matches!(state, State::TimeWait(_));
    clock.sleep(SMALLEST_DURATION);
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
        None
    );
    assert_eq!(state, State::Closed(Closed { reason: None }));
    CounterExpectations { established_closed: 1, ..Default::default() }
        .assert_counters(&counters);
}
5838
// A FIN received while in TimeWait is acknowledged again and pushes the
// expiry back out to a full 2 * MSL from the time of that segment.
#[test]
fn time_wait_restarts_timer() {
    let mut clock = FakeInstantCtx::default();
    let counters = FakeTcpCounters::default();
    let linger = MSL * 2;

    let closed_rcv = RecvParams {
        ack: ISS_2 + 2,
        wnd: WindowSize::DEFAULT,
        wnd_scale: WindowScale::default(),
    };
    let mut time_wait = State::<_, NullBuffer, NullBuffer, ()>::TimeWait(TimeWait {
        last_seq: ISS_1 + 2,
        closed_rcv,
        expiry: new_time_wait_expiry(clock.now()),
    });
    assert_eq!(time_wait.poll_send_at(), Some(clock.now() + linger));

    // One second into TimeWait a FIN shows up.
    clock.sleep(Duration::from_secs(1));
    let peer_fin = Segment::fin(ISS_2 + 2, ISS_1 + 2, UnscaledWindowSize::from(u16::MAX));
    let expected_ack = Segment::ack(ISS_1 + 2, ISS_2 + 2, UnscaledWindowSize::from(u16::MAX));
    assert_eq!(
        time_wait.on_segment_with_default_options::<(), ClientlessBufferProvider>(
            peer_fin,
            clock.now(),
            &counters.refs(),
        ),
        (Some(expected_ack), None),
    );

    // The deadline is measured from the new "now", not from the original
    // entry into TimeWait.
    assert_eq!(time_wait.poll_send_at(), Some(clock.now() + linger));
}
5865
// Each case feeds one segment into the given state and checks the reply
// segment returned by the state machine.
//
// Case "retransmit data": the receiver already expects ISS_2 + 5, and a data
// segment starting at ISS_2 arrives; the expected reply is an ACK for
// ISS_2 + 5.
#[test_case(
    State::Established(Established {
        snd: Send {
            nxt: ISS_1,
            max: ISS_1,
            una: ISS_1,
            wnd: WindowSize::DEFAULT,
            wnd_max: WindowSize::DEFAULT,
            buffer: NullBuffer,
            wl1: ISS_2,
            wl2: ISS_1,
            rtt_estimator: Estimator::Measured { srtt: RTT, rtt_var: RTT / 2 },
            rtt_sampler: RttSampler::default(),
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
            wnd_scale: WindowScale::default(),
        }.into(),
        rcv: Recv {
            buffer: RecvBufferState::Open {
                buffer: RingBuffer::default(),
                assembler: Assembler::new(ISS_2 + 5),
            },
            timer: None,
            mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            wnd_scale: WindowScale::default(),
            last_window_update: (ISS_2 + 5, WindowSize::DEFAULT),
            remaining_quickacks: 0,
            last_segment_at: None,
            sack_permitted: SACK_PERMITTED,
        }.into(),
    }),
    Segment::data(ISS_2, ISS_1, UnscaledWindowSize::from(u16::MAX), TEST_BYTES) =>
    Some(Segment::ack(ISS_1, ISS_2 + 5, UnscaledWindowSize::from(u16::MAX))); "retransmit data"
)]
// Case "retransmit syn_ack": a SYN-ACK arrives while in SynRcvd; the expected
// reply is an ACK with sequence ISS_1 + 1 acknowledging ISS_2 + 1.
#[test_case(
    State::SynRcvd(SynRcvd {
        iss: ISS_1,
        irs: ISS_2,
        timestamp: None,
        retrans_timer: RetransTimer {
            at: FakeInstant::default(),
            rto: Rto::MIN,
            user_timeout_until: None,
            remaining_retries: Some(DEFAULT_MAX_RETRIES),
        },
        simultaneous_open: None,
        buffer_sizes: BufferSizes::default(),
        smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
        rcv_wnd_scale: WindowScale::default(),
        snd_wnd_scale: Some(WindowScale::default()),
        sack_permitted: SACK_PERMITTED,
    }),
    Segment::syn_ack(ISS_2, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX),
    HandshakeOptions { window_scale: Some(WindowScale::default()), ..Default::default() }.into()) =>
    Some(Segment::ack(ISS_1 + 1, ISS_2 + 1, UnscaledWindowSize::from(u16::MAX))); "retransmit syn_ack"
)]
fn ack_to_retransmitted_segment(
    mut state: State<FakeInstant, RingBuffer, NullBuffer, ()>,
    seg: Segment<&[u8]>,
) -> Option<Segment<()>> {
    let counters = FakeTcpCounters::default();
    // Only the reply segment matters here; the second tuple element is
    // discarded.
    let (reply, _): (_, Option<()>) = state
        .on_segment_with_default_options::<_, ClientlessBufferProvider>(
            seg,
            FakeInstant::default(),
            &counters.refs(),
        );
    reply
}
5936
// Sends four MSS-sized segments ('A' through 'D') while the peer keeps
// acknowledging nothing new, and checks which segment each poll produces and
// how the fast-recovery / fast-retransmit counters evolve.
#[test]
fn fast_retransmit() {
    let mut clock = FakeInstantCtx::default();
    let counters = FakeTcpCounters::default();
    let mut send_buffer = RingBuffer::default();
    // Queue four chunks of exactly one MSS each, filled with 'A', 'B', 'C'
    // and 'D', so every segment can be identified by its payload byte.
    for b in b'A'..=b'D' {
        assert_eq!(
            send_buffer.enqueue_data(&[b; DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE_USIZE]),
            DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE_USIZE
        );
    }
    let mut state: State<_, _, _, ()> = State::Established(Established {
        snd: Send {
            nxt: ISS_1,
            max: ISS_1,
            una: ISS_1,
            wnd: WindowSize::DEFAULT,
            wnd_max: WindowSize::DEFAULT,
            buffer: send_buffer,
            wl1: ISS_2,
            wl2: ISS_1,
            rtt_estimator: Estimator::Measured { srtt: RTT, rtt_var: RTT / 2 },
            rtt_sampler: RttSampler::default(),
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(
                DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            ),
            wnd_scale: WindowScale::default(),
        }
        .into(),
        rcv: Recv {
            buffer: RecvBufferState::Open {
                buffer: RingBuffer::default(),
                assembler: Assembler::new(ISS_2),
            },
            timer: None,
            remaining_quickacks: 0,
            last_segment_at: None,
            mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            wnd_scale: WindowScale::default(),
            last_window_update: (ISS_2, WindowSize::DEFAULT),
            sack_permitted: SACK_PERMITTED,
        }
        .into(),
    });

    // The first poll sends the 'A' segment.
    assert_eq!(
        state.poll_send_with_default_options(
            u32::from(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
            clock.now(),
            &counters.refs(),
        ),
        Some(Segment::data(
            ISS_1,
            ISS_2,
            UnscaledWindowSize::from(u16::MAX),
            FragmentedPayload::new_contiguous(&[b'A'; DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE_USIZE])
        ))
    );

    // Advances the clock by 10ms, delivers an ACK for ISS_1 (which
    // acknowledges nothing new, since `snd.una` is ISS_1), expects no reply,
    // and then expects the next polled segment to be the MSS-sized chunk
    // filled with `expected_byte`, at that chunk's offset from ISS_1.
    let mut dup_ack = |expected_byte: u8, counters: &TcpCountersRefs<'_>| {
        clock.sleep(Duration::from_millis(10));
        assert_eq!(
            state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
                Segment::ack(ISS_2, ISS_1, UnscaledWindowSize::from(u16::MAX)),
                clock.now(),
                counters,
            ),
            (None, None)
        );

        assert_eq!(
            state.poll_send_with_default_options(
                u32::from(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
                clock.now(),
                counters,
            ),
            Some(Segment::data(
                ISS_1
                    + u32::from(expected_byte - b'A')
                        * u32::from(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
                ISS_2,
                UnscaledWindowSize::from(u16::MAX),
                FragmentedPayload::new_contiguous(
                    &[expected_byte; DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE_USIZE]
                )
            ))
        );
    };

    // No counter has moved before any duplicate ACK is seen.
    CounterExpectations::default().assert_counters(&counters);
    // First and second duplicate ACKs: new segments ('B', then 'C') are still
    // sent, and `fast_recovery` is counted once.
    dup_ack(b'B', &counters.refs());
    CounterExpectations { fast_recovery: 1, ..Default::default() }.assert_counters(&counters);
    dup_ack(b'C', &counters.refs());
    CounterExpectations { fast_recovery: 1, ..Default::default() }.assert_counters(&counters);
    // Third duplicate ACK: the 'A' segment is sent again and counted as a
    // fast retransmit.
    dup_ack(b'A', &counters.refs());
    CounterExpectations {
        retransmits: 1,
        fast_recovery: 1,
        fast_retransmits: 1,
        ..Default::default()
    }
    .assert_counters(&counters);
    // Fourth duplicate ACK: sending resumes with new data ('D'); the counters
    // do not change.
    dup_ack(b'D', &counters.refs());
    CounterExpectations {
        retransmits: 1,
        fast_recovery: 1,
        fast_retransmits: 1,
        ..Default::default()
    }
    .assert_counters(&counters);

    clock.sleep(Duration::from_millis(10));
    // An ACK covering the first segment finally arrives; no reply is
    // generated.
    assert_eq!(
        state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
            Segment::ack(
                ISS_2,
                ISS_1 + u32::from(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
                UnscaledWindowSize::from(u16::MAX)
            ),
            clock.now(),
            &counters.refs(),
        ),
        (None, None)
    );
    // After that ACK the congestion window is expected to be two segments.
    let established = assert_matches!(state, State::Established(established) => established);
    assert_eq!(
        u32::from(established.snd.congestion_control.cwnd()),
        2 * u32::from(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE)
    );
}
6074
// Exercises the keep-alive timer on an idle established connection: the timer
// is armed `idle` after the last activity, `count` probes are sent `interval`
// apart, and the connection then times out.
#[test]
fn keep_alive() {
    let mut clock = FakeInstantCtx::default();
    let counters = FakeTcpCounters::default();
    let mut state: State<_, _, _, ()> = State::Established(Established {
        snd: Send {
            nxt: ISS_1,
            max: ISS_1,
            una: ISS_1,
            wnd: WindowSize::DEFAULT,
            wnd_max: WindowSize::DEFAULT,
            buffer: RingBuffer::default(),
            wl1: ISS_2,
            wl2: ISS_1,
            rtt_estimator: Estimator::Measured { srtt: RTT, rtt_var: RTT / 2 },
            rtt_sampler: RttSampler::default(),
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(
                DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            ),
            wnd_scale: WindowScale::default(),
        }
        .into(),
        rcv: Recv {
            buffer: RecvBufferState::Open {
                buffer: RingBuffer::default(),
                assembler: Assembler::new(ISS_2),
            },
            timer: None,
            mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            wnd_scale: WindowScale::default(),
            last_window_update: (ISS_2, WindowSize::DEFAULT),
            remaining_quickacks: 0,
            last_segment_at: None,
            sack_permitted: SACK_PERMITTED,
        }
        .into(),
    });

    // Default test options with keep-alive switched on; the remaining
    // keep-alive parameters (`idle`, `interval`, `count`) keep their defaults.
    let socket_options = {
        let mut socket_options = SocketOptions::default_for_state_tests();
        socket_options.keep_alive.enabled = true;
        socket_options
    };
    let socket_options = &socket_options;
    let keep_alive = &socket_options.keep_alive;

    // Nothing is queued, so polling yields no segment, but it arms the timer
    // `idle` from now.
    assert_eq!(
        state.poll_send(
            &FakeStateMachineDebugId,
            &counters.refs(),
            u32::from(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
            clock.now(),
            socket_options,
        ),
        Err(NewlyClosed::No),
    );
    assert_eq!(state.poll_send_at(), Some(clock.now().panicking_add(keep_alive.idle.into())));

    clock.sleep(Duration::from_secs(60 * 60));
    // An incoming ACK an hour later re-arms the timer relative to the time the
    // segment arrived.
    assert_eq!(
        state.on_segment::<&[u8], ClientlessBufferProvider>(
            &FakeStateMachineDebugId,
            &counters.refs(),
            Segment::ack(ISS_2, ISS_1, UnscaledWindowSize::from(u16::MAX)),
            clock.now(),
            socket_options,
            false,
        ),
        (None, None, DataAcked::No, NewlyClosed::No)
    );
    assert_eq!(state.poll_send_at(), Some(clock.now().panicking_add(keep_alive.idle.into())),);
    clock.sleep(keep_alive.idle.into());

    // Once the idle period has elapsed, each poll sends a probe: an ACK whose
    // sequence number is one below `snd.nxt`. The connection stays
    // established for `count` probes spaced `interval` apart.
    for _ in 0..keep_alive.count.get() {
        assert_eq!(
            state.poll_send(
                &FakeStateMachineDebugId,
                &counters.refs(),
                u32::from(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
                clock.now(),
                socket_options,
            ),
            Ok(Segment::ack(ISS_1 - 1, ISS_2, UnscaledWindowSize::from(u16::MAX)))
        );
        clock.sleep(keep_alive.interval.into());
        assert_matches!(state, State::Established(_));
    }

    // With all probes unanswered, the next poll closes the connection with a
    // timeout error.
    assert_eq!(
        state.poll_send(
            &FakeStateMachineDebugId,
            &counters.refs(),
            u32::from(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
            clock.now(),
            socket_options,
        ),
        Err(NewlyClosed::Yes),
    );
    assert_eq!(state, State::Closed(Closed { reason: Some(ConnectionError::TimedOut) }));
    CounterExpectations {
        established_closed: 1,
        established_timedout: 1,
        ..Default::default()
    }
    .assert_counters(&counters);
}
6191
/// A buffer wrapper for tests that withholds the trailing `reserved_bytes`
/// bytes of the inner buffer's payload when it is peeked as a send buffer.
#[derive(Debug)]
struct ReservingBuffer<B> {
    /// The wrapped buffer that all operations are forwarded to.
    buffer: B,
    /// Number of bytes cut off the end of the payload handed to peek
    /// callbacks.
    reserved_bytes: usize,
}
6198
6199 impl<B: Buffer> Buffer for ReservingBuffer<B> {
6200 fn capacity_range() -> (usize, usize) {
6201 B::capacity_range()
6202 }
6203
6204 fn limits(&self) -> BufferLimits {
6205 self.buffer.limits()
6206 }
6207
6208 fn target_capacity(&self) -> usize {
6209 self.buffer.target_capacity()
6210 }
6211
6212 fn request_capacity(&mut self, size: usize) {
6213 self.buffer.request_capacity(size)
6214 }
6215 }
6216
6217 impl<B: SendBuffer> SendBuffer for ReservingBuffer<B> {
6218 type Payload<'a> = B::Payload<'a>;
6219
6220 fn mark_read(&mut self, count: usize) {
6221 self.buffer.mark_read(count)
6222 }
6223
6224 fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
6225 where
6226 F: FnOnce(B::Payload<'a>) -> R,
6227 {
6228 let Self { buffer, reserved_bytes } = self;
6229 buffer.peek_with(offset, |payload| {
6230 let len = payload.len();
6231 let new_len = len.saturating_sub(*reserved_bytes);
6232 f(payload.slice(0..new_len.try_into().unwrap_or(u32::MAX)))
6233 })
6234 }
6235 }
6236
6237 #[test_case(true, 0)]
6238 #[test_case(false, 0)]
6239 #[test_case(true, 1)]
6240 #[test_case(false, 1)]
6241 fn poll_send_len(has_fin: bool, reserved_bytes: usize) {
6242 const VALUE: u8 = 0xaa;
6243
6244 fn with_poll_send_result<const HAS_FIN: bool>(
6245 f: impl FnOnce(Segment<FragmentedPayload<'_, 2>>),
6246 reserved_bytes: usize,
6247 ) {
6248 const DATA_LEN: usize = 40;
6249 let buffer = ReservingBuffer {
6250 buffer: RingBuffer::with_data(DATA_LEN, &vec![VALUE; DATA_LEN]),
6251 reserved_bytes,
6252 };
6253 assert_eq!(buffer.limits().len, DATA_LEN);
6254
6255 let mut snd = Send::<FakeInstant, _, HAS_FIN> {
6256 nxt: ISS_1,
6257 max: ISS_1,
6258 una: ISS_1,
6259 wnd: WindowSize::DEFAULT,
6260 wnd_max: WindowSize::DEFAULT,
6261 buffer,
6262 wl1: ISS_2,
6263 wl2: ISS_1,
6264 rtt_estimator: Estimator::Measured { srtt: RTT, rtt_var: RTT / 2 },
6265 rtt_sampler: RttSampler::default(),
6266 timer: None,
6267 congestion_control: CongestionControl::cubic_with_mss(
6268 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
6269 ),
6270 wnd_scale: WindowScale::default(),
6271 };
6272
6273 let counters = FakeTcpCounters::default();
6274
6275 f(snd
6276 .poll_send(
6277 &FakeStateMachineDebugId,
6278 &counters.refs(),
6279 &RecvParams {
6280 ack: ISS_1,
6281 wnd: WindowSize::DEFAULT,
6282 wnd_scale: WindowScale::ZERO,
6283 },
6284 u32::from(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
6285 FakeInstant::default(),
6286 &SocketOptions::default_for_state_tests(),
6287 )
6288 .expect("has data"))
6289 }
6290
6291 let f = |segment: Segment<FragmentedPayload<'_, 2>>| {
6292 let segment_len = segment.len();
6293 let Segment { header: _, data } = segment;
6294 let data_len = data.len();
6295
6296 if has_fin && reserved_bytes == 0 {
6297 assert_eq!(
6298 segment_len,
6299 u32::try_from(data_len + 1).unwrap(),
6300 "FIN not accounted for"
6301 );
6302 } else {
6303 assert_eq!(segment_len, u32::try_from(data_len).unwrap());
6304 }
6305
6306 let mut target = vec![0; data_len];
6307 data.partial_copy(0, target.as_mut_slice());
6308 assert_eq!(target, vec![VALUE; data_len]);
6309 };
6310 match has_fin {
6311 true => with_poll_send_result::<true>(f, reserved_bytes),
6312 false => with_poll_send_result::<false>(f, reserved_bytes),
6313 }
6314 }
6315
// With the peer advertising a zero window, data is held back and a one-byte
// probe is sent on a timer whose delay doubles after an unsuccessful probe.
// Once the window opens, the rest of the data is sent.
#[test]
fn zero_window_probe() {
    let mut clock = FakeInstantCtx::default();
    let counters = FakeTcpCounters::default();
    let mut send_buffer = RingBuffer::new(BUFFER_SIZE);
    assert_eq!(send_buffer.enqueue_data(TEST_BYTES), 5);
    let mut state = State::Established(Established {
        snd: Send {
            nxt: ISS_1 + 1,
            max: ISS_1 + 1,
            una: ISS_1 + 1,
            // The peer's window starts out closed.
            wnd: WindowSize::ZERO,
            wnd_max: WindowSize::ZERO,
            // `send_buffer` is not read again in this test, so it is moved
            // into the state instead of being cloned.
            buffer: send_buffer,
            wl1: ISS_2,
            wl2: ISS_1,
            rtt_sampler: RttSampler::default(),
            rtt_estimator: Estimator::default(),
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(
                DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            ),
            wnd_scale: WindowScale::default(),
        }
        .into(),
        rcv: Recv {
            buffer: RecvBufferState::Open {
                buffer: RingBuffer::new(BUFFER_SIZE),
                assembler: Assembler::new(ISS_2 + 1),
            },
            timer: None,
            remaining_quickacks: 0,
            last_segment_at: None,
            mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            wnd_scale: WindowScale::default(),
            last_window_update: (ISS_2 + 1, WindowSize::new(BUFFER_SIZE).unwrap()),
            sack_permitted: SACK_PERMITTED,
        }
        .into(),
    });
    // Nothing can be sent into a zero window, but a timer is armed one
    // default RTO out.
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
        None
    );
    assert_eq!(state.poll_send_at(), Some(clock.now().panicking_add(Rto::DEFAULT.get())));

    clock.sleep(Rto::DEFAULT.get());
    // When the timer fires, a single byte is sent as a probe.
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
        Some(Segment::data(
            ISS_1 + 1,
            ISS_2 + 1,
            UnscaledWindowSize::from_usize(BUFFER_SIZE),
            FragmentedPayload::new_contiguous(&TEST_BYTES[0..1])
        ))
    );

    // The peer answers without accepting the probe byte (ack is still
    // ISS_1 + 1) and still advertises a zero window.
    assert_eq!(
        state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
            Segment::ack(ISS_2 + 1, ISS_1 + 1, UnscaledWindowSize::from(0)),
            clock.now(),
            &counters.refs(),
        ),
        (None, None)
    );
    // Still nothing to send, and the next probe is scheduled twice as far
    // out.
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
        None
    );
    assert_eq!(state.poll_send_at(), Some(clock.now().panicking_add(Rto::DEFAULT.get() * 2)));

    clock.sleep(Rto::DEFAULT.get());
    // Halfway to the doubled deadline: no probe yet.
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
        None
    );

    clock.sleep(Rto::DEFAULT.get());
    // At the doubled deadline the same one-byte probe is sent again.
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
        Some(Segment::data(
            ISS_1 + 1,
            ISS_2 + 1,
            UnscaledWindowSize::from_usize(BUFFER_SIZE),
            FragmentedPayload::new_contiguous(&TEST_BYTES[0..1])
        ))
    );

    // This time the peer accepts the probe byte and opens its window.
    assert_eq!(
        state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
            Segment::ack(ISS_2 + 1, ISS_1 + 2, UnscaledWindowSize::from(u16::MAX)),
            clock.now(),
            &counters.refs(),
        ),
        (None, None)
    );
    // No timer is pending, and the rest of the data goes out immediately.
    assert_eq!(state.poll_send_at(), None);
    assert_eq!(
        state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
        Some(Segment::data(
            ISS_1 + 2,
            ISS_2 + 1,
            UnscaledWindowSize::from_usize(BUFFER_SIZE),
            FragmentedPayload::new_contiguous(&TEST_BYTES[1..])
        ))
    );
}
6430
// With `nagle_enabled` set, a second small segment is held back while the
// first one is still unacknowledged; clearing the option lets it go out.
#[test]
fn nagle() {
    let clock = FakeInstantCtx::default();
    let counters = FakeTcpCounters::default();
    let mut send_buffer = RingBuffer::new(BUFFER_SIZE);
    assert_eq!(send_buffer.enqueue_data(TEST_BYTES), 5);
    let mut state: State<_, _, _, ()> = State::Established(Established {
        snd: Send {
            nxt: ISS_1 + 1,
            max: ISS_1 + 1,
            una: ISS_1 + 1,
            wnd: WindowSize::DEFAULT,
            wnd_max: WindowSize::DEFAULT,
            // `send_buffer` is not read again in this test, so it is moved
            // into the state instead of being cloned.
            buffer: send_buffer,
            wl1: ISS_2,
            wl2: ISS_1,
            rtt_sampler: RttSampler::default(),
            rtt_estimator: Estimator::default(),
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(
                DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            ),
            wnd_scale: WindowScale::default(),
        }
        .into(),
        rcv: Recv {
            buffer: RecvBufferState::Open {
                buffer: RingBuffer::new(BUFFER_SIZE),
                assembler: Assembler::new(ISS_2 + 1),
            },
            timer: None,
            remaining_quickacks: 0,
            last_segment_at: None,
            mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            wnd_scale: WindowScale::default(),
            last_window_update: (ISS_2 + 1, WindowSize::new(BUFFER_SIZE).unwrap()),
            sack_permitted: SACK_PERMITTED,
        }
        .into(),
    });
    let mut socket_options =
        SocketOptions { nagle_enabled: true, ..SocketOptions::default_for_state_tests() };
    // Nothing is in flight yet, so the first three bytes are sent (the poll is
    // limited to 3 bytes).
    assert_eq!(
        state.poll_send(
            &FakeStateMachineDebugId,
            &counters.refs(),
            3,
            clock.now(),
            &socket_options
        ),
        Ok(Segment::data(
            ISS_1 + 1,
            ISS_2 + 1,
            UnscaledWindowSize::from_usize(BUFFER_SIZE),
            FragmentedPayload::new_contiguous(&TEST_BYTES[0..3])
        ))
    );
    // No ACK has been delivered for the first segment; with Nagle enabled the
    // remaining two bytes are not sent.
    assert_eq!(
        state.poll_send(
            &FakeStateMachineDebugId,
            &counters.refs(),
            3,
            clock.now(),
            &socket_options
        ),
        Err(NewlyClosed::No)
    );
    // With Nagle disabled the very same poll sends the remaining bytes.
    socket_options.nagle_enabled = false;
    assert_eq!(
        state.poll_send(
            &FakeStateMachineDebugId,
            &counters.refs(),
            3,
            clock.now(),
            &socket_options
        ),
        Ok(Segment::data(
            ISS_1 + 4,
            ISS_2 + 1,
            UnscaledWindowSize::from_usize(BUFFER_SIZE),
            FragmentedPayload::new_contiguous(&TEST_BYTES[3..5])
        ))
    );
}
6516
6517 #[test]
6518 fn mss_option() {
6519 let clock = FakeInstantCtx::default();
6520 let counters = FakeTcpCounters::default();
6521 let (syn_sent, syn) = Closed::<Initial>::connect(
6522 ISS_1,
6523 clock.now(),
6524 (),
6525 Default::default(),
6526 Mss(NonZeroU16::new(1).unwrap()),
6527 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
6528 &SocketOptions::default_for_state_tests(),
6529 );
6530 let mut state = State::<_, RingBuffer, RingBuffer, ()>::SynSent(syn_sent);
6531
6532 let (seg, passive_open) = state
6534 .on_segment_with_default_options::<_, ClientlessBufferProvider>(
6535 syn,
6536 clock.now(),
6537 &counters.refs(),
6538 );
6539 let syn_ack = seg.expect("expected SYN-ACK");
6540 assert_eq!(passive_open, None);
6541
6542 assert_eq!(
6544 state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
6545 syn_ack,
6546 clock.now(),
6547 &counters.refs(),
6548 ),
6549 (Some(Segment::ack(ISS_1 + 1, ISS_1 + 1, UnscaledWindowSize::from(u16::MAX))), None)
6550 );
6551 match state {
6552 State::Closed(_)
6553 | State::Listen(_)
6554 | State::SynRcvd(_)
6555 | State::SynSent(_)
6556 | State::LastAck(_)
6557 | State::FinWait1(_)
6558 | State::FinWait2(_)
6559 | State::Closing(_)
6560 | State::TimeWait(_) => {
6561 panic!("expected that we have entered established state, but got {:?}", state)
6562 }
6563 State::Established(Established { ref mut snd, rcv: _ })
6564 | State::CloseWait(CloseWait { ref mut snd, closed_rcv: _ }) => {
6565 assert_eq!(snd.buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());
6566 }
6567 }
6568 assert_eq!(
6570 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
6571 Some(Segment::data(
6572 ISS_1 + 1,
6573 ISS_1 + 1,
6574 UnscaledWindowSize::from(u16::MAX),
6575 FragmentedPayload::new_contiguous(&TEST_BYTES[..1]),
6576 ))
6577 );
6578 }
6579
    // User timeout of two minutes (`2 * 60` seconds) shared by the
    // `user_timeout` and `retrans_timer_backoff` tests.
    const TEST_USER_TIMEOUT: NonZeroDuration = NonZeroDuration::from_secs(2 * 60).unwrap();
6581
6582 #[test_case(Duration::from_millis(1), false, true; "retrans_max_retries")]
6586 #[test_case(Duration::from_secs(1), false, false; "retrans_no_max_retries")]
6587 #[test_case(Duration::from_millis(1), true, true; "zwp_max_retries")]
6588 #[test_case(Duration::from_secs(1), true, false; "zwp_no_max_retires")]
6589 fn user_timeout(rtt: Duration, zero_window_probe: bool, max_retries: bool) {
6590 let mut clock = FakeInstantCtx::default();
6591 let counters = FakeTcpCounters::default();
6592 let mut send_buffer = RingBuffer::new(BUFFER_SIZE);
6593 assert_eq!(send_buffer.enqueue_data(TEST_BYTES), 5);
6594 let mut state: State<_, _, _, ()> = State::Established(Established {
6596 snd: Send {
6597 nxt: ISS_1 + 1,
6598 max: ISS_1 + 1,
6599 una: ISS_1 + 1,
6600 wnd: WindowSize::DEFAULT,
6601 wnd_max: WindowSize::DEFAULT,
6602 buffer: send_buffer.clone(),
6603 wl1: ISS_2 + 1,
6604 wl2: ISS_1 + 1,
6605 rtt_sampler: RttSampler::default(),
6606 rtt_estimator: Estimator::Measured { srtt: rtt, rtt_var: Duration::ZERO },
6607 timer: None,
6608 congestion_control: CongestionControl::cubic_with_mss(
6609 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
6610 ),
6611 wnd_scale: WindowScale::default(),
6612 }
6613 .into(),
6614 rcv: Recv {
6615 buffer: RecvBufferState::Open {
6616 buffer: RingBuffer::new(BUFFER_SIZE),
6617 assembler: Assembler::new(ISS_2 + 1),
6618 },
6619 timer: None,
6620 remaining_quickacks: 0,
6621 last_segment_at: None,
6622 mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
6623 wnd_scale: WindowScale::default(),
6624 last_window_update: (ISS_2 + 1, WindowSize::DEFAULT),
6625 sack_permitted: SACK_PERMITTED,
6626 }
6627 .into(),
6628 });
6629 let mut times = 0;
6630 let start = clock.now();
6631 let socket_options = SocketOptions {
6632 user_timeout: (!max_retries).then_some(TEST_USER_TIMEOUT),
6633 ..SocketOptions::default_for_state_tests()
6634 };
6635 while let Ok(seg) = state.poll_send(
6636 &FakeStateMachineDebugId,
6637 &counters.refs(),
6638 u32::MAX,
6639 clock.now(),
6640 &socket_options,
6641 ) {
6642 if zero_window_probe {
6643 let zero_window_ack = Segment::ack(
6644 seg.header.ack.unwrap(),
6645 seg.header.seq,
6646 UnscaledWindowSize::from(0),
6647 );
6648 assert_matches!(
6649 state.on_segment::<(), ClientlessBufferProvider>(
6650 &FakeStateMachineDebugId,
6651 &counters.refs(),
6652 zero_window_ack,
6653 clock.now(),
6654 &socket_options,
6655 false,
6656 ),
6657 (None, None, DataAcked::No, _newly_closed)
6658 );
6659
6660 assert_matches!(
6665 state.poll_send(
6666 &FakeStateMachineDebugId,
6667 &counters.refs(),
6668 u32::MAX,
6669 clock.now(),
6670 &socket_options,
6671 ),
6672 Err(NewlyClosed::No)
6673 );
6674 let inner_state = assert_matches!(state, State::Established(ref e) => e);
6675 assert_matches!(inner_state.snd.timer, Some(SendTimer::ZeroWindowProbe(_)));
6676 }
6677
6678 let deadline = state.poll_send_at().expect("must have a retransmission timer");
6679 clock.sleep(deadline.checked_duration_since(clock.now()).unwrap());
6680 times += 1;
6681 }
6682 let elapsed = clock.now().checked_duration_since(start).unwrap();
6683 if max_retries {
6684 assert_eq!(times, 1 + DEFAULT_MAX_RETRIES.get());
6685 } else {
6686 assert_eq!(elapsed, TEST_USER_TIMEOUT.get());
6687 assert!(times < DEFAULT_MAX_RETRIES.get());
6688 }
6689 assert_eq!(state, State::Closed(Closed { reason: Some(ConnectionError::TimedOut) }));
6690 CounterExpectations {
6691 established_closed: 1,
6692 established_timedout: 1,
6693 fast_retransmits: if zero_window_probe { 1 } else { 0 },
6694 fast_recovery: if zero_window_probe { 1 } else { 0 },
6695 timeouts: counters.stack_wide.timeouts.get(),
6698 retransmits: counters.stack_wide.retransmits.get(),
6699 slow_start_retransmits: counters.stack_wide.slow_start_retransmits.get(),
6700 ..Default::default()
6701 }
6702 .assert_counters(&counters);
6703 }
6704
6705 #[test]
6706 fn retrans_timer_backoff() {
6707 let mut clock = FakeInstantCtx::default();
6708 let mut timer = RetransTimer::new(
6709 clock.now(),
6710 Rto::DEFAULT,
6711 Some(TEST_USER_TIMEOUT),
6712 DEFAULT_MAX_RETRIES,
6713 );
6714 assert_eq!(timer.at, FakeInstant::from(Rto::DEFAULT.get()));
6715 clock.sleep(TEST_USER_TIMEOUT.get());
6716 timer.backoff(clock.now());
6717 assert_eq!(timer.at, FakeInstant::from(TEST_USER_TIMEOUT.get()));
6718 clock.sleep(Duration::from_secs(1));
6719 timer.backoff(clock.now());
6721 assert_eq!(timer.at, FakeInstant::from(TEST_USER_TIMEOUT.get()));
6723 }
6724
    // Exercises `SocketOptions::delayed_ack` on three states that own a
    // receive half (`Established`, `FinWait1`, `FinWait2`).
    //
    // In every case the receive buffer holds exactly `TEST_BYTES.len()` plus
    // two `DEVICE_MAXIMUM_SEGMENT_SIZE`-sized segments, and the last window
    // update recorded in the state advertises that full capacity.
    #[test_case(
        State::Established(Established {
            snd: Send {
                nxt: ISS_1 + 1,
                max: ISS_1 + 1,
                una: ISS_1 + 1,
                wnd: WindowSize::DEFAULT,
                wnd_max: WindowSize::DEFAULT,
                buffer: RingBuffer::new(BUFFER_SIZE),
                wl1: ISS_2,
                wl2: ISS_1,
                rtt_sampler: RttSampler::default(),
                rtt_estimator: Estimator::Measured {
                    srtt: Rto::DEFAULT.get(),
                    rtt_var: Duration::ZERO,
                },
                timer: None,
                congestion_control: CongestionControl::cubic_with_mss(
                    DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
                ),
                wnd_scale: WindowScale::default(),
            }.into(),
            rcv: Recv {
                buffer: RecvBufferState::Open {
                    buffer: RingBuffer::new(
                        TEST_BYTES.len() + 2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE) as usize,
                    ),
                    assembler: Assembler::new(ISS_2 + 1),
                },
                timer: None,
                remaining_quickacks: 0,
                last_segment_at: None,
                mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
                wnd_scale: WindowScale::default(),
                last_window_update: (
                    ISS_2 + 1,
                    WindowSize::new(TEST_BYTES.len() + 2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE) as usize).unwrap()
                ),
                sack_permitted: SACK_PERMITTED,
            }.into(),
        }); "established")]
    #[test_case(
        State::FinWait1(FinWait1 {
            snd: Send {
                nxt: ISS_1 + 1,
                max: ISS_1 + 1,
                una: ISS_1 + 1,
                wnd: WindowSize::DEFAULT,
                wnd_max: WindowSize::DEFAULT,
                buffer: RingBuffer::new(BUFFER_SIZE),
                wl1: ISS_2,
                wl2: ISS_1,
                rtt_sampler: RttSampler::default(),
                rtt_estimator: Estimator::Measured {
                    srtt: Rto::DEFAULT.get(),
                    rtt_var: Duration::ZERO,
                },
                timer: None,
                congestion_control: CongestionControl::cubic_with_mss(
                    DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
                ),
                wnd_scale: WindowScale::default(),
            }.into(),
            rcv: Recv {
                buffer: RecvBufferState::Open {
                    buffer: RingBuffer::new(
                        TEST_BYTES.len() + 2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE) as usize,
                    ),
                    assembler: Assembler::new(ISS_2 + 1),
                },
                timer: None,
                remaining_quickacks: 0,
                last_segment_at: None,
                mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
                wnd_scale: WindowScale::default(),
                last_window_update: (
                    ISS_2 + 1,
                    WindowSize::new(TEST_BYTES.len() + 2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE) as usize).unwrap()
                ),
                sack_permitted: SACK_PERMITTED,
            }.into(),
        }); "fin_wait_1")]
    #[test_case(
        State::FinWait2(FinWait2 {
            last_seq: ISS_1 + 1,
            rcv: Recv {
                buffer: RecvBufferState::Open {
                    buffer: RingBuffer::new(
                        TEST_BYTES.len() + 2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE) as usize,
                    ),
                    assembler: Assembler::new(ISS_2 + 1),
                },
                timer: None,
                remaining_quickacks: 0,
                last_segment_at: None,
                mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
                wnd_scale: WindowScale::default(),
                last_window_update: (
                    ISS_2 + 1,
                    WindowSize::new(TEST_BYTES.len() + 2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE) as usize).unwrap()
                ),
                sack_permitted: SACK_PERMITTED,
            },
            timeout_at: None,
        }); "fin_wait_2")]
    fn delayed_ack(mut state: State<FakeInstant, RingBuffer, RingBuffer, ()>) {
        let mut clock = FakeInstantCtx::default();
        let counters = FakeTcpCounters::default();
        let socket_options =
            SocketOptions { delayed_ack: true, ..SocketOptions::default_for_state_tests() };
        // Step 1: a small in-order data segment produces no immediate reply.
        assert_eq!(
            state.on_segment::<_, ClientlessBufferProvider>(
                &FakeStateMachineDebugId,
                &counters.refs(),
                Segment::data(
                    ISS_2 + 1,
                    ISS_1 + 1,
                    UnscaledWindowSize::from(u16::MAX),
                    TEST_BYTES,
                ),
                clock.now(),
                &socket_options,
                false,
            ),
            (None, None, DataAcked::No, NewlyClosed::No)
        );
        // Instead, a timer is armed `ACK_DELAY_THRESHOLD` from now; once it
        // expires, `poll_send` emits the ACK, advertising the two segments'
        // worth of space still free in the receive buffer.
        assert_eq!(state.poll_send_at(), Some(clock.now().panicking_add(ACK_DELAY_THRESHOLD)));
        clock.sleep(ACK_DELAY_THRESHOLD);
        assert_eq!(
            state.poll_send(
                &FakeStateMachineDebugId,
                &counters.refs(),
                u32::MAX,
                clock.now(),
                &socket_options
            ),
            Ok(Segment::ack(
                ISS_1 + 1,
                ISS_2 + 1 + TEST_BYTES.len(),
                UnscaledWindowSize::from_u32(2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE)),
            ))
        );
        let full_segment_sized_payload =
            vec![b'0'; u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE) as usize];

        // The ACK just sent must be recorded as the latest window update.
        let expect_last_window_update = (
            ISS_2 + 1 + TEST_BYTES.len(),
            WindowSize::from_u32(2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE)).unwrap(),
        );
        assert_eq!(state.recv_mut().unwrap().last_window_update, expect_last_window_update);
        // Step 2: the first full-sized segment is again not acknowledged
        // immediately.
        assert_eq!(
            state.on_segment::<_, ClientlessBufferProvider>(
                &FakeStateMachineDebugId,
                &counters.refs(),
                Segment::data(
                    ISS_2 + 1 + TEST_BYTES.len(),
                    ISS_1 + 1,
                    UnscaledWindowSize::from(u16::MAX),
                    &full_segment_sized_payload[..],
                ),
                clock.now(),
                &socket_options,
                false,
            ),
            (None, None, DataAcked::No, NewlyClosed::No)
        );
        // The delay timer is armed again and, since nothing was sent, the
        // recorded window update is unchanged.
        assert_eq!(state.poll_send_at(), Some(clock.now().panicking_add(ACK_DELAY_THRESHOLD)));
        assert_eq!(state.recv_mut().unwrap().last_window_update, expect_last_window_update);

        // Step 3: the second full-sized segment is acknowledged right away.
        // The buffer is now full, so the ACK advertises a zero window.
        assert_eq!(
            state.on_segment::<_, ClientlessBufferProvider>(
                &FakeStateMachineDebugId,
                &counters.refs(),
                Segment::data(
                    ISS_2 + 1 + TEST_BYTES.len() + full_segment_sized_payload.len(),
                    ISS_1 + 1,
                    UnscaledWindowSize::from(u16::MAX),
                    &full_segment_sized_payload[..],
                ),
                clock.now(),
                &socket_options,
                false,
            ),
            (
                Some(Segment::ack(
                    ISS_1 + 1,
                    ISS_2 + 1 + TEST_BYTES.len() + 2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE),
                    UnscaledWindowSize::from(0),
                )),
                None,
                DataAcked::No,
                NewlyClosed::No,
            )
        );
        // The immediate ACK is recorded as the new window update and no timer
        // remains pending.
        assert_eq!(
            state.recv_mut().unwrap().last_window_update,
            (
                ISS_2 + 1 + TEST_BYTES.len() + 2 * u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE),
                WindowSize::ZERO,
            )
        );
        assert_eq!(state.poll_send_at(), None);
    }
6935
6936 #[test]
6937 fn immediate_ack_if_out_of_order_or_fin() {
6938 let clock = FakeInstantCtx::default();
6939 let counters = FakeTcpCounters::default();
6940 let socket_options =
6941 SocketOptions { delayed_ack: true, ..SocketOptions::default_for_state_tests() };
6942 let mut state: State<_, _, _, ()> = State::Established(Established {
6943 snd: Send {
6944 nxt: ISS_1 + 1,
6945 max: ISS_1 + 1,
6946 una: ISS_1 + 1,
6947 wnd: WindowSize::DEFAULT,
6948 wnd_max: WindowSize::DEFAULT,
6949 buffer: RingBuffer::new(BUFFER_SIZE),
6950 wl1: ISS_2,
6951 wl2: ISS_1,
6952 rtt_sampler: RttSampler::default(),
6953 rtt_estimator: Estimator::Measured {
6954 srtt: Rto::DEFAULT.get(),
6955 rtt_var: Duration::ZERO,
6956 },
6957 timer: None,
6958 congestion_control: CongestionControl::cubic_with_mss(
6959 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
6960 ),
6961 wnd_scale: WindowScale::default(),
6962 }
6963 .into(),
6964 rcv: Recv {
6965 buffer: RecvBufferState::Open {
6966 buffer: RingBuffer::new(TEST_BYTES.len() + 1),
6967 assembler: Assembler::new(ISS_2 + 1),
6968 },
6969 timer: None,
6970 remaining_quickacks: 0,
6971 last_segment_at: None,
6972 mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
6973 wnd_scale: WindowScale::default(),
6974 last_window_update: (ISS_2 + 1, WindowSize::new(TEST_BYTES.len() + 1).unwrap()),
6975 sack_permitted: SACK_PERMITTED,
6976 }
6977 .into(),
6978 });
6979 assert_eq!(
6982 state.on_segment::<_, ClientlessBufferProvider>(
6983 &FakeStateMachineDebugId,
6984 &counters.refs(),
6985 Segment::data(
6986 ISS_2 + 2,
6987 ISS_1 + 1,
6988 UnscaledWindowSize::from(u16::MAX),
6989 &TEST_BYTES[1..]
6990 ),
6991 clock.now(),
6992 &socket_options,
6993 false, ),
6995 (
6996 Some(Segment::ack(
6997 ISS_1 + 1,
6998 ISS_2 + 1,
6999 UnscaledWindowSize::from(u16::try_from(TEST_BYTES.len() + 1).unwrap())
7000 )),
7001 None,
7002 DataAcked::No,
7003 NewlyClosed::No,
7004 )
7005 );
7006 assert_eq!(state.poll_send_at(), None);
7007 assert_eq!(
7010 state.on_segment::<_, ClientlessBufferProvider>(
7011 &FakeStateMachineDebugId,
7012 &counters.refs(),
7013 Segment::data(
7014 ISS_2 + 1,
7015 ISS_1 + 1,
7016 UnscaledWindowSize::from(u16::MAX),
7017 &TEST_BYTES[..1]
7018 ),
7019 clock.now(),
7020 &socket_options,
7021 false, ),
7023 (
7024 Some(Segment::ack(
7025 ISS_1 + 1,
7026 ISS_2 + 1 + TEST_BYTES.len(),
7027 UnscaledWindowSize::from(1),
7028 )),
7029 None,
7030 DataAcked::No,
7031 NewlyClosed::No
7032 )
7033 );
7034 assert_eq!(state.poll_send_at(), None);
7035 assert_eq!(
7037 state.on_segment::<(), ClientlessBufferProvider>(
7038 &FakeStateMachineDebugId,
7039 &counters.refs(),
7040 Segment::fin(
7041 ISS_2 + 1 + TEST_BYTES.len(),
7042 ISS_1 + 1,
7043 UnscaledWindowSize::from(u16::MAX),
7044 ),
7045 clock.now(),
7046 &socket_options,
7047 false, ),
7049 (
7050 Some(Segment::ack(
7051 ISS_1 + 1,
7052 ISS_2 + 1 + TEST_BYTES.len() + 1,
7053 UnscaledWindowSize::from(0),
7054 )),
7055 None,
7056 DataAcked::No,
7057 NewlyClosed::No,
7058 )
7059 );
7060 assert_eq!(state.poll_send_at(), None);
7061 }
7062
7063 #[test]
7064 fn fin_wait2_timeout() {
7065 let mut clock = FakeInstantCtx::default();
7066 let counters = FakeTcpCounters::default();
7067 let mut state: State<_, _, NullBuffer, ()> = State::FinWait2(FinWait2 {
7068 last_seq: ISS_1,
7069 rcv: Recv {
7070 buffer: RecvBufferState::Open {
7071 buffer: NullBuffer,
7072 assembler: Assembler::new(ISS_2),
7073 },
7074 timer: None,
7075 remaining_quickacks: 0,
7076 last_segment_at: None,
7077 mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
7078 wnd_scale: WindowScale::default(),
7079 last_window_update: (ISS_2, WindowSize::DEFAULT),
7080 sack_permitted: SACK_PERMITTED,
7081 },
7082 timeout_at: None,
7083 });
7084 assert_eq!(
7085 state.close(
7086 &counters.refs(),
7087 CloseReason::Close { now: clock.now() },
7088 &SocketOptions::default_for_state_tests()
7089 ),
7090 Err(CloseError::Closing)
7091 );
7092 assert_eq!(
7093 state.poll_send_at(),
7094 Some(clock.now().panicking_add(DEFAULT_FIN_WAIT2_TIMEOUT))
7095 );
7096 clock.sleep(DEFAULT_FIN_WAIT2_TIMEOUT);
7097 assert_eq!(
7098 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
7099 None
7100 );
7101 assert_eq!(state, State::Closed(Closed { reason: Some(ConnectionError::TimedOut) }));
7102 CounterExpectations {
7103 established_closed: 1,
7104 established_timedout: 1,
7105 ..Default::default()
7106 }
7107 .assert_counters(&counters);
7108 }
7109
    // Table-driven check of `RetransTimer::timed_out(now)`; each case gives a
    // timer, the instant to query at, and the expected answer.
    // NOTE(review): `timed_out` is defined outside this chunk, so the reasons
    // given per case are inferred from the inputs only - confirm against the
    // implementation.
    //
    // Deadline `at` (1s) reached at `now` (1s) with no retries left: timed out.
    #[test_case(RetransTimer {
        user_timeout_until: Some(FakeInstant::from(Duration::from_secs(100))),
        remaining_retries: None,
        at: FakeInstant::from(Duration::from_secs(1)),
        rto: Rto::new(Duration::from_secs(1)),
    }, FakeInstant::from(Duration::from_secs(1)) => true)]
    // No retries left, but the deadline `at` (2s) is still ahead of `now` (1s).
    #[test_case(RetransTimer {
        user_timeout_until: Some(FakeInstant::from(Duration::from_secs(100))),
        remaining_retries: None,
        at: FakeInstant::from(Duration::from_secs(2)),
        rto: Rto::new(Duration::from_secs(1)),
    }, FakeInstant::from(Duration::from_secs(1)) => false)]
    // One retry left and the deadline `at` (2s) still ahead of `now` (1s).
    #[test_case(RetransTimer {
        user_timeout_until: Some(FakeInstant::from(Duration::from_secs(100))),
        remaining_retries: Some(NonZeroU8::new(1).unwrap()),
        at: FakeInstant::from(Duration::from_secs(2)),
        rto: Rto::new(Duration::from_secs(1)),
    }, FakeInstant::from(Duration::from_secs(1)) => false)]
    // One retry left, but `user_timeout_until` (1s) has been reached at `now`.
    #[test_case(RetransTimer {
        user_timeout_until: Some(FakeInstant::from(Duration::from_secs(1))),
        remaining_retries: Some(NonZeroU8::new(1).unwrap()),
        at: FakeInstant::from(Duration::from_secs(1)),
        rto: Rto::new(Duration::from_secs(1)),
    }, FakeInstant::from(Duration::from_secs(1)) => true)]
    fn send_timed_out(timer: RetransTimer<FakeInstant>, now: FakeInstant) -> bool {
        timer.timed_out(now)
    }
7137
7138 #[test_case(
7139 State::SynSent(SynSent{
7140 iss: ISS_1,
7141 timestamp: Some(FakeInstant::default()),
7142 retrans_timer: RetransTimer::new(
7143 FakeInstant::default(),
7144 Rto::MIN,
7145 NonZeroDuration::from_secs(60),
7146 DEFAULT_MAX_SYN_RETRIES,
7147 ),
7148 active_open: (),
7149 buffer_sizes: Default::default(),
7150 device_mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
7151 default_mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
7152 rcv_wnd_scale: WindowScale::default(),
7153 })
7154 => DEFAULT_MAX_SYN_RETRIES.get(); "syn_sent")]
7155 #[test_case(
7156 State::SynRcvd(SynRcvd{
7157 iss: ISS_1,
7158 irs: ISS_2,
7159 timestamp: Some(FakeInstant::default()),
7160 retrans_timer: RetransTimer::new(
7161 FakeInstant::default(),
7162 Rto::MIN,
7163 NonZeroDuration::from_secs(60),
7164 DEFAULT_MAX_SYNACK_RETRIES,
7165 ),
7166 simultaneous_open: None,
7167 buffer_sizes: BufferSizes::default(),
7168 smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
7169 rcv_wnd_scale: WindowScale::default(),
7170 snd_wnd_scale: Some(WindowScale::default()),
7171 sack_permitted: SACK_PERMITTED,
7172 })
7173 => DEFAULT_MAX_SYNACK_RETRIES.get(); "syn_rcvd")]
7174 fn handshake_timeout(mut state: State<FakeInstant, RingBuffer, RingBuffer, ()>) -> u8 {
7175 let mut clock = FakeInstantCtx::default();
7176 let counters = FakeTcpCounters::default();
7177 let mut retransmissions = 0;
7178 clock.sleep_until(state.poll_send_at().expect("must have a retransmission timer"));
7179 while let Some(_seg) =
7180 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs())
7181 {
7182 let deadline = state.poll_send_at().expect("must have a retransmission timer");
7183 clock.sleep_until(deadline);
7184 retransmissions += 1;
7185 }
7186 assert_eq!(state, State::Closed(Closed { reason: Some(ConnectionError::TimedOut) }));
7187 CounterExpectations::default().assert_counters(&counters);
7188 retransmissions
7189 }
7190
7191 #[test_case(
7192 u16::MAX as usize, WindowScale::default(), Some(WindowScale::default())
7193 => (WindowScale::default(), WindowScale::default()))]
7194 #[test_case(
7195 u16::MAX as usize + 1, WindowScale::new(1).unwrap(), Some(WindowScale::default())
7196 => (WindowScale::new(1).unwrap(), WindowScale::default()))]
7197 #[test_case(
7198 u16::MAX as usize + 1, WindowScale::new(1).unwrap(), None
7199 => (WindowScale::default(), WindowScale::default()))]
7200 #[test_case(
7201 u16::MAX as usize, WindowScale::default(), Some(WindowScale::new(1).unwrap())
7202 => (WindowScale::default(), WindowScale::new(1).unwrap()))]
7203 fn window_scale(
7204 buffer_size: usize,
7205 syn_window_scale: WindowScale,
7206 syn_ack_window_scale: Option<WindowScale>,
7207 ) -> (WindowScale, WindowScale) {
7208 let mut clock = FakeInstantCtx::default();
7209 let counters = FakeTcpCounters::default();
7210 let (syn_sent, syn_seg) = Closed::<Initial>::connect(
7211 ISS_1,
7212 clock.now(),
7213 (),
7214 BufferSizes { receive: buffer_size, ..Default::default() },
7215 DEVICE_MAXIMUM_SEGMENT_SIZE,
7216 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
7217 &SocketOptions::default_for_state_tests(),
7218 );
7219 assert_eq!(
7220 syn_seg,
7221 Segment::syn(
7222 ISS_1,
7223 UnscaledWindowSize::from(u16::try_from(buffer_size).unwrap_or(u16::MAX)),
7224 HandshakeOptions {
7225 mss: Some(DEVICE_MAXIMUM_SEGMENT_SIZE),
7226 window_scale: Some(syn_window_scale),
7227 sack_permitted: SACK_PERMITTED,
7228 }
7229 .into(),
7230 )
7231 );
7232 let mut active = State::SynSent(syn_sent);
7233 clock.sleep(RTT / 2);
7234 let (seg, passive_open) = active
7235 .on_segment_with_default_options::<(), ClientlessBufferProvider>(
7236 Segment::syn_ack(
7237 ISS_2,
7238 ISS_1 + 1,
7239 UnscaledWindowSize::from(u16::MAX),
7240 HandshakeOptions {
7241 mss: Some(DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE),
7242 window_scale: syn_ack_window_scale,
7243 sack_permitted: SACK_PERMITTED,
7244 }
7245 .into(),
7246 ),
7247 clock.now(),
7248 &counters.refs(),
7249 );
7250 assert_eq!(passive_open, None);
7251 assert_matches!(seg, Some(_));
7252
7253 let established: Established<FakeInstant, RingBuffer, NullBuffer> =
7254 assert_matches!(active, State::Established(established) => established);
7255
7256 assert_eq!(established.snd.wnd, WindowSize::DEFAULT);
7257
7258 (established.rcv.wnd_scale, established.snd.wnd_scale)
7259 }
7260
7261 #[test_case(
7262 u16::MAX as usize,
7263 Segment::syn_ack(
7264 ISS_2 + 1 + u16::MAX as usize,
7265 ISS_1 + 1,
7266 UnscaledWindowSize::from(u16::MAX),
7267 Options::default(),
7268 )
7269 )]
7270 #[test_case(
7271 u16::MAX as usize + 1,
7272 Segment::syn_ack(
7273 ISS_2 + 1 + u16::MAX as usize,
7274 ISS_1 + 1,
7275 UnscaledWindowSize::from(u16::MAX),
7276 Options::default(),
7277 )
7278 )]
7279 #[test_case(
7280 u16::MAX as usize,
7281 Segment::data(
7282 ISS_2 + 1 + u16::MAX as usize,
7283 ISS_1 + 1,
7284 UnscaledWindowSize::from(u16::MAX),
7285 &TEST_BYTES[..],
7286 )
7287 )]
7288 #[test_case(
7289 u16::MAX as usize + 1,
7290 Segment::data(
7291 ISS_2 + 1 + u16::MAX as usize,
7292 ISS_1 + 1,
7293 UnscaledWindowSize::from(u16::MAX),
7294 &TEST_BYTES[..],
7295 )
7296 )]
7297 fn window_scale_otw_seq(receive_buf_size: usize, otw_seg: impl Into<Segment<&'static [u8]>>) {
7298 let counters = FakeTcpCounters::default();
7299 let buffer_sizes = BufferSizes { send: 0, receive: receive_buf_size };
7300 let rcv_wnd_scale = buffer_sizes.rwnd().scale();
7301 let mut syn_rcvd: State<_, RingBuffer, RingBuffer, ()> = State::SynRcvd(SynRcvd {
7302 iss: ISS_1,
7303 irs: ISS_2,
7304 timestamp: None,
7305 retrans_timer: RetransTimer::new(
7306 FakeInstant::default(),
7307 Rto::DEFAULT,
7308 NonZeroDuration::from_secs(10),
7309 DEFAULT_MAX_SYNACK_RETRIES,
7310 ),
7311 simultaneous_open: None,
7312 buffer_sizes: BufferSizes { send: 0, receive: receive_buf_size },
7313 smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
7314 rcv_wnd_scale,
7315 snd_wnd_scale: WindowScale::new(1),
7316 sack_permitted: SACK_PERMITTED,
7317 });
7318
7319 assert_eq!(
7320 syn_rcvd.on_segment_with_default_options::<_, ClientlessBufferProvider>(
7321 otw_seg.into(),
7322 FakeInstant::default(),
7323 &counters.refs()
7324 ),
7325 (Some(Segment::ack(ISS_1 + 1, ISS_2 + 1, buffer_sizes.rwnd_unscaled())), None),
7326 )
7327 }
7328
7329 #[test]
7330 fn poll_send_reserving_buffer() {
7331 const RESERVED_BYTES: usize = 3;
7332 let mut snd: Send<FakeInstant, _, false> = Send {
7333 nxt: ISS_1 + 1,
7334 max: ISS_1 + 1,
7335 una: ISS_1 + 1,
7336 wnd: WindowSize::DEFAULT,
7337 wnd_max: WindowSize::DEFAULT,
7338 wnd_scale: WindowScale::default(),
7339 wl1: ISS_1,
7340 wl2: ISS_2,
7341 buffer: ReservingBuffer {
7342 buffer: RingBuffer::with_data(TEST_BYTES.len(), TEST_BYTES),
7343 reserved_bytes: RESERVED_BYTES,
7344 },
7345 rtt_sampler: RttSampler::default(),
7346 rtt_estimator: Estimator::default(),
7347 timer: None,
7348 congestion_control: CongestionControl::cubic_with_mss(
7349 DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
7350 ),
7351 };
7352
7353 let counters = FakeTcpCounters::default();
7354
7355 assert_eq!(
7356 snd.poll_send(
7357 &FakeStateMachineDebugId,
7358 &counters.refs(),
7359 &RecvParams {
7360 ack: ISS_2 + 1,
7361 wnd: WindowSize::DEFAULT,
7362 wnd_scale: WindowScale::ZERO,
7363 },
7364 u32::MAX,
7365 FakeInstant::default(),
7366 &SocketOptions::default_for_state_tests(),
7367 ),
7368 Some(Segment::data(
7369 ISS_1 + 1,
7370 ISS_2 + 1,
7371 WindowSize::DEFAULT >> WindowScale::default(),
7372 FragmentedPayload::new_contiguous(&TEST_BYTES[..TEST_BYTES.len() - RESERVED_BYTES])
7373 ))
7374 );
7375
7376 assert_eq!(snd.nxt, ISS_1 + 1 + (TEST_BYTES.len() - RESERVED_BYTES));
7377 }
7378
7379 #[test]
7380 fn rcv_silly_window_avoidance() {
7381 const MULTIPLE: usize = 3;
7382 const CAP: usize = TEST_BYTES.len() * MULTIPLE;
7383 let mut rcv: Recv<FakeInstant, RingBuffer> = Recv {
7384 buffer: RecvBufferState::Open {
7385 buffer: RingBuffer::new(CAP),
7386 assembler: Assembler::new(ISS_1),
7387 },
7388 timer: None,
7389 remaining_quickacks: 0,
7390 last_segment_at: None,
7391 mss: Mss(NonZeroU16::new(TEST_BYTES.len() as u16).unwrap()),
7392 wnd_scale: WindowScale::default(),
7393 last_window_update: (ISS_1, WindowSize::new(CAP).unwrap()),
7394 sack_permitted: SACK_PERMITTED,
7395 };
7396
7397 fn get_buffer(rcv: &mut Recv<FakeInstant, RingBuffer>) -> &mut RingBuffer {
7398 assert_matches!(
7399 &mut rcv.buffer,
7400 RecvBufferState::Open {ref mut buffer, .. } => buffer
7401 )
7402 }
7403
7404 assert_eq!(rcv.calculate_window_size().window_size, WindowSize::new(CAP).unwrap());
7406
7407 for _ in 0..MULTIPLE {
7408 assert_eq!(get_buffer(&mut rcv).enqueue_data(TEST_BYTES), TEST_BYTES.len());
7409 }
7410 let assembler = assert_matches!(&mut rcv.buffer,
7411 RecvBufferState::Open { ref mut assembler, .. } => assembler);
7412 assert_eq!(assembler.insert(ISS_1..ISS_1 + CAP), CAP);
7413 assert_eq!(rcv.calculate_window_size().window_size, WindowSize::ZERO);
7415
7416 assert_eq!(get_buffer(&mut rcv).read_with(|_| 1), 1);
7419 assert_eq!(rcv.calculate_window_size().window_size, WindowSize::ZERO);
7420
7421 assert_eq!(get_buffer(&mut rcv).read_with(|_| TEST_BYTES.len()), TEST_BYTES.len());
7424 assert_eq!(
7425 rcv.calculate_window_size().window_size,
7426 WindowSize::new(TEST_BYTES.len() + 1).unwrap()
7427 );
7428 }
7429
    // For every state that can send data, checks that the window placed in an
    // outgoing data segment is `UnscaledWindowSize::from(4)`.
    //
    // The send and receive scales are deliberately different (4 and 8) and the
    // receive window is 1024 bytes. The expected value 4 equals `1024 >> 8`,
    // i.e. the window shifted by the receive scale; shifting by the send
    // scale would give 64 instead.
    #[test]
    fn correct_window_scale_during_send() {
        let snd_wnd_scale = WindowScale::new(4).unwrap();
        let rcv_wnd_scale = WindowScale::new(8).unwrap();
        let wnd_size = WindowSize::new(1024).unwrap();

        let counters = FakeTcpCounters::default();
        // Builds a fresh send half holding `TEST_BYTES`, with the send scale set.
        let new_snd = || Send {
            nxt: ISS_1 + 1,
            max: ISS_1 + 1,
            una: ISS_1 + 1,
            wnd: wnd_size,
            wnd_max: WindowSize::DEFAULT,
            buffer: RingBuffer::with_data(TEST_BYTES.len(), TEST_BYTES),
            wl1: ISS_2 + 1,
            wl2: ISS_1 + 1,
            rtt_estimator: Estimator::default(),
            rtt_sampler: RttSampler::default(),
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(
                DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            ),
            wnd_scale: snd_wnd_scale,
        };
        // Builds a fresh receive half with an empty `wnd_size`-byte buffer and
        // the receive scale set.
        let new_rcv = || Recv {
            buffer: RecvBufferState::Open {
                buffer: RingBuffer::new(wnd_size.into()),
                assembler: Assembler::new(ISS_2 + 1),
            },
            timer: None,
            remaining_quickacks: 0,
            last_segment_at: None,
            mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
            wnd_scale: rcv_wnd_scale,
            last_window_update: (ISS_2 + 1, WindowSize::ZERO),
            sack_permitted: SACK_PERMITTED,
        };
        // States with an open receive half use `new_rcv`; states whose receive
        // half is closed carry the equivalent `RecvParams` instead.
        for mut state in [
            State::Established(Established { snd: new_snd().into(), rcv: new_rcv().into() }),
            State::FinWait1(FinWait1 { snd: new_snd().queue_fin().into(), rcv: new_rcv().into() }),
            State::Closing(Closing {
                snd: new_snd().queue_fin(),
                closed_rcv: RecvParams { ack: ISS_2 + 1, wnd: wnd_size, wnd_scale: rcv_wnd_scale },
            }),
            State::CloseWait(CloseWait {
                snd: new_snd().into(),
                closed_rcv: RecvParams { ack: ISS_2 + 1, wnd: wnd_size, wnd_scale: rcv_wnd_scale },
            }),
            State::LastAck(LastAck {
                snd: new_snd().queue_fin(),
                closed_rcv: RecvParams { ack: ISS_2 + 1, wnd: wnd_size, wnd_scale: rcv_wnd_scale },
            }),
        ] {
            // The limit is exactly `TEST_BYTES.len()`, and the whole payload
            // is expected in one segment.
            assert_eq!(
                state.poll_send_with_default_options(
                    u32::try_from(TEST_BYTES.len()).unwrap(),
                    FakeInstant::default(),
                    &counters.refs(),
                ),
                Some(Segment::data(
                    ISS_1 + 1,
                    ISS_2 + 1,
                    UnscaledWindowSize::from(4),
                    FragmentedPayload::new_contiguous(TEST_BYTES)
                ))
            );
        }
    }
7503
7504 #[test_case(true; "prompted window update")]
7505 #[test_case(false; "unprompted window update")]
7506 fn snd_silly_window_avoidance(prompted_window_update: bool) {
7507 const CAP: usize = TEST_BYTES.len() * 2;
7508 let mut snd: Send<FakeInstant, RingBuffer, false> = Send {
7509 nxt: ISS_1 + 1,
7510 max: ISS_1 + 1,
7511 una: ISS_1 + 1,
7512 wnd: WindowSize::new(CAP).unwrap(),
7513 wnd_scale: WindowScale::default(),
7514 wnd_max: WindowSize::DEFAULT,
7515 wl1: ISS_1,
7516 wl2: ISS_2,
7517 buffer: RingBuffer::new(CAP),
7518 rtt_sampler: RttSampler::default(),
7519 rtt_estimator: Estimator::default(),
7520 timer: None,
7521 congestion_control: CongestionControl::cubic_with_mss(Mss(NonZeroU16::new(
7522 TEST_BYTES.len() as u16,
7523 )
7524 .unwrap())),
7525 };
7526
7527 let mut clock = FakeInstantCtx::default();
7528 let counters = FakeTcpCounters::default();
7529
7530 assert_eq!(snd.buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());
7532 assert_eq!(snd.buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());
7533
7534 assert_eq!(
7536 snd.poll_send(
7537 &FakeStateMachineDebugId,
7538 &counters.refs(),
7539 &RecvParams {
7540 ack: ISS_2 + 1,
7541 wnd: WindowSize::DEFAULT,
7542 wnd_scale: WindowScale::ZERO,
7543 },
7544 u32::MAX,
7545 clock.now(),
7546 &SocketOptions::default_for_state_tests(),
7547 ),
7548 Some(Segment::data(
7549 ISS_1 + 1,
7550 ISS_2 + 1,
7551 UnscaledWindowSize::from(u16::MAX),
7552 FragmentedPayload::new_contiguous(TEST_BYTES),
7553 )),
7554 );
7555
7556 assert_eq!(
7557 snd.process_ack(
7558 &FakeStateMachineDebugId,
7559 &counters.refs(),
7560 ISS_2 + 1,
7561 ISS_1 + 1 + TEST_BYTES.len(),
7562 UnscaledWindowSize::from(0),
7563 true,
7564 &RecvParams {
7565 ack: ISS_2 + 1,
7566 wnd_scale: WindowScale::default(),
7567 wnd: WindowSize::DEFAULT,
7568 },
7569 clock.now(),
7570 &SocketOptions::default(),
7571 ),
7572 (None, DataAcked::Yes)
7573 );
7574
7575 assert_eq!(
7578 snd.poll_send(
7579 &FakeStateMachineDebugId,
7580 &counters.refs(),
7581 &RecvParams {
7582 ack: ISS_2 + 1,
7583 wnd: WindowSize::DEFAULT,
7584 wnd_scale: WindowScale::ZERO,
7585 },
7586 u32::MAX,
7587 clock.now(),
7588 &SocketOptions::default_for_state_tests(),
7589 ),
7590 None
7591 );
7592
7593 assert_eq!(
7594 snd.timer,
7595 Some(SendTimer::ZeroWindowProbe(RetransTimer::new(
7596 clock.now(),
7597 snd.rtt_estimator.rto(),
7598 None,
7599 DEFAULT_MAX_RETRIES,
7600 )))
7601 );
7602
7603 clock.sleep_until(snd.timer.as_ref().unwrap().expiry());
7604
7605 assert_eq!(
7606 snd.poll_send(
7607 &FakeStateMachineDebugId,
7608 &counters.refs(),
7609 &RecvParams {
7610 ack: ISS_2 + 1,
7611 wnd: WindowSize::DEFAULT,
7612 wnd_scale: WindowScale::ZERO,
7613 },
7614 u32::MAX,
7615 clock.now(),
7616 &SocketOptions::default_for_state_tests(),
7617 ),
7618 Some(Segment::data(
7619 ISS_1 + 1 + TEST_BYTES.len(),
7620 ISS_2 + 1,
7621 UnscaledWindowSize::from(u16::MAX),
7622 FragmentedPayload::new_contiguous(&TEST_BYTES[..1]),
7623 ))
7624 );
7625
7626 if prompted_window_update {
7627 assert_eq!(
7630 snd.process_ack(
7631 &FakeStateMachineDebugId,
7632 &counters.refs(),
7633 ISS_2 + 1,
7634 ISS_1 + 1 + TEST_BYTES.len() + 1,
7635 UnscaledWindowSize::from(3),
7636 true,
7637 &RecvParams {
7638 ack: ISS_2 + 1,
7639 wnd_scale: WindowScale::default(),
7640 wnd: WindowSize::DEFAULT,
7641 },
7642 clock.now(),
7643 &SocketOptions::default(),
7644 ),
7645 (None, DataAcked::Yes)
7646 );
7647 } else {
7648 assert_eq!(
7650 snd.process_ack(
7651 &FakeStateMachineDebugId,
7652 &counters.refs(),
7653 ISS_2 + 1,
7654 ISS_1 + 1 + TEST_BYTES.len(),
7655 UnscaledWindowSize::from(0),
7656 true,
7657 &RecvParams {
7658 ack: ISS_2 + 1,
7659 wnd_scale: WindowScale::default(),
7660 wnd: WindowSize::DEFAULT,
7661 },
7662 clock.now(),
7663 &SocketOptions::default(),
7664 ),
7665 (None, DataAcked::No)
7666 );
7667
7668 assert_eq!(
7671 snd.process_ack(
7672 &FakeStateMachineDebugId,
7673 &counters.refs(),
7674 ISS_2 + 1,
7675 ISS_1 + 1 + TEST_BYTES.len(),
7676 UnscaledWindowSize::from(3),
7677 true,
7678 &RecvParams {
7679 ack: ISS_2 + 1,
7680 wnd_scale: WindowScale::default(),
7681 wnd: WindowSize::DEFAULT,
7682 },
7683 clock.now(),
7684 &SocketOptions::default(),
7685 ),
7686 (None, DataAcked::No)
7687 );
7688 }
7689
7690 assert_eq!(
7692 snd.poll_send(
7693 &FakeStateMachineDebugId,
7694 &counters.refs(),
7695 &RecvParams {
7696 ack: ISS_2 + 1,
7697 wnd: WindowSize::DEFAULT,
7698 wnd_scale: WindowScale::ZERO,
7699 },
7700 u32::MAX,
7701 clock.now(),
7702 &SocketOptions::default_for_state_tests(),
7703 ),
7704 None,
7705 );
7706 assert_eq!(
7707 snd.timer,
7708 Some(SendTimer::SWSProbe { at: clock.now().panicking_add(SWS_PROBE_TIMEOUT) })
7709 );
7710 clock.sleep(SWS_PROBE_TIMEOUT);
7711
7712 let seq_index = usize::from(prompted_window_update);
7715 assert_eq!(
7716 snd.poll_send(
7717 &FakeStateMachineDebugId,
7718 &counters.refs(),
7719 &RecvParams {
7720 ack: ISS_2 + 1,
7721 wnd: WindowSize::DEFAULT,
7722 wnd_scale: WindowScale::ZERO,
7723 },
7724 u32::MAX,
7725 clock.now(),
7726 &SocketOptions::default_for_state_tests(),
7727 ),
7728 Some(Segment::data(
7729 ISS_1 + 1 + TEST_BYTES.len() + seq_index,
7730 ISS_2 + 1,
7731 UnscaledWindowSize::from(u16::MAX),
7732 FragmentedPayload::new_contiguous(&TEST_BYTES[seq_index..3 + seq_index]),
7733 ))
7734 );
7735 }
7736
7737 #[test]
7738 fn snd_enter_zwp_on_negative_window_update() {
7739 const CAP: usize = TEST_BYTES.len() * 2;
7740 let mut snd: Send<FakeInstant, RingBuffer, false> = Send {
7741 nxt: ISS_1 + 1,
7742 max: ISS_1 + 1,
7743 una: ISS_1 + 1,
7744 wnd: WindowSize::new(CAP).unwrap(),
7745 wnd_scale: WindowScale::default(),
7746 wnd_max: WindowSize::DEFAULT,
7747 wl1: ISS_1,
7748 wl2: ISS_2,
7749 buffer: RingBuffer::new(CAP),
7750 rtt_sampler: RttSampler::default(),
7751 rtt_estimator: Estimator::default(),
7752 timer: None,
7753 congestion_control: CongestionControl::cubic_with_mss(Mss(NonZeroU16::new(
7754 TEST_BYTES.len() as u16,
7755 )
7756 .unwrap())),
7757 };
7758
7759 let clock = FakeInstantCtx::default();
7760 let counters = FakeTcpCounters::default();
7761
7762 assert_eq!(snd.buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());
7764 assert_eq!(snd.buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());
7765
7766 assert_eq!(
7768 snd.poll_send(
7769 &FakeStateMachineDebugId,
7770 &counters.refs(),
7771 &RecvParams {
7772 ack: ISS_2 + 1,
7773 wnd: WindowSize::DEFAULT,
7774 wnd_scale: WindowScale::ZERO,
7775 },
7776 u32::MAX,
7777 clock.now(),
7778 &SocketOptions::default_for_state_tests(),
7779 ),
7780 Some(Segment::data(
7781 ISS_1 + 1,
7782 ISS_2 + 1,
7783 UnscaledWindowSize::from(u16::MAX),
7784 FragmentedPayload::new_contiguous(TEST_BYTES),
7785 )),
7786 );
7787
7788 assert_matches!(snd.timer, Some(SendTimer::Retrans(_)));
7791
7792 assert_eq!(
7800 snd.process_ack(
7801 &FakeStateMachineDebugId,
7802 &counters.refs(),
7803 ISS_2 + 1,
7804 ISS_1 + 1 + TEST_BYTES.len(),
7805 UnscaledWindowSize::from(0),
7806 true,
7807 &RecvParams {
7808 ack: ISS_2 + 1,
7809 wnd_scale: WindowScale::default(),
7810 wnd: WindowSize::DEFAULT,
7811 },
7812 clock.now(),
7813 &SocketOptions::default(),
7814 ),
7815 (None, DataAcked::Yes)
7816 );
7817
7818 assert_eq!(
7821 snd.poll_send(
7822 &FakeStateMachineDebugId,
7823 &counters.refs(),
7824 &RecvParams {
7825 ack: ISS_2 + 1,
7826 wnd: WindowSize::DEFAULT,
7827 wnd_scale: WindowScale::ZERO,
7828 },
7829 u32::MAX,
7830 clock.now(),
7831 &SocketOptions::default_for_state_tests(),
7832 ),
7833 None,
7834 );
7835 assert_matches!(snd.timer, Some(SendTimer::ZeroWindowProbe(_)));
7836 }
7837
7838 #[test]
7839 fn ack_uses_snd_max() {
7841 let counters = FakeTcpCounters::default();
7842 let mss = Mss(NonZeroU16::new(u16::try_from(TEST_BYTES.len()).unwrap()).unwrap());
7843
7844 let mut clock = FakeInstantCtx::default();
7845 let mut buffer = RingBuffer::new(BUFFER_SIZE);
7846 assert_eq!(buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());
7847 assert_eq!(buffer.enqueue_data(TEST_BYTES), TEST_BYTES.len());
7848
7849 let mut state: State<_, _, _, ()> = State::Established(Established {
7851 snd: Send {
7852 nxt: ISS_1 + 1,
7853 max: ISS_1 + 1,
7854 una: ISS_1 + 1,
7855 wnd: WindowSize::DEFAULT,
7856 wnd_max: WindowSize::DEFAULT,
7857 buffer,
7858 wl1: ISS_1,
7859 wl2: ISS_1,
7860 rtt_sampler: RttSampler::default(),
7861 rtt_estimator: Estimator::NoSample,
7862 timer: None,
7863 congestion_control: CongestionControl::cubic_with_mss(mss),
7864 wnd_scale: WindowScale::default(),
7865 }
7866 .into(),
7867 rcv: Recv {
7868 buffer: RecvBufferState::Open {
7869 buffer: RingBuffer::new(BUFFER_SIZE),
7870 assembler: Assembler::new(ISS_1 + 1),
7871 },
7872 timer: None,
7873 mss,
7874 wnd_scale: WindowScale::default(),
7875 remaining_quickacks: 0,
7876 last_segment_at: None,
7877 last_window_update: (ISS_1 + 1, WindowSize::new(BUFFER_SIZE).unwrap()),
7878 sack_permitted: SACK_PERMITTED,
7879 }
7880 .into(),
7881 });
7882
7883 assert_eq!(
7885 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
7886 Some(Segment::data(
7887 ISS_1 + 1,
7888 ISS_1 + 1,
7889 UnscaledWindowSize::from(u16::try_from(BUFFER_SIZE).unwrap()),
7890 FragmentedPayload::new_contiguous(TEST_BYTES),
7891 )),
7892 );
7893 assert_eq!(
7894 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
7895 Some(Segment::data(
7896 ISS_1 + 1 + TEST_BYTES.len(),
7897 ISS_1 + 1,
7898 UnscaledWindowSize::from(u16::try_from(BUFFER_SIZE).unwrap()),
7899 FragmentedPayload::new_contiguous(TEST_BYTES),
7900 )),
7901 );
7902
7903 clock.sleep(Rto::DEFAULT.get());
7905 assert_eq!(
7906 state.poll_send_with_default_options(u32::MAX, clock.now(), &counters.refs()),
7907 Some(Segment::data(
7908 ISS_1 + 1,
7909 ISS_1 + 1,
7910 UnscaledWindowSize::from(u16::try_from(BUFFER_SIZE).unwrap()),
7911 FragmentedPayload::new_contiguous(TEST_BYTES),
7912 )),
7913 );
7914
7915 assert_eq!(
7918 state.on_segment_with_default_options::<_, ClientlessBufferProvider>(
7919 Segment::data(
7920 ISS_1 + 1,
7921 ISS_1 + 1,
7922 UnscaledWindowSize::from(u16::try_from(BUFFER_SIZE).unwrap()),
7923 TEST_BYTES,
7924 ),
7925 clock.now(),
7926 &counters.refs(),
7927 ),
7928 (
7929 Some(Segment::ack(
7930 ISS_1 + 1 + 2 * TEST_BYTES.len(),
7931 ISS_1 + 1 + TEST_BYTES.len(),
7932 UnscaledWindowSize::from(
7933 u16::try_from(BUFFER_SIZE - TEST_BYTES.len()).unwrap()
7934 ),
7935 )),
7936 None,
7937 )
7938 );
7939 }
7940
7941 #[test_case(
7942 State::Closed(Closed { reason: None }),
7943 State::Closed(Closed { reason: None }) => NewlyClosed::No; "closed to closed")]
7944 #[test_case(
7945 State::SynSent(SynSent {
7946 iss: ISS_1,
7947 timestamp: Some(FakeInstant::default()),
7948 retrans_timer: RetransTimer::new(
7949 FakeInstant::default(),
7950 Rto::DEFAULT,
7951 None,
7952 DEFAULT_MAX_SYN_RETRIES,
7953 ),
7954 active_open: (),
7955 buffer_sizes: BufferSizes::default(),
7956 default_mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
7957 device_mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
7958 rcv_wnd_scale: WindowScale::default(),
7959 }),
7960 State::Closed(Closed { reason: None }) => NewlyClosed::Yes; "non-closed to closed")]
7961 #[test_case(
7962 State::SynSent(SynSent {
7963 iss: ISS_1,
7964 timestamp: Some(FakeInstant::default()),
7965 retrans_timer: RetransTimer::new(
7966 FakeInstant::default(),
7967 Rto::DEFAULT,
7968 None,
7969 DEFAULT_MAX_SYN_RETRIES,
7970 ),
7971 active_open: (),
7972 buffer_sizes: BufferSizes::default(),
7973 default_mss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
7974 device_mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
7975 rcv_wnd_scale: WindowScale::default(),
7976 },
7977 ),
7978 State::SynRcvd(SynRcvd {
7979 iss: ISS_1,
7980 irs: ISS_2,
7981 timestamp: None,
7982 retrans_timer: RetransTimer::new(
7983 FakeInstant::default(),
7984 Rto::DEFAULT,
7985 NonZeroDuration::from_secs(10),
7986 DEFAULT_MAX_SYNACK_RETRIES,
7987 ),
7988 simultaneous_open: None,
7989 buffer_sizes: BufferSizes { send: 0, receive: 0 },
7990 smss: DEFAULT_IPV4_MAXIMUM_SEGMENT_SIZE,
7991 rcv_wnd_scale: WindowScale::new(0).unwrap(),
7992 snd_wnd_scale: WindowScale::new(0),
7993 sack_permitted: SACK_PERMITTED,
7994 }) => NewlyClosed::No; "non-closed to non-closed")]
7995 fn transition_to_state(
7996 mut old_state: State<FakeInstant, RingBuffer, RingBuffer, ()>,
7997 new_state: State<FakeInstant, RingBuffer, RingBuffer, ()>,
7998 ) -> NewlyClosed {
7999 let counters = FakeTcpCounters::default();
8000 old_state.transition_to_state(&counters.refs(), new_state)
8001 }
8002
    // Exercises `poll_receive_data_dequeued` on the Established, FinWait1 and
    // FinWait2 states: after the application reads data out of a full receive
    // buffer, a window update is generated only if enough space was freed.
    //
    // - `dequeue_more_than_mss`: when true, the whole buffered payload
    //   (`TEST_BYTES.len()` bytes, which equals the MSS here) is read;
    //   otherwise only `mss / 2 - 1` bytes are read.
    // - `delayed_ack`: value of the `delayed_ack` socket option used when the
    //   data segment is received.
    #[test_case(true, false; "more than mss dequeued")]
    #[test_case(false, false; "less than mss dequeued")]
    #[test_case(true, true; "more than mss dequeued and delack")]
    #[test_case(false, true; "less than mss dequeued and delack")]
    fn poll_receive_data_dequeued_state(dequeue_more_than_mss: bool, delayed_ack: bool) {
        // The receive buffer, the MSS and the test payload are all 5 bytes, so
        // a single segment fills the buffer completely.
        const BUFFER_SIZE: usize = 5;
        const MSS: Mss = Mss(NonZeroU16::new(5).unwrap());
        const TEST_BYTES: &[u8] = "Hello".as_bytes();

        // Factories so that each state under test gets fresh send/receive
        // halves.
        let new_snd = || Send {
            nxt: ISS_1 + 1,
            max: ISS_1 + 1,
            una: ISS_1 + 1,
            wnd: WindowSize::ZERO,
            wnd_max: WindowSize::ZERO,
            buffer: NullBuffer,
            wl1: ISS_2 + 1,
            wl2: ISS_1 + 1,
            rtt_estimator: Estimator::default(),
            rtt_sampler: RttSampler::default(),
            timer: None,
            congestion_control: CongestionControl::cubic_with_mss(MSS),
            wnd_scale: WindowScale::default(),
        };
        let new_rcv = || Recv {
            buffer: RecvBufferState::Open {
                buffer: RingBuffer::new(BUFFER_SIZE),
                assembler: Assembler::new(ISS_2 + 1),
            },
            timer: None,
            mss: MSS,
            remaining_quickacks: 0,
            last_segment_at: None,
            wnd_scale: WindowScale::default(),
            last_window_update: (ISS_2 + 1, WindowSize::new(BUFFER_SIZE).unwrap()),
            sack_permitted: SACK_PERMITTED,
        };

        let clock = FakeInstantCtx::default();
        let counters = FakeTcpCounters::default();
        for mut state in [
            State::Established(Established { snd: new_snd().into(), rcv: new_rcv().into() }),
            State::FinWait1(FinWait1 { snd: new_snd().queue_fin().into(), rcv: new_rcv().into() }),
            State::FinWait2(FinWait2 { last_seq: ISS_1 + 1, rcv: new_rcv(), timeout_at: None }),
        ] {
            // Nothing has been received or read yet: no window update is
            // generated and the recorded window update is the initial one.
            assert_matches!(state.poll_receive_data_dequeued(), None);
            let expect_window_update = (ISS_2 + 1, WindowSize::new(BUFFER_SIZE).unwrap());
            assert_eq!(state.recv_mut().unwrap().last_window_update, expect_window_update);

            // Receive a segment that fills the receive buffer.
            let seg = state.on_segment_with_options::<_, ClientlessBufferProvider>(
                Segment::data(ISS_2 + 1, ISS_1 + 1, UnscaledWindowSize::from(0), TEST_BYTES),
                clock.now(),
                &counters.refs(),
                &SocketOptions { delayed_ack, ..SocketOptions::default_for_state_tests() },
            );

            // Without delayed ACKs the data is acknowledged immediately, and
            // the advertised window is what remains of the buffer (zero).
            let expect_ack = (!delayed_ack).then_some(Segment::ack(
                ISS_1 + 1,
                ISS_2 + 1 + TEST_BYTES.len(),
                UnscaledWindowSize::from((BUFFER_SIZE - TEST_BYTES.len()) as u16),
            ));
            assert_eq!(seg, (expect_ack, None));
            // No data has been read yet, so there is still nothing to
            // announce.
            assert_matches!(state.poll_receive_data_dequeued(), None);

            // The recorded window update only changes when an ACK was
            // actually sent.
            let expect_window_update = if delayed_ack {
                expect_window_update
            } else {
                (ISS_2 + 1 + TEST_BYTES.len(), WindowSize::new(0).unwrap())
            };
            assert_eq!(state.recv_mut().unwrap().last_window_update, expect_window_update);

            if dequeue_more_than_mss {
                // Read everything that is buffered; the closure's return
                // value is the number of bytes consumed.
                assert_eq!(
                    state.read_with(|available| {
                        assert_eq!(available, &[TEST_BYTES]);
                        available[0].len()
                    }),
                    TEST_BYTES.len()
                );
                // A window update advertising the whole buffer is generated.
                assert_eq!(
                    state.poll_receive_data_dequeued(),
                    Some(Segment::ack(
                        ISS_1 + 1,
                        ISS_2 + 1 + TEST_BYTES.len(),
                        UnscaledWindowSize::from(BUFFER_SIZE as u16)
                    ))
                );
                assert_eq!(
                    state.recv_mut().unwrap().last_window_update,
                    (ISS_2 + 1 + TEST_BYTES.len(), WindowSize::new(BUFFER_SIZE).unwrap())
                );
                // No receive timer is left pending after the update.
                assert!(state.recv_mut().unwrap().timer.is_none());
            } else {
                let mss: usize = MSS.get().get().into();
                // Read only `mss / 2 - 1` bytes (a single byte with an MSS of
                // 5).
                assert_eq!(
                    state.read_with(|available| {
                        assert_eq!(available, &[TEST_BYTES]);
                        mss / 2 - 1
                    }),
                    mss / 2 - 1
                );
                // No window update is generated and the recorded window
                // update is unchanged.
                assert_eq!(state.poll_receive_data_dequeued(), None,);
                assert_eq!(
                    state.recv_mut().unwrap().last_window_update,
                    expect_window_update,
                );
                // A receive timer is pending exactly when delayed ACKs are
                // enabled.
                assert_eq!(state.recv_mut().unwrap().timer.is_some(), delayed_ack);
            }
        }
    }
8128
8129 #[test]
8130 fn poll_receive_data_dequeued_small_window() {
8131 const MSS: Mss = Mss(NonZeroU16::new(65000).unwrap());
8132 const WINDOW: WindowSize = WindowSize::from_u32(500).unwrap();
8133 let mut recv = Recv::<FakeInstant, _> {
8134 buffer: RecvBufferState::Open {
8135 buffer: RingBuffer::new(WINDOW.into()),
8136 assembler: Assembler::new(ISS_2 + 1),
8137 },
8138 timer: None,
8139 mss: MSS,
8140 remaining_quickacks: 0,
8141 last_segment_at: None,
8142 wnd_scale: WindowScale::default(),
8143 last_window_update: (ISS_2 + 1, WindowSize::from_u32(1).unwrap()),
8144 sack_permitted: SACK_PERMITTED,
8145 };
8146 let seg = recv.poll_receive_data_dequeued(ISS_1).expect("generates segment");
8147 assert_eq!(seg.header.ack, Some(recv.nxt()));
8148 assert_eq!(seg.header.wnd << recv.wnd_scale, WINDOW);
8149 }
8150
8151 #[test]
8152 fn quickack_period() {
8153 let mut quickack = default_quickack_counter();
8154 let mut state = State::Established(Established::<FakeInstant, _, _> {
8155 snd: Send {
8156 nxt: ISS_1 + 1,
8157 max: ISS_1 + 1,
8158 una: ISS_1 + 1,
8159 wnd: WindowSize::ZERO,
8160 wnd_max: WindowSize::ZERO,
8161 buffer: NullBuffer,
8162 wl1: ISS_2 + 1,
8163 wl2: ISS_1 + 1,
8164 rtt_estimator: Estimator::default(),
8165 rtt_sampler: RttSampler::default(),
8166 timer: None,
8167 congestion_control: CongestionControl::cubic_with_mss(DEVICE_MAXIMUM_SEGMENT_SIZE),
8168 wnd_scale: WindowScale::default(),
8169 }
8170 .into(),
8171 rcv: Recv {
8172 buffer: RecvBufferState::Open {
8173 buffer: RingBuffer::default(),
8174 assembler: Assembler::new(ISS_2 + 1),
8175 },
8176 timer: None,
8177 mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
8178 remaining_quickacks: quickack,
8179 last_segment_at: None,
8180 wnd_scale: WindowScale::default(),
8181 last_window_update: (ISS_2 + 1, WindowSize::DEFAULT),
8182 sack_permitted: SACK_PERMITTED,
8183 }
8184 .into(),
8185 });
8186 let socket_options = SocketOptions { delayed_ack: true, ..Default::default() };
8187 let clock = FakeInstantCtx::default();
8188 let counters = FakeTcpCounters::default();
8189 let data = vec![0u8; usize::from(DEVICE_MAXIMUM_SEGMENT_SIZE)];
8190 while quickack != 0 {
8191 let seq = state.recv_mut().unwrap().nxt();
8192 let (segment, _) = Segment::with_data(
8193 seq,
8194 Some(ISS_1 + 1),
8195 None,
8196 WindowSize::ZERO >> WindowScale::default(),
8197 &data[..],
8198 );
8199 let (seg, passive_open) = state.on_segment_with_options::<_, ClientlessBufferProvider>(
8200 segment,
8201 clock.now(),
8202 &counters.refs(),
8203 &socket_options,
8204 );
8205 let recv = state.recv_mut().unwrap();
8206 assert_matches!(recv.timer, None);
8207
8208 assert_eq!(passive_open, None);
8209 let seg = seg.expect("no segment generated");
8210 assert_eq!(seg.header.ack, Some(seq + u32::try_from(data.len()).unwrap()));
8211 assert_eq!(recv.remaining_quickacks, quickack - 1);
8212 quickack -= 1;
8213 state.buffers_mut().into_receive_buffer().unwrap().reset();
8214 }
8215
8216 let (segment, _) = Segment::with_data(
8218 state.recv_mut().unwrap().nxt(),
8219 Some(ISS_1 + 1),
8220 None,
8221 WindowSize::ZERO >> WindowScale::default(),
8222 &data[..],
8223 );
8224 let (seg, passive_open) = state.on_segment_with_options::<_, ClientlessBufferProvider>(
8225 segment,
8226 clock.now(),
8227 &counters.refs(),
8228 &socket_options,
8229 );
8230 assert_eq!(passive_open, None);
8231 assert_eq!(seg, None);
8232 assert_matches!(state.recv_mut().unwrap().timer, Some(ReceiveTimer::DelayedAck { .. }));
8233 }
8234
8235 #[test]
8236 fn quickack_reset_out_of_window() {
8237 let mut state = State::Established(Established::<FakeInstant, _, _> {
8238 snd: Send {
8239 nxt: ISS_1 + 1,
8240 max: ISS_1 + 1,
8241 una: ISS_1 + 1,
8242 wnd: WindowSize::ZERO,
8243 wnd_max: WindowSize::ZERO,
8244 buffer: NullBuffer,
8245 wl1: ISS_2 + 1,
8246 wl2: ISS_1 + 1,
8247 rtt_estimator: Estimator::default(),
8248 rtt_sampler: RttSampler::default(),
8249 timer: None,
8250 congestion_control: CongestionControl::cubic_with_mss(DEVICE_MAXIMUM_SEGMENT_SIZE),
8251 wnd_scale: WindowScale::default(),
8252 }
8253 .into(),
8254 rcv: Recv {
8255 buffer: RecvBufferState::Open {
8256 buffer: RingBuffer::default(),
8257 assembler: Assembler::new(ISS_2 + 1),
8258 },
8259 timer: None,
8260 mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
8261 remaining_quickacks: 0,
8263 last_segment_at: None,
8264 wnd_scale: WindowScale::default(),
8265 last_window_update: (ISS_2 + 1, WindowSize::DEFAULT),
8266 sack_permitted: SACK_PERMITTED,
8267 }
8268 .into(),
8269 });
8270 let clock = FakeInstantCtx::default();
8271 let counters = FakeTcpCounters::default();
8272 let data = vec![0u8; usize::from(DEVICE_MAXIMUM_SEGMENT_SIZE)];
8273
8274 let (segment, _) = Segment::with_data(
8275 state.recv_mut().unwrap().nxt() - i32::try_from(data.len() + 1).unwrap(),
8277 Some(ISS_1 + 1),
8278 None,
8279 WindowSize::ZERO >> WindowScale::default(),
8280 &data[..],
8281 );
8282 let (seg, passive_open) = state
8283 .on_segment_with_default_options::<_, ClientlessBufferProvider>(
8284 segment,
8285 clock.now(),
8286 &counters.refs(),
8287 );
8288 assert_eq!(passive_open, None);
8289 let recv = state.recv_mut().unwrap();
8290 let seg = seg.expect("expected segment");
8291 assert_eq!(seg.header.ack, Some(recv.nxt()));
8292 assert_eq!(recv.remaining_quickacks, default_quickack_counter());
8293 }
8294
8295 #[test]
8296 fn quickack_reset_rto() {
8297 let mut clock = FakeInstantCtx::default();
8298 let mut state = State::Established(Established::<FakeInstant, _, _> {
8299 snd: Send {
8300 nxt: ISS_1 + 1,
8301 max: ISS_1 + 1,
8302 una: ISS_1 + 1,
8303 wnd: WindowSize::ZERO,
8304 wnd_max: WindowSize::ZERO,
8305 buffer: NullBuffer,
8306 wl1: ISS_2 + 1,
8307 wl2: ISS_1 + 1,
8308 rtt_estimator: Estimator::default(),
8309 rtt_sampler: RttSampler::default(),
8310 timer: None,
8311 congestion_control: CongestionControl::cubic_with_mss(DEVICE_MAXIMUM_SEGMENT_SIZE),
8312 wnd_scale: WindowScale::default(),
8313 }
8314 .into(),
8315 rcv: Recv {
8316 buffer: RecvBufferState::Open {
8317 buffer: RingBuffer::default(),
8318 assembler: Assembler::new(ISS_2 + 1),
8319 },
8320 timer: None,
8321 mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
8322 remaining_quickacks: 0,
8324 last_segment_at: Some(clock.now()),
8325 wnd_scale: WindowScale::default(),
8326 last_window_update: (ISS_2 + 1, WindowSize::DEFAULT),
8327 sack_permitted: SACK_PERMITTED,
8328 }
8329 .into(),
8330 });
8331 let counters = FakeTcpCounters::default();
8332 let data = vec![0u8; usize::from(DEVICE_MAXIMUM_SEGMENT_SIZE)];
8333
8334 let (segment, _) = Segment::with_data(
8335 state.recv_mut().unwrap().nxt(),
8336 Some(ISS_1 + 1),
8337 None,
8338 WindowSize::ZERO >> WindowScale::default(),
8339 &data[..],
8340 );
8341 clock.sleep(Rto::DEFAULT.get());
8342 let (seg, passive_open) = state
8343 .on_segment_with_default_options::<_, ClientlessBufferProvider>(
8344 segment,
8345 clock.now(),
8346 &counters.refs(),
8347 );
8348 assert_eq!(passive_open, None);
8349 let recv = state.recv_mut().unwrap();
8350 let seg = seg.expect("expected segment");
8351 assert_eq!(seg.header.ack, Some(recv.nxt()));
8352 assert_eq!(recv.remaining_quickacks, default_quickack_counter() - 1);
8355 assert_eq!(recv.last_segment_at, Some(clock.now()));
8356 }
8357
    // Checks that the receiver reports out-of-order data in SACK blocks only
    // when `sack_permitted` is set, both on pure ACKs and on outgoing data
    // segments, and that the blocks disappear once the hole is filled.
    #[test_case(true; "sack permitted")]
    #[test_case(false; "sack not permitted")]
    fn receiver_selective_acks(sack_permitted: bool) {
        let mut state = State::Established(Established::<FakeInstant, _, _> {
            snd: Send {
                nxt: ISS_1 + 1,
                max: ISS_1 + 1,
                una: ISS_1 + 1,
                wnd: WindowSize::DEFAULT,
                wnd_max: WindowSize::DEFAULT,
                buffer: RingBuffer::default(),
                wl1: ISS_2 + 1,
                wl2: ISS_1 + 1,
                rtt_estimator: Estimator::default(),
                rtt_sampler: RttSampler::default(),
                timer: None,
                congestion_control: CongestionControl::cubic_with_mss(DEVICE_MAXIMUM_SEGMENT_SIZE),
                wnd_scale: WindowScale::default(),
            }
            .into(),
            rcv: Recv {
                buffer: RecvBufferState::Open {
                    buffer: RingBuffer::default(),
                    assembler: Assembler::new(ISS_2 + 1),
                },
                timer: None,
                mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
                remaining_quickacks: 0,
                last_segment_at: None,
                wnd_scale: WindowScale::default(),
                last_window_update: (ISS_2 + 1, WindowSize::DEFAULT),
                sack_permitted,
            }
            .into(),
        });
        let clock = FakeInstantCtx::default();
        let counters = FakeTcpCounters::default();
        let data = vec![0u8; usize::from(DEVICE_MAXIMUM_SEGMENT_SIZE)];
        let mss = u32::from(DEVICE_MAXIMUM_SEGMENT_SIZE);
        // Deliver a segment one MSS past the next expected sequence number,
        // leaving a hole of one MSS in front of it.
        let seg_start = ISS_2 + 1 + mss;
        let (segment, _) = Segment::with_data(
            seg_start,
            Some(ISS_1 + 1),
            None,
            WindowSize::DEFAULT >> WindowScale::default(),
            &data[..],
        );
        let (seg, passive_open) = state
            .on_segment_with_default_options::<_, ClientlessBufferProvider>(
                segment,
                clock.now(),
                &counters.refs(),
            );
        assert_eq!(passive_open, None);
        let seg = seg.expect("expected segment");
        // The cumulative ACK does not move; the out-of-order range is only
        // reported as a SACK block when SACK is permitted.
        assert_eq!(seg.header.ack, Some(ISS_2 + 1));
        let expect = if sack_permitted {
            SackBlocks::from_iter([SackBlock(seg_start, seg_start + mss)])
        } else {
            SackBlocks::default()
        };
        let sack_blocks = assert_matches!(seg.header.options, Options::Segment(o) => o.sack_blocks);
        assert_eq!(sack_blocks, expect);

        // Queue one MSS of outgoing data so that the next poll produces a
        // data segment.
        assert_eq!(
            state.buffers_mut().into_send_buffer().unwrap().enqueue_data(&data[..]),
            data.len()
        );
        let seg = state
            .poll_send_with_default_options(mss, clock.now(), &counters.refs())
            .expect("generates segment");
        assert_eq!(seg.header.ack, Some(ISS_2 + 1));

        // The outgoing data segment carries the same SACK blocks.
        let sack_blocks =
            assert_matches!(&seg.header.options, Options::Segment(o) => &o.sack_blocks);
        assert_eq!(sack_blocks, &expect);
        let expect_len = if sack_permitted {
            // NOTE(review): 12 is presumably the space taken by a SACK option
            // holding one block (2 + 8 bytes, padded to a 4-byte boundary),
            // which shrinks the payload that fits in one segment - confirm.
            mss - 12
        } else {
            mss
        };
        assert_eq!(seg.len(), expect_len);

        // Fill the hole with the missing first segment.
        let (segment, _) = Segment::with_data(
            ISS_2 + 1,
            Some(ISS_1 + 1),
            None,
            WindowSize::DEFAULT >> WindowScale::default(),
            &data[..],
        );
        let (seg, passive_open) = state
            .on_segment_with_default_options::<_, ClientlessBufferProvider>(
                segment,
                clock.now(),
                &counters.refs(),
            );
        assert_eq!(passive_open, None);
        let seg = seg.expect("expected segment");
        // The cumulative ACK now covers both segments and no SACK blocks
        // remain, whether or not SACK is permitted.
        assert_eq!(seg.header.ack, Some(ISS_2 + (2 * mss) + 1));
        let sack_blocks =
            assert_matches!(&seg.header.options, Options::Segment(o) => &o.sack_blocks);
        assert_eq!(sack_blocks, &SackBlocks::default());
    }
8470
    // Scenarios for the `rtt` test. Each one selects which ACK number the
    // peer sends after two segments have been transmitted, and whether the
    // first segment is retransmitted before that ACK arrives.
    #[derive(Debug)]
    enum RttTestScenario {
        // The ACK covers exactly the first segment.
        AckOne,
        // The ACK covers both segments.
        AckTwo,
        // The first segment is retransmitted, then the ACK covers both
        // segments.
        Retransmit,
        // The ACK covers only the first byte of the first segment.
        AckPartial,
    }
8478
    // Checks how the sender's RTT sampler and estimator evolve as segments
    // are sent, acknowledged and (in one scenario) retransmitted.
    #[test_case(RttTestScenario::AckOne)]
    #[test_case(RttTestScenario::AckTwo)]
    #[test_case(RttTestScenario::Retransmit)]
    #[test_case(RttTestScenario::AckPartial)]
    fn rtt(scenario: RttTestScenario) {
        let mut state = State::Established(Established::<FakeInstant, _, _> {
            snd: Send {
                nxt: ISS_1 + 1,
                max: ISS_1 + 1,
                una: ISS_1 + 1,
                wnd: WindowSize::DEFAULT,
                wnd_max: WindowSize::DEFAULT,
                buffer: RingBuffer::default(),
                wl1: ISS_2 + 1,
                wl2: ISS_1 + 1,
                rtt_estimator: Estimator::default(),
                rtt_sampler: RttSampler::default(),
                timer: None,
                congestion_control: CongestionControl::cubic_with_mss(DEVICE_MAXIMUM_SEGMENT_SIZE),
                wnd_scale: WindowScale::default(),
            }
            .into(),
            rcv: Recv {
                buffer: RecvBufferState::Open {
                    buffer: RingBuffer::default(),
                    assembler: Assembler::new(ISS_2 + 1),
                },
                timer: None,
                mss: DEVICE_MAXIMUM_SEGMENT_SIZE,
                remaining_quickacks: 0,
                last_segment_at: None,
                wnd_scale: WindowScale::default(),
                last_window_update: (ISS_2 + 1, WindowSize::DEFAULT),
                sack_permitted: SACK_PERMITTED,
            }
            .into(),
        });

        // Amount of fake time that passes between the steps below.
        const CLOCK_STEP: Duration = Duration::from_millis(1);

        let data = "Hello World".as_bytes();
        let data_len_u32 = data.len().try_into().unwrap();
        let mut clock = FakeInstantCtx::default();
        let counters = FakeTcpCounters::default();
        // Queue three chunks; each poll below is limited to one chunk.
        for _ in 0..3 {
            assert_eq!(
                state.buffers_mut().into_send_buffer().unwrap().enqueue_data(data),
                data.len()
            );
        }

        // Nothing is being timed until the first segment is sent.
        assert_eq!(state.assert_established().snd.rtt_sampler, RttSampler::NotTracking);
        let seg = state
            .poll_send_with_default_options(data_len_u32, clock.now(), &counters.refs())
            .expect("generate segment");
        assert_eq!(seg.header.seq, ISS_1 + 1);
        assert_eq!(seg.len(), data_len_u32);
        // The sampler now times the first segment's sequence range.
        let expect_sampler = RttSampler::Tracking {
            range: (ISS_1 + 1)..(ISS_1 + 1 + seg.len()),
            timestamp: clock.now(),
        };
        assert_eq!(state.assert_established().snd.rtt_sampler, expect_sampler);
        clock.sleep(CLOCK_STEP);
        // Sending the second segment does not change what is being timed.
        let seg = state
            .poll_send_with_default_options(data_len_u32, clock.now(), &counters.refs())
            .expect("generate segment");
        assert_eq!(seg.header.seq, ISS_1 + 1 + data.len());
        assert_eq!(seg.len(), data_len_u32);
        let established = state.assert_established();
        assert_eq!(established.snd.rtt_sampler, expect_sampler);

        // No ACK has arrived yet, so there is no smoothed RTT.
        assert_eq!(established.snd.rtt_estimator.srtt(), None);

        // Pick the ACK number the peer will send and whether the first
        // segment is retransmitted before it arrives.
        let (retransmit, ack_number) = match scenario {
            RttTestScenario::AckPartial => (false, ISS_1 + 1 + 1),
            RttTestScenario::AckOne => (false, ISS_1 + 1 + data.len()),
            RttTestScenario::AckTwo => (false, ISS_1 + 1 + data.len() * 2),
            RttTestScenario::Retransmit => (true, ISS_1 + 1 + data.len() * 2),
        };

        if retransmit {
            // Jump to the pending send timeout and let the first segment be
            // retransmitted.
            let timeout = state.poll_send_at().expect("timeout should be present");
            clock.time = timeout;
            let seg = state
                .poll_send_with_default_options(data_len_u32, clock.now(), &counters.refs())
                .expect("generate segment");
            assert_eq!(seg.header.seq, ISS_1 + 1);
            // The retransmission discards the sample in progress.
            assert_eq!(state.assert_established().snd.rtt_sampler, RttSampler::NotTracking);
        } else {
            clock.sleep(CLOCK_STEP);
        }

        // Deliver the ACK chosen above; it generates no reply.
        assert_eq!(
            state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
                Segment::ack(ISS_2 + 1, ack_number, WindowSize::DEFAULT >> WindowScale::default()),
                clock.now(),
                &counters.refs()
            ),
            (None, None)
        );
        let established = state.assert_established();
        // In every scenario the sampler is idle after the ACK.
        assert_eq!(established.snd.rtt_sampler, RttSampler::NotTracking);
        if retransmit {
            // No sample was taken because the segment was retransmitted.
            assert_eq!(established.snd.rtt_estimator.srtt(), None);
        } else {
            // Two clock steps elapsed between sending the first segment and
            // receiving the ACK.
            assert_eq!(established.snd.rtt_estimator.srtt(), Some(CLOCK_STEP * 2));
        }

        // Sending the third chunk starts a new sample for its range.
        clock.sleep(CLOCK_STEP);
        let seg = state
            .poll_send_with_default_options(data_len_u32, clock.now(), &counters.refs())
            .expect("generate segment");
        let seq = seg.header.seq;
        assert_eq!(seq, ISS_1 + 1 + data.len() * 2);
        let expect_sampler =
            RttSampler::Tracking { range: seq..(seq + seg.len()), timestamp: clock.now() };
        assert_eq!(state.assert_established().snd.rtt_sampler, expect_sampler);

        clock.sleep(CLOCK_STEP);
        // An ACK that only covers data before the third chunk leaves the
        // sample in progress untouched.
        assert_eq!(
            state.on_segment_with_default_options::<(), ClientlessBufferProvider>(
                Segment::ack(
                    ISS_2 + 1,
                    ISS_1 + 1 + data.len() * 2,
                    WindowSize::DEFAULT >> WindowScale::default()
                ),
                clock.now(),
                &counters.refs()
            ),
            (None, None)
        );
        assert_eq!(state.assert_established().snd.rtt_sampler, expect_sampler);
    }
8627}