Skip to main content

netstack3_tcp/socket/
demux.rs

// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Defines the entry point for incoming TCP packets, directing each packet to
//! the correct state machine.

8use core::fmt::Debug;
9use core::num::NonZeroU16;
10
11use assert_matches::assert_matches;
12use log::{debug, error, warn};
13use net_types::ip::Ip;
14use net_types::{SpecifiedAddr, Witness as _};
15use netstack3_base::socket::{
16    AddrIsMappedError, AddrVec, AddrVecIter, ConnAddr, ConnIpAddr, InsertError, IpAddrVec,
17    ListenerAddr, ListenerIpAddr, SocketCookie, SocketIpAddr, SocketIpAddrExt as _,
18};
19use netstack3_base::{
20    BidirectionalConverter as _, Control, CounterContext, CtxPair, EitherDeviceId, IpDeviceAddr,
21    Marks, Mss, NotFoundError, Payload, Segment, SegmentHeader, SeqNum, StrongDeviceIdentifier,
22    VerifiedTcpSegment, WeakDeviceIdentifier,
23};
24use netstack3_filter::{
25    FilterIpExt, SocketIngressFilterResult, SocketOpsFilter, TransportPacketSerializer,
26};
27use netstack3_hashmap::hash_map;
28use netstack3_ip::socket::{IpSockCreationError, IpSocketArgs, MmsError};
29use netstack3_ip::{
30    IpHeaderInfo, IpTransportContext, LocalDeliveryPacketInfo, ReceiveIpPacketMeta,
31    TransportIpContext,
32};
33use netstack3_trace::trace_duration;
34use packet::{
35    BufferMut, BufferView as _, EmptyBuf, FragmentedByteSlice, InnerPacketBuilder,
36    NestablePacketBuilder as _, ParseBuffer,
37};
38use packet_formats::error::ParseError;
39use packet_formats::ip::IpProto;
40use packet_formats::tcp::{
41    TcpFlowAndSeqNum, TcpOptionsTooLongError, TcpParseArgs, TcpSegment, TcpSegmentBuilder,
42    TcpSegmentBuilderWithOptions, TcpSegmentRaw,
43};
44
45use crate::internal::base::{BufferSizes, ConnectionError, SocketOptions, TcpIpSockOptions};
46use crate::internal::counters::{
47    self, TcpCounterContext, TcpCountersRefs, TcpCountersWithoutSocket,
48};
49use crate::internal::socket::generators::{IsnGenerator, TimestampOffsetGenerator};
50use crate::internal::socket::{
51    self, AsThisStack as _, Connection, CoreTxMetadataContext, DemuxState, DeviceIpSocketHandler,
52    DoSendLimit, DualStackBaseIpExt, DualStackDemuxIdConverter as _, DualStackIpExt, EitherStack,
53    HandshakeStatus, Listener, ListenerAddrState, MaybeDualStack, PrimaryRc, TcpApi,
54    TcpBindingsContext, TcpBindingsTypes, TcpContext, TcpDemuxContext, TcpDualStackContext,
55    TcpIpTransportContext, TcpPortSpec, TcpSocketId, TcpSocketSetEntry, TcpSocketState,
56    TcpSocketStateInner, TcpSocketTxMetadata,
57};
58use crate::internal::state::{
59    BufferProvider, Closed, DataAcked, Initial, NewlyClosed, State, TimeWait,
60};
61
62impl<BT: TcpBindingsTypes> BufferProvider<BT::ReceiveBuffer, BT::SendBuffer> for BT {
63    type ActiveOpen = BT::ListenerNotifierOrProvidedBuffers;
64
65    type PassiveOpen = BT::ReturnedBuffers;
66
67    fn new_passive_open_buffers(
68        buffer_sizes: BufferSizes,
69    ) -> (BT::ReceiveBuffer, BT::SendBuffer, Self::PassiveOpen) {
70        BT::new_passive_open_buffers(buffer_sizes)
71    }
72}
73
/// Alias for the demux socket ID type of IP version `I`, which can reference
/// either a V4 or a V6 socket.
pub type DualStackTcpSocketId<I, D, BT> = <I as DualStackBaseIpExt>::DemuxSocketId<D, BT>;
76
/// IP-layer entry points for TCP over IP version `I`.
///
/// The core context must implement `TcpContext` for both `I` and
/// `I::OtherVersion` so that segments arriving over `I` can also reach
/// sockets of the other stack (dual-stack sockets).
impl<I, BC, CC> IpTransportContext<I, BC, CC> for TcpIpTransportContext
where
    I: DualStackIpExt,
    BC: TcpBindingsContext<CC::DeviceId>
        + BufferProvider<
            BC::ReceiveBuffer,
            BC::SendBuffer,
            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
        >,
    CC: TcpContext<I, BC> + TcpContext<I::OtherVersion, BC>,
{
    type EarlyDemuxSocket = DualStackTcpSocketId<I, CC::WeakDeviceId, BC>;

    /// Looks up the connected socket for an incoming packet ahead of full
    /// processing; see `early_demux_ip_packet`.
    fn early_demux<B: ParseBuffer>(
        core_ctx: &mut CC,
        device: &CC::DeviceId,
        src_ip: I::Addr,
        dst_ip: I::Addr,
        buffer: B,
    ) -> Option<Self::EarlyDemuxSocket> {
        early_demux_ip_packet::<I, _, _, _>(core_ctx, device, src_ip, dst_ip, buffer)
    }

    /// Handles an ICMP error whose `original_body` starts with the TCP header
    /// of the packet that triggered it.
    ///
    /// Only the leading ports and sequence number (8 bytes) are read. The
    /// error is ignored when the body is shorter than that (logged), when the
    /// original source address is absent, or when either port is zero.
    /// Otherwise it is forwarded to `TcpApi::on_icmp_error`.
    fn receive_icmp_error(
        core_ctx: &mut CC,
        bindings_ctx: &mut BC,
        _device: &CC::DeviceId,
        original_src_ip: Option<SpecifiedAddr<I::Addr>>,
        original_dst_ip: SpecifiedAddr<I::Addr>,
        mut original_body: &[u8],
        err: I::ErrorCode,
    ) {
        // View the body as a buffer so the flow header can be taken off its
        // front.
        let mut buffer = &mut original_body;
        let Some(flow_and_seqnum) = buffer.take_obj_front::<TcpFlowAndSeqNum>() else {
            error!("received an ICMP error but its body is less than 8 bytes");
            return;
        };

        let Some(original_src_ip) = original_src_ip else { return };
        let Some(original_src_port) = NonZeroU16::new(flow_and_seqnum.src_port()) else { return };
        let Some(original_dst_port) = NonZeroU16::new(flow_and_seqnum.dst_port()) else { return };
        let original_seqnum = SeqNum::new(flow_and_seqnum.sequence_num());

        TcpApi::<I, _>::new(CtxPair { core_ctx, bindings_ctx }).on_icmp_error(
            original_src_ip,
            original_dst_ip,
            original_src_port,
            original_dst_port,
            original_seqnum,
            err.into(),
        );
    }

    /// Receives a TCP segment delivered locally by the IP layer.
    ///
    /// The following are counted and dropped:
    /// - broadcast packets;
    /// - packets whose source address is unspecified or IPv4-mapped;
    /// - packets whose local address is IPv4-mapped;
    /// - segments that fail to parse or verify.
    ///
    /// Valid segments are counted and passed to `handle_incoming_packet`,
    /// together with `early_demux_socket` if one was found.
    ///
    /// This implementation always returns `Ok(())`; it never asks the IP layer
    /// to generate an ICMP error.
    fn receive_ip_packet<B: BufferMut, H: IpHeaderInfo<I>>(
        core_ctx: &mut CC,
        bindings_ctx: &mut BC,
        device: &CC::DeviceId,
        remote_ip: I::RecvSrcAddr,
        local_ip: SpecifiedAddr<I::Addr>,
        mut buffer: B,
        info: &mut LocalDeliveryPacketInfo<I, H>,
        early_demux_socket: Option<Self::EarlyDemuxSocket>,
    ) -> Result<(), (B, I::IcmpError)> {
        let LocalDeliveryPacketInfo { meta, header_info, marks } = info;
        let ReceiveIpPacketMeta { broadcast, transparent_override, parsing_context } = meta;
        // Transparent proxying is not implemented for TCP: the override is
        // only logged and normal dispatch proceeds.
        if let Some(delivery) = transparent_override {
            warn!(
                "TODO(https://fxbug.dev/337009139): transparent proxy not supported for TCP \
                sockets; will not override dispatch to perform local delivery to {delivery:?}"
            );
        }

        if broadcast.is_some() {
            CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
                .invalid_ip_addrs_received
                .increment();
            debug!("tcp: dropping broadcast TCP packet");
            return Ok(());
        }

        let remote_ip = match SpecifiedAddr::new(remote_ip.into_addr()) {
            None => {
                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
                    .invalid_ip_addrs_received
                    .increment();
                debug!("tcp: source address unspecified, dropping the packet");
                return Ok(());
            }
            Some(src_ip) => src_ip,
        };
        let remote_ip: SocketIpAddr<_> = match remote_ip.try_into() {
            Ok(remote_ip) => remote_ip,
            Err(AddrIsMappedError {}) => {
                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
                    .invalid_ip_addrs_received
                    .increment();
                debug!("tcp: source address is mapped (ipv4-mapped-ipv6), dropping the packet");
                return Ok(());
            }
        };
        let local_ip: SocketIpAddr<_> = match local_ip.try_into() {
            Ok(local_ip) => local_ip,
            Err(AddrIsMappedError {}) => {
                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
                    .invalid_ip_addrs_received
                    .increment();
                debug!("tcp: local address is mapped (ipv4-mapped-ipv6), dropping the packet");
                return Ok(());
            }
        };
        // The parse is given both addresses (and the IP layer's parsing
        // context), which is what lets it report `ParseError::Checksum`.
        let packet = match buffer.parse_with::<_, TcpSegment<_>>(TcpParseArgs::with_context(
            remote_ip.addr(),
            local_ip.addr(),
            parsing_context,
        )) {
            Ok(packet) => packet,
            Err(err) => {
                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
                    .invalid_segments_received
                    .increment();
                debug!("tcp: failed parsing incoming packet {:?}", err);
                // Checksum failures are also counted separately.
                match err {
                    ParseError::Checksum => {
                        CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
                            .checksum_errors
                            .increment();
                    }
                    ParseError::NotSupported | ParseError::NotExpected | ParseError::Format => {}
                }
                return Ok(());
            }
        };
        // For an incoming segment the destination is local and the source is
        // remote.
        let local_port = packet.dst_port();
        let remote_port = packet.src_port();
        let incoming = match VerifiedTcpSegment::try_from(packet) {
            Ok(segment) => segment,
            Err(err) => {
                CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
                    .invalid_segments_received
                    .increment();
                debug!("tcp: malformed segment {:?}", err);
                return Ok(());
            }
        };
        let conn_addr =
            ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) };

        CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
            .valid_segments_received
            .increment();
        handle_incoming_packet::<I, _, _, _>(
            core_ctx,
            bindings_ctx,
            conn_addr,
            device,
            header_info,
            &incoming,
            marks,
            early_demux_socket,
        );
        Ok(())
    }
}
241
242fn early_demux_ip_packet<I, BC, CC, B>(
243    core_ctx: &mut CC,
244    device: &CC::DeviceId,
245    src_ip: I::Addr,
246    dst_ip: I::Addr,
247    mut buffer: B,
248) -> Option<DualStackTcpSocketId<I, CC::WeakDeviceId, BC>>
249where
250    I: DualStackIpExt,
251    BC: TcpBindingsContext<CC::DeviceId>,
252    CC: TcpContext<I, BC> + TcpContext<I::OtherVersion, BC>,
253    B: ParseBuffer,
254{
255    let Ok(packet) = buffer.parse_with::<_, TcpSegmentRaw<_>>(()) else {
256        // If we fail to parse the packet then just return None. Invalid
257        // packets are handled later.
258        return None;
259    };
260
261    let src_ip = SocketIpAddr::new(src_ip)?;
262    let dst_ip = SocketIpAddr::new(dst_ip)?;
263    let (src_port, dst_port) = packet.flow_header().src_dst();
264    let src_port = NonZeroU16::new(src_port)?;
265    let dst_port = NonZeroU16::new(dst_port)?;
266    let device = device.downgrade();
267
268    core_ctx.with_demux(|demux: &DemuxState<I, _, _>| {
269        demux
270            .socketmap
271            .lookup_connected((src_ip, src_port), (dst_ip, dst_port), device)
272            .map(|entry| entry.id())
273    })
274}
275
/// Dispatches a verified incoming TCP segment to the socket that should
/// receive it, and lets that socket process it.
///
/// # Socket search
///
/// When `early_demux_socket` is present, it is tried first as the connected
/// socket. Otherwise the demux socket map is searched for `conn_addr` on
/// `incoming_device`. Connections are searched before listeners.
///
/// # Connection outcomes
///
/// - If the connection asks to be destroyed, its socket is destroyed here.
/// - A TIME-WAIT connection that is a reuse candidate is remembered in
///   `tw_reuse`, and the search continues with listeners.
///
/// # No matching socket
///
/// If no socket takes the segment, it is answered as if by a socket in the
/// CLOSED state. This may send a reply without any socket (e.g. a RST).
///
/// # Counters
///
/// Dispatch counters and per-control-flag (RST/SYN/FIN) counters are updated
/// last.
fn handle_incoming_packet<WireI, BC, CC, H>(
    core_ctx: &mut CC,
    bindings_ctx: &mut BC,
    conn_addr: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
    incoming_device: &CC::DeviceId,
    header_info: &H,
    incoming: &VerifiedTcpSegment<'_>,
    marks: &Marks,
    mut early_demux_socket: Option<DualStackTcpSocketId<WireI, CC::WeakDeviceId, BC>>,
) where
    WireI: DualStackIpExt,
    BC: TcpBindingsContext<CC::DeviceId>
        + BufferProvider<
            BC::ReceiveBuffer,
            BC::SendBuffer,
            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
        >,
    CC: TcpContext<WireI, BC> + TcpContext<WireI::OtherVersion, BC>,
    H: IpHeaderInfo<WireI>,
{
    trace_duration!("tcp::handle_incoming_packet");
    // Holds a TIME-WAIT connection (and its address) that a listener may take
    // over, if one is found later in the search.
    let mut tw_reuse = None;

    // If we have an early demux socket, then we don't need to look up
    // connected sockets again. We may still need to look up listener sockets
    // when the socket we found is in the TIME-WAIT state.
    let addr: IpAddrVec<WireI, _> = if early_demux_socket.is_some() {
        let (ip, port) = conn_addr.local;
        IpAddrVec::new_listener(ip, port)
    } else {
        conn_addr.into()
    };
    let mut addrs_to_search = AddrVecIter::<WireI, CC::WeakDeviceId, TcpPortSpec>::with_device(
        addr,
        incoming_device.downgrade(),
    );

    enum FoundSocket<S> {
        // Typically holds the demux ID of the found socket, but may hold
        // `None` if the found socket was destroyed as a result of the segment.
        Yes(Option<S>),
        No,
    }
    let found_socket = loop {
        // The early-demux socket (if any) is consumed on the first iteration;
        // later iterations resume the socket-map search.
        let sock = if let Some(early_demux_socket) = early_demux_socket.take() {
            // Build the connection address from the segment's addresses and
            // the socket's bound device.
            let device = match WireI::as_dual_stack_ip_socket(&early_demux_socket) {
                EitherStack::ThisStack(conn_id) => conn_id.get_bound_device(core_ctx),
                EitherStack::OtherStack(conn_id) => conn_id.get_bound_device(core_ctx),
            };
            let conn_addr = ConnAddr { ip: conn_addr, device };
            Some(SocketLookupResult::Connection(early_demux_socket, conn_addr))
        } else {
            core_ctx.with_demux(|demux| lookup_socket::<WireI, CC, BC>(demux, &mut addrs_to_search))
        };
        match sock {
            None => break FoundSocket::No,
            Some(SocketLookupResult::Connection(demux_conn_id, conn_addr)) => {
                // It is not possible to have two same connections that
                // share the same local and remote IPs and ports.
                assert_eq!(tw_reuse, None);
                let disposition = match WireI::as_dual_stack_ip_socket(&demux_conn_id) {
                    EitherStack::ThisStack(conn_id) => {
                        try_handle_incoming_for_connection_dual_stack(
                            core_ctx,
                            bindings_ctx,
                            conn_id,
                            incoming_device,
                            header_info,
                            &incoming,
                        )
                    }
                    EitherStack::OtherStack(conn_id) => {
                        try_handle_incoming_for_connection_dual_stack(
                            core_ctx,
                            bindings_ctx,
                            conn_id,
                            incoming_device,
                            header_info,
                            &incoming,
                        )
                    }
                };
                match disposition {
                    ConnectionIncomingSegmentDisposition::Destroy => {
                        WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, demux_conn_id);
                        break FoundSocket::Yes(None);
                    }
                    ConnectionIncomingSegmentDisposition::FoundSocket
                    | ConnectionIncomingSegmentDisposition::Filtered => {
                        break FoundSocket::Yes(Some(demux_conn_id));
                    }
                    ConnectionIncomingSegmentDisposition::ReuseCandidateForListener => {
                        // Keep searching: a listener on the same port may take
                        // over this TIME-WAIT connection.
                        tw_reuse = Some((demux_conn_id, conn_addr));
                    }
                }
            }
            Some(SocketLookupResult::Listener((demux_listener_id, _listener_addr))) => {
                // Offer the segment to the listener. If
                // `try_handle_listener_incoming_disposition` returns `false`,
                // the loop continues searching.
                match WireI::as_dual_stack_ip_socket(&demux_listener_id) {
                    EitherStack::ThisStack(listener_id) => {
                        let disposition = core_ctx.with_socket_mut_generators_transport_demux(
                            &listener_id,
                            |core_ctx, socket_state, isn, timestamp_offset| match core_ctx {
                                MaybeDualStack::NotDualStack((core_ctx, converter)) => {
                                    try_handle_incoming_for_listener::<WireI, WireI, CC, BC, _, _>(
                                        core_ctx,
                                        bindings_ctx,
                                        &listener_id,
                                        isn,
                                        timestamp_offset,
                                        socket_state,
                                        header_info,
                                        incoming,
                                        conn_addr,
                                        incoming_device,
                                        &mut tw_reuse,
                                        move |conn, addr| converter.convert_back((conn, addr)),
                                        WireI::into_demux_socket_id,
                                        marks,
                                    )
                                }
                                MaybeDualStack::DualStack((core_ctx, converter)) => {
                                    try_handle_incoming_for_listener::<_, _, CC, BC, _, _>(
                                        core_ctx,
                                        bindings_ctx,
                                        &listener_id,
                                        isn,
                                        timestamp_offset,
                                        socket_state,
                                        header_info,
                                        incoming,
                                        conn_addr,
                                        incoming_device,
                                        &mut tw_reuse,
                                        move |conn, addr| {
                                            converter
                                                .convert_back(EitherStack::ThisStack((conn, addr)))
                                        },
                                        WireI::into_demux_socket_id,
                                        marks,
                                    )
                                }
                            },
                        );
                        if try_handle_listener_incoming_disposition(
                            core_ctx,
                            bindings_ctx,
                            disposition,
                            &demux_listener_id,
                            &mut tw_reuse,
                            &mut addrs_to_search,
                            conn_addr,
                            incoming_device,
                        ) {
                            break FoundSocket::Yes(Some(demux_listener_id));
                        }
                    }
                    EitherStack::OtherStack(listener_id) => {
                        let disposition = core_ctx.with_socket_mut_generators_transport_demux(
                            &listener_id,
                            |core_ctx, socket_state, isn, timestamp_offset| {
                                match core_ctx {
                                    MaybeDualStack::NotDualStack((_core_ctx, _converter)) => {
                                        // TODO(https://issues.fuchsia.dev/316408184):
                                        // Remove this unreachable!.
                                        unreachable!("OtherStack socket ID with non dual stack");
                                    }
                                    MaybeDualStack::DualStack((core_ctx, converter)) => {
                                        let other_demux_id_converter =
                                            core_ctx.other_demux_id_converter();
                                        try_handle_incoming_for_listener::<_, _, CC, BC, _, _>(
                                            core_ctx,
                                            bindings_ctx,
                                            &listener_id,
                                            isn,
                                            timestamp_offset,
                                            socket_state,
                                            header_info,
                                            incoming,
                                            conn_addr,
                                            incoming_device,
                                            &mut tw_reuse,
                                            move |conn, addr| {
                                                converter.convert_back(EitherStack::OtherStack((
                                                    conn, addr,
                                                )))
                                            },
                                            move |id| other_demux_id_converter.convert(id),
                                            marks,
                                        )
                                    }
                                }
                            },
                        );
                        if try_handle_listener_incoming_disposition::<_, _, CC, BC, _>(
                            core_ctx,
                            bindings_ctx,
                            disposition,
                            &demux_listener_id,
                            &mut tw_reuse,
                            &mut addrs_to_search,
                            conn_addr,
                            incoming_device,
                        ) {
                            break FoundSocket::Yes(Some(demux_listener_id));
                        }
                    }
                };
            }
        }
    };

    let demux_id = match found_socket {
        FoundSocket::No => {
            CounterContext::<TcpCountersWithoutSocket<WireI>>::counters(core_ctx)
                .received_segments_no_dispatch
                .increment();

            // There is no existing TCP state, pretend it is closed
            // and generate a RST if needed.
            // Per RFC 793 (https://tools.ietf.org/html/rfc793#page-21):
            // CLOSED is fictional because it represents the state when
            // there is no TCB, and therefore, no connection.
            if let Some(seg) =
                (Closed { reason: None::<Option<ConnectionError>> }.on_segment(&incoming.into()))
            {
                // The reply is sent without any socket, reusing the incoming
                // segment's marks.
                socket::send_tcp_segment::<WireI, WireI, _, _, _>(
                    core_ctx,
                    bindings_ctx,
                    None,
                    None,
                    conn_addr,
                    seg.into_empty(),
                    &TcpIpSockOptions { marks: *marks },
                );
            }
            None
        }
        FoundSocket::Yes(demux_id) => {
            counters::increment_counter_with_optional_demux_id::<WireI, _, _, _, _>(
                core_ctx,
                demux_id.as_ref(),
                |c| &c.received_segments_dispatched,
            );
            demux_id
        }
    };

    // Control-flag counters are attributed to the receiving socket when one
    // still exists.
    if let Some(control) = incoming.control() {
        counters::increment_counter_with_optional_demux_id::<WireI, _, _, _, _>(
            core_ctx,
            demux_id.as_ref(),
            |c| match control {
                Control::RST => &c.resets_received,
                Control::SYN => &c.syns_received,
                Control::FIN => &c.fins_received,
            },
        )
    }
}
536
/// A socket found for an incoming segment, along with the address it matched.
enum SocketLookupResult<I: DualStackIpExt, D: WeakDeviceIdentifier, BT: TcpBindingsTypes> {
    /// A connected socket and the connection address (including device) it
    /// matched.
    Connection(I::DemuxSocketId<D, BT>, ConnAddr<ConnIpAddr<I::Addr, NonZeroU16, NonZeroU16>, D>),
    /// An active listener and the listener address it was found under.
    Listener((I::DemuxSocketId<D, BT>, ListenerAddr<ListenerIpAddr<I::Addr, NonZeroU16>, D>)),
}
541
542fn lookup_socket<I, CC, BC>(
543    DemuxState { socketmap, .. }: &DemuxState<I, CC::WeakDeviceId, BC>,
544    addrs_to_search: &mut AddrVecIter<I, CC::WeakDeviceId, TcpPortSpec>,
545) -> Option<SocketLookupResult<I, CC::WeakDeviceId, BC>>
546where
547    I: DualStackIpExt,
548    BC: TcpBindingsContext<CC::DeviceId>,
549    CC: TcpContext<I, BC>,
550{
551    addrs_to_search.find_map(|addr| {
552        match addr {
553            // Connections are always searched before listeners because they
554            // are more specific.
555            AddrVec::Conn(conn_addr) => {
556                socketmap.conns().get_by_addr(&conn_addr).map(|conn_addr_state| {
557                    SocketLookupResult::Connection(conn_addr_state.id(), conn_addr)
558                })
559            }
560            AddrVec::Listen(listener_addr) => {
561                // If we have a listener and the incoming segment is a SYN, we
562                // allocate a new connection entry in the demuxer.
563                // TODO(https://fxbug.dev/42052878): Support SYN cookies.
564
565                socketmap
566                    .listeners()
567                    .get_by_addr(&listener_addr)
568                    .and_then(|addr_state| match addr_state {
569                        ListenerAddrState::ExclusiveListener(id) => Some(id.clone()),
570                        ListenerAddrState::Shared { listener: Some(id), bound: _ } => {
571                            Some(id.clone())
572                        }
573                        ListenerAddrState::ExclusiveBound(_)
574                        | ListenerAddrState::Shared { listener: None, bound: _ } => None,
575                    })
576                    .map(|id| SocketLookupResult::Listener((id, listener_addr)))
577            }
578        }
579    })
580}
581
/// The outcome of offering an incoming segment to a connected socket.
#[derive(PartialEq, Eq)]
enum ConnectionIncomingSegmentDisposition {
    /// The connection handled the segment.
    FoundSocket,
    /// The socket ingress filter dropped the segment; the socket still counts
    /// as the one that received it.
    Filtered,
    /// The connection is a defunct socket in TIME_WAIT that may be reused if a
    /// listener is found for the segment, so the search continues with
    /// listeners.
    ReuseCandidateForListener,
    /// The socket must be destroyed as a result of the segment.
    Destroy,
}
589
/// The outcome of offering an incoming segment to a listening socket.
///
/// NOTE(review): the code that produces and consumes this type is outside this
/// view. The variant notes below are inferred from names and from the parallel
/// `ConnectionIncomingSegmentDisposition`; confirm them at the use sites.
enum ListenerIncomingSegmentDisposition<S> {
    /// Presumably: the listener took the segment.
    FoundSocket,
    /// Presumably: the socket ingress filter dropped the segment.
    Filtered,
    /// Presumably: accepting the segment would conflict with an existing
    /// connection.
    ConflictingConnection,
    /// Presumably: this listener did not take the segment, so the search for a
    /// socket should continue.
    NoMatchingSocket,
    /// Presumably: a new connection was created for the segment; `S` is its
    /// socket (TODO confirm what `S` is instantiated with).
    NewConnection(S),
}
597
/// Offers an incoming segment to the connected socket `conn_id`.
///
/// The socket ingress filter runs first. If it drops the segment, `Filtered`
/// is returned and the connection is not touched.
///
/// Otherwise the connection is resolved to the IP stack it actually uses:
/// - the socket's own stack, or
/// - the other stack, for a dual-stack socket.
///
/// The segment is then passed to `try_handle_incoming_for_connection`, and
/// that function's disposition is returned.
///
/// # Panics
///
/// Panics if `conn_id` does not refer to a socket in the
/// `TcpSocketStateInner::Connected` state.
fn try_handle_incoming_for_connection_dual_stack<SockI, WireI, CC, BC, H>(
    core_ctx: &mut CC,
    bindings_ctx: &mut BC,
    conn_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
    incoming_device: &CC::DeviceId,
    header_info: &H,
    incoming: &VerifiedTcpSegment<'_>,
) -> ConnectionIncomingSegmentDisposition
where
    SockI: DualStackIpExt,
    WireI: Ip,
    BC: TcpBindingsContext<CC::DeviceId>
        + BufferProvider<
            BC::ReceiveBuffer,
            BC::SendBuffer,
            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
        >,
    CC: TcpContext<SockI, BC>,
    H: IpHeaderInfo<WireI>,
{
    core_ctx.with_socket_mut_transport_demux(conn_id, |core_ctx, socket_state| {
        let TcpSocketState { socket_state, sharing: _, ip_options: _, socket_options } =
            socket_state;

        // The ingress filter is consulted before the connection state is
        // inspected.
        match run_socket_ingress_filter(
            bindings_ctx,
            incoming_device,
            conn_id.socket_cookie(),
            socket_options,
            header_info,
            incoming.tcp_segment(),
        ) {
            SocketIngressFilterResult::Accept => (),
            SocketIngressFilterResult::Drop => {
                return ConnectionIncomingSegmentDisposition::Filtered;
            }
        }

        let (conn_and_addr, timer) = assert_matches!(
            socket_state,
            TcpSocketStateInner::Connected { conn, timer } => (conn, timer),
            "invalid socket ID"
        );
        let this_or_other_stack = match core_ctx {
            MaybeDualStack::DualStack((core_ctx, converter)) => {
                match converter.convert(conn_and_addr) {
                    EitherStack::ThisStack((conn, conn_addr)) => {
                        // The socket belongs to the current stack, so we
                        // want to deliver the segment to this stack.
                        // Use `as_this_stack` to make the context types
                        // match with the non-dual-stack case.
                        EitherStack::ThisStack((
                            core_ctx.as_this_stack(),
                            conn,
                            conn_addr,
                            SockI::into_demux_socket_id(conn_id.clone()),
                        ))
                    }
                    EitherStack::OtherStack((conn, conn_addr)) => {
                        // The connection uses the other IP version, so deliver
                        // through the other stack, e.g. an IPv4 packet to a
                        // connection owned by an IPv6 dual-stack socket.
                        let demux_sock_id = core_ctx.into_other_demux_socket_id(conn_id.clone());
                        EitherStack::OtherStack((core_ctx, conn, conn_addr, demux_sock_id))
                    }
                }
            }
            MaybeDualStack::NotDualStack((core_ctx, converter)) => {
                let (conn, conn_addr) = converter.convert(conn_and_addr);
                // Similar to the first case, we need to deliver to this stack,
                // but use `as_this_stack` to make the types match.
                EitherStack::ThisStack((
                    core_ctx.as_this_stack(),
                    conn,
                    conn_addr,
                    SockI::into_demux_socket_id(conn_id.clone()),
                ))
            }
        };

        // Both arms make the same call; they are kept separate because the
        // context and connection types differ between the two stacks.
        match this_or_other_stack {
            EitherStack::ThisStack((core_ctx, conn, conn_addr, demux_conn_id)) => {
                try_handle_incoming_for_connection::<_, _, CC, _, _>(
                    core_ctx,
                    bindings_ctx,
                    conn_addr.clone(),
                    conn_id,
                    demux_conn_id,
                    socket_options,
                    conn,
                    timer,
                    incoming.into(),
                )
            }
            EitherStack::OtherStack((core_ctx, conn, conn_addr, demux_conn_id)) => {
                try_handle_incoming_for_connection::<_, _, CC, _, _>(
                    core_ctx,
                    bindings_ctx,
                    conn_addr.clone(),
                    conn_id,
                    demux_conn_id,
                    socket_options,
                    conn,
                    timer,
                    incoming.into(),
                )
            }
        }
    })
}
708
709/// Tries to handle the incoming segment by providing it to a connected socket.
710///
711/// Returns `FoundSocket` if the segment was handled; Otherwise,
712/// `ReuseCandidateForListener` will be returned if there is a defunct socket
713/// that is currently in TIME_WAIT, which is ready to be reused if there is an
714/// active listener listening on the port.
715fn try_handle_incoming_for_connection<SockI, WireI, CC, BC, DC>(
716    core_ctx: &mut DC,
717    bindings_ctx: &mut BC,
718    conn_addr: ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
719    conn_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
720    demux_id: WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
721    socket_options: &SocketOptions,
722    conn: &mut Connection<SockI, WireI, CC::WeakDeviceId, BC>,
723    timer: &mut BC::Timer,
724    incoming: Segment<&[u8]>,
725) -> ConnectionIncomingSegmentDisposition
726where
727    SockI: DualStackIpExt,
728    WireI: DualStackIpExt,
729    BC: TcpBindingsContext<CC::DeviceId>
730        + BufferProvider<
731            BC::ReceiveBuffer,
732            BC::SendBuffer,
733            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
734            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
735        >,
736    CC: TcpContext<SockI, BC>,
737    DC: TransportIpContext<WireI, BC, DeviceId = CC::DeviceId, WeakDeviceId = CC::WeakDeviceId>
738        + DeviceIpSocketHandler<SockI, BC>
739        + TcpDemuxContext<WireI, CC::WeakDeviceId, BC>
740        + TcpCounterContext<SockI, CC::WeakDeviceId, BC>
741        + CoreTxMetadataContext<TcpSocketTxMetadata<SockI, CC::WeakDeviceId, BC>, BC>,
742{
743    let Connection { accept_queue, state, ip_sock, defunct, soft_error: _, handshake_status } =
744        conn;
745
746    // Per RFC 9293 Section 3.6.1:
747    //   When a connection is closed actively, it MUST linger in the TIME-WAIT
748    //   state for a time 2xMSL (Maximum Segment Lifetime) (MUST-13). However,
749    //   it MAY accept a new SYN from the remote TCP endpoint to reopen the
750    //   connection directly from TIME-WAIT state (MAY-2), if it:
751    //
752    //   (1) assigns its initial sequence number for the new connection to be
753    //       larger than the largest sequence number it used on the previous
754    //       connection incarnation, and
755    //   (2) returns to TIME-WAIT state if the SYN turns out to be an old
756    //       duplicate.
757    if *defunct
758        && incoming.header().control == Some(Control::SYN)
759        && incoming.header().ack.is_none()
760    {
761        if let State::TimeWait(TimeWait { last_seq: _, closed_rcv, expiry: _, snd_info: _ }) = state
762        {
763            if !incoming.header().seq.before(closed_rcv.ack) {
764                return ConnectionIncomingSegmentDisposition::ReuseCandidateForListener;
765            }
766        }
767    }
768    let (reply, passive_open, data_acked, newly_closed) = state.on_segment::<_, BC>(
769        &conn_id.either(),
770        &TcpCountersRefs::from_ctx(core_ctx, conn_id),
771        incoming,
772        bindings_ctx.now(),
773        socket_options,
774        *defunct,
775    );
776
777    match data_acked {
778        DataAcked::Yes => {
779            core_ctx.confirm_reachable(bindings_ctx, ip_sock, &socket_options.ip_options)
780        }
781        DataAcked::No => {}
782    }
783
784    match state {
785        State::Listen(_) => {
786            unreachable!("has an invalid status: {:?}", conn.state)
787        }
788        State::SynSent(_) | State::SynRcvd(_) => {
789            assert_eq!(*handshake_status, HandshakeStatus::Pending)
790        }
791        State::Established(_)
792        | State::FinWait1(_)
793        | State::FinWait2(_)
794        | State::Closing(_)
795        | State::CloseWait(_)
796        | State::LastAck(_)
797        | State::TimeWait(_) => {
798            if handshake_status
799                .update_if_pending(HandshakeStatus::Completed { reported: accept_queue.is_some() })
800            {
801                core_ctx.confirm_reachable(bindings_ctx, ip_sock, &socket_options.ip_options);
802            }
803        }
804        State::Closed(Closed { reason }) => {
805            // We remove the socket from the socketmap and cancel the timers
806            // regardless of the socket being defunct or not. The justification
807            // is that CLOSED is a synthetic state and it means no connection
808            // exists, thus it should not exist in the demuxer.
809            //
810            // If the socket was already in the closed state we can assume it's
811            // no longer in the demux.
812            socket::handle_newly_closed(
813                core_ctx,
814                bindings_ctx,
815                newly_closed,
816                &demux_id,
817                &conn_addr,
818                timer,
819            );
820            if let Some(accept_queue) = accept_queue {
821                accept_queue.remove(&conn_id);
822                *defunct = true;
823            }
824            if *defunct {
825                // If the client has promised to not touch the socket again,
826                // we can destroy the socket finally.
827                return ConnectionIncomingSegmentDisposition::Destroy;
828            }
829            let _: bool = handshake_status.update_if_pending(match reason {
830                None => HandshakeStatus::Completed { reported: accept_queue.is_some() },
831                Some(_err) => HandshakeStatus::Aborted,
832            });
833        }
834    }
835
836    if let Some(seg) = reply {
837        socket::send_tcp_segment(
838            core_ctx,
839            bindings_ctx,
840            Some(conn_id),
841            Some(&ip_sock),
842            conn_addr.ip,
843            seg.into_empty(),
844            &socket_options.ip_options,
845        );
846    }
847
848    // Send any enqueued data, if there is any.
849    socket::do_send_inner_and_then_handle_newly_closed(
850        conn_id,
851        &demux_id,
852        socket_options,
853        conn,
854        DoSendLimit::MultipleSegments,
855        &conn_addr,
856        timer,
857        core_ctx,
858        bindings_ctx,
859    );
860
861    // Enqueue the connection to the associated listener
862    // socket's accept queue.
863    if let Some(passive_open) = passive_open {
864        let accept_queue = conn.accept_queue.as_ref().expect("no accept queue but passive open");
865        accept_queue.notify_ready(conn_id, passive_open);
866    }
867
868    // We found a valid connection for the segment.
869    ConnectionIncomingSegmentDisposition::FoundSocket
870}
871
872/// Responds to the disposition returned by [`try_handle_incoming_for_listener`].
873///
874/// Returns true if we have found the right socket and there is no need to
875/// continue the iteration for finding the next-best candidate.
876fn try_handle_listener_incoming_disposition<SockI, WireI, CC, BC, Addr>(
877    core_ctx: &mut CC,
878    bindings_ctx: &mut BC,
879    disposition: ListenerIncomingSegmentDisposition<PrimaryRc<SockI, CC::WeakDeviceId, BC>>,
880    demux_listener_id: &WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
881    tw_reuse: &mut Option<(WireI::DemuxSocketId<CC::WeakDeviceId, BC>, Addr)>,
882    addrs_to_search: &mut AddrVecIter<WireI, CC::WeakDeviceId, TcpPortSpec>,
883    conn_addr: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
884    incoming_device: &CC::DeviceId,
885) -> bool
886where
887    SockI: DualStackIpExt,
888    WireI: DualStackIpExt,
889    CC: TcpContext<SockI, BC> + TcpContext<WireI, BC> + TcpContext<WireI::OtherVersion, BC>,
890    BC: TcpBindingsContext<CC::DeviceId>,
891{
892    match disposition {
893        ListenerIncomingSegmentDisposition::FoundSocket => true,
894        ListenerIncomingSegmentDisposition::Filtered => true,
895        ListenerIncomingSegmentDisposition::ConflictingConnection => {
896            // We're about to rewind the lookup. If we got a
897            // conflicting connection it means tw_reuse has been
898            // removed from the demux state and we need to destroy
899            // it.
900            if let Some((tw_reuse, _)) = tw_reuse.take() {
901                WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, tw_reuse);
902            }
903
904            // Reset the address vector iterator and go again, a
905            // conflicting connection was found.
906            *addrs_to_search = AddrVecIter::<WireI, CC::WeakDeviceId, TcpPortSpec>::with_device(
907                conn_addr.into(),
908                incoming_device.downgrade(),
909            );
910            false
911        }
912        ListenerIncomingSegmentDisposition::NoMatchingSocket => false,
913        ListenerIncomingSegmentDisposition::NewConnection(primary) => {
914            // If we have a new connection, we need to add it to the
915            // set of all sockets.
916
917            // First things first, if we got here then tw_reuse is
918            // gone so we need to destroy it.
919            if let Some((tw_reuse, _)) = tw_reuse.take() {
920                WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, tw_reuse);
921            }
922
923            // Now put the new connection into the socket map.
924            //
925            // Note that there's a possible subtle race here where
926            // another thread could have already operated further on
927            // this connection and marked it for destruction which
928            // puts the entry in the DOA state, if we see that we
929            // must immediately destroy the socket after having put
930            // it in the map.
931            let id = TcpSocketId(PrimaryRc::clone_strong(&primary));
932            let to_destroy = core_ctx.with_all_sockets_mut(move |all_sockets| {
933                let insert_entry = TcpSocketSetEntry::Primary(primary);
934                match all_sockets.entry(id) {
935                    hash_map::Entry::Vacant(v) => {
936                        let _: &mut _ = v.insert(insert_entry);
937                        None
938                    }
939                    hash_map::Entry::Occupied(mut o) => {
940                        // We're holding on to the primary ref, the
941                        // only possible state here should be a DOA
942                        // entry.
943                        assert_matches!(
944                            core::mem::replace(o.get_mut(), insert_entry),
945                            TcpSocketSetEntry::DeadOnArrival
946                        );
947                        Some(o.key().clone())
948                    }
949                }
950            });
951            // NB: we're releasing and reaquiring the
952            // all_sockets_mut lock here for the convenience of not
953            // needing different versions of `destroy_socket`. This
954            // should be fine because the race this is solving
955            // should not be common. If we have correct thread
956            // attribution per flow it should effectively become
957            // impossible so we go for code simplicity here.
958            if let Some(to_destroy) = to_destroy {
959                socket::destroy_socket(core_ctx, bindings_ctx, to_destroy);
960            }
961            counters::increment_counter_for_demux_id::<WireI, _, _, _, _>(
962                core_ctx,
963                demux_listener_id,
964                |c| &c.passive_connection_openings,
965            );
966            true
967        }
968    }
969}
970
971/// Tries to handle an incoming segment by passing it to a listening socket.
972///
973/// Returns `FoundSocket` if the segment was handled, otherwise `NoMatchingSocket`.
974fn try_handle_incoming_for_listener<SockI, WireI, CC, BC, DC, H>(
975    core_ctx: &mut DC,
976    bindings_ctx: &mut BC,
977    listener_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
978    isn: &IsnGenerator<BC::Instant>,
979    timestamp_offset: &TimestampOffsetGenerator<BC::Instant>,
980    socket_state: &mut TcpSocketState<SockI, CC::WeakDeviceId, BC>,
981    header_info: &H,
982    incoming: &VerifiedTcpSegment<'_>,
983    incoming_addrs: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
984    incoming_device: &CC::DeviceId,
985    tw_reuse: &mut Option<(
986        WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
987        ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
988    )>,
989    make_connection: impl FnOnce(
990        Connection<SockI, WireI, CC::WeakDeviceId, BC>,
991        ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
992    ) -> SockI::ConnectionAndAddr<CC::WeakDeviceId, BC>,
993    make_demux_id: impl Fn(
994        TcpSocketId<SockI, CC::WeakDeviceId, BC>,
995    ) -> WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
996    marks: &Marks,
997) -> ListenerIncomingSegmentDisposition<PrimaryRc<SockI, CC::WeakDeviceId, BC>>
998where
999    SockI: DualStackIpExt,
1000    WireI: DualStackIpExt,
1001    BC: TcpBindingsContext<CC::DeviceId>
1002        + BufferProvider<
1003            BC::ReceiveBuffer,
1004            BC::SendBuffer,
1005            ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
1006            PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
1007        >,
1008    CC: TcpContext<SockI, BC>,
1009    DC: TransportIpContext<WireI, BC, DeviceId = CC::DeviceId, WeakDeviceId = CC::WeakDeviceId>
1010        + DeviceIpSocketHandler<WireI, BC>
1011        + TcpDemuxContext<WireI, CC::WeakDeviceId, BC>
1012        + TcpCounterContext<SockI, CC::WeakDeviceId, BC>
1013        + CoreTxMetadataContext<TcpSocketTxMetadata<SockI, CC::WeakDeviceId, BC>, BC>,
1014    H: IpHeaderInfo<WireI>,
1015{
1016    let Listener { addr: listener_addr, accept_queue, backlog, buffer_sizes } =
1017        match &socket_state.socket_state {
1018            TcpSocketStateInner::Bound(_) => {
1019                // If the socket is only bound, but not listening.
1020                return ListenerIncomingSegmentDisposition::NoMatchingSocket;
1021            }
1022            TcpSocketStateInner::Listener(listener) => listener,
1023            _ => panic!("unexpected socket state: {:?}", socket_state.socket_state),
1024        };
1025
1026    let ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) } =
1027        incoming_addrs;
1028
1029    match run_socket_ingress_filter(
1030        bindings_ctx,
1031        incoming_device,
1032        listener_id.socket_cookie(),
1033        &socket_state.socket_options,
1034        header_info,
1035        incoming.tcp_segment(),
1036    ) {
1037        SocketIngressFilterResult::Accept => (),
1038        SocketIngressFilterResult::Drop => {
1039            return ListenerIncomingSegmentDisposition::Filtered;
1040        }
1041    }
1042
1043    // Note that this checks happens at the very beginning, before we try to
1044    // reuse the connection in TIME-WAIT, this is because we need to store the
1045    // reused connection in the accept queue so we have to respect its limit.
1046    if accept_queue.len() == backlog.get() {
1047        core_ctx.increment_both(listener_id, |counters| &counters.listener_queue_overflow);
1048        core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1049        debug!("incoming SYN dropped because of the full backlog of the listener");
1050        return ListenerIncomingSegmentDisposition::FoundSocket;
1051    }
1052
1053    // Ensure that if the remote address requires a zone, we propagate that to
1054    // the address for the connected socket.
1055    let bound_device = listener_addr.as_ref().clone();
1056    let bound_device = if remote_ip.as_ref().must_have_zone() {
1057        Some(bound_device.map_or(EitherDeviceId::Strong(incoming_device), EitherDeviceId::Weak))
1058    } else {
1059        bound_device.map(EitherDeviceId::Weak)
1060    };
1061
1062    let ip_options = TcpIpSockOptions { marks: *marks, ..socket_state.socket_options.ip_options };
1063    let socket_options = SocketOptions { ip_options, ..socket_state.socket_options };
1064
1065    let bound_device = bound_device.as_ref().map(|d| d.as_ref());
1066    let ip_sock = match core_ctx.new_ip_socket(
1067        bindings_ctx,
1068        IpSocketArgs {
1069            device: bound_device,
1070            local_ip: IpDeviceAddr::new_from_socket_ip_addr(local_ip),
1071            remote_ip,
1072            proto: IpProto::Tcp.into(),
1073            options: &ip_options,
1074        },
1075    ) {
1076        Ok(ip_sock) => ip_sock,
1077        err @ Err(IpSockCreationError::Route(_)) => {
1078            core_ctx.increment_both(listener_id, |counters| &counters.passive_open_no_route_errors);
1079            core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1080            debug!("cannot construct an ip socket to the SYN originator: {:?}, ignoring", err);
1081            return ListenerIncomingSegmentDisposition::NoMatchingSocket;
1082        }
1083    };
1084
1085    let isn = isn.generate(
1086        bindings_ctx.now(),
1087        (ip_sock.local_ip().clone().into(), local_port),
1088        (ip_sock.remote_ip().clone(), remote_port),
1089    );
1090    let timestamp_offset = timestamp_offset.generate::<SocketIpAddr<WireI::Addr>, NonZeroU16>(
1091        bindings_ctx.now(),
1092        (ip_sock.local_ip().clone().into(), local_port),
1093        (ip_sock.remote_ip().clone(), remote_port),
1094    );
1095    let device_mms = match core_ctx.get_mms(bindings_ctx, &ip_sock, &socket_options.ip_options) {
1096        Ok(mms) => mms,
1097        Err(err) => {
1098            // If we cannot find a device or the device's MTU is too small,
1099            // there isn't much we can do here since sending a RST back is
1100            // impossible, we just need to silent drop the segment.
1101            error!("Cannot find a device with large enough MTU for the connection");
1102            core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1103            match err {
1104                MmsError::NoDevice(_) | MmsError::MTUTooSmall(_) => {
1105                    return ListenerIncomingSegmentDisposition::FoundSocket;
1106                }
1107            }
1108        }
1109    };
1110    let Some(device_mss) = Mss::from_mms(device_mms) else {
1111        return ListenerIncomingSegmentDisposition::FoundSocket;
1112    };
1113
1114    let mut state = State::Listen(Closed::<Initial>::listen(
1115        isn,
1116        timestamp_offset,
1117        buffer_sizes.clone(),
1118        device_mss,
1119        Mss::default::<WireI>(),
1120        socket_options.user_timeout,
1121    ));
1122
1123    // Prepare a reply to be sent out.
1124    //
1125    // We might end up discarding the reply in case we can't instantiate this
1126    // new connection.
1127    let result = state.on_segment::<_, BC>(
1128        // NB: This is a bit of a lie, we're passing the listener ID to process
1129        // the first segment because we don't have an ID allocated yet. This is
1130        // okay because the state machine ID is only for debugging purposes.
1131        &listener_id.either(),
1132        &TcpCountersRefs::from_ctx(core_ctx, listener_id),
1133        incoming.into(),
1134        bindings_ctx.now(),
1135        &SocketOptions::default(),
1136        false, /* defunct */
1137    );
1138    let reply = assert_matches!(
1139        result,
1140        (reply, None, /* data_acked */ _, NewlyClosed::No /* can't become closed */) => reply
1141    );
1142
1143    let result = if matches!(state, State::SynRcvd(_)) {
1144        let poll_send_at = state.poll_send_at().expect("no retrans timer");
1145        let bound_device = ip_sock.device().cloned();
1146
1147        let addr = ConnAddr {
1148            ip: ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) },
1149            device: bound_device,
1150        };
1151
1152        let new_socket = core_ctx.with_demux_mut(|DemuxState { socketmap, .. }| {
1153            // If we're reusing an entry, remove it from the demux before
1154            // proceeding.
1155            //
1156            // We could just reuse the old allocation for the new connection but
1157            // because of the restrictions on the socket map data structure (for
1158            // good reasons), we can't update the sharing info unconditionally.
1159            // So here we just remove the old connection and create a new one.
1160            // Also this approach has the benefit of not accidentally persisting
1161            // the old state that we don't want.
1162            if let Some((tw_reuse, conn_addr)) = tw_reuse {
1163                match socketmap.conns_mut().remove(tw_reuse, &conn_addr) {
1164                    Ok(()) => {
1165                        // NB: We're removing the tw_reuse connection from the
1166                        // demux here, but not canceling its timer. The timer is
1167                        // canceled via drop when we destroy the socket. Special
1168                        // care is taken when handling timers in the time wait
1169                        // state to account for this.
1170                    }
1171                    Err(NotFoundError) => {
1172                        // We could lose a race trying to reuse the tw_reuse
1173                        // socket, so we just accept the loss and be happy that
1174                        // the conn_addr we want to use is free.
1175                    }
1176                }
1177            }
1178
1179            // Try to create and add the new socket to the demux.
1180            let accept_queue_clone = accept_queue.clone();
1181            let ip_sock = ip_sock.clone();
1182            let bindings_ctx_moved = &mut *bindings_ctx;
1183            let sharing = socket_state.sharing;
1184            match socketmap.conns_mut().try_insert_with(addr, sharing, move |addr, sharing| {
1185                let conn = make_connection(
1186                    Connection {
1187                        accept_queue: Some(accept_queue_clone),
1188                        state,
1189                        ip_sock,
1190                        defunct: false,
1191                        soft_error: None,
1192                        handshake_status: HandshakeStatus::Pending,
1193                    },
1194                    addr,
1195                );
1196
1197                let (id, primary) = TcpSocketId::new_cyclic(
1198                    |weak| {
1199                        let mut timer = CC::new_timer(bindings_ctx_moved, weak);
1200                        // Schedule the timer here because we can't acquire the lock
1201                        // later. This only runs when inserting into the demux
1202                        // succeeds so it's okay.
1203                        assert_eq!(
1204                            bindings_ctx_moved.schedule_timer_instant(poll_send_at, &mut timer),
1205                            None
1206                        );
1207                        TcpSocketStateInner::Connected { conn, timer }
1208                    },
1209                    sharing,
1210                    socket_options,
1211                );
1212                (make_demux_id(id.clone()), (primary, id))
1213            }) {
1214                Ok((_entry, (primary, id))) => {
1215                    // Make sure the new socket is in the pending accept queue
1216                    // before we release the demux lock.
1217                    accept_queue.push_pending(id);
1218                    Some(primary)
1219                }
1220                Err(e) => {
1221                    // The only error we accept here is if the entry exists
1222                    // fully, any indirect conflicts are unexpected because we
1223                    // know the listener is still alive and installed in the
1224                    // demux.
1225                    assert_matches!(e, InsertError::Exists);
1226                    // If we fail to insert it means we lost a race and this
1227                    // packet is destined to a connection that is already
1228                    // established. In that case we should tell the demux code
1229                    // to retry demuxing it all over again.
1230                    None
1231                }
1232            }
1233        });
1234
1235        match new_socket {
1236            Some(new_socket) => ListenerIncomingSegmentDisposition::NewConnection(new_socket),
1237            None => {
1238                // We didn't create a new connection, short circuit early and
1239                // don't send out the pending segment.
1240                core_ctx
1241                    .increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1242                return ListenerIncomingSegmentDisposition::ConflictingConnection;
1243            }
1244        }
1245    } else {
1246        // We found a valid listener for the segment even if the connection
1247        // state is not a newly pending connection.
1248        ListenerIncomingSegmentDisposition::FoundSocket
1249    };
1250
1251    // We can send a reply now if we got here.
1252    if let Some(seg) = reply {
1253        socket::send_tcp_segment(
1254            core_ctx,
1255            bindings_ctx,
1256            Some(&listener_id),
1257            Some(&ip_sock),
1258            incoming_addrs,
1259            seg.into_empty(),
1260            &socket_options.ip_options,
1261        );
1262    }
1263
1264    result
1265}
1266
1267pub(super) fn tcp_serialize_segment<'a, I, P>(
1268    header: &'a SegmentHeader,
1269    data: P,
1270    conn_addr: ConnIpAddr<I::Addr, NonZeroU16, NonZeroU16>,
1271) -> impl TransportPacketSerializer<I, Buffer = EmptyBuf> + Debug + 'a
1272where
1273    I: FilterIpExt,
1274    P: InnerPacketBuilder + Debug + Payload + 'a,
1275{
1276    let SegmentHeader { seq, ack, wnd, control, options, push } = header;
1277    let ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) } = conn_addr;
1278    let mut builder = TcpSegmentBuilder::new(
1279        local_ip.addr(),
1280        remote_ip.addr(),
1281        local_port,
1282        remote_port,
1283        (*seq).into(),
1284        ack.map(Into::into),
1285        u16::from(*wnd),
1286    );
1287    builder.psh(*push);
1288    match control {
1289        None => {}
1290        Some(Control::SYN) => builder.syn(true),
1291        Some(Control::FIN) => builder.fin(true),
1292        Some(Control::RST) => builder.rst(true),
1293    }
1294    TcpSegmentBuilderWithOptions::new(builder, options.builder())
1295        .unwrap_or_else(|TcpOptionsTooLongError| {
1296            panic!("Too many TCP options");
1297        })
1298        .wrap_body(data.into_serializer())
1299}
1300
1301fn run_socket_ingress_filter<I, BC, D>(
1302    bindings_ctx: &BC,
1303    incoming_device: &D,
1304    socket_cookie: SocketCookie,
1305    socket_options: &SocketOptions,
1306    header_info: &impl IpHeaderInfo<I>,
1307    tcp_segment: &TcpSegment<&'_ [u8]>,
1308) -> SocketIngressFilterResult
1309where
1310    I: Ip,
1311    BC: TcpBindingsContext<D>,
1312    D: StrongDeviceIdentifier,
1313{
1314    let [ip_prefix, ip_options] = header_info.as_bytes();
1315    let [tcp_prefix, tcp_options, data] = tcp_segment.as_bytes();
1316    let mut slices = [ip_prefix, ip_options, tcp_prefix, tcp_options, data];
1317    let data = FragmentedByteSlice::new(&mut slices);
1318
1319    bindings_ctx.socket_ops_filter().on_ingress(
1320        I::VERSION,
1321        data,
1322        incoming_device,
1323        socket_cookie,
1324        &socket_options.ip_options.marks,
1325    )
1326}
1327
#[cfg(test)]
mod test {
    use ip_test_macro::ip_test;
    use netstack3_base::{
        HandshakeOptions, NetworkSerializationContext, Options, ResetOptions, SackBlocks,
        SegmentOptions, UnscaledWindowSize,
    };
    use packet::{ParseBuffer as _, Serializer as _};
    use packet_formats::tcp::options::TcpOptions as _;
    use test_case::test_case;

    use super::*;

    trait TestIpExt: netstack3_base::testutil::TestIpExt + FilterIpExt {}
    impl<T> TestIpExt for T where T: netstack3_base::testutil::TestIpExt + FilterIpExt {}

    const SEQ: SeqNum = SeqNum::new(12345);
    const ACK: SeqNum = SeqNum::new(67890);
    const FAKE_DATA: &'static [u8] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 0];

    #[ip_test(I)]
    #[test_case(
        Segment::syn(SEQ, UnscaledWindowSize::from(u16::MAX),
        HandshakeOptions::default()), &[]
        ; "syn")]
    #[test_case(
        Segment::syn(SEQ, UnscaledWindowSize::from(u16::MAX),
        HandshakeOptions {
            mss: Some(Mss::new(1440).unwrap()),
            ..Default::default() }), &[]
            ; "syn with mss")]
    #[test_case(
        Segment::ack(SEQ, ACK, UnscaledWindowSize::from(u16::MAX), SegmentOptions::default()),
        &[]; "ack")]
    #[test_case(Segment::with_fake_data(SEQ, ACK, FAKE_DATA), FAKE_DATA; "data")]
    #[test_case(Segment::new_assert_no_discard(SegmentHeader {
            seq: SEQ,
            ack: Some(ACK),
            push: true,
            wnd: UnscaledWindowSize::from(u16::MAX),
            ..Default::default()
        },
        FAKE_DATA
    ), FAKE_DATA; "push")]
    fn tcp_serialize_segment<I: TestIpExt>(segment: Segment<&[u8]>, expected_body: &[u8]) {
        const SOURCE_PORT: NonZeroU16 = NonZeroU16::new(1111).unwrap();
        const DEST_PORT: NonZeroU16 = NonZeroU16::new(2222).unwrap();

        let (header, data) = segment.into_parts();
        let serializer = super::tcp_serialize_segment::<I, _>(
            &header,
            data,
            ConnIpAddr {
                local: (SocketIpAddr::try_from(I::TEST_ADDRS.local_ip).unwrap(), SOURCE_PORT),
                remote: (SocketIpAddr::try_from(I::TEST_ADDRS.remote_ip).unwrap(), DEST_PORT),
            },
        );

        let mut serialized = serializer
            .serialize_vec_outer(&mut NetworkSerializationContext::default())
            .unwrap()
            .unwrap_b();
        // The segment was serialized with `local_ip` as the source and
        // `remote_ip` as the destination, so parse it with that orientation.
        // (The swapped order would also pass, only because the pseudo-header
        // checksum is commutative over the two addresses.)
        let parsed_segment = serialized
            .parse_with::<_, TcpSegment<_>>(TcpParseArgs::new(
                *I::TEST_ADDRS.local_ip,
                *I::TEST_ADDRS.remote_ip,
            ))
            .expect("is valid segment");

        assert_eq!(parsed_segment.src_port(), SOURCE_PORT);
        assert_eq!(parsed_segment.dst_port(), DEST_PORT);
        assert_eq!(parsed_segment.seq_num(), u32::from(SEQ));
        assert_eq!(parsed_segment.ack_num(), header.ack.map(u32::from));
        assert_eq!(parsed_segment.psh(), header.push);
        assert_eq!(parsed_segment.syn(), header.control == Some(Control::SYN));
        assert_eq!(parsed_segment.fin(), header.control == Some(Control::FIN));
        assert_eq!(parsed_segment.rst(), header.control == Some(Control::RST));
        assert_eq!(
            UnscaledWindowSize::from(parsed_segment.window_size()),
            UnscaledWindowSize::from(u16::MAX)
        );

        let (mss, window_scale, sack_permitted, sack_blocks, timestamp) = match header.options {
            Options::Handshake(HandshakeOptions {
                mss,
                window_scale,
                sack_permitted,
                timestamp,
            }) => (mss, window_scale, sack_permitted, SackBlocks::EMPTY, timestamp),
            Options::Segment(SegmentOptions { timestamp, sack_blocks }) => {
                (None, None, false, sack_blocks, timestamp)
            }
            Options::Reset(ResetOptions { timestamp }) => {
                (None, None, false, SackBlocks::EMPTY, timestamp)
            }
        };
        assert_eq!(mss.map(|mss| mss.get()), parsed_segment.options().mss());
        assert_eq!(window_scale.map(|ws| ws.get()), parsed_segment.options().window_scale());
        assert_eq!(sack_permitted, parsed_segment.options().sack_permitted());
        assert_eq!(sack_blocks.as_slice(), parsed_segment.options().sack_blocks());
        assert_eq!(
            timestamp.as_ref().map(Into::into).as_ref(),
            parsed_segment.options().timestamp()
        );

        assert_eq!(parsed_segment.into_body(), expected_body);
    }
}