netstack3_tcp/socket/
demux.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Defines the entry point of TCP packets, by directing them into the correct
6//! state machine.
7
8use core::convert::Infallible as Never;
9use core::fmt::Debug;
10use core::num::NonZeroU16;
11
12use assert_matches::assert_matches;
13use log::{debug, error, warn};
14use net_types::ip::Ip;
15use net_types::{SpecifiedAddr, Witness as _};
16use netstack3_base::socket::{
17    AddrIsMappedError, AddrVec, AddrVecIter, ConnAddr, ConnIpAddr, InsertError, ListenerAddr,
18    ListenerIpAddr, SocketCookie, SocketIpAddr, SocketIpAddrExt as _,
19};
20use netstack3_base::{
21    BidirectionalConverter as _, Control, CounterContext, CtxPair, EitherDeviceId, IpDeviceAddr,
22    Marks, Mss, NotFoundError, Payload, Segment, SegmentHeader, SeqNum, StrongDeviceIdentifier,
23    VerifiedTcpSegment, WeakDeviceIdentifier,
24};
25use netstack3_filter::{
26    FilterIpExt, SocketIngressFilterResult, SocketOpsFilter, TransportPacketSerializer,
27};
28use netstack3_hashmap::hash_map;
29use netstack3_ip::socket::{IpSockCreationError, IpSocketArgs, MmsError};
30use netstack3_ip::{
31    IpHeaderInfo, IpTransportContext, LocalDeliveryPacketInfo, ReceiveIpPacketMeta,
32    TransportIpContext, TransportReceiveError,
33};
34use netstack3_trace::trace_duration;
35use packet::{
36    BufferMut, BufferView as _, EmptyBuf, FragmentedByteSlice, InnerPacketBuilder, PacketBuilder,
37    ParseBuffer,
38};
39use packet_formats::error::ParseError;
40use packet_formats::ip::IpProto;
41use packet_formats::tcp::{
42    TcpFlowAndSeqNum, TcpOptionsTooLongError, TcpParseArgs, TcpSegment, TcpSegmentBuilder,
43    TcpSegmentBuilderWithOptions,
44};
45
46use crate::internal::base::{BufferSizes, ConnectionError, SocketOptions, TcpIpSockOptions};
47use crate::internal::counters::{
48    self, TcpCounterContext, TcpCountersRefs, TcpCountersWithoutSocket,
49};
50use crate::internal::socket::generators::{IsnGenerator, TimestampOffsetGenerator};
51use crate::internal::socket::{
52    self, AsThisStack as _, Connection, CoreTxMetadataContext, DemuxState, DeviceIpSocketHandler,
53    DoSendLimit, DualStackDemuxIdConverter as _, DualStackIpExt, EitherStack, HandshakeStatus,
54    Listener, ListenerAddrState, MaybeDualStack, PrimaryRc, TcpApi, TcpBindingsContext,
55    TcpBindingsTypes, TcpContext, TcpDemuxContext, TcpDualStackContext, TcpIpTransportContext,
56    TcpPortSpec, TcpSocketId, TcpSocketSetEntry, TcpSocketState, TcpSocketStateInner,
57    TcpSocketTxMetadata,
58};
59use crate::internal::state::{
60    BufferProvider, Closed, DataAcked, Initial, NewlyClosed, State, TimeWait,
61};
62
63impl<BT: TcpBindingsTypes> BufferProvider<BT::ReceiveBuffer, BT::SendBuffer> for BT {
64    type ActiveOpen = BT::ListenerNotifierOrProvidedBuffers;
65
66    type PassiveOpen = BT::ReturnedBuffers;
67
68    fn new_passive_open_buffers(
69        buffer_sizes: BufferSizes,
70    ) -> (BT::ReceiveBuffer, BT::SendBuffer, Self::PassiveOpen) {
71        BT::new_passive_open_buffers(buffer_sizes)
72    }
73}
74
75impl<I, BC, CC> IpTransportContext<I, BC, CC> for TcpIpTransportContext
76where
77    I: DualStackIpExt,
78    BC: TcpBindingsContext<CC::DeviceId>
79        + BufferProvider<
80            BC::ReceiveBuffer,
81            BC::SendBuffer,
82            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
83            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
84        >,
85    CC: TcpContext<I, BC> + TcpContext<I::OtherVersion, BC>,
86{
87    // TODO(https://fxbug.dev/473819144) Implement early demux for TCP.
88    type EarlyDemuxSocket = Never;
89
90    fn early_demux<B: ParseBuffer>(
91        _core_ctx: &mut CC,
92        _device: &CC::DeviceId,
93        _src_ip: I::Addr,
94        _dst_ip: I::Addr,
95        _buffer: B,
96    ) -> Option<Self::EarlyDemuxSocket> {
97        None
98    }
99
100    fn receive_icmp_error(
101        core_ctx: &mut CC,
102        bindings_ctx: &mut BC,
103        _device: &CC::DeviceId,
104        original_src_ip: Option<SpecifiedAddr<I::Addr>>,
105        original_dst_ip: SpecifiedAddr<I::Addr>,
106        mut original_body: &[u8],
107        err: I::ErrorCode,
108    ) {
109        let mut buffer = &mut original_body;
110        let Some(flow_and_seqnum) = buffer.take_obj_front::<TcpFlowAndSeqNum>() else {
111            error!("received an ICMP error but its body is less than 8 bytes");
112            return;
113        };
114
115        let Some(original_src_ip) = original_src_ip else { return };
116        let Some(original_src_port) = NonZeroU16::new(flow_and_seqnum.src_port()) else { return };
117        let Some(original_dst_port) = NonZeroU16::new(flow_and_seqnum.dst_port()) else { return };
118        let original_seqnum = SeqNum::new(flow_and_seqnum.sequence_num());
119
120        TcpApi::<I, _>::new(CtxPair { core_ctx, bindings_ctx }).on_icmp_error(
121            original_src_ip,
122            original_dst_ip,
123            original_src_port,
124            original_dst_port,
125            original_seqnum,
126            err.into(),
127        );
128    }
129
130    fn receive_ip_packet<B: BufferMut, H: IpHeaderInfo<I>>(
131        core_ctx: &mut CC,
132        bindings_ctx: &mut BC,
133        device: &CC::DeviceId,
134        remote_ip: I::RecvSrcAddr,
135        local_ip: SpecifiedAddr<I::Addr>,
136        mut buffer: B,
137        info: &LocalDeliveryPacketInfo<I, H>,
138        _early_demux_socket: Option<Never>,
139    ) -> Result<(), (B, TransportReceiveError)> {
140        let LocalDeliveryPacketInfo { meta, header_info, marks } = info;
141        let ReceiveIpPacketMeta { broadcast, transparent_override } = meta;
142        if let Some(delivery) = transparent_override {
143            warn!(
144                "TODO(https://fxbug.dev/337009139): transparent proxy not supported for TCP \
145                sockets; will not override dispatch to perform local delivery to {delivery:?}"
146            );
147        }
148
149        if broadcast.is_some() {
150            CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
151                .invalid_ip_addrs_received
152                .increment();
153            debug!("tcp: dropping broadcast TCP packet");
154            return Ok(());
155        }
156
157        let remote_ip = match SpecifiedAddr::new(remote_ip.into_addr()) {
158            None => {
159                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
160                    .invalid_ip_addrs_received
161                    .increment();
162                debug!("tcp: source address unspecified, dropping the packet");
163                return Ok(());
164            }
165            Some(src_ip) => src_ip,
166        };
167        let remote_ip: SocketIpAddr<_> = match remote_ip.try_into() {
168            Ok(remote_ip) => remote_ip,
169            Err(AddrIsMappedError {}) => {
170                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
171                    .invalid_ip_addrs_received
172                    .increment();
173                debug!("tcp: source address is mapped (ipv4-mapped-ipv6), dropping the packet");
174                return Ok(());
175            }
176        };
177        let local_ip: SocketIpAddr<_> = match local_ip.try_into() {
178            Ok(local_ip) => local_ip,
179            Err(AddrIsMappedError {}) => {
180                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
181                    .invalid_ip_addrs_received
182                    .increment();
183                debug!("tcp: local address is mapped (ipv4-mapped-ipv6), dropping the packet");
184                return Ok(());
185            }
186        };
187        let packet = match buffer
188            .parse_with::<_, TcpSegment<_>>(TcpParseArgs::new(remote_ip.addr(), local_ip.addr()))
189        {
190            Ok(packet) => packet,
191            Err(err) => {
192                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
193                    .invalid_segments_received
194                    .increment();
195                debug!("tcp: failed parsing incoming packet {:?}", err);
196                match err {
197                    ParseError::Checksum => {
198                        CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
199                            .checksum_errors
200                            .increment();
201                    }
202                    ParseError::NotSupported | ParseError::NotExpected | ParseError::Format => {}
203                }
204                return Ok(());
205            }
206        };
207        let local_port = packet.dst_port();
208        let remote_port = packet.src_port();
209        let incoming = match VerifiedTcpSegment::try_from(packet) {
210            Ok(segment) => segment,
211            Err(err) => {
212                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
213                    .invalid_segments_received
214                    .increment();
215                debug!("tcp: malformed segment {:?}", err);
216                return Ok(());
217            }
218        };
219        let conn_addr =
220            ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) };
221
222        CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
223            .valid_segments_received
224            .increment();
225        handle_incoming_packet::<I, _, _, _>(
226            core_ctx,
227            bindings_ctx,
228            conn_addr,
229            device,
230            header_info,
231            &incoming,
232            marks,
233        );
234        Ok(())
235    }
236}
237
238fn handle_incoming_packet<WireI, BC, CC, H>(
239    core_ctx: &mut CC,
240    bindings_ctx: &mut BC,
241    conn_addr: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
242    incoming_device: &CC::DeviceId,
243    header_info: &H,
244    incoming: &VerifiedTcpSegment<'_>,
245    marks: &Marks,
246) where
247    WireI: DualStackIpExt,
248    BC: TcpBindingsContext<CC::DeviceId>
249        + BufferProvider<
250            BC::ReceiveBuffer,
251            BC::SendBuffer,
252            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
253            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
254        >,
255    CC: TcpContext<WireI, BC> + TcpContext<WireI::OtherVersion, BC>,
256    H: IpHeaderInfo<WireI>,
257{
258    trace_duration!("tcp::handle_incoming_packet");
259    let mut tw_reuse = None;
260
261    let mut addrs_to_search = AddrVecIter::<WireI, CC::WeakDeviceId, TcpPortSpec>::with_device(
262        conn_addr.into(),
263        incoming_device.downgrade(),
264    );
265
266    enum FoundSocket<S> {
267        // Typically holds the demux ID of the found socket, but may hold
268        // `None` if the found socket was destroyed as a result of the segment.
269        Yes(Option<S>),
270        No,
271    }
272    let found_socket = loop {
273        let sock = core_ctx
274            .with_demux(|demux| lookup_socket::<WireI, CC, BC>(demux, &mut addrs_to_search));
275        match sock {
276            None => break FoundSocket::No,
277            Some(SocketLookupResult::Connection(demux_conn_id, conn_addr)) => {
278                // It is not possible to have two same connections that
279                // share the same local and remote IPs and ports.
280                assert_eq!(tw_reuse, None);
281                let disposition = match WireI::as_dual_stack_ip_socket(&demux_conn_id) {
282                    EitherStack::ThisStack(conn_id) => {
283                        try_handle_incoming_for_connection_dual_stack(
284                            core_ctx,
285                            bindings_ctx,
286                            conn_id,
287                            incoming_device,
288                            header_info,
289                            &incoming,
290                        )
291                    }
292                    EitherStack::OtherStack(conn_id) => {
293                        try_handle_incoming_for_connection_dual_stack(
294                            core_ctx,
295                            bindings_ctx,
296                            conn_id,
297                            incoming_device,
298                            header_info,
299                            &incoming,
300                        )
301                    }
302                };
303                match disposition {
304                    ConnectionIncomingSegmentDisposition::Destroy => {
305                        WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, demux_conn_id);
306                        break FoundSocket::Yes(None);
307                    }
308                    ConnectionIncomingSegmentDisposition::FoundSocket
309                    | ConnectionIncomingSegmentDisposition::Filtered => {
310                        break FoundSocket::Yes(Some(demux_conn_id));
311                    }
312                    ConnectionIncomingSegmentDisposition::ReuseCandidateForListener => {
313                        tw_reuse = Some((demux_conn_id, conn_addr));
314                    }
315                }
316            }
317            Some(SocketLookupResult::Listener((demux_listener_id, _listener_addr))) => {
318                match WireI::as_dual_stack_ip_socket(&demux_listener_id) {
319                    EitherStack::ThisStack(listener_id) => {
320                        let disposition = core_ctx.with_socket_mut_generators_transport_demux(
321                            &listener_id,
322                            |core_ctx, socket_state, isn, timestamp_offset| match core_ctx {
323                                MaybeDualStack::NotDualStack((core_ctx, converter)) => {
324                                    try_handle_incoming_for_listener::<WireI, WireI, CC, BC, _, _>(
325                                        core_ctx,
326                                        bindings_ctx,
327                                        &listener_id,
328                                        isn,
329                                        timestamp_offset,
330                                        socket_state,
331                                        header_info,
332                                        incoming,
333                                        conn_addr,
334                                        incoming_device,
335                                        &mut tw_reuse,
336                                        move |conn, addr| converter.convert_back((conn, addr)),
337                                        WireI::into_demux_socket_id,
338                                        marks,
339                                    )
340                                }
341                                MaybeDualStack::DualStack((core_ctx, converter)) => {
342                                    try_handle_incoming_for_listener::<_, _, CC, BC, _, _>(
343                                        core_ctx,
344                                        bindings_ctx,
345                                        &listener_id,
346                                        isn,
347                                        timestamp_offset,
348                                        socket_state,
349                                        header_info,
350                                        incoming,
351                                        conn_addr,
352                                        incoming_device,
353                                        &mut tw_reuse,
354                                        move |conn, addr| {
355                                            converter
356                                                .convert_back(EitherStack::ThisStack((conn, addr)))
357                                        },
358                                        WireI::into_demux_socket_id,
359                                        marks,
360                                    )
361                                }
362                            },
363                        );
364                        if try_handle_listener_incoming_disposition(
365                            core_ctx,
366                            bindings_ctx,
367                            disposition,
368                            &demux_listener_id,
369                            &mut tw_reuse,
370                            &mut addrs_to_search,
371                            conn_addr,
372                            incoming_device,
373                        ) {
374                            break FoundSocket::Yes(Some(demux_listener_id));
375                        }
376                    }
377                    EitherStack::OtherStack(listener_id) => {
378                        let disposition = core_ctx.with_socket_mut_generators_transport_demux(
379                            &listener_id,
380                            |core_ctx, socket_state, isn, timestamp_offset| {
381                                match core_ctx {
382                                    MaybeDualStack::NotDualStack((_core_ctx, _converter)) => {
383                                        // TODO(https://issues.fuchsia.dev/316408184):
384                                        // Remove this unreachable!.
385                                        unreachable!("OtherStack socket ID with non dual stack");
386                                    }
387                                    MaybeDualStack::DualStack((core_ctx, converter)) => {
388                                        let other_demux_id_converter =
389                                            core_ctx.other_demux_id_converter();
390                                        try_handle_incoming_for_listener::<_, _, CC, BC, _, _>(
391                                            core_ctx,
392                                            bindings_ctx,
393                                            &listener_id,
394                                            isn,
395                                            timestamp_offset,
396                                            socket_state,
397                                            header_info,
398                                            incoming,
399                                            conn_addr,
400                                            incoming_device,
401                                            &mut tw_reuse,
402                                            move |conn, addr| {
403                                                converter.convert_back(EitherStack::OtherStack((
404                                                    conn, addr,
405                                                )))
406                                            },
407                                            move |id| other_demux_id_converter.convert(id),
408                                            marks,
409                                        )
410                                    }
411                                }
412                            },
413                        );
414                        if try_handle_listener_incoming_disposition::<_, _, CC, BC, _>(
415                            core_ctx,
416                            bindings_ctx,
417                            disposition,
418                            &demux_listener_id,
419                            &mut tw_reuse,
420                            &mut addrs_to_search,
421                            conn_addr,
422                            incoming_device,
423                        ) {
424                            break FoundSocket::Yes(Some(demux_listener_id));
425                        }
426                    }
427                };
428            }
429        }
430    };
431
432    let demux_id = match found_socket {
433        FoundSocket::No => {
434            CounterContext::<TcpCountersWithoutSocket<WireI>>::counters(core_ctx)
435                .received_segments_no_dispatch
436                .increment();
437
438            // There is no existing TCP state, pretend it is closed
439            // and generate a RST if needed.
440            // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-21):
441            // CLOSED is fictional because it represents the state when
442            // there is no TCB, and therefore, no connection.
443            if let Some(seg) =
444                (Closed { reason: None::<Option<ConnectionError>> }.on_segment(&incoming.into()))
445            {
446                socket::send_tcp_segment::<WireI, WireI, _, _, _>(
447                    core_ctx,
448                    bindings_ctx,
449                    None,
450                    None,
451                    conn_addr,
452                    seg.into_empty(),
453                    &TcpIpSockOptions { marks: *marks },
454                );
455            }
456            None
457        }
458        FoundSocket::Yes(demux_id) => {
459            counters::increment_counter_with_optional_demux_id::<WireI, _, _, _, _>(
460                core_ctx,
461                demux_id.as_ref(),
462                |c| &c.received_segments_dispatched,
463            );
464            demux_id
465        }
466    };
467
468    if let Some(control) = incoming.control() {
469        counters::increment_counter_with_optional_demux_id::<WireI, _, _, _, _>(
470            core_ctx,
471            demux_id.as_ref(),
472            |c| match control {
473                Control::RST => &c.resets_received,
474                Control::SYN => &c.syns_received,
475                Control::FIN => &c.fins_received,
476            },
477        )
478    }
479}
480
481enum SocketLookupResult<I: DualStackIpExt, D: WeakDeviceIdentifier, BT: TcpBindingsTypes> {
482    Connection(I::DemuxSocketId<D, BT>, ConnAddr<ConnIpAddr<I::Addr, NonZeroU16, NonZeroU16>, D>),
483    Listener((I::DemuxSocketId<D, BT>, ListenerAddr<ListenerIpAddr<I::Addr, NonZeroU16>, D>)),
484}
485
486fn lookup_socket<I, CC, BC>(
487    DemuxState { socketmap, .. }: &DemuxState<I, CC::WeakDeviceId, BC>,
488    addrs_to_search: &mut AddrVecIter<I, CC::WeakDeviceId, TcpPortSpec>,
489) -> Option<SocketLookupResult<I, CC::WeakDeviceId, BC>>
490where
491    I: DualStackIpExt,
492    BC: TcpBindingsContext<CC::DeviceId>,
493    CC: TcpContext<I, BC>,
494{
495    addrs_to_search.find_map(|addr| {
496        match addr {
497            // Connections are always searched before listeners because they
498            // are more specific.
499            AddrVec::Conn(conn_addr) => {
500                socketmap.conns().get_by_addr(&conn_addr).map(|conn_addr_state| {
501                    SocketLookupResult::Connection(conn_addr_state.id(), conn_addr)
502                })
503            }
504            AddrVec::Listen(listener_addr) => {
505                // If we have a listener and the incoming segment is a SYN, we
506                // allocate a new connection entry in the demuxer.
507                // TODO(https://fxbug.dev/42052878): Support SYN cookies.
508
509                socketmap
510                    .listeners()
511                    .get_by_addr(&listener_addr)
512                    .and_then(|addr_state| match addr_state {
513                        ListenerAddrState::ExclusiveListener(id) => Some(id.clone()),
514                        ListenerAddrState::Shared { listener: Some(id), bound: _ } => {
515                            Some(id.clone())
516                        }
517                        ListenerAddrState::ExclusiveBound(_)
518                        | ListenerAddrState::Shared { listener: None, bound: _ } => None,
519                    })
520                    .map(|id| SocketLookupResult::Listener((id, listener_addr)))
521            }
522        }
523    })
524}
525
/// What happened when a segment was offered to a connected socket; produced
/// by `try_handle_incoming_for_connection_dual_stack`.
#[derive(Eq, PartialEq)]
enum ConnectionIncomingSegmentDisposition {
    /// The connection handled the segment; demuxing stops here.
    FoundSocket,
    /// The socket ingress filter dropped the segment; demuxing still stops
    /// at this socket.
    Filtered,
    /// The connection is defunct and in TIME_WAIT, and the segment is a SYN
    /// that may reopen it; the caller keeps looking for a listener that can
    /// reuse the address.
    ReuseCandidateForListener,
    /// The caller must destroy the socket after this segment.
    Destroy,
}
533
/// What happened when a segment was offered to a listening socket.
///
/// NOTE(review): the producer (`try_handle_incoming_for_listener`) and the
/// consumer (`try_handle_listener_incoming_disposition`) are not visible
/// here. The per-variant notes are inferred from the names and from
/// `ConnectionIncomingSegmentDisposition`; confirm them against those
/// functions.
enum ListenerIncomingSegmentDisposition<NewConn> {
    /// Presumably: the listener consumed the segment.
    FoundSocket,
    /// Presumably: the socket ingress filter dropped the segment.
    Filtered,
    /// Presumably: a connection already occupies the segment's address.
    ConflictingConnection,
    /// Presumably: the listener did not take the segment, so the search
    /// should continue with the next candidate address.
    NoMatchingSocket,
    /// Presumably: a new connection was created; carries its identifier.
    NewConnection(NewConn),
}
541
/// Offers `incoming` to the connected socket `conn_id`.
///
/// The socket ingress filter runs first. If it drops the segment,
/// `Filtered` is returned and the connection state is left untouched.
///
/// Otherwise the connection is resolved to the IP stack it lives in. That is
/// this stack, or the other stack for a dual-stack socket. The segment is
/// then handed to `try_handle_incoming_for_connection` with the matching core
/// context and demux ID, and that function's disposition is returned.
///
/// # Panics
///
/// Panics if `conn_id` does not refer to a socket in the `Connected` state.
fn try_handle_incoming_for_connection_dual_stack<SockI, WireI, CC, BC, H>(
    core_ctx: &mut CC,
    bindings_ctx: &mut BC,
    conn_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
    incoming_device: &CC::DeviceId,
    header_info: &H,
    incoming: &VerifiedTcpSegment<'_>,
) -> ConnectionIncomingSegmentDisposition
where
    SockI: DualStackIpExt,
    WireI: Ip,
    BC: TcpBindingsContext<CC::DeviceId>
        + BufferProvider<
            BC::ReceiveBuffer,
            BC::SendBuffer,
            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
        >,
    CC: TcpContext<SockI, BC>,
    H: IpHeaderInfo<WireI>,
{
    core_ctx.with_socket_mut_transport_demux(conn_id, |core_ctx, socket_state| {
        let TcpSocketState { socket_state, sharing: _, ip_options: _, socket_options } =
            socket_state;

        // Filter before touching any connection state so that dropped
        // segments have no effect on the connection.
        match run_socket_ingress_filter(
            bindings_ctx,
            incoming_device,
            conn_id.socket_cookie(),
            socket_options,
            header_info,
            incoming.tcp_segment(),
        ) {
            SocketIngressFilterResult::Accept => (),
            SocketIngressFilterResult::Drop => {
                return ConnectionIncomingSegmentDisposition::Filtered;
            }
        }

        // The demuxer only hands connection IDs to this function, so the
        // socket must be connected.
        let (conn_and_addr, timer) = assert_matches!(
            socket_state,
            TcpSocketStateInner::Connected { conn, timer } => (conn, timer),
            "invalid socket ID"
        );
        let this_or_other_stack = match core_ctx {
            MaybeDualStack::DualStack((core_ctx, converter)) => {
                match converter.convert(conn_and_addr) {
                    EitherStack::ThisStack((conn, conn_addr)) => {
                        // The socket belongs to the current stack, so we
                        // want to deliver the segment to this stack.
                        // Use `as_this_stack` to make the context types
                        // match with the non-dual-stack case.
                        EitherStack::ThisStack((
                            core_ctx.as_this_stack(),
                            conn,
                            conn_addr,
                            SockI::into_demux_socket_id(conn_id.clone()),
                        ))
                    }
                    EitherStack::OtherStack((conn, conn_addr)) => {
                        // We need to deliver from the other stack. i.e. we
                        // need to deliver an IPv4 packet to the IPv6 stack.
                        let demux_sock_id = core_ctx.into_other_demux_socket_id(conn_id.clone());
                        EitherStack::OtherStack((core_ctx, conn, conn_addr, demux_sock_id))
                    }
                }
            }
            MaybeDualStack::NotDualStack((core_ctx, converter)) => {
                let (conn, conn_addr) = converter.convert(conn_and_addr);
                // Similar to the first case, we need to deliver to this stack,
                // but use `as_this_stack` to make the types match.
                EitherStack::ThisStack((
                    core_ctx.as_this_stack(),
                    conn,
                    conn_addr,
                    SockI::into_demux_socket_id(conn_id.clone()),
                ))
            }
        };

        // The two arms are textually identical but are instantiated with
        // different context and address types, so they cannot be merged.
        match this_or_other_stack {
            EitherStack::ThisStack((core_ctx, conn, conn_addr, demux_conn_id)) => {
                try_handle_incoming_for_connection::<_, _, CC, _, _>(
                    core_ctx,
                    bindings_ctx,
                    conn_addr.clone(),
                    conn_id,
                    demux_conn_id,
                    socket_options,
                    conn,
                    timer,
                    incoming.into(),
                )
            }
            EitherStack::OtherStack((core_ctx, conn, conn_addr, demux_conn_id)) => {
                try_handle_incoming_for_connection::<_, _, CC, _, _>(
                    core_ctx,
                    bindings_ctx,
                    conn_addr.clone(),
                    conn_id,
                    demux_conn_id,
                    socket_options,
                    conn,
                    timer,
                    incoming.into(),
                )
            }
        }
    })
}
652
653/// Tries to handle the incoming segment by providing it to a connected socket.
654///
655/// Returns `FoundSocket` if the segment was handled; Otherwise,
656/// `ReuseCandidateForListener` will be returned if there is a defunct socket
657/// that is currently in TIME_WAIT, which is ready to be reused if there is an
658/// active listener listening on the port.
659fn try_handle_incoming_for_connection<SockI, WireI, CC, BC, DC>(
660    core_ctx: &mut DC,
661    bindings_ctx: &mut BC,
662    conn_addr: ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
663    conn_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
664    demux_id: WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
665    socket_options: &SocketOptions,
666    conn: &mut Connection<SockI, WireI, CC::WeakDeviceId, BC>,
667    timer: &mut BC::Timer,
668    incoming: Segment<&[u8]>,
669) -> ConnectionIncomingSegmentDisposition
670where
671    SockI: DualStackIpExt,
672    WireI: DualStackIpExt,
673    BC: TcpBindingsContext<CC::DeviceId>
674        + BufferProvider<
675            BC::ReceiveBuffer,
676            BC::SendBuffer,
677            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
678            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
679        >,
680    CC: TcpContext<SockI, BC>,
681    DC: TransportIpContext<WireI, BC, DeviceId = CC::DeviceId, WeakDeviceId = CC::WeakDeviceId>
682        + DeviceIpSocketHandler<SockI, BC>
683        + TcpDemuxContext<WireI, CC::WeakDeviceId, BC>
684        + TcpCounterContext<SockI, CC::WeakDeviceId, BC>
685        + CoreTxMetadataContext<TcpSocketTxMetadata<SockI, CC::WeakDeviceId, BC>, BC>,
686{
687    let Connection { accept_queue, state, ip_sock, defunct, soft_error: _, handshake_status } =
688        conn;
689
690    // Per RFC 9293 Section 3.6.1:
691    //   When a connection is closed actively, it MUST linger in the TIME-WAIT
692    //   state for a time 2xMSL (Maximum Segment Lifetime) (MUST-13). However,
693    //   it MAY accept a new SYN from the remote TCP endpoint to reopen the
694    //   connection directly from TIME-WAIT state (MAY-2), if it:
695    //
696    //   (1) assigns its initial sequence number for the new connection to be
697    //       larger than the largest sequence number it used on the previous
698    //       connection incarnation, and
699    //   (2) returns to TIME-WAIT state if the SYN turns out to be an old
700    //       duplicate.
701    if *defunct
702        && incoming.header().control == Some(Control::SYN)
703        && incoming.header().ack.is_none()
704    {
705        if let State::TimeWait(TimeWait { last_seq: _, closed_rcv, expiry: _, snd_info: _ }) = state
706        {
707            if !incoming.header().seq.before(closed_rcv.ack) {
708                return ConnectionIncomingSegmentDisposition::ReuseCandidateForListener;
709            }
710        }
711    }
712    let (reply, passive_open, data_acked, newly_closed) = state.on_segment::<_, BC>(
713        &conn_id.either(),
714        &TcpCountersRefs::from_ctx(core_ctx, conn_id),
715        incoming,
716        bindings_ctx.now(),
717        socket_options,
718        *defunct,
719    );
720
721    match data_acked {
722        DataAcked::Yes => {
723            core_ctx.confirm_reachable(bindings_ctx, ip_sock, &socket_options.ip_options)
724        }
725        DataAcked::No => {}
726    }
727
728    match state {
729        State::Listen(_) => {
730            unreachable!("has an invalid status: {:?}", conn.state)
731        }
732        State::SynSent(_) | State::SynRcvd(_) => {
733            assert_eq!(*handshake_status, HandshakeStatus::Pending)
734        }
735        State::Established(_)
736        | State::FinWait1(_)
737        | State::FinWait2(_)
738        | State::Closing(_)
739        | State::CloseWait(_)
740        | State::LastAck(_)
741        | State::TimeWait(_) => {
742            if handshake_status
743                .update_if_pending(HandshakeStatus::Completed { reported: accept_queue.is_some() })
744            {
745                core_ctx.confirm_reachable(bindings_ctx, ip_sock, &socket_options.ip_options);
746            }
747        }
748        State::Closed(Closed { reason }) => {
749            // We remove the socket from the socketmap and cancel the timers
750            // regardless of the socket being defunct or not. The justification
751            // is that CLOSED is a synthetic state and it means no connection
752            // exists, thus it should not exist in the demuxer.
753            //
754            // If the socket was already in the closed state we can assume it's
755            // no longer in the demux.
756            socket::handle_newly_closed(
757                core_ctx,
758                bindings_ctx,
759                newly_closed,
760                &demux_id,
761                &conn_addr,
762                timer,
763            );
764            if let Some(accept_queue) = accept_queue {
765                accept_queue.remove(&conn_id);
766                *defunct = true;
767            }
768            if *defunct {
769                // If the client has promised to not touch the socket again,
770                // we can destroy the socket finally.
771                return ConnectionIncomingSegmentDisposition::Destroy;
772            }
773            let _: bool = handshake_status.update_if_pending(match reason {
774                None => HandshakeStatus::Completed { reported: accept_queue.is_some() },
775                Some(_err) => HandshakeStatus::Aborted,
776            });
777        }
778    }
779
780    if let Some(seg) = reply {
781        socket::send_tcp_segment(
782            core_ctx,
783            bindings_ctx,
784            Some(conn_id),
785            Some(&ip_sock),
786            conn_addr.ip,
787            seg.into_empty(),
788            &socket_options.ip_options,
789        );
790    }
791
792    // Send any enqueued data, if there is any.
793    socket::do_send_inner_and_then_handle_newly_closed(
794        conn_id,
795        &demux_id,
796        socket_options,
797        conn,
798        DoSendLimit::MultipleSegments,
799        &conn_addr,
800        timer,
801        core_ctx,
802        bindings_ctx,
803    );
804
805    // Enqueue the connection to the associated listener
806    // socket's accept queue.
807    if let Some(passive_open) = passive_open {
808        let accept_queue = conn.accept_queue.as_ref().expect("no accept queue but passive open");
809        accept_queue.notify_ready(conn_id, passive_open);
810    }
811
812    // We found a valid connection for the segment.
813    ConnectionIncomingSegmentDisposition::FoundSocket
814}
815
816/// Responds to the disposition returned by [`try_handle_incoming_for_listener`].
817///
818/// Returns true if we have found the right socket and there is no need to
819/// continue the iteration for finding the next-best candidate.
820fn try_handle_listener_incoming_disposition<SockI, WireI, CC, BC, Addr>(
821    core_ctx: &mut CC,
822    bindings_ctx: &mut BC,
823    disposition: ListenerIncomingSegmentDisposition<PrimaryRc<SockI, CC::WeakDeviceId, BC>>,
824    demux_listener_id: &WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
825    tw_reuse: &mut Option<(WireI::DemuxSocketId<CC::WeakDeviceId, BC>, Addr)>,
826    addrs_to_search: &mut AddrVecIter<WireI, CC::WeakDeviceId, TcpPortSpec>,
827    conn_addr: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
828    incoming_device: &CC::DeviceId,
829) -> bool
830where
831    SockI: DualStackIpExt,
832    WireI: DualStackIpExt,
833    CC: TcpContext<SockI, BC> + TcpContext<WireI, BC> + TcpContext<WireI::OtherVersion, BC>,
834    BC: TcpBindingsContext<CC::DeviceId>,
835{
836    match disposition {
837        ListenerIncomingSegmentDisposition::FoundSocket => true,
838        ListenerIncomingSegmentDisposition::Filtered => true,
839        ListenerIncomingSegmentDisposition::ConflictingConnection => {
840            // We're about to rewind the lookup. If we got a
841            // conflicting connection it means tw_reuse has been
842            // removed from the demux state and we need to destroy
843            // it.
844            if let Some((tw_reuse, _)) = tw_reuse.take() {
845                WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, tw_reuse);
846            }
847
848            // Reset the address vector iterator and go again, a
849            // conflicting connection was found.
850            *addrs_to_search = AddrVecIter::<WireI, CC::WeakDeviceId, TcpPortSpec>::with_device(
851                conn_addr.into(),
852                incoming_device.downgrade(),
853            );
854            false
855        }
856        ListenerIncomingSegmentDisposition::NoMatchingSocket => false,
857        ListenerIncomingSegmentDisposition::NewConnection(primary) => {
858            // If we have a new connection, we need to add it to the
859            // set of all sockets.
860
861            // First things first, if we got here then tw_reuse is
862            // gone so we need to destroy it.
863            if let Some((tw_reuse, _)) = tw_reuse.take() {
864                WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, tw_reuse);
865            }
866
867            // Now put the new connection into the socket map.
868            //
869            // Note that there's a possible subtle race here where
870            // another thread could have already operated further on
871            // this connection and marked it for destruction which
872            // puts the entry in the DOA state, if we see that we
873            // must immediately destroy the socket after having put
874            // it in the map.
875            let id = TcpSocketId(PrimaryRc::clone_strong(&primary));
876            let to_destroy = core_ctx.with_all_sockets_mut(move |all_sockets| {
877                let insert_entry = TcpSocketSetEntry::Primary(primary);
878                match all_sockets.entry(id) {
879                    hash_map::Entry::Vacant(v) => {
880                        let _: &mut _ = v.insert(insert_entry);
881                        None
882                    }
883                    hash_map::Entry::Occupied(mut o) => {
884                        // We're holding on to the primary ref, the
885                        // only possible state here should be a DOA
886                        // entry.
887                        assert_matches!(
888                            core::mem::replace(o.get_mut(), insert_entry),
889                            TcpSocketSetEntry::DeadOnArrival
890                        );
891                        Some(o.key().clone())
892                    }
893                }
894            });
895            // NB: we're releasing and reaquiring the
896            // all_sockets_mut lock here for the convenience of not
897            // needing different versions of `destroy_socket`. This
898            // should be fine because the race this is solving
899            // should not be common. If we have correct thread
900            // attribution per flow it should effectively become
901            // impossible so we go for code simplicity here.
902            if let Some(to_destroy) = to_destroy {
903                socket::destroy_socket(core_ctx, bindings_ctx, to_destroy);
904            }
905            counters::increment_counter_for_demux_id::<WireI, _, _, _, _>(
906                core_ctx,
907                demux_listener_id,
908                |c| &c.passive_connection_openings,
909            );
910            true
911        }
912    }
913}
914
915/// Tries to handle an incoming segment by passing it to a listening socket.
916///
917/// Returns `FoundSocket` if the segment was handled, otherwise `NoMatchingSocket`.
918fn try_handle_incoming_for_listener<SockI, WireI, CC, BC, DC, H>(
919    core_ctx: &mut DC,
920    bindings_ctx: &mut BC,
921    listener_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
922    isn: &IsnGenerator<BC::Instant>,
923    timestamp_offset: &TimestampOffsetGenerator<BC::Instant>,
924    socket_state: &mut TcpSocketState<SockI, CC::WeakDeviceId, BC>,
925    header_info: &H,
926    incoming: &VerifiedTcpSegment<'_>,
927    incoming_addrs: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
928    incoming_device: &CC::DeviceId,
929    tw_reuse: &mut Option<(
930        WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
931        ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
932    )>,
933    make_connection: impl FnOnce(
934        Connection<SockI, WireI, CC::WeakDeviceId, BC>,
935        ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
936    ) -> SockI::ConnectionAndAddr<CC::WeakDeviceId, BC>,
937    make_demux_id: impl Fn(
938        TcpSocketId<SockI, CC::WeakDeviceId, BC>,
939    ) -> WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
940    marks: &Marks,
941) -> ListenerIncomingSegmentDisposition<PrimaryRc<SockI, CC::WeakDeviceId, BC>>
942where
943    SockI: DualStackIpExt,
944    WireI: DualStackIpExt,
945    BC: TcpBindingsContext<CC::DeviceId>
946        + BufferProvider<
947            BC::ReceiveBuffer,
948            BC::SendBuffer,
949            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
950            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
951        >,
952    CC: TcpContext<SockI, BC>,
953    DC: TransportIpContext<WireI, BC, DeviceId = CC::DeviceId, WeakDeviceId = CC::WeakDeviceId>
954        + DeviceIpSocketHandler<WireI, BC>
955        + TcpDemuxContext<WireI, CC::WeakDeviceId, BC>
956        + TcpCounterContext<SockI, CC::WeakDeviceId, BC>
957        + CoreTxMetadataContext<TcpSocketTxMetadata<SockI, CC::WeakDeviceId, BC>, BC>,
958    H: IpHeaderInfo<WireI>,
959{
960    let Listener { addr: listener_addr, accept_queue, backlog, buffer_sizes } =
961        match &socket_state.socket_state {
962            TcpSocketStateInner::Bound(_) => {
963                // If the socket is only bound, but not listening.
964                return ListenerIncomingSegmentDisposition::NoMatchingSocket;
965            }
966            TcpSocketStateInner::Listener(listener) => listener,
967            _ => panic!("unexpected socket state: {:?}", socket_state.socket_state),
968        };
969
970    let ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) } =
971        incoming_addrs;
972
973    match run_socket_ingress_filter(
974        bindings_ctx,
975        incoming_device,
976        listener_id.socket_cookie(),
977        &socket_state.socket_options,
978        header_info,
979        incoming.tcp_segment(),
980    ) {
981        SocketIngressFilterResult::Accept => (),
982        SocketIngressFilterResult::Drop => {
983            return ListenerIncomingSegmentDisposition::Filtered;
984        }
985    }
986
987    // Note that this checks happens at the very beginning, before we try to
988    // reuse the connection in TIME-WAIT, this is because we need to store the
989    // reused connection in the accept queue so we have to respect its limit.
990    if accept_queue.len() == backlog.get() {
991        core_ctx.increment_both(listener_id, |counters| &counters.listener_queue_overflow);
992        core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
993        debug!("incoming SYN dropped because of the full backlog of the listener");
994        return ListenerIncomingSegmentDisposition::FoundSocket;
995    }
996
997    // Ensure that if the remote address requires a zone, we propagate that to
998    // the address for the connected socket.
999    let bound_device = listener_addr.as_ref().clone();
1000    let bound_device = if remote_ip.as_ref().must_have_zone() {
1001        Some(bound_device.map_or(EitherDeviceId::Strong(incoming_device), EitherDeviceId::Weak))
1002    } else {
1003        bound_device.map(EitherDeviceId::Weak)
1004    };
1005
1006    let ip_options = TcpIpSockOptions { marks: *marks, ..socket_state.socket_options.ip_options };
1007    let socket_options = SocketOptions { ip_options, ..socket_state.socket_options };
1008
1009    let bound_device = bound_device.as_ref().map(|d| d.as_ref());
1010    let ip_sock = match core_ctx.new_ip_socket(
1011        bindings_ctx,
1012        IpSocketArgs {
1013            device: bound_device,
1014            local_ip: IpDeviceAddr::new_from_socket_ip_addr(local_ip),
1015            remote_ip,
1016            proto: IpProto::Tcp.into(),
1017            options: &ip_options,
1018        },
1019    ) {
1020        Ok(ip_sock) => ip_sock,
1021        err @ Err(IpSockCreationError::Route(_)) => {
1022            core_ctx.increment_both(listener_id, |counters| &counters.passive_open_no_route_errors);
1023            core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1024            debug!("cannot construct an ip socket to the SYN originator: {:?}, ignoring", err);
1025            return ListenerIncomingSegmentDisposition::NoMatchingSocket;
1026        }
1027    };
1028
1029    let isn = isn.generate(
1030        bindings_ctx.now(),
1031        (ip_sock.local_ip().clone().into(), local_port),
1032        (ip_sock.remote_ip().clone(), remote_port),
1033    );
1034    let timestamp_offset = timestamp_offset.generate::<SocketIpAddr<WireI::Addr>, NonZeroU16>(
1035        bindings_ctx.now(),
1036        (ip_sock.local_ip().clone().into(), local_port),
1037        (ip_sock.remote_ip().clone(), remote_port),
1038    );
1039    let device_mms = match core_ctx.get_mms(bindings_ctx, &ip_sock, &socket_options.ip_options) {
1040        Ok(mms) => mms,
1041        Err(err) => {
1042            // If we cannot find a device or the device's MTU is too small,
1043            // there isn't much we can do here since sending a RST back is
1044            // impossible, we just need to silent drop the segment.
1045            error!("Cannot find a device with large enough MTU for the connection");
1046            core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1047            match err {
1048                MmsError::NoDevice(_) | MmsError::MTUTooSmall(_) => {
1049                    return ListenerIncomingSegmentDisposition::FoundSocket;
1050                }
1051            }
1052        }
1053    };
1054    let Some(device_mss) = Mss::from_mms(device_mms) else {
1055        return ListenerIncomingSegmentDisposition::FoundSocket;
1056    };
1057
1058    let mut state = State::Listen(Closed::<Initial>::listen(
1059        isn,
1060        timestamp_offset,
1061        buffer_sizes.clone(),
1062        device_mss,
1063        Mss::default::<WireI>(),
1064        socket_options.user_timeout,
1065    ));
1066
1067    // Prepare a reply to be sent out.
1068    //
1069    // We might end up discarding the reply in case we can't instantiate this
1070    // new connection.
1071    let result = state.on_segment::<_, BC>(
1072        // NB: This is a bit of a lie, we're passing the listener ID to process
1073        // the first segment because we don't have an ID allocated yet. This is
1074        // okay because the state machine ID is only for debugging purposes.
1075        &listener_id.either(),
1076        &TcpCountersRefs::from_ctx(core_ctx, listener_id),
1077        incoming.into(),
1078        bindings_ctx.now(),
1079        &SocketOptions::default(),
1080        false, /* defunct */
1081    );
1082    let reply = assert_matches!(
1083        result,
1084        (reply, None, /* data_acked */ _, NewlyClosed::No /* can't become closed */) => reply
1085    );
1086
1087    let result = if matches!(state, State::SynRcvd(_)) {
1088        let poll_send_at = state.poll_send_at().expect("no retrans timer");
1089        let bound_device = ip_sock.device().cloned();
1090
1091        let addr = ConnAddr {
1092            ip: ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) },
1093            device: bound_device,
1094        };
1095
1096        let new_socket = core_ctx.with_demux_mut(|DemuxState { socketmap, .. }| {
1097            // If we're reusing an entry, remove it from the demux before
1098            // proceeding.
1099            //
1100            // We could just reuse the old allocation for the new connection but
1101            // because of the restrictions on the socket map data structure (for
1102            // good reasons), we can't update the sharing info unconditionally.
1103            // So here we just remove the old connection and create a new one.
1104            // Also this approach has the benefit of not accidentally persisting
1105            // the old state that we don't want.
1106            if let Some((tw_reuse, conn_addr)) = tw_reuse {
1107                match socketmap.conns_mut().remove(tw_reuse, &conn_addr) {
1108                    Ok(()) => {
1109                        // NB: We're removing the tw_reuse connection from the
1110                        // demux here, but not canceling its timer. The timer is
1111                        // canceled via drop when we destroy the socket. Special
1112                        // care is taken when handling timers in the time wait
1113                        // state to account for this.
1114                    }
1115                    Err(NotFoundError) => {
1116                        // We could lose a race trying to reuse the tw_reuse
1117                        // socket, so we just accept the loss and be happy that
1118                        // the conn_addr we want to use is free.
1119                    }
1120                }
1121            }
1122
1123            // Try to create and add the new socket to the demux.
1124            let accept_queue_clone = accept_queue.clone();
1125            let ip_sock = ip_sock.clone();
1126            let bindings_ctx_moved = &mut *bindings_ctx;
1127            let sharing = socket_state.sharing;
1128            match socketmap.conns_mut().try_insert_with(addr, sharing, move |addr, sharing| {
1129                let conn = make_connection(
1130                    Connection {
1131                        accept_queue: Some(accept_queue_clone),
1132                        state,
1133                        ip_sock,
1134                        defunct: false,
1135                        soft_error: None,
1136                        handshake_status: HandshakeStatus::Pending,
1137                    },
1138                    addr,
1139                );
1140
1141                let (id, primary) = TcpSocketId::new_cyclic(
1142                    |weak| {
1143                        let mut timer = CC::new_timer(bindings_ctx_moved, weak);
1144                        // Schedule the timer here because we can't acquire the lock
1145                        // later. This only runs when inserting into the demux
1146                        // succeeds so it's okay.
1147                        assert_eq!(
1148                            bindings_ctx_moved.schedule_timer_instant(poll_send_at, &mut timer),
1149                            None
1150                        );
1151                        TcpSocketStateInner::Connected { conn, timer }
1152                    },
1153                    sharing,
1154                    socket_options,
1155                );
1156                (make_demux_id(id.clone()), (primary, id))
1157            }) {
1158                Ok((_entry, (primary, id))) => {
1159                    // Make sure the new socket is in the pending accept queue
1160                    // before we release the demux lock.
1161                    accept_queue.push_pending(id);
1162                    Some(primary)
1163                }
1164                Err((e, _sharing_state)) => {
1165                    // The only error we accept here is if the entry exists
1166                    // fully, any indirect conflicts are unexpected because we
1167                    // know the listener is still alive and installed in the
1168                    // demux.
1169                    assert_matches!(e, InsertError::Exists);
1170                    // If we fail to insert it means we lost a race and this
1171                    // packet is destined to a connection that is already
1172                    // established. In that case we should tell the demux code
1173                    // to retry demuxing it all over again.
1174                    None
1175                }
1176            }
1177        });
1178
1179        match new_socket {
1180            Some(new_socket) => ListenerIncomingSegmentDisposition::NewConnection(new_socket),
1181            None => {
1182                // We didn't create a new connection, short circuit early and
1183                // don't send out the pending segment.
1184                core_ctx
1185                    .increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1186                return ListenerIncomingSegmentDisposition::ConflictingConnection;
1187            }
1188        }
1189    } else {
1190        // We found a valid listener for the segment even if the connection
1191        // state is not a newly pending connection.
1192        ListenerIncomingSegmentDisposition::FoundSocket
1193    };
1194
1195    // We can send a reply now if we got here.
1196    if let Some(seg) = reply {
1197        socket::send_tcp_segment(
1198            core_ctx,
1199            bindings_ctx,
1200            Some(&listener_id),
1201            Some(&ip_sock),
1202            incoming_addrs,
1203            seg.into_empty(),
1204            &socket_options.ip_options,
1205        );
1206    }
1207
1208    result
1209}
1210
1211pub(super) fn tcp_serialize_segment<'a, I, P>(
1212    header: &'a SegmentHeader,
1213    data: P,
1214    conn_addr: ConnIpAddr<I::Addr, NonZeroU16, NonZeroU16>,
1215) -> impl TransportPacketSerializer<I, Buffer = EmptyBuf> + Debug + 'a
1216where
1217    I: FilterIpExt,
1218    P: InnerPacketBuilder + Debug + Payload + 'a,
1219{
1220    let SegmentHeader { seq, ack, wnd, control, options, push } = header;
1221    let ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) } = conn_addr;
1222    let mut builder = TcpSegmentBuilder::new(
1223        local_ip.addr(),
1224        remote_ip.addr(),
1225        local_port,
1226        remote_port,
1227        (*seq).into(),
1228        ack.map(Into::into),
1229        u16::from(*wnd),
1230    );
1231    builder.psh(*push);
1232    match control {
1233        None => {}
1234        Some(Control::SYN) => builder.syn(true),
1235        Some(Control::FIN) => builder.fin(true),
1236        Some(Control::RST) => builder.rst(true),
1237    }
1238    TcpSegmentBuilderWithOptions::new(builder, options.builder())
1239        .unwrap_or_else(|TcpOptionsTooLongError| {
1240            panic!("Too many TCP options");
1241        })
1242        .wrap_body(data.into_serializer())
1243}
1244
1245fn run_socket_ingress_filter<I, BC, D>(
1246    bindings_ctx: &BC,
1247    incoming_device: &D,
1248    socket_cookie: SocketCookie,
1249    socket_options: &SocketOptions,
1250    header_info: &impl IpHeaderInfo<I>,
1251    tcp_segment: &TcpSegment<&'_ [u8]>,
1252) -> SocketIngressFilterResult
1253where
1254    I: Ip,
1255    BC: TcpBindingsContext<D>,
1256    D: StrongDeviceIdentifier,
1257{
1258    let [ip_prefix, ip_options] = header_info.as_bytes();
1259    let [tcp_prefix, tcp_options, data] = tcp_segment.as_bytes();
1260    let mut slices = [ip_prefix, ip_options, tcp_prefix, tcp_options, data];
1261    let data = FragmentedByteSlice::new(&mut slices);
1262
1263    bindings_ctx.socket_ops_filter().on_ingress(
1264        I::VERSION,
1265        data,
1266        incoming_device,
1267        socket_cookie,
1268        &socket_options.ip_options.marks,
1269    )
1270}
1271
#[cfg(test)]
mod test {
    use ip_test_macro::ip_test;
    use netstack3_base::{
        HandshakeOptions, Options, ResetOptions, SackBlocks, SegmentOptions, UnscaledWindowSize,
    };
    use packet::{ParseBuffer as _, Serializer as _};
    use packet_formats::tcp::options::TcpOptions as _;
    use test_case::test_case;

    use super::*;

    trait TestIpExt: netstack3_base::testutil::TestIpExt + FilterIpExt {}
    impl<T> TestIpExt for T where T: netstack3_base::testutil::TestIpExt + FilterIpExt {}

    const SEQ: SeqNum = SeqNum::new(12345);
    const ACK: SeqNum = SeqNum::new(67890);
    const FAKE_DATA: &'static [u8] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 0];

    /// Serializes `segment` with [`super::tcp_serialize_segment`], parses the
    /// result back and checks that every header field (ports, sequence and
    /// ACK numbers, flags, window, options) and the body survive the round
    /// trip.
    #[ip_test(I)]
    #[test_case(
        Segment::syn(SEQ, UnscaledWindowSize::from(u16::MAX),
        HandshakeOptions::default()), &[]
        ; "syn")]
    #[test_case(
        Segment::syn(SEQ, UnscaledWindowSize::from(u16::MAX),
        HandshakeOptions {
            mss: Some(Mss::new(1440).unwrap()),
            ..Default::default() }), &[]
            ; "syn with mss")]
    #[test_case(
        Segment::ack(SEQ, ACK, UnscaledWindowSize::from(u16::MAX), SegmentOptions::default()),
        &[]; "ack")]
    #[test_case(Segment::with_fake_data(SEQ, ACK, FAKE_DATA), FAKE_DATA; "data")]
    #[test_case(Segment::new_assert_no_discard(SegmentHeader {
            seq: SEQ,
            ack: Some(ACK),
            push: true,
            wnd: UnscaledWindowSize::from(u16::MAX),
            ..Default::default()
        },
        FAKE_DATA
    ), FAKE_DATA; "push")]
    #[test_case(Segment::new_assert_no_discard(SegmentHeader {
            seq: SEQ,
            ack: Some(ACK),
            control: Some(Control::FIN),
            wnd: UnscaledWindowSize::from(u16::MAX),
            ..Default::default()
        },
        FAKE_DATA
    ), FAKE_DATA; "fin")]
    fn tcp_serialize_segment<I: TestIpExt>(segment: Segment<&[u8]>, expected_body: &[u8]) {
        const SOURCE_PORT: NonZeroU16 = NonZeroU16::new(1111).unwrap();
        const DEST_PORT: NonZeroU16 = NonZeroU16::new(2222).unwrap();

        let (header, data) = segment.into_parts();
        let serializer = super::tcp_serialize_segment::<I, _>(
            &header,
            data,
            ConnIpAddr {
                local: (SocketIpAddr::try_from(I::TEST_ADDRS.local_ip).unwrap(), SOURCE_PORT),
                remote: (SocketIpAddr::try_from(I::TEST_ADDRS.remote_ip).unwrap(), DEST_PORT),
            },
        );

        let mut serialized = serializer.serialize_vec_outer().unwrap().unwrap_b();
        let parsed_segment = serialized
            .parse_with::<_, TcpSegment<_>>(TcpParseArgs::new(
                *I::TEST_ADDRS.remote_ip,
                *I::TEST_ADDRS.local_ip,
            ))
            .expect("is valid segment");

        assert_eq!(parsed_segment.src_port(), SOURCE_PORT);
        assert_eq!(parsed_segment.dst_port(), DEST_PORT);
        assert_eq!(parsed_segment.seq_num(), u32::from(SEQ));
        // The ACK number must be present exactly when the header carries one.
        assert_eq!(parsed_segment.ack_num(), header.ack.map(u32::from));
        assert_eq!(parsed_segment.psh(), header.push);
        // Each control bit must be set if and only if it is the header's
        // control flag.
        assert_eq!(parsed_segment.syn(), header.control == Some(Control::SYN));
        assert_eq!(parsed_segment.fin(), header.control == Some(Control::FIN));
        assert_eq!(parsed_segment.rst(), header.control == Some(Control::RST));
        assert_eq!(
            UnscaledWindowSize::from(parsed_segment.window_size()),
            UnscaledWindowSize::from(u16::MAX)
        );

        let (mss, window_scale, sack_permitted, sack_blocks, timestamp) = match header.options {
            Options::Handshake(HandshakeOptions {
                mss,
                window_scale,
                sack_permitted,
                timestamp,
            }) => (mss, window_scale, sack_permitted, SackBlocks::EMPTY, timestamp),
            Options::Segment(SegmentOptions { timestamp, sack_blocks }) => {
                (None, None, false, sack_blocks, timestamp)
            }
            Options::Reset(ResetOptions { timestamp }) => {
                (None, None, false, SackBlocks::EMPTY, timestamp)
            }
        };
        assert_eq!(mss.map(|mss| mss.get()), parsed_segment.options().mss());
        assert_eq!(window_scale.map(|ws| ws.get()), parsed_segment.options().window_scale());
        assert_eq!(sack_permitted, parsed_segment.options().sack_permitted());
        assert_eq!(sack_blocks.as_slice(), parsed_segment.options().sack_blocks());
        assert_eq!(
            timestamp.as_ref().map(Into::into).as_ref(),
            parsed_segment.options().timestamp()
        );

        assert_eq!(parsed_segment.into_body(), expected_body);
    }
}