1use core::convert::Infallible as Never;
9use core::fmt::Debug;
10use core::num::NonZeroU16;
11
12use assert_matches::assert_matches;
13use log::{debug, error, warn};
14use net_types::ip::Ip;
15use net_types::{SpecifiedAddr, Witness as _};
16use netstack3_base::socket::{
17 AddrIsMappedError, AddrVec, AddrVecIter, ConnAddr, ConnIpAddr, InsertError, ListenerAddr,
18 ListenerIpAddr, SocketCookie, SocketIpAddr, SocketIpAddrExt as _,
19};
20use netstack3_base::{
21 BidirectionalConverter as _, Control, CounterContext, CtxPair, EitherDeviceId, IpDeviceAddr,
22 Marks, Mss, NotFoundError, Payload, Segment, SegmentHeader, SeqNum, StrongDeviceIdentifier,
23 VerifiedTcpSegment, WeakDeviceIdentifier,
24};
25use netstack3_filter::{
26 FilterIpExt, SocketIngressFilterResult, SocketOpsFilter, TransportPacketSerializer,
27};
28use netstack3_hashmap::hash_map;
29use netstack3_ip::socket::{IpSockCreationError, IpSocketArgs, MmsError};
30use netstack3_ip::{
31 IpHeaderInfo, IpTransportContext, LocalDeliveryPacketInfo, ReceiveIpPacketMeta,
32 TransportIpContext, TransportReceiveError,
33};
34use netstack3_trace::trace_duration;
35use packet::{
36 BufferMut, BufferView as _, EmptyBuf, FragmentedByteSlice, InnerPacketBuilder, PacketBuilder,
37 ParseBuffer,
38};
39use packet_formats::error::ParseError;
40use packet_formats::ip::IpProto;
41use packet_formats::tcp::{
42 TcpFlowAndSeqNum, TcpOptionsTooLongError, TcpParseArgs, TcpSegment, TcpSegmentBuilder,
43 TcpSegmentBuilderWithOptions,
44};
45
46use crate::internal::base::{BufferSizes, ConnectionError, SocketOptions, TcpIpSockOptions};
47use crate::internal::counters::{
48 self, TcpCounterContext, TcpCountersRefs, TcpCountersWithoutSocket,
49};
50use crate::internal::socket::generators::{IsnGenerator, TimestampOffsetGenerator};
51use crate::internal::socket::{
52 self, AsThisStack as _, Connection, CoreTxMetadataContext, DemuxState, DeviceIpSocketHandler,
53 DoSendLimit, DualStackDemuxIdConverter as _, DualStackIpExt, EitherStack, HandshakeStatus,
54 Listener, ListenerAddrState, MaybeDualStack, PrimaryRc, TcpApi, TcpBindingsContext,
55 TcpBindingsTypes, TcpContext, TcpDemuxContext, TcpDualStackContext, TcpIpTransportContext,
56 TcpPortSpec, TcpSocketId, TcpSocketSetEntry, TcpSocketState, TcpSocketStateInner,
57 TcpSocketTxMetadata,
58};
59use crate::internal::state::{
60 BufferProvider, Closed, DataAcked, Initial, NewlyClosed, State, TimeWait,
61};
62
63impl<BT: TcpBindingsTypes> BufferProvider<BT::ReceiveBuffer, BT::SendBuffer> for BT {
64 type ActiveOpen = BT::ListenerNotifierOrProvidedBuffers;
65
66 type PassiveOpen = BT::ReturnedBuffers;
67
68 fn new_passive_open_buffers(
69 buffer_sizes: BufferSizes,
70 ) -> (BT::ReceiveBuffer, BT::SendBuffer, Self::PassiveOpen) {
71 BT::new_passive_open_buffers(buffer_sizes)
72 }
73}
74
75impl<I, BC, CC> IpTransportContext<I, BC, CC> for TcpIpTransportContext
76where
77 I: DualStackIpExt,
78 BC: TcpBindingsContext<CC::DeviceId>
79 + BufferProvider<
80 BC::ReceiveBuffer,
81 BC::SendBuffer,
82 ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
83 PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
84 >,
85 CC: TcpContext<I, BC> + TcpContext<I::OtherVersion, BC>,
86{
87 type EarlyDemuxSocket = Never;
89
90 fn early_demux<B: ParseBuffer>(
91 _core_ctx: &mut CC,
92 _device: &CC::DeviceId,
93 _src_ip: I::Addr,
94 _dst_ip: I::Addr,
95 _buffer: B,
96 ) -> Option<Self::EarlyDemuxSocket> {
97 None
98 }
99
100 fn receive_icmp_error(
101 core_ctx: &mut CC,
102 bindings_ctx: &mut BC,
103 _device: &CC::DeviceId,
104 original_src_ip: Option<SpecifiedAddr<I::Addr>>,
105 original_dst_ip: SpecifiedAddr<I::Addr>,
106 mut original_body: &[u8],
107 err: I::ErrorCode,
108 ) {
109 let mut buffer = &mut original_body;
110 let Some(flow_and_seqnum) = buffer.take_obj_front::<TcpFlowAndSeqNum>() else {
111 error!("received an ICMP error but its body is less than 8 bytes");
112 return;
113 };
114
115 let Some(original_src_ip) = original_src_ip else { return };
116 let Some(original_src_port) = NonZeroU16::new(flow_and_seqnum.src_port()) else { return };
117 let Some(original_dst_port) = NonZeroU16::new(flow_and_seqnum.dst_port()) else { return };
118 let original_seqnum = SeqNum::new(flow_and_seqnum.sequence_num());
119
120 TcpApi::<I, _>::new(CtxPair { core_ctx, bindings_ctx }).on_icmp_error(
121 original_src_ip,
122 original_dst_ip,
123 original_src_port,
124 original_dst_port,
125 original_seqnum,
126 err.into(),
127 );
128 }
129
130 fn receive_ip_packet<B: BufferMut, H: IpHeaderInfo<I>>(
131 core_ctx: &mut CC,
132 bindings_ctx: &mut BC,
133 device: &CC::DeviceId,
134 remote_ip: I::RecvSrcAddr,
135 local_ip: SpecifiedAddr<I::Addr>,
136 mut buffer: B,
137 info: &LocalDeliveryPacketInfo<I, H>,
138 _early_demux_socket: Option<Never>,
139 ) -> Result<(), (B, TransportReceiveError)> {
140 let LocalDeliveryPacketInfo { meta, header_info, marks } = info;
141 let ReceiveIpPacketMeta { broadcast, transparent_override } = meta;
142 if let Some(delivery) = transparent_override {
143 warn!(
144 "TODO(https://fxbug.dev/337009139): transparent proxy not supported for TCP \
145 sockets; will not override dispatch to perform local delivery to {delivery:?}"
146 );
147 }
148
149 if broadcast.is_some() {
150 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
151 .invalid_ip_addrs_received
152 .increment();
153 debug!("tcp: dropping broadcast TCP packet");
154 return Ok(());
155 }
156
157 let remote_ip = match SpecifiedAddr::new(remote_ip.into_addr()) {
158 None => {
159 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
160 .invalid_ip_addrs_received
161 .increment();
162 debug!("tcp: source address unspecified, dropping the packet");
163 return Ok(());
164 }
165 Some(src_ip) => src_ip,
166 };
167 let remote_ip: SocketIpAddr<_> = match remote_ip.try_into() {
168 Ok(remote_ip) => remote_ip,
169 Err(AddrIsMappedError {}) => {
170 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
171 .invalid_ip_addrs_received
172 .increment();
173 debug!("tcp: source address is mapped (ipv4-mapped-ipv6), dropping the packet");
174 return Ok(());
175 }
176 };
177 let local_ip: SocketIpAddr<_> = match local_ip.try_into() {
178 Ok(local_ip) => local_ip,
179 Err(AddrIsMappedError {}) => {
180 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
181 .invalid_ip_addrs_received
182 .increment();
183 debug!("tcp: local address is mapped (ipv4-mapped-ipv6), dropping the packet");
184 return Ok(());
185 }
186 };
187 let packet = match buffer
188 .parse_with::<_, TcpSegment<_>>(TcpParseArgs::new(remote_ip.addr(), local_ip.addr()))
189 {
190 Ok(packet) => packet,
191 Err(err) => {
192 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
193 .invalid_segments_received
194 .increment();
195 debug!("tcp: failed parsing incoming packet {:?}", err);
196 match err {
197 ParseError::Checksum => {
198 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
199 .checksum_errors
200 .increment();
201 }
202 ParseError::NotSupported | ParseError::NotExpected | ParseError::Format => {}
203 }
204 return Ok(());
205 }
206 };
207 let local_port = packet.dst_port();
208 let remote_port = packet.src_port();
209 let incoming = match VerifiedTcpSegment::try_from(packet) {
210 Ok(segment) => segment,
211 Err(err) => {
212 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
213 .invalid_segments_received
214 .increment();
215 debug!("tcp: malformed segment {:?}", err);
216 return Ok(());
217 }
218 };
219 let conn_addr =
220 ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) };
221
222 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
223 .valid_segments_received
224 .increment();
225 handle_incoming_packet::<I, _, _, _>(
226 core_ctx,
227 bindings_ctx,
228 conn_addr,
229 device,
230 header_info,
231 &incoming,
232 marks,
233 );
234 Ok(())
235 }
236}
237
238fn handle_incoming_packet<WireI, BC, CC, H>(
239 core_ctx: &mut CC,
240 bindings_ctx: &mut BC,
241 conn_addr: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
242 incoming_device: &CC::DeviceId,
243 header_info: &H,
244 incoming: &VerifiedTcpSegment<'_>,
245 marks: &Marks,
246) where
247 WireI: DualStackIpExt,
248 BC: TcpBindingsContext<CC::DeviceId>
249 + BufferProvider<
250 BC::ReceiveBuffer,
251 BC::SendBuffer,
252 ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
253 PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
254 >,
255 CC: TcpContext<WireI, BC> + TcpContext<WireI::OtherVersion, BC>,
256 H: IpHeaderInfo<WireI>,
257{
258 trace_duration!("tcp::handle_incoming_packet");
259 let mut tw_reuse = None;
260
261 let mut addrs_to_search = AddrVecIter::<WireI, CC::WeakDeviceId, TcpPortSpec>::with_device(
262 conn_addr.into(),
263 incoming_device.downgrade(),
264 );
265
266 enum FoundSocket<S> {
267 Yes(Option<S>),
270 No,
271 }
272 let found_socket = loop {
273 let sock = core_ctx
274 .with_demux(|demux| lookup_socket::<WireI, CC, BC>(demux, &mut addrs_to_search));
275 match sock {
276 None => break FoundSocket::No,
277 Some(SocketLookupResult::Connection(demux_conn_id, conn_addr)) => {
278 assert_eq!(tw_reuse, None);
281 let disposition = match WireI::as_dual_stack_ip_socket(&demux_conn_id) {
282 EitherStack::ThisStack(conn_id) => {
283 try_handle_incoming_for_connection_dual_stack(
284 core_ctx,
285 bindings_ctx,
286 conn_id,
287 incoming_device,
288 header_info,
289 &incoming,
290 )
291 }
292 EitherStack::OtherStack(conn_id) => {
293 try_handle_incoming_for_connection_dual_stack(
294 core_ctx,
295 bindings_ctx,
296 conn_id,
297 incoming_device,
298 header_info,
299 &incoming,
300 )
301 }
302 };
303 match disposition {
304 ConnectionIncomingSegmentDisposition::Destroy => {
305 WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, demux_conn_id);
306 break FoundSocket::Yes(None);
307 }
308 ConnectionIncomingSegmentDisposition::FoundSocket
309 | ConnectionIncomingSegmentDisposition::Filtered => {
310 break FoundSocket::Yes(Some(demux_conn_id));
311 }
312 ConnectionIncomingSegmentDisposition::ReuseCandidateForListener => {
313 tw_reuse = Some((demux_conn_id, conn_addr));
314 }
315 }
316 }
317 Some(SocketLookupResult::Listener((demux_listener_id, _listener_addr))) => {
318 match WireI::as_dual_stack_ip_socket(&demux_listener_id) {
319 EitherStack::ThisStack(listener_id) => {
320 let disposition = core_ctx.with_socket_mut_generators_transport_demux(
321 &listener_id,
322 |core_ctx, socket_state, isn, timestamp_offset| match core_ctx {
323 MaybeDualStack::NotDualStack((core_ctx, converter)) => {
324 try_handle_incoming_for_listener::<WireI, WireI, CC, BC, _, _>(
325 core_ctx,
326 bindings_ctx,
327 &listener_id,
328 isn,
329 timestamp_offset,
330 socket_state,
331 header_info,
332 incoming,
333 conn_addr,
334 incoming_device,
335 &mut tw_reuse,
336 move |conn, addr| converter.convert_back((conn, addr)),
337 WireI::into_demux_socket_id,
338 marks,
339 )
340 }
341 MaybeDualStack::DualStack((core_ctx, converter)) => {
342 try_handle_incoming_for_listener::<_, _, CC, BC, _, _>(
343 core_ctx,
344 bindings_ctx,
345 &listener_id,
346 isn,
347 timestamp_offset,
348 socket_state,
349 header_info,
350 incoming,
351 conn_addr,
352 incoming_device,
353 &mut tw_reuse,
354 move |conn, addr| {
355 converter
356 .convert_back(EitherStack::ThisStack((conn, addr)))
357 },
358 WireI::into_demux_socket_id,
359 marks,
360 )
361 }
362 },
363 );
364 if try_handle_listener_incoming_disposition(
365 core_ctx,
366 bindings_ctx,
367 disposition,
368 &demux_listener_id,
369 &mut tw_reuse,
370 &mut addrs_to_search,
371 conn_addr,
372 incoming_device,
373 ) {
374 break FoundSocket::Yes(Some(demux_listener_id));
375 }
376 }
377 EitherStack::OtherStack(listener_id) => {
378 let disposition = core_ctx.with_socket_mut_generators_transport_demux(
379 &listener_id,
380 |core_ctx, socket_state, isn, timestamp_offset| {
381 match core_ctx {
382 MaybeDualStack::NotDualStack((_core_ctx, _converter)) => {
383 unreachable!("OtherStack socket ID with non dual stack");
386 }
387 MaybeDualStack::DualStack((core_ctx, converter)) => {
388 let other_demux_id_converter =
389 core_ctx.other_demux_id_converter();
390 try_handle_incoming_for_listener::<_, _, CC, BC, _, _>(
391 core_ctx,
392 bindings_ctx,
393 &listener_id,
394 isn,
395 timestamp_offset,
396 socket_state,
397 header_info,
398 incoming,
399 conn_addr,
400 incoming_device,
401 &mut tw_reuse,
402 move |conn, addr| {
403 converter.convert_back(EitherStack::OtherStack((
404 conn, addr,
405 )))
406 },
407 move |id| other_demux_id_converter.convert(id),
408 marks,
409 )
410 }
411 }
412 },
413 );
414 if try_handle_listener_incoming_disposition::<_, _, CC, BC, _>(
415 core_ctx,
416 bindings_ctx,
417 disposition,
418 &demux_listener_id,
419 &mut tw_reuse,
420 &mut addrs_to_search,
421 conn_addr,
422 incoming_device,
423 ) {
424 break FoundSocket::Yes(Some(demux_listener_id));
425 }
426 }
427 };
428 }
429 }
430 };
431
432 let demux_id = match found_socket {
433 FoundSocket::No => {
434 CounterContext::<TcpCountersWithoutSocket<WireI>>::counters(core_ctx)
435 .received_segments_no_dispatch
436 .increment();
437
438 if let Some(seg) =
444 (Closed { reason: None::<Option<ConnectionError>> }.on_segment(&incoming.into()))
445 {
446 socket::send_tcp_segment::<WireI, WireI, _, _, _>(
447 core_ctx,
448 bindings_ctx,
449 None,
450 None,
451 conn_addr,
452 seg.into_empty(),
453 &TcpIpSockOptions { marks: *marks },
454 );
455 }
456 None
457 }
458 FoundSocket::Yes(demux_id) => {
459 counters::increment_counter_with_optional_demux_id::<WireI, _, _, _, _>(
460 core_ctx,
461 demux_id.as_ref(),
462 |c| &c.received_segments_dispatched,
463 );
464 demux_id
465 }
466 };
467
468 if let Some(control) = incoming.control() {
469 counters::increment_counter_with_optional_demux_id::<WireI, _, _, _, _>(
470 core_ctx,
471 demux_id.as_ref(),
472 |c| match control {
473 Control::RST => &c.resets_received,
474 Control::SYN => &c.syns_received,
475 Control::FIN => &c.fins_received,
476 },
477 )
478 }
479}
480
481enum SocketLookupResult<I: DualStackIpExt, D: WeakDeviceIdentifier, BT: TcpBindingsTypes> {
482 Connection(I::DemuxSocketId<D, BT>, ConnAddr<ConnIpAddr<I::Addr, NonZeroU16, NonZeroU16>, D>),
483 Listener((I::DemuxSocketId<D, BT>, ListenerAddr<ListenerIpAddr<I::Addr, NonZeroU16>, D>)),
484}
485
486fn lookup_socket<I, CC, BC>(
487 DemuxState { socketmap, .. }: &DemuxState<I, CC::WeakDeviceId, BC>,
488 addrs_to_search: &mut AddrVecIter<I, CC::WeakDeviceId, TcpPortSpec>,
489) -> Option<SocketLookupResult<I, CC::WeakDeviceId, BC>>
490where
491 I: DualStackIpExt,
492 BC: TcpBindingsContext<CC::DeviceId>,
493 CC: TcpContext<I, BC>,
494{
495 addrs_to_search.find_map(|addr| {
496 match addr {
497 AddrVec::Conn(conn_addr) => {
500 socketmap.conns().get_by_addr(&conn_addr).map(|conn_addr_state| {
501 SocketLookupResult::Connection(conn_addr_state.id(), conn_addr)
502 })
503 }
504 AddrVec::Listen(listener_addr) => {
505 socketmap
510 .listeners()
511 .get_by_addr(&listener_addr)
512 .and_then(|addr_state| match addr_state {
513 ListenerAddrState::ExclusiveListener(id) => Some(id.clone()),
514 ListenerAddrState::Shared { listener: Some(id), bound: _ } => {
515 Some(id.clone())
516 }
517 ListenerAddrState::ExclusiveBound(_)
518 | ListenerAddrState::Shared { listener: None, bound: _ } => None,
519 })
520 .map(|id| SocketLookupResult::Listener((id, listener_addr)))
521 }
522 }
523 })
524}
525
526#[derive(PartialEq, Eq)]
527enum ConnectionIncomingSegmentDisposition {
528 FoundSocket,
529 Filtered,
530 ReuseCandidateForListener,
531 Destroy,
532}
533
534enum ListenerIncomingSegmentDisposition<S> {
535 FoundSocket,
536 Filtered,
537 ConflictingConnection,
538 NoMatchingSocket,
539 NewConnection(S),
540}
541
542fn try_handle_incoming_for_connection_dual_stack<SockI, WireI, CC, BC, H>(
543 core_ctx: &mut CC,
544 bindings_ctx: &mut BC,
545 conn_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
546 incoming_device: &CC::DeviceId,
547 header_info: &H,
548 incoming: &VerifiedTcpSegment<'_>,
549) -> ConnectionIncomingSegmentDisposition
550where
551 SockI: DualStackIpExt,
552 WireI: Ip,
553 BC: TcpBindingsContext<CC::DeviceId>
554 + BufferProvider<
555 BC::ReceiveBuffer,
556 BC::SendBuffer,
557 ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
558 PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
559 >,
560 CC: TcpContext<SockI, BC>,
561 H: IpHeaderInfo<WireI>,
562{
563 core_ctx.with_socket_mut_transport_demux(conn_id, |core_ctx, socket_state| {
564 let TcpSocketState { socket_state, sharing: _, ip_options: _, socket_options } =
565 socket_state;
566
567 match run_socket_ingress_filter(
568 bindings_ctx,
569 incoming_device,
570 conn_id.socket_cookie(),
571 socket_options,
572 header_info,
573 incoming.tcp_segment(),
574 ) {
575 SocketIngressFilterResult::Accept => (),
576 SocketIngressFilterResult::Drop => {
577 return ConnectionIncomingSegmentDisposition::Filtered;
578 }
579 }
580
581 let (conn_and_addr, timer) = assert_matches!(
582 socket_state,
583 TcpSocketStateInner::Connected { conn, timer } => (conn, timer),
584 "invalid socket ID"
585 );
586 let this_or_other_stack = match core_ctx {
587 MaybeDualStack::DualStack((core_ctx, converter)) => {
588 match converter.convert(conn_and_addr) {
589 EitherStack::ThisStack((conn, conn_addr)) => {
590 EitherStack::ThisStack((
595 core_ctx.as_this_stack(),
596 conn,
597 conn_addr,
598 SockI::into_demux_socket_id(conn_id.clone()),
599 ))
600 }
601 EitherStack::OtherStack((conn, conn_addr)) => {
602 let demux_sock_id = core_ctx.into_other_demux_socket_id(conn_id.clone());
605 EitherStack::OtherStack((core_ctx, conn, conn_addr, demux_sock_id))
606 }
607 }
608 }
609 MaybeDualStack::NotDualStack((core_ctx, converter)) => {
610 let (conn, conn_addr) = converter.convert(conn_and_addr);
611 EitherStack::ThisStack((
614 core_ctx.as_this_stack(),
615 conn,
616 conn_addr,
617 SockI::into_demux_socket_id(conn_id.clone()),
618 ))
619 }
620 };
621
622 match this_or_other_stack {
623 EitherStack::ThisStack((core_ctx, conn, conn_addr, demux_conn_id)) => {
624 try_handle_incoming_for_connection::<_, _, CC, _, _>(
625 core_ctx,
626 bindings_ctx,
627 conn_addr.clone(),
628 conn_id,
629 demux_conn_id,
630 socket_options,
631 conn,
632 timer,
633 incoming.into(),
634 )
635 }
636 EitherStack::OtherStack((core_ctx, conn, conn_addr, demux_conn_id)) => {
637 try_handle_incoming_for_connection::<_, _, CC, _, _>(
638 core_ctx,
639 bindings_ctx,
640 conn_addr.clone(),
641 conn_id,
642 demux_conn_id,
643 socket_options,
644 conn,
645 timer,
646 incoming.into(),
647 )
648 }
649 }
650 })
651}
652
653fn try_handle_incoming_for_connection<SockI, WireI, CC, BC, DC>(
660 core_ctx: &mut DC,
661 bindings_ctx: &mut BC,
662 conn_addr: ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
663 conn_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
664 demux_id: WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
665 socket_options: &SocketOptions,
666 conn: &mut Connection<SockI, WireI, CC::WeakDeviceId, BC>,
667 timer: &mut BC::Timer,
668 incoming: Segment<&[u8]>,
669) -> ConnectionIncomingSegmentDisposition
670where
671 SockI: DualStackIpExt,
672 WireI: DualStackIpExt,
673 BC: TcpBindingsContext<CC::DeviceId>
674 + BufferProvider<
675 BC::ReceiveBuffer,
676 BC::SendBuffer,
677 ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
678 PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
679 >,
680 CC: TcpContext<SockI, BC>,
681 DC: TransportIpContext<WireI, BC, DeviceId = CC::DeviceId, WeakDeviceId = CC::WeakDeviceId>
682 + DeviceIpSocketHandler<SockI, BC>
683 + TcpDemuxContext<WireI, CC::WeakDeviceId, BC>
684 + TcpCounterContext<SockI, CC::WeakDeviceId, BC>
685 + CoreTxMetadataContext<TcpSocketTxMetadata<SockI, CC::WeakDeviceId, BC>, BC>,
686{
687 let Connection { accept_queue, state, ip_sock, defunct, soft_error: _, handshake_status } =
688 conn;
689
690 if *defunct
702 && incoming.header().control == Some(Control::SYN)
703 && incoming.header().ack.is_none()
704 {
705 if let State::TimeWait(TimeWait { last_seq: _, closed_rcv, expiry: _, snd_info: _ }) = state
706 {
707 if !incoming.header().seq.before(closed_rcv.ack) {
708 return ConnectionIncomingSegmentDisposition::ReuseCandidateForListener;
709 }
710 }
711 }
712 let (reply, passive_open, data_acked, newly_closed) = state.on_segment::<_, BC>(
713 &conn_id.either(),
714 &TcpCountersRefs::from_ctx(core_ctx, conn_id),
715 incoming,
716 bindings_ctx.now(),
717 socket_options,
718 *defunct,
719 );
720
721 match data_acked {
722 DataAcked::Yes => {
723 core_ctx.confirm_reachable(bindings_ctx, ip_sock, &socket_options.ip_options)
724 }
725 DataAcked::No => {}
726 }
727
728 match state {
729 State::Listen(_) => {
730 unreachable!("has an invalid status: {:?}", conn.state)
731 }
732 State::SynSent(_) | State::SynRcvd(_) => {
733 assert_eq!(*handshake_status, HandshakeStatus::Pending)
734 }
735 State::Established(_)
736 | State::FinWait1(_)
737 | State::FinWait2(_)
738 | State::Closing(_)
739 | State::CloseWait(_)
740 | State::LastAck(_)
741 | State::TimeWait(_) => {
742 if handshake_status
743 .update_if_pending(HandshakeStatus::Completed { reported: accept_queue.is_some() })
744 {
745 core_ctx.confirm_reachable(bindings_ctx, ip_sock, &socket_options.ip_options);
746 }
747 }
748 State::Closed(Closed { reason }) => {
749 socket::handle_newly_closed(
757 core_ctx,
758 bindings_ctx,
759 newly_closed,
760 &demux_id,
761 &conn_addr,
762 timer,
763 );
764 if let Some(accept_queue) = accept_queue {
765 accept_queue.remove(&conn_id);
766 *defunct = true;
767 }
768 if *defunct {
769 return ConnectionIncomingSegmentDisposition::Destroy;
772 }
773 let _: bool = handshake_status.update_if_pending(match reason {
774 None => HandshakeStatus::Completed { reported: accept_queue.is_some() },
775 Some(_err) => HandshakeStatus::Aborted,
776 });
777 }
778 }
779
780 if let Some(seg) = reply {
781 socket::send_tcp_segment(
782 core_ctx,
783 bindings_ctx,
784 Some(conn_id),
785 Some(&ip_sock),
786 conn_addr.ip,
787 seg.into_empty(),
788 &socket_options.ip_options,
789 );
790 }
791
792 socket::do_send_inner_and_then_handle_newly_closed(
794 conn_id,
795 &demux_id,
796 socket_options,
797 conn,
798 DoSendLimit::MultipleSegments,
799 &conn_addr,
800 timer,
801 core_ctx,
802 bindings_ctx,
803 );
804
805 if let Some(passive_open) = passive_open {
808 let accept_queue = conn.accept_queue.as_ref().expect("no accept queue but passive open");
809 accept_queue.notify_ready(conn_id, passive_open);
810 }
811
812 ConnectionIncomingSegmentDisposition::FoundSocket
814}
815
816fn try_handle_listener_incoming_disposition<SockI, WireI, CC, BC, Addr>(
821 core_ctx: &mut CC,
822 bindings_ctx: &mut BC,
823 disposition: ListenerIncomingSegmentDisposition<PrimaryRc<SockI, CC::WeakDeviceId, BC>>,
824 demux_listener_id: &WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
825 tw_reuse: &mut Option<(WireI::DemuxSocketId<CC::WeakDeviceId, BC>, Addr)>,
826 addrs_to_search: &mut AddrVecIter<WireI, CC::WeakDeviceId, TcpPortSpec>,
827 conn_addr: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
828 incoming_device: &CC::DeviceId,
829) -> bool
830where
831 SockI: DualStackIpExt,
832 WireI: DualStackIpExt,
833 CC: TcpContext<SockI, BC> + TcpContext<WireI, BC> + TcpContext<WireI::OtherVersion, BC>,
834 BC: TcpBindingsContext<CC::DeviceId>,
835{
836 match disposition {
837 ListenerIncomingSegmentDisposition::FoundSocket => true,
838 ListenerIncomingSegmentDisposition::Filtered => true,
839 ListenerIncomingSegmentDisposition::ConflictingConnection => {
840 if let Some((tw_reuse, _)) = tw_reuse.take() {
845 WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, tw_reuse);
846 }
847
848 *addrs_to_search = AddrVecIter::<WireI, CC::WeakDeviceId, TcpPortSpec>::with_device(
851 conn_addr.into(),
852 incoming_device.downgrade(),
853 );
854 false
855 }
856 ListenerIncomingSegmentDisposition::NoMatchingSocket => false,
857 ListenerIncomingSegmentDisposition::NewConnection(primary) => {
858 if let Some((tw_reuse, _)) = tw_reuse.take() {
864 WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, tw_reuse);
865 }
866
867 let id = TcpSocketId(PrimaryRc::clone_strong(&primary));
876 let to_destroy = core_ctx.with_all_sockets_mut(move |all_sockets| {
877 let insert_entry = TcpSocketSetEntry::Primary(primary);
878 match all_sockets.entry(id) {
879 hash_map::Entry::Vacant(v) => {
880 let _: &mut _ = v.insert(insert_entry);
881 None
882 }
883 hash_map::Entry::Occupied(mut o) => {
884 assert_matches!(
888 core::mem::replace(o.get_mut(), insert_entry),
889 TcpSocketSetEntry::DeadOnArrival
890 );
891 Some(o.key().clone())
892 }
893 }
894 });
895 if let Some(to_destroy) = to_destroy {
903 socket::destroy_socket(core_ctx, bindings_ctx, to_destroy);
904 }
905 counters::increment_counter_for_demux_id::<WireI, _, _, _, _>(
906 core_ctx,
907 demux_listener_id,
908 |c| &c.passive_connection_openings,
909 );
910 true
911 }
912 }
913}
914
915fn try_handle_incoming_for_listener<SockI, WireI, CC, BC, DC, H>(
919 core_ctx: &mut DC,
920 bindings_ctx: &mut BC,
921 listener_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
922 isn: &IsnGenerator<BC::Instant>,
923 timestamp_offset: &TimestampOffsetGenerator<BC::Instant>,
924 socket_state: &mut TcpSocketState<SockI, CC::WeakDeviceId, BC>,
925 header_info: &H,
926 incoming: &VerifiedTcpSegment<'_>,
927 incoming_addrs: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
928 incoming_device: &CC::DeviceId,
929 tw_reuse: &mut Option<(
930 WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
931 ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
932 )>,
933 make_connection: impl FnOnce(
934 Connection<SockI, WireI, CC::WeakDeviceId, BC>,
935 ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
936 ) -> SockI::ConnectionAndAddr<CC::WeakDeviceId, BC>,
937 make_demux_id: impl Fn(
938 TcpSocketId<SockI, CC::WeakDeviceId, BC>,
939 ) -> WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
940 marks: &Marks,
941) -> ListenerIncomingSegmentDisposition<PrimaryRc<SockI, CC::WeakDeviceId, BC>>
942where
943 SockI: DualStackIpExt,
944 WireI: DualStackIpExt,
945 BC: TcpBindingsContext<CC::DeviceId>
946 + BufferProvider<
947 BC::ReceiveBuffer,
948 BC::SendBuffer,
949 ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
950 PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
951 >,
952 CC: TcpContext<SockI, BC>,
953 DC: TransportIpContext<WireI, BC, DeviceId = CC::DeviceId, WeakDeviceId = CC::WeakDeviceId>
954 + DeviceIpSocketHandler<WireI, BC>
955 + TcpDemuxContext<WireI, CC::WeakDeviceId, BC>
956 + TcpCounterContext<SockI, CC::WeakDeviceId, BC>
957 + CoreTxMetadataContext<TcpSocketTxMetadata<SockI, CC::WeakDeviceId, BC>, BC>,
958 H: IpHeaderInfo<WireI>,
959{
960 let Listener { addr: listener_addr, accept_queue, backlog, buffer_sizes } =
961 match &socket_state.socket_state {
962 TcpSocketStateInner::Bound(_) => {
963 return ListenerIncomingSegmentDisposition::NoMatchingSocket;
965 }
966 TcpSocketStateInner::Listener(listener) => listener,
967 _ => panic!("unexpected socket state: {:?}", socket_state.socket_state),
968 };
969
970 let ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) } =
971 incoming_addrs;
972
973 match run_socket_ingress_filter(
974 bindings_ctx,
975 incoming_device,
976 listener_id.socket_cookie(),
977 &socket_state.socket_options,
978 header_info,
979 incoming.tcp_segment(),
980 ) {
981 SocketIngressFilterResult::Accept => (),
982 SocketIngressFilterResult::Drop => {
983 return ListenerIncomingSegmentDisposition::Filtered;
984 }
985 }
986
987 if accept_queue.len() == backlog.get() {
991 core_ctx.increment_both(listener_id, |counters| &counters.listener_queue_overflow);
992 core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
993 debug!("incoming SYN dropped because of the full backlog of the listener");
994 return ListenerIncomingSegmentDisposition::FoundSocket;
995 }
996
997 let bound_device = listener_addr.as_ref().clone();
1000 let bound_device = if remote_ip.as_ref().must_have_zone() {
1001 Some(bound_device.map_or(EitherDeviceId::Strong(incoming_device), EitherDeviceId::Weak))
1002 } else {
1003 bound_device.map(EitherDeviceId::Weak)
1004 };
1005
1006 let ip_options = TcpIpSockOptions { marks: *marks, ..socket_state.socket_options.ip_options };
1007 let socket_options = SocketOptions { ip_options, ..socket_state.socket_options };
1008
1009 let bound_device = bound_device.as_ref().map(|d| d.as_ref());
1010 let ip_sock = match core_ctx.new_ip_socket(
1011 bindings_ctx,
1012 IpSocketArgs {
1013 device: bound_device,
1014 local_ip: IpDeviceAddr::new_from_socket_ip_addr(local_ip),
1015 remote_ip,
1016 proto: IpProto::Tcp.into(),
1017 options: &ip_options,
1018 },
1019 ) {
1020 Ok(ip_sock) => ip_sock,
1021 err @ Err(IpSockCreationError::Route(_)) => {
1022 core_ctx.increment_both(listener_id, |counters| &counters.passive_open_no_route_errors);
1023 core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1024 debug!("cannot construct an ip socket to the SYN originator: {:?}, ignoring", err);
1025 return ListenerIncomingSegmentDisposition::NoMatchingSocket;
1026 }
1027 };
1028
1029 let isn = isn.generate(
1030 bindings_ctx.now(),
1031 (ip_sock.local_ip().clone().into(), local_port),
1032 (ip_sock.remote_ip().clone(), remote_port),
1033 );
1034 let timestamp_offset = timestamp_offset.generate::<SocketIpAddr<WireI::Addr>, NonZeroU16>(
1035 bindings_ctx.now(),
1036 (ip_sock.local_ip().clone().into(), local_port),
1037 (ip_sock.remote_ip().clone(), remote_port),
1038 );
1039 let device_mms = match core_ctx.get_mms(bindings_ctx, &ip_sock, &socket_options.ip_options) {
1040 Ok(mms) => mms,
1041 Err(err) => {
1042 error!("Cannot find a device with large enough MTU for the connection");
1046 core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1047 match err {
1048 MmsError::NoDevice(_) | MmsError::MTUTooSmall(_) => {
1049 return ListenerIncomingSegmentDisposition::FoundSocket;
1050 }
1051 }
1052 }
1053 };
1054 let Some(device_mss) = Mss::from_mms(device_mms) else {
1055 return ListenerIncomingSegmentDisposition::FoundSocket;
1056 };
1057
1058 let mut state = State::Listen(Closed::<Initial>::listen(
1059 isn,
1060 timestamp_offset,
1061 buffer_sizes.clone(),
1062 device_mss,
1063 Mss::default::<WireI>(),
1064 socket_options.user_timeout,
1065 ));
1066
1067 let result = state.on_segment::<_, BC>(
1072 &listener_id.either(),
1076 &TcpCountersRefs::from_ctx(core_ctx, listener_id),
1077 incoming.into(),
1078 bindings_ctx.now(),
1079 &SocketOptions::default(),
1080 false, );
1082 let reply = assert_matches!(
1083 result,
1084 (reply, None, _, NewlyClosed::No ) => reply
1085 );
1086
1087 let result = if matches!(state, State::SynRcvd(_)) {
1088 let poll_send_at = state.poll_send_at().expect("no retrans timer");
1089 let bound_device = ip_sock.device().cloned();
1090
1091 let addr = ConnAddr {
1092 ip: ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) },
1093 device: bound_device,
1094 };
1095
1096 let new_socket = core_ctx.with_demux_mut(|DemuxState { socketmap, .. }| {
1097 if let Some((tw_reuse, conn_addr)) = tw_reuse {
1107 match socketmap.conns_mut().remove(tw_reuse, &conn_addr) {
1108 Ok(()) => {
1109 }
1115 Err(NotFoundError) => {
1116 }
1120 }
1121 }
1122
1123 let accept_queue_clone = accept_queue.clone();
1125 let ip_sock = ip_sock.clone();
1126 let bindings_ctx_moved = &mut *bindings_ctx;
1127 let sharing = socket_state.sharing;
1128 match socketmap.conns_mut().try_insert_with(addr, sharing, move |addr, sharing| {
1129 let conn = make_connection(
1130 Connection {
1131 accept_queue: Some(accept_queue_clone),
1132 state,
1133 ip_sock,
1134 defunct: false,
1135 soft_error: None,
1136 handshake_status: HandshakeStatus::Pending,
1137 },
1138 addr,
1139 );
1140
1141 let (id, primary) = TcpSocketId::new_cyclic(
1142 |weak| {
1143 let mut timer = CC::new_timer(bindings_ctx_moved, weak);
1144 assert_eq!(
1148 bindings_ctx_moved.schedule_timer_instant(poll_send_at, &mut timer),
1149 None
1150 );
1151 TcpSocketStateInner::Connected { conn, timer }
1152 },
1153 sharing,
1154 socket_options,
1155 );
1156 (make_demux_id(id.clone()), (primary, id))
1157 }) {
1158 Ok((_entry, (primary, id))) => {
1159 accept_queue.push_pending(id);
1162 Some(primary)
1163 }
1164 Err((e, _sharing_state)) => {
1165 assert_matches!(e, InsertError::Exists);
1170 None
1175 }
1176 }
1177 });
1178
1179 match new_socket {
1180 Some(new_socket) => ListenerIncomingSegmentDisposition::NewConnection(new_socket),
1181 None => {
1182 core_ctx
1185 .increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1186 return ListenerIncomingSegmentDisposition::ConflictingConnection;
1187 }
1188 }
1189 } else {
1190 ListenerIncomingSegmentDisposition::FoundSocket
1193 };
1194
1195 if let Some(seg) = reply {
1197 socket::send_tcp_segment(
1198 core_ctx,
1199 bindings_ctx,
1200 Some(&listener_id),
1201 Some(&ip_sock),
1202 incoming_addrs,
1203 seg.into_empty(),
1204 &socket_options.ip_options,
1205 );
1206 }
1207
1208 result
1209}
1210
1211pub(super) fn tcp_serialize_segment<'a, I, P>(
1212 header: &'a SegmentHeader,
1213 data: P,
1214 conn_addr: ConnIpAddr<I::Addr, NonZeroU16, NonZeroU16>,
1215) -> impl TransportPacketSerializer<I, Buffer = EmptyBuf> + Debug + 'a
1216where
1217 I: FilterIpExt,
1218 P: InnerPacketBuilder + Debug + Payload + 'a,
1219{
1220 let SegmentHeader { seq, ack, wnd, control, options, push } = header;
1221 let ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) } = conn_addr;
1222 let mut builder = TcpSegmentBuilder::new(
1223 local_ip.addr(),
1224 remote_ip.addr(),
1225 local_port,
1226 remote_port,
1227 (*seq).into(),
1228 ack.map(Into::into),
1229 u16::from(*wnd),
1230 );
1231 builder.psh(*push);
1232 match control {
1233 None => {}
1234 Some(Control::SYN) => builder.syn(true),
1235 Some(Control::FIN) => builder.fin(true),
1236 Some(Control::RST) => builder.rst(true),
1237 }
1238 TcpSegmentBuilderWithOptions::new(builder, options.builder())
1239 .unwrap_or_else(|TcpOptionsTooLongError| {
1240 panic!("Too many TCP options");
1241 })
1242 .wrap_body(data.into_serializer())
1243}
1244
1245fn run_socket_ingress_filter<I, BC, D>(
1246 bindings_ctx: &BC,
1247 incoming_device: &D,
1248 socket_cookie: SocketCookie,
1249 socket_options: &SocketOptions,
1250 header_info: &impl IpHeaderInfo<I>,
1251 tcp_segment: &TcpSegment<&'_ [u8]>,
1252) -> SocketIngressFilterResult
1253where
1254 I: Ip,
1255 BC: TcpBindingsContext<D>,
1256 D: StrongDeviceIdentifier,
1257{
1258 let [ip_prefix, ip_options] = header_info.as_bytes();
1259 let [tcp_prefix, tcp_options, data] = tcp_segment.as_bytes();
1260 let mut slices = [ip_prefix, ip_options, tcp_prefix, tcp_options, data];
1261 let data = FragmentedByteSlice::new(&mut slices);
1262
1263 bindings_ctx.socket_ops_filter().on_ingress(
1264 I::VERSION,
1265 data,
1266 incoming_device,
1267 socket_cookie,
1268 &socket_options.ip_options.marks,
1269 )
1270}
1271
1272#[cfg(test)]
1273mod test {
1274 use ip_test_macro::ip_test;
1275 use netstack3_base::{
1276 HandshakeOptions, Options, ResetOptions, SackBlocks, SegmentOptions, UnscaledWindowSize,
1277 };
1278 use packet::{ParseBuffer as _, Serializer as _};
1279 use packet_formats::tcp::options::TcpOptions as _;
1280 use test_case::test_case;
1281
1282 use super::*;
1283
1284 trait TestIpExt: netstack3_base::testutil::TestIpExt + FilterIpExt {}
1285 impl<T> TestIpExt for T where T: netstack3_base::testutil::TestIpExt + FilterIpExt {}
1286
1287 const SEQ: SeqNum = SeqNum::new(12345);
1288 const ACK: SeqNum = SeqNum::new(67890);
1289 const FAKE_DATA: &'static [u8] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 0];
1290
1291 #[ip_test(I)]
1292 #[test_case(
1293 Segment::syn(SEQ, UnscaledWindowSize::from(u16::MAX),
1294 HandshakeOptions::default()), &[]
1295 ; "syn")]
1296 #[test_case(
1297 Segment::syn(SEQ, UnscaledWindowSize::from(u16::MAX),
1298 HandshakeOptions {
1299 mss: Some(Mss::new(1440).unwrap()),
1300 ..Default::default() }), &[]
1301 ; "syn with mss")]
1302 #[test_case(
1303 Segment::ack(SEQ, ACK, UnscaledWindowSize::from(u16::MAX), SegmentOptions::default()),
1304 &[]; "ack")]
1305 #[test_case(Segment::with_fake_data(SEQ, ACK, FAKE_DATA), FAKE_DATA; "data")]
1306 #[test_case(Segment::new_assert_no_discard(SegmentHeader {
1307 seq: SEQ,
1308 ack: Some(ACK),
1309 push: true,
1310 wnd: UnscaledWindowSize::from(u16::MAX),
1311 ..Default::default()
1312 },
1313 FAKE_DATA
1314 ), FAKE_DATA; "push")]
1315 fn tcp_serialize_segment<I: TestIpExt>(segment: Segment<&[u8]>, expected_body: &[u8]) {
1316 const SOURCE_PORT: NonZeroU16 = NonZeroU16::new(1111).unwrap();
1317 const DEST_PORT: NonZeroU16 = NonZeroU16::new(2222).unwrap();
1318
1319 let (header, data) = segment.into_parts();
1320 let serializer = super::tcp_serialize_segment::<I, _>(
1321 &header,
1322 data,
1323 ConnIpAddr {
1324 local: (SocketIpAddr::try_from(I::TEST_ADDRS.local_ip).unwrap(), SOURCE_PORT),
1325 remote: (SocketIpAddr::try_from(I::TEST_ADDRS.remote_ip).unwrap(), DEST_PORT),
1326 },
1327 );
1328
1329 let mut serialized = serializer.serialize_vec_outer().unwrap().unwrap_b();
1330 let parsed_segment = serialized
1331 .parse_with::<_, TcpSegment<_>>(TcpParseArgs::new(
1332 *I::TEST_ADDRS.remote_ip,
1333 *I::TEST_ADDRS.local_ip,
1334 ))
1335 .expect("is valid segment");
1336
1337 assert_eq!(parsed_segment.src_port(), SOURCE_PORT);
1338 assert_eq!(parsed_segment.dst_port(), DEST_PORT);
1339 assert_eq!(parsed_segment.seq_num(), u32::from(SEQ));
1340 assert_eq!(parsed_segment.psh(), header.push);
1341 assert_eq!(
1342 UnscaledWindowSize::from(parsed_segment.window_size()),
1343 UnscaledWindowSize::from(u16::MAX)
1344 );
1345
1346 let (mss, window_scale, sack_permitted, sack_blocks, timestamp) = match header.options {
1347 Options::Handshake(HandshakeOptions {
1348 mss,
1349 window_scale,
1350 sack_permitted,
1351 timestamp,
1352 }) => (mss, window_scale, sack_permitted, SackBlocks::EMPTY, timestamp),
1353 Options::Segment(SegmentOptions { timestamp, sack_blocks }) => {
1354 (None, None, false, sack_blocks, timestamp)
1355 }
1356 Options::Reset(ResetOptions { timestamp }) => {
1357 (None, None, false, SackBlocks::EMPTY, timestamp)
1358 }
1359 };
1360 assert_eq!(mss.map(|mss| mss.get()), parsed_segment.options().mss());
1361 assert_eq!(window_scale.map(|ws| ws.get()), parsed_segment.options().window_scale());
1362 assert_eq!(sack_permitted, parsed_segment.options().sack_permitted());
1363 assert_eq!(sack_blocks.as_slice(), parsed_segment.options().sack_blocks());
1364 assert_eq!(
1365 timestamp.as_ref().map(Into::into).as_ref(),
1366 parsed_segment.options().timestamp()
1367 );
1368
1369 assert_eq!(parsed_segment.into_body(), expected_body);
1370 }
1371}