1use core::fmt::Debug;
9use core::num::NonZeroU16;
10
11use assert_matches::assert_matches;
12use log::{debug, error, warn};
13use net_types::ip::Ip;
14use net_types::{SpecifiedAddr, Witness as _};
15use netstack3_base::socket::{
16 AddrIsMappedError, AddrVec, AddrVecIter, ConnAddr, ConnIpAddr, InsertError, IpAddrVec,
17 ListenerAddr, ListenerIpAddr, SocketCookie, SocketIpAddr, SocketIpAddrExt as _,
18};
19use netstack3_base::{
20 BidirectionalConverter as _, Control, CounterContext, CtxPair, EitherDeviceId, IpDeviceAddr,
21 Marks, Mss, NotFoundError, Payload, Segment, SegmentHeader, SeqNum, StrongDeviceIdentifier,
22 VerifiedTcpSegment, WeakDeviceIdentifier,
23};
24use netstack3_filter::{
25 FilterIpExt, SocketIngressFilterResult, SocketOpsFilter, TransportPacketSerializer,
26};
27use netstack3_hashmap::hash_map;
28use netstack3_ip::socket::{IpSockCreationError, IpSocketArgs, MmsError};
29use netstack3_ip::{
30 IpHeaderInfo, IpTransportContext, LocalDeliveryPacketInfo, ReceiveIpPacketMeta,
31 TransportIpContext,
32};
33use netstack3_trace::trace_duration;
34use packet::{
35 BufferMut, BufferView as _, EmptyBuf, FragmentedByteSlice, InnerPacketBuilder, PacketBuilder,
36 ParseBuffer,
37};
38use packet_formats::error::ParseError;
39use packet_formats::ip::IpProto;
40use packet_formats::tcp::{
41 TcpFlowAndSeqNum, TcpOptionsTooLongError, TcpParseArgs, TcpSegment, TcpSegmentBuilder,
42 TcpSegmentBuilderWithOptions, TcpSegmentRaw,
43};
44
45use crate::internal::base::{BufferSizes, ConnectionError, SocketOptions, TcpIpSockOptions};
46use crate::internal::counters::{
47 self, TcpCounterContext, TcpCountersRefs, TcpCountersWithoutSocket,
48};
49use crate::internal::socket::generators::{IsnGenerator, TimestampOffsetGenerator};
50use crate::internal::socket::{
51 self, AsThisStack as _, Connection, CoreTxMetadataContext, DemuxState, DeviceIpSocketHandler,
52 DoSendLimit, DualStackBaseIpExt, DualStackDemuxIdConverter as _, DualStackIpExt, EitherStack,
53 HandshakeStatus, Listener, ListenerAddrState, MaybeDualStack, PrimaryRc, TcpApi,
54 TcpBindingsContext, TcpBindingsTypes, TcpContext, TcpDemuxContext, TcpDualStackContext,
55 TcpIpTransportContext, TcpPortSpec, TcpSocketId, TcpSocketSetEntry, TcpSocketState,
56 TcpSocketStateInner, TcpSocketTxMetadata,
57};
58use crate::internal::state::{
59 BufferProvider, Closed, DataAcked, Initial, NewlyClosed, State, TimeWait,
60};
61
62impl<BT: TcpBindingsTypes> BufferProvider<BT::ReceiveBuffer, BT::SendBuffer> for BT {
63 type ActiveOpen = BT::ListenerNotifierOrProvidedBuffers;
64
65 type PassiveOpen = BT::ReturnedBuffers;
66
67 fn new_passive_open_buffers(
68 buffer_sizes: BufferSizes,
69 ) -> (BT::ReceiveBuffer, BT::SendBuffer, Self::PassiveOpen) {
70 BT::new_passive_open_buffers(buffer_sizes)
71 }
72}
73
74pub type DualStackTcpSocketId<I, D, BT> = <I as DualStackBaseIpExt>::DemuxSocketId<D, BT>;
76
77impl<I, BC, CC> IpTransportContext<I, BC, CC> for TcpIpTransportContext
78where
79 I: DualStackIpExt,
80 BC: TcpBindingsContext<CC::DeviceId>
81 + BufferProvider<
82 BC::ReceiveBuffer,
83 BC::SendBuffer,
84 ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
85 PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
86 >,
87 CC: TcpContext<I, BC> + TcpContext<I::OtherVersion, BC>,
88{
89 type EarlyDemuxSocket = DualStackTcpSocketId<I, CC::WeakDeviceId, BC>;
90
91 fn early_demux<B: ParseBuffer>(
92 core_ctx: &mut CC,
93 device: &CC::DeviceId,
94 src_ip: I::Addr,
95 dst_ip: I::Addr,
96 buffer: B,
97 ) -> Option<Self::EarlyDemuxSocket> {
98 early_demux_ip_packet::<I, _, _, _>(core_ctx, device, src_ip, dst_ip, buffer)
99 }
100
101 fn receive_icmp_error(
102 core_ctx: &mut CC,
103 bindings_ctx: &mut BC,
104 _device: &CC::DeviceId,
105 original_src_ip: Option<SpecifiedAddr<I::Addr>>,
106 original_dst_ip: SpecifiedAddr<I::Addr>,
107 mut original_body: &[u8],
108 err: I::ErrorCode,
109 ) {
110 let mut buffer = &mut original_body;
111 let Some(flow_and_seqnum) = buffer.take_obj_front::<TcpFlowAndSeqNum>() else {
112 error!("received an ICMP error but its body is less than 8 bytes");
113 return;
114 };
115
116 let Some(original_src_ip) = original_src_ip else { return };
117 let Some(original_src_port) = NonZeroU16::new(flow_and_seqnum.src_port()) else { return };
118 let Some(original_dst_port) = NonZeroU16::new(flow_and_seqnum.dst_port()) else { return };
119 let original_seqnum = SeqNum::new(flow_and_seqnum.sequence_num());
120
121 TcpApi::<I, _>::new(CtxPair { core_ctx, bindings_ctx }).on_icmp_error(
122 original_src_ip,
123 original_dst_ip,
124 original_src_port,
125 original_dst_port,
126 original_seqnum,
127 err.into(),
128 );
129 }
130
131 fn receive_ip_packet<B: BufferMut, H: IpHeaderInfo<I>>(
132 core_ctx: &mut CC,
133 bindings_ctx: &mut BC,
134 device: &CC::DeviceId,
135 remote_ip: I::RecvSrcAddr,
136 local_ip: SpecifiedAddr<I::Addr>,
137 mut buffer: B,
138 info: &LocalDeliveryPacketInfo<I, H>,
139 early_demux_socket: Option<Self::EarlyDemuxSocket>,
140 ) -> Result<(), (B, I::IcmpError)> {
141 let LocalDeliveryPacketInfo { meta, header_info, marks } = info;
142 let ReceiveIpPacketMeta { broadcast, transparent_override } = meta;
143 if let Some(delivery) = transparent_override {
144 warn!(
145 "TODO(https://fxbug.dev/337009139): transparent proxy not supported for TCP \
146 sockets; will not override dispatch to perform local delivery to {delivery:?}"
147 );
148 }
149
150 if broadcast.is_some() {
151 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
152 .invalid_ip_addrs_received
153 .increment();
154 debug!("tcp: dropping broadcast TCP packet");
155 return Ok(());
156 }
157
158 let remote_ip = match SpecifiedAddr::new(remote_ip.into_addr()) {
159 None => {
160 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
161 .invalid_ip_addrs_received
162 .increment();
163 debug!("tcp: source address unspecified, dropping the packet");
164 return Ok(());
165 }
166 Some(src_ip) => src_ip,
167 };
168 let remote_ip: SocketIpAddr<_> = match remote_ip.try_into() {
169 Ok(remote_ip) => remote_ip,
170 Err(AddrIsMappedError {}) => {
171 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
172 .invalid_ip_addrs_received
173 .increment();
174 debug!("tcp: source address is mapped (ipv4-mapped-ipv6), dropping the packet");
175 return Ok(());
176 }
177 };
178 let local_ip: SocketIpAddr<_> = match local_ip.try_into() {
179 Ok(local_ip) => local_ip,
180 Err(AddrIsMappedError {}) => {
181 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
182 .invalid_ip_addrs_received
183 .increment();
184 debug!("tcp: local address is mapped (ipv4-mapped-ipv6), dropping the packet");
185 return Ok(());
186 }
187 };
188 let packet = match buffer
189 .parse_with::<_, TcpSegment<_>>(TcpParseArgs::new(remote_ip.addr(), local_ip.addr()))
190 {
191 Ok(packet) => packet,
192 Err(err) => {
193 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
194 .invalid_segments_received
195 .increment();
196 debug!("tcp: failed parsing incoming packet {:?}", err);
197 match err {
198 ParseError::Checksum => {
199 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
200 .checksum_errors
201 .increment();
202 }
203 ParseError::NotSupported | ParseError::NotExpected | ParseError::Format => {}
204 }
205 return Ok(());
206 }
207 };
208 let local_port = packet.dst_port();
209 let remote_port = packet.src_port();
210 let incoming = match VerifiedTcpSegment::try_from(packet) {
211 Ok(segment) => segment,
212 Err(err) => {
213 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
214 .invalid_segments_received
215 .increment();
216 debug!("tcp: malformed segment {:?}", err);
217 return Ok(());
218 }
219 };
220 let conn_addr =
221 ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) };
222
223 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
224 .valid_segments_received
225 .increment();
226 handle_incoming_packet::<I, _, _, _>(
227 core_ctx,
228 bindings_ctx,
229 conn_addr,
230 device,
231 header_info,
232 &incoming,
233 marks,
234 early_demux_socket,
235 );
236 Ok(())
237 }
238}
239
240fn early_demux_ip_packet<I, BC, CC, B>(
241 core_ctx: &mut CC,
242 device: &CC::DeviceId,
243 src_ip: I::Addr,
244 dst_ip: I::Addr,
245 mut buffer: B,
246) -> Option<DualStackTcpSocketId<I, CC::WeakDeviceId, BC>>
247where
248 I: DualStackIpExt,
249 BC: TcpBindingsContext<CC::DeviceId>,
250 CC: TcpContext<I, BC> + TcpContext<I::OtherVersion, BC>,
251 B: ParseBuffer,
252{
253 let Ok(packet) = buffer.parse_with::<_, TcpSegmentRaw<_>>(()) else {
254 return None;
257 };
258
259 let src_ip = SocketIpAddr::new(src_ip)?;
260 let dst_ip = SocketIpAddr::new(dst_ip)?;
261 let (src_port, dst_port) = packet.flow_header().src_dst();
262 let src_port = NonZeroU16::new(src_port)?;
263 let dst_port = NonZeroU16::new(dst_port)?;
264 let device = device.downgrade();
265
266 core_ctx.with_demux(|demux: &DemuxState<I, _, _>| {
267 demux
268 .socketmap
269 .lookup_connected((src_ip, src_port), (dst_ip, dst_port), device)
270 .map(|entry| entry.id())
271 })
272}
273
274fn handle_incoming_packet<WireI, BC, CC, H>(
275 core_ctx: &mut CC,
276 bindings_ctx: &mut BC,
277 conn_addr: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
278 incoming_device: &CC::DeviceId,
279 header_info: &H,
280 incoming: &VerifiedTcpSegment<'_>,
281 marks: &Marks,
282 mut early_demux_socket: Option<DualStackTcpSocketId<WireI, CC::WeakDeviceId, BC>>,
283) where
284 WireI: DualStackIpExt,
285 BC: TcpBindingsContext<CC::DeviceId>
286 + BufferProvider<
287 BC::ReceiveBuffer,
288 BC::SendBuffer,
289 ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
290 PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
291 >,
292 CC: TcpContext<WireI, BC> + TcpContext<WireI::OtherVersion, BC>,
293 H: IpHeaderInfo<WireI>,
294{
295 trace_duration!("tcp::handle_incoming_packet");
296 let mut tw_reuse = None;
297
298 let addr: IpAddrVec<WireI, _> = if early_demux_socket.is_some() {
302 let (ip, port) = conn_addr.local;
303 IpAddrVec::new_listener(ip, port)
304 } else {
305 conn_addr.into()
306 };
307 let mut addrs_to_search = AddrVecIter::<WireI, CC::WeakDeviceId, TcpPortSpec>::with_device(
308 addr,
309 incoming_device.downgrade(),
310 );
311
312 enum FoundSocket<S> {
313 Yes(Option<S>),
316 No,
317 }
318 let found_socket = loop {
319 let sock = if let Some(early_demux_socket) = early_demux_socket.take() {
320 let device = match WireI::as_dual_stack_ip_socket(&early_demux_socket) {
321 EitherStack::ThisStack(conn_id) => conn_id.get_bound_device(core_ctx),
322 EitherStack::OtherStack(conn_id) => conn_id.get_bound_device(core_ctx),
323 };
324 let conn_addr = ConnAddr { ip: conn_addr, device };
325 Some(SocketLookupResult::Connection(early_demux_socket, conn_addr))
326 } else {
327 core_ctx.with_demux(|demux| lookup_socket::<WireI, CC, BC>(demux, &mut addrs_to_search))
328 };
329 match sock {
330 None => break FoundSocket::No,
331 Some(SocketLookupResult::Connection(demux_conn_id, conn_addr)) => {
332 assert_eq!(tw_reuse, None);
335 let disposition = match WireI::as_dual_stack_ip_socket(&demux_conn_id) {
336 EitherStack::ThisStack(conn_id) => {
337 try_handle_incoming_for_connection_dual_stack(
338 core_ctx,
339 bindings_ctx,
340 conn_id,
341 incoming_device,
342 header_info,
343 &incoming,
344 )
345 }
346 EitherStack::OtherStack(conn_id) => {
347 try_handle_incoming_for_connection_dual_stack(
348 core_ctx,
349 bindings_ctx,
350 conn_id,
351 incoming_device,
352 header_info,
353 &incoming,
354 )
355 }
356 };
357 match disposition {
358 ConnectionIncomingSegmentDisposition::Destroy => {
359 WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, demux_conn_id);
360 break FoundSocket::Yes(None);
361 }
362 ConnectionIncomingSegmentDisposition::FoundSocket
363 | ConnectionIncomingSegmentDisposition::Filtered => {
364 break FoundSocket::Yes(Some(demux_conn_id));
365 }
366 ConnectionIncomingSegmentDisposition::ReuseCandidateForListener => {
367 tw_reuse = Some((demux_conn_id, conn_addr));
368 }
369 }
370 }
371 Some(SocketLookupResult::Listener((demux_listener_id, _listener_addr))) => {
372 match WireI::as_dual_stack_ip_socket(&demux_listener_id) {
373 EitherStack::ThisStack(listener_id) => {
374 let disposition = core_ctx.with_socket_mut_generators_transport_demux(
375 &listener_id,
376 |core_ctx, socket_state, isn, timestamp_offset| match core_ctx {
377 MaybeDualStack::NotDualStack((core_ctx, converter)) => {
378 try_handle_incoming_for_listener::<WireI, WireI, CC, BC, _, _>(
379 core_ctx,
380 bindings_ctx,
381 &listener_id,
382 isn,
383 timestamp_offset,
384 socket_state,
385 header_info,
386 incoming,
387 conn_addr,
388 incoming_device,
389 &mut tw_reuse,
390 move |conn, addr| converter.convert_back((conn, addr)),
391 WireI::into_demux_socket_id,
392 marks,
393 )
394 }
395 MaybeDualStack::DualStack((core_ctx, converter)) => {
396 try_handle_incoming_for_listener::<_, _, CC, BC, _, _>(
397 core_ctx,
398 bindings_ctx,
399 &listener_id,
400 isn,
401 timestamp_offset,
402 socket_state,
403 header_info,
404 incoming,
405 conn_addr,
406 incoming_device,
407 &mut tw_reuse,
408 move |conn, addr| {
409 converter
410 .convert_back(EitherStack::ThisStack((conn, addr)))
411 },
412 WireI::into_demux_socket_id,
413 marks,
414 )
415 }
416 },
417 );
418 if try_handle_listener_incoming_disposition(
419 core_ctx,
420 bindings_ctx,
421 disposition,
422 &demux_listener_id,
423 &mut tw_reuse,
424 &mut addrs_to_search,
425 conn_addr,
426 incoming_device,
427 ) {
428 break FoundSocket::Yes(Some(demux_listener_id));
429 }
430 }
431 EitherStack::OtherStack(listener_id) => {
432 let disposition = core_ctx.with_socket_mut_generators_transport_demux(
433 &listener_id,
434 |core_ctx, socket_state, isn, timestamp_offset| {
435 match core_ctx {
436 MaybeDualStack::NotDualStack((_core_ctx, _converter)) => {
437 unreachable!("OtherStack socket ID with non dual stack");
440 }
441 MaybeDualStack::DualStack((core_ctx, converter)) => {
442 let other_demux_id_converter =
443 core_ctx.other_demux_id_converter();
444 try_handle_incoming_for_listener::<_, _, CC, BC, _, _>(
445 core_ctx,
446 bindings_ctx,
447 &listener_id,
448 isn,
449 timestamp_offset,
450 socket_state,
451 header_info,
452 incoming,
453 conn_addr,
454 incoming_device,
455 &mut tw_reuse,
456 move |conn, addr| {
457 converter.convert_back(EitherStack::OtherStack((
458 conn, addr,
459 )))
460 },
461 move |id| other_demux_id_converter.convert(id),
462 marks,
463 )
464 }
465 }
466 },
467 );
468 if try_handle_listener_incoming_disposition::<_, _, CC, BC, _>(
469 core_ctx,
470 bindings_ctx,
471 disposition,
472 &demux_listener_id,
473 &mut tw_reuse,
474 &mut addrs_to_search,
475 conn_addr,
476 incoming_device,
477 ) {
478 break FoundSocket::Yes(Some(demux_listener_id));
479 }
480 }
481 };
482 }
483 }
484 };
485
486 let demux_id = match found_socket {
487 FoundSocket::No => {
488 CounterContext::<TcpCountersWithoutSocket<WireI>>::counters(core_ctx)
489 .received_segments_no_dispatch
490 .increment();
491
492 if let Some(seg) =
498 (Closed { reason: None::<Option<ConnectionError>> }.on_segment(&incoming.into()))
499 {
500 socket::send_tcp_segment::<WireI, WireI, _, _, _>(
501 core_ctx,
502 bindings_ctx,
503 None,
504 None,
505 conn_addr,
506 seg.into_empty(),
507 &TcpIpSockOptions { marks: *marks },
508 );
509 }
510 None
511 }
512 FoundSocket::Yes(demux_id) => {
513 counters::increment_counter_with_optional_demux_id::<WireI, _, _, _, _>(
514 core_ctx,
515 demux_id.as_ref(),
516 |c| &c.received_segments_dispatched,
517 );
518 demux_id
519 }
520 };
521
522 if let Some(control) = incoming.control() {
523 counters::increment_counter_with_optional_demux_id::<WireI, _, _, _, _>(
524 core_ctx,
525 demux_id.as_ref(),
526 |c| match control {
527 Control::RST => &c.resets_received,
528 Control::SYN => &c.syns_received,
529 Control::FIN => &c.fins_received,
530 },
531 )
532 }
533}
534
535enum SocketLookupResult<I: DualStackIpExt, D: WeakDeviceIdentifier, BT: TcpBindingsTypes> {
536 Connection(I::DemuxSocketId<D, BT>, ConnAddr<ConnIpAddr<I::Addr, NonZeroU16, NonZeroU16>, D>),
537 Listener((I::DemuxSocketId<D, BT>, ListenerAddr<ListenerIpAddr<I::Addr, NonZeroU16>, D>)),
538}
539
540fn lookup_socket<I, CC, BC>(
541 DemuxState { socketmap, .. }: &DemuxState<I, CC::WeakDeviceId, BC>,
542 addrs_to_search: &mut AddrVecIter<I, CC::WeakDeviceId, TcpPortSpec>,
543) -> Option<SocketLookupResult<I, CC::WeakDeviceId, BC>>
544where
545 I: DualStackIpExt,
546 BC: TcpBindingsContext<CC::DeviceId>,
547 CC: TcpContext<I, BC>,
548{
549 addrs_to_search.find_map(|addr| {
550 match addr {
551 AddrVec::Conn(conn_addr) => {
554 socketmap.conns().get_by_addr(&conn_addr).map(|conn_addr_state| {
555 SocketLookupResult::Connection(conn_addr_state.id(), conn_addr)
556 })
557 }
558 AddrVec::Listen(listener_addr) => {
559 socketmap
564 .listeners()
565 .get_by_addr(&listener_addr)
566 .and_then(|addr_state| match addr_state {
567 ListenerAddrState::ExclusiveListener(id) => Some(id.clone()),
568 ListenerAddrState::Shared { listener: Some(id), bound: _ } => {
569 Some(id.clone())
570 }
571 ListenerAddrState::ExclusiveBound(_)
572 | ListenerAddrState::Shared { listener: None, bound: _ } => None,
573 })
574 .map(|id| SocketLookupResult::Listener((id, listener_addr)))
575 }
576 }
577 })
578}
579
580#[derive(PartialEq, Eq)]
581enum ConnectionIncomingSegmentDisposition {
582 FoundSocket,
583 Filtered,
584 ReuseCandidateForListener,
585 Destroy,
586}
587
588enum ListenerIncomingSegmentDisposition<S> {
589 FoundSocket,
590 Filtered,
591 ConflictingConnection,
592 NoMatchingSocket,
593 NewConnection(S),
594}
595
596fn try_handle_incoming_for_connection_dual_stack<SockI, WireI, CC, BC, H>(
597 core_ctx: &mut CC,
598 bindings_ctx: &mut BC,
599 conn_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
600 incoming_device: &CC::DeviceId,
601 header_info: &H,
602 incoming: &VerifiedTcpSegment<'_>,
603) -> ConnectionIncomingSegmentDisposition
604where
605 SockI: DualStackIpExt,
606 WireI: Ip,
607 BC: TcpBindingsContext<CC::DeviceId>
608 + BufferProvider<
609 BC::ReceiveBuffer,
610 BC::SendBuffer,
611 ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
612 PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
613 >,
614 CC: TcpContext<SockI, BC>,
615 H: IpHeaderInfo<WireI>,
616{
617 core_ctx.with_socket_mut_transport_demux(conn_id, |core_ctx, socket_state| {
618 let TcpSocketState { socket_state, sharing: _, ip_options: _, socket_options } =
619 socket_state;
620
621 match run_socket_ingress_filter(
622 bindings_ctx,
623 incoming_device,
624 conn_id.socket_cookie(),
625 socket_options,
626 header_info,
627 incoming.tcp_segment(),
628 ) {
629 SocketIngressFilterResult::Accept => (),
630 SocketIngressFilterResult::Drop => {
631 return ConnectionIncomingSegmentDisposition::Filtered;
632 }
633 }
634
635 let (conn_and_addr, timer) = assert_matches!(
636 socket_state,
637 TcpSocketStateInner::Connected { conn, timer } => (conn, timer),
638 "invalid socket ID"
639 );
640 let this_or_other_stack = match core_ctx {
641 MaybeDualStack::DualStack((core_ctx, converter)) => {
642 match converter.convert(conn_and_addr) {
643 EitherStack::ThisStack((conn, conn_addr)) => {
644 EitherStack::ThisStack((
649 core_ctx.as_this_stack(),
650 conn,
651 conn_addr,
652 SockI::into_demux_socket_id(conn_id.clone()),
653 ))
654 }
655 EitherStack::OtherStack((conn, conn_addr)) => {
656 let demux_sock_id = core_ctx.into_other_demux_socket_id(conn_id.clone());
659 EitherStack::OtherStack((core_ctx, conn, conn_addr, demux_sock_id))
660 }
661 }
662 }
663 MaybeDualStack::NotDualStack((core_ctx, converter)) => {
664 let (conn, conn_addr) = converter.convert(conn_and_addr);
665 EitherStack::ThisStack((
668 core_ctx.as_this_stack(),
669 conn,
670 conn_addr,
671 SockI::into_demux_socket_id(conn_id.clone()),
672 ))
673 }
674 };
675
676 match this_or_other_stack {
677 EitherStack::ThisStack((core_ctx, conn, conn_addr, demux_conn_id)) => {
678 try_handle_incoming_for_connection::<_, _, CC, _, _>(
679 core_ctx,
680 bindings_ctx,
681 conn_addr.clone(),
682 conn_id,
683 demux_conn_id,
684 socket_options,
685 conn,
686 timer,
687 incoming.into(),
688 )
689 }
690 EitherStack::OtherStack((core_ctx, conn, conn_addr, demux_conn_id)) => {
691 try_handle_incoming_for_connection::<_, _, CC, _, _>(
692 core_ctx,
693 bindings_ctx,
694 conn_addr.clone(),
695 conn_id,
696 demux_conn_id,
697 socket_options,
698 conn,
699 timer,
700 incoming.into(),
701 )
702 }
703 }
704 })
705}
706
707fn try_handle_incoming_for_connection<SockI, WireI, CC, BC, DC>(
714 core_ctx: &mut DC,
715 bindings_ctx: &mut BC,
716 conn_addr: ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
717 conn_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
718 demux_id: WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
719 socket_options: &SocketOptions,
720 conn: &mut Connection<SockI, WireI, CC::WeakDeviceId, BC>,
721 timer: &mut BC::Timer,
722 incoming: Segment<&[u8]>,
723) -> ConnectionIncomingSegmentDisposition
724where
725 SockI: DualStackIpExt,
726 WireI: DualStackIpExt,
727 BC: TcpBindingsContext<CC::DeviceId>
728 + BufferProvider<
729 BC::ReceiveBuffer,
730 BC::SendBuffer,
731 ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
732 PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
733 >,
734 CC: TcpContext<SockI, BC>,
735 DC: TransportIpContext<WireI, BC, DeviceId = CC::DeviceId, WeakDeviceId = CC::WeakDeviceId>
736 + DeviceIpSocketHandler<SockI, BC>
737 + TcpDemuxContext<WireI, CC::WeakDeviceId, BC>
738 + TcpCounterContext<SockI, CC::WeakDeviceId, BC>
739 + CoreTxMetadataContext<TcpSocketTxMetadata<SockI, CC::WeakDeviceId, BC>, BC>,
740{
741 let Connection { accept_queue, state, ip_sock, defunct, soft_error: _, handshake_status } =
742 conn;
743
744 if *defunct
756 && incoming.header().control == Some(Control::SYN)
757 && incoming.header().ack.is_none()
758 {
759 if let State::TimeWait(TimeWait { last_seq: _, closed_rcv, expiry: _, snd_info: _ }) = state
760 {
761 if !incoming.header().seq.before(closed_rcv.ack) {
762 return ConnectionIncomingSegmentDisposition::ReuseCandidateForListener;
763 }
764 }
765 }
766 let (reply, passive_open, data_acked, newly_closed) = state.on_segment::<_, BC>(
767 &conn_id.either(),
768 &TcpCountersRefs::from_ctx(core_ctx, conn_id),
769 incoming,
770 bindings_ctx.now(),
771 socket_options,
772 *defunct,
773 );
774
775 match data_acked {
776 DataAcked::Yes => {
777 core_ctx.confirm_reachable(bindings_ctx, ip_sock, &socket_options.ip_options)
778 }
779 DataAcked::No => {}
780 }
781
782 match state {
783 State::Listen(_) => {
784 unreachable!("has an invalid status: {:?}", conn.state)
785 }
786 State::SynSent(_) | State::SynRcvd(_) => {
787 assert_eq!(*handshake_status, HandshakeStatus::Pending)
788 }
789 State::Established(_)
790 | State::FinWait1(_)
791 | State::FinWait2(_)
792 | State::Closing(_)
793 | State::CloseWait(_)
794 | State::LastAck(_)
795 | State::TimeWait(_) => {
796 if handshake_status
797 .update_if_pending(HandshakeStatus::Completed { reported: accept_queue.is_some() })
798 {
799 core_ctx.confirm_reachable(bindings_ctx, ip_sock, &socket_options.ip_options);
800 }
801 }
802 State::Closed(Closed { reason }) => {
803 socket::handle_newly_closed(
811 core_ctx,
812 bindings_ctx,
813 newly_closed,
814 &demux_id,
815 &conn_addr,
816 timer,
817 );
818 if let Some(accept_queue) = accept_queue {
819 accept_queue.remove(&conn_id);
820 *defunct = true;
821 }
822 if *defunct {
823 return ConnectionIncomingSegmentDisposition::Destroy;
826 }
827 let _: bool = handshake_status.update_if_pending(match reason {
828 None => HandshakeStatus::Completed { reported: accept_queue.is_some() },
829 Some(_err) => HandshakeStatus::Aborted,
830 });
831 }
832 }
833
834 if let Some(seg) = reply {
835 socket::send_tcp_segment(
836 core_ctx,
837 bindings_ctx,
838 Some(conn_id),
839 Some(&ip_sock),
840 conn_addr.ip,
841 seg.into_empty(),
842 &socket_options.ip_options,
843 );
844 }
845
846 socket::do_send_inner_and_then_handle_newly_closed(
848 conn_id,
849 &demux_id,
850 socket_options,
851 conn,
852 DoSendLimit::MultipleSegments,
853 &conn_addr,
854 timer,
855 core_ctx,
856 bindings_ctx,
857 );
858
859 if let Some(passive_open) = passive_open {
862 let accept_queue = conn.accept_queue.as_ref().expect("no accept queue but passive open");
863 accept_queue.notify_ready(conn_id, passive_open);
864 }
865
866 ConnectionIncomingSegmentDisposition::FoundSocket
868}
869
870fn try_handle_listener_incoming_disposition<SockI, WireI, CC, BC, Addr>(
875 core_ctx: &mut CC,
876 bindings_ctx: &mut BC,
877 disposition: ListenerIncomingSegmentDisposition<PrimaryRc<SockI, CC::WeakDeviceId, BC>>,
878 demux_listener_id: &WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
879 tw_reuse: &mut Option<(WireI::DemuxSocketId<CC::WeakDeviceId, BC>, Addr)>,
880 addrs_to_search: &mut AddrVecIter<WireI, CC::WeakDeviceId, TcpPortSpec>,
881 conn_addr: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
882 incoming_device: &CC::DeviceId,
883) -> bool
884where
885 SockI: DualStackIpExt,
886 WireI: DualStackIpExt,
887 CC: TcpContext<SockI, BC> + TcpContext<WireI, BC> + TcpContext<WireI::OtherVersion, BC>,
888 BC: TcpBindingsContext<CC::DeviceId>,
889{
890 match disposition {
891 ListenerIncomingSegmentDisposition::FoundSocket => true,
892 ListenerIncomingSegmentDisposition::Filtered => true,
893 ListenerIncomingSegmentDisposition::ConflictingConnection => {
894 if let Some((tw_reuse, _)) = tw_reuse.take() {
899 WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, tw_reuse);
900 }
901
902 *addrs_to_search = AddrVecIter::<WireI, CC::WeakDeviceId, TcpPortSpec>::with_device(
905 conn_addr.into(),
906 incoming_device.downgrade(),
907 );
908 false
909 }
910 ListenerIncomingSegmentDisposition::NoMatchingSocket => false,
911 ListenerIncomingSegmentDisposition::NewConnection(primary) => {
912 if let Some((tw_reuse, _)) = tw_reuse.take() {
918 WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, tw_reuse);
919 }
920
921 let id = TcpSocketId(PrimaryRc::clone_strong(&primary));
930 let to_destroy = core_ctx.with_all_sockets_mut(move |all_sockets| {
931 let insert_entry = TcpSocketSetEntry::Primary(primary);
932 match all_sockets.entry(id) {
933 hash_map::Entry::Vacant(v) => {
934 let _: &mut _ = v.insert(insert_entry);
935 None
936 }
937 hash_map::Entry::Occupied(mut o) => {
938 assert_matches!(
942 core::mem::replace(o.get_mut(), insert_entry),
943 TcpSocketSetEntry::DeadOnArrival
944 );
945 Some(o.key().clone())
946 }
947 }
948 });
949 if let Some(to_destroy) = to_destroy {
957 socket::destroy_socket(core_ctx, bindings_ctx, to_destroy);
958 }
959 counters::increment_counter_for_demux_id::<WireI, _, _, _, _>(
960 core_ctx,
961 demux_listener_id,
962 |c| &c.passive_connection_openings,
963 );
964 true
965 }
966 }
967}
968
969fn try_handle_incoming_for_listener<SockI, WireI, CC, BC, DC, H>(
973 core_ctx: &mut DC,
974 bindings_ctx: &mut BC,
975 listener_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
976 isn: &IsnGenerator<BC::Instant>,
977 timestamp_offset: &TimestampOffsetGenerator<BC::Instant>,
978 socket_state: &mut TcpSocketState<SockI, CC::WeakDeviceId, BC>,
979 header_info: &H,
980 incoming: &VerifiedTcpSegment<'_>,
981 incoming_addrs: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
982 incoming_device: &CC::DeviceId,
983 tw_reuse: &mut Option<(
984 WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
985 ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
986 )>,
987 make_connection: impl FnOnce(
988 Connection<SockI, WireI, CC::WeakDeviceId, BC>,
989 ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
990 ) -> SockI::ConnectionAndAddr<CC::WeakDeviceId, BC>,
991 make_demux_id: impl Fn(
992 TcpSocketId<SockI, CC::WeakDeviceId, BC>,
993 ) -> WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
994 marks: &Marks,
995) -> ListenerIncomingSegmentDisposition<PrimaryRc<SockI, CC::WeakDeviceId, BC>>
996where
997 SockI: DualStackIpExt,
998 WireI: DualStackIpExt,
999 BC: TcpBindingsContext<CC::DeviceId>
1000 + BufferProvider<
1001 BC::ReceiveBuffer,
1002 BC::SendBuffer,
1003 ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
1004 PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
1005 >,
1006 CC: TcpContext<SockI, BC>,
1007 DC: TransportIpContext<WireI, BC, DeviceId = CC::DeviceId, WeakDeviceId = CC::WeakDeviceId>
1008 + DeviceIpSocketHandler<WireI, BC>
1009 + TcpDemuxContext<WireI, CC::WeakDeviceId, BC>
1010 + TcpCounterContext<SockI, CC::WeakDeviceId, BC>
1011 + CoreTxMetadataContext<TcpSocketTxMetadata<SockI, CC::WeakDeviceId, BC>, BC>,
1012 H: IpHeaderInfo<WireI>,
1013{
1014 let Listener { addr: listener_addr, accept_queue, backlog, buffer_sizes } =
1015 match &socket_state.socket_state {
1016 TcpSocketStateInner::Bound(_) => {
1017 return ListenerIncomingSegmentDisposition::NoMatchingSocket;
1019 }
1020 TcpSocketStateInner::Listener(listener) => listener,
1021 _ => panic!("unexpected socket state: {:?}", socket_state.socket_state),
1022 };
1023
1024 let ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) } =
1025 incoming_addrs;
1026
1027 match run_socket_ingress_filter(
1028 bindings_ctx,
1029 incoming_device,
1030 listener_id.socket_cookie(),
1031 &socket_state.socket_options,
1032 header_info,
1033 incoming.tcp_segment(),
1034 ) {
1035 SocketIngressFilterResult::Accept => (),
1036 SocketIngressFilterResult::Drop => {
1037 return ListenerIncomingSegmentDisposition::Filtered;
1038 }
1039 }
1040
1041 if accept_queue.len() == backlog.get() {
1045 core_ctx.increment_both(listener_id, |counters| &counters.listener_queue_overflow);
1046 core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1047 debug!("incoming SYN dropped because of the full backlog of the listener");
1048 return ListenerIncomingSegmentDisposition::FoundSocket;
1049 }
1050
1051 let bound_device = listener_addr.as_ref().clone();
1054 let bound_device = if remote_ip.as_ref().must_have_zone() {
1055 Some(bound_device.map_or(EitherDeviceId::Strong(incoming_device), EitherDeviceId::Weak))
1056 } else {
1057 bound_device.map(EitherDeviceId::Weak)
1058 };
1059
1060 let ip_options = TcpIpSockOptions { marks: *marks, ..socket_state.socket_options.ip_options };
1061 let socket_options = SocketOptions { ip_options, ..socket_state.socket_options };
1062
1063 let bound_device = bound_device.as_ref().map(|d| d.as_ref());
1064 let ip_sock = match core_ctx.new_ip_socket(
1065 bindings_ctx,
1066 IpSocketArgs {
1067 device: bound_device,
1068 local_ip: IpDeviceAddr::new_from_socket_ip_addr(local_ip),
1069 remote_ip,
1070 proto: IpProto::Tcp.into(),
1071 options: &ip_options,
1072 },
1073 ) {
1074 Ok(ip_sock) => ip_sock,
1075 err @ Err(IpSockCreationError::Route(_)) => {
1076 core_ctx.increment_both(listener_id, |counters| &counters.passive_open_no_route_errors);
1077 core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1078 debug!("cannot construct an ip socket to the SYN originator: {:?}, ignoring", err);
1079 return ListenerIncomingSegmentDisposition::NoMatchingSocket;
1080 }
1081 };
1082
1083 let isn = isn.generate(
1084 bindings_ctx.now(),
1085 (ip_sock.local_ip().clone().into(), local_port),
1086 (ip_sock.remote_ip().clone(), remote_port),
1087 );
1088 let timestamp_offset = timestamp_offset.generate::<SocketIpAddr<WireI::Addr>, NonZeroU16>(
1089 bindings_ctx.now(),
1090 (ip_sock.local_ip().clone().into(), local_port),
1091 (ip_sock.remote_ip().clone(), remote_port),
1092 );
1093 let device_mms = match core_ctx.get_mms(bindings_ctx, &ip_sock, &socket_options.ip_options) {
1094 Ok(mms) => mms,
1095 Err(err) => {
1096 error!("Cannot find a device with large enough MTU for the connection");
1100 core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1101 match err {
1102 MmsError::NoDevice(_) | MmsError::MTUTooSmall(_) => {
1103 return ListenerIncomingSegmentDisposition::FoundSocket;
1104 }
1105 }
1106 }
1107 };
1108 let Some(device_mss) = Mss::from_mms(device_mms) else {
1109 return ListenerIncomingSegmentDisposition::FoundSocket;
1110 };
1111
1112 let mut state = State::Listen(Closed::<Initial>::listen(
1113 isn,
1114 timestamp_offset,
1115 buffer_sizes.clone(),
1116 device_mss,
1117 Mss::default::<WireI>(),
1118 socket_options.user_timeout,
1119 ));
1120
1121 let result = state.on_segment::<_, BC>(
1126 &listener_id.either(),
1130 &TcpCountersRefs::from_ctx(core_ctx, listener_id),
1131 incoming.into(),
1132 bindings_ctx.now(),
1133 &SocketOptions::default(),
1134 false, );
1136 let reply = assert_matches!(
1137 result,
1138 (reply, None, _, NewlyClosed::No ) => reply
1139 );
1140
1141 let result = if matches!(state, State::SynRcvd(_)) {
1142 let poll_send_at = state.poll_send_at().expect("no retrans timer");
1143 let bound_device = ip_sock.device().cloned();
1144
1145 let addr = ConnAddr {
1146 ip: ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) },
1147 device: bound_device,
1148 };
1149
1150 let new_socket = core_ctx.with_demux_mut(|DemuxState { socketmap, .. }| {
1151 if let Some((tw_reuse, conn_addr)) = tw_reuse {
1161 match socketmap.conns_mut().remove(tw_reuse, &conn_addr) {
1162 Ok(()) => {
1163 }
1169 Err(NotFoundError) => {
1170 }
1174 }
1175 }
1176
1177 let accept_queue_clone = accept_queue.clone();
1179 let ip_sock = ip_sock.clone();
1180 let bindings_ctx_moved = &mut *bindings_ctx;
1181 let sharing = socket_state.sharing;
1182 match socketmap.conns_mut().try_insert_with(addr, sharing, move |addr, sharing| {
1183 let conn = make_connection(
1184 Connection {
1185 accept_queue: Some(accept_queue_clone),
1186 state,
1187 ip_sock,
1188 defunct: false,
1189 soft_error: None,
1190 handshake_status: HandshakeStatus::Pending,
1191 },
1192 addr,
1193 );
1194
1195 let (id, primary) = TcpSocketId::new_cyclic(
1196 |weak| {
1197 let mut timer = CC::new_timer(bindings_ctx_moved, weak);
1198 assert_eq!(
1202 bindings_ctx_moved.schedule_timer_instant(poll_send_at, &mut timer),
1203 None
1204 );
1205 TcpSocketStateInner::Connected { conn, timer }
1206 },
1207 sharing,
1208 socket_options,
1209 );
1210 (make_demux_id(id.clone()), (primary, id))
1211 }) {
1212 Ok((_entry, (primary, id))) => {
1213 accept_queue.push_pending(id);
1216 Some(primary)
1217 }
1218 Err(e) => {
1219 assert_matches!(e, InsertError::Exists);
1224 None
1229 }
1230 }
1231 });
1232
1233 match new_socket {
1234 Some(new_socket) => ListenerIncomingSegmentDisposition::NewConnection(new_socket),
1235 None => {
1236 core_ctx
1239 .increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1240 return ListenerIncomingSegmentDisposition::ConflictingConnection;
1241 }
1242 }
1243 } else {
1244 ListenerIncomingSegmentDisposition::FoundSocket
1247 };
1248
1249 if let Some(seg) = reply {
1251 socket::send_tcp_segment(
1252 core_ctx,
1253 bindings_ctx,
1254 Some(&listener_id),
1255 Some(&ip_sock),
1256 incoming_addrs,
1257 seg.into_empty(),
1258 &socket_options.ip_options,
1259 );
1260 }
1261
1262 result
1263}
1264
1265pub(super) fn tcp_serialize_segment<'a, I, P>(
1266 header: &'a SegmentHeader,
1267 data: P,
1268 conn_addr: ConnIpAddr<I::Addr, NonZeroU16, NonZeroU16>,
1269) -> impl TransportPacketSerializer<I, Buffer = EmptyBuf> + Debug + 'a
1270where
1271 I: FilterIpExt,
1272 P: InnerPacketBuilder + Debug + Payload + 'a,
1273{
1274 let SegmentHeader { seq, ack, wnd, control, options, push } = header;
1275 let ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) } = conn_addr;
1276 let mut builder = TcpSegmentBuilder::new(
1277 local_ip.addr(),
1278 remote_ip.addr(),
1279 local_port,
1280 remote_port,
1281 (*seq).into(),
1282 ack.map(Into::into),
1283 u16::from(*wnd),
1284 );
1285 builder.psh(*push);
1286 match control {
1287 None => {}
1288 Some(Control::SYN) => builder.syn(true),
1289 Some(Control::FIN) => builder.fin(true),
1290 Some(Control::RST) => builder.rst(true),
1291 }
1292 TcpSegmentBuilderWithOptions::new(builder, options.builder())
1293 .unwrap_or_else(|TcpOptionsTooLongError| {
1294 panic!("Too many TCP options");
1295 })
1296 .wrap_body(data.into_serializer())
1297}
1298
1299fn run_socket_ingress_filter<I, BC, D>(
1300 bindings_ctx: &BC,
1301 incoming_device: &D,
1302 socket_cookie: SocketCookie,
1303 socket_options: &SocketOptions,
1304 header_info: &impl IpHeaderInfo<I>,
1305 tcp_segment: &TcpSegment<&'_ [u8]>,
1306) -> SocketIngressFilterResult
1307where
1308 I: Ip,
1309 BC: TcpBindingsContext<D>,
1310 D: StrongDeviceIdentifier,
1311{
1312 let [ip_prefix, ip_options] = header_info.as_bytes();
1313 let [tcp_prefix, tcp_options, data] = tcp_segment.as_bytes();
1314 let mut slices = [ip_prefix, ip_options, tcp_prefix, tcp_options, data];
1315 let data = FragmentedByteSlice::new(&mut slices);
1316
1317 bindings_ctx.socket_ops_filter().on_ingress(
1318 I::VERSION,
1319 data,
1320 incoming_device,
1321 socket_cookie,
1322 &socket_options.ip_options.marks,
1323 )
1324}
1325
1326#[cfg(test)]
1327mod test {
1328 use ip_test_macro::ip_test;
1329 use netstack3_base::{
1330 HandshakeOptions, Options, ResetOptions, SackBlocks, SegmentOptions, UnscaledWindowSize,
1331 };
1332 use packet::{ParseBuffer as _, Serializer as _};
1333 use packet_formats::tcp::options::TcpOptions as _;
1334 use test_case::test_case;
1335
1336 use super::*;
1337
1338 trait TestIpExt: netstack3_base::testutil::TestIpExt + FilterIpExt {}
1339 impl<T> TestIpExt for T where T: netstack3_base::testutil::TestIpExt + FilterIpExt {}
1340
1341 const SEQ: SeqNum = SeqNum::new(12345);
1342 const ACK: SeqNum = SeqNum::new(67890);
1343 const FAKE_DATA: &'static [u8] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 0];
1344
1345 #[ip_test(I)]
1346 #[test_case(
1347 Segment::syn(SEQ, UnscaledWindowSize::from(u16::MAX),
1348 HandshakeOptions::default()), &[]
1349 ; "syn")]
1350 #[test_case(
1351 Segment::syn(SEQ, UnscaledWindowSize::from(u16::MAX),
1352 HandshakeOptions {
1353 mss: Some(Mss::new(1440).unwrap()),
1354 ..Default::default() }), &[]
1355 ; "syn with mss")]
1356 #[test_case(
1357 Segment::ack(SEQ, ACK, UnscaledWindowSize::from(u16::MAX), SegmentOptions::default()),
1358 &[]; "ack")]
1359 #[test_case(Segment::with_fake_data(SEQ, ACK, FAKE_DATA), FAKE_DATA; "data")]
1360 #[test_case(Segment::new_assert_no_discard(SegmentHeader {
1361 seq: SEQ,
1362 ack: Some(ACK),
1363 push: true,
1364 wnd: UnscaledWindowSize::from(u16::MAX),
1365 ..Default::default()
1366 },
1367 FAKE_DATA
1368 ), FAKE_DATA; "push")]
1369 fn tcp_serialize_segment<I: TestIpExt>(segment: Segment<&[u8]>, expected_body: &[u8]) {
1370 const SOURCE_PORT: NonZeroU16 = NonZeroU16::new(1111).unwrap();
1371 const DEST_PORT: NonZeroU16 = NonZeroU16::new(2222).unwrap();
1372
1373 let (header, data) = segment.into_parts();
1374 let serializer = super::tcp_serialize_segment::<I, _>(
1375 &header,
1376 data,
1377 ConnIpAddr {
1378 local: (SocketIpAddr::try_from(I::TEST_ADDRS.local_ip).unwrap(), SOURCE_PORT),
1379 remote: (SocketIpAddr::try_from(I::TEST_ADDRS.remote_ip).unwrap(), DEST_PORT),
1380 },
1381 );
1382
1383 let mut serialized = serializer.serialize_vec_outer().unwrap().unwrap_b();
1384 let parsed_segment = serialized
1385 .parse_with::<_, TcpSegment<_>>(TcpParseArgs::new(
1386 *I::TEST_ADDRS.remote_ip,
1387 *I::TEST_ADDRS.local_ip,
1388 ))
1389 .expect("is valid segment");
1390
1391 assert_eq!(parsed_segment.src_port(), SOURCE_PORT);
1392 assert_eq!(parsed_segment.dst_port(), DEST_PORT);
1393 assert_eq!(parsed_segment.seq_num(), u32::from(SEQ));
1394 assert_eq!(parsed_segment.psh(), header.push);
1395 assert_eq!(
1396 UnscaledWindowSize::from(parsed_segment.window_size()),
1397 UnscaledWindowSize::from(u16::MAX)
1398 );
1399
1400 let (mss, window_scale, sack_permitted, sack_blocks, timestamp) = match header.options {
1401 Options::Handshake(HandshakeOptions {
1402 mss,
1403 window_scale,
1404 sack_permitted,
1405 timestamp,
1406 }) => (mss, window_scale, sack_permitted, SackBlocks::EMPTY, timestamp),
1407 Options::Segment(SegmentOptions { timestamp, sack_blocks }) => {
1408 (None, None, false, sack_blocks, timestamp)
1409 }
1410 Options::Reset(ResetOptions { timestamp }) => {
1411 (None, None, false, SackBlocks::EMPTY, timestamp)
1412 }
1413 };
1414 assert_eq!(mss.map(|mss| mss.get()), parsed_segment.options().mss());
1415 assert_eq!(window_scale.map(|ws| ws.get()), parsed_segment.options().window_scale());
1416 assert_eq!(sack_permitted, parsed_segment.options().sack_permitted());
1417 assert_eq!(sack_blocks.as_slice(), parsed_segment.options().sack_blocks());
1418 assert_eq!(
1419 timestamp.as_ref().map(Into::into).as_ref(),
1420 parsed_segment.options().timestamp()
1421 );
1422
1423 assert_eq!(parsed_segment.into_body(), expected_body);
1424 }
1425}