Skip to main content

netstack3_tcp/socket/
demux.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Defines the entry point of TCP packets, by directing them into the correct
6//! state machine.
7
8use core::fmt::Debug;
9use core::num::NonZeroU16;
10
11use assert_matches::assert_matches;
12use log::{debug, error, warn};
13use net_types::ip::Ip;
14use net_types::{MulticastAddress as _, SpecifiedAddr, Witness as _};
15use netstack3_base::socket::{
16    AddrIsMappedError, AddrVec, AddrVecIter, ConnAddr, ConnIpAddr, InsertError, IpAddrVec,
17    ListenerAddr, ListenerIpAddr, SocketIpAddr, SocketIpAddrExt as _,
18};
19use netstack3_base::{
20    BidirectionalConverter as _, Control, CounterContext, CtxPair, EitherDeviceId, IpDeviceAddr,
21    Marks, Mss, NotFoundError, Payload, Segment, SegmentHeader, SeqNum, StrongDeviceIdentifier,
22    VerifiedTcpSegment, WeakDeviceIdentifier,
23};
24use netstack3_filter::{
25    FilterIpExt, SocketIngressFilterResult, SocketOpsFilter, TransportPacketSerializer,
26};
27use netstack3_hashmap::hash_map;
28use netstack3_ip::socket::{IpSockCreationError, IpSocketArgs, MmsError};
29use netstack3_ip::{
30    IpHeaderInfo, IpTransportContext, LocalDeliveryPacketInfo, ReceiveIpPacketMeta,
31    TransportIpContext,
32};
33use netstack3_trace::trace_duration;
34use packet::{
35    BufferMut, BufferView as _, EmptyBuf, FragmentedByteSlice, InnerPacketBuilder,
36    NestablePacketBuilder as _, ParseBuffer,
37};
38use packet_formats::error::ParseError;
39use packet_formats::ip::IpProto;
40use packet_formats::tcp::{
41    TcpFlowAndSeqNum, TcpOptionsTooLongError, TcpParseArgs, TcpSegment, TcpSegmentBuilder,
42    TcpSegmentBuilderWithOptions, TcpSegmentRaw,
43};
44
45use crate::internal::base::{BufferSizes, ConnectionError, SocketOptions, TcpIpSockOptions};
46use crate::internal::counters::{
47    self, TcpCounterContext, TcpCountersRefs, TcpCountersWithoutSocket,
48};
49use crate::internal::socket::generators::{IsnGenerator, TimestampOffsetGenerator};
50use crate::internal::socket::{
51    self, AsThisStack as _, Connection, CoreTxMetadataContext, DemuxState, DeviceIpSocketHandler,
52    DoSendLimit, DualStackBaseIpExt, DualStackDemuxIdConverter as _, DualStackIpExt, EitherStack,
53    HandshakeStatus, Listener, ListenerAddrState, MaybeDualStack, PrimaryRc, TcpApi,
54    TcpBindingsContext, TcpBindingsTypes, TcpContext, TcpDemuxContext, TcpDualStackContext,
55    TcpIpTransportContext, TcpPortSpec, TcpSocketId, TcpSocketSetEntry, TcpSocketState,
56    TcpSocketStateInner, TcpSocketTxMetadata,
57};
58use crate::internal::state::{
59    BufferProvider, Closed, DataAcked, Initial, NewlyClosed, State, TimeWait,
60};
61
62impl<BT: TcpBindingsTypes> BufferProvider<BT::ReceiveBuffer, BT::SendBuffer> for BT {
63    type ActiveOpen = BT::ListenerNotifierOrProvidedBuffers;
64
65    type PassiveOpen = BT::ReturnedBuffers;
66
67    fn new_passive_open_buffers(
68        buffer_sizes: BufferSizes,
69    ) -> (BT::ReceiveBuffer, BT::SendBuffer, Self::PassiveOpen) {
70        BT::new_passive_open_buffers(buffer_sizes)
71    }
72}
73
74/// Alias for a SocketId that can reference either V4 or V6 socket.
75pub type DualStackTcpSocketId<I, D, BT> = <I as DualStackBaseIpExt>::DemuxSocketId<D, BT>;
76
77impl<I, BC, CC> IpTransportContext<I, BC, CC> for TcpIpTransportContext
78where
79    I: DualStackIpExt,
80    BC: TcpBindingsContext<CC::DeviceId>
81        + BufferProvider<
82            BC::ReceiveBuffer,
83            BC::SendBuffer,
84            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
85            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
86        >,
87    CC: TcpContext<I, BC> + TcpContext<I::OtherVersion, BC>,
88{
89    type EarlyDemuxSocket = DualStackTcpSocketId<I, CC::WeakDeviceId, BC>;
90
91    fn early_demux<B: ParseBuffer>(
92        core_ctx: &mut CC,
93        device: &CC::DeviceId,
94        src_ip: I::Addr,
95        dst_ip: I::Addr,
96        buffer: B,
97    ) -> Option<Self::EarlyDemuxSocket> {
98        early_demux_ip_packet::<I, _, _, _>(core_ctx, device, src_ip, dst_ip, buffer)
99    }
100
101    fn receive_icmp_error(
102        core_ctx: &mut CC,
103        bindings_ctx: &mut BC,
104        _device: &CC::DeviceId,
105        original_src_ip: Option<SpecifiedAddr<I::Addr>>,
106        original_dst_ip: SpecifiedAddr<I::Addr>,
107        mut original_body: &[u8],
108        err: I::ErrorCode,
109    ) {
110        let mut buffer = &mut original_body;
111        let Some(flow_and_seqnum) = buffer.take_obj_front::<TcpFlowAndSeqNum>() else {
112            error!("received an ICMP error but its body is less than 8 bytes");
113            return;
114        };
115
116        let Some(original_src_ip) = original_src_ip else { return };
117        let Some(original_src_port) = NonZeroU16::new(flow_and_seqnum.src_port()) else { return };
118        let Some(original_dst_port) = NonZeroU16::new(flow_and_seqnum.dst_port()) else { return };
119        let original_seqnum = SeqNum::new(flow_and_seqnum.sequence_num());
120
121        TcpApi::<I, _>::new(CtxPair { core_ctx, bindings_ctx }).on_icmp_error(
122            original_src_ip,
123            original_dst_ip,
124            original_src_port,
125            original_dst_port,
126            original_seqnum,
127            err.into(),
128        );
129    }
130
131    fn receive_ip_packet<B: BufferMut, H: IpHeaderInfo<I>>(
132        core_ctx: &mut CC,
133        bindings_ctx: &mut BC,
134        device: &CC::DeviceId,
135        remote_ip: I::RecvSrcAddr,
136        local_ip: SpecifiedAddr<I::Addr>,
137        mut buffer: B,
138        info: &mut LocalDeliveryPacketInfo<I, H>,
139        early_demux_socket: Option<Self::EarlyDemuxSocket>,
140    ) -> Result<(), (B, I::IcmpError)> {
141        let LocalDeliveryPacketInfo { meta, header_info, marks } = info;
142        let ReceiveIpPacketMeta { broadcast, transparent_override, parsing_context } = meta;
143        if let Some(delivery) = transparent_override {
144            warn!(
145                "TODO(https://fxbug.dev/337009139): transparent proxy not supported for TCP \
146                sockets; will not override dispatch to perform local delivery to {delivery:?}"
147            );
148        }
149
150        // Per RFC 9293, Section 3.9.2.3 (referencing RFC 1122):
151        //   A TCP implementation MUST silently discard an incoming SYN
152        //   segment that is addressed to a broadcast or multicast address
153        //   [(MUST-57)].
154        //
155        // and
156        //
157        //   ... this guidance is applicable to all incoming segments, not just
158        //   SYNs ...
159        if broadcast.is_some() {
160            CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
161                .invalid_ip_addrs_received
162                .increment();
163            debug!("tcp: dropping broadcast TCP packet");
164            return Ok(());
165        }
166        if local_ip.is_multicast() {
167            CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
168                .invalid_ip_addrs_received
169                .increment();
170            debug!("tcp: dropping multicast TCP packet");
171            return Ok(());
172        }
173
174        let remote_ip = match SpecifiedAddr::new(remote_ip.into_addr()) {
175            None => {
176                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
177                    .invalid_ip_addrs_received
178                    .increment();
179                debug!("tcp: source address unspecified, dropping the packet");
180                return Ok(());
181            }
182            Some(src_ip) => src_ip,
183        };
184        let remote_ip: SocketIpAddr<_> = match remote_ip.try_into() {
185            Ok(remote_ip) => remote_ip,
186            Err(AddrIsMappedError {}) => {
187                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
188                    .invalid_ip_addrs_received
189                    .increment();
190                debug!("tcp: source address is mapped (ipv4-mapped-ipv6), dropping the packet");
191                return Ok(());
192            }
193        };
194        let local_ip: SocketIpAddr<_> = match local_ip.try_into() {
195            Ok(local_ip) => local_ip,
196            Err(AddrIsMappedError {}) => {
197                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
198                    .invalid_ip_addrs_received
199                    .increment();
200                debug!("tcp: local address is mapped (ipv4-mapped-ipv6), dropping the packet");
201                return Ok(());
202            }
203        };
204        let packet = match buffer.parse_with::<_, TcpSegment<_>>(TcpParseArgs::with_context(
205            remote_ip.addr(),
206            local_ip.addr(),
207            parsing_context,
208        )) {
209            Ok(packet) => packet,
210            Err(err) => {
211                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
212                    .invalid_segments_received
213                    .increment();
214                debug!("tcp: failed parsing incoming packet {:?}", err);
215                match err {
216                    ParseError::Checksum => {
217                        CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
218                            .checksum_errors
219                            .increment();
220                    }
221                    ParseError::NotSupported | ParseError::NotExpected | ParseError::Format => {}
222                }
223                return Ok(());
224            }
225        };
226        let local_port = packet.dst_port();
227        let remote_port = packet.src_port();
228        let incoming = match VerifiedTcpSegment::try_from(packet) {
229            Ok(segment) => segment,
230            Err(err) => {
231                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
232                    .invalid_segments_received
233                    .increment();
234                debug!("tcp: malformed segment {:?}", err);
235                return Ok(());
236            }
237        };
238        let conn_addr =
239            ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) };
240
241        CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
242            .valid_segments_received
243            .increment();
244        handle_incoming_packet::<I, _, _, _>(
245            core_ctx,
246            bindings_ctx,
247            conn_addr,
248            device,
249            header_info,
250            &incoming,
251            marks,
252            early_demux_socket,
253        );
254        Ok(())
255    }
256}
257
258fn early_demux_ip_packet<I, BC, CC, B>(
259    core_ctx: &mut CC,
260    device: &CC::DeviceId,
261    src_ip: I::Addr,
262    dst_ip: I::Addr,
263    mut buffer: B,
264) -> Option<DualStackTcpSocketId<I, CC::WeakDeviceId, BC>>
265where
266    I: DualStackIpExt,
267    BC: TcpBindingsContext<CC::DeviceId>,
268    CC: TcpContext<I, BC> + TcpContext<I::OtherVersion, BC>,
269    B: ParseBuffer,
270{
271    let Ok(packet) = buffer.parse_with::<_, TcpSegmentRaw<_>>(()) else {
272        // If we fail to parse the packet then just return None. Invalid
273        // packets are handled later.
274        return None;
275    };
276
277    let src_ip = SocketIpAddr::new(src_ip)?;
278    let dst_ip = SocketIpAddr::new(dst_ip)?;
279    let (src_port, dst_port) = packet.flow_header().src_dst();
280    let src_port = NonZeroU16::new(src_port)?;
281    let dst_port = NonZeroU16::new(dst_port)?;
282    let device = device.downgrade();
283
284    core_ctx.with_demux(|demux: &DemuxState<I, _, _>| {
285        demux
286            .socketmap
287            .lookup_connected((src_ip, src_port), (dst_ip, dst_port), device)
288            .map(|entry| entry.id())
289    })
290}
291
/// Delivers a parsed and verified inbound TCP segment to the socket it belongs
/// to, or answers it as if no socket existed.
///
/// The socket map is searched for `conn_addr` on `incoming_device`, most
/// specific address first: connections before listeners (see
/// `lookup_socket`). When `early_demux_socket` is provided, that connection is
/// tried first, and only listener addresses remain to be searched afterwards.
///
/// - A connected socket receives the segment through
///   `try_handle_incoming_for_connection_dual_stack`. The socket may need to
///   be destroyed as a result. If it is a TIME-WAIT socket that a listener may
///   replace, it is remembered in `tw_reuse` while the search continues with
///   listeners.
/// - A listening socket receives the segment through
///   `try_handle_incoming_for_listener`, and the outcome is resolved by
///   `try_handle_listener_incoming_disposition`.
/// - If no socket claims the segment, the segment is fed to a fictional
///   CLOSED state, and any reply that produces (e.g. a RST) is sent back on
///   `conn_addr`.
///
/// Afterwards the dispatch counters and the per-control (SYN/FIN/RST) counters
/// are incremented. The ID of the handling socket is supplied when one is
/// known.
fn handle_incoming_packet<WireI, BC, CC, H>(
    core_ctx: &mut CC,
    bindings_ctx: &mut BC,
    conn_addr: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
    incoming_device: &CC::DeviceId,
    header_info: &H,
    incoming: &VerifiedTcpSegment<'_>,
    marks: &Marks,
    mut early_demux_socket: Option<DualStackTcpSocketId<WireI, CC::WeakDeviceId, BC>>,
) where
    WireI: DualStackIpExt,
    BC: TcpBindingsContext<CC::DeviceId>
        + BufferProvider<
            BC::ReceiveBuffer,
            BC::SendBuffer,
            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
        >,
    CC: TcpContext<WireI, BC> + TcpContext<WireI::OtherVersion, BC>,
    H: IpHeaderInfo<WireI>,
{
    trace_duration!("tcp::handle_incoming_packet");
    // A TIME-WAIT connection (with its address) that a listener may replace
    // if the listener accepts this segment.
    let mut tw_reuse = None;

    // If we have an early demux socket, then we don't need to look up
    // connected sockets again. We may still need to look up listener sockets
    // when the socket we found is in the TIME-WAIT state.
    let addr: IpAddrVec<WireI, _> = if early_demux_socket.is_some() {
        let (ip, port) = conn_addr.local;
        IpAddrVec::new_listener(ip, port)
    } else {
        conn_addr.into()
    };
    let mut addrs_to_search = AddrVecIter::<WireI, CC::WeakDeviceId, TcpPortSpec>::with_device(
        addr,
        incoming_device.downgrade(),
    );

    enum FoundSocket<S> {
        // Typically holds the demux ID of the found socket, but may hold
        // `None` if the found socket was destroyed as a result of the segment.
        Yes(Option<S>),
        No,
    }
    // Each iteration examines one candidate socket. The loop ends when a
    // socket claims the segment or the candidate addresses run out.
    let found_socket = loop {
        // `take` ensures the early-demuxed socket is only offered on the first
        // iteration; later iterations fall back to the socket map.
        let sock = if let Some(early_demux_socket) = early_demux_socket.take() {
            let device = match WireI::as_dual_stack_ip_socket(&early_demux_socket) {
                EitherStack::ThisStack(conn_id) => conn_id.get_bound_device(core_ctx),
                EitherStack::OtherStack(conn_id) => conn_id.get_bound_device(core_ctx),
            };
            // Rebuild the address a socket map lookup would have reported,
            // using the socket's bound device.
            let conn_addr = ConnAddr { ip: conn_addr, device };
            Some(SocketLookupResult::Connection(early_demux_socket, conn_addr))
        } else {
            core_ctx.with_demux(|demux| lookup_socket::<WireI, CC, BC>(demux, &mut addrs_to_search))
        };
        match sock {
            None => break FoundSocket::No,
            Some(SocketLookupResult::Connection(demux_conn_id, conn_addr)) => {
                // It is not possible to have two same connections that
                // share the same local and remote IPs and ports.
                assert_eq!(tw_reuse, None);
                // The socket may belong to the other IP version's stack, e.g.
                // a dual-stack IPv6 socket receiving an IPv4 segment.
                let disposition = match WireI::as_dual_stack_ip_socket(&demux_conn_id) {
                    EitherStack::ThisStack(conn_id) => {
                        try_handle_incoming_for_connection_dual_stack(
                            core_ctx,
                            bindings_ctx,
                            conn_id,
                            incoming_device,
                            header_info,
                            &incoming,
                        )
                    }
                    EitherStack::OtherStack(conn_id) => {
                        try_handle_incoming_for_connection_dual_stack(
                            core_ctx,
                            bindings_ctx,
                            conn_id,
                            incoming_device,
                            header_info,
                            &incoming,
                        )
                    }
                };
                match disposition {
                    ConnectionIncomingSegmentDisposition::Destroy => {
                        WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, demux_conn_id);
                        break FoundSocket::Yes(None);
                    }
                    // A filtered segment is still attributed to the socket
                    // it was addressed to.
                    ConnectionIncomingSegmentDisposition::FoundSocket
                    | ConnectionIncomingSegmentDisposition::Filtered => {
                        break FoundSocket::Yes(Some(demux_conn_id));
                    }
                    // Keep searching: a listener on the same local address may
                    // take over this TIME-WAIT connection.
                    ConnectionIncomingSegmentDisposition::ReuseCandidateForListener => {
                        tw_reuse = Some((demux_conn_id, conn_addr));
                    }
                }
            }
            Some(SocketLookupResult::Listener((demux_listener_id, _listener_addr))) => {
                match WireI::as_dual_stack_ip_socket(&demux_listener_id) {
                    // Listener whose IP version matches the segment's.
                    EitherStack::ThisStack(listener_id) => {
                        let disposition = core_ctx.with_socket_mut_generators_transport_demux(
                            &listener_id,
                            |core_ctx, socket_state, isn, timestamp_offset| match core_ctx {
                                MaybeDualStack::NotDualStack((core_ctx, converter)) => {
                                    try_handle_incoming_for_listener::<WireI, WireI, CC, BC, _, _>(
                                        core_ctx,
                                        bindings_ctx,
                                        &listener_id,
                                        isn,
                                        timestamp_offset,
                                        socket_state,
                                        header_info,
                                        incoming,
                                        conn_addr,
                                        incoming_device,
                                        &mut tw_reuse,
                                        move |conn, addr| converter.convert_back((conn, addr)),
                                        WireI::into_demux_socket_id,
                                        marks,
                                    )
                                }
                                MaybeDualStack::DualStack((core_ctx, converter)) => {
                                    try_handle_incoming_for_listener::<_, _, CC, BC, _, _>(
                                        core_ctx,
                                        bindings_ctx,
                                        &listener_id,
                                        isn,
                                        timestamp_offset,
                                        socket_state,
                                        header_info,
                                        incoming,
                                        conn_addr,
                                        incoming_device,
                                        &mut tw_reuse,
                                        move |conn, addr| {
                                            converter
                                                .convert_back(EitherStack::ThisStack((conn, addr)))
                                        },
                                        WireI::into_demux_socket_id,
                                        marks,
                                    )
                                }
                            },
                        );
                        // `true` means this listener consumed the segment;
                        // otherwise the loop moves on to the next candidate.
                        if try_handle_listener_incoming_disposition(
                            core_ctx,
                            bindings_ctx,
                            disposition,
                            &demux_listener_id,
                            &mut tw_reuse,
                            &mut addrs_to_search,
                            conn_addr,
                            incoming_device,
                        ) {
                            break FoundSocket::Yes(Some(demux_listener_id));
                        }
                    }
                    // Listener from the other IP version's stack; only
                    // reachable through a dual-stack context.
                    EitherStack::OtherStack(listener_id) => {
                        let disposition = core_ctx.with_socket_mut_generators_transport_demux(
                            &listener_id,
                            |core_ctx, socket_state, isn, timestamp_offset| {
                                match core_ctx {
                                    MaybeDualStack::NotDualStack((_core_ctx, _converter)) => {
                                        // TODO(https://issues.fuchsia.dev/316408184):
                                        // Remove this unreachable!.
                                        unreachable!("OtherStack socket ID with non dual stack");
                                    }
                                    MaybeDualStack::DualStack((core_ctx, converter)) => {
                                        let other_demux_id_converter =
                                            core_ctx.other_demux_id_converter();
                                        try_handle_incoming_for_listener::<_, _, CC, BC, _, _>(
                                            core_ctx,
                                            bindings_ctx,
                                            &listener_id,
                                            isn,
                                            timestamp_offset,
                                            socket_state,
                                            header_info,
                                            incoming,
                                            conn_addr,
                                            incoming_device,
                                            &mut tw_reuse,
                                            move |conn, addr| {
                                                converter.convert_back(EitherStack::OtherStack((
                                                    conn, addr,
                                                )))
                                            },
                                            move |id| other_demux_id_converter.convert(id),
                                            marks,
                                        )
                                    }
                                }
                            },
                        );
                        // `true` means this listener consumed the segment;
                        // otherwise the loop moves on to the next candidate.
                        if try_handle_listener_incoming_disposition::<_, _, CC, BC, _>(
                            core_ctx,
                            bindings_ctx,
                            disposition,
                            &demux_listener_id,
                            &mut tw_reuse,
                            &mut addrs_to_search,
                            conn_addr,
                            incoming_device,
                        ) {
                            break FoundSocket::Yes(Some(demux_listener_id));
                        }
                    }
                };
            }
        }
    };

    let demux_id = match found_socket {
        FoundSocket::No => {
            CounterContext::<TcpCountersWithoutSocket<WireI>>::counters(core_ctx)
                .received_segments_no_dispatch
                .increment();

            // There is no existing TCP state, pretend it is closed
            // and generate a RST if needed.
            // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-21):
            // CLOSED is fictional because it represents the state when
            // there is no TCB, and therefore, no connection.
            if let Some(seg) =
                (Closed { reason: None::<Option<ConnectionError>> }.on_segment(&incoming.into()))
            {
                socket::send_tcp_segment::<WireI, WireI, _, _, _>(
                    core_ctx,
                    bindings_ctx,
                    None,
                    None,
                    conn_addr,
                    seg.into_empty(),
                    &TcpIpSockOptions { marks: *marks },
                );
            }
            None
        }
        FoundSocket::Yes(demux_id) => {
            counters::increment_counter_with_optional_demux_id::<WireI, _, _, _, _>(
                core_ctx,
                demux_id.as_ref(),
                |c| &c.received_segments_dispatched,
            );
            demux_id
        }
    };

    // Count the segment's control flag, if any, with the handling socket's ID
    // when one is known (`None` if no socket was found or it was destroyed).
    if let Some(control) = incoming.control() {
        counters::increment_counter_with_optional_demux_id::<WireI, _, _, _, _>(
            core_ctx,
            demux_id.as_ref(),
            |c| match control {
                Control::RST => &c.resets_received,
                Control::SYN => &c.syns_received,
                Control::FIN => &c.fins_received,
            },
        )
    }
}
552
553enum SocketLookupResult<I: DualStackIpExt, D: WeakDeviceIdentifier, BT: TcpBindingsTypes> {
554    Connection(I::DemuxSocketId<D, BT>, ConnAddr<ConnIpAddr<I::Addr, NonZeroU16, NonZeroU16>, D>),
555    Listener((I::DemuxSocketId<D, BT>, ListenerAddr<ListenerIpAddr<I::Addr, NonZeroU16>, D>)),
556}
557
558fn lookup_socket<I, CC, BC>(
559    DemuxState { socketmap, .. }: &DemuxState<I, CC::WeakDeviceId, BC>,
560    addrs_to_search: &mut AddrVecIter<I, CC::WeakDeviceId, TcpPortSpec>,
561) -> Option<SocketLookupResult<I, CC::WeakDeviceId, BC>>
562where
563    I: DualStackIpExt,
564    BC: TcpBindingsContext<CC::DeviceId>,
565    CC: TcpContext<I, BC>,
566{
567    addrs_to_search.find_map(|addr| {
568        match addr {
569            // Connections are always searched before listeners because they
570            // are more specific.
571            AddrVec::Conn(conn_addr) => {
572                socketmap.conns().get_by_addr(&conn_addr).map(|conn_addr_state| {
573                    SocketLookupResult::Connection(conn_addr_state.id(), conn_addr)
574                })
575            }
576            AddrVec::Listen(listener_addr) => {
577                // If we have a listener and the incoming segment is a SYN, we
578                // allocate a new connection entry in the demuxer.
579                // TODO(https://fxbug.dev/42052878): Support SYN cookies.
580
581                socketmap
582                    .listeners()
583                    .get_by_addr(&listener_addr)
584                    .and_then(|addr_state| match addr_state {
585                        ListenerAddrState::ExclusiveListener(id) => Some(id.clone()),
586                        ListenerAddrState::Shared { listener: Some(id), bound: _ } => {
587                            Some(id.clone())
588                        }
589                        ListenerAddrState::ExclusiveBound(_)
590                        | ListenerAddrState::Shared { listener: None, bound: _ } => None,
591                    })
592                    .map(|id| SocketLookupResult::Listener((id, listener_addr)))
593            }
594        }
595    })
596}
597
/// Outcome of offering an inbound segment to a connected socket.
#[derive(Eq, PartialEq)]
enum ConnectionIncomingSegmentDisposition {
    /// The connection handled the segment.
    FoundSocket,
    /// The socket ingress filter dropped the segment before the connection
    /// saw it.
    Filtered,
    /// The connection is in TIME-WAIT and may be replaced by a new connection
    /// if a listener on the same address accepts the segment.
    ReuseCandidateForListener,
    /// Handling the segment ended the connection; the caller destroys the
    /// socket.
    Destroy,
}
605
/// Outcome of offering an inbound segment to a listening socket.
///
/// NOTE(review): the meanings below are inferred from the variant names and
/// from `ConnectionIncomingSegmentDisposition`. Confirm them against
/// `try_handle_incoming_for_listener` and
/// `try_handle_listener_incoming_disposition`.
enum ListenerIncomingSegmentDisposition<NewConn> {
    /// The listener handled the segment.
    FoundSocket,
    /// The socket ingress filter dropped the segment.
    Filtered,
    /// Presumably: accepting the segment would clash with an existing
    /// connection.
    ConflictingConnection,
    /// Presumably: this listener did not take the segment, so the search
    /// should continue.
    NoMatchingSocket,
    /// A new connection resulted from the segment; carries data describing it.
    NewConnection(NewConn),
}
613
/// Delivers `incoming` to the connected socket `conn_id`.
///
/// The socket's IP version (`SockI`) may differ from the segment's wire IP
/// version (`WireI`) when the socket is dual-stack.
///
/// The socket ingress filter runs first. If it drops the segment, this returns
/// `ConnectionIncomingSegmentDisposition::Filtered` without touching the
/// connection. Otherwise the connection state is resolved either to this
/// stack or to the other stack, and the segment is handed to
/// `try_handle_incoming_for_connection`, whose disposition is returned.
///
/// # Panics
///
/// Panics if `conn_id` does not refer to a socket in the connected state.
fn try_handle_incoming_for_connection_dual_stack<SockI, WireI, CC, BC, H>(
    core_ctx: &mut CC,
    bindings_ctx: &mut BC,
    conn_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
    incoming_device: &CC::DeviceId,
    header_info: &H,
    incoming: &VerifiedTcpSegment<'_>,
) -> ConnectionIncomingSegmentDisposition
where
    SockI: DualStackIpExt,
    WireI: Ip,
    BC: TcpBindingsContext<CC::DeviceId>
        + BufferProvider<
            BC::ReceiveBuffer,
            BC::SendBuffer,
            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
        >,
    CC: TcpContext<SockI, BC>,
    H: IpHeaderInfo<WireI>,
{
    core_ctx.with_socket_mut_transport_demux(conn_id, |core_ctx, socket_state| {
        let TcpSocketState { socket_state, sharing: _, ip_options: _, socket_options } =
            socket_state;

        // Filter before looking at the connection, so a dropped segment never
        // reaches the TCP state machine.
        match run_socket_ingress_filter(
            bindings_ctx,
            incoming_device,
            conn_id.socket_info(),
            socket_options,
            header_info,
            incoming.tcp_segment(),
        ) {
            SocketIngressFilterResult::Accept => (),
            SocketIngressFilterResult::Drop => {
                return ConnectionIncomingSegmentDisposition::Filtered;
            }
        }

        // Callers only pass IDs obtained from the connection side of the
        // demux map, so any other state is a bug.
        let (conn_and_addr, timer) = assert_matches!(
            socket_state,
            TcpSocketStateInner::Connected { conn, timer } => (conn, timer),
            "invalid socket ID"
        );
        let this_or_other_stack = match core_ctx {
            MaybeDualStack::DualStack((core_ctx, converter)) => {
                match converter.convert(conn_and_addr) {
                    EitherStack::ThisStack((conn, conn_addr)) => {
                        // The socket belongs to the current stack, so we
                        // want to deliver the segment to this stack.
                        // Use `as_this_stack` to make the context types
                        // match with the non-dual-stack case.
                        EitherStack::ThisStack((
                            core_ctx.as_this_stack(),
                            conn,
                            conn_addr,
                            SockI::into_demux_socket_id(conn_id.clone()),
                        ))
                    }
                    EitherStack::OtherStack((conn, conn_addr)) => {
                        // We need to deliver from the other stack. i.e. we
                        // need to deliver an IPv4 packet to the IPv6 stack.
                        let demux_sock_id = core_ctx.into_other_demux_socket_id(conn_id.clone());
                        EitherStack::OtherStack((core_ctx, conn, conn_addr, demux_sock_id))
                    }
                }
            }
            MaybeDualStack::NotDualStack((core_ctx, converter)) => {
                let (conn, conn_addr) = converter.convert(conn_and_addr);
                // As in the first case, we need to deliver to this stack,
                // but use `as_this_stack` to make the types match.
                EitherStack::ThisStack((
                    core_ctx.as_this_stack(),
                    conn,
                    conn_addr,
                    SockI::into_demux_socket_id(conn_id.clone()),
                ))
            }
        };

        // The two arms are textually identical but instantiate
        // `try_handle_incoming_for_connection` with different wire IP
        // versions, so they cannot be merged.
        match this_or_other_stack {
            EitherStack::ThisStack((core_ctx, conn, conn_addr, demux_conn_id)) => {
                try_handle_incoming_for_connection::<_, _, CC, _, _>(
                    core_ctx,
                    bindings_ctx,
                    conn_addr.clone(),
                    conn_id,
                    demux_conn_id,
                    socket_options,
                    conn,
                    timer,
                    incoming.into(),
                )
            }
            EitherStack::OtherStack((core_ctx, conn, conn_addr, demux_conn_id)) => {
                try_handle_incoming_for_connection::<_, _, CC, _, _>(
                    core_ctx,
                    bindings_ctx,
                    conn_addr.clone(),
                    conn_id,
                    demux_conn_id,
                    socket_options,
                    conn,
                    timer,
                    incoming.into(),
                )
            }
        }
    })
}
724
/// Tries to handle the incoming segment by providing it to a connected socket.
///
/// The segment is run through the connection's state machine. Afterwards the
/// handshake status is reconciled with the resulting state, any immediate
/// reply is sent, pending data is flushed via
/// `do_send_inner_and_then_handle_newly_closed`, and, if the segment completed
/// a passive open, the listener's accept queue is notified.
///
/// Returns:
/// - `ReuseCandidateForListener` if the socket is defunct and in TIME-WAIT and
///   the segment is a SYN without ACK whose sequence number is not before the
///   TIME-WAIT state's `closed_rcv.ack`. The segment is NOT processed here;
///   the defunct socket may instead be reused by an active listener on the
///   same port (RFC 9293 MAY-2).
/// - `Destroy` if the connection is now CLOSED and the socket is defunct (a
///   socket still owned by an accept queue is removed from it and marked
///   defunct first), meaning the socket can finally be destroyed.
/// - `FoundSocket` otherwise: the segment was handled by this connection.
fn try_handle_incoming_for_connection<SockI, WireI, CC, BC, DC>(
    core_ctx: &mut DC,
    bindings_ctx: &mut BC,
    conn_addr: ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
    conn_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
    demux_id: WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
    socket_options: &SocketOptions,
    conn: &mut Connection<SockI, WireI, CC::WeakDeviceId, BC>,
    timer: &mut BC::Timer,
    incoming: Segment<&[u8]>,
) -> ConnectionIncomingSegmentDisposition
where
    SockI: DualStackIpExt,
    WireI: DualStackIpExt,
    BC: TcpBindingsContext<CC::DeviceId>
        + BufferProvider<
            BC::ReceiveBuffer,
            BC::SendBuffer,
            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
        >,
    CC: TcpContext<SockI, BC>,
    DC: TransportIpContext<WireI, BC, DeviceId = CC::DeviceId, WeakDeviceId = CC::WeakDeviceId>
        + DeviceIpSocketHandler<SockI, BC>
        + TcpDemuxContext<WireI, CC::WeakDeviceId, BC>
        + TcpCounterContext<SockI, CC::WeakDeviceId, BC>
        + CoreTxMetadataContext<TcpSocketTxMetadata<SockI, CC::WeakDeviceId, BC>, BC>,
{
    // Split the connection into its fields so each can be borrowed
    // independently below; `conn` itself is used again once these borrows end.
    let Connection { accept_queue, state, ip_sock, defunct, soft_error: _, handshake_status } =
        conn;

    // Per RFC 9293 Section 3.6.1:
    //   When a connection is closed actively, it MUST linger in the TIME-WAIT
    //   state for a time 2xMSL (Maximum Segment Lifetime) (MUST-13). However,
    //   it MAY accept a new SYN from the remote TCP endpoint to reopen the
    //   connection directly from TIME-WAIT state (MAY-2), if it:
    //
    //   (1) assigns its initial sequence number for the new connection to be
    //       larger than the largest sequence number it used on the previous
    //       connection incarnation, and
    //   (2) returns to TIME-WAIT state if the SYN turns out to be an old
    //       duplicate.
    if *defunct
        && incoming.header().control == Some(Control::SYN)
        && incoming.header().ack.is_none()
    {
        if let State::TimeWait(TimeWait { last_seq: _, closed_rcv, expiry: _, snd_info: _ }) = state
        {
            // A SYN that is not older than what we last acknowledged makes
            // this socket a candidate for reuse; the caller decides whether a
            // listener can take it.
            if !incoming.header().seq.before(closed_rcv.ack) {
                return ConnectionIncomingSegmentDisposition::ReuseCandidateForListener;
            }
        }
    }
    // Drive the state machine. It yields an optional reply segment, optional
    // passive-open buffers, whether data was acked, and whether the
    // connection just became closed.
    let (reply, passive_open, data_acked, newly_closed) = state.on_segment::<_, BC>(
        &conn_id.either(),
        &TcpCountersRefs::from_ctx(core_ctx, conn_id),
        incoming,
        bindings_ctx.now(),
        socket_options,
        *defunct,
    );

    // Newly acknowledged data is taken as evidence that the remote is
    // reachable; report it through the IP socket.
    match data_acked {
        DataAcked::Yes => {
            core_ctx.confirm_reachable(bindings_ctx, ip_sock, &socket_options.ip_options)
        }
        DataAcked::No => {}
    }

    // Reconcile the handshake status (and, for CLOSED, the demux entry and
    // socket lifetime) with the state the machine just moved into.
    match state {
        State::Listen(_) => {
            // A connected socket never goes back to LISTEN.
            unreachable!("has an invalid status: {:?}", conn.state)
        }
        State::SynSent(_) | State::SynRcvd(_) => {
            // Still handshaking, so the status must not have changed yet.
            assert_eq!(*handshake_status, HandshakeStatus::Pending)
        }
        State::Established(_)
        | State::FinWait1(_)
        | State::FinWait2(_)
        | State::Closing(_)
        | State::CloseWait(_)
        | State::LastAck(_)
        | State::TimeWait(_) => {
            // Past the handshake: if the status was still pending it completes
            // now, and that transition also confirms reachability.
            if handshake_status
                .update_if_pending(HandshakeStatus::Completed { reported: accept_queue.is_some() })
            {
                core_ctx.confirm_reachable(bindings_ctx, ip_sock, &socket_options.ip_options);
            }
        }
        State::Closed(Closed { reason }) => {
            // We remove the socket from the socketmap and cancel the timers
            // regardless of the socket being defunct or not. The justification
            // is that CLOSED is a synthetic state and it means no connection
            // exists, thus it should not exist in the demuxer.
            //
            // If the socket was already in the closed state we can assume it's
            // no longer in the demux.
            socket::handle_newly_closed(
                core_ctx,
                bindings_ctx,
                newly_closed,
                &demux_id,
                &conn_addr,
                timer,
            );
            // A socket that never made it out of an accept queue has no user
            // handle; drop it from the queue and treat it as defunct.
            if let Some(accept_queue) = accept_queue {
                accept_queue.remove(&conn_id);
                *defunct = true;
            }
            if *defunct {
                // If the client has promised to not touch the socket again,
                // we can destroy the socket finally.
                return ConnectionIncomingSegmentDisposition::Destroy;
            }
            // Record how the handshake ended if it was still pending: a close
            // without a reason counts as completed, one with an error as
            // aborted.
            let _: bool = handshake_status.update_if_pending(match reason {
                None => HandshakeStatus::Completed { reported: accept_queue.is_some() },
                Some(_err) => HandshakeStatus::Aborted,
            });
        }
    }

    // Send the state machine's immediate reply, if any.
    if let Some(seg) = reply {
        socket::send_tcp_segment(
            core_ctx,
            bindings_ctx,
            Some(conn_id),
            Some(&ip_sock),
            conn_addr.ip,
            seg.into_empty(),
            &socket_options.ip_options,
        );
    }

    // Send any enqueued data, if there is any.
    socket::do_send_inner_and_then_handle_newly_closed(
        conn_id,
        &demux_id,
        socket_options,
        conn,
        DoSendLimit::MultipleSegments,
        &conn_addr,
        timer,
        core_ctx,
        bindings_ctx,
    );

    // Enqueue the connection to the associated listener
    // socket's accept queue.
    if let Some(passive_open) = passive_open {
        let accept_queue = conn.accept_queue.as_ref().expect("no accept queue but passive open");
        accept_queue.notify_ready(conn_id, passive_open);
    }

    // We found a valid connection for the segment.
    ConnectionIncomingSegmentDisposition::FoundSocket
}
887
888/// Responds to the disposition returned by [`try_handle_incoming_for_listener`].
889///
890/// Returns true if we have found the right socket and there is no need to
891/// continue the iteration for finding the next-best candidate.
892fn try_handle_listener_incoming_disposition<SockI, WireI, CC, BC, Addr>(
893    core_ctx: &mut CC,
894    bindings_ctx: &mut BC,
895    disposition: ListenerIncomingSegmentDisposition<PrimaryRc<SockI, CC::WeakDeviceId, BC>>,
896    demux_listener_id: &WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
897    tw_reuse: &mut Option<(WireI::DemuxSocketId<CC::WeakDeviceId, BC>, Addr)>,
898    addrs_to_search: &mut AddrVecIter<WireI, CC::WeakDeviceId, TcpPortSpec>,
899    conn_addr: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
900    incoming_device: &CC::DeviceId,
901) -> bool
902where
903    SockI: DualStackIpExt,
904    WireI: DualStackIpExt,
905    CC: TcpContext<SockI, BC> + TcpContext<WireI, BC> + TcpContext<WireI::OtherVersion, BC>,
906    BC: TcpBindingsContext<CC::DeviceId>,
907{
908    match disposition {
909        ListenerIncomingSegmentDisposition::FoundSocket => true,
910        ListenerIncomingSegmentDisposition::Filtered => true,
911        ListenerIncomingSegmentDisposition::ConflictingConnection => {
912            // We're about to rewind the lookup. If we got a
913            // conflicting connection it means tw_reuse has been
914            // removed from the demux state and we need to destroy
915            // it.
916            if let Some((tw_reuse, _)) = tw_reuse.take() {
917                WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, tw_reuse);
918            }
919
920            // Reset the address vector iterator and go again, a
921            // conflicting connection was found.
922            *addrs_to_search = AddrVecIter::<WireI, CC::WeakDeviceId, TcpPortSpec>::with_device(
923                conn_addr.into(),
924                incoming_device.downgrade(),
925            );
926            false
927        }
928        ListenerIncomingSegmentDisposition::NoMatchingSocket => false,
929        ListenerIncomingSegmentDisposition::NewConnection(primary) => {
930            // If we have a new connection, we need to add it to the
931            // set of all sockets.
932
933            // First things first, if we got here then tw_reuse is
934            // gone so we need to destroy it.
935            if let Some((tw_reuse, _)) = tw_reuse.take() {
936                WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, tw_reuse);
937            }
938
939            // Now put the new connection into the socket map.
940            //
941            // Note that there's a possible subtle race here where
942            // another thread could have already operated further on
943            // this connection and marked it for destruction which
944            // puts the entry in the DOA state, if we see that we
945            // must immediately destroy the socket after having put
946            // it in the map.
947            let id = TcpSocketId(PrimaryRc::clone_strong(&primary));
948            let to_destroy = core_ctx.with_all_sockets_mut(move |all_sockets| {
949                let insert_entry = TcpSocketSetEntry::Primary(primary);
950                match all_sockets.entry(id) {
951                    hash_map::Entry::Vacant(v) => {
952                        let _: &mut _ = v.insert(insert_entry);
953                        None
954                    }
955                    hash_map::Entry::Occupied(mut o) => {
956                        // We're holding on to the primary ref, the
957                        // only possible state here should be a DOA
958                        // entry.
959                        assert_matches!(
960                            core::mem::replace(o.get_mut(), insert_entry),
961                            TcpSocketSetEntry::DeadOnArrival
962                        );
963                        Some(o.key().clone())
964                    }
965                }
966            });
967            // NB: we're releasing and reaquiring the
968            // all_sockets_mut lock here for the convenience of not
969            // needing different versions of `destroy_socket`. This
970            // should be fine because the race this is solving
971            // should not be common. If we have correct thread
972            // attribution per flow it should effectively become
973            // impossible so we go for code simplicity here.
974            if let Some(to_destroy) = to_destroy {
975                socket::destroy_socket(core_ctx, bindings_ctx, to_destroy);
976            }
977            counters::increment_counter_for_demux_id::<WireI, _, _, _, _>(
978                core_ctx,
979                demux_listener_id,
980                |c| &c.passive_connection_openings,
981            );
982            true
983        }
984    }
985}
986
987/// Tries to handle an incoming segment by passing it to a listening socket.
988///
989/// Returns `FoundSocket` if the segment was handled, otherwise `NoMatchingSocket`.
990fn try_handle_incoming_for_listener<SockI, WireI, CC, BC, DC, H>(
991    core_ctx: &mut DC,
992    bindings_ctx: &mut BC,
993    listener_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
994    isn: &IsnGenerator<BC::Instant>,
995    timestamp_offset: &TimestampOffsetGenerator<BC::Instant>,
996    socket_state: &mut TcpSocketState<SockI, CC::WeakDeviceId, BC>,
997    header_info: &H,
998    incoming: &VerifiedTcpSegment<'_>,
999    incoming_addrs: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
1000    incoming_device: &CC::DeviceId,
1001    tw_reuse: &mut Option<(
1002        WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
1003        ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
1004    )>,
1005    make_connection: impl FnOnce(
1006        Connection<SockI, WireI, CC::WeakDeviceId, BC>,
1007        ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
1008    ) -> SockI::ConnectionAndAddr<CC::WeakDeviceId, BC>,
1009    make_demux_id: impl Fn(
1010        TcpSocketId<SockI, CC::WeakDeviceId, BC>,
1011    ) -> WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
1012    marks: &Marks,
1013) -> ListenerIncomingSegmentDisposition<PrimaryRc<SockI, CC::WeakDeviceId, BC>>
1014where
1015    SockI: DualStackIpExt,
1016    WireI: DualStackIpExt,
1017    BC: TcpBindingsContext<CC::DeviceId>
1018        + BufferProvider<
1019            BC::ReceiveBuffer,
1020            BC::SendBuffer,
1021            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
1022            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
1023        >,
1024    CC: TcpContext<SockI, BC>,
1025    DC: TransportIpContext<WireI, BC, DeviceId = CC::DeviceId, WeakDeviceId = CC::WeakDeviceId>
1026        + DeviceIpSocketHandler<WireI, BC>
1027        + TcpDemuxContext<WireI, CC::WeakDeviceId, BC>
1028        + TcpCounterContext<SockI, CC::WeakDeviceId, BC>
1029        + CoreTxMetadataContext<TcpSocketTxMetadata<SockI, CC::WeakDeviceId, BC>, BC>,
1030    H: IpHeaderInfo<WireI>,
1031{
1032    let Listener { addr: listener_addr, accept_queue, backlog, buffer_sizes } =
1033        match &socket_state.socket_state {
1034            TcpSocketStateInner::Bound(_) => {
1035                // If the socket is only bound, but not listening.
1036                return ListenerIncomingSegmentDisposition::NoMatchingSocket;
1037            }
1038            TcpSocketStateInner::Listener(listener) => listener,
1039            _ => panic!("unexpected socket state: {:?}", socket_state.socket_state),
1040        };
1041
1042    let ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) } =
1043        incoming_addrs;
1044
1045    match run_socket_ingress_filter(
1046        bindings_ctx,
1047        incoming_device,
1048        listener_id.socket_info(),
1049        &socket_state.socket_options,
1050        header_info,
1051        incoming.tcp_segment(),
1052    ) {
1053        SocketIngressFilterResult::Accept => (),
1054        SocketIngressFilterResult::Drop => {
1055            return ListenerIncomingSegmentDisposition::Filtered;
1056        }
1057    }
1058
1059    // Note that this checks happens at the very beginning, before we try to
1060    // reuse the connection in TIME-WAIT, this is because we need to store the
1061    // reused connection in the accept queue so we have to respect its limit.
1062    if accept_queue.len() == backlog.get() {
1063        core_ctx.increment_both(listener_id, |counters| &counters.listener_queue_overflow);
1064        core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1065        debug!("incoming SYN dropped because of the full backlog of the listener");
1066        return ListenerIncomingSegmentDisposition::FoundSocket;
1067    }
1068
1069    // Ensure that if the remote address requires a zone, we propagate that to
1070    // the address for the connected socket.
1071    let bound_device = listener_addr.as_ref().clone();
1072    let bound_device = if remote_ip.as_ref().must_have_zone() {
1073        Some(bound_device.map_or(EitherDeviceId::Strong(incoming_device), EitherDeviceId::Weak))
1074    } else {
1075        bound_device.map(EitherDeviceId::Weak)
1076    };
1077
1078    let ip_options = TcpIpSockOptions { marks: *marks, ..socket_state.socket_options.ip_options };
1079    let socket_options = SocketOptions { ip_options, ..socket_state.socket_options };
1080
1081    let bound_device = bound_device.as_ref().map(|d| d.as_ref());
1082    let ip_sock = match core_ctx.new_ip_socket(
1083        bindings_ctx,
1084        IpSocketArgs {
1085            device: bound_device,
1086            local_ip: IpDeviceAddr::new_from_socket_ip_addr(local_ip),
1087            remote_ip,
1088            proto: IpProto::Tcp.into(),
1089            options: &ip_options,
1090        },
1091    ) {
1092        Ok(ip_sock) => ip_sock,
1093        err @ Err(IpSockCreationError::Route(_)) => {
1094            core_ctx.increment_both(listener_id, |counters| &counters.passive_open_no_route_errors);
1095            core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1096            debug!("cannot construct an ip socket to the SYN originator: {:?}, ignoring", err);
1097            return ListenerIncomingSegmentDisposition::NoMatchingSocket;
1098        }
1099    };
1100
1101    let isn = isn.generate(
1102        bindings_ctx.now(),
1103        (ip_sock.local_ip().clone().into(), local_port),
1104        (ip_sock.remote_ip().clone(), remote_port),
1105    );
1106    let timestamp_offset = timestamp_offset.generate::<SocketIpAddr<WireI::Addr>, NonZeroU16>(
1107        bindings_ctx.now(),
1108        (ip_sock.local_ip().clone().into(), local_port),
1109        (ip_sock.remote_ip().clone(), remote_port),
1110    );
1111    let device_mms = match core_ctx.get_mms(bindings_ctx, &ip_sock, &socket_options.ip_options) {
1112        Ok(mms) => mms,
1113        Err(err) => {
1114            // If we cannot find a device or the device's MTU is too small,
1115            // there isn't much we can do here since sending a RST back is
1116            // impossible, we just need to silent drop the segment.
1117            error!("Cannot find a device with large enough MTU for the connection");
1118            core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1119            match err {
1120                MmsError::NoDevice(_) | MmsError::MTUTooSmall(_) => {
1121                    return ListenerIncomingSegmentDisposition::FoundSocket;
1122                }
1123            }
1124        }
1125    };
1126    let Some(device_mss) = Mss::from_mms(device_mms) else {
1127        return ListenerIncomingSegmentDisposition::FoundSocket;
1128    };
1129
1130    let mut state = State::Listen(Closed::<Initial>::listen(
1131        isn,
1132        timestamp_offset,
1133        buffer_sizes.clone(),
1134        device_mss,
1135        Mss::default::<WireI>(),
1136        socket_options.user_timeout,
1137    ));
1138
1139    // Prepare a reply to be sent out.
1140    //
1141    // We might end up discarding the reply in case we can't instantiate this
1142    // new connection.
1143    let result = state.on_segment::<_, BC>(
1144        // NB: This is a bit of a lie, we're passing the listener ID to process
1145        // the first segment because we don't have an ID allocated yet. This is
1146        // okay because the state machine ID is only for debugging purposes.
1147        &listener_id.either(),
1148        &TcpCountersRefs::from_ctx(core_ctx, listener_id),
1149        incoming.into(),
1150        bindings_ctx.now(),
1151        &SocketOptions::default(),
1152        false, /* defunct */
1153    );
1154    let reply = assert_matches!(
1155        result,
1156        (reply, None, /* data_acked */ _, NewlyClosed::No /* can't become closed */) => reply
1157    );
1158
1159    let result = if matches!(state, State::SynRcvd(_)) {
1160        let poll_send_at = state.poll_send_at().expect("no retrans timer");
1161        let bound_device = ip_sock.device().cloned();
1162
1163        let addr = ConnAddr {
1164            ip: ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) },
1165            device: bound_device,
1166        };
1167
1168        let new_socket = core_ctx.with_demux_mut(|DemuxState { socketmap, .. }| {
1169            // If we're reusing an entry, remove it from the demux before
1170            // proceeding.
1171            //
1172            // We could just reuse the old allocation for the new connection but
1173            // because of the restrictions on the socket map data structure (for
1174            // good reasons), we can't update the sharing info unconditionally.
1175            // So here we just remove the old connection and create a new one.
1176            // Also this approach has the benefit of not accidentally persisting
1177            // the old state that we don't want.
1178            if let Some((tw_reuse, conn_addr)) = tw_reuse {
1179                match socketmap.conns_mut().remove(tw_reuse, &conn_addr) {
1180                    Ok(()) => {
1181                        // NB: We're removing the tw_reuse connection from the
1182                        // demux here, but not canceling its timer. The timer is
1183                        // canceled via drop when we destroy the socket. Special
1184                        // care is taken when handling timers in the time wait
1185                        // state to account for this.
1186                    }
1187                    Err(NotFoundError) => {
1188                        // We could lose a race trying to reuse the tw_reuse
1189                        // socket, so we just accept the loss and be happy that
1190                        // the conn_addr we want to use is free.
1191                    }
1192                }
1193            }
1194
1195            // Try to create and add the new socket to the demux.
1196            let accept_queue_clone = accept_queue.clone();
1197            let ip_sock = ip_sock.clone();
1198            let bindings_ctx_moved = &mut *bindings_ctx;
1199            let sharing = socket_state.sharing;
1200            match socketmap.conns_mut().try_insert_with(addr, sharing, move |addr, sharing| {
1201                let conn = make_connection(
1202                    Connection {
1203                        accept_queue: Some(accept_queue_clone),
1204                        state,
1205                        ip_sock,
1206                        defunct: false,
1207                        soft_error: None,
1208                        handshake_status: HandshakeStatus::Pending,
1209                    },
1210                    addr,
1211                );
1212
1213                let (id, primary) = TcpSocketId::new_cyclic(
1214                    |weak| {
1215                        let mut timer = CC::new_timer(bindings_ctx_moved, weak);
1216                        // Schedule the timer here because we can't acquire the lock
1217                        // later. This only runs when inserting into the demux
1218                        // succeeds so it's okay.
1219                        assert_eq!(
1220                            bindings_ctx_moved.schedule_timer_instant(poll_send_at, &mut timer),
1221                            None
1222                        );
1223                        TcpSocketStateInner::Connected { conn, timer }
1224                    },
1225                    sharing,
1226                    socket_options,
1227                );
1228                (make_demux_id(id.clone()), (primary, id))
1229            }) {
1230                Ok((_entry, (primary, id))) => {
1231                    // Make sure the new socket is in the pending accept queue
1232                    // before we release the demux lock.
1233                    accept_queue.push_pending(id);
1234                    Some(primary)
1235                }
1236                Err(e) => {
1237                    // The only error we accept here is if the entry exists
1238                    // fully, any indirect conflicts are unexpected because we
1239                    // know the listener is still alive and installed in the
1240                    // demux.
1241                    assert_matches!(e, InsertError::Exists);
1242                    // If we fail to insert it means we lost a race and this
1243                    // packet is destined to a connection that is already
1244                    // established. In that case we should tell the demux code
1245                    // to retry demuxing it all over again.
1246                    None
1247                }
1248            }
1249        });
1250
1251        match new_socket {
1252            Some(new_socket) => ListenerIncomingSegmentDisposition::NewConnection(new_socket),
1253            None => {
1254                // We didn't create a new connection, short circuit early and
1255                // don't send out the pending segment.
1256                core_ctx
1257                    .increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1258                return ListenerIncomingSegmentDisposition::ConflictingConnection;
1259            }
1260        }
1261    } else {
1262        // We found a valid listener for the segment even if the connection
1263        // state is not a newly pending connection.
1264        ListenerIncomingSegmentDisposition::FoundSocket
1265    };
1266
1267    // We can send a reply now if we got here.
1268    if let Some(seg) = reply {
1269        socket::send_tcp_segment(
1270            core_ctx,
1271            bindings_ctx,
1272            Some(&listener_id),
1273            Some(&ip_sock),
1274            incoming_addrs,
1275            seg.into_empty(),
1276            &socket_options.ip_options,
1277        );
1278    }
1279
1280    result
1281}
1282
1283pub(super) fn tcp_serialize_segment<'a, I, P>(
1284    header: &'a SegmentHeader,
1285    data: P,
1286    conn_addr: ConnIpAddr<I::Addr, NonZeroU16, NonZeroU16>,
1287) -> impl TransportPacketSerializer<I, Buffer = EmptyBuf> + Debug + 'a
1288where
1289    I: FilterIpExt,
1290    P: InnerPacketBuilder + Debug + Payload + 'a,
1291{
1292    let SegmentHeader { seq, ack, wnd, control, options, push } = header;
1293    let ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) } = conn_addr;
1294    let mut builder = TcpSegmentBuilder::new(
1295        local_ip.addr(),
1296        remote_ip.addr(),
1297        local_port,
1298        remote_port,
1299        (*seq).into(),
1300        ack.map(Into::into),
1301        u16::from(*wnd),
1302    );
1303    builder.psh(*push);
1304    match control {
1305        None => {}
1306        Some(Control::SYN) => builder.syn(true),
1307        Some(Control::FIN) => builder.fin(true),
1308        Some(Control::RST) => builder.rst(true),
1309    }
1310    TcpSegmentBuilderWithOptions::new(builder, options.builder())
1311        .unwrap_or_else(|TcpOptionsTooLongError| {
1312            panic!("Too many TCP options");
1313        })
1314        .wrap_body(data.into_serializer())
1315}
1316
1317fn run_socket_ingress_filter<I, BC, D>(
1318    bindings_ctx: &BC,
1319    incoming_device: &D,
1320    socket_info: netstack3_base::socket::SocketInfo,
1321    socket_options: &SocketOptions,
1322    header_info: &impl IpHeaderInfo<I>,
1323    tcp_segment: &TcpSegment<&'_ [u8]>,
1324) -> SocketIngressFilterResult
1325where
1326    I: Ip,
1327    BC: TcpBindingsContext<D>,
1328    D: StrongDeviceIdentifier,
1329{
1330    let [ip_prefix, ip_options] = header_info.as_bytes();
1331    let [tcp_prefix, tcp_options, data] = tcp_segment.as_bytes();
1332    let mut slices = [ip_prefix, ip_options, tcp_prefix, tcp_options, data];
1333    let data = FragmentedByteSlice::new(&mut slices);
1334
1335    bindings_ctx.socket_ops_filter().on_ingress(
1336        I::VERSION,
1337        data,
1338        incoming_device,
1339        socket_info,
1340        &socket_options.ip_options.marks,
1341    )
1342}
1343
#[cfg(test)]
mod test {
    use ip_test_macro::ip_test;
    use netstack3_base::{
        HandshakeOptions, NetworkSerializationContext, Options, ResetOptions, SackBlocks,
        SegmentOptions, UnscaledWindowSize,
    };
    use packet::Serializer as _;
    use packet_formats::tcp::options::TcpOptions as _;
    use test_case::test_case;

    use super::*;

    trait TestIpExt: netstack3_base::testutil::TestIpExt + FilterIpExt {}
    impl<T> TestIpExt for T where T: netstack3_base::testutil::TestIpExt + FilterIpExt {}

    const SEQ: SeqNum = SeqNum::new(12345);
    const ACK: SeqNum = SeqNum::new(67890);
    const FAKE_DATA: &'static [u8] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 0];

    /// Serializes `segment` from the local test address to the remote one,
    /// parses it back, and checks that ports, sequence/ack numbers, flags,
    /// options, and payload all round-trip unchanged.
    #[ip_test(I)]
    #[test_case(
        Segment::syn(SEQ, UnscaledWindowSize::from(u16::MAX),
        HandshakeOptions::default()), &[]
        ; "syn")]
    #[test_case(
        Segment::syn(SEQ, UnscaledWindowSize::from(u16::MAX),
        HandshakeOptions {
            mss: Some(Mss::new(1440).unwrap()),
            ..Default::default() }), &[]
            ; "syn with mss")]
    #[test_case(
        Segment::ack(SEQ, ACK, UnscaledWindowSize::from(u16::MAX), SegmentOptions::default()),
        &[]; "ack")]
    #[test_case(Segment::with_fake_data(SEQ, ACK, FAKE_DATA), FAKE_DATA; "data")]
    #[test_case(Segment::new_assert_no_discard(SegmentHeader {
            seq: SEQ,
            ack: Some(ACK),
            push: true,
            wnd: UnscaledWindowSize::from(u16::MAX),
            ..Default::default()
        },
        FAKE_DATA
    ), FAKE_DATA; "push")]
    fn tcp_serialize_segment<I: TestIpExt>(segment: Segment<&[u8]>, expected_body: &[u8]) {
        const SOURCE_PORT: NonZeroU16 = NonZeroU16::new(1111).unwrap();
        const DEST_PORT: NonZeroU16 = NonZeroU16::new(2222).unwrap();

        let (header, data) = segment.into_parts();
        let serializer = super::tcp_serialize_segment::<I, _>(
            &header,
            data,
            ConnIpAddr {
                local: (SocketIpAddr::try_from(I::TEST_ADDRS.local_ip).unwrap(), SOURCE_PORT),
                remote: (SocketIpAddr::try_from(I::TEST_ADDRS.remote_ip).unwrap(), DEST_PORT),
            },
        );

        let mut serialized = serializer
            .serialize_vec_outer(&mut NetworkSerializationContext::default())
            .unwrap()
            .unwrap_b();
        // The segment was built local -> remote, so the checksum pseudo-header
        // uses the local address as source and the remote one as destination.
        // (The one's-complement sum is order-independent, so the swapped
        // order would also pass, but it would misdescribe the segment.)
        let parsed_segment = serialized
            .parse_with::<_, TcpSegment<_>>(TcpParseArgs::new(
                *I::TEST_ADDRS.local_ip,
                *I::TEST_ADDRS.remote_ip,
            ))
            .expect("is valid segment");

        assert_eq!(parsed_segment.src_port(), SOURCE_PORT);
        assert_eq!(parsed_segment.dst_port(), DEST_PORT);
        assert_eq!(parsed_segment.seq_num(), u32::from(SEQ));
        // The ACK number (and ACK flag) must be present exactly when the
        // header carries one.
        assert_eq!(parsed_segment.ack_num(), header.ack.map(u32::from));
        assert_eq!(parsed_segment.psh(), header.push);
        // Each control flag is set exactly when the header asks for it.
        assert_eq!(parsed_segment.syn(), header.control == Some(Control::SYN));
        assert_eq!(parsed_segment.fin(), header.control == Some(Control::FIN));
        assert_eq!(parsed_segment.rst(), header.control == Some(Control::RST));
        assert_eq!(
            UnscaledWindowSize::from(parsed_segment.window_size()),
            UnscaledWindowSize::from(u16::MAX)
        );

        let (mss, window_scale, sack_permitted, sack_blocks, timestamp) = match header.options {
            Options::Handshake(HandshakeOptions {
                mss,
                window_scale,
                sack_permitted,
                timestamp,
            }) => (mss, window_scale, sack_permitted, SackBlocks::EMPTY, timestamp),
            Options::Segment(SegmentOptions { timestamp, sack_blocks }) => {
                (None, None, false, sack_blocks, timestamp)
            }
            Options::Reset(ResetOptions { timestamp }) => {
                (None, None, false, SackBlocks::EMPTY, timestamp)
            }
        };
        assert_eq!(mss.map(|mss| mss.get()), parsed_segment.options().mss());
        assert_eq!(window_scale.map(|ws| ws.get()), parsed_segment.options().window_scale());
        assert_eq!(sack_permitted, parsed_segment.options().sack_permitted());
        assert_eq!(sack_blocks.as_slice(), parsed_segment.options().sack_blocks());
        assert_eq!(
            timestamp.as_ref().map(Into::into).as_ref(),
            parsed_segment.options().timestamp()
        );

        assert_eq!(parsed_segment.into_body(), expected_body);
    }
}