Skip to main content

netstack3_tcp/socket/
demux.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Defines the entry point of TCP packets, by directing them into the correct
6//! state machine.
7
8use core::fmt::Debug;
9use core::num::NonZeroU16;
10
11use assert_matches::assert_matches;
12use log::{debug, error, warn};
13use net_types::ip::Ip;
14use net_types::{SpecifiedAddr, Witness as _};
15use netstack3_base::socket::{
16    AddrIsMappedError, AddrVec, AddrVecIter, ConnAddr, ConnIpAddr, InsertError, IpAddrVec,
17    ListenerAddr, ListenerIpAddr, SocketCookie, SocketIpAddr, SocketIpAddrExt as _,
18};
19use netstack3_base::{
20    BidirectionalConverter as _, Control, CounterContext, CtxPair, EitherDeviceId, IpDeviceAddr,
21    Marks, Mss, NotFoundError, Payload, Segment, SegmentHeader, SeqNum, StrongDeviceIdentifier,
22    VerifiedTcpSegment, WeakDeviceIdentifier,
23};
24use netstack3_filter::{
25    FilterIpExt, SocketIngressFilterResult, SocketOpsFilter, TransportPacketSerializer,
26};
27use netstack3_hashmap::hash_map;
28use netstack3_ip::socket::{IpSockCreationError, IpSocketArgs, MmsError};
29use netstack3_ip::{
30    IpHeaderInfo, IpTransportContext, LocalDeliveryPacketInfo, ReceiveIpPacketMeta,
31    TransportIpContext,
32};
33use netstack3_trace::trace_duration;
34use packet::{
35    BufferMut, BufferView as _, EmptyBuf, FragmentedByteSlice, InnerPacketBuilder, PacketBuilder,
36    ParseBuffer,
37};
38use packet_formats::error::ParseError;
39use packet_formats::ip::IpProto;
40use packet_formats::tcp::{
41    TcpFlowAndSeqNum, TcpOptionsTooLongError, TcpParseArgs, TcpSegment, TcpSegmentBuilder,
42    TcpSegmentBuilderWithOptions, TcpSegmentRaw,
43};
44
45use crate::internal::base::{BufferSizes, ConnectionError, SocketOptions, TcpIpSockOptions};
46use crate::internal::counters::{
47    self, TcpCounterContext, TcpCountersRefs, TcpCountersWithoutSocket,
48};
49use crate::internal::socket::generators::{IsnGenerator, TimestampOffsetGenerator};
50use crate::internal::socket::{
51    self, AsThisStack as _, Connection, CoreTxMetadataContext, DemuxState, DeviceIpSocketHandler,
52    DoSendLimit, DualStackBaseIpExt, DualStackDemuxIdConverter as _, DualStackIpExt, EitherStack,
53    HandshakeStatus, Listener, ListenerAddrState, MaybeDualStack, PrimaryRc, TcpApi,
54    TcpBindingsContext, TcpBindingsTypes, TcpContext, TcpDemuxContext, TcpDualStackContext,
55    TcpIpTransportContext, TcpPortSpec, TcpSocketId, TcpSocketSetEntry, TcpSocketState,
56    TcpSocketStateInner, TcpSocketTxMetadata,
57};
58use crate::internal::state::{
59    BufferProvider, Closed, DataAcked, Initial, NewlyClosed, State, TimeWait,
60};
61
62impl<BT: TcpBindingsTypes> BufferProvider<BT::ReceiveBuffer, BT::SendBuffer> for BT {
63    type ActiveOpen = BT::ListenerNotifierOrProvidedBuffers;
64
65    type PassiveOpen = BT::ReturnedBuffers;
66
67    fn new_passive_open_buffers(
68        buffer_sizes: BufferSizes,
69    ) -> (BT::ReceiveBuffer, BT::SendBuffer, Self::PassiveOpen) {
70        BT::new_passive_open_buffers(buffer_sizes)
71    }
72}
73
74/// Alias for a SocketId that can reference either V4 or V6 socket.
75pub type DualStackTcpSocketId<I, D, BT> = <I as DualStackBaseIpExt>::DemuxSocketId<D, BT>;
76
77impl<I, BC, CC> IpTransportContext<I, BC, CC> for TcpIpTransportContext
78where
79    I: DualStackIpExt,
80    BC: TcpBindingsContext<CC::DeviceId>
81        + BufferProvider<
82            BC::ReceiveBuffer,
83            BC::SendBuffer,
84            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
85            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
86        >,
87    CC: TcpContext<I, BC> + TcpContext<I::OtherVersion, BC>,
88{
89    type EarlyDemuxSocket = DualStackTcpSocketId<I, CC::WeakDeviceId, BC>;
90
91    fn early_demux<B: ParseBuffer>(
92        core_ctx: &mut CC,
93        device: &CC::DeviceId,
94        src_ip: I::Addr,
95        dst_ip: I::Addr,
96        buffer: B,
97    ) -> Option<Self::EarlyDemuxSocket> {
98        early_demux_ip_packet::<I, _, _, _>(core_ctx, device, src_ip, dst_ip, buffer)
99    }
100
101    fn receive_icmp_error(
102        core_ctx: &mut CC,
103        bindings_ctx: &mut BC,
104        _device: &CC::DeviceId,
105        original_src_ip: Option<SpecifiedAddr<I::Addr>>,
106        original_dst_ip: SpecifiedAddr<I::Addr>,
107        mut original_body: &[u8],
108        err: I::ErrorCode,
109    ) {
110        let mut buffer = &mut original_body;
111        let Some(flow_and_seqnum) = buffer.take_obj_front::<TcpFlowAndSeqNum>() else {
112            error!("received an ICMP error but its body is less than 8 bytes");
113            return;
114        };
115
116        let Some(original_src_ip) = original_src_ip else { return };
117        let Some(original_src_port) = NonZeroU16::new(flow_and_seqnum.src_port()) else { return };
118        let Some(original_dst_port) = NonZeroU16::new(flow_and_seqnum.dst_port()) else { return };
119        let original_seqnum = SeqNum::new(flow_and_seqnum.sequence_num());
120
121        TcpApi::<I, _>::new(CtxPair { core_ctx, bindings_ctx }).on_icmp_error(
122            original_src_ip,
123            original_dst_ip,
124            original_src_port,
125            original_dst_port,
126            original_seqnum,
127            err.into(),
128        );
129    }
130
131    fn receive_ip_packet<B: BufferMut, H: IpHeaderInfo<I>>(
132        core_ctx: &mut CC,
133        bindings_ctx: &mut BC,
134        device: &CC::DeviceId,
135        remote_ip: I::RecvSrcAddr,
136        local_ip: SpecifiedAddr<I::Addr>,
137        mut buffer: B,
138        info: &LocalDeliveryPacketInfo<I, H>,
139        early_demux_socket: Option<Self::EarlyDemuxSocket>,
140    ) -> Result<(), (B, I::IcmpError)> {
141        let LocalDeliveryPacketInfo { meta, header_info, marks } = info;
142        let ReceiveIpPacketMeta { broadcast, transparent_override } = meta;
143        if let Some(delivery) = transparent_override {
144            warn!(
145                "TODO(https://fxbug.dev/337009139): transparent proxy not supported for TCP \
146                sockets; will not override dispatch to perform local delivery to {delivery:?}"
147            );
148        }
149
150        if broadcast.is_some() {
151            CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
152                .invalid_ip_addrs_received
153                .increment();
154            debug!("tcp: dropping broadcast TCP packet");
155            return Ok(());
156        }
157
158        let remote_ip = match SpecifiedAddr::new(remote_ip.into_addr()) {
159            None => {
160                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
161                    .invalid_ip_addrs_received
162                    .increment();
163                debug!("tcp: source address unspecified, dropping the packet");
164                return Ok(());
165            }
166            Some(src_ip) => src_ip,
167        };
168        let remote_ip: SocketIpAddr<_> = match remote_ip.try_into() {
169            Ok(remote_ip) => remote_ip,
170            Err(AddrIsMappedError {}) => {
171                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
172                    .invalid_ip_addrs_received
173                    .increment();
174                debug!("tcp: source address is mapped (ipv4-mapped-ipv6), dropping the packet");
175                return Ok(());
176            }
177        };
178        let local_ip: SocketIpAddr<_> = match local_ip.try_into() {
179            Ok(local_ip) => local_ip,
180            Err(AddrIsMappedError {}) => {
181                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
182                    .invalid_ip_addrs_received
183                    .increment();
184                debug!("tcp: local address is mapped (ipv4-mapped-ipv6), dropping the packet");
185                return Ok(());
186            }
187        };
188        let packet = match buffer
189            .parse_with::<_, TcpSegment<_>>(TcpParseArgs::new(remote_ip.addr(), local_ip.addr()))
190        {
191            Ok(packet) => packet,
192            Err(err) => {
193                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
194                    .invalid_segments_received
195                    .increment();
196                debug!("tcp: failed parsing incoming packet {:?}", err);
197                match err {
198                    ParseError::Checksum => {
199                        CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
200                            .checksum_errors
201                            .increment();
202                    }
203                    ParseError::NotSupported | ParseError::NotExpected | ParseError::Format => {}
204                }
205                return Ok(());
206            }
207        };
208        let local_port = packet.dst_port();
209        let remote_port = packet.src_port();
210        let incoming = match VerifiedTcpSegment::try_from(packet) {
211            Ok(segment) => segment,
212            Err(err) => {
213                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
214                    .invalid_segments_received
215                    .increment();
216                debug!("tcp: malformed segment {:?}", err);
217                return Ok(());
218            }
219        };
220        let conn_addr =
221            ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) };
222
223        CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
224            .valid_segments_received
225            .increment();
226        handle_incoming_packet::<I, _, _, _>(
227            core_ctx,
228            bindings_ctx,
229            conn_addr,
230            device,
231            header_info,
232            &incoming,
233            marks,
234            early_demux_socket,
235        );
236        Ok(())
237    }
238}
239
240fn early_demux_ip_packet<I, BC, CC, B>(
241    core_ctx: &mut CC,
242    device: &CC::DeviceId,
243    src_ip: I::Addr,
244    dst_ip: I::Addr,
245    mut buffer: B,
246) -> Option<DualStackTcpSocketId<I, CC::WeakDeviceId, BC>>
247where
248    I: DualStackIpExt,
249    BC: TcpBindingsContext<CC::DeviceId>,
250    CC: TcpContext<I, BC> + TcpContext<I::OtherVersion, BC>,
251    B: ParseBuffer,
252{
253    let Ok(packet) = buffer.parse_with::<_, TcpSegmentRaw<_>>(()) else {
254        // If we fail to parse the packet then just return None. Invalid
255        // packets are handled later.
256        return None;
257    };
258
259    let src_ip = SocketIpAddr::new(src_ip)?;
260    let dst_ip = SocketIpAddr::new(dst_ip)?;
261    let (src_port, dst_port) = packet.flow_header().src_dst();
262    let src_port = NonZeroU16::new(src_port)?;
263    let dst_port = NonZeroU16::new(dst_port)?;
264    let device = device.downgrade();
265
266    core_ctx.with_demux(|demux: &DemuxState<I, _, _>| {
267        demux
268            .socketmap
269            .lookup_connected((src_ip, src_port), (dst_ip, dst_port), device)
270            .map(|entry| entry.id())
271    })
272}
273
274fn handle_incoming_packet<WireI, BC, CC, H>(
275    core_ctx: &mut CC,
276    bindings_ctx: &mut BC,
277    conn_addr: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
278    incoming_device: &CC::DeviceId,
279    header_info: &H,
280    incoming: &VerifiedTcpSegment<'_>,
281    marks: &Marks,
282    mut early_demux_socket: Option<DualStackTcpSocketId<WireI, CC::WeakDeviceId, BC>>,
283) where
284    WireI: DualStackIpExt,
285    BC: TcpBindingsContext<CC::DeviceId>
286        + BufferProvider<
287            BC::ReceiveBuffer,
288            BC::SendBuffer,
289            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
290            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
291        >,
292    CC: TcpContext<WireI, BC> + TcpContext<WireI::OtherVersion, BC>,
293    H: IpHeaderInfo<WireI>,
294{
295    trace_duration!("tcp::handle_incoming_packet");
296    let mut tw_reuse = None;
297
298    // If we have an early demux socket, then we don't need to look up
299    // connected sockets again. We may still need to lookup listener sockets
300    // when the socket we found a in the timed wait state.
301    let addr: IpAddrVec<WireI, _> = if early_demux_socket.is_some() {
302        let (ip, port) = conn_addr.local;
303        IpAddrVec::new_listener(ip, port)
304    } else {
305        conn_addr.into()
306    };
307    let mut addrs_to_search = AddrVecIter::<WireI, CC::WeakDeviceId, TcpPortSpec>::with_device(
308        addr,
309        incoming_device.downgrade(),
310    );
311
312    enum FoundSocket<S> {
313        // Typically holds the demux ID of the found socket, but may hold
314        // `None` if the found socket was destroyed as a result of the segment.
315        Yes(Option<S>),
316        No,
317    }
318    let found_socket = loop {
319        let sock = if let Some(early_demux_socket) = early_demux_socket.take() {
320            let device = match WireI::as_dual_stack_ip_socket(&early_demux_socket) {
321                EitherStack::ThisStack(conn_id) => conn_id.get_bound_device(core_ctx),
322                EitherStack::OtherStack(conn_id) => conn_id.get_bound_device(core_ctx),
323            };
324            let conn_addr = ConnAddr { ip: conn_addr, device };
325            Some(SocketLookupResult::Connection(early_demux_socket, conn_addr))
326        } else {
327            core_ctx.with_demux(|demux| lookup_socket::<WireI, CC, BC>(demux, &mut addrs_to_search))
328        };
329        match sock {
330            None => break FoundSocket::No,
331            Some(SocketLookupResult::Connection(demux_conn_id, conn_addr)) => {
332                // It is not possible to have two same connections that
333                // share the same local and remote IPs and ports.
334                assert_eq!(tw_reuse, None);
335                let disposition = match WireI::as_dual_stack_ip_socket(&demux_conn_id) {
336                    EitherStack::ThisStack(conn_id) => {
337                        try_handle_incoming_for_connection_dual_stack(
338                            core_ctx,
339                            bindings_ctx,
340                            conn_id,
341                            incoming_device,
342                            header_info,
343                            &incoming,
344                        )
345                    }
346                    EitherStack::OtherStack(conn_id) => {
347                        try_handle_incoming_for_connection_dual_stack(
348                            core_ctx,
349                            bindings_ctx,
350                            conn_id,
351                            incoming_device,
352                            header_info,
353                            &incoming,
354                        )
355                    }
356                };
357                match disposition {
358                    ConnectionIncomingSegmentDisposition::Destroy => {
359                        WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, demux_conn_id);
360                        break FoundSocket::Yes(None);
361                    }
362                    ConnectionIncomingSegmentDisposition::FoundSocket
363                    | ConnectionIncomingSegmentDisposition::Filtered => {
364                        break FoundSocket::Yes(Some(demux_conn_id));
365                    }
366                    ConnectionIncomingSegmentDisposition::ReuseCandidateForListener => {
367                        tw_reuse = Some((demux_conn_id, conn_addr));
368                    }
369                }
370            }
371            Some(SocketLookupResult::Listener((demux_listener_id, _listener_addr))) => {
372                match WireI::as_dual_stack_ip_socket(&demux_listener_id) {
373                    EitherStack::ThisStack(listener_id) => {
374                        let disposition = core_ctx.with_socket_mut_generators_transport_demux(
375                            &listener_id,
376                            |core_ctx, socket_state, isn, timestamp_offset| match core_ctx {
377                                MaybeDualStack::NotDualStack((core_ctx, converter)) => {
378                                    try_handle_incoming_for_listener::<WireI, WireI, CC, BC, _, _>(
379                                        core_ctx,
380                                        bindings_ctx,
381                                        &listener_id,
382                                        isn,
383                                        timestamp_offset,
384                                        socket_state,
385                                        header_info,
386                                        incoming,
387                                        conn_addr,
388                                        incoming_device,
389                                        &mut tw_reuse,
390                                        move |conn, addr| converter.convert_back((conn, addr)),
391                                        WireI::into_demux_socket_id,
392                                        marks,
393                                    )
394                                }
395                                MaybeDualStack::DualStack((core_ctx, converter)) => {
396                                    try_handle_incoming_for_listener::<_, _, CC, BC, _, _>(
397                                        core_ctx,
398                                        bindings_ctx,
399                                        &listener_id,
400                                        isn,
401                                        timestamp_offset,
402                                        socket_state,
403                                        header_info,
404                                        incoming,
405                                        conn_addr,
406                                        incoming_device,
407                                        &mut tw_reuse,
408                                        move |conn, addr| {
409                                            converter
410                                                .convert_back(EitherStack::ThisStack((conn, addr)))
411                                        },
412                                        WireI::into_demux_socket_id,
413                                        marks,
414                                    )
415                                }
416                            },
417                        );
418                        if try_handle_listener_incoming_disposition(
419                            core_ctx,
420                            bindings_ctx,
421                            disposition,
422                            &demux_listener_id,
423                            &mut tw_reuse,
424                            &mut addrs_to_search,
425                            conn_addr,
426                            incoming_device,
427                        ) {
428                            break FoundSocket::Yes(Some(demux_listener_id));
429                        }
430                    }
431                    EitherStack::OtherStack(listener_id) => {
432                        let disposition = core_ctx.with_socket_mut_generators_transport_demux(
433                            &listener_id,
434                            |core_ctx, socket_state, isn, timestamp_offset| {
435                                match core_ctx {
436                                    MaybeDualStack::NotDualStack((_core_ctx, _converter)) => {
437                                        // TODO(https://issues.fuchsia.dev/316408184):
438                                        // Remove this unreachable!.
439                                        unreachable!("OtherStack socket ID with non dual stack");
440                                    }
441                                    MaybeDualStack::DualStack((core_ctx, converter)) => {
442                                        let other_demux_id_converter =
443                                            core_ctx.other_demux_id_converter();
444                                        try_handle_incoming_for_listener::<_, _, CC, BC, _, _>(
445                                            core_ctx,
446                                            bindings_ctx,
447                                            &listener_id,
448                                            isn,
449                                            timestamp_offset,
450                                            socket_state,
451                                            header_info,
452                                            incoming,
453                                            conn_addr,
454                                            incoming_device,
455                                            &mut tw_reuse,
456                                            move |conn, addr| {
457                                                converter.convert_back(EitherStack::OtherStack((
458                                                    conn, addr,
459                                                )))
460                                            },
461                                            move |id| other_demux_id_converter.convert(id),
462                                            marks,
463                                        )
464                                    }
465                                }
466                            },
467                        );
468                        if try_handle_listener_incoming_disposition::<_, _, CC, BC, _>(
469                            core_ctx,
470                            bindings_ctx,
471                            disposition,
472                            &demux_listener_id,
473                            &mut tw_reuse,
474                            &mut addrs_to_search,
475                            conn_addr,
476                            incoming_device,
477                        ) {
478                            break FoundSocket::Yes(Some(demux_listener_id));
479                        }
480                    }
481                };
482            }
483        }
484    };
485
486    let demux_id = match found_socket {
487        FoundSocket::No => {
488            CounterContext::<TcpCountersWithoutSocket<WireI>>::counters(core_ctx)
489                .received_segments_no_dispatch
490                .increment();
491
492            // There is no existing TCP state, pretend it is closed
493            // and generate a RST if needed.
494            // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-21):
495            // CLOSED is fictional because it represents the state when
496            // there is no TCB, and therefore, no connection.
497            if let Some(seg) =
498                (Closed { reason: None::<Option<ConnectionError>> }.on_segment(&incoming.into()))
499            {
500                socket::send_tcp_segment::<WireI, WireI, _, _, _>(
501                    core_ctx,
502                    bindings_ctx,
503                    None,
504                    None,
505                    conn_addr,
506                    seg.into_empty(),
507                    &TcpIpSockOptions { marks: *marks },
508                );
509            }
510            None
511        }
512        FoundSocket::Yes(demux_id) => {
513            counters::increment_counter_with_optional_demux_id::<WireI, _, _, _, _>(
514                core_ctx,
515                demux_id.as_ref(),
516                |c| &c.received_segments_dispatched,
517            );
518            demux_id
519        }
520    };
521
522    if let Some(control) = incoming.control() {
523        counters::increment_counter_with_optional_demux_id::<WireI, _, _, _, _>(
524            core_ctx,
525            demux_id.as_ref(),
526            |c| match control {
527                Control::RST => &c.resets_received,
528                Control::SYN => &c.syns_received,
529                Control::FIN => &c.fins_received,
530            },
531        )
532    }
533}
534
535enum SocketLookupResult<I: DualStackIpExt, D: WeakDeviceIdentifier, BT: TcpBindingsTypes> {
536    Connection(I::DemuxSocketId<D, BT>, ConnAddr<ConnIpAddr<I::Addr, NonZeroU16, NonZeroU16>, D>),
537    Listener((I::DemuxSocketId<D, BT>, ListenerAddr<ListenerIpAddr<I::Addr, NonZeroU16>, D>)),
538}
539
540fn lookup_socket<I, CC, BC>(
541    DemuxState { socketmap, .. }: &DemuxState<I, CC::WeakDeviceId, BC>,
542    addrs_to_search: &mut AddrVecIter<I, CC::WeakDeviceId, TcpPortSpec>,
543) -> Option<SocketLookupResult<I, CC::WeakDeviceId, BC>>
544where
545    I: DualStackIpExt,
546    BC: TcpBindingsContext<CC::DeviceId>,
547    CC: TcpContext<I, BC>,
548{
549    addrs_to_search.find_map(|addr| {
550        match addr {
551            // Connections are always searched before listeners because they
552            // are more specific.
553            AddrVec::Conn(conn_addr) => {
554                socketmap.conns().get_by_addr(&conn_addr).map(|conn_addr_state| {
555                    SocketLookupResult::Connection(conn_addr_state.id(), conn_addr)
556                })
557            }
558            AddrVec::Listen(listener_addr) => {
559                // If we have a listener and the incoming segment is a SYN, we
560                // allocate a new connection entry in the demuxer.
561                // TODO(https://fxbug.dev/42052878): Support SYN cookies.
562
563                socketmap
564                    .listeners()
565                    .get_by_addr(&listener_addr)
566                    .and_then(|addr_state| match addr_state {
567                        ListenerAddrState::ExclusiveListener(id) => Some(id.clone()),
568                        ListenerAddrState::Shared { listener: Some(id), bound: _ } => {
569                            Some(id.clone())
570                        }
571                        ListenerAddrState::ExclusiveBound(_)
572                        | ListenerAddrState::Shared { listener: None, bound: _ } => None,
573                    })
574                    .map(|id| SocketLookupResult::Listener((id, listener_addr)))
575            }
576        }
577    })
578}
579
/// What happened when an incoming segment was offered to a connected socket.
#[derive(Eq, PartialEq)]
enum ConnectionIncomingSegmentDisposition {
    /// The socket handled the segment; the socket search stops.
    FoundSocket,
    /// The socket ingress filter dropped the segment before the connection
    /// saw it; the socket search stops.
    Filtered,
    /// The connection did not take the segment but may be replaced by a new
    /// connection; the search continues with listeners.
    ReuseCandidateForListener,
    /// The socket must be destroyed by the caller.
    Destroy,
}
587
/// What happened when an incoming segment was offered to a listening socket.
///
/// NOTE(review): the code that produces and consumes these variants is not
/// part of this view; the variant descriptions below are inferred from the
/// names and should be confirmed against
/// `try_handle_listener_incoming_disposition`.
enum ListenerIncomingSegmentDisposition<S> {
    /// Presumably: the listener handled the segment.
    FoundSocket,
    /// Presumably: the socket ingress filter dropped the segment.
    Filtered,
    /// Presumably: an existing connection conflicts with the one the segment
    /// would create.
    ConflictingConnection,
    /// Presumably: this listener is not the right socket for the segment.
    NoMatchingSocket,
    /// Presumably: a new connection, carried as `S`, was created for the
    /// segment.
    NewConnection(S),
}
595
596fn try_handle_incoming_for_connection_dual_stack<SockI, WireI, CC, BC, H>(
597    core_ctx: &mut CC,
598    bindings_ctx: &mut BC,
599    conn_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
600    incoming_device: &CC::DeviceId,
601    header_info: &H,
602    incoming: &VerifiedTcpSegment<'_>,
603) -> ConnectionIncomingSegmentDisposition
604where
605    SockI: DualStackIpExt,
606    WireI: Ip,
607    BC: TcpBindingsContext<CC::DeviceId>
608        + BufferProvider<
609            BC::ReceiveBuffer,
610            BC::SendBuffer,
611            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
612            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
613        >,
614    CC: TcpContext<SockI, BC>,
615    H: IpHeaderInfo<WireI>,
616{
617    core_ctx.with_socket_mut_transport_demux(conn_id, |core_ctx, socket_state| {
618        let TcpSocketState { socket_state, sharing: _, ip_options: _, socket_options } =
619            socket_state;
620
621        match run_socket_ingress_filter(
622            bindings_ctx,
623            incoming_device,
624            conn_id.socket_cookie(),
625            socket_options,
626            header_info,
627            incoming.tcp_segment(),
628        ) {
629            SocketIngressFilterResult::Accept => (),
630            SocketIngressFilterResult::Drop => {
631                return ConnectionIncomingSegmentDisposition::Filtered;
632            }
633        }
634
635        let (conn_and_addr, timer) = assert_matches!(
636            socket_state,
637            TcpSocketStateInner::Connected { conn, timer } => (conn, timer),
638            "invalid socket ID"
639        );
640        let this_or_other_stack = match core_ctx {
641            MaybeDualStack::DualStack((core_ctx, converter)) => {
642                match converter.convert(conn_and_addr) {
643                    EitherStack::ThisStack((conn, conn_addr)) => {
644                        // The socket belongs to the current stack, so we
645                        // want to deliver the segment to this stack.
646                        // Use `as_this_stack` to make the context types
647                        // match with the non-dual-stack case.
648                        EitherStack::ThisStack((
649                            core_ctx.as_this_stack(),
650                            conn,
651                            conn_addr,
652                            SockI::into_demux_socket_id(conn_id.clone()),
653                        ))
654                    }
655                    EitherStack::OtherStack((conn, conn_addr)) => {
656                        // We need to deliver from the other stack. i.e. we
657                        // need to deliver an IPv4 packet to the IPv6 stack.
658                        let demux_sock_id = core_ctx.into_other_demux_socket_id(conn_id.clone());
659                        EitherStack::OtherStack((core_ctx, conn, conn_addr, demux_sock_id))
660                    }
661                }
662            }
663            MaybeDualStack::NotDualStack((core_ctx, converter)) => {
664                let (conn, conn_addr) = converter.convert(conn_and_addr);
665                // Similar to the first case, we need deliver to this stack,
666                // but use `as_this_stack` to make the types match.
667                EitherStack::ThisStack((
668                    core_ctx.as_this_stack(),
669                    conn,
670                    conn_addr,
671                    SockI::into_demux_socket_id(conn_id.clone()),
672                ))
673            }
674        };
675
676        match this_or_other_stack {
677            EitherStack::ThisStack((core_ctx, conn, conn_addr, demux_conn_id)) => {
678                try_handle_incoming_for_connection::<_, _, CC, _, _>(
679                    core_ctx,
680                    bindings_ctx,
681                    conn_addr.clone(),
682                    conn_id,
683                    demux_conn_id,
684                    socket_options,
685                    conn,
686                    timer,
687                    incoming.into(),
688                )
689            }
690            EitherStack::OtherStack((core_ctx, conn, conn_addr, demux_conn_id)) => {
691                try_handle_incoming_for_connection::<_, _, CC, _, _>(
692                    core_ctx,
693                    bindings_ctx,
694                    conn_addr.clone(),
695                    conn_id,
696                    demux_conn_id,
697                    socket_options,
698                    conn,
699                    timer,
700                    incoming.into(),
701                )
702            }
703        }
704    })
705}
706
707/// Tries to handle the incoming segment by providing it to a connected socket.
708///
709/// Returns `FoundSocket` if the segment was handled; Otherwise,
710/// `ReuseCandidateForListener` will be returned if there is a defunct socket
711/// that is currently in TIME_WAIT, which is ready to be reused if there is an
712/// active listener listening on the port.
713fn try_handle_incoming_for_connection<SockI, WireI, CC, BC, DC>(
714    core_ctx: &mut DC,
715    bindings_ctx: &mut BC,
716    conn_addr: ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
717    conn_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
718    demux_id: WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
719    socket_options: &SocketOptions,
720    conn: &mut Connection<SockI, WireI, CC::WeakDeviceId, BC>,
721    timer: &mut BC::Timer,
722    incoming: Segment<&[u8]>,
723) -> ConnectionIncomingSegmentDisposition
724where
725    SockI: DualStackIpExt,
726    WireI: DualStackIpExt,
727    BC: TcpBindingsContext<CC::DeviceId>
728        + BufferProvider<
729            BC::ReceiveBuffer,
730            BC::SendBuffer,
731            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
732            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
733        >,
734    CC: TcpContext<SockI, BC>,
735    DC: TransportIpContext<WireI, BC, DeviceId = CC::DeviceId, WeakDeviceId = CC::WeakDeviceId>
736        + DeviceIpSocketHandler<SockI, BC>
737        + TcpDemuxContext<WireI, CC::WeakDeviceId, BC>
738        + TcpCounterContext<SockI, CC::WeakDeviceId, BC>
739        + CoreTxMetadataContext<TcpSocketTxMetadata<SockI, CC::WeakDeviceId, BC>, BC>,
740{
741    let Connection { accept_queue, state, ip_sock, defunct, soft_error: _, handshake_status } =
742        conn;
743
744    // Per RFC 9293 Section 3.6.1:
745    //   When a connection is closed actively, it MUST linger in the TIME-WAIT
746    //   state for a time 2xMSL (Maximum Segment Lifetime) (MUST-13). However,
747    //   it MAY accept a new SYN from the remote TCP endpoint to reopen the
748    //   connection directly from TIME-WAIT state (MAY-2), if it:
749    //
750    //   (1) assigns its initial sequence number for the new connection to be
751    //       larger than the largest sequence number it used on the previous
752    //       connection incarnation, and
753    //   (2) returns to TIME-WAIT state if the SYN turns out to be an old
754    //       duplicate.
755    if *defunct
756        && incoming.header().control == Some(Control::SYN)
757        && incoming.header().ack.is_none()
758    {
759        if let State::TimeWait(TimeWait { last_seq: _, closed_rcv, expiry: _, snd_info: _ }) = state
760        {
761            if !incoming.header().seq.before(closed_rcv.ack) {
762                return ConnectionIncomingSegmentDisposition::ReuseCandidateForListener;
763            }
764        }
765    }
766    let (reply, passive_open, data_acked, newly_closed) = state.on_segment::<_, BC>(
767        &conn_id.either(),
768        &TcpCountersRefs::from_ctx(core_ctx, conn_id),
769        incoming,
770        bindings_ctx.now(),
771        socket_options,
772        *defunct,
773    );
774
775    match data_acked {
776        DataAcked::Yes => {
777            core_ctx.confirm_reachable(bindings_ctx, ip_sock, &socket_options.ip_options)
778        }
779        DataAcked::No => {}
780    }
781
782    match state {
783        State::Listen(_) => {
784            unreachable!("has an invalid status: {:?}", conn.state)
785        }
786        State::SynSent(_) | State::SynRcvd(_) => {
787            assert_eq!(*handshake_status, HandshakeStatus::Pending)
788        }
789        State::Established(_)
790        | State::FinWait1(_)
791        | State::FinWait2(_)
792        | State::Closing(_)
793        | State::CloseWait(_)
794        | State::LastAck(_)
795        | State::TimeWait(_) => {
796            if handshake_status
797                .update_if_pending(HandshakeStatus::Completed { reported: accept_queue.is_some() })
798            {
799                core_ctx.confirm_reachable(bindings_ctx, ip_sock, &socket_options.ip_options);
800            }
801        }
802        State::Closed(Closed { reason }) => {
803            // We remove the socket from the socketmap and cancel the timers
804            // regardless of the socket being defunct or not. The justification
805            // is that CLOSED is a synthetic state and it means no connection
806            // exists, thus it should not exist in the demuxer.
807            //
808            // If the socket was already in the closed state we can assume it's
809            // no longer in the demux.
810            socket::handle_newly_closed(
811                core_ctx,
812                bindings_ctx,
813                newly_closed,
814                &demux_id,
815                &conn_addr,
816                timer,
817            );
818            if let Some(accept_queue) = accept_queue {
819                accept_queue.remove(&conn_id);
820                *defunct = true;
821            }
822            if *defunct {
823                // If the client has promised to not touch the socket again,
824                // we can destroy the socket finally.
825                return ConnectionIncomingSegmentDisposition::Destroy;
826            }
827            let _: bool = handshake_status.update_if_pending(match reason {
828                None => HandshakeStatus::Completed { reported: accept_queue.is_some() },
829                Some(_err) => HandshakeStatus::Aborted,
830            });
831        }
832    }
833
834    if let Some(seg) = reply {
835        socket::send_tcp_segment(
836            core_ctx,
837            bindings_ctx,
838            Some(conn_id),
839            Some(&ip_sock),
840            conn_addr.ip,
841            seg.into_empty(),
842            &socket_options.ip_options,
843        );
844    }
845
846    // Send any enqueued data, if there is any.
847    socket::do_send_inner_and_then_handle_newly_closed(
848        conn_id,
849        &demux_id,
850        socket_options,
851        conn,
852        DoSendLimit::MultipleSegments,
853        &conn_addr,
854        timer,
855        core_ctx,
856        bindings_ctx,
857    );
858
859    // Enqueue the connection to the associated listener
860    // socket's accept queue.
861    if let Some(passive_open) = passive_open {
862        let accept_queue = conn.accept_queue.as_ref().expect("no accept queue but passive open");
863        accept_queue.notify_ready(conn_id, passive_open);
864    }
865
866    // We found a valid connection for the segment.
867    ConnectionIncomingSegmentDisposition::FoundSocket
868}
869
870/// Responds to the disposition returned by [`try_handle_incoming_for_listener`].
871///
872/// Returns true if we have found the right socket and there is no need to
873/// continue the iteration for finding the next-best candidate.
874fn try_handle_listener_incoming_disposition<SockI, WireI, CC, BC, Addr>(
875    core_ctx: &mut CC,
876    bindings_ctx: &mut BC,
877    disposition: ListenerIncomingSegmentDisposition<PrimaryRc<SockI, CC::WeakDeviceId, BC>>,
878    demux_listener_id: &WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
879    tw_reuse: &mut Option<(WireI::DemuxSocketId<CC::WeakDeviceId, BC>, Addr)>,
880    addrs_to_search: &mut AddrVecIter<WireI, CC::WeakDeviceId, TcpPortSpec>,
881    conn_addr: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
882    incoming_device: &CC::DeviceId,
883) -> bool
884where
885    SockI: DualStackIpExt,
886    WireI: DualStackIpExt,
887    CC: TcpContext<SockI, BC> + TcpContext<WireI, BC> + TcpContext<WireI::OtherVersion, BC>,
888    BC: TcpBindingsContext<CC::DeviceId>,
889{
890    match disposition {
891        ListenerIncomingSegmentDisposition::FoundSocket => true,
892        ListenerIncomingSegmentDisposition::Filtered => true,
893        ListenerIncomingSegmentDisposition::ConflictingConnection => {
894            // We're about to rewind the lookup. If we got a
895            // conflicting connection it means tw_reuse has been
896            // removed from the demux state and we need to destroy
897            // it.
898            if let Some((tw_reuse, _)) = tw_reuse.take() {
899                WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, tw_reuse);
900            }
901
902            // Reset the address vector iterator and go again, a
903            // conflicting connection was found.
904            *addrs_to_search = AddrVecIter::<WireI, CC::WeakDeviceId, TcpPortSpec>::with_device(
905                conn_addr.into(),
906                incoming_device.downgrade(),
907            );
908            false
909        }
910        ListenerIncomingSegmentDisposition::NoMatchingSocket => false,
911        ListenerIncomingSegmentDisposition::NewConnection(primary) => {
912            // If we have a new connection, we need to add it to the
913            // set of all sockets.
914
915            // First things first, if we got here then tw_reuse is
916            // gone so we need to destroy it.
917            if let Some((tw_reuse, _)) = tw_reuse.take() {
918                WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, tw_reuse);
919            }
920
921            // Now put the new connection into the socket map.
922            //
923            // Note that there's a possible subtle race here where
924            // another thread could have already operated further on
925            // this connection and marked it for destruction which
926            // puts the entry in the DOA state, if we see that we
927            // must immediately destroy the socket after having put
928            // it in the map.
929            let id = TcpSocketId(PrimaryRc::clone_strong(&primary));
930            let to_destroy = core_ctx.with_all_sockets_mut(move |all_sockets| {
931                let insert_entry = TcpSocketSetEntry::Primary(primary);
932                match all_sockets.entry(id) {
933                    hash_map::Entry::Vacant(v) => {
934                        let _: &mut _ = v.insert(insert_entry);
935                        None
936                    }
937                    hash_map::Entry::Occupied(mut o) => {
938                        // We're holding on to the primary ref, the
939                        // only possible state here should be a DOA
940                        // entry.
941                        assert_matches!(
942                            core::mem::replace(o.get_mut(), insert_entry),
943                            TcpSocketSetEntry::DeadOnArrival
944                        );
945                        Some(o.key().clone())
946                    }
947                }
948            });
949            // NB: we're releasing and reaquiring the
950            // all_sockets_mut lock here for the convenience of not
951            // needing different versions of `destroy_socket`. This
952            // should be fine because the race this is solving
953            // should not be common. If we have correct thread
954            // attribution per flow it should effectively become
955            // impossible so we go for code simplicity here.
956            if let Some(to_destroy) = to_destroy {
957                socket::destroy_socket(core_ctx, bindings_ctx, to_destroy);
958            }
959            counters::increment_counter_for_demux_id::<WireI, _, _, _, _>(
960                core_ctx,
961                demux_listener_id,
962                |c| &c.passive_connection_openings,
963            );
964            true
965        }
966    }
967}
968
969/// Tries to handle an incoming segment by passing it to a listening socket.
970///
971/// Returns `FoundSocket` if the segment was handled, otherwise `NoMatchingSocket`.
972fn try_handle_incoming_for_listener<SockI, WireI, CC, BC, DC, H>(
973    core_ctx: &mut DC,
974    bindings_ctx: &mut BC,
975    listener_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
976    isn: &IsnGenerator<BC::Instant>,
977    timestamp_offset: &TimestampOffsetGenerator<BC::Instant>,
978    socket_state: &mut TcpSocketState<SockI, CC::WeakDeviceId, BC>,
979    header_info: &H,
980    incoming: &VerifiedTcpSegment<'_>,
981    incoming_addrs: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
982    incoming_device: &CC::DeviceId,
983    tw_reuse: &mut Option<(
984        WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
985        ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
986    )>,
987    make_connection: impl FnOnce(
988        Connection<SockI, WireI, CC::WeakDeviceId, BC>,
989        ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
990    ) -> SockI::ConnectionAndAddr<CC::WeakDeviceId, BC>,
991    make_demux_id: impl Fn(
992        TcpSocketId<SockI, CC::WeakDeviceId, BC>,
993    ) -> WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
994    marks: &Marks,
995) -> ListenerIncomingSegmentDisposition<PrimaryRc<SockI, CC::WeakDeviceId, BC>>
996where
997    SockI: DualStackIpExt,
998    WireI: DualStackIpExt,
999    BC: TcpBindingsContext<CC::DeviceId>
1000        + BufferProvider<
1001            BC::ReceiveBuffer,
1002            BC::SendBuffer,
1003            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
1004            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
1005        >,
1006    CC: TcpContext<SockI, BC>,
1007    DC: TransportIpContext<WireI, BC, DeviceId = CC::DeviceId, WeakDeviceId = CC::WeakDeviceId>
1008        + DeviceIpSocketHandler<WireI, BC>
1009        + TcpDemuxContext<WireI, CC::WeakDeviceId, BC>
1010        + TcpCounterContext<SockI, CC::WeakDeviceId, BC>
1011        + CoreTxMetadataContext<TcpSocketTxMetadata<SockI, CC::WeakDeviceId, BC>, BC>,
1012    H: IpHeaderInfo<WireI>,
1013{
1014    let Listener { addr: listener_addr, accept_queue, backlog, buffer_sizes } =
1015        match &socket_state.socket_state {
1016            TcpSocketStateInner::Bound(_) => {
1017                // If the socket is only bound, but not listening.
1018                return ListenerIncomingSegmentDisposition::NoMatchingSocket;
1019            }
1020            TcpSocketStateInner::Listener(listener) => listener,
1021            _ => panic!("unexpected socket state: {:?}", socket_state.socket_state),
1022        };
1023
1024    let ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) } =
1025        incoming_addrs;
1026
1027    match run_socket_ingress_filter(
1028        bindings_ctx,
1029        incoming_device,
1030        listener_id.socket_cookie(),
1031        &socket_state.socket_options,
1032        header_info,
1033        incoming.tcp_segment(),
1034    ) {
1035        SocketIngressFilterResult::Accept => (),
1036        SocketIngressFilterResult::Drop => {
1037            return ListenerIncomingSegmentDisposition::Filtered;
1038        }
1039    }
1040
1041    // Note that this checks happens at the very beginning, before we try to
1042    // reuse the connection in TIME-WAIT, this is because we need to store the
1043    // reused connection in the accept queue so we have to respect its limit.
1044    if accept_queue.len() == backlog.get() {
1045        core_ctx.increment_both(listener_id, |counters| &counters.listener_queue_overflow);
1046        core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1047        debug!("incoming SYN dropped because of the full backlog of the listener");
1048        return ListenerIncomingSegmentDisposition::FoundSocket;
1049    }
1050
1051    // Ensure that if the remote address requires a zone, we propagate that to
1052    // the address for the connected socket.
1053    let bound_device = listener_addr.as_ref().clone();
1054    let bound_device = if remote_ip.as_ref().must_have_zone() {
1055        Some(bound_device.map_or(EitherDeviceId::Strong(incoming_device), EitherDeviceId::Weak))
1056    } else {
1057        bound_device.map(EitherDeviceId::Weak)
1058    };
1059
1060    let ip_options = TcpIpSockOptions { marks: *marks, ..socket_state.socket_options.ip_options };
1061    let socket_options = SocketOptions { ip_options, ..socket_state.socket_options };
1062
1063    let bound_device = bound_device.as_ref().map(|d| d.as_ref());
1064    let ip_sock = match core_ctx.new_ip_socket(
1065        bindings_ctx,
1066        IpSocketArgs {
1067            device: bound_device,
1068            local_ip: IpDeviceAddr::new_from_socket_ip_addr(local_ip),
1069            remote_ip,
1070            proto: IpProto::Tcp.into(),
1071            options: &ip_options,
1072        },
1073    ) {
1074        Ok(ip_sock) => ip_sock,
1075        err @ Err(IpSockCreationError::Route(_)) => {
1076            core_ctx.increment_both(listener_id, |counters| &counters.passive_open_no_route_errors);
1077            core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1078            debug!("cannot construct an ip socket to the SYN originator: {:?}, ignoring", err);
1079            return ListenerIncomingSegmentDisposition::NoMatchingSocket;
1080        }
1081    };
1082
1083    let isn = isn.generate(
1084        bindings_ctx.now(),
1085        (ip_sock.local_ip().clone().into(), local_port),
1086        (ip_sock.remote_ip().clone(), remote_port),
1087    );
1088    let timestamp_offset = timestamp_offset.generate::<SocketIpAddr<WireI::Addr>, NonZeroU16>(
1089        bindings_ctx.now(),
1090        (ip_sock.local_ip().clone().into(), local_port),
1091        (ip_sock.remote_ip().clone(), remote_port),
1092    );
1093    let device_mms = match core_ctx.get_mms(bindings_ctx, &ip_sock, &socket_options.ip_options) {
1094        Ok(mms) => mms,
1095        Err(err) => {
1096            // If we cannot find a device or the device's MTU is too small,
1097            // there isn't much we can do here since sending a RST back is
1098            // impossible, we just need to silent drop the segment.
1099            error!("Cannot find a device with large enough MTU for the connection");
1100            core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1101            match err {
1102                MmsError::NoDevice(_) | MmsError::MTUTooSmall(_) => {
1103                    return ListenerIncomingSegmentDisposition::FoundSocket;
1104                }
1105            }
1106        }
1107    };
1108    let Some(device_mss) = Mss::from_mms(device_mms) else {
1109        return ListenerIncomingSegmentDisposition::FoundSocket;
1110    };
1111
1112    let mut state = State::Listen(Closed::<Initial>::listen(
1113        isn,
1114        timestamp_offset,
1115        buffer_sizes.clone(),
1116        device_mss,
1117        Mss::default::<WireI>(),
1118        socket_options.user_timeout,
1119    ));
1120
1121    // Prepare a reply to be sent out.
1122    //
1123    // We might end up discarding the reply in case we can't instantiate this
1124    // new connection.
1125    let result = state.on_segment::<_, BC>(
1126        // NB: This is a bit of a lie, we're passing the listener ID to process
1127        // the first segment because we don't have an ID allocated yet. This is
1128        // okay because the state machine ID is only for debugging purposes.
1129        &listener_id.either(),
1130        &TcpCountersRefs::from_ctx(core_ctx, listener_id),
1131        incoming.into(),
1132        bindings_ctx.now(),
1133        &SocketOptions::default(),
1134        false, /* defunct */
1135    );
1136    let reply = assert_matches!(
1137        result,
1138        (reply, None, /* data_acked */ _, NewlyClosed::No /* can't become closed */) => reply
1139    );
1140
1141    let result = if matches!(state, State::SynRcvd(_)) {
1142        let poll_send_at = state.poll_send_at().expect("no retrans timer");
1143        let bound_device = ip_sock.device().cloned();
1144
1145        let addr = ConnAddr {
1146            ip: ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) },
1147            device: bound_device,
1148        };
1149
1150        let new_socket = core_ctx.with_demux_mut(|DemuxState { socketmap, .. }| {
1151            // If we're reusing an entry, remove it from the demux before
1152            // proceeding.
1153            //
1154            // We could just reuse the old allocation for the new connection but
1155            // because of the restrictions on the socket map data structure (for
1156            // good reasons), we can't update the sharing info unconditionally.
1157            // So here we just remove the old connection and create a new one.
1158            // Also this approach has the benefit of not accidentally persisting
1159            // the old state that we don't want.
1160            if let Some((tw_reuse, conn_addr)) = tw_reuse {
1161                match socketmap.conns_mut().remove(tw_reuse, &conn_addr) {
1162                    Ok(()) => {
1163                        // NB: We're removing the tw_reuse connection from the
1164                        // demux here, but not canceling its timer. The timer is
1165                        // canceled via drop when we destroy the socket. Special
1166                        // care is taken when handling timers in the time wait
1167                        // state to account for this.
1168                    }
1169                    Err(NotFoundError) => {
1170                        // We could lose a race trying to reuse the tw_reuse
1171                        // socket, so we just accept the loss and be happy that
1172                        // the conn_addr we want to use is free.
1173                    }
1174                }
1175            }
1176
1177            // Try to create and add the new socket to the demux.
1178            let accept_queue_clone = accept_queue.clone();
1179            let ip_sock = ip_sock.clone();
1180            let bindings_ctx_moved = &mut *bindings_ctx;
1181            let sharing = socket_state.sharing;
1182            match socketmap.conns_mut().try_insert_with(addr, sharing, move |addr, sharing| {
1183                let conn = make_connection(
1184                    Connection {
1185                        accept_queue: Some(accept_queue_clone),
1186                        state,
1187                        ip_sock,
1188                        defunct: false,
1189                        soft_error: None,
1190                        handshake_status: HandshakeStatus::Pending,
1191                    },
1192                    addr,
1193                );
1194
1195                let (id, primary) = TcpSocketId::new_cyclic(
1196                    |weak| {
1197                        let mut timer = CC::new_timer(bindings_ctx_moved, weak);
1198                        // Schedule the timer here because we can't acquire the lock
1199                        // later. This only runs when inserting into the demux
1200                        // succeeds so it's okay.
1201                        assert_eq!(
1202                            bindings_ctx_moved.schedule_timer_instant(poll_send_at, &mut timer),
1203                            None
1204                        );
1205                        TcpSocketStateInner::Connected { conn, timer }
1206                    },
1207                    sharing,
1208                    socket_options,
1209                );
1210                (make_demux_id(id.clone()), (primary, id))
1211            }) {
1212                Ok((_entry, (primary, id))) => {
1213                    // Make sure the new socket is in the pending accept queue
1214                    // before we release the demux lock.
1215                    accept_queue.push_pending(id);
1216                    Some(primary)
1217                }
1218                Err(e) => {
1219                    // The only error we accept here is if the entry exists
1220                    // fully, any indirect conflicts are unexpected because we
1221                    // know the listener is still alive and installed in the
1222                    // demux.
1223                    assert_matches!(e, InsertError::Exists);
1224                    // If we fail to insert it means we lost a race and this
1225                    // packet is destined to a connection that is already
1226                    // established. In that case we should tell the demux code
1227                    // to retry demuxing it all over again.
1228                    None
1229                }
1230            }
1231        });
1232
1233        match new_socket {
1234            Some(new_socket) => ListenerIncomingSegmentDisposition::NewConnection(new_socket),
1235            None => {
1236                // We didn't create a new connection, short circuit early and
1237                // don't send out the pending segment.
1238                core_ctx
1239                    .increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1240                return ListenerIncomingSegmentDisposition::ConflictingConnection;
1241            }
1242        }
1243    } else {
1244        // We found a valid listener for the segment even if the connection
1245        // state is not a newly pending connection.
1246        ListenerIncomingSegmentDisposition::FoundSocket
1247    };
1248
1249    // We can send a reply now if we got here.
1250    if let Some(seg) = reply {
1251        socket::send_tcp_segment(
1252            core_ctx,
1253            bindings_ctx,
1254            Some(&listener_id),
1255            Some(&ip_sock),
1256            incoming_addrs,
1257            seg.into_empty(),
1258            &socket_options.ip_options,
1259        );
1260    }
1261
1262    result
1263}
1264
1265pub(super) fn tcp_serialize_segment<'a, I, P>(
1266    header: &'a SegmentHeader,
1267    data: P,
1268    conn_addr: ConnIpAddr<I::Addr, NonZeroU16, NonZeroU16>,
1269) -> impl TransportPacketSerializer<I, Buffer = EmptyBuf> + Debug + 'a
1270where
1271    I: FilterIpExt,
1272    P: InnerPacketBuilder + Debug + Payload + 'a,
1273{
1274    let SegmentHeader { seq, ack, wnd, control, options, push } = header;
1275    let ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) } = conn_addr;
1276    let mut builder = TcpSegmentBuilder::new(
1277        local_ip.addr(),
1278        remote_ip.addr(),
1279        local_port,
1280        remote_port,
1281        (*seq).into(),
1282        ack.map(Into::into),
1283        u16::from(*wnd),
1284    );
1285    builder.psh(*push);
1286    match control {
1287        None => {}
1288        Some(Control::SYN) => builder.syn(true),
1289        Some(Control::FIN) => builder.fin(true),
1290        Some(Control::RST) => builder.rst(true),
1291    }
1292    TcpSegmentBuilderWithOptions::new(builder, options.builder())
1293        .unwrap_or_else(|TcpOptionsTooLongError| {
1294            panic!("Too many TCP options");
1295        })
1296        .wrap_body(data.into_serializer())
1297}
1298
1299fn run_socket_ingress_filter<I, BC, D>(
1300    bindings_ctx: &BC,
1301    incoming_device: &D,
1302    socket_cookie: SocketCookie,
1303    socket_options: &SocketOptions,
1304    header_info: &impl IpHeaderInfo<I>,
1305    tcp_segment: &TcpSegment<&'_ [u8]>,
1306) -> SocketIngressFilterResult
1307where
1308    I: Ip,
1309    BC: TcpBindingsContext<D>,
1310    D: StrongDeviceIdentifier,
1311{
1312    let [ip_prefix, ip_options] = header_info.as_bytes();
1313    let [tcp_prefix, tcp_options, data] = tcp_segment.as_bytes();
1314    let mut slices = [ip_prefix, ip_options, tcp_prefix, tcp_options, data];
1315    let data = FragmentedByteSlice::new(&mut slices);
1316
1317    bindings_ctx.socket_ops_filter().on_ingress(
1318        I::VERSION,
1319        data,
1320        incoming_device,
1321        socket_cookie,
1322        &socket_options.ip_options.marks,
1323    )
1324}
1325
#[cfg(test)]
mod test {
    use ip_test_macro::ip_test;
    use netstack3_base::{
        HandshakeOptions, Options, ResetOptions, SackBlocks, SegmentOptions, UnscaledWindowSize,
    };
    use packet::{ParseBuffer as _, Serializer as _};
    use packet_formats::tcp::options::TcpOptions as _;
    use test_case::test_case;

    use super::*;

    trait TestIpExt: netstack3_base::testutil::TestIpExt + FilterIpExt {}
    impl<T> TestIpExt for T where T: netstack3_base::testutil::TestIpExt + FilterIpExt {}

    const SEQ: SeqNum = SeqNum::new(12345);
    const ACK: SeqNum = SeqNum::new(67890);
    const FAKE_DATA: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 0];

    /// Serializes `segment` with [`super::tcp_serialize_segment`], parses the
    /// bytes back, and checks that ports, sequence and acknowledgement
    /// numbers, flags, window, options and body all survive the round trip.
    #[ip_test(I)]
    #[test_case(
        Segment::syn(SEQ, UnscaledWindowSize::from(u16::MAX),
        HandshakeOptions::default()), &[]
        ; "syn")]
    #[test_case(
        Segment::syn(SEQ, UnscaledWindowSize::from(u16::MAX),
        HandshakeOptions {
            mss: Some(Mss::new(1440).unwrap()),
            ..Default::default() }), &[]
            ; "syn with mss")]
    #[test_case(
        Segment::ack(SEQ, ACK, UnscaledWindowSize::from(u16::MAX), SegmentOptions::default()),
        &[]; "ack")]
    #[test_case(Segment::with_fake_data(SEQ, ACK, FAKE_DATA), FAKE_DATA; "data")]
    #[test_case(Segment::new_assert_no_discard(SegmentHeader {
            seq: SEQ,
            ack: Some(ACK),
            push: true,
            wnd: UnscaledWindowSize::from(u16::MAX),
            ..Default::default()
        },
        FAKE_DATA
    ), FAKE_DATA; "push")]
    fn tcp_serialize_segment<I: TestIpExt>(segment: Segment<&[u8]>, expected_body: &[u8]) {
        const SOURCE_PORT: NonZeroU16 = NonZeroU16::new(1111).unwrap();
        const DEST_PORT: NonZeroU16 = NonZeroU16::new(2222).unwrap();

        let (header, data) = segment.into_parts();
        let serializer = super::tcp_serialize_segment::<I, _>(
            &header,
            data,
            ConnIpAddr {
                local: (SocketIpAddr::try_from(I::TEST_ADDRS.local_ip).unwrap(), SOURCE_PORT),
                remote: (SocketIpAddr::try_from(I::TEST_ADDRS.remote_ip).unwrap(), DEST_PORT),
            },
        );

        let mut serialized = serializer.serialize_vec_outer().unwrap().unwrap_b();
        // The segment was serialized with the local address as its source and
        // the remote address as its destination, so it is parsed the same
        // way. (The swapped order also validates, because the pseudo-header
        // checksum does not depend on operand order, but it misdescribes the
        // packet.)
        let parsed_segment = serialized
            .parse_with::<_, TcpSegment<_>>(TcpParseArgs::new(
                *I::TEST_ADDRS.local_ip,
                *I::TEST_ADDRS.remote_ip,
            ))
            .expect("is valid segment");

        assert_eq!(parsed_segment.src_port(), SOURCE_PORT);
        assert_eq!(parsed_segment.dst_port(), DEST_PORT);
        assert_eq!(parsed_segment.seq_num(), u32::from(SEQ));
        assert_eq!(parsed_segment.ack_num(), header.ack.map(u32::from));
        assert_eq!(parsed_segment.psh(), header.push);
        assert_eq!(
            UnscaledWindowSize::from(parsed_segment.window_size()),
            UnscaledWindowSize::from(u16::MAX)
        );

        // Exactly the flag named by `control` (if any) must be on the wire.
        let (expect_syn, expect_fin, expect_rst) = match header.control {
            None => (false, false, false),
            Some(Control::SYN) => (true, false, false),
            Some(Control::FIN) => (false, true, false),
            Some(Control::RST) => (false, false, true),
        };
        assert_eq!(parsed_segment.syn(), expect_syn);
        assert_eq!(parsed_segment.fin(), expect_fin);
        assert_eq!(parsed_segment.rst(), expect_rst);

        let (mss, window_scale, sack_permitted, sack_blocks, timestamp) = match header.options {
            Options::Handshake(HandshakeOptions {
                mss,
                window_scale,
                sack_permitted,
                timestamp,
            }) => (mss, window_scale, sack_permitted, SackBlocks::EMPTY, timestamp),
            Options::Segment(SegmentOptions { timestamp, sack_blocks }) => {
                (None, None, false, sack_blocks, timestamp)
            }
            Options::Reset(ResetOptions { timestamp }) => {
                (None, None, false, SackBlocks::EMPTY, timestamp)
            }
        };
        assert_eq!(mss.map(|mss| mss.get()), parsed_segment.options().mss());
        assert_eq!(window_scale.map(|ws| ws.get()), parsed_segment.options().window_scale());
        assert_eq!(sack_permitted, parsed_segment.options().sack_permitted());
        assert_eq!(sack_blocks.as_slice(), parsed_segment.options().sack_blocks());
        assert_eq!(
            timestamp.as_ref().map(Into::into).as_ref(),
            parsed_segment.options().timestamp()
        );

        assert_eq!(parsed_segment.into_body(), expected_body);
    }
}