netstack3_tcp/socket/
demux.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Defines the entry point of TCP packets, by directing them into the correct
6//! state machine.
7
8use alloc::collections::hash_map;
9use core::fmt::Debug;
10use core::num::NonZeroU16;
11
12use assert_matches::assert_matches;
13use log::{debug, error, warn};
14use net_types::SpecifiedAddr;
15use netstack3_base::socket::{
16    AddrIsMappedError, AddrVec, AddrVecIter, ConnAddr, ConnIpAddr, InsertError, ListenerAddr,
17    ListenerIpAddr, SocketIpAddr, SocketIpAddrExt as _,
18};
19use netstack3_base::{
20    BidirectionalConverter as _, Control, CounterContext, CtxPair, EitherDeviceId, IpDeviceAddr,
21    Marks, Mss, NotFoundError, Payload, Segment, SegmentHeader, SeqNum,
22    StrongDeviceIdentifier as _, WeakDeviceIdentifier,
23};
24use netstack3_filter::TransportPacketSerializer;
25use netstack3_ip::socket::{IpSockCreationError, MmsError};
26use netstack3_ip::{
27    IpHeaderInfo, IpTransportContext, LocalDeliveryPacketInfo, ReceiveIpPacketMeta,
28    TransportIpContext, TransportReceiveError,
29};
30use netstack3_trace::trace_duration;
31use packet::{BufferMut, BufferView as _, EmptyBuf, InnerPacketBuilder, Serializer as _};
32use packet_formats::error::ParseError;
33use packet_formats::ip::{IpExt, IpProto};
34use packet_formats::tcp::{
35    TcpFlowAndSeqNum, TcpOptionsTooLongError, TcpParseArgs, TcpSegment, TcpSegmentBuilder,
36    TcpSegmentBuilderWithOptions,
37};
38
39use crate::internal::base::{BufferSizes, ConnectionError, SocketOptions, TcpIpSockOptions};
40use crate::internal::counters::{
41    self, TcpCounterContext, TcpCountersRefs, TcpCountersWithoutSocket,
42};
43use crate::internal::socket::isn::IsnGenerator;
44use crate::internal::socket::{
45    self, AsThisStack as _, BoundSocketState, Connection, DemuxState, DeviceIpSocketHandler,
46    DualStackDemuxIdConverter as _, DualStackIpExt, EitherStack, HandshakeStatus, Listener,
47    ListenerAddrState, ListenerSharingState, MaybeDualStack, MaybeListener, PrimaryRc, TcpApi,
48    TcpBindingsContext, TcpBindingsTypes, TcpContext, TcpDemuxContext, TcpDualStackContext,
49    TcpIpTransportContext, TcpPortSpec, TcpSocketId, TcpSocketSetEntry, TcpSocketState,
50    TcpSocketStateInner,
51};
52use crate::internal::state::{
53    BufferProvider, Closed, DataAcked, Initial, NewlyClosed, State, TimeWait,
54};
55
56impl<BT: TcpBindingsTypes> BufferProvider<BT::ReceiveBuffer, BT::SendBuffer> for BT {
57    type ActiveOpen = BT::ListenerNotifierOrProvidedBuffers;
58
59    type PassiveOpen = BT::ReturnedBuffers;
60
61    fn new_passive_open_buffers(
62        buffer_sizes: BufferSizes,
63    ) -> (BT::ReceiveBuffer, BT::SendBuffer, Self::PassiveOpen) {
64        BT::new_passive_open_buffers(buffer_sizes)
65    }
66}
67
68impl<I, BC, CC> IpTransportContext<I, BC, CC> for TcpIpTransportContext
69where
70    I: DualStackIpExt,
71    BC: TcpBindingsContext
72        + BufferProvider<
73            BC::ReceiveBuffer,
74            BC::SendBuffer,
75            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
76            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
77        >,
78    CC: TcpContext<I, BC> + TcpContext<I::OtherVersion, BC>,
79{
80    fn receive_icmp_error(
81        core_ctx: &mut CC,
82        bindings_ctx: &mut BC,
83        _device: &CC::DeviceId,
84        original_src_ip: Option<SpecifiedAddr<I::Addr>>,
85        original_dst_ip: SpecifiedAddr<I::Addr>,
86        mut original_body: &[u8],
87        err: I::ErrorCode,
88    ) {
89        let mut buffer = &mut original_body;
90        let Some(flow_and_seqnum) = buffer.take_obj_front::<TcpFlowAndSeqNum>() else {
91            error!("received an ICMP error but its body is less than 8 bytes");
92            return;
93        };
94
95        let Some(original_src_ip) = original_src_ip else { return };
96        let Some(original_src_port) = NonZeroU16::new(flow_and_seqnum.src_port()) else { return };
97        let Some(original_dst_port) = NonZeroU16::new(flow_and_seqnum.dst_port()) else { return };
98        let original_seqnum = SeqNum::new(flow_and_seqnum.sequence_num());
99
100        TcpApi::<I, _>::new(CtxPair { core_ctx, bindings_ctx }).on_icmp_error(
101            original_src_ip,
102            original_dst_ip,
103            original_src_port,
104            original_dst_port,
105            original_seqnum,
106            err.into(),
107        );
108    }
109
110    fn receive_ip_packet<B: BufferMut, H: IpHeaderInfo<I>>(
111        core_ctx: &mut CC,
112        bindings_ctx: &mut BC,
113        device: &CC::DeviceId,
114        remote_ip: I::RecvSrcAddr,
115        local_ip: SpecifiedAddr<I::Addr>,
116        mut buffer: B,
117        info: &LocalDeliveryPacketInfo<I, H>,
118    ) -> Result<(), (B, TransportReceiveError)> {
119        let LocalDeliveryPacketInfo { meta, header_info: _, marks } = info;
120        let ReceiveIpPacketMeta { broadcast, transparent_override } = meta;
121        if let Some(delivery) = transparent_override {
122            warn!(
123                "TODO(https://fxbug.dev/337009139): transparent proxy not supported for TCP \
124                sockets; will not override dispatch to perform local delivery to {delivery:?}"
125            );
126        }
127
128        if broadcast.is_some() {
129            CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
130                .invalid_ip_addrs_received
131                .increment();
132            debug!("tcp: dropping broadcast TCP packet");
133            return Ok(());
134        }
135
136        let remote_ip = match SpecifiedAddr::new(remote_ip.into()) {
137            None => {
138                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
139                    .invalid_ip_addrs_received
140                    .increment();
141                debug!("tcp: source address unspecified, dropping the packet");
142                return Ok(());
143            }
144            Some(src_ip) => src_ip,
145        };
146        let remote_ip: SocketIpAddr<_> = match remote_ip.try_into() {
147            Ok(remote_ip) => remote_ip,
148            Err(AddrIsMappedError {}) => {
149                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
150                    .invalid_ip_addrs_received
151                    .increment();
152                debug!("tcp: source address is mapped (ipv4-mapped-ipv6), dropping the packet");
153                return Ok(());
154            }
155        };
156        let local_ip: SocketIpAddr<_> = match local_ip.try_into() {
157            Ok(local_ip) => local_ip,
158            Err(AddrIsMappedError {}) => {
159                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
160                    .invalid_ip_addrs_received
161                    .increment();
162                debug!("tcp: local address is mapped (ipv4-mapped-ipv6), dropping the packet");
163                return Ok(());
164            }
165        };
166        let packet = match buffer
167            .parse_with::<_, TcpSegment<_>>(TcpParseArgs::new(remote_ip.addr(), local_ip.addr()))
168        {
169            Ok(packet) => packet,
170            Err(err) => {
171                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
172                    .invalid_segments_received
173                    .increment();
174                debug!("tcp: failed parsing incoming packet {:?}", err);
175                match err {
176                    ParseError::Checksum => {
177                        CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
178                            .checksum_errors
179                            .increment();
180                    }
181                    ParseError::NotSupported | ParseError::NotExpected | ParseError::Format => {}
182                }
183                return Ok(());
184            }
185        };
186        let local_port = packet.dst_port();
187        let remote_port = packet.src_port();
188        let incoming = match Segment::try_from(packet) {
189            Ok(segment) => segment,
190            Err(err) => {
191                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
192                    .invalid_segments_received
193                    .increment();
194                debug!("tcp: malformed segment {:?}", err);
195                return Ok(());
196            }
197        };
198        let conn_addr =
199            ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) };
200
201        CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
202            .valid_segments_received
203            .increment();
204        handle_incoming_packet::<I, _, _>(
205            core_ctx,
206            bindings_ctx,
207            conn_addr,
208            device,
209            incoming,
210            marks,
211        );
212        Ok(())
213    }
214}
215
/// Demultiplexes one parsed TCP segment to the socket that should handle it.
///
/// Candidate addresses derived from `conn_addr` and `incoming_device` are
/// searched through `lookup_socket` until a socket handles the segment or the
/// candidates are exhausted:
///
/// - A matching connection is given the segment through
///   `try_handle_incoming_for_connection_dual_stack`. Depending on the
///   returned disposition the socket is destroyed, reported as the handler,
///   or remembered in `tw_reuse` while the search continues.
/// - A matching listener is given the segment through
///   `try_handle_incoming_for_listener`, and the resulting disposition is
///   resolved by `try_handle_listener_incoming_disposition`; when that returns
///   `true` the listener is reported as the handler.
/// - If nothing handles the segment, it is run against a synthetic `Closed`
///   state and any segment that state produces is sent back.
///
/// Per-socket (or socket-less) counters are updated for dispatch/no-dispatch
/// and for the control flag (RST/SYN/FIN) carried by the segment, if any.
fn handle_incoming_packet<WireI, BC, CC>(
    core_ctx: &mut CC,
    bindings_ctx: &mut BC,
    conn_addr: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
    incoming_device: &CC::DeviceId,
    incoming: Segment<&[u8]>,
    marks: &Marks,
) where
    WireI: DualStackIpExt,
    BC: TcpBindingsContext
        + BufferProvider<
            BC::ReceiveBuffer,
            BC::SendBuffer,
            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
        >,
    CC: TcpContext<WireI, BC> + TcpContext<WireI::OtherVersion, BC>,
{
    trace_duration!(c"tcp::handle_incoming_packet");
    // Set when a matching connection reports `ReuseCandidateForListener`; it
    // is then made available to the listener handling further down the search.
    let mut tw_reuse = None;

    let mut addrs_to_search = AddrVecIter::<WireI, CC::WeakDeviceId, TcpPortSpec>::with_device(
        conn_addr.into(),
        incoming_device.downgrade(),
    );

    enum FoundSocket<S> {
        // Typically holds the demux ID of the found socket, but may hold
        // `None` if the found socket was destroyed as a result of the segment.
        Yes(Option<S>),
        No,
    }
    // Each iteration resumes the search from where the previous lookup left
    // off, because `addrs_to_search` is advanced in place by `lookup_socket`.
    let found_socket = loop {
        let sock = core_ctx
            .with_demux(|demux| lookup_socket::<WireI, CC, BC>(demux, &mut addrs_to_search));
        match sock {
            None => break FoundSocket::No,
            Some(SocketLookupResult::Connection(demux_conn_id, conn_addr)) => {
                // It is not possible to have two same connections that
                // share the same local and remote IPs and ports.
                assert_eq!(tw_reuse, None);
                // The demux ID may refer to a socket of this IP version or of
                // the other one; both are handled by the same helper.
                let disposition = match WireI::as_dual_stack_ip_socket(&demux_conn_id) {
                    EitherStack::ThisStack(conn_id) => {
                        try_handle_incoming_for_connection_dual_stack(
                            core_ctx,
                            bindings_ctx,
                            conn_id,
                            incoming.clone(),
                        )
                    }
                    EitherStack::OtherStack(conn_id) => {
                        try_handle_incoming_for_connection_dual_stack(
                            core_ctx,
                            bindings_ctx,
                            conn_id,
                            incoming.clone(),
                        )
                    }
                };
                match disposition {
                    ConnectionIncomingSegmentDisposition::Destroy => {
                        WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, demux_conn_id);
                        break FoundSocket::Yes(None);
                    }
                    ConnectionIncomingSegmentDisposition::FoundSocket => {
                        break FoundSocket::Yes(Some(demux_conn_id));
                    }
                    ConnectionIncomingSegmentDisposition::ReuseCandidateForListener => {
                        // Keep searching: a listener found later may take over
                        // from this connection.
                        tw_reuse = Some((demux_conn_id, conn_addr));
                    }
                }
            }
            Some(SocketLookupResult::Listener((demux_listener_id, _listener_addr))) => {
                match WireI::as_dual_stack_ip_socket(&demux_listener_id) {
                    EitherStack::ThisStack(listener_id) => {
                        let disposition = core_ctx.with_socket_mut_isn_transport_demux(
                            &listener_id,
                            |core_ctx, socket_state, isn| {
                                let TcpSocketState { socket_state, ip_options: _ } = socket_state;
                                match core_ctx {
                                    MaybeDualStack::NotDualStack((core_ctx, converter)) => {
                                        try_handle_incoming_for_listener::<WireI, WireI, CC, BC, _>(
                                            core_ctx,
                                            bindings_ctx,
                                            &listener_id,
                                            isn,
                                            socket_state,
                                            incoming.clone(),
                                            conn_addr,
                                            incoming_device,
                                            &mut tw_reuse,
                                            move |conn, addr| converter.convert_back((conn, addr)),
                                            WireI::into_demux_socket_id,
                                            marks,
                                        )
                                    }
                                    MaybeDualStack::DualStack((core_ctx, converter)) => {
                                        try_handle_incoming_for_listener::<_, _, CC, BC, _>(
                                            core_ctx,
                                            bindings_ctx,
                                            &listener_id,
                                            isn,
                                            socket_state,
                                            incoming.clone(),
                                            conn_addr,
                                            incoming_device,
                                            &mut tw_reuse,
                                            move |conn, addr| {
                                                converter.convert_back(EitherStack::ThisStack((
                                                    conn, addr,
                                                )))
                                            },
                                            WireI::into_demux_socket_id,
                                            marks,
                                        )
                                    }
                                }
                            },
                        );
                        // A `true` result ends the search with this listener
                        // as the handling socket; otherwise the loop goes on.
                        if try_handle_listener_incoming_disposition(
                            core_ctx,
                            bindings_ctx,
                            disposition,
                            &demux_listener_id,
                            &mut tw_reuse,
                            &mut addrs_to_search,
                            conn_addr,
                            incoming_device,
                        ) {
                            break FoundSocket::Yes(Some(demux_listener_id));
                        }
                    }
                    EitherStack::OtherStack(listener_id) => {
                        let disposition = core_ctx.with_socket_mut_isn_transport_demux(
                            &listener_id,
                            |core_ctx, socket_state, isn| {
                                let TcpSocketState { socket_state, ip_options: _ } = socket_state;
                                match core_ctx {
                                    MaybeDualStack::NotDualStack((_core_ctx, _converter)) => {
                                        // TODO(https://issues.fuchsia.dev/316408184):
                                        // Remove this unreachable!.
                                        unreachable!("OtherStack socket ID with non dual stack");
                                    }
                                    MaybeDualStack::DualStack((core_ctx, converter)) => {
                                        // Obtained before `core_ctx` is passed
                                        // on, and moved into the ID-conversion
                                        // closure below.
                                        let other_demux_id_converter =
                                            core_ctx.other_demux_id_converter();
                                        try_handle_incoming_for_listener::<_, _, CC, BC, _>(
                                            core_ctx,
                                            bindings_ctx,
                                            &listener_id,
                                            isn,
                                            socket_state,
                                            incoming.clone(),
                                            conn_addr,
                                            incoming_device,
                                            &mut tw_reuse,
                                            move |conn, addr| {
                                                converter.convert_back(EitherStack::OtherStack((
                                                    conn, addr,
                                                )))
                                            },
                                            move |id| other_demux_id_converter.convert(id),
                                            marks,
                                        )
                                    }
                                }
                            },
                        );
                        if try_handle_listener_incoming_disposition::<_, _, CC, BC, _>(
                            core_ctx,
                            bindings_ctx,
                            disposition,
                            &demux_listener_id,
                            &mut tw_reuse,
                            &mut addrs_to_search,
                            conn_addr,
                            incoming_device,
                        ) {
                            break FoundSocket::Yes(Some(demux_listener_id));
                        }
                    }
                };
            }
        }
    };

    // `demux_id` is `None` both when no socket matched and when the matching
    // socket was destroyed while handling the segment.
    let demux_id = match found_socket {
        FoundSocket::No => {
            CounterContext::<TcpCountersWithoutSocket<WireI>>::counters(core_ctx)
                .received_segments_no_dispatch
                .increment();

            // There is no existing TCP state, pretend it is closed
            // and generate a RST if needed.
            // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-21):
            // CLOSED is fictional because it represents the state when
            // there is no TCB, and therefore, no connection.
            if let Some(seg) =
                (Closed { reason: None::<Option<ConnectionError>> }.on_segment(&incoming))
            {
                socket::send_tcp_segment::<WireI, WireI, _, _, _>(
                    core_ctx,
                    bindings_ctx,
                    None,
                    None,
                    conn_addr,
                    seg.into_empty(),
                    &TcpIpSockOptions { marks: *marks },
                );
            }
            None
        }
        FoundSocket::Yes(demux_id) => {
            counters::increment_counter_with_optional_demux_id::<WireI, _, _, _, _>(
                core_ctx,
                demux_id.as_ref(),
                |c| &c.received_segments_dispatched,
            );
            demux_id
        }
    };

    // Count the control flag carried by the segment, if any.
    match incoming.header().control {
        None => {}
        Some(control) => counters::increment_counter_with_optional_demux_id::<WireI, _, _, _, _>(
            core_ctx,
            demux_id.as_ref(),
            |c| match control {
                Control::RST => &c.resets_received,
                Control::SYN => &c.syns_received,
                Control::FIN => &c.fins_received,
            },
        ),
    }
}
451
/// A socket found in the demux by `lookup_socket`.
enum SocketLookupResult<I: DualStackIpExt, D: WeakDeviceIdentifier, BT: TcpBindingsTypes> {
    /// A connection entry matched: its demux socket ID and the connection
    /// address that was used for the lookup.
    Connection(I::DemuxSocketId<D, BT>, ConnAddr<ConnIpAddr<I::Addr, NonZeroU16, NonZeroU16>, D>),
    /// A listener matched: its demux socket ID and the listener address that
    /// was used for the lookup.
    Listener((I::DemuxSocketId<D, BT>, ListenerAddr<ListenerIpAddr<I::Addr, NonZeroU16>, D>)),
}
456
457fn lookup_socket<I, CC, BC>(
458    DemuxState { socketmap, .. }: &DemuxState<I, CC::WeakDeviceId, BC>,
459    addrs_to_search: &mut AddrVecIter<I, CC::WeakDeviceId, TcpPortSpec>,
460) -> Option<SocketLookupResult<I, CC::WeakDeviceId, BC>>
461where
462    I: DualStackIpExt,
463    BC: TcpBindingsContext,
464    CC: TcpContext<I, BC>,
465{
466    addrs_to_search.find_map(|addr| {
467        match addr {
468            // Connections are always searched before listeners because they
469            // are more specific.
470            AddrVec::Conn(conn_addr) => {
471                socketmap.conns().get_by_addr(&conn_addr).map(|conn_addr_state| {
472                    SocketLookupResult::Connection(conn_addr_state.id(), conn_addr)
473                })
474            }
475            AddrVec::Listen(listener_addr) => {
476                // If we have a listener and the incoming segment is a SYN, we
477                // allocate a new connection entry in the demuxer.
478                // TODO(https://fxbug.dev/42052878): Support SYN cookies.
479
480                socketmap
481                    .listeners()
482                    .get_by_addr(&listener_addr)
483                    .and_then(|addr_state| match addr_state {
484                        ListenerAddrState::ExclusiveListener(id) => Some(id.clone()),
485                        ListenerAddrState::Shared { listener: Some(id), bound: _ } => {
486                            Some(id.clone())
487                        }
488                        ListenerAddrState::ExclusiveBound(_)
489                        | ListenerAddrState::Shared { listener: None, bound: _ } => None,
490                    })
491                    .map(|id| SocketLookupResult::Listener((id, listener_addr)))
492            }
493        }
494    })
495}
496
/// What `handle_incoming_packet` should do after a connected socket has been
/// offered an incoming segment.
#[derive(PartialEq, Eq)]
enum ConnectionIncomingSegmentDisposition {
    /// The connection handled the segment; the search stops and this socket
    /// is reported as the one the segment was dispatched to.
    FoundSocket,
    /// The connection is a defunct socket in TIME_WAIT that received a SYN
    /// eligible for reopening; the caller remembers it and keeps searching so
    /// that a listener may reuse it.
    ReuseCandidateForListener,
    /// The socket must be destroyed by the caller.
    Destroy,
}
503
/// Outcome of offering an incoming segment to a listener socket.
///
/// NOTE(review): the code that produces and interprets these variants is not
/// in this part of the file; the variant descriptions below are inferred from
/// their names and should be confirmed against
/// `try_handle_incoming_for_listener` and
/// `try_handle_listener_incoming_disposition`.
enum ListenerIncomingSegmentDisposition<S> {
    /// Presumably: the listener handled the segment.
    FoundSocket,
    /// Presumably: an existing connection conflicts with the one the segment
    /// would create.
    ConflictingConnection,
    /// Presumably: the listener is not a match for the segment.
    NoMatchingSocket,
    /// Presumably: a new connection was created; carries a value of type `S`
    /// describing it.
    NewConnection(S),
}
510
511fn try_handle_incoming_for_connection_dual_stack<SockI, CC, BC>(
512    core_ctx: &mut CC,
513    bindings_ctx: &mut BC,
514    conn_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
515    incoming: Segment<&[u8]>,
516) -> ConnectionIncomingSegmentDisposition
517where
518    SockI: DualStackIpExt,
519    BC: TcpBindingsContext
520        + BufferProvider<
521            BC::ReceiveBuffer,
522            BC::SendBuffer,
523            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
524            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
525        >,
526    CC: TcpContext<SockI, BC>,
527{
528    core_ctx.with_socket_mut_transport_demux(conn_id, |core_ctx, socket_state| {
529        let TcpSocketState { socket_state, ip_options: _ } = socket_state;
530        let (conn_and_addr, timer) = assert_matches!(
531            socket_state,
532            TcpSocketStateInner::Bound(BoundSocketState::Connected {
533                 conn, timer, sharing: _
534            }) => (conn , timer),
535            "invalid socket ID"
536        );
537        let this_or_other_stack = match core_ctx {
538            MaybeDualStack::DualStack((core_ctx, converter)) => {
539                match converter.convert(conn_and_addr) {
540                    EitherStack::ThisStack((conn, conn_addr)) => {
541                        // The socket belongs to the current stack, so we
542                        // want to deliver the segment to this stack.
543                        // Use `as_this_stack` to make the context types
544                        // match with the non-dual-stack case.
545                        EitherStack::ThisStack((
546                            core_ctx.as_this_stack(),
547                            conn,
548                            conn_addr,
549                            SockI::into_demux_socket_id(conn_id.clone()),
550                        ))
551                    }
552                    EitherStack::OtherStack((conn, conn_addr)) => {
553                        // We need to deliver from the other stack. i.e. we
554                        // need to deliver an IPv4 packet to the IPv6 stack.
555                        let demux_sock_id = core_ctx.into_other_demux_socket_id(conn_id.clone());
556                        EitherStack::OtherStack((core_ctx, conn, conn_addr, demux_sock_id))
557                    }
558                }
559            }
560            MaybeDualStack::NotDualStack((core_ctx, converter)) => {
561                let (conn, conn_addr) = converter.convert(conn_and_addr);
562                // Similar to the first case, we need deliver to this stack,
563                // but use `as_this_stack` to make the types match.
564                EitherStack::ThisStack((
565                    core_ctx.as_this_stack(),
566                    conn,
567                    conn_addr,
568                    SockI::into_demux_socket_id(conn_id.clone()),
569                ))
570            }
571        };
572
573        match this_or_other_stack {
574            EitherStack::ThisStack((core_ctx, conn, conn_addr, demux_conn_id)) => {
575                try_handle_incoming_for_connection::<_, _, CC, _, _>(
576                    core_ctx,
577                    bindings_ctx,
578                    conn_addr.clone(),
579                    conn_id,
580                    demux_conn_id,
581                    conn,
582                    timer,
583                    incoming,
584                )
585            }
586            EitherStack::OtherStack((core_ctx, conn, conn_addr, demux_conn_id)) => {
587                try_handle_incoming_for_connection::<_, _, CC, _, _>(
588                    core_ctx,
589                    bindings_ctx,
590                    conn_addr.clone(),
591                    conn_id,
592                    demux_conn_id,
593                    conn,
594                    timer,
595                    incoming,
596                )
597            }
598        }
599    })
600}
601
602/// Tries to handle the incoming segment by providing it to a connected socket.
603///
604/// Returns `FoundSocket` if the segment was handled; Otherwise,
605/// `ReuseCandidateForListener` will be returned if there is a defunct socket
606/// that is currently in TIME_WAIT, which is ready to be reused if there is an
607/// active listener listening on the port.
608fn try_handle_incoming_for_connection<SockI, WireI, CC, BC, DC>(
609    core_ctx: &mut DC,
610    bindings_ctx: &mut BC,
611    conn_addr: ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
612    conn_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
613    demux_id: WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
614    conn: &mut Connection<SockI, WireI, CC::WeakDeviceId, BC>,
615    timer: &mut BC::Timer,
616    incoming: Segment<&[u8]>,
617) -> ConnectionIncomingSegmentDisposition
618where
619    SockI: DualStackIpExt,
620    WireI: DualStackIpExt,
621    BC: TcpBindingsContext
622        + BufferProvider<
623            BC::ReceiveBuffer,
624            BC::SendBuffer,
625            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
626            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
627        >,
628    CC: TcpContext<SockI, BC>,
629    DC: TransportIpContext<WireI, BC, DeviceId = CC::DeviceId, WeakDeviceId = CC::WeakDeviceId>
630        + DeviceIpSocketHandler<SockI, BC>
631        + TcpDemuxContext<WireI, CC::WeakDeviceId, BC>
632        + TcpCounterContext<SockI, CC::WeakDeviceId, BC>,
633{
634    let Connection {
635        accept_queue,
636        state,
637        ip_sock,
638        defunct,
639        socket_options,
640        soft_error: _,
641        handshake_status,
642    } = conn;
643
644    // Per RFC 9293 Section 3.6.1:
645    //   When a connection is closed actively, it MUST linger in the TIME-WAIT
646    //   state for a time 2xMSL (Maximum Segment Lifetime) (MUST-13). However,
647    //   it MAY accept a new SYN from the remote TCP endpoint to reopen the
648    //   connection directly from TIME-WAIT state (MAY-2), if it:
649    //
650    //   (1) assigns its initial sequence number for the new connection to be
651    //       larger than the largest sequence number it used on the previous
652    //       connection incarnation, and
653    //   (2) returns to TIME-WAIT state if the SYN turns out to be an old
654    //       duplicate.
655    if *defunct
656        && incoming.header().control == Some(Control::SYN)
657        && incoming.header().ack.is_none()
658    {
659        if let State::TimeWait(TimeWait { last_seq: _, closed_rcv, expiry: _ }) = state {
660            if !incoming.header().seq.before(closed_rcv.ack) {
661                return ConnectionIncomingSegmentDisposition::ReuseCandidateForListener;
662            }
663        }
664    }
665    let (reply, passive_open, data_acked, newly_closed) = state.on_segment::<_, BC>(
666        &conn_id.either(),
667        &TcpCountersRefs::from_ctx(core_ctx, conn_id),
668        incoming,
669        bindings_ctx.now(),
670        socket_options,
671        *defunct,
672    );
673
674    match data_acked {
675        DataAcked::Yes => {
676            core_ctx.confirm_reachable(bindings_ctx, ip_sock, &socket_options.ip_options)
677        }
678        DataAcked::No => {}
679    }
680
681    match state {
682        State::Listen(_) => {
683            unreachable!("has an invalid status: {:?}", conn.state)
684        }
685        State::SynSent(_) | State::SynRcvd(_) => {
686            assert_eq!(*handshake_status, HandshakeStatus::Pending)
687        }
688        State::Established(_)
689        | State::FinWait1(_)
690        | State::FinWait2(_)
691        | State::Closing(_)
692        | State::CloseWait(_)
693        | State::LastAck(_)
694        | State::TimeWait(_) => {
695            if handshake_status
696                .update_if_pending(HandshakeStatus::Completed { reported: accept_queue.is_some() })
697            {
698                core_ctx.confirm_reachable(bindings_ctx, ip_sock, &socket_options.ip_options);
699            }
700        }
701        State::Closed(Closed { reason }) => {
702            // We remove the socket from the socketmap and cancel the timers
703            // regardless of the socket being defunct or not. The justification
704            // is that CLOSED is a synthetic state and it means no connection
705            // exists, thus it should not exist in the demuxer.
706            //
707            // If the socket was already in the closed state we can assume it's
708            // no longer in the demux.
709            socket::handle_newly_closed(
710                core_ctx,
711                bindings_ctx,
712                newly_closed,
713                &demux_id,
714                &conn_addr,
715                timer,
716            );
717            if let Some(accept_queue) = accept_queue {
718                accept_queue.remove(&conn_id);
719                *defunct = true;
720            }
721            if *defunct {
722                // If the client has promised to not touch the socket again,
723                // we can destroy the socket finally.
724                return ConnectionIncomingSegmentDisposition::Destroy;
725            }
726            let _: bool = handshake_status.update_if_pending(match reason {
727                None => HandshakeStatus::Completed { reported: accept_queue.is_some() },
728                Some(_err) => HandshakeStatus::Aborted,
729            });
730        }
731    }
732
733    if let Some(seg) = reply {
734        socket::send_tcp_segment(
735            core_ctx,
736            bindings_ctx,
737            Some(conn_id),
738            Some(&ip_sock),
739            conn_addr.ip,
740            seg.into_empty(),
741            &socket_options.ip_options,
742        );
743    }
744
745    // Send any enqueued data, if there is any.
746    let limit = None;
747    socket::do_send_inner_and_then_handle_newly_closed(
748        conn_id,
749        &demux_id,
750        conn,
751        limit,
752        &conn_addr,
753        timer,
754        core_ctx,
755        bindings_ctx,
756    );
757
758    // Enqueue the connection to the associated listener
759    // socket's accept queue.
760    if let Some(passive_open) = passive_open {
761        let accept_queue = conn.accept_queue.as_ref().expect("no accept queue but passive open");
762        accept_queue.notify_ready(conn_id, passive_open);
763    }
764
765    // We found a valid connection for the segment.
766    ConnectionIncomingSegmentDisposition::FoundSocket
767}
768
769/// Responds to the disposition returned by [`try_handle_incoming_for_listener`].
770///
771/// Returns true if we have found the right socket and there is no need to
772/// continue the iteration for finding the next-best candidate.
773fn try_handle_listener_incoming_disposition<SockI, WireI, CC, BC, Addr>(
774    core_ctx: &mut CC,
775    bindings_ctx: &mut BC,
776    disposition: ListenerIncomingSegmentDisposition<PrimaryRc<SockI, CC::WeakDeviceId, BC>>,
777    demux_listener_id: &WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
778    tw_reuse: &mut Option<(WireI::DemuxSocketId<CC::WeakDeviceId, BC>, Addr)>,
779    addrs_to_search: &mut AddrVecIter<WireI, CC::WeakDeviceId, TcpPortSpec>,
780    conn_addr: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
781    incoming_device: &CC::DeviceId,
782) -> bool
783where
784    SockI: DualStackIpExt,
785    WireI: DualStackIpExt,
786    CC: TcpContext<SockI, BC> + TcpContext<WireI, BC> + TcpContext<WireI::OtherVersion, BC>,
787    BC: TcpBindingsContext,
788{
789    match disposition {
790        ListenerIncomingSegmentDisposition::FoundSocket => true,
791        ListenerIncomingSegmentDisposition::ConflictingConnection => {
792            // We're about to rewind the lookup. If we got a
793            // conflicting connection it means tw_reuse has been
794            // removed from the demux state and we need to destroy
795            // it.
796            if let Some((tw_reuse, _)) = tw_reuse.take() {
797                WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, tw_reuse);
798            }
799
800            // Reset the address vector iterator and go again, a
801            // conflicting connection was found.
802            *addrs_to_search = AddrVecIter::<WireI, CC::WeakDeviceId, TcpPortSpec>::with_device(
803                conn_addr.into(),
804                incoming_device.downgrade(),
805            );
806            false
807        }
808        ListenerIncomingSegmentDisposition::NoMatchingSocket => false,
809        ListenerIncomingSegmentDisposition::NewConnection(primary) => {
810            // If we have a new connection, we need to add it to the
811            // set of all sockets.
812
813            // First things first, if we got here then tw_reuse is
814            // gone so we need to destroy it.
815            if let Some((tw_reuse, _)) = tw_reuse.take() {
816                WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, tw_reuse);
817            }
818
819            // Now put the new connection into the socket map.
820            //
821            // Note that there's a possible subtle race here where
822            // another thread could have already operated further on
823            // this connection and marked it for destruction which
824            // puts the entry in the DOA state, if we see that we
825            // must immediately destroy the socket after having put
826            // it in the map.
827            let id = TcpSocketId(PrimaryRc::clone_strong(&primary));
828            let to_destroy = core_ctx.with_all_sockets_mut(move |all_sockets| {
829                let insert_entry = TcpSocketSetEntry::Primary(primary);
830                match all_sockets.entry(id) {
831                    hash_map::Entry::Vacant(v) => {
832                        let _: &mut _ = v.insert(insert_entry);
833                        None
834                    }
835                    hash_map::Entry::Occupied(mut o) => {
836                        // We're holding on to the primary ref, the
837                        // only possible state here should be a DOA
838                        // entry.
839                        assert_matches!(
840                            core::mem::replace(o.get_mut(), insert_entry),
841                            TcpSocketSetEntry::DeadOnArrival
842                        );
843                        Some(o.key().clone())
844                    }
845                }
846            });
847            // NB: we're releasing and reaquiring the
848            // all_sockets_mut lock here for the convenience of not
849            // needing different versions of `destroy_socket`. This
850            // should be fine because the race this is solving
851            // should not be common. If we have correct thread
852            // attribution per flow it should effectively become
853            // impossible so we go for code simplicity here.
854            if let Some(to_destroy) = to_destroy {
855                socket::destroy_socket(core_ctx, bindings_ctx, to_destroy);
856            }
857            counters::increment_counter_for_demux_id::<WireI, _, _, _, _>(
858                core_ctx,
859                demux_listener_id,
860                |c| &c.passive_connection_openings,
861            );
862            true
863        }
864    }
865}
866
/// Tries to handle an incoming segment by passing it to a listening socket.
///
/// `socket_state` is expected to hold a bound listener-side socket. When the
/// segment moves a freshly created `LISTEN` state machine into `SYN-RCVD`, a
/// new connection is built with `make_connection`, inserted into the demux
/// under the ID produced by `make_demux_id`, and pushed onto the listener's
/// pending accept queue. `marks` are copied into the IP options used for the
/// new connection. If `tw_reuse` holds a connection, it is removed from the
/// demux right before the new connection is inserted.
///
/// Returns:
/// - `NoMatchingSocket` if the socket is only bound (not listening), or if an
///   IP socket towards the segment's originator cannot be created because of
///   a routing error.
/// - `FoundSocket` if this listener consumed the segment without creating a
///   connection: the accept queue is at its backlog limit, no usable MMS/MSS
///   could be determined, or the state machine did not move to `SYN-RCVD`.
/// - `NewConnection` with the primary reference of the newly created socket
///   if a connection was inserted into the demux.
/// - `ConflictingConnection` if the demux already holds an entry for the
///   connection address; no reply is sent in that case.
///
/// # Panics
///
/// Panics if `socket_state` is not a bound listener-side socket, if the
/// state machine reports a passive open or a newly closed socket for the
/// first segment, if the new connection has nothing scheduled to send, if
/// its timer was already scheduled, or if insertion into the demux fails for
/// any reason other than the entry already existing.
fn try_handle_incoming_for_listener<SockI, WireI, CC, BC, DC>(
    core_ctx: &mut DC,
    bindings_ctx: &mut BC,
    listener_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
    isn: &IsnGenerator<BC::Instant>,
    socket_state: &mut TcpSocketStateInner<SockI, CC::WeakDeviceId, BC>,
    incoming: Segment<&[u8]>,
    incoming_addrs: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
    incoming_device: &CC::DeviceId,
    tw_reuse: &mut Option<(
        WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
        ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
    )>,
    make_connection: impl FnOnce(
        Connection<SockI, WireI, CC::WeakDeviceId, BC>,
        ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
    ) -> SockI::ConnectionAndAddr<CC::WeakDeviceId, BC>,
    make_demux_id: impl Fn(
        TcpSocketId<SockI, CC::WeakDeviceId, BC>,
    ) -> WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
    marks: &Marks,
) -> ListenerIncomingSegmentDisposition<PrimaryRc<SockI, CC::WeakDeviceId, BC>>
where
    SockI: DualStackIpExt,
    WireI: DualStackIpExt,
    BC: TcpBindingsContext
        + BufferProvider<
            BC::ReceiveBuffer,
            BC::SendBuffer,
            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
        >,
    CC: TcpContext<SockI, BC>,
    DC: TransportIpContext<WireI, BC, DeviceId = CC::DeviceId, WeakDeviceId = CC::WeakDeviceId>
        + DeviceIpSocketHandler<WireI, BC>
        + TcpDemuxContext<WireI, CC::WeakDeviceId, BC>
        + TcpCounterContext<SockI, CC::WeakDeviceId, BC>,
{
    // The caller must hand us a socket in the listener address space; any
    // other state is a programming error.
    let (maybe_listener, sharing, listener_addr) = assert_matches!(
        socket_state,
        TcpSocketStateInner::Bound(BoundSocketState::Listener(l)) => l,
        "invalid socket ID"
    );

    let ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) } =
        incoming_addrs;

    let Listener { accept_queue, backlog, buffer_sizes, socket_options } = match maybe_listener {
        MaybeListener::Bound(_bound) => {
            // If the socket is only bound, but not listening.
            return ListenerIncomingSegmentDisposition::NoMatchingSocket;
        }
        MaybeListener::Listener(listener) => listener,
    };

    // Note that this check happens at the very beginning, before we try to
    // reuse the connection in TIME-WAIT, this is because we need to store the
    // reused connection in the accept queue so we have to respect its limit.
    if accept_queue.len() == backlog.get() {
        core_ctx.increment_both(listener_id, |counters| &counters.listener_queue_overflow);
        core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
        debug!("incoming SYN dropped because of the full backlog of the listener");
        return ListenerIncomingSegmentDisposition::FoundSocket;
    }

    // Ensure that if the remote address requires a zone, we propagate that to
    // the address for the connected socket.
    let bound_device = listener_addr.as_ref().clone();
    let bound_device = if remote_ip.as_ref().must_have_zone() {
        // Prefer the listener's own bound device; fall back to the device
        // the segment arrived on.
        Some(bound_device.map_or(EitherDeviceId::Strong(incoming_device), EitherDeviceId::Weak))
    } else {
        bound_device.map(EitherDeviceId::Weak)
    };

    // The new connection inherits the listener's options, except for the
    // marks which are overridden with the ones provided by the caller.
    let ip_options = TcpIpSockOptions { marks: *marks, ..socket_options.ip_options };
    let socket_options = SocketOptions { ip_options, ..*socket_options };

    let bound_device = bound_device.as_ref().map(|d| d.as_ref());
    let ip_sock = match core_ctx.new_ip_socket(
        bindings_ctx,
        bound_device,
        IpDeviceAddr::new_from_socket_ip_addr(local_ip),
        remote_ip,
        IpProto::Tcp.into(),
        &ip_options,
    ) {
        Ok(ip_sock) => ip_sock,
        err @ Err(IpSockCreationError::Route(_)) => {
            core_ctx.increment_both(listener_id, |counters| &counters.passive_open_no_route_errors);
            core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
            debug!("cannot construct an ip socket to the SYN originator: {:?}, ignoring", err);
            return ListenerIncomingSegmentDisposition::NoMatchingSocket;
        }
    };

    // Generate the initial sequence number for the new connection from the
    // current time and the connection's address pair.
    let isn = isn.generate(
        bindings_ctx.now(),
        (ip_sock.local_ip().clone().into(), local_port),
        (ip_sock.remote_ip().clone(), remote_port),
    );
    let device_mms = match core_ctx.get_mms(bindings_ctx, &ip_sock, &socket_options.ip_options) {
        Ok(mms) => mms,
        Err(err) => {
            // If we cannot find a device or the device's MTU is too small,
            // there isn't much we can do here since sending a RST back is
            // impossible, we just need to silently drop the segment.
            error!("Cannot find a device with large enough MTU for the connection");
            core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
            // Exhaustive match (no catch-all arm) over the error variants.
            match err {
                MmsError::NoDevice(_) | MmsError::MTUTooSmall(_) => {
                    return ListenerIncomingSegmentDisposition::FoundSocket;
                }
            }
        }
    };
    // No MSS can be derived from the MMS: drop the segment silently.
    //
    // NOTE(review): unlike the drop paths above, this one neither logs nor
    // increments `failed_connection_attempts` - confirm this is intended.
    let Some(device_mss) = Mss::from_mms(device_mms) else {
        return ListenerIncomingSegmentDisposition::FoundSocket;
    };

    // Start from a fresh LISTEN state machine dedicated to this segment.
    let mut state = State::Listen(Closed::<Initial>::listen(
        isn,
        buffer_sizes.clone(),
        device_mss,
        Mss::default::<WireI>(),
        socket_options.user_timeout,
    ));

    // Prepare a reply to be sent out.
    //
    // We might end up discarding the reply in case we can't instantiate this
    // new connection.
    //
    // NOTE(review): the state machine is given `SocketOptions::default()` here
    // rather than the `socket_options` computed above - confirm this is
    // intended.
    let result = state.on_segment::<_, BC>(
        // NB: This is a bit of a lie, we're passing the listener ID to process
        // the first segment because we don't have an ID allocated yet. This is
        // okay because the state machine ID is only for debugging purposes.
        &listener_id.either(),
        &TcpCountersRefs::from_ctx(core_ctx, listener_id),
        incoming,
        bindings_ctx.now(),
        &SocketOptions::default(),
        false, /* defunct */
    );
    // The first segment can neither complete a passive open nor close the
    // state machine; only the (optional) reply is of interest.
    let reply = assert_matches!(
        result,
        (reply, None, /* data_acked */ _, NewlyClosed::No /* can't become closed */) => reply
    );

    let result = if matches!(state, State::SynRcvd(_)) {
        // The segment was an acceptable SYN: materialize the connection.
        let poll_send_at = state.poll_send_at().expect("no retrans timer");
        let ListenerSharingState { sharing, listening: _ } = *sharing;
        let bound_device = ip_sock.device().cloned();

        let addr = ConnAddr {
            ip: ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) },
            device: bound_device,
        };

        let new_socket = core_ctx.with_demux_mut(|DemuxState { socketmap, .. }| {
            // If we're reusing an entry, remove it from the demux before
            // proceeding.
            //
            // We could just reuse the old allocation for the new connection but
            // because of the restrictions on the socket map data structure (for
            // good reasons), we can't update the sharing info unconditionally.
            // So here we just remove the old connection and create a new one.
            // Also this approach has the benefit of not accidentally persisting
            // the old state that we don't want.
            if let Some((tw_reuse, conn_addr)) = tw_reuse {
                match socketmap.conns_mut().remove(tw_reuse, &conn_addr) {
                    Ok(()) => {
                        // NB: We're removing the tw_reuse connection from the
                        // demux here, but not canceling its timer. The timer is
                        // canceled via drop when we destroy the socket. Special
                        // care is taken when handling timers in the time wait
                        // state to account for this.
                    }
                    Err(NotFoundError) => {
                        // We could lose a race trying to reuse the tw_reuse
                        // socket, so we just accept the loss and be happy that
                        // the conn_addr we want to use is free.
                    }
                }
            }

            // Try to create and add the new socket to the demux.
            let accept_queue_clone = accept_queue.clone();
            let ip_sock = ip_sock.clone();
            // Reborrow so that the `move` closure below does not consume
            // `bindings_ctx`, which is needed again to send the reply.
            let bindings_ctx_moved = &mut *bindings_ctx;
            match socketmap.conns_mut().try_insert_with(addr, sharing, move |addr, sharing| {
                let conn = make_connection(
                    Connection {
                        accept_queue: Some(accept_queue_clone),
                        state,
                        ip_sock,
                        defunct: false,
                        socket_options,
                        soft_error: None,
                        handshake_status: HandshakeStatus::Pending,
                    },
                    addr,
                );

                let (id, primary) = TcpSocketId::new_cyclic(|weak| {
                    let mut timer = CC::new_timer(bindings_ctx_moved, weak);
                    // Schedule the timer here because we can't acquire the lock
                    // later. This only runs when inserting into the demux
                    // succeeds so it's okay.
                    assert_eq!(
                        bindings_ctx_moved.schedule_timer_instant(poll_send_at, &mut timer),
                        None
                    );
                    TcpSocketStateInner::Bound(BoundSocketState::Connected { conn, sharing, timer })
                });
                (make_demux_id(id.clone()), (primary, id))
            }) {
                Ok((_entry, (primary, id))) => {
                    // Make sure the new socket is in the pending accept queue
                    // before we release the demux lock.
                    accept_queue.push_pending(id);
                    Some(primary)
                }
                Err((e, _sharing_state)) => {
                    // The only error we accept here is if the entry exists
                    // fully, any indirect conflicts are unexpected because we
                    // know the listener is still alive and installed in the
                    // demux.
                    assert_matches!(e, InsertError::Exists);
                    // If we fail to insert it means we lost a race and this
                    // packet is destined to a connection that is already
                    // established. In that case we should tell the demux code
                    // to retry demuxing it all over again.
                    None
                }
            }
        });

        match new_socket {
            Some(new_socket) => ListenerIncomingSegmentDisposition::NewConnection(new_socket),
            None => {
                // We didn't create a new connection, short circuit early and
                // don't send out the pending segment.
                core_ctx
                    .increment_both(listener_id, |counters| &counters.failed_connection_attempts);
                return ListenerIncomingSegmentDisposition::ConflictingConnection;
            }
        }
    } else {
        // We found a valid listener for the segment even if the connection
        // state is not a newly pending connection.
        ListenerIncomingSegmentDisposition::FoundSocket
    };

    // We can send a reply now if we got here.
    if let Some(seg) = reply {
        socket::send_tcp_segment(
            core_ctx,
            bindings_ctx,
            Some(&listener_id),
            Some(&ip_sock),
            incoming_addrs,
            seg.into_empty(),
            &socket_options.ip_options,
        );
    }

    result
}
1137
1138pub(super) fn tcp_serialize_segment<'a, I, P>(
1139    header: &'a SegmentHeader,
1140    data: P,
1141    conn_addr: ConnIpAddr<I::Addr, NonZeroU16, NonZeroU16>,
1142) -> impl TransportPacketSerializer<I, Buffer = EmptyBuf> + Debug + 'a
1143where
1144    I: IpExt,
1145    P: InnerPacketBuilder + Debug + Payload + 'a,
1146{
1147    let SegmentHeader { seq, ack, wnd, control, options, push } = header;
1148    let ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) } = conn_addr;
1149    let mut builder = TcpSegmentBuilder::new(
1150        local_ip.addr(),
1151        remote_ip.addr(),
1152        local_port,
1153        remote_port,
1154        (*seq).into(),
1155        ack.map(Into::into),
1156        u16::from(*wnd),
1157    );
1158    builder.psh(*push);
1159    match control {
1160        None => {}
1161        Some(Control::SYN) => builder.syn(true),
1162        Some(Control::FIN) => builder.fin(true),
1163        Some(Control::RST) => builder.rst(true),
1164    }
1165    data.into_serializer().encapsulate(
1166        TcpSegmentBuilderWithOptions::new(builder, options.iter()).unwrap_or_else(
1167            |TcpOptionsTooLongError| {
1168                panic!("Too many TCP options");
1169            },
1170        ),
1171    )
1172}
1173
#[cfg(test)]
mod test {
    use ip_test_macro::ip_test;
    use netstack3_base::testutil::TestIpExt;
    use netstack3_base::{HandshakeOptions, UnscaledWindowSize};
    use packet::ParseBuffer as _;
    use test_case::test_case;

    use super::*;

    /// Sequence number carried by every test segment.
    const SEQ: SeqNum = SeqNum::new(12345);
    /// Acknowledgment number carried by the test segments that have one.
    const ACK: SeqNum = SeqNum::new(67890);
    /// Payload used by the test cases that carry data.
    const FAKE_DATA: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 0];

    // Round-trip test: serializes `segment` with `tcp_serialize_segment`,
    // parses the resulting bytes back and checks that ports, sequence number,
    // PSH flag, window size, options and body match what was serialized.
    #[ip_test(I)]
    #[test_case(
        Segment::syn(SEQ, UnscaledWindowSize::from(u16::MAX),
        HandshakeOptions::default().into()), &[]
        ; "syn")]
    #[test_case(
        Segment::syn(SEQ, UnscaledWindowSize::from(u16::MAX),
        HandshakeOptions {
            mss: Some(Mss(NonZeroU16::new(1440).unwrap())),
            ..Default::default() }.into()), &[]
            ; "syn with mss")]
    #[test_case(Segment::ack(SEQ, ACK, UnscaledWindowSize::from(u16::MAX)), &[]; "ack")]
    #[test_case(Segment::with_fake_data(SEQ, ACK, FAKE_DATA), FAKE_DATA; "data")]
    #[test_case(Segment::new_assert_no_discard(SegmentHeader {
            seq: SEQ,
            ack: Some(ACK),
            push: true,
            wnd: UnscaledWindowSize::from(u16::MAX),
            ..Default::default()
        },
        FAKE_DATA
    ), FAKE_DATA; "push")]
    fn tcp_serialize_segment<I: TestIpExt>(segment: Segment<&[u8]>, expected_body: &[u8]) {
        const SOURCE_PORT: NonZeroU16 = NonZeroU16::new(1111).unwrap();
        const DEST_PORT: NonZeroU16 = NonZeroU16::new(2222).unwrap();

        let (header, data) = segment.into_parts();
        let serializer = super::tcp_serialize_segment::<I, _>(
            &header,
            data,
            ConnIpAddr {
                local: (SocketIpAddr::try_from(I::TEST_ADDRS.local_ip).unwrap(), SOURCE_PORT),
                remote: (SocketIpAddr::try_from(I::TEST_ADDRS.remote_ip).unwrap(), DEST_PORT),
            },
        );

        let mut serialized = serializer.serialize_vec_outer().unwrap().unwrap_b();
        // The segment was serialized with the local address as its source and
        // the remote address as its destination, so it is parsed with the
        // addresses in that same order.
        let parsed_segment = serialized
            .parse_with::<_, TcpSegment<_>>(TcpParseArgs::new(
                *I::TEST_ADDRS.local_ip,
                *I::TEST_ADDRS.remote_ip,
            ))
            .expect("is valid segment");

        assert_eq!(parsed_segment.src_port(), SOURCE_PORT);
        assert_eq!(parsed_segment.dst_port(), DEST_PORT);
        assert_eq!(parsed_segment.seq_num(), u32::from(SEQ));
        assert_eq!(parsed_segment.psh(), header.push);
        assert_eq!(
            UnscaledWindowSize::from(parsed_segment.window_size()),
            UnscaledWindowSize::from(u16::MAX)
        );
        // Options must survive the round trip unchanged, in number and order.
        let options = header.options;
        assert_eq!(options.iter().count(), parsed_segment.iter_options().count());
        for (orig, parsed) in options.iter().zip(parsed_segment.iter_options()) {
            assert_eq!(orig, parsed);
        }
        assert_eq!(parsed_segment.into_body(), expected_body);
    }
}