netstack3_tcp/socket/
demux.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Defines the entry point of TCP packets, by directing them into the correct
6//! state machine.
7
8use core::fmt::Debug;
9use core::num::NonZeroU16;
10
11use assert_matches::assert_matches;
12use log::{debug, error, warn};
13use net_types::ip::Ip;
14use net_types::{SpecifiedAddr, Witness as _};
15use netstack3_base::socket::{
16    AddrIsMappedError, AddrVec, AddrVecIter, ConnAddr, ConnIpAddr, InsertError, ListenerAddr,
17    ListenerIpAddr, SocketCookie, SocketIpAddr, SocketIpAddrExt as _,
18};
19use netstack3_base::{
20    BidirectionalConverter as _, Control, CounterContext, CtxPair, EitherDeviceId, IpDeviceAddr,
21    Marks, Mss, NotFoundError, Payload, Segment, SegmentHeader, SeqNum, StrongDeviceIdentifier,
22    VerifiedTcpSegment, WeakDeviceIdentifier,
23};
24use netstack3_filter::{
25    FilterIpExt, SocketIngressFilterResult, SocketOpsFilter, TransportPacketSerializer,
26};
27use netstack3_hashmap::hash_map;
28use netstack3_ip::socket::{IpSockCreationError, IpSocketArgs, MmsError};
29use netstack3_ip::{
30    IpHeaderInfo, IpTransportContext, LocalDeliveryPacketInfo, ReceiveIpPacketMeta,
31    TransportIpContext, TransportReceiveError,
32};
33use netstack3_trace::trace_duration;
34use packet::{
35    BufferMut, BufferView as _, EmptyBuf, FragmentedByteSlice, InnerPacketBuilder, PacketBuilder,
36};
37use packet_formats::error::ParseError;
38use packet_formats::ip::IpProto;
39use packet_formats::tcp::{
40    TcpFlowAndSeqNum, TcpOptionsTooLongError, TcpParseArgs, TcpSegment, TcpSegmentBuilder,
41    TcpSegmentBuilderWithOptions,
42};
43
44use crate::internal::base::{BufferSizes, ConnectionError, SocketOptions, TcpIpSockOptions};
45use crate::internal::counters::{
46    self, TcpCounterContext, TcpCountersRefs, TcpCountersWithoutSocket,
47};
48use crate::internal::socket::generators::{IsnGenerator, TimestampOffsetGenerator};
49use crate::internal::socket::{
50    self, AsThisStack as _, BoundSocketState, Connection, CoreTxMetadataContext, DemuxState,
51    DeviceIpSocketHandler, DoSendLimit, DualStackDemuxIdConverter as _, DualStackIpExt,
52    EitherStack, HandshakeStatus, Listener, ListenerAddrState, ListenerSharingState,
53    MaybeDualStack, MaybeListener, PrimaryRc, TcpApi, TcpBindingsContext, TcpBindingsTypes,
54    TcpContext, TcpDemuxContext, TcpDualStackContext, TcpIpTransportContext, TcpPortSpec,
55    TcpSocketId, TcpSocketSetEntry, TcpSocketState, TcpSocketStateInner, TcpSocketTxMetadata,
56};
57use crate::internal::state::{
58    BufferProvider, Closed, DataAcked, Initial, NewlyClosed, State, TimeWait,
59};
60
/// Blanket [`BufferProvider`] implementation for every bindings type.
///
/// The active-open and passive-open associated types are taken directly from
/// the bindings' own `ListenerNotifierOrProvidedBuffers` and `ReturnedBuffers`
/// associated types, and buffer creation is forwarded to the bindings.
impl<BT: TcpBindingsTypes> BufferProvider<BT::ReceiveBuffer, BT::SendBuffer> for BT {
    type ActiveOpen = BT::ListenerNotifierOrProvidedBuffers;

    type PassiveOpen = BT::ReturnedBuffers;

    /// Creates the receive/send buffer pair (plus the passive-open handle)
    /// for a passively opened connection using the requested sizes.
    fn new_passive_open_buffers(
        buffer_sizes: BufferSizes,
    ) -> (BT::ReceiveBuffer, BT::SendBuffer, Self::PassiveOpen) {
        // NOTE(review): this presumably resolves to the function of the same
        // name declared on `TcpBindingsTypes` rather than recursing into this
        // impl - confirm against the trait definition.
        BT::new_passive_open_buffers(buffer_sizes)
    }
}
72
73impl<I, BC, CC> IpTransportContext<I, BC, CC> for TcpIpTransportContext
74where
75    I: DualStackIpExt,
76    BC: TcpBindingsContext<CC::DeviceId>
77        + BufferProvider<
78            BC::ReceiveBuffer,
79            BC::SendBuffer,
80            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
81            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
82        >,
83    CC: TcpContext<I, BC> + TcpContext<I::OtherVersion, BC>,
84{
85    fn receive_icmp_error(
86        core_ctx: &mut CC,
87        bindings_ctx: &mut BC,
88        _device: &CC::DeviceId,
89        original_src_ip: Option<SpecifiedAddr<I::Addr>>,
90        original_dst_ip: SpecifiedAddr<I::Addr>,
91        mut original_body: &[u8],
92        err: I::ErrorCode,
93    ) {
94        let mut buffer = &mut original_body;
95        let Some(flow_and_seqnum) = buffer.take_obj_front::<TcpFlowAndSeqNum>() else {
96            error!("received an ICMP error but its body is less than 8 bytes");
97            return;
98        };
99
100        let Some(original_src_ip) = original_src_ip else { return };
101        let Some(original_src_port) = NonZeroU16::new(flow_and_seqnum.src_port()) else { return };
102        let Some(original_dst_port) = NonZeroU16::new(flow_and_seqnum.dst_port()) else { return };
103        let original_seqnum = SeqNum::new(flow_and_seqnum.sequence_num());
104
105        TcpApi::<I, _>::new(CtxPair { core_ctx, bindings_ctx }).on_icmp_error(
106            original_src_ip,
107            original_dst_ip,
108            original_src_port,
109            original_dst_port,
110            original_seqnum,
111            err.into(),
112        );
113    }
114
115    fn receive_ip_packet<B: BufferMut, H: IpHeaderInfo<I>>(
116        core_ctx: &mut CC,
117        bindings_ctx: &mut BC,
118        device: &CC::DeviceId,
119        remote_ip: I::RecvSrcAddr,
120        local_ip: SpecifiedAddr<I::Addr>,
121        mut buffer: B,
122        info: &LocalDeliveryPacketInfo<I, H>,
123    ) -> Result<(), (B, TransportReceiveError)> {
124        let LocalDeliveryPacketInfo { meta, header_info, marks } = info;
125        let ReceiveIpPacketMeta { broadcast, transparent_override } = meta;
126        if let Some(delivery) = transparent_override {
127            warn!(
128                "TODO(https://fxbug.dev/337009139): transparent proxy not supported for TCP \
129                sockets; will not override dispatch to perform local delivery to {delivery:?}"
130            );
131        }
132
133        if broadcast.is_some() {
134            CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
135                .invalid_ip_addrs_received
136                .increment();
137            debug!("tcp: dropping broadcast TCP packet");
138            return Ok(());
139        }
140
141        let remote_ip = match SpecifiedAddr::new(remote_ip.into_addr()) {
142            None => {
143                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
144                    .invalid_ip_addrs_received
145                    .increment();
146                debug!("tcp: source address unspecified, dropping the packet");
147                return Ok(());
148            }
149            Some(src_ip) => src_ip,
150        };
151        let remote_ip: SocketIpAddr<_> = match remote_ip.try_into() {
152            Ok(remote_ip) => remote_ip,
153            Err(AddrIsMappedError {}) => {
154                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
155                    .invalid_ip_addrs_received
156                    .increment();
157                debug!("tcp: source address is mapped (ipv4-mapped-ipv6), dropping the packet");
158                return Ok(());
159            }
160        };
161        let local_ip: SocketIpAddr<_> = match local_ip.try_into() {
162            Ok(local_ip) => local_ip,
163            Err(AddrIsMappedError {}) => {
164                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
165                    .invalid_ip_addrs_received
166                    .increment();
167                debug!("tcp: local address is mapped (ipv4-mapped-ipv6), dropping the packet");
168                return Ok(());
169            }
170        };
171        let packet = match buffer
172            .parse_with::<_, TcpSegment<_>>(TcpParseArgs::new(remote_ip.addr(), local_ip.addr()))
173        {
174            Ok(packet) => packet,
175            Err(err) => {
176                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
177                    .invalid_segments_received
178                    .increment();
179                debug!("tcp: failed parsing incoming packet {:?}", err);
180                match err {
181                    ParseError::Checksum => {
182                        CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
183                            .checksum_errors
184                            .increment();
185                    }
186                    ParseError::NotSupported | ParseError::NotExpected | ParseError::Format => {}
187                }
188                return Ok(());
189            }
190        };
191        let local_port = packet.dst_port();
192        let remote_port = packet.src_port();
193        let incoming = match VerifiedTcpSegment::try_from(packet) {
194            Ok(segment) => segment,
195            Err(err) => {
196                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
197                    .invalid_segments_received
198                    .increment();
199                debug!("tcp: malformed segment {:?}", err);
200                return Ok(());
201            }
202        };
203        let conn_addr =
204            ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) };
205
206        CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
207            .valid_segments_received
208            .increment();
209        handle_incoming_packet::<I, _, _, _>(
210            core_ctx,
211            bindings_ctx,
212            conn_addr,
213            device,
214            header_info,
215            &incoming,
216            marks,
217        );
218        Ok(())
219    }
220}
221
/// Dispatches a verified incoming TCP segment to the socket that should
/// receive it.
///
/// Candidate addresses derived from `conn_addr` and `incoming_device` are
/// looked up in the demux one at a time. A matching connection is offered the
/// segment first; a matching listener is offered it afterwards. If no socket
/// ends up handling the segment, the segment is processed as if by a socket in
/// the CLOSED state and any resulting reply is sent back. Finally the
/// dispatch and control-flag counters are updated.
fn handle_incoming_packet<WireI, BC, CC, H>(
    core_ctx: &mut CC,
    bindings_ctx: &mut BC,
    conn_addr: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
    incoming_device: &CC::DeviceId,
    header_info: &H,
    incoming: &VerifiedTcpSegment<'_>,
    marks: &Marks,
) where
    WireI: DualStackIpExt,
    BC: TcpBindingsContext<CC::DeviceId>
        + BufferProvider<
            BC::ReceiveBuffer,
            BC::SendBuffer,
            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
        >,
    CC: TcpContext<WireI, BC> + TcpContext<WireI::OtherVersion, BC>,
    H: IpHeaderInfo<WireI>,
{
    trace_duration!(c"tcp::handle_incoming_packet");
    // Set to the demux ID and address of a connection that reported
    // `ReuseCandidateForListener`; it is then handed to the listener handling
    // below.
    let mut tw_reuse = None;

    // Iterator over the addresses a matching socket could be registered
    // under. `lookup_socket` advances it, so each loop iteration resumes the
    // search after the previously returned candidate.
    let mut addrs_to_search = AddrVecIter::<WireI, CC::WeakDeviceId, TcpPortSpec>::with_device(
        conn_addr.into(),
        incoming_device.downgrade(),
    );

    enum FoundSocket<S> {
        // Typically holds the demux ID of the found socket, but may hold
        // `None` if the found socket was destroyed as a result of the segment.
        Yes(Option<S>),
        No,
    }
    let found_socket = loop {
        let sock = core_ctx
            .with_demux(|demux| lookup_socket::<WireI, CC, BC>(demux, &mut addrs_to_search));
        match sock {
            // The candidate addresses are exhausted without any socket taking
            // the segment.
            None => break FoundSocket::No,
            Some(SocketLookupResult::Connection(demux_conn_id, conn_addr)) => {
                // It is not possible to have two same connections that
                // share the same local and remote IPs and ports.
                assert_eq!(tw_reuse, None);
                // The demux ID may refer to a socket of this IP version or of
                // the other one; both are handled by the same function, only
                // the socket ID type differs.
                let disposition = match WireI::as_dual_stack_ip_socket(&demux_conn_id) {
                    EitherStack::ThisStack(conn_id) => {
                        try_handle_incoming_for_connection_dual_stack(
                            core_ctx,
                            bindings_ctx,
                            conn_id,
                            incoming_device,
                            header_info,
                            &incoming,
                        )
                    }
                    EitherStack::OtherStack(conn_id) => {
                        try_handle_incoming_for_connection_dual_stack(
                            core_ctx,
                            bindings_ctx,
                            conn_id,
                            incoming_device,
                            header_info,
                            &incoming,
                        )
                    }
                };
                match disposition {
                    ConnectionIncomingSegmentDisposition::Destroy => {
                        // The socket is gone after this, so no ID is reported
                        // for it.
                        WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, demux_conn_id);
                        break FoundSocket::Yes(None);
                    }
                    // A segment dropped by the socket's ingress filter still
                    // counts as having found its socket.
                    ConnectionIncomingSegmentDisposition::FoundSocket
                    | ConnectionIncomingSegmentDisposition::Filtered => {
                        break FoundSocket::Yes(Some(demux_conn_id));
                    }
                    ConnectionIncomingSegmentDisposition::ReuseCandidateForListener => {
                        // No `break`: keep searching so that a listener gets a
                        // chance to take over this connection's address.
                        tw_reuse = Some((demux_conn_id, conn_addr));
                    }
                }
            }
            Some(SocketLookupResult::Listener((demux_listener_id, _listener_addr))) => {
                match WireI::as_dual_stack_ip_socket(&demux_listener_id) {
                    EitherStack::ThisStack(listener_id) => {
                        // The listener belongs to this IP version. The two
                        // arms below differ only in how the new connection and
                        // its address are converted back into the socket's
                        // stored representation.
                        let disposition = core_ctx.with_socket_mut_generators_transport_demux(
                            &listener_id,
                            |core_ctx, socket_state, isn, timestamp_offset| match core_ctx {
                                MaybeDualStack::NotDualStack((core_ctx, converter)) => {
                                    try_handle_incoming_for_listener::<WireI, WireI, CC, BC, _, _>(
                                        core_ctx,
                                        bindings_ctx,
                                        &listener_id,
                                        isn,
                                        timestamp_offset,
                                        socket_state,
                                        header_info,
                                        incoming,
                                        conn_addr,
                                        incoming_device,
                                        &mut tw_reuse,
                                        move |conn, addr| converter.convert_back((conn, addr)),
                                        WireI::into_demux_socket_id,
                                        marks,
                                    )
                                }
                                MaybeDualStack::DualStack((core_ctx, converter)) => {
                                    try_handle_incoming_for_listener::<_, _, CC, BC, _, _>(
                                        core_ctx,
                                        bindings_ctx,
                                        &listener_id,
                                        isn,
                                        timestamp_offset,
                                        socket_state,
                                        header_info,
                                        incoming,
                                        conn_addr,
                                        incoming_device,
                                        &mut tw_reuse,
                                        move |conn, addr| {
                                            converter
                                                .convert_back(EitherStack::ThisStack((conn, addr)))
                                        },
                                        WireI::into_demux_socket_id,
                                        marks,
                                    )
                                }
                            },
                        );
                        // NOTE(review): the helper is defined outside this
                        // view; a `true` result is treated here as "this
                        // listener handled the segment", anything else
                        // continues the search.
                        if try_handle_listener_incoming_disposition(
                            core_ctx,
                            bindings_ctx,
                            disposition,
                            &demux_listener_id,
                            &mut tw_reuse,
                            &mut addrs_to_search,
                            conn_addr,
                            incoming_device,
                        ) {
                            break FoundSocket::Yes(Some(demux_listener_id));
                        }
                    }
                    EitherStack::OtherStack(listener_id) => {
                        // The listener belongs to the other IP version, which
                        // is only expected when the socket is dual-stack.
                        let disposition = core_ctx.with_socket_mut_generators_transport_demux(
                            &listener_id,
                            |core_ctx, socket_state, isn, timestamp_offset| {
                                match core_ctx {
                                    MaybeDualStack::NotDualStack((_core_ctx, _converter)) => {
                                        // TODO(https://issues.fuchsia.dev/316408184):
                                        // Remove this unreachable!.
                                        unreachable!("OtherStack socket ID with non dual stack");
                                    }
                                    MaybeDualStack::DualStack((core_ctx, converter)) => {
                                        // Obtained up front because `core_ctx`
                                        // is passed on to the call below.
                                        let other_demux_id_converter =
                                            core_ctx.other_demux_id_converter();
                                        try_handle_incoming_for_listener::<_, _, CC, BC, _, _>(
                                            core_ctx,
                                            bindings_ctx,
                                            &listener_id,
                                            isn,
                                            timestamp_offset,
                                            socket_state,
                                            header_info,
                                            incoming,
                                            conn_addr,
                                            incoming_device,
                                            &mut tw_reuse,
                                            move |conn, addr| {
                                                converter.convert_back(EitherStack::OtherStack((
                                                    conn, addr,
                                                )))
                                            },
                                            move |id| other_demux_id_converter.convert(id),
                                            marks,
                                        )
                                    }
                                }
                            },
                        );
                        if try_handle_listener_incoming_disposition::<_, _, CC, BC, _>(
                            core_ctx,
                            bindings_ctx,
                            disposition,
                            &demux_listener_id,
                            &mut tw_reuse,
                            &mut addrs_to_search,
                            conn_addr,
                            incoming_device,
                        ) {
                            break FoundSocket::Yes(Some(demux_listener_id));
                        }
                    }
                };
            }
        }
    };

    // Account for the dispatch outcome and keep the demux ID (if any) for the
    // per-control-flag counters below.
    let demux_id = match found_socket {
        FoundSocket::No => {
            CounterContext::<TcpCountersWithoutSocket<WireI>>::counters(core_ctx)
                .received_segments_no_dispatch
                .increment();

            // There is no existing TCP state, pretend it is closed
            // and generate a RST if needed.
            // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-21):
            // CLOSED is fictional because it represents the state when
            // there is no TCB, and therefore, no connection.
            if let Some(seg) =
                (Closed { reason: None::<Option<ConnectionError>> }.on_segment(&incoming.into()))
            {
                socket::send_tcp_segment::<WireI, WireI, _, _, _>(
                    core_ctx,
                    bindings_ctx,
                    None,
                    None,
                    conn_addr,
                    seg.into_empty(),
                    &TcpIpSockOptions { marks: *marks },
                );
            }
            None
        }
        FoundSocket::Yes(demux_id) => {
            counters::increment_counter_with_optional_demux_id::<WireI, _, _, _, _>(
                core_ctx,
                demux_id.as_ref(),
                |c| &c.received_segments_dispatched,
            );
            demux_id
        }
    };

    // Count received RST/SYN/FIN segments, passing along the socket's demux
    // ID when one is available.
    if let Some(control) = incoming.control() {
        counters::increment_counter_with_optional_demux_id::<WireI, _, _, _, _>(
            core_ctx,
            demux_id.as_ref(),
            |c| match control {
                Control::RST => &c.resets_received,
                Control::SYN => &c.syns_received,
                Control::FIN => &c.fins_received,
            },
        )
    }
}
464
/// A socket found in the demux by [`lookup_socket`], together with the
/// address under which it was found.
enum SocketLookupResult<I: DualStackIpExt, D: WeakDeviceIdentifier, BT: TcpBindingsTypes> {
    /// A connection's demux ID and the connection address that matched.
    Connection(I::DemuxSocketId<D, BT>, ConnAddr<ConnIpAddr<I::Addr, NonZeroU16, NonZeroU16>, D>),
    /// A listener's demux ID and the listener address that matched.
    Listener((I::DemuxSocketId<D, BT>, ListenerAddr<ListenerIpAddr<I::Addr, NonZeroU16>, D>)),
}
469
470fn lookup_socket<I, CC, BC>(
471    DemuxState { socketmap, .. }: &DemuxState<I, CC::WeakDeviceId, BC>,
472    addrs_to_search: &mut AddrVecIter<I, CC::WeakDeviceId, TcpPortSpec>,
473) -> Option<SocketLookupResult<I, CC::WeakDeviceId, BC>>
474where
475    I: DualStackIpExt,
476    BC: TcpBindingsContext<CC::DeviceId>,
477    CC: TcpContext<I, BC>,
478{
479    addrs_to_search.find_map(|addr| {
480        match addr {
481            // Connections are always searched before listeners because they
482            // are more specific.
483            AddrVec::Conn(conn_addr) => {
484                socketmap.conns().get_by_addr(&conn_addr).map(|conn_addr_state| {
485                    SocketLookupResult::Connection(conn_addr_state.id(), conn_addr)
486                })
487            }
488            AddrVec::Listen(listener_addr) => {
489                // If we have a listener and the incoming segment is a SYN, we
490                // allocate a new connection entry in the demuxer.
491                // TODO(https://fxbug.dev/42052878): Support SYN cookies.
492
493                socketmap
494                    .listeners()
495                    .get_by_addr(&listener_addr)
496                    .and_then(|addr_state| match addr_state {
497                        ListenerAddrState::ExclusiveListener(id) => Some(id.clone()),
498                        ListenerAddrState::Shared { listener: Some(id), bound: _ } => {
499                            Some(id.clone())
500                        }
501                        ListenerAddrState::ExclusiveBound(_)
502                        | ListenerAddrState::Shared { listener: None, bound: _ } => None,
503                    })
504                    .map(|id| SocketLookupResult::Listener((id, listener_addr)))
505            }
506        }
507    })
508}
509
/// Outcome of offering an incoming segment to a connected socket.
#[derive(PartialEq, Eq)]
enum ConnectionIncomingSegmentDisposition {
    /// The segment was handled by the connection.
    FoundSocket,
    /// The socket's ingress filter dropped the segment before the connection
    /// processed it.
    Filtered,
    /// The connection is a defunct socket in TIME_WAIT that received a new
    /// SYN; it may be reused if a listener is found for the same address.
    ReuseCandidateForListener,
    /// The caller should destroy the socket.
    Destroy,
}
517
/// Outcome of offering an incoming segment to a listener socket.
///
/// NOTE(review): the code that produces and consumes these variants lies
/// outside this view; the per-variant notes below are inferred from the
/// variant names and should be confirmed against
/// `try_handle_incoming_for_listener` and
/// `try_handle_listener_incoming_disposition`.
enum ListenerIncomingSegmentDisposition<S> {
    /// Presumably: the listener handled the segment.
    FoundSocket,
    /// Presumably: the socket's ingress filter dropped the segment.
    Filtered,
    /// Presumably: a connection already occupies the address the new
    /// connection would need.
    ConflictingConnection,
    /// Presumably: this listener is not the right recipient and the search
    /// should continue.
    NoMatchingSocket,
    /// Presumably: a new connection was created; `S` carries its socket.
    NewConnection(S),
}
525
526fn try_handle_incoming_for_connection_dual_stack<SockI, WireI, CC, BC, H>(
527    core_ctx: &mut CC,
528    bindings_ctx: &mut BC,
529    conn_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
530    incoming_device: &CC::DeviceId,
531    header_info: &H,
532    incoming: &VerifiedTcpSegment<'_>,
533) -> ConnectionIncomingSegmentDisposition
534where
535    SockI: DualStackIpExt,
536    WireI: Ip,
537    BC: TcpBindingsContext<CC::DeviceId>
538        + BufferProvider<
539            BC::ReceiveBuffer,
540            BC::SendBuffer,
541            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
542            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
543        >,
544    CC: TcpContext<SockI, BC>,
545    H: IpHeaderInfo<WireI>,
546{
547    core_ctx.with_socket_mut_transport_demux(conn_id, |core_ctx, socket_state| {
548        let TcpSocketState { socket_state, ip_options: _, socket_options } = socket_state;
549
550        match run_socket_ingress_filter(
551            bindings_ctx,
552            incoming_device,
553            conn_id.socket_cookie(),
554            socket_options,
555            header_info,
556            incoming.tcp_segment(),
557        ) {
558            SocketIngressFilterResult::Accept => (),
559            SocketIngressFilterResult::Drop => {
560                return ConnectionIncomingSegmentDisposition::Filtered;
561            }
562        }
563
564        let (conn_and_addr, timer) = assert_matches!(
565            socket_state,
566            TcpSocketStateInner::Bound(BoundSocketState::Connected {
567                 conn, timer, sharing: _
568            }) => (conn , timer),
569            "invalid socket ID"
570        );
571        let this_or_other_stack = match core_ctx {
572            MaybeDualStack::DualStack((core_ctx, converter)) => {
573                match converter.convert(conn_and_addr) {
574                    EitherStack::ThisStack((conn, conn_addr)) => {
575                        // The socket belongs to the current stack, so we
576                        // want to deliver the segment to this stack.
577                        // Use `as_this_stack` to make the context types
578                        // match with the non-dual-stack case.
579                        EitherStack::ThisStack((
580                            core_ctx.as_this_stack(),
581                            conn,
582                            conn_addr,
583                            SockI::into_demux_socket_id(conn_id.clone()),
584                        ))
585                    }
586                    EitherStack::OtherStack((conn, conn_addr)) => {
587                        // We need to deliver from the other stack. i.e. we
588                        // need to deliver an IPv4 packet to the IPv6 stack.
589                        let demux_sock_id = core_ctx.into_other_demux_socket_id(conn_id.clone());
590                        EitherStack::OtherStack((core_ctx, conn, conn_addr, demux_sock_id))
591                    }
592                }
593            }
594            MaybeDualStack::NotDualStack((core_ctx, converter)) => {
595                let (conn, conn_addr) = converter.convert(conn_and_addr);
596                // Similar to the first case, we need deliver to this stack,
597                // but use `as_this_stack` to make the types match.
598                EitherStack::ThisStack((
599                    core_ctx.as_this_stack(),
600                    conn,
601                    conn_addr,
602                    SockI::into_demux_socket_id(conn_id.clone()),
603                ))
604            }
605        };
606
607        match this_or_other_stack {
608            EitherStack::ThisStack((core_ctx, conn, conn_addr, demux_conn_id)) => {
609                try_handle_incoming_for_connection::<_, _, CC, _, _>(
610                    core_ctx,
611                    bindings_ctx,
612                    conn_addr.clone(),
613                    conn_id,
614                    demux_conn_id,
615                    socket_options,
616                    conn,
617                    timer,
618                    incoming.into(),
619                )
620            }
621            EitherStack::OtherStack((core_ctx, conn, conn_addr, demux_conn_id)) => {
622                try_handle_incoming_for_connection::<_, _, CC, _, _>(
623                    core_ctx,
624                    bindings_ctx,
625                    conn_addr.clone(),
626                    conn_id,
627                    demux_conn_id,
628                    socket_options,
629                    conn,
630                    timer,
631                    incoming.into(),
632                )
633            }
634        }
635    })
636}
637
638/// Tries to handle the incoming segment by providing it to a connected socket.
639///
640/// Returns `FoundSocket` if the segment was handled; Otherwise,
641/// `ReuseCandidateForListener` will be returned if there is a defunct socket
642/// that is currently in TIME_WAIT, which is ready to be reused if there is an
643/// active listener listening on the port.
644fn try_handle_incoming_for_connection<SockI, WireI, CC, BC, DC>(
645    core_ctx: &mut DC,
646    bindings_ctx: &mut BC,
647    conn_addr: ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
648    conn_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
649    demux_id: WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
650    socket_options: &SocketOptions,
651    conn: &mut Connection<SockI, WireI, CC::WeakDeviceId, BC>,
652    timer: &mut BC::Timer,
653    incoming: Segment<&[u8]>,
654) -> ConnectionIncomingSegmentDisposition
655where
656    SockI: DualStackIpExt,
657    WireI: DualStackIpExt,
658    BC: TcpBindingsContext<CC::DeviceId>
659        + BufferProvider<
660            BC::ReceiveBuffer,
661            BC::SendBuffer,
662            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
663            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
664        >,
665    CC: TcpContext<SockI, BC>,
666    DC: TransportIpContext<WireI, BC, DeviceId = CC::DeviceId, WeakDeviceId = CC::WeakDeviceId>
667        + DeviceIpSocketHandler<SockI, BC>
668        + TcpDemuxContext<WireI, CC::WeakDeviceId, BC>
669        + TcpCounterContext<SockI, CC::WeakDeviceId, BC>
670        + CoreTxMetadataContext<TcpSocketTxMetadata<SockI, CC::WeakDeviceId, BC>, BC>,
671{
672    let Connection { accept_queue, state, ip_sock, defunct, soft_error: _, handshake_status } =
673        conn;
674
675    // Per RFC 9293 Section 3.6.1:
676    //   When a connection is closed actively, it MUST linger in the TIME-WAIT
677    //   state for a time 2xMSL (Maximum Segment Lifetime) (MUST-13). However,
678    //   it MAY accept a new SYN from the remote TCP endpoint to reopen the
679    //   connection directly from TIME-WAIT state (MAY-2), if it:
680    //
681    //   (1) assigns its initial sequence number for the new connection to be
682    //       larger than the largest sequence number it used on the previous
683    //       connection incarnation, and
684    //   (2) returns to TIME-WAIT state if the SYN turns out to be an old
685    //       duplicate.
686    if *defunct
687        && incoming.header().control == Some(Control::SYN)
688        && incoming.header().ack.is_none()
689    {
690        if let State::TimeWait(TimeWait { last_seq: _, closed_rcv, expiry: _ }) = state {
691            if !incoming.header().seq.before(closed_rcv.ack) {
692                return ConnectionIncomingSegmentDisposition::ReuseCandidateForListener;
693            }
694        }
695    }
696    let (reply, passive_open, data_acked, newly_closed) = state.on_segment::<_, BC>(
697        &conn_id.either(),
698        &TcpCountersRefs::from_ctx(core_ctx, conn_id),
699        incoming,
700        bindings_ctx.now(),
701        socket_options,
702        *defunct,
703    );
704
705    match data_acked {
706        DataAcked::Yes => {
707            core_ctx.confirm_reachable(bindings_ctx, ip_sock, &socket_options.ip_options)
708        }
709        DataAcked::No => {}
710    }
711
712    match state {
713        State::Listen(_) => {
714            unreachable!("has an invalid status: {:?}", conn.state)
715        }
716        State::SynSent(_) | State::SynRcvd(_) => {
717            assert_eq!(*handshake_status, HandshakeStatus::Pending)
718        }
719        State::Established(_)
720        | State::FinWait1(_)
721        | State::FinWait2(_)
722        | State::Closing(_)
723        | State::CloseWait(_)
724        | State::LastAck(_)
725        | State::TimeWait(_) => {
726            if handshake_status
727                .update_if_pending(HandshakeStatus::Completed { reported: accept_queue.is_some() })
728            {
729                core_ctx.confirm_reachable(bindings_ctx, ip_sock, &socket_options.ip_options);
730            }
731        }
732        State::Closed(Closed { reason }) => {
733            // We remove the socket from the socketmap and cancel the timers
734            // regardless of the socket being defunct or not. The justification
735            // is that CLOSED is a synthetic state and it means no connection
736            // exists, thus it should not exist in the demuxer.
737            //
738            // If the socket was already in the closed state we can assume it's
739            // no longer in the demux.
740            socket::handle_newly_closed(
741                core_ctx,
742                bindings_ctx,
743                newly_closed,
744                &demux_id,
745                &conn_addr,
746                timer,
747            );
748            if let Some(accept_queue) = accept_queue {
749                accept_queue.remove(&conn_id);
750                *defunct = true;
751            }
752            if *defunct {
753                // If the client has promised to not touch the socket again,
754                // we can destroy the socket finally.
755                return ConnectionIncomingSegmentDisposition::Destroy;
756            }
757            let _: bool = handshake_status.update_if_pending(match reason {
758                None => HandshakeStatus::Completed { reported: accept_queue.is_some() },
759                Some(_err) => HandshakeStatus::Aborted,
760            });
761        }
762    }
763
764    if let Some(seg) = reply {
765        socket::send_tcp_segment(
766            core_ctx,
767            bindings_ctx,
768            Some(conn_id),
769            Some(&ip_sock),
770            conn_addr.ip,
771            seg.into_empty(),
772            &socket_options.ip_options,
773        );
774    }
775
776    // Send any enqueued data, if there is any.
777    socket::do_send_inner_and_then_handle_newly_closed(
778        conn_id,
779        &demux_id,
780        socket_options,
781        conn,
782        DoSendLimit::MultipleSegments,
783        &conn_addr,
784        timer,
785        core_ctx,
786        bindings_ctx,
787    );
788
789    // Enqueue the connection to the associated listener
790    // socket's accept queue.
791    if let Some(passive_open) = passive_open {
792        let accept_queue = conn.accept_queue.as_ref().expect("no accept queue but passive open");
793        accept_queue.notify_ready(conn_id, passive_open);
794    }
795
796    // We found a valid connection for the segment.
797    ConnectionIncomingSegmentDisposition::FoundSocket
798}
799
800/// Responds to the disposition returned by [`try_handle_incoming_for_listener`].
801///
802/// Returns true if we have found the right socket and there is no need to
803/// continue the iteration for finding the next-best candidate.
804fn try_handle_listener_incoming_disposition<SockI, WireI, CC, BC, Addr>(
805    core_ctx: &mut CC,
806    bindings_ctx: &mut BC,
807    disposition: ListenerIncomingSegmentDisposition<PrimaryRc<SockI, CC::WeakDeviceId, BC>>,
808    demux_listener_id: &WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
809    tw_reuse: &mut Option<(WireI::DemuxSocketId<CC::WeakDeviceId, BC>, Addr)>,
810    addrs_to_search: &mut AddrVecIter<WireI, CC::WeakDeviceId, TcpPortSpec>,
811    conn_addr: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
812    incoming_device: &CC::DeviceId,
813) -> bool
814where
815    SockI: DualStackIpExt,
816    WireI: DualStackIpExt,
817    CC: TcpContext<SockI, BC> + TcpContext<WireI, BC> + TcpContext<WireI::OtherVersion, BC>,
818    BC: TcpBindingsContext<CC::DeviceId>,
819{
820    match disposition {
821        ListenerIncomingSegmentDisposition::FoundSocket => true,
822        ListenerIncomingSegmentDisposition::Filtered => true,
823        ListenerIncomingSegmentDisposition::ConflictingConnection => {
824            // We're about to rewind the lookup. If we got a
825            // conflicting connection it means tw_reuse has been
826            // removed from the demux state and we need to destroy
827            // it.
828            if let Some((tw_reuse, _)) = tw_reuse.take() {
829                WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, tw_reuse);
830            }
831
832            // Reset the address vector iterator and go again, a
833            // conflicting connection was found.
834            *addrs_to_search = AddrVecIter::<WireI, CC::WeakDeviceId, TcpPortSpec>::with_device(
835                conn_addr.into(),
836                incoming_device.downgrade(),
837            );
838            false
839        }
840        ListenerIncomingSegmentDisposition::NoMatchingSocket => false,
841        ListenerIncomingSegmentDisposition::NewConnection(primary) => {
842            // If we have a new connection, we need to add it to the
843            // set of all sockets.
844
845            // First things first, if we got here then tw_reuse is
846            // gone so we need to destroy it.
847            if let Some((tw_reuse, _)) = tw_reuse.take() {
848                WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, tw_reuse);
849            }
850
851            // Now put the new connection into the socket map.
852            //
853            // Note that there's a possible subtle race here where
854            // another thread could have already operated further on
855            // this connection and marked it for destruction which
856            // puts the entry in the DOA state, if we see that we
857            // must immediately destroy the socket after having put
858            // it in the map.
859            let id = TcpSocketId(PrimaryRc::clone_strong(&primary));
860            let to_destroy = core_ctx.with_all_sockets_mut(move |all_sockets| {
861                let insert_entry = TcpSocketSetEntry::Primary(primary);
862                match all_sockets.entry(id) {
863                    hash_map::Entry::Vacant(v) => {
864                        let _: &mut _ = v.insert(insert_entry);
865                        None
866                    }
867                    hash_map::Entry::Occupied(mut o) => {
868                        // We're holding on to the primary ref, the
869                        // only possible state here should be a DOA
870                        // entry.
871                        assert_matches!(
872                            core::mem::replace(o.get_mut(), insert_entry),
873                            TcpSocketSetEntry::DeadOnArrival
874                        );
875                        Some(o.key().clone())
876                    }
877                }
878            });
879            // NB: we're releasing and reaquiring the
880            // all_sockets_mut lock here for the convenience of not
881            // needing different versions of `destroy_socket`. This
882            // should be fine because the race this is solving
883            // should not be common. If we have correct thread
884            // attribution per flow it should effectively become
885            // impossible so we go for code simplicity here.
886            if let Some(to_destroy) = to_destroy {
887                socket::destroy_socket(core_ctx, bindings_ctx, to_destroy);
888            }
889            counters::increment_counter_for_demux_id::<WireI, _, _, _, _>(
890                core_ctx,
891                demux_listener_id,
892                |c| &c.passive_connection_openings,
893            );
894            true
895        }
896    }
897}
898
/// Tries to handle an incoming segment by passing it to a listening socket.
///
/// `socket_state` must hold a bound, listener-side socket; anything else trips
/// the "invalid socket ID" assertion.
///
/// The returned disposition is one of:
///
/// - `NoMatchingSocket`: the socket is bound but not listening, or an IP
///   socket towards the sender of the segment could not be created.
/// - `Filtered`: the socket ingress filter asked for the segment to be
///   dropped.
/// - `FoundSocket`: this listener is the right socket for the segment but no
///   connection was created, because the accept queue is at its backlog
///   limit, no usable MMS/MSS could be obtained for the IP socket, or the
///   state machine did not move to SYN-RCVD. In the last case any reply the
///   state machine produced is still sent.
/// - `NewConnection`: a new connection in SYN-RCVD was inserted into the
///   demux and pushed onto the listener's pending accept queue, and any reply
///   produced by the state machine was sent. The primary reference to the new
///   socket is handed to the caller.
/// - `ConflictingConnection`: inserting the new connection failed because an
///   entry already exists for its address. No reply is sent.
///
/// If `tw_reuse` holds a connection, that connection is removed from the demux
/// right before the new connection is inserted. `tw_reuse` itself is neither
/// cleared nor is the old socket destroyed by this function.
fn try_handle_incoming_for_listener<SockI, WireI, CC, BC, DC, H>(
    core_ctx: &mut DC,
    bindings_ctx: &mut BC,
    listener_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
    isn: &IsnGenerator<BC::Instant>,
    timestamp_offset: &TimestampOffsetGenerator<BC::Instant>,
    socket_state: &mut TcpSocketState<SockI, CC::WeakDeviceId, BC>,
    header_info: &H,
    incoming: &VerifiedTcpSegment<'_>,
    incoming_addrs: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
    incoming_device: &CC::DeviceId,
    tw_reuse: &mut Option<(
        WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
        ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
    )>,
    make_connection: impl FnOnce(
        Connection<SockI, WireI, CC::WeakDeviceId, BC>,
        ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
    ) -> SockI::ConnectionAndAddr<CC::WeakDeviceId, BC>,
    make_demux_id: impl Fn(
        TcpSocketId<SockI, CC::WeakDeviceId, BC>,
    ) -> WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
    marks: &Marks,
) -> ListenerIncomingSegmentDisposition<PrimaryRc<SockI, CC::WeakDeviceId, BC>>
where
    SockI: DualStackIpExt,
    WireI: DualStackIpExt,
    BC: TcpBindingsContext<CC::DeviceId>
        + BufferProvider<
            BC::ReceiveBuffer,
            BC::SendBuffer,
            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
        >,
    CC: TcpContext<SockI, BC>,
    DC: TransportIpContext<WireI, BC, DeviceId = CC::DeviceId, WeakDeviceId = CC::WeakDeviceId>
        + DeviceIpSocketHandler<WireI, BC>
        + TcpDemuxContext<WireI, CC::WeakDeviceId, BC>
        + TcpCounterContext<SockI, CC::WeakDeviceId, BC>
        + CoreTxMetadataContext<TcpSocketTxMetadata<SockI, CC::WeakDeviceId, BC>, BC>,
    H: IpHeaderInfo<WireI>,
{
    // The caller is expected to only hand us sockets in the bound/listener
    // state; anything else is a bug.
    let (maybe_listener, sharing, listener_addr) = assert_matches!(
        &socket_state.socket_state,
        TcpSocketStateInner::Bound(BoundSocketState::Listener(l)) => l,
        "invalid socket ID"
    );

    let ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) } =
        incoming_addrs;

    let Listener { accept_queue, backlog, buffer_sizes } = match maybe_listener {
        MaybeListener::Bound(_bound) => {
            // If the socket is only bound, but not listening.
            return ListenerIncomingSegmentDisposition::NoMatchingSocket;
        }
        MaybeListener::Listener(listener) => listener,
    };

    // Give the socket ingress filter a chance to drop the segment before any
    // listener state is consulted or modified.
    match run_socket_ingress_filter(
        bindings_ctx,
        incoming_device,
        listener_id.socket_cookie(),
        &socket_state.socket_options,
        header_info,
        incoming.tcp_segment(),
    ) {
        SocketIngressFilterResult::Accept => (),
        SocketIngressFilterResult::Drop => {
            return ListenerIncomingSegmentDisposition::Filtered;
        }
    }

    // Note that this check happens at the very beginning, before we try to
    // reuse the connection in TIME-WAIT. This is because we need to store the
    // reused connection in the accept queue, so we have to respect its limit.
    if accept_queue.len() == backlog.get() {
        core_ctx.increment_both(listener_id, |counters| &counters.listener_queue_overflow);
        core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
        debug!("incoming SYN dropped because of the full backlog of the listener");
        return ListenerIncomingSegmentDisposition::FoundSocket;
    }

    // Ensure that if the remote address requires a zone, we propagate that to
    // the address for the connected socket.
    let bound_device = listener_addr.as_ref().clone();
    let bound_device = if remote_ip.as_ref().must_have_zone() {
        Some(bound_device.map_or(EitherDeviceId::Strong(incoming_device), EitherDeviceId::Weak))
    } else {
        bound_device.map(EitherDeviceId::Weak)
    };

    // The new connection starts from the listener's socket options, except
    // that the marks are replaced with the `marks` provided by the caller.
    let ip_options = TcpIpSockOptions { marks: *marks, ..socket_state.socket_options.ip_options };
    let socket_options = SocketOptions { ip_options, ..socket_state.socket_options };

    let bound_device = bound_device.as_ref().map(|d| d.as_ref());
    let ip_sock = match core_ctx.new_ip_socket(
        bindings_ctx,
        IpSocketArgs {
            device: bound_device,
            local_ip: IpDeviceAddr::new_from_socket_ip_addr(local_ip),
            remote_ip,
            proto: IpProto::Tcp.into(),
            options: &ip_options,
        },
    ) {
        Ok(ip_sock) => ip_sock,
        err @ Err(IpSockCreationError::Route(_)) => {
            core_ctx.increment_both(listener_id, |counters| &counters.passive_open_no_route_errors);
            core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
            debug!("cannot construct an ip socket to the SYN originator: {:?}, ignoring", err);
            return ListenerIncomingSegmentDisposition::NoMatchingSocket;
        }
    };

    // Both generators are fed the current time and the local/remote
    // (address, port) pairs of the prospective connection.
    let isn = isn.generate(
        bindings_ctx.now(),
        (ip_sock.local_ip().clone().into(), local_port),
        (ip_sock.remote_ip().clone(), remote_port),
    );
    let timestamp_offset = timestamp_offset.generate::<SocketIpAddr<WireI::Addr>, NonZeroU16>(
        bindings_ctx.now(),
        (ip_sock.local_ip().clone().into(), local_port),
        (ip_sock.remote_ip().clone(), remote_port),
    );
    let device_mms = match core_ctx.get_mms(bindings_ctx, &ip_sock, &socket_options.ip_options) {
        Ok(mms) => mms,
        Err(err) => {
            // If we cannot find a device or the device's MTU is too small,
            // there isn't much we can do here since sending a RST back is
            // impossible, we just need to silently drop the segment.
            error!("Cannot find a device with large enough MTU for the connection");
            core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
            match err {
                MmsError::NoDevice(_) | MmsError::MTUTooSmall(_) => {
                    return ListenerIncomingSegmentDisposition::FoundSocket;
                }
            }
        }
    };
    // An MMS that cannot be turned into an MSS also results in the segment
    // being dropped without a reply.
    let Some(device_mss) = Mss::from_mms(device_mms) else {
        return ListenerIncomingSegmentDisposition::FoundSocket;
    };

    // Start a fresh state machine in LISTEN for this prospective connection
    // and feed it the incoming segment below.
    let mut state = State::Listen(Closed::<Initial>::listen(
        isn,
        timestamp_offset,
        buffer_sizes.clone(),
        device_mss,
        Mss::default::<WireI>(),
        socket_options.user_timeout,
    ));

    // Prepare a reply to be sent out.
    //
    // We might end up discarding the reply in case we can't instantiate this
    // new connection.
    let result = state.on_segment::<_, BC>(
        // NB: This is a bit of a lie, we're passing the listener ID to process
        // the first segment because we don't have an ID allocated yet. This is
        // okay because the state machine ID is only for debugging purposes.
        &listener_id.either(),
        &TcpCountersRefs::from_ctx(core_ctx, listener_id),
        incoming.into(),
        bindings_ctx.now(),
        &SocketOptions::default(),
        false, /* defunct */
    );
    // Processing a segment in LISTEN must neither yield a passive open nor
    // close the state machine; only the optional reply is of interest.
    let reply = assert_matches!(
        result,
        (reply, None, /* data_acked */ _, NewlyClosed::No /* can't become closed */) => reply
    );

    // Only a transition to SYN-RCVD results in a new connection being
    // created and installed in the demux.
    let result = if matches!(state, State::SynRcvd(_)) {
        let poll_send_at = state.poll_send_at().expect("no retrans timer");
        let ListenerSharingState { sharing, listening: _ } = *sharing;
        let bound_device = ip_sock.device().cloned();

        let addr = ConnAddr {
            ip: ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) },
            device: bound_device,
        };

        let new_socket = core_ctx.with_demux_mut(|DemuxState { socketmap, .. }| {
            // If we're reusing an entry, remove it from the demux before
            // proceeding.
            //
            // We could just reuse the old allocation for the new connection but
            // because of the restrictions on the socket map data structure (for
            // good reasons), we can't update the sharing info unconditionally.
            // So here we just remove the old connection and create a new one.
            // Also this approach has the benefit of not accidentally persisting
            // the old state that we don't want.
            if let Some((tw_reuse, conn_addr)) = tw_reuse {
                match socketmap.conns_mut().remove(tw_reuse, &conn_addr) {
                    Ok(()) => {
                        // NB: We're removing the tw_reuse connection from the
                        // demux here, but not canceling its timer. The timer is
                        // canceled via drop when we destroy the socket. Special
                        // care is taken when handling timers in the time wait
                        // state to account for this.
                    }
                    Err(NotFoundError) => {
                        // We could lose a race trying to reuse the tw_reuse
                        // socket, so we just accept the loss and be happy that
                        // the conn_addr we want to use is free.
                    }
                }
            }

            // Try to create and add the new socket to the demux.
            let accept_queue_clone = accept_queue.clone();
            let ip_sock = ip_sock.clone();
            let bindings_ctx_moved = &mut *bindings_ctx;
            match socketmap.conns_mut().try_insert_with(addr, sharing, move |addr, sharing| {
                let conn = make_connection(
                    Connection {
                        accept_queue: Some(accept_queue_clone),
                        state,
                        ip_sock,
                        defunct: false,
                        soft_error: None,
                        handshake_status: HandshakeStatus::Pending,
                    },
                    addr,
                );

                let (id, primary) = TcpSocketId::new_cyclic(
                    |weak| {
                        let mut timer = CC::new_timer(bindings_ctx_moved, weak);
                        // Schedule the timer here because we can't acquire the lock
                        // later. This only runs when inserting into the demux
                        // succeeds so it's okay.
                        assert_eq!(
                            bindings_ctx_moved.schedule_timer_instant(poll_send_at, &mut timer),
                            None
                        );
                        TcpSocketStateInner::Bound(BoundSocketState::Connected {
                            conn,
                            sharing,
                            timer,
                        })
                    },
                    socket_options,
                );
                (make_demux_id(id.clone()), (primary, id))
            }) {
                Ok((_entry, (primary, id))) => {
                    // Make sure the new socket is in the pending accept queue
                    // before we release the demux lock.
                    accept_queue.push_pending(id);
                    Some(primary)
                }
                Err((e, _sharing_state)) => {
                    // The only error we accept here is if the entry exists
                    // fully, any indirect conflicts are unexpected because we
                    // know the listener is still alive and installed in the
                    // demux.
                    assert_matches!(e, InsertError::Exists);
                    // If we fail to insert it means we lost a race and this
                    // packet is destined to a connection that is already
                    // established. In that case we should tell the demux code
                    // to retry demuxing it all over again.
                    None
                }
            }
        });

        match new_socket {
            Some(new_socket) => ListenerIncomingSegmentDisposition::NewConnection(new_socket),
            None => {
                // We didn't create a new connection, short circuit early and
                // don't send out the pending segment.
                core_ctx
                    .increment_both(listener_id, |counters| &counters.failed_connection_attempts);
                return ListenerIncomingSegmentDisposition::ConflictingConnection;
            }
        }
    } else {
        // We found a valid listener for the segment even if the connection
        // state is not a newly pending connection.
        ListenerIncomingSegmentDisposition::FoundSocket
    };

    // We can send a reply now if we got here.
    if let Some(seg) = reply {
        socket::send_tcp_segment(
            core_ctx,
            bindings_ctx,
            Some(&listener_id),
            Some(&ip_sock),
            incoming_addrs,
            seg.into_empty(),
            &socket_options.ip_options,
        );
    }

    result
}
1201
1202pub(super) fn tcp_serialize_segment<'a, I, P>(
1203    header: &'a SegmentHeader,
1204    data: P,
1205    conn_addr: ConnIpAddr<I::Addr, NonZeroU16, NonZeroU16>,
1206) -> impl TransportPacketSerializer<I, Buffer = EmptyBuf> + Debug + 'a
1207where
1208    I: FilterIpExt,
1209    P: InnerPacketBuilder + Debug + Payload + 'a,
1210{
1211    let SegmentHeader { seq, ack, wnd, control, options, push } = header;
1212    let ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) } = conn_addr;
1213    let mut builder = TcpSegmentBuilder::new(
1214        local_ip.addr(),
1215        remote_ip.addr(),
1216        local_port,
1217        remote_port,
1218        (*seq).into(),
1219        ack.map(Into::into),
1220        u16::from(*wnd),
1221    );
1222    builder.psh(*push);
1223    match control {
1224        None => {}
1225        Some(Control::SYN) => builder.syn(true),
1226        Some(Control::FIN) => builder.fin(true),
1227        Some(Control::RST) => builder.rst(true),
1228    }
1229    TcpSegmentBuilderWithOptions::new(builder, options.builder())
1230        .unwrap_or_else(|TcpOptionsTooLongError| {
1231            panic!("Too many TCP options");
1232        })
1233        .wrap_body(data.into_serializer())
1234}
1235
1236fn run_socket_ingress_filter<I, BC, D>(
1237    bindings_ctx: &BC,
1238    incoming_device: &D,
1239    socket_cookie: SocketCookie,
1240    socket_options: &SocketOptions,
1241    header_info: &impl IpHeaderInfo<I>,
1242    tcp_segment: &TcpSegment<&'_ [u8]>,
1243) -> SocketIngressFilterResult
1244where
1245    I: Ip,
1246    BC: TcpBindingsContext<D>,
1247    D: StrongDeviceIdentifier,
1248{
1249    let [ip_prefix, ip_options] = header_info.as_bytes();
1250    let [tcp_prefix, tcp_options, data] = tcp_segment.as_bytes();
1251    let mut slices = [ip_prefix, ip_options, tcp_prefix, tcp_options, data];
1252    let data = FragmentedByteSlice::new(&mut slices);
1253
1254    bindings_ctx.socket_ops_filter().on_ingress(
1255        I::VERSION,
1256        data,
1257        incoming_device,
1258        socket_cookie,
1259        &socket_options.ip_options.marks,
1260    )
1261}
1262
#[cfg(test)]
mod test {
    use ip_test_macro::ip_test;
    use netstack3_base::{
        HandshakeOptions, Options, ResetOptions, SackBlocks, SegmentOptions, UnscaledWindowSize,
    };
    use packet::{ParseBuffer as _, Serializer as _};
    use packet_formats::tcp::options::TcpOptions as _;
    use test_case::test_case;

    use super::*;

    /// Bundles the IP extension traits required by the tests in this module.
    trait TestIpExt: netstack3_base::testutil::TestIpExt + FilterIpExt {}
    impl<T> TestIpExt for T where T: netstack3_base::testutil::TestIpExt + FilterIpExt {}

    /// Sequence number used by every test segment.
    const SEQ: SeqNum = SeqNum::new(12345);
    /// Acknowledgement number used by the test segments that carry an ACK.
    const ACK: SeqNum = SeqNum::new(67890);
    /// Payload used by the test segments that carry data.
    const FAKE_DATA: &'static [u8] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 0];

    /// Serializes `segment` with [`super::tcp_serialize_segment`], parses the
    /// bytes back and checks that ports, sequence/acknowledgement numbers,
    /// the PSH flag, the window, the options and the body all round-trip.
    #[ip_test(I)]
    #[test_case(
        Segment::syn(SEQ, UnscaledWindowSize::from(u16::MAX),
        HandshakeOptions::default()), &[]
        ; "syn")]
    #[test_case(
        Segment::syn(SEQ, UnscaledWindowSize::from(u16::MAX),
        HandshakeOptions {
            mss: Some(Mss::new(1440).unwrap()),
            ..Default::default() }), &[]
            ; "syn with mss")]
    #[test_case(
        Segment::ack(SEQ, ACK, UnscaledWindowSize::from(u16::MAX), SegmentOptions::default()),
        &[]; "ack")]
    #[test_case(Segment::with_fake_data(SEQ, ACK, FAKE_DATA), FAKE_DATA; "data")]
    #[test_case(Segment::new_assert_no_discard(SegmentHeader {
            seq: SEQ,
            ack: Some(ACK),
            push: true,
            wnd: UnscaledWindowSize::from(u16::MAX),
            ..Default::default()
        },
        FAKE_DATA
    ), FAKE_DATA; "push")]
    fn tcp_serialize_segment<I: TestIpExt>(segment: Segment<&[u8]>, expected_body: &[u8]) {
        const SOURCE_PORT: NonZeroU16 = NonZeroU16::new(1111).unwrap();
        const DEST_PORT: NonZeroU16 = NonZeroU16::new(2222).unwrap();

        let (header, data) = segment.into_parts();
        let serializer = super::tcp_serialize_segment::<I, _>(
            &header,
            data,
            ConnIpAddr {
                local: (SocketIpAddr::try_from(I::TEST_ADDRS.local_ip).unwrap(), SOURCE_PORT),
                remote: (SocketIpAddr::try_from(I::TEST_ADDRS.remote_ip).unwrap(), DEST_PORT),
            },
        );

        let mut serialized = serializer.serialize_vec_outer().unwrap().unwrap_b();
        // The segment was serialized from the local address to the remote
        // address, so parse it with the local address as the source and the
        // remote address as the destination.
        let parsed_segment = serialized
            .parse_with::<_, TcpSegment<_>>(TcpParseArgs::new(
                *I::TEST_ADDRS.local_ip,
                *I::TEST_ADDRS.remote_ip,
            ))
            .expect("is valid segment");

        assert_eq!(parsed_segment.src_port(), SOURCE_PORT);
        assert_eq!(parsed_segment.dst_port(), DEST_PORT);
        assert_eq!(parsed_segment.seq_num(), u32::from(SEQ));
        // Segments without an ACK must parse back without one, and segments
        // with an ACK must carry the exact number that was serialized.
        assert_eq!(parsed_segment.ack_num(), header.ack.map(u32::from));
        assert_eq!(parsed_segment.psh(), header.push);
        assert_eq!(
            UnscaledWindowSize::from(parsed_segment.window_size()),
            UnscaledWindowSize::from(u16::MAX)
        );

        // Flatten the three option flavors into one tuple so the same
        // assertions can be applied regardless of the segment kind.
        let (mss, window_scale, sack_permitted, sack_blocks, timestamp) = match header.options {
            Options::Handshake(HandshakeOptions {
                mss,
                window_scale,
                sack_permitted,
                timestamp,
            }) => (mss, window_scale, sack_permitted, SackBlocks::EMPTY, timestamp),
            Options::Segment(SegmentOptions { timestamp, sack_blocks }) => {
                (None, None, false, sack_blocks, timestamp)
            }
            Options::Reset(ResetOptions { timestamp }) => {
                (None, None, false, SackBlocks::EMPTY, timestamp)
            }
        };
        assert_eq!(mss.map(|mss| mss.get()), parsed_segment.options().mss());
        assert_eq!(window_scale.map(|ws| ws.get()), parsed_segment.options().window_scale());
        assert_eq!(sack_permitted, parsed_segment.options().sack_permitted());
        assert_eq!(sack_blocks.as_slice(), parsed_segment.options().sack_blocks());
        assert_eq!(
            timestamp.as_ref().map(Into::into).as_ref(),
            parsed_segment.options().timestamp()
        );

        assert_eq!(parsed_segment.into_body(), expected_body);
    }
}