netstack3_tcp/socket/
demux.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Defines the entry point of TCP packets, by directing them into the correct
6//! state machine.
7
8use alloc::collections::hash_map;
9use core::fmt::Debug;
10use core::num::NonZeroU16;
11
12use assert_matches::assert_matches;
13use log::{debug, error, warn};
14use net_types::SpecifiedAddr;
15use netstack3_base::socket::{
16    AddrIsMappedError, AddrVec, AddrVecIter, ConnAddr, ConnIpAddr, InsertError, ListenerAddr,
17    ListenerIpAddr, SocketIpAddr, SocketIpAddrExt as _,
18};
19use netstack3_base::{
20    BidirectionalConverter as _, Control, CounterContext, CtxPair, EitherDeviceId, IpDeviceAddr,
21    Marks, Mss, NotFoundError, Payload, Segment, SegmentHeader, SeqNum,
22    StrongDeviceIdentifier as _, WeakDeviceIdentifier,
23};
24use netstack3_filter::TransportPacketSerializer;
25use netstack3_ip::socket::{IpSockCreationError, MmsError};
26use netstack3_ip::{
27    IpHeaderInfo, IpTransportContext, LocalDeliveryPacketInfo, ReceiveIpPacketMeta,
28    TransportIpContext, TransportReceiveError,
29};
30use netstack3_trace::trace_duration;
31use packet::{BufferMut, BufferView as _, EmptyBuf, InnerPacketBuilder, Serializer as _};
32use packet_formats::error::ParseError;
33use packet_formats::ip::{IpExt, IpProto};
34use packet_formats::tcp::{
35    TcpFlowAndSeqNum, TcpOptionsTooLongError, TcpParseArgs, TcpSegment, TcpSegmentBuilder,
36    TcpSegmentBuilderWithOptions,
37};
38
39use crate::internal::base::{BufferSizes, ConnectionError, SocketOptions, TcpIpSockOptions};
40use crate::internal::counters::{
41    self, TcpCounterContext, TcpCountersRefs, TcpCountersWithoutSocket,
42};
43use crate::internal::socket::isn::IsnGenerator;
44use crate::internal::socket::{
45    self, AsThisStack as _, BoundSocketState, Connection, DemuxState, DeviceIpSocketHandler,
46    DualStackDemuxIdConverter as _, DualStackIpExt, EitherStack, HandshakeStatus, Listener,
47    ListenerAddrState, ListenerSharingState, MaybeDualStack, MaybeListener, PrimaryRc, TcpApi,
48    TcpBindingsContext, TcpBindingsTypes, TcpContext, TcpDemuxContext, TcpDualStackContext,
49    TcpIpTransportContext, TcpPortSpec, TcpSocketId, TcpSocketSetEntry, TcpSocketState,
50    TcpSocketStateInner,
51};
52use crate::internal::state::{
53    BufferProvider, Closed, DataAcked, Initial, NewlyClosed, State, TimeWait,
54};
55
/// Blanket [`BufferProvider`] implementation for every bindings type.
///
/// The associated buffer-carrying types are taken directly from the
/// [`TcpBindingsTypes`] associated types of `BT`, and buffer creation is
/// forwarded to `BT`.
impl<BT: TcpBindingsTypes> BufferProvider<BT::ReceiveBuffer, BT::SendBuffer> for BT {
    // What an actively opened connection carries for its buffers.
    type ActiveOpen = BT::ListenerNotifierOrProvidedBuffers;

    // What is returned alongside the buffers created for a passive open.
    type PassiveOpen = BT::ReturnedBuffers;

    /// Creates the receive/send buffer pair for a passively opened connection
    /// by delegating to `BT::new_passive_open_buffers` with `buffer_sizes`.
    fn new_passive_open_buffers(
        buffer_sizes: BufferSizes,
    ) -> (BT::ReceiveBuffer, BT::SendBuffer, Self::PassiveOpen) {
        // NOTE(review): presumably resolves to the bindings-provided
        // constructor declared on `TcpBindingsTypes`; confirm against the
        // trait definition.
        BT::new_passive_open_buffers(buffer_sizes)
    }
}
67
68impl<I, BC, CC> IpTransportContext<I, BC, CC> for TcpIpTransportContext
69where
70    I: DualStackIpExt,
71    BC: TcpBindingsContext
72        + BufferProvider<
73            BC::ReceiveBuffer,
74            BC::SendBuffer,
75            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
76            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
77        >,
78    CC: TcpContext<I, BC> + TcpContext<I::OtherVersion, BC>,
79{
80    fn receive_icmp_error(
81        core_ctx: &mut CC,
82        bindings_ctx: &mut BC,
83        _device: &CC::DeviceId,
84        original_src_ip: Option<SpecifiedAddr<I::Addr>>,
85        original_dst_ip: SpecifiedAddr<I::Addr>,
86        mut original_body: &[u8],
87        err: I::ErrorCode,
88    ) {
89        let mut buffer = &mut original_body;
90        let Some(flow_and_seqnum) = buffer.take_obj_front::<TcpFlowAndSeqNum>() else {
91            error!("received an ICMP error but its body is less than 8 bytes");
92            return;
93        };
94
95        let Some(original_src_ip) = original_src_ip else { return };
96        let Some(original_src_port) = NonZeroU16::new(flow_and_seqnum.src_port()) else { return };
97        let Some(original_dst_port) = NonZeroU16::new(flow_and_seqnum.dst_port()) else { return };
98        let original_seqnum = SeqNum::new(flow_and_seqnum.sequence_num());
99
100        TcpApi::<I, _>::new(CtxPair { core_ctx, bindings_ctx }).on_icmp_error(
101            original_src_ip,
102            original_dst_ip,
103            original_src_port,
104            original_dst_port,
105            original_seqnum,
106            err.into(),
107        );
108    }
109
110    fn receive_ip_packet<B: BufferMut, H: IpHeaderInfo<I>>(
111        core_ctx: &mut CC,
112        bindings_ctx: &mut BC,
113        device: &CC::DeviceId,
114        remote_ip: I::RecvSrcAddr,
115        local_ip: SpecifiedAddr<I::Addr>,
116        mut buffer: B,
117        info: &LocalDeliveryPacketInfo<I, H>,
118    ) -> Result<(), (B, TransportReceiveError)> {
119        let LocalDeliveryPacketInfo { meta, header_info: _, marks } = info;
120        let ReceiveIpPacketMeta { broadcast, transparent_override } = meta;
121        if let Some(delivery) = transparent_override {
122            warn!(
123                "TODO(https://fxbug.dev/337009139): transparent proxy not supported for TCP \
124                sockets; will not override dispatch to perform local delivery to {delivery:?}"
125            );
126        }
127
128        if broadcast.is_some() {
129            CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
130                .invalid_ip_addrs_received
131                .increment();
132            debug!("tcp: dropping broadcast TCP packet");
133            return Ok(());
134        }
135
136        let remote_ip = match SpecifiedAddr::new(remote_ip.into()) {
137            None => {
138                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
139                    .invalid_ip_addrs_received
140                    .increment();
141                debug!("tcp: source address unspecified, dropping the packet");
142                return Ok(());
143            }
144            Some(src_ip) => src_ip,
145        };
146        let remote_ip: SocketIpAddr<_> = match remote_ip.try_into() {
147            Ok(remote_ip) => remote_ip,
148            Err(AddrIsMappedError {}) => {
149                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
150                    .invalid_ip_addrs_received
151                    .increment();
152                debug!("tcp: source address is mapped (ipv4-mapped-ipv6), dropping the packet");
153                return Ok(());
154            }
155        };
156        let local_ip: SocketIpAddr<_> = match local_ip.try_into() {
157            Ok(local_ip) => local_ip,
158            Err(AddrIsMappedError {}) => {
159                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
160                    .invalid_ip_addrs_received
161                    .increment();
162                debug!("tcp: local address is mapped (ipv4-mapped-ipv6), dropping the packet");
163                return Ok(());
164            }
165        };
166        let packet = match buffer
167            .parse_with::<_, TcpSegment<_>>(TcpParseArgs::new(remote_ip.addr(), local_ip.addr()))
168        {
169            Ok(packet) => packet,
170            Err(err) => {
171                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
172                    .invalid_segments_received
173                    .increment();
174                debug!("tcp: failed parsing incoming packet {:?}", err);
175                match err {
176                    ParseError::Checksum => {
177                        CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
178                            .checksum_errors
179                            .increment();
180                    }
181                    ParseError::NotSupported | ParseError::NotExpected | ParseError::Format => {}
182                }
183                return Ok(());
184            }
185        };
186        let local_port = packet.dst_port();
187        let remote_port = packet.src_port();
188        let incoming = match Segment::try_from(packet) {
189            Ok(segment) => segment,
190            Err(err) => {
191                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
192                    .invalid_segments_received
193                    .increment();
194                debug!("tcp: malformed segment {:?}", err);
195                return Ok(());
196            }
197        };
198        let conn_addr =
199            ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) };
200
201        CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
202            .valid_segments_received
203            .increment();
204        handle_incoming_packet::<I, _, _>(
205            core_ctx,
206            bindings_ctx,
207            conn_addr,
208            device,
209            incoming,
210            marks,
211        );
212        Ok(())
213    }
214}
215
/// Dispatches a parsed TCP segment to the socket that should receive it.
///
/// Candidate addresses derived from `conn_addr` and `incoming_device` are
/// looked up one at a time with `lookup_socket`:
///
/// - A matching connection is given the segment. It may handle it, ask to be
///   destroyed, or (see `ReuseCandidateForListener`) be recorded in `tw_reuse`
///   so that the search continues on to listeners.
/// - A matching listener is given the segment; the resulting disposition is
///   resolved by `try_handle_listener_incoming_disposition`, which decides
///   whether the search ends.
///
/// If no socket is found, the segment is processed against a synthetic
/// `Closed` state and any resulting reply segment is sent. Counters for
/// dispatch outcome and for the segment's control flag are updated at the end.
fn handle_incoming_packet<WireI, BC, CC>(
    core_ctx: &mut CC,
    bindings_ctx: &mut BC,
    conn_addr: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
    incoming_device: &CC::DeviceId,
    incoming: Segment<&[u8]>,
    marks: &Marks,
) where
    WireI: DualStackIpExt,
    BC: TcpBindingsContext
        + BufferProvider<
            BC::ReceiveBuffer,
            BC::SendBuffer,
            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
        >,
    CC: TcpContext<WireI, BC> + TcpContext<WireI::OtherVersion, BC>,
{
    trace_duration!(c"tcp::handle_incoming_packet");
    // Set when a connection reports `ReuseCandidateForListener`; passed on to
    // the listener handling below.
    let mut tw_reuse = None;

    // Iterator over the addresses to probe in the demux. It is advanced by
    // `lookup_socket` and keeps its position across loop iterations.
    let mut addrs_to_search = AddrVecIter::<WireI, CC::WeakDeviceId, TcpPortSpec>::with_device(
        conn_addr.into(),
        incoming_device.downgrade(),
    );

    enum FoundSocket<S> {
        // Typically holds the demux ID of the found socket, but may hold
        // `None` if the found socket was destroyed as a result of the segment.
        Yes(Option<S>),
        No,
    }
    let found_socket = loop {
        // Each lookup resumes from where the previous one stopped.
        let sock = core_ctx
            .with_demux(|demux| lookup_socket::<WireI, CC, BC>(demux, &mut addrs_to_search));
        match sock {
            // Candidate addresses exhausted without a match.
            None => break FoundSocket::No,
            Some(SocketLookupResult::Connection(demux_conn_id, conn_addr)) => {
                // It is not possible to have two same connections that
                // share the same local and remote IPs and ports.
                assert_eq!(tw_reuse, None);
                // The demux ID may refer to a socket of either IP version;
                // both arms make the same call with a differently-typed ID.
                let disposition = match WireI::as_dual_stack_ip_socket(&demux_conn_id) {
                    EitherStack::ThisStack(conn_id) => {
                        try_handle_incoming_for_connection_dual_stack(
                            core_ctx,
                            bindings_ctx,
                            conn_id,
                            incoming.clone(),
                        )
                    }
                    EitherStack::OtherStack(conn_id) => {
                        try_handle_incoming_for_connection_dual_stack(
                            core_ctx,
                            bindings_ctx,
                            conn_id,
                            incoming.clone(),
                        )
                    }
                };
                match disposition {
                    ConnectionIncomingSegmentDisposition::Destroy => {
                        WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, demux_conn_id);
                        break FoundSocket::Yes(None);
                    }
                    ConnectionIncomingSegmentDisposition::FoundSocket => {
                        break FoundSocket::Yes(Some(demux_conn_id));
                    }
                    ConnectionIncomingSegmentDisposition::ReuseCandidateForListener => {
                        // Remember the connection and keep searching so a
                        // listener can be found.
                        tw_reuse = Some((demux_conn_id, conn_addr));
                    }
                }
            }
            Some(SocketLookupResult::Listener((demux_listener_id, _listener_addr))) => {
                match WireI::as_dual_stack_ip_socket(&demux_listener_id) {
                    // Listener socket of the same IP version as the segment.
                    EitherStack::ThisStack(listener_id) => {
                        let disposition = core_ctx.with_socket_mut_isn_transport_demux(
                            &listener_id,
                            |core_ctx, socket_state, isn| {
                                let TcpSocketState { socket_state, ip_options: _ } = socket_state;
                                match core_ctx {
                                    MaybeDualStack::NotDualStack((core_ctx, converter)) => {
                                        try_handle_incoming_for_listener::<WireI, WireI, CC, BC, _>(
                                            core_ctx,
                                            bindings_ctx,
                                            &listener_id,
                                            isn,
                                            socket_state,
                                            incoming.clone(),
                                            conn_addr,
                                            incoming_device,
                                            &mut tw_reuse,
                                            move |conn, addr| converter.convert_back((conn, addr)),
                                            WireI::into_demux_socket_id,
                                            marks,
                                        )
                                    }
                                    MaybeDualStack::DualStack((core_ctx, converter)) => {
                                        try_handle_incoming_for_listener::<_, _, CC, BC, _>(
                                            core_ctx,
                                            bindings_ctx,
                                            &listener_id,
                                            isn,
                                            socket_state,
                                            incoming.clone(),
                                            conn_addr,
                                            incoming_device,
                                            &mut tw_reuse,
                                            move |conn, addr| {
                                                converter.convert_back(EitherStack::ThisStack((
                                                    conn, addr,
                                                )))
                                            },
                                            WireI::into_demux_socket_id,
                                            marks,
                                        )
                                    }
                                }
                            },
                        );
                        // A `true` result ends the search with this listener
                        // reported as the found socket.
                        if try_handle_listener_incoming_disposition(
                            core_ctx,
                            bindings_ctx,
                            disposition,
                            &demux_listener_id,
                            &mut tw_reuse,
                            &mut addrs_to_search,
                            conn_addr,
                            incoming_device,
                        ) {
                            break FoundSocket::Yes(Some(demux_listener_id));
                        }
                    }
                    // Listener socket of the other IP version.
                    EitherStack::OtherStack(listener_id) => {
                        let disposition = core_ctx.with_socket_mut_isn_transport_demux(
                            &listener_id,
                            |core_ctx, socket_state, isn| {
                                let TcpSocketState { socket_state, ip_options: _ } = socket_state;
                                match core_ctx {
                                    MaybeDualStack::NotDualStack((_core_ctx, _converter)) => {
                                        // TODO(https://issues.fuchsia.dev/316408184):
                                        // Remove this unreachable!.
                                        unreachable!("OtherStack socket ID with non dual stack");
                                    }
                                    MaybeDualStack::DualStack((core_ctx, converter)) => {
                                        // Obtained before `core_ctx` is passed
                                        // on, for use in the ID-conversion
                                        // closure below.
                                        let other_demux_id_converter =
                                            core_ctx.other_demux_id_converter();
                                        try_handle_incoming_for_listener::<_, _, CC, BC, _>(
                                            core_ctx,
                                            bindings_ctx,
                                            &listener_id,
                                            isn,
                                            socket_state,
                                            incoming.clone(),
                                            conn_addr,
                                            incoming_device,
                                            &mut tw_reuse,
                                            move |conn, addr| {
                                                converter.convert_back(EitherStack::OtherStack((
                                                    conn, addr,
                                                )))
                                            },
                                            move |id| other_demux_id_converter.convert(id),
                                            marks,
                                        )
                                    }
                                }
                            },
                        );
                        // Same resolution step as the `ThisStack` arm.
                        if try_handle_listener_incoming_disposition::<_, _, CC, BC, _>(
                            core_ctx,
                            bindings_ctx,
                            disposition,
                            &demux_listener_id,
                            &mut tw_reuse,
                            &mut addrs_to_search,
                            conn_addr,
                            incoming_device,
                        ) {
                            break FoundSocket::Yes(Some(demux_listener_id));
                        }
                    }
                };
            }
        }
    };

    // `demux_id` is `None` both when no socket matched and when the matched
    // socket was destroyed while handling the segment.
    let demux_id = match found_socket {
        FoundSocket::No => {
            CounterContext::<TcpCountersWithoutSocket<WireI>>::counters(core_ctx)
                .received_segments_no_dispatch
                .increment();

            // There is no existing TCP state, pretend it is closed
            // and generate a RST if needed.
            // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-21):
            // CLOSED is fictional because it represents the state when
            // there is no TCB, and therefore, no connection.
            if let Some(seg) =
                (Closed { reason: None::<Option<ConnectionError>> }.on_segment(&incoming))
            {
                // No socket ID and no IP socket are available for this reply;
                // only the incoming packet's marks are carried over.
                socket::send_tcp_segment::<WireI, WireI, _, _, _>(
                    core_ctx,
                    bindings_ctx,
                    None,
                    None,
                    conn_addr,
                    seg.into_empty(),
                    &TcpIpSockOptions { marks: *marks },
                );
            }
            None
        }
        FoundSocket::Yes(demux_id) => {
            counters::increment_counter_with_optional_demux_id::<WireI, _, _, _, _>(
                core_ctx,
                demux_id.as_ref(),
                |c| &c.received_segments_dispatched,
            );
            demux_id
        }
    };

    // Count the control flag carried by the segment, if any.
    match incoming.header.control {
        None => {}
        Some(control) => counters::increment_counter_with_optional_demux_id::<WireI, _, _, _, _>(
            core_ctx,
            demux_id.as_ref(),
            |c| match control {
                Control::RST => &c.resets_received,
                Control::SYN => &c.syns_received,
                Control::FIN => &c.fins_received,
            },
        ),
    }
}
451
/// The outcome of a successful demux lookup performed by `lookup_socket`.
enum SocketLookupResult<I: DualStackIpExt, D: WeakDeviceIdentifier, BT: TcpBindingsTypes> {
    /// A connected socket: its demux ID and the connection address it was
    /// found under.
    Connection(I::DemuxSocketId<D, BT>, ConnAddr<ConnIpAddr<I::Addr, NonZeroU16, NonZeroU16>, D>),
    /// A listening socket: its demux ID and the listener address it was found
    /// under.
    Listener((I::DemuxSocketId<D, BT>, ListenerAddr<ListenerIpAddr<I::Addr, NonZeroU16>, D>)),
}
456
457fn lookup_socket<I, CC, BC>(
458    DemuxState { socketmap, .. }: &DemuxState<I, CC::WeakDeviceId, BC>,
459    addrs_to_search: &mut AddrVecIter<I, CC::WeakDeviceId, TcpPortSpec>,
460) -> Option<SocketLookupResult<I, CC::WeakDeviceId, BC>>
461where
462    I: DualStackIpExt,
463    BC: TcpBindingsContext,
464    CC: TcpContext<I, BC>,
465{
466    addrs_to_search.find_map(|addr| {
467        match addr {
468            // Connections are always searched before listeners because they
469            // are more specific.
470            AddrVec::Conn(conn_addr) => {
471                socketmap.conns().get_by_addr(&conn_addr).map(|conn_addr_state| {
472                    SocketLookupResult::Connection(conn_addr_state.id(), conn_addr)
473                })
474            }
475            AddrVec::Listen(listener_addr) => {
476                // If we have a listener and the incoming segment is a SYN, we
477                // allocate a new connection entry in the demuxer.
478                // TODO(https://fxbug.dev/42052878): Support SYN cookies.
479
480                socketmap
481                    .listeners()
482                    .get_by_addr(&listener_addr)
483                    .and_then(|addr_state| match addr_state {
484                        ListenerAddrState::ExclusiveListener(id) => Some(id.clone()),
485                        ListenerAddrState::Shared { listener: Some(id), bound: _ } => {
486                            Some(id.clone())
487                        }
488                        ListenerAddrState::ExclusiveBound(_)
489                        | ListenerAddrState::Shared { listener: None, bound: _ } => None,
490                    })
491                    .map(|id| SocketLookupResult::Listener((id, listener_addr)))
492            }
493        }
494    })
495}
496
/// What the caller should do after a segment was offered to a connected
/// socket.
#[derive(PartialEq, Eq)]
enum ConnectionIncomingSegmentDisposition {
    /// The segment was handled by the connection.
    FoundSocket,
    /// The connection is a defunct socket in TIME_WAIT that may be reused if a
    /// listener is listening on the port; the caller keeps searching.
    ReuseCandidateForListener,
    /// The caller must destroy the socket.
    Destroy,
}
503
/// The result of offering a segment to a listener socket; consumed by
/// `try_handle_listener_incoming_disposition`.
///
/// NOTE(review): the variant descriptions below are inferred from the variant
/// names; confirm against the producer and consumer of this type.
enum ListenerIncomingSegmentDisposition<S> {
    /// Presumably: the listener handled the segment.
    FoundSocket,
    /// Presumably: a connection with the same address already exists.
    ConflictingConnection,
    /// Presumably: this listener cannot take the segment.
    NoMatchingSocket,
    /// Presumably: a new connection was created; `S` carries the new socket.
    NewConnection(S),
}
510
511fn try_handle_incoming_for_connection_dual_stack<SockI, CC, BC>(
512    core_ctx: &mut CC,
513    bindings_ctx: &mut BC,
514    conn_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
515    incoming: Segment<&[u8]>,
516) -> ConnectionIncomingSegmentDisposition
517where
518    SockI: DualStackIpExt,
519    BC: TcpBindingsContext
520        + BufferProvider<
521            BC::ReceiveBuffer,
522            BC::SendBuffer,
523            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
524            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
525        >,
526    CC: TcpContext<SockI, BC>,
527{
528    core_ctx.with_socket_mut_transport_demux(conn_id, |core_ctx, socket_state| {
529        let TcpSocketState { socket_state, ip_options: _ } = socket_state;
530        let (conn_and_addr, timer) = assert_matches!(
531            socket_state,
532            TcpSocketStateInner::Bound(BoundSocketState::Connected {
533                 conn, timer, sharing: _
534            }) => (conn , timer),
535            "invalid socket ID"
536        );
537        let this_or_other_stack = match core_ctx {
538            MaybeDualStack::DualStack((core_ctx, converter)) => {
539                match converter.convert(conn_and_addr) {
540                    EitherStack::ThisStack((conn, conn_addr)) => {
541                        // The socket belongs to the current stack, so we
542                        // want to deliver the segment to this stack.
543                        // Use `as_this_stack` to make the context types
544                        // match with the non-dual-stack case.
545                        EitherStack::ThisStack((
546                            core_ctx.as_this_stack(),
547                            conn,
548                            conn_addr,
549                            SockI::into_demux_socket_id(conn_id.clone()),
550                        ))
551                    }
552                    EitherStack::OtherStack((conn, conn_addr)) => {
553                        // We need to deliver from the other stack. i.e. we
554                        // need to deliver an IPv4 packet to the IPv6 stack.
555                        let demux_sock_id = core_ctx.into_other_demux_socket_id(conn_id.clone());
556                        EitherStack::OtherStack((core_ctx, conn, conn_addr, demux_sock_id))
557                    }
558                }
559            }
560            MaybeDualStack::NotDualStack((core_ctx, converter)) => {
561                let (conn, conn_addr) = converter.convert(conn_and_addr);
562                // Similar to the first case, we need deliver to this stack,
563                // but use `as_this_stack` to make the types match.
564                EitherStack::ThisStack((
565                    core_ctx.as_this_stack(),
566                    conn,
567                    conn_addr,
568                    SockI::into_demux_socket_id(conn_id.clone()),
569                ))
570            }
571        };
572
573        match this_or_other_stack {
574            EitherStack::ThisStack((core_ctx, conn, conn_addr, demux_conn_id)) => {
575                try_handle_incoming_for_connection::<_, _, CC, _, _>(
576                    core_ctx,
577                    bindings_ctx,
578                    conn_addr.clone(),
579                    conn_id,
580                    demux_conn_id,
581                    conn,
582                    timer,
583                    incoming,
584                )
585            }
586            EitherStack::OtherStack((core_ctx, conn, conn_addr, demux_conn_id)) => {
587                try_handle_incoming_for_connection::<_, _, CC, _, _>(
588                    core_ctx,
589                    bindings_ctx,
590                    conn_addr.clone(),
591                    conn_id,
592                    demux_conn_id,
593                    conn,
594                    timer,
595                    incoming,
596                )
597            }
598        }
599    })
600}
601
602/// Tries to handle the incoming segment by providing it to a connected socket.
603///
604/// Returns `FoundSocket` if the segment was handled; Otherwise,
605/// `ReuseCandidateForListener` will be returned if there is a defunct socket
606/// that is currently in TIME_WAIT, which is ready to be reused if there is an
607/// active listener listening on the port.
608fn try_handle_incoming_for_connection<SockI, WireI, CC, BC, DC>(
609    core_ctx: &mut DC,
610    bindings_ctx: &mut BC,
611    conn_addr: ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
612    conn_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
613    demux_id: WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
614    conn: &mut Connection<SockI, WireI, CC::WeakDeviceId, BC>,
615    timer: &mut BC::Timer,
616    incoming: Segment<&[u8]>,
617) -> ConnectionIncomingSegmentDisposition
618where
619    SockI: DualStackIpExt,
620    WireI: DualStackIpExt,
621    BC: TcpBindingsContext
622        + BufferProvider<
623            BC::ReceiveBuffer,
624            BC::SendBuffer,
625            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
626            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
627        >,
628    CC: TcpContext<SockI, BC>,
629    DC: TransportIpContext<WireI, BC, DeviceId = CC::DeviceId, WeakDeviceId = CC::WeakDeviceId>
630        + DeviceIpSocketHandler<SockI, BC>
631        + TcpDemuxContext<WireI, CC::WeakDeviceId, BC>
632        + TcpCounterContext<SockI, CC::WeakDeviceId, BC>,
633{
634    let Connection {
635        accept_queue,
636        state,
637        ip_sock,
638        defunct,
639        socket_options,
640        soft_error: _,
641        handshake_status,
642    } = conn;
643
644    // Per RFC 9293 Section 3.6.1:
645    //   When a connection is closed actively, it MUST linger in the TIME-WAIT
646    //   state for a time 2xMSL (Maximum Segment Lifetime) (MUST-13). However,
647    //   it MAY accept a new SYN from the remote TCP endpoint to reopen the
648    //   connection directly from TIME-WAIT state (MAY-2), if it:
649    //
650    //   (1) assigns its initial sequence number for the new connection to be
651    //       larger than the largest sequence number it used on the previous
652    //       connection incarnation, and
653    //   (2) returns to TIME-WAIT state if the SYN turns out to be an old
654    //       duplicate.
655    if *defunct && incoming.header.control == Some(Control::SYN) && incoming.header.ack.is_none() {
656        if let State::TimeWait(TimeWait { last_seq: _, closed_rcv, expiry: _ }) = state {
657            if !incoming.header.seq.before(closed_rcv.ack) {
658                return ConnectionIncomingSegmentDisposition::ReuseCandidateForListener;
659            }
660        }
661    }
662    let (reply, passive_open, data_acked, newly_closed) = state.on_segment::<_, BC>(
663        &conn_id.either(),
664        &TcpCountersRefs::from_ctx(core_ctx, conn_id),
665        incoming,
666        bindings_ctx.now(),
667        socket_options,
668        *defunct,
669    );
670
671    match data_acked {
672        DataAcked::Yes => {
673            core_ctx.confirm_reachable(bindings_ctx, ip_sock, &socket_options.ip_options)
674        }
675        DataAcked::No => {}
676    }
677
678    match state {
679        State::Listen(_) => {
680            unreachable!("has an invalid status: {:?}", conn.state)
681        }
682        State::SynSent(_) | State::SynRcvd(_) => {
683            assert_eq!(*handshake_status, HandshakeStatus::Pending)
684        }
685        State::Established(_)
686        | State::FinWait1(_)
687        | State::FinWait2(_)
688        | State::Closing(_)
689        | State::CloseWait(_)
690        | State::LastAck(_)
691        | State::TimeWait(_) => {
692            if handshake_status
693                .update_if_pending(HandshakeStatus::Completed { reported: accept_queue.is_some() })
694            {
695                core_ctx.confirm_reachable(bindings_ctx, ip_sock, &socket_options.ip_options);
696            }
697        }
698        State::Closed(Closed { reason }) => {
699            // We remove the socket from the socketmap and cancel the timers
700            // regardless of the socket being defunct or not. The justification
701            // is that CLOSED is a synthetic state and it means no connection
702            // exists, thus it should not exist in the demuxer.
703            //
704            // If the socket was already in the closed state we can assume it's
705            // no longer in the demux.
706            socket::handle_newly_closed(
707                core_ctx,
708                bindings_ctx,
709                newly_closed,
710                &demux_id,
711                &conn_addr,
712                timer,
713            );
714            if let Some(accept_queue) = accept_queue {
715                accept_queue.remove(&conn_id);
716                *defunct = true;
717            }
718            if *defunct {
719                // If the client has promised to not touch the socket again,
720                // we can destroy the socket finally.
721                return ConnectionIncomingSegmentDisposition::Destroy;
722            }
723            let _: bool = handshake_status.update_if_pending(match reason {
724                None => HandshakeStatus::Completed { reported: accept_queue.is_some() },
725                Some(_err) => HandshakeStatus::Aborted,
726            });
727        }
728    }
729
730    if let Some(seg) = reply {
731        socket::send_tcp_segment(
732            core_ctx,
733            bindings_ctx,
734            Some(conn_id),
735            Some(&ip_sock),
736            conn_addr.ip,
737            seg.into_empty(),
738            &socket_options.ip_options,
739        );
740    }
741
742    // Send any enqueued data, if there is any.
743    socket::do_send_inner_and_then_handle_newly_closed(
744        conn_id,
745        demux_id,
746        conn,
747        &conn_addr,
748        timer,
749        core_ctx,
750        bindings_ctx,
751    );
752
753    // Enqueue the connection to the associated listener
754    // socket's accept queue.
755    if let Some(passive_open) = passive_open {
756        let accept_queue = conn.accept_queue.as_ref().expect("no accept queue but passive open");
757        accept_queue.notify_ready(conn_id, passive_open);
758    }
759
760    // We found a valid connection for the segment.
761    ConnectionIncomingSegmentDisposition::FoundSocket
762}
763
764/// Responds to the disposition returned by [`try_handle_incoming_for_listener`].
765///
766/// Returns true if we have found the right socket and there is no need to
767/// continue the iteration for finding the next-best candidate.
768fn try_handle_listener_incoming_disposition<SockI, WireI, CC, BC, Addr>(
769    core_ctx: &mut CC,
770    bindings_ctx: &mut BC,
771    disposition: ListenerIncomingSegmentDisposition<PrimaryRc<SockI, CC::WeakDeviceId, BC>>,
772    demux_listener_id: &WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
773    tw_reuse: &mut Option<(WireI::DemuxSocketId<CC::WeakDeviceId, BC>, Addr)>,
774    addrs_to_search: &mut AddrVecIter<WireI, CC::WeakDeviceId, TcpPortSpec>,
775    conn_addr: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
776    incoming_device: &CC::DeviceId,
777) -> bool
778where
779    SockI: DualStackIpExt,
780    WireI: DualStackIpExt,
781    CC: TcpContext<SockI, BC> + TcpContext<WireI, BC> + TcpContext<WireI::OtherVersion, BC>,
782    BC: TcpBindingsContext,
783{
784    match disposition {
785        ListenerIncomingSegmentDisposition::FoundSocket => true,
786        ListenerIncomingSegmentDisposition::ConflictingConnection => {
787            // We're about to rewind the lookup. If we got a
788            // conflicting connection it means tw_reuse has been
789            // removed from the demux state and we need to destroy
790            // it.
791            if let Some((tw_reuse, _)) = tw_reuse.take() {
792                WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, tw_reuse);
793            }
794
795            // Reset the address vector iterator and go again, a
796            // conflicting connection was found.
797            *addrs_to_search = AddrVecIter::<WireI, CC::WeakDeviceId, TcpPortSpec>::with_device(
798                conn_addr.into(),
799                incoming_device.downgrade(),
800            );
801            false
802        }
803        ListenerIncomingSegmentDisposition::NoMatchingSocket => false,
804        ListenerIncomingSegmentDisposition::NewConnection(primary) => {
805            // If we have a new connection, we need to add it to the
806            // set of all sockets.
807
808            // First things first, if we got here then tw_reuse is
809            // gone so we need to destroy it.
810            if let Some((tw_reuse, _)) = tw_reuse.take() {
811                WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, tw_reuse);
812            }
813
814            // Now put the new connection into the socket map.
815            //
816            // Note that there's a possible subtle race here where
817            // another thread could have already operated further on
818            // this connection and marked it for destruction which
819            // puts the entry in the DOA state, if we see that we
820            // must immediately destroy the socket after having put
821            // it in the map.
822            let id = TcpSocketId(PrimaryRc::clone_strong(&primary));
823            let to_destroy = core_ctx.with_all_sockets_mut(move |all_sockets| {
824                let insert_entry = TcpSocketSetEntry::Primary(primary);
825                match all_sockets.entry(id) {
826                    hash_map::Entry::Vacant(v) => {
827                        let _: &mut _ = v.insert(insert_entry);
828                        None
829                    }
830                    hash_map::Entry::Occupied(mut o) => {
831                        // We're holding on to the primary ref, the
832                        // only possible state here should be a DOA
833                        // entry.
834                        assert_matches!(
835                            core::mem::replace(o.get_mut(), insert_entry),
836                            TcpSocketSetEntry::DeadOnArrival
837                        );
838                        Some(o.key().clone())
839                    }
840                }
841            });
842            // NB: we're releasing and reaquiring the
843            // all_sockets_mut lock here for the convenience of not
844            // needing different versions of `destroy_socket`. This
845            // should be fine because the race this is solving
846            // should not be common. If we have correct thread
847            // attribution per flow it should effectively become
848            // impossible so we go for code simplicity here.
849            if let Some(to_destroy) = to_destroy {
850                socket::destroy_socket(core_ctx, bindings_ctx, to_destroy);
851            }
852            counters::increment_counter_for_demux_id::<WireI, _, _, _, _>(
853                core_ctx,
854                demux_listener_id,
855                |c| &c.passive_connection_openings,
856            );
857            true
858        }
859    }
860}
861
862/// Tries to handle an incoming segment by passing it to a listening socket.
863///
864/// Returns `FoundSocket` if the segment was handled, otherwise `NoMatchingSocket`.
865fn try_handle_incoming_for_listener<SockI, WireI, CC, BC, DC>(
866    core_ctx: &mut DC,
867    bindings_ctx: &mut BC,
868    listener_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
869    isn: &IsnGenerator<BC::Instant>,
870    socket_state: &mut TcpSocketStateInner<SockI, CC::WeakDeviceId, BC>,
871    incoming: Segment<&[u8]>,
872    incoming_addrs: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
873    incoming_device: &CC::DeviceId,
874    tw_reuse: &mut Option<(
875        WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
876        ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
877    )>,
878    make_connection: impl FnOnce(
879        Connection<SockI, WireI, CC::WeakDeviceId, BC>,
880        ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
881    ) -> SockI::ConnectionAndAddr<CC::WeakDeviceId, BC>,
882    make_demux_id: impl Fn(
883        TcpSocketId<SockI, CC::WeakDeviceId, BC>,
884    ) -> WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
885    marks: &Marks,
886) -> ListenerIncomingSegmentDisposition<PrimaryRc<SockI, CC::WeakDeviceId, BC>>
887where
888    SockI: DualStackIpExt,
889    WireI: DualStackIpExt,
890    BC: TcpBindingsContext
891        + BufferProvider<
892            BC::ReceiveBuffer,
893            BC::SendBuffer,
894            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
895            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
896        >,
897    CC: TcpContext<SockI, BC>,
898    DC: TransportIpContext<WireI, BC, DeviceId = CC::DeviceId, WeakDeviceId = CC::WeakDeviceId>
899        + DeviceIpSocketHandler<WireI, BC>
900        + TcpDemuxContext<WireI, CC::WeakDeviceId, BC>
901        + TcpCounterContext<SockI, CC::WeakDeviceId, BC>,
902{
903    let (maybe_listener, sharing, listener_addr) = assert_matches!(
904        socket_state,
905        TcpSocketStateInner::Bound(BoundSocketState::Listener(l)) => l,
906        "invalid socket ID"
907    );
908
909    let ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) } =
910        incoming_addrs;
911
912    let Listener { accept_queue, backlog, buffer_sizes, socket_options } = match maybe_listener {
913        MaybeListener::Bound(_bound) => {
914            // If the socket is only bound, but not listening.
915            return ListenerIncomingSegmentDisposition::NoMatchingSocket;
916        }
917        MaybeListener::Listener(listener) => listener,
918    };
919
920    // Note that this checks happens at the very beginning, before we try to
921    // reuse the connection in TIME-WAIT, this is because we need to store the
922    // reused connection in the accept queue so we have to respect its limit.
923    if accept_queue.len() == backlog.get() {
924        core_ctx.increment_both(listener_id, |counters| &counters.listener_queue_overflow);
925        core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
926        debug!("incoming SYN dropped because of the full backlog of the listener");
927        return ListenerIncomingSegmentDisposition::FoundSocket;
928    }
929
930    // Ensure that if the remote address requires a zone, we propagate that to
931    // the address for the connected socket.
932    let bound_device = listener_addr.as_ref().clone();
933    let bound_device = if remote_ip.as_ref().must_have_zone() {
934        Some(bound_device.map_or(EitherDeviceId::Strong(incoming_device), EitherDeviceId::Weak))
935    } else {
936        bound_device.map(EitherDeviceId::Weak)
937    };
938
939    let ip_options = TcpIpSockOptions { marks: *marks, ..socket_options.ip_options };
940    let socket_options = SocketOptions { ip_options, ..*socket_options };
941
942    let bound_device = bound_device.as_ref().map(|d| d.as_ref());
943    let ip_sock = match core_ctx.new_ip_socket(
944        bindings_ctx,
945        bound_device,
946        IpDeviceAddr::new_from_socket_ip_addr(local_ip),
947        remote_ip,
948        IpProto::Tcp.into(),
949        &ip_options,
950    ) {
951        Ok(ip_sock) => ip_sock,
952        err @ Err(IpSockCreationError::Route(_)) => {
953            core_ctx.increment_both(listener_id, |counters| &counters.passive_open_no_route_errors);
954            core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
955            debug!("cannot construct an ip socket to the SYN originator: {:?}, ignoring", err);
956            return ListenerIncomingSegmentDisposition::NoMatchingSocket;
957        }
958    };
959
960    let isn = isn.generate(
961        bindings_ctx.now(),
962        (ip_sock.local_ip().clone().into(), local_port),
963        (ip_sock.remote_ip().clone(), remote_port),
964    );
965    let device_mms = match core_ctx.get_mms(bindings_ctx, &ip_sock, &socket_options.ip_options) {
966        Ok(mms) => mms,
967        Err(err) => {
968            // If we cannot find a device or the device's MTU is too small,
969            // there isn't much we can do here since sending a RST back is
970            // impossible, we just need to silent drop the segment.
971            error!("Cannot find a device with large enough MTU for the connection");
972            core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
973            match err {
974                MmsError::NoDevice(_) | MmsError::MTUTooSmall(_) => {
975                    return ListenerIncomingSegmentDisposition::FoundSocket;
976                }
977            }
978        }
979    };
980    let Some(device_mss) = Mss::from_mms(device_mms) else {
981        return ListenerIncomingSegmentDisposition::FoundSocket;
982    };
983
984    let mut state = State::Listen(Closed::<Initial>::listen(
985        isn,
986        buffer_sizes.clone(),
987        device_mss,
988        Mss::default::<WireI>(),
989        socket_options.user_timeout,
990    ));
991
992    // Prepare a reply to be sent out.
993    //
994    // We might end up discarding the reply in case we can't instantiate this
995    // new connection.
996    let result = state.on_segment::<_, BC>(
997        // NB: This is a bit of a lie, we're passing the listener ID to process
998        // the first segment because we don't have an ID allocated yet. This is
999        // okay because the state machine ID is only for debugging purposes.
1000        &listener_id.either(),
1001        &TcpCountersRefs::from_ctx(core_ctx, listener_id),
1002        incoming,
1003        bindings_ctx.now(),
1004        &SocketOptions::default(),
1005        false, /* defunct */
1006    );
1007    let reply = assert_matches!(
1008        result,
1009        (reply, None, /* data_acked */ _, NewlyClosed::No /* can't become closed */) => reply
1010    );
1011
1012    let result = if matches!(state, State::SynRcvd(_)) {
1013        let poll_send_at = state.poll_send_at().expect("no retrans timer");
1014        let ListenerSharingState { sharing, listening: _ } = *sharing;
1015        let bound_device = ip_sock.device().cloned();
1016
1017        let addr = ConnAddr {
1018            ip: ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) },
1019            device: bound_device,
1020        };
1021
1022        let new_socket = core_ctx.with_demux_mut(|DemuxState { socketmap, .. }| {
1023            // If we're reusing an entry, remove it from the demux before
1024            // proceeding.
1025            //
1026            // We could just reuse the old allocation for the new connection but
1027            // because of the restrictions on the socket map data structure (for
1028            // good reasons), we can't update the sharing info unconditionally.
1029            // So here we just remove the old connection and create a new one.
1030            // Also this approach has the benefit of not accidentally persisting
1031            // the old state that we don't want.
1032            if let Some((tw_reuse, conn_addr)) = tw_reuse {
1033                match socketmap.conns_mut().remove(tw_reuse, &conn_addr) {
1034                    Ok(()) => {
1035                        // NB: We're removing the tw_reuse connection from the
1036                        // demux here, but not canceling its timer. The timer is
1037                        // canceled via drop when we destroy the socket. Special
1038                        // care is taken when handling timers in the time wait
1039                        // state to account for this.
1040                    }
1041                    Err(NotFoundError) => {
1042                        // We could lose a race trying to reuse the tw_reuse
1043                        // socket, so we just accept the loss and be happy that
1044                        // the conn_addr we want to use is free.
1045                    }
1046                }
1047            }
1048
1049            // Try to create and add the new socket to the demux.
1050            let accept_queue_clone = accept_queue.clone();
1051            let ip_sock = ip_sock.clone();
1052            let bindings_ctx_moved = &mut *bindings_ctx;
1053            match socketmap.conns_mut().try_insert_with(addr, sharing, move |addr, sharing| {
1054                let conn = make_connection(
1055                    Connection {
1056                        accept_queue: Some(accept_queue_clone),
1057                        state,
1058                        ip_sock,
1059                        defunct: false,
1060                        socket_options,
1061                        soft_error: None,
1062                        handshake_status: HandshakeStatus::Pending,
1063                    },
1064                    addr,
1065                );
1066
1067                let (id, primary) = TcpSocketId::new_cyclic(|weak| {
1068                    let mut timer = CC::new_timer(bindings_ctx_moved, weak);
1069                    // Schedule the timer here because we can't acquire the lock
1070                    // later. This only runs when inserting into the demux
1071                    // succeeds so it's okay.
1072                    assert_eq!(
1073                        bindings_ctx_moved.schedule_timer_instant(poll_send_at, &mut timer),
1074                        None
1075                    );
1076                    TcpSocketStateInner::Bound(BoundSocketState::Connected { conn, sharing, timer })
1077                });
1078                (make_demux_id(id.clone()), (primary, id))
1079            }) {
1080                Ok((_entry, (primary, id))) => {
1081                    // Make sure the new socket is in the pending accept queue
1082                    // before we release the demux lock.
1083                    accept_queue.push_pending(id);
1084                    Some(primary)
1085                }
1086                Err((e, _sharing_state)) => {
1087                    // The only error we accept here is if the entry exists
1088                    // fully, any indirect conflicts are unexpected because we
1089                    // know the listener is still alive and installed in the
1090                    // demux.
1091                    assert_matches!(e, InsertError::Exists);
1092                    // If we fail to insert it means we lost a race and this
1093                    // packet is destined to a connection that is already
1094                    // established. In that case we should tell the demux code
1095                    // to retry demuxing it all over again.
1096                    None
1097                }
1098            }
1099        });
1100
1101        match new_socket {
1102            Some(new_socket) => ListenerIncomingSegmentDisposition::NewConnection(new_socket),
1103            None => {
1104                // We didn't create a new connection, short circuit early and
1105                // don't send out the pending segment.
1106                core_ctx
1107                    .increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1108                return ListenerIncomingSegmentDisposition::ConflictingConnection;
1109            }
1110        }
1111    } else {
1112        // We found a valid listener for the segment even if the connection
1113        // state is not a newly pending connection.
1114        ListenerIncomingSegmentDisposition::FoundSocket
1115    };
1116
1117    // We can send a reply now if we got here.
1118    if let Some(seg) = reply {
1119        socket::send_tcp_segment(
1120            core_ctx,
1121            bindings_ctx,
1122            Some(&listener_id),
1123            Some(&ip_sock),
1124            incoming_addrs,
1125            seg.into_empty(),
1126            &socket_options.ip_options,
1127        );
1128    }
1129
1130    result
1131}
1132
1133pub(super) fn tcp_serialize_segment<'a, I, P>(
1134    header: &'a SegmentHeader,
1135    data: P,
1136    conn_addr: ConnIpAddr<I::Addr, NonZeroU16, NonZeroU16>,
1137) -> impl TransportPacketSerializer<I, Buffer = EmptyBuf> + Debug + 'a
1138where
1139    I: IpExt,
1140    P: InnerPacketBuilder + Debug + Payload + 'a,
1141{
1142    let SegmentHeader { seq, ack, wnd, control, options, .. } = header;
1143    let ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) } = conn_addr;
1144    let mut builder = TcpSegmentBuilder::new(
1145        local_ip.addr(),
1146        remote_ip.addr(),
1147        local_port,
1148        remote_port,
1149        (*seq).into(),
1150        ack.map(Into::into),
1151        u16::from(*wnd),
1152    );
1153    match control {
1154        None => {}
1155        Some(Control::SYN) => builder.syn(true),
1156        Some(Control::FIN) => builder.fin(true),
1157        Some(Control::RST) => builder.rst(true),
1158    }
1159    data.into_serializer().encapsulate(
1160        TcpSegmentBuilderWithOptions::new(builder, options.iter()).unwrap_or_else(
1161            |TcpOptionsTooLongError| {
1162                panic!("Too many TCP options");
1163            },
1164        ),
1165    )
1166}
1167
#[cfg(test)]
mod test {
    use ip_test_macro::ip_test;
    use netstack3_base::testutil::TestIpExt;
    use netstack3_base::{HandshakeOptions, UnscaledWindowSize};
    use packet::ParseBuffer as _;
    use test_case::test_case;

    use super::*;

    /// Sequence number carried by every segment under test.
    const SEQ: SeqNum = SeqNum::new(12345);
    /// Acknowledgement number used by the segments that carry one.
    const ACK: SeqNum = SeqNum::new(67890);
    /// Payload used by the data-bearing test case.
    const FAKE_DATA: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 0];

    /// Serializes `segment` with [`super::tcp_serialize_segment`], parses the
    /// resulting bytes back and checks that ports, sequence number, window
    /// size, options and body all survive the round trip.
    #[ip_test(I)]
    #[test_case(
        Segment::syn(SEQ, UnscaledWindowSize::from(u16::MAX),
        HandshakeOptions::default().into()), &[]
        ; "syn")]
    #[test_case(
        Segment::syn(SEQ, UnscaledWindowSize::from(u16::MAX),
        HandshakeOptions {
            mss: Some(Mss(NonZeroU16::new(1440).unwrap())),
            ..Default::default() }.into()), &[]
            ; "syn with mss")]
    #[test_case(Segment::ack(SEQ, ACK, UnscaledWindowSize::from(u16::MAX)), &[]; "ack")]
    #[test_case(Segment::with_fake_data(SEQ, ACK, FAKE_DATA), FAKE_DATA; "data")]
    fn tcp_serialize_segment<I: TestIpExt>(segment: Segment<&[u8]>, expected_body: &[u8]) {
        const SOURCE_PORT: NonZeroU16 = NonZeroU16::new(1111).unwrap();
        const DEST_PORT: NonZeroU16 = NonZeroU16::new(2222).unwrap();

        let Segment { header, data } = segment;
        let serializer = super::tcp_serialize_segment::<I, _>(
            &header,
            data,
            ConnIpAddr {
                local: (SocketIpAddr::try_from(I::TEST_ADDRS.local_ip).unwrap(), SOURCE_PORT),
                remote: (SocketIpAddr::try_from(I::TEST_ADDRS.remote_ip).unwrap(), DEST_PORT),
            },
        );

        let mut serialized = serializer.serialize_vec_outer().unwrap().unwrap_b();
        // The segment was serialized with the local address first and the
        // remote address second, so the parse arguments follow the same
        // orientation (source first, then destination).
        // NOTE(review): this relies on `TcpParseArgs::new` taking
        // `(src_ip, dst_ip)`; confirm against packet_formats.
        let parsed_segment = serialized
            .parse_with::<_, TcpSegment<_>>(TcpParseArgs::new(
                *I::TEST_ADDRS.local_ip,
                *I::TEST_ADDRS.remote_ip,
            ))
            .expect("is valid segment");

        assert_eq!(parsed_segment.src_port(), SOURCE_PORT);
        assert_eq!(parsed_segment.dst_port(), DEST_PORT);
        assert_eq!(parsed_segment.seq_num(), u32::from(SEQ));
        assert_eq!(
            UnscaledWindowSize::from(parsed_segment.window_size()),
            UnscaledWindowSize::from(u16::MAX)
        );
        // Options must round-trip both in number and in content.
        let options = header.options;
        assert_eq!(options.iter().count(), parsed_segment.iter_options().count());
        for (orig, parsed) in options.iter().zip(parsed_segment.iter_options()) {
            assert_eq!(orig, parsed);
        }
        assert_eq!(parsed_segment.into_body(), expected_body);
    }
}