1use core::fmt::Debug;
9use core::num::NonZeroU16;
10
11use assert_matches::assert_matches;
12use log::{debug, error, warn};
13use net_types::ip::Ip;
14use net_types::{SpecifiedAddr, Witness as _};
15use netstack3_base::socket::{
16 AddrIsMappedError, AddrVec, AddrVecIter, ConnAddr, ConnIpAddr, InsertError, ListenerAddr,
17 ListenerIpAddr, SocketCookie, SocketIpAddr, SocketIpAddrExt as _,
18};
19use netstack3_base::{
20 BidirectionalConverter as _, Control, CounterContext, CtxPair, EitherDeviceId, IpDeviceAddr,
21 Marks, Mss, NotFoundError, Payload, Segment, SegmentHeader, SeqNum, StrongDeviceIdentifier,
22 VerifiedTcpSegment, WeakDeviceIdentifier,
23};
24use netstack3_filter::{
25 FilterIpExt, SocketIngressFilterResult, SocketOpsFilter, TransportPacketSerializer,
26};
27use netstack3_hashmap::hash_map;
28use netstack3_ip::socket::{IpSockCreationError, IpSocketArgs, MmsError};
29use netstack3_ip::{
30 IpHeaderInfo, IpTransportContext, LocalDeliveryPacketInfo, ReceiveIpPacketMeta,
31 TransportIpContext, TransportReceiveError,
32};
33use netstack3_trace::trace_duration;
34use packet::{
35 BufferMut, BufferView as _, EmptyBuf, FragmentedByteSlice, InnerPacketBuilder, PacketBuilder,
36};
37use packet_formats::error::ParseError;
38use packet_formats::ip::IpProto;
39use packet_formats::tcp::{
40 TcpFlowAndSeqNum, TcpOptionsTooLongError, TcpParseArgs, TcpSegment, TcpSegmentBuilder,
41 TcpSegmentBuilderWithOptions,
42};
43
44use crate::internal::base::{BufferSizes, ConnectionError, SocketOptions, TcpIpSockOptions};
45use crate::internal::counters::{
46 self, TcpCounterContext, TcpCountersRefs, TcpCountersWithoutSocket,
47};
48use crate::internal::socket::generators::{IsnGenerator, TimestampOffsetGenerator};
49use crate::internal::socket::{
50 self, AsThisStack as _, BoundSocketState, Connection, CoreTxMetadataContext, DemuxState,
51 DeviceIpSocketHandler, DoSendLimit, DualStackDemuxIdConverter as _, DualStackIpExt,
52 EitherStack, HandshakeStatus, Listener, ListenerAddrState, ListenerSharingState,
53 MaybeDualStack, MaybeListener, PrimaryRc, TcpApi, TcpBindingsContext, TcpBindingsTypes,
54 TcpContext, TcpDemuxContext, TcpDualStackContext, TcpIpTransportContext, TcpPortSpec,
55 TcpSocketId, TcpSocketSetEntry, TcpSocketState, TcpSocketStateInner, TcpSocketTxMetadata,
56};
57use crate::internal::state::{
58 BufferProvider, Closed, DataAcked, Initial, NewlyClosed, State, TimeWait,
59};
60
61impl<BT: TcpBindingsTypes> BufferProvider<BT::ReceiveBuffer, BT::SendBuffer> for BT {
62 type ActiveOpen = BT::ListenerNotifierOrProvidedBuffers;
63
64 type PassiveOpen = BT::ReturnedBuffers;
65
66 fn new_passive_open_buffers(
67 buffer_sizes: BufferSizes,
68 ) -> (BT::ReceiveBuffer, BT::SendBuffer, Self::PassiveOpen) {
69 BT::new_passive_open_buffers(buffer_sizes)
70 }
71}
72
73impl<I, BC, CC> IpTransportContext<I, BC, CC> for TcpIpTransportContext
74where
75 I: DualStackIpExt,
76 BC: TcpBindingsContext<CC::DeviceId>
77 + BufferProvider<
78 BC::ReceiveBuffer,
79 BC::SendBuffer,
80 ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
81 PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
82 >,
83 CC: TcpContext<I, BC> + TcpContext<I::OtherVersion, BC>,
84{
85 fn receive_icmp_error(
86 core_ctx: &mut CC,
87 bindings_ctx: &mut BC,
88 _device: &CC::DeviceId,
89 original_src_ip: Option<SpecifiedAddr<I::Addr>>,
90 original_dst_ip: SpecifiedAddr<I::Addr>,
91 mut original_body: &[u8],
92 err: I::ErrorCode,
93 ) {
94 let mut buffer = &mut original_body;
95 let Some(flow_and_seqnum) = buffer.take_obj_front::<TcpFlowAndSeqNum>() else {
96 error!("received an ICMP error but its body is less than 8 bytes");
97 return;
98 };
99
100 let Some(original_src_ip) = original_src_ip else { return };
101 let Some(original_src_port) = NonZeroU16::new(flow_and_seqnum.src_port()) else { return };
102 let Some(original_dst_port) = NonZeroU16::new(flow_and_seqnum.dst_port()) else { return };
103 let original_seqnum = SeqNum::new(flow_and_seqnum.sequence_num());
104
105 TcpApi::<I, _>::new(CtxPair { core_ctx, bindings_ctx }).on_icmp_error(
106 original_src_ip,
107 original_dst_ip,
108 original_src_port,
109 original_dst_port,
110 original_seqnum,
111 err.into(),
112 );
113 }
114
115 fn receive_ip_packet<B: BufferMut, H: IpHeaderInfo<I>>(
116 core_ctx: &mut CC,
117 bindings_ctx: &mut BC,
118 device: &CC::DeviceId,
119 remote_ip: I::RecvSrcAddr,
120 local_ip: SpecifiedAddr<I::Addr>,
121 mut buffer: B,
122 info: &LocalDeliveryPacketInfo<I, H>,
123 ) -> Result<(), (B, TransportReceiveError)> {
124 let LocalDeliveryPacketInfo { meta, header_info, marks } = info;
125 let ReceiveIpPacketMeta { broadcast, transparent_override } = meta;
126 if let Some(delivery) = transparent_override {
127 warn!(
128 "TODO(https://fxbug.dev/337009139): transparent proxy not supported for TCP \
129 sockets; will not override dispatch to perform local delivery to {delivery:?}"
130 );
131 }
132
133 if broadcast.is_some() {
134 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
135 .invalid_ip_addrs_received
136 .increment();
137 debug!("tcp: dropping broadcast TCP packet");
138 return Ok(());
139 }
140
141 let remote_ip = match SpecifiedAddr::new(remote_ip.into_addr()) {
142 None => {
143 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
144 .invalid_ip_addrs_received
145 .increment();
146 debug!("tcp: source address unspecified, dropping the packet");
147 return Ok(());
148 }
149 Some(src_ip) => src_ip,
150 };
151 let remote_ip: SocketIpAddr<_> = match remote_ip.try_into() {
152 Ok(remote_ip) => remote_ip,
153 Err(AddrIsMappedError {}) => {
154 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
155 .invalid_ip_addrs_received
156 .increment();
157 debug!("tcp: source address is mapped (ipv4-mapped-ipv6), dropping the packet");
158 return Ok(());
159 }
160 };
161 let local_ip: SocketIpAddr<_> = match local_ip.try_into() {
162 Ok(local_ip) => local_ip,
163 Err(AddrIsMappedError {}) => {
164 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
165 .invalid_ip_addrs_received
166 .increment();
167 debug!("tcp: local address is mapped (ipv4-mapped-ipv6), dropping the packet");
168 return Ok(());
169 }
170 };
171 let packet = match buffer
172 .parse_with::<_, TcpSegment<_>>(TcpParseArgs::new(remote_ip.addr(), local_ip.addr()))
173 {
174 Ok(packet) => packet,
175 Err(err) => {
176 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
177 .invalid_segments_received
178 .increment();
179 debug!("tcp: failed parsing incoming packet {:?}", err);
180 match err {
181 ParseError::Checksum => {
182 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
183 .checksum_errors
184 .increment();
185 }
186 ParseError::NotSupported | ParseError::NotExpected | ParseError::Format => {}
187 }
188 return Ok(());
189 }
190 };
191 let local_port = packet.dst_port();
192 let remote_port = packet.src_port();
193 let incoming = match VerifiedTcpSegment::try_from(packet) {
194 Ok(segment) => segment,
195 Err(err) => {
196 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
197 .invalid_segments_received
198 .increment();
199 debug!("tcp: malformed segment {:?}", err);
200 return Ok(());
201 }
202 };
203 let conn_addr =
204 ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) };
205
206 CounterContext::<TcpCountersWithoutSocket<I>>::counters(core_ctx)
207 .valid_segments_received
208 .increment();
209 handle_incoming_packet::<I, _, _, _>(
210 core_ctx,
211 bindings_ctx,
212 conn_addr,
213 device,
214 header_info,
215 &incoming,
216 marks,
217 );
218 Ok(())
219 }
220}
221
222fn handle_incoming_packet<WireI, BC, CC, H>(
223 core_ctx: &mut CC,
224 bindings_ctx: &mut BC,
225 conn_addr: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
226 incoming_device: &CC::DeviceId,
227 header_info: &H,
228 incoming: &VerifiedTcpSegment<'_>,
229 marks: &Marks,
230) where
231 WireI: DualStackIpExt,
232 BC: TcpBindingsContext<CC::DeviceId>
233 + BufferProvider<
234 BC::ReceiveBuffer,
235 BC::SendBuffer,
236 ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
237 PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
238 >,
239 CC: TcpContext<WireI, BC> + TcpContext<WireI::OtherVersion, BC>,
240 H: IpHeaderInfo<WireI>,
241{
242 trace_duration!(c"tcp::handle_incoming_packet");
243 let mut tw_reuse = None;
244
245 let mut addrs_to_search = AddrVecIter::<WireI, CC::WeakDeviceId, TcpPortSpec>::with_device(
246 conn_addr.into(),
247 incoming_device.downgrade(),
248 );
249
250 enum FoundSocket<S> {
251 Yes(Option<S>),
254 No,
255 }
256 let found_socket = loop {
257 let sock = core_ctx
258 .with_demux(|demux| lookup_socket::<WireI, CC, BC>(demux, &mut addrs_to_search));
259 match sock {
260 None => break FoundSocket::No,
261 Some(SocketLookupResult::Connection(demux_conn_id, conn_addr)) => {
262 assert_eq!(tw_reuse, None);
265 let disposition = match WireI::as_dual_stack_ip_socket(&demux_conn_id) {
266 EitherStack::ThisStack(conn_id) => {
267 try_handle_incoming_for_connection_dual_stack(
268 core_ctx,
269 bindings_ctx,
270 conn_id,
271 incoming_device,
272 header_info,
273 &incoming,
274 )
275 }
276 EitherStack::OtherStack(conn_id) => {
277 try_handle_incoming_for_connection_dual_stack(
278 core_ctx,
279 bindings_ctx,
280 conn_id,
281 incoming_device,
282 header_info,
283 &incoming,
284 )
285 }
286 };
287 match disposition {
288 ConnectionIncomingSegmentDisposition::Destroy => {
289 WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, demux_conn_id);
290 break FoundSocket::Yes(None);
291 }
292 ConnectionIncomingSegmentDisposition::FoundSocket
293 | ConnectionIncomingSegmentDisposition::Filtered => {
294 break FoundSocket::Yes(Some(demux_conn_id));
295 }
296 ConnectionIncomingSegmentDisposition::ReuseCandidateForListener => {
297 tw_reuse = Some((demux_conn_id, conn_addr));
298 }
299 }
300 }
301 Some(SocketLookupResult::Listener((demux_listener_id, _listener_addr))) => {
302 match WireI::as_dual_stack_ip_socket(&demux_listener_id) {
303 EitherStack::ThisStack(listener_id) => {
304 let disposition = core_ctx.with_socket_mut_generators_transport_demux(
305 &listener_id,
306 |core_ctx, socket_state, isn, timestamp_offset| match core_ctx {
307 MaybeDualStack::NotDualStack((core_ctx, converter)) => {
308 try_handle_incoming_for_listener::<WireI, WireI, CC, BC, _, _>(
309 core_ctx,
310 bindings_ctx,
311 &listener_id,
312 isn,
313 timestamp_offset,
314 socket_state,
315 header_info,
316 incoming,
317 conn_addr,
318 incoming_device,
319 &mut tw_reuse,
320 move |conn, addr| converter.convert_back((conn, addr)),
321 WireI::into_demux_socket_id,
322 marks,
323 )
324 }
325 MaybeDualStack::DualStack((core_ctx, converter)) => {
326 try_handle_incoming_for_listener::<_, _, CC, BC, _, _>(
327 core_ctx,
328 bindings_ctx,
329 &listener_id,
330 isn,
331 timestamp_offset,
332 socket_state,
333 header_info,
334 incoming,
335 conn_addr,
336 incoming_device,
337 &mut tw_reuse,
338 move |conn, addr| {
339 converter
340 .convert_back(EitherStack::ThisStack((conn, addr)))
341 },
342 WireI::into_demux_socket_id,
343 marks,
344 )
345 }
346 },
347 );
348 if try_handle_listener_incoming_disposition(
349 core_ctx,
350 bindings_ctx,
351 disposition,
352 &demux_listener_id,
353 &mut tw_reuse,
354 &mut addrs_to_search,
355 conn_addr,
356 incoming_device,
357 ) {
358 break FoundSocket::Yes(Some(demux_listener_id));
359 }
360 }
361 EitherStack::OtherStack(listener_id) => {
362 let disposition = core_ctx.with_socket_mut_generators_transport_demux(
363 &listener_id,
364 |core_ctx, socket_state, isn, timestamp_offset| {
365 match core_ctx {
366 MaybeDualStack::NotDualStack((_core_ctx, _converter)) => {
367 unreachable!("OtherStack socket ID with non dual stack");
370 }
371 MaybeDualStack::DualStack((core_ctx, converter)) => {
372 let other_demux_id_converter =
373 core_ctx.other_demux_id_converter();
374 try_handle_incoming_for_listener::<_, _, CC, BC, _, _>(
375 core_ctx,
376 bindings_ctx,
377 &listener_id,
378 isn,
379 timestamp_offset,
380 socket_state,
381 header_info,
382 incoming,
383 conn_addr,
384 incoming_device,
385 &mut tw_reuse,
386 move |conn, addr| {
387 converter.convert_back(EitherStack::OtherStack((
388 conn, addr,
389 )))
390 },
391 move |id| other_demux_id_converter.convert(id),
392 marks,
393 )
394 }
395 }
396 },
397 );
398 if try_handle_listener_incoming_disposition::<_, _, CC, BC, _>(
399 core_ctx,
400 bindings_ctx,
401 disposition,
402 &demux_listener_id,
403 &mut tw_reuse,
404 &mut addrs_to_search,
405 conn_addr,
406 incoming_device,
407 ) {
408 break FoundSocket::Yes(Some(demux_listener_id));
409 }
410 }
411 };
412 }
413 }
414 };
415
416 let demux_id = match found_socket {
417 FoundSocket::No => {
418 CounterContext::<TcpCountersWithoutSocket<WireI>>::counters(core_ctx)
419 .received_segments_no_dispatch
420 .increment();
421
422 if let Some(seg) =
428 (Closed { reason: None::<Option<ConnectionError>> }.on_segment(&incoming.into()))
429 {
430 socket::send_tcp_segment::<WireI, WireI, _, _, _>(
431 core_ctx,
432 bindings_ctx,
433 None,
434 None,
435 conn_addr,
436 seg.into_empty(),
437 &TcpIpSockOptions { marks: *marks },
438 );
439 }
440 None
441 }
442 FoundSocket::Yes(demux_id) => {
443 counters::increment_counter_with_optional_demux_id::<WireI, _, _, _, _>(
444 core_ctx,
445 demux_id.as_ref(),
446 |c| &c.received_segments_dispatched,
447 );
448 demux_id
449 }
450 };
451
452 if let Some(control) = incoming.control() {
453 counters::increment_counter_with_optional_demux_id::<WireI, _, _, _, _>(
454 core_ctx,
455 demux_id.as_ref(),
456 |c| match control {
457 Control::RST => &c.resets_received,
458 Control::SYN => &c.syns_received,
459 Control::FIN => &c.fins_received,
460 },
461 )
462 }
463}
464
465enum SocketLookupResult<I: DualStackIpExt, D: WeakDeviceIdentifier, BT: TcpBindingsTypes> {
466 Connection(I::DemuxSocketId<D, BT>, ConnAddr<ConnIpAddr<I::Addr, NonZeroU16, NonZeroU16>, D>),
467 Listener((I::DemuxSocketId<D, BT>, ListenerAddr<ListenerIpAddr<I::Addr, NonZeroU16>, D>)),
468}
469
470fn lookup_socket<I, CC, BC>(
471 DemuxState { socketmap, .. }: &DemuxState<I, CC::WeakDeviceId, BC>,
472 addrs_to_search: &mut AddrVecIter<I, CC::WeakDeviceId, TcpPortSpec>,
473) -> Option<SocketLookupResult<I, CC::WeakDeviceId, BC>>
474where
475 I: DualStackIpExt,
476 BC: TcpBindingsContext<CC::DeviceId>,
477 CC: TcpContext<I, BC>,
478{
479 addrs_to_search.find_map(|addr| {
480 match addr {
481 AddrVec::Conn(conn_addr) => {
484 socketmap.conns().get_by_addr(&conn_addr).map(|conn_addr_state| {
485 SocketLookupResult::Connection(conn_addr_state.id(), conn_addr)
486 })
487 }
488 AddrVec::Listen(listener_addr) => {
489 socketmap
494 .listeners()
495 .get_by_addr(&listener_addr)
496 .and_then(|addr_state| match addr_state {
497 ListenerAddrState::ExclusiveListener(id) => Some(id.clone()),
498 ListenerAddrState::Shared { listener: Some(id), bound: _ } => {
499 Some(id.clone())
500 }
501 ListenerAddrState::ExclusiveBound(_)
502 | ListenerAddrState::Shared { listener: None, bound: _ } => None,
503 })
504 .map(|id| SocketLookupResult::Listener((id, listener_addr)))
505 }
506 }
507 })
508}
509
510#[derive(PartialEq, Eq)]
511enum ConnectionIncomingSegmentDisposition {
512 FoundSocket,
513 Filtered,
514 ReuseCandidateForListener,
515 Destroy,
516}
517
518enum ListenerIncomingSegmentDisposition<S> {
519 FoundSocket,
520 Filtered,
521 ConflictingConnection,
522 NoMatchingSocket,
523 NewConnection(S),
524}
525
526fn try_handle_incoming_for_connection_dual_stack<SockI, WireI, CC, BC, H>(
527 core_ctx: &mut CC,
528 bindings_ctx: &mut BC,
529 conn_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
530 incoming_device: &CC::DeviceId,
531 header_info: &H,
532 incoming: &VerifiedTcpSegment<'_>,
533) -> ConnectionIncomingSegmentDisposition
534where
535 SockI: DualStackIpExt,
536 WireI: Ip,
537 BC: TcpBindingsContext<CC::DeviceId>
538 + BufferProvider<
539 BC::ReceiveBuffer,
540 BC::SendBuffer,
541 ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
542 PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
543 >,
544 CC: TcpContext<SockI, BC>,
545 H: IpHeaderInfo<WireI>,
546{
547 core_ctx.with_socket_mut_transport_demux(conn_id, |core_ctx, socket_state| {
548 let TcpSocketState { socket_state, ip_options: _, socket_options } = socket_state;
549
550 match run_socket_ingress_filter(
551 bindings_ctx,
552 incoming_device,
553 conn_id.socket_cookie(),
554 socket_options,
555 header_info,
556 incoming.tcp_segment(),
557 ) {
558 SocketIngressFilterResult::Accept => (),
559 SocketIngressFilterResult::Drop => {
560 return ConnectionIncomingSegmentDisposition::Filtered;
561 }
562 }
563
564 let (conn_and_addr, timer) = assert_matches!(
565 socket_state,
566 TcpSocketStateInner::Bound(BoundSocketState::Connected {
567 conn, timer, sharing: _
568 }) => (conn , timer),
569 "invalid socket ID"
570 );
571 let this_or_other_stack = match core_ctx {
572 MaybeDualStack::DualStack((core_ctx, converter)) => {
573 match converter.convert(conn_and_addr) {
574 EitherStack::ThisStack((conn, conn_addr)) => {
575 EitherStack::ThisStack((
580 core_ctx.as_this_stack(),
581 conn,
582 conn_addr,
583 SockI::into_demux_socket_id(conn_id.clone()),
584 ))
585 }
586 EitherStack::OtherStack((conn, conn_addr)) => {
587 let demux_sock_id = core_ctx.into_other_demux_socket_id(conn_id.clone());
590 EitherStack::OtherStack((core_ctx, conn, conn_addr, demux_sock_id))
591 }
592 }
593 }
594 MaybeDualStack::NotDualStack((core_ctx, converter)) => {
595 let (conn, conn_addr) = converter.convert(conn_and_addr);
596 EitherStack::ThisStack((
599 core_ctx.as_this_stack(),
600 conn,
601 conn_addr,
602 SockI::into_demux_socket_id(conn_id.clone()),
603 ))
604 }
605 };
606
607 match this_or_other_stack {
608 EitherStack::ThisStack((core_ctx, conn, conn_addr, demux_conn_id)) => {
609 try_handle_incoming_for_connection::<_, _, CC, _, _>(
610 core_ctx,
611 bindings_ctx,
612 conn_addr.clone(),
613 conn_id,
614 demux_conn_id,
615 socket_options,
616 conn,
617 timer,
618 incoming.into(),
619 )
620 }
621 EitherStack::OtherStack((core_ctx, conn, conn_addr, demux_conn_id)) => {
622 try_handle_incoming_for_connection::<_, _, CC, _, _>(
623 core_ctx,
624 bindings_ctx,
625 conn_addr.clone(),
626 conn_id,
627 demux_conn_id,
628 socket_options,
629 conn,
630 timer,
631 incoming.into(),
632 )
633 }
634 }
635 })
636}
637
638fn try_handle_incoming_for_connection<SockI, WireI, CC, BC, DC>(
645 core_ctx: &mut DC,
646 bindings_ctx: &mut BC,
647 conn_addr: ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
648 conn_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
649 demux_id: WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
650 socket_options: &SocketOptions,
651 conn: &mut Connection<SockI, WireI, CC::WeakDeviceId, BC>,
652 timer: &mut BC::Timer,
653 incoming: Segment<&[u8]>,
654) -> ConnectionIncomingSegmentDisposition
655where
656 SockI: DualStackIpExt,
657 WireI: DualStackIpExt,
658 BC: TcpBindingsContext<CC::DeviceId>
659 + BufferProvider<
660 BC::ReceiveBuffer,
661 BC::SendBuffer,
662 ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
663 PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
664 >,
665 CC: TcpContext<SockI, BC>,
666 DC: TransportIpContext<WireI, BC, DeviceId = CC::DeviceId, WeakDeviceId = CC::WeakDeviceId>
667 + DeviceIpSocketHandler<SockI, BC>
668 + TcpDemuxContext<WireI, CC::WeakDeviceId, BC>
669 + TcpCounterContext<SockI, CC::WeakDeviceId, BC>
670 + CoreTxMetadataContext<TcpSocketTxMetadata<SockI, CC::WeakDeviceId, BC>, BC>,
671{
672 let Connection { accept_queue, state, ip_sock, defunct, soft_error: _, handshake_status } =
673 conn;
674
675 if *defunct
687 && incoming.header().control == Some(Control::SYN)
688 && incoming.header().ack.is_none()
689 {
690 if let State::TimeWait(TimeWait { last_seq: _, closed_rcv, expiry: _ }) = state {
691 if !incoming.header().seq.before(closed_rcv.ack) {
692 return ConnectionIncomingSegmentDisposition::ReuseCandidateForListener;
693 }
694 }
695 }
696 let (reply, passive_open, data_acked, newly_closed) = state.on_segment::<_, BC>(
697 &conn_id.either(),
698 &TcpCountersRefs::from_ctx(core_ctx, conn_id),
699 incoming,
700 bindings_ctx.now(),
701 socket_options,
702 *defunct,
703 );
704
705 match data_acked {
706 DataAcked::Yes => {
707 core_ctx.confirm_reachable(bindings_ctx, ip_sock, &socket_options.ip_options)
708 }
709 DataAcked::No => {}
710 }
711
712 match state {
713 State::Listen(_) => {
714 unreachable!("has an invalid status: {:?}", conn.state)
715 }
716 State::SynSent(_) | State::SynRcvd(_) => {
717 assert_eq!(*handshake_status, HandshakeStatus::Pending)
718 }
719 State::Established(_)
720 | State::FinWait1(_)
721 | State::FinWait2(_)
722 | State::Closing(_)
723 | State::CloseWait(_)
724 | State::LastAck(_)
725 | State::TimeWait(_) => {
726 if handshake_status
727 .update_if_pending(HandshakeStatus::Completed { reported: accept_queue.is_some() })
728 {
729 core_ctx.confirm_reachable(bindings_ctx, ip_sock, &socket_options.ip_options);
730 }
731 }
732 State::Closed(Closed { reason }) => {
733 socket::handle_newly_closed(
741 core_ctx,
742 bindings_ctx,
743 newly_closed,
744 &demux_id,
745 &conn_addr,
746 timer,
747 );
748 if let Some(accept_queue) = accept_queue {
749 accept_queue.remove(&conn_id);
750 *defunct = true;
751 }
752 if *defunct {
753 return ConnectionIncomingSegmentDisposition::Destroy;
756 }
757 let _: bool = handshake_status.update_if_pending(match reason {
758 None => HandshakeStatus::Completed { reported: accept_queue.is_some() },
759 Some(_err) => HandshakeStatus::Aborted,
760 });
761 }
762 }
763
764 if let Some(seg) = reply {
765 socket::send_tcp_segment(
766 core_ctx,
767 bindings_ctx,
768 Some(conn_id),
769 Some(&ip_sock),
770 conn_addr.ip,
771 seg.into_empty(),
772 &socket_options.ip_options,
773 );
774 }
775
776 socket::do_send_inner_and_then_handle_newly_closed(
778 conn_id,
779 &demux_id,
780 socket_options,
781 conn,
782 DoSendLimit::MultipleSegments,
783 &conn_addr,
784 timer,
785 core_ctx,
786 bindings_ctx,
787 );
788
789 if let Some(passive_open) = passive_open {
792 let accept_queue = conn.accept_queue.as_ref().expect("no accept queue but passive open");
793 accept_queue.notify_ready(conn_id, passive_open);
794 }
795
796 ConnectionIncomingSegmentDisposition::FoundSocket
798}
799
800fn try_handle_listener_incoming_disposition<SockI, WireI, CC, BC, Addr>(
805 core_ctx: &mut CC,
806 bindings_ctx: &mut BC,
807 disposition: ListenerIncomingSegmentDisposition<PrimaryRc<SockI, CC::WeakDeviceId, BC>>,
808 demux_listener_id: &WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
809 tw_reuse: &mut Option<(WireI::DemuxSocketId<CC::WeakDeviceId, BC>, Addr)>,
810 addrs_to_search: &mut AddrVecIter<WireI, CC::WeakDeviceId, TcpPortSpec>,
811 conn_addr: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
812 incoming_device: &CC::DeviceId,
813) -> bool
814where
815 SockI: DualStackIpExt,
816 WireI: DualStackIpExt,
817 CC: TcpContext<SockI, BC> + TcpContext<WireI, BC> + TcpContext<WireI::OtherVersion, BC>,
818 BC: TcpBindingsContext<CC::DeviceId>,
819{
820 match disposition {
821 ListenerIncomingSegmentDisposition::FoundSocket => true,
822 ListenerIncomingSegmentDisposition::Filtered => true,
823 ListenerIncomingSegmentDisposition::ConflictingConnection => {
824 if let Some((tw_reuse, _)) = tw_reuse.take() {
829 WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, tw_reuse);
830 }
831
832 *addrs_to_search = AddrVecIter::<WireI, CC::WeakDeviceId, TcpPortSpec>::with_device(
835 conn_addr.into(),
836 incoming_device.downgrade(),
837 );
838 false
839 }
840 ListenerIncomingSegmentDisposition::NoMatchingSocket => false,
841 ListenerIncomingSegmentDisposition::NewConnection(primary) => {
842 if let Some((tw_reuse, _)) = tw_reuse.take() {
848 WireI::destroy_socket_with_demux_id(core_ctx, bindings_ctx, tw_reuse);
849 }
850
851 let id = TcpSocketId(PrimaryRc::clone_strong(&primary));
860 let to_destroy = core_ctx.with_all_sockets_mut(move |all_sockets| {
861 let insert_entry = TcpSocketSetEntry::Primary(primary);
862 match all_sockets.entry(id) {
863 hash_map::Entry::Vacant(v) => {
864 let _: &mut _ = v.insert(insert_entry);
865 None
866 }
867 hash_map::Entry::Occupied(mut o) => {
868 assert_matches!(
872 core::mem::replace(o.get_mut(), insert_entry),
873 TcpSocketSetEntry::DeadOnArrival
874 );
875 Some(o.key().clone())
876 }
877 }
878 });
879 if let Some(to_destroy) = to_destroy {
887 socket::destroy_socket(core_ctx, bindings_ctx, to_destroy);
888 }
889 counters::increment_counter_for_demux_id::<WireI, _, _, _, _>(
890 core_ctx,
891 demux_listener_id,
892 |c| &c.passive_connection_openings,
893 );
894 true
895 }
896 }
897}
898
899fn try_handle_incoming_for_listener<SockI, WireI, CC, BC, DC, H>(
903 core_ctx: &mut DC,
904 bindings_ctx: &mut BC,
905 listener_id: &TcpSocketId<SockI, CC::WeakDeviceId, BC>,
906 isn: &IsnGenerator<BC::Instant>,
907 timestamp_offset: &TimestampOffsetGenerator<BC::Instant>,
908 socket_state: &mut TcpSocketState<SockI, CC::WeakDeviceId, BC>,
909 header_info: &H,
910 incoming: &VerifiedTcpSegment<'_>,
911 incoming_addrs: ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>,
912 incoming_device: &CC::DeviceId,
913 tw_reuse: &mut Option<(
914 WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
915 ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
916 )>,
917 make_connection: impl FnOnce(
918 Connection<SockI, WireI, CC::WeakDeviceId, BC>,
919 ConnAddr<ConnIpAddr<WireI::Addr, NonZeroU16, NonZeroU16>, CC::WeakDeviceId>,
920 ) -> SockI::ConnectionAndAddr<CC::WeakDeviceId, BC>,
921 make_demux_id: impl Fn(
922 TcpSocketId<SockI, CC::WeakDeviceId, BC>,
923 ) -> WireI::DemuxSocketId<CC::WeakDeviceId, BC>,
924 marks: &Marks,
925) -> ListenerIncomingSegmentDisposition<PrimaryRc<SockI, CC::WeakDeviceId, BC>>
926where
927 SockI: DualStackIpExt,
928 WireI: DualStackIpExt,
929 BC: TcpBindingsContext<CC::DeviceId>
930 + BufferProvider<
931 BC::ReceiveBuffer,
932 BC::SendBuffer,
933 ActiveOpen = <BC as TcpBindingsTypes>::ListenerNotifierOrProvidedBuffers,
934 PassiveOpen = <BC as TcpBindingsTypes>::ReturnedBuffers,
935 >,
936 CC: TcpContext<SockI, BC>,
937 DC: TransportIpContext<WireI, BC, DeviceId = CC::DeviceId, WeakDeviceId = CC::WeakDeviceId>
938 + DeviceIpSocketHandler<WireI, BC>
939 + TcpDemuxContext<WireI, CC::WeakDeviceId, BC>
940 + TcpCounterContext<SockI, CC::WeakDeviceId, BC>
941 + CoreTxMetadataContext<TcpSocketTxMetadata<SockI, CC::WeakDeviceId, BC>, BC>,
942 H: IpHeaderInfo<WireI>,
943{
944 let (maybe_listener, sharing, listener_addr) = assert_matches!(
945 &socket_state.socket_state,
946 TcpSocketStateInner::Bound(BoundSocketState::Listener(l)) => l,
947 "invalid socket ID"
948 );
949
950 let ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) } =
951 incoming_addrs;
952
953 let Listener { accept_queue, backlog, buffer_sizes } = match maybe_listener {
954 MaybeListener::Bound(_bound) => {
955 return ListenerIncomingSegmentDisposition::NoMatchingSocket;
957 }
958 MaybeListener::Listener(listener) => listener,
959 };
960
961 match run_socket_ingress_filter(
962 bindings_ctx,
963 incoming_device,
964 listener_id.socket_cookie(),
965 &socket_state.socket_options,
966 header_info,
967 incoming.tcp_segment(),
968 ) {
969 SocketIngressFilterResult::Accept => (),
970 SocketIngressFilterResult::Drop => {
971 return ListenerIncomingSegmentDisposition::Filtered;
972 }
973 }
974
975 if accept_queue.len() == backlog.get() {
979 core_ctx.increment_both(listener_id, |counters| &counters.listener_queue_overflow);
980 core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
981 debug!("incoming SYN dropped because of the full backlog of the listener");
982 return ListenerIncomingSegmentDisposition::FoundSocket;
983 }
984
985 let bound_device = listener_addr.as_ref().clone();
988 let bound_device = if remote_ip.as_ref().must_have_zone() {
989 Some(bound_device.map_or(EitherDeviceId::Strong(incoming_device), EitherDeviceId::Weak))
990 } else {
991 bound_device.map(EitherDeviceId::Weak)
992 };
993
994 let ip_options = TcpIpSockOptions { marks: *marks, ..socket_state.socket_options.ip_options };
995 let socket_options = SocketOptions { ip_options, ..socket_state.socket_options };
996
997 let bound_device = bound_device.as_ref().map(|d| d.as_ref());
998 let ip_sock = match core_ctx.new_ip_socket(
999 bindings_ctx,
1000 IpSocketArgs {
1001 device: bound_device,
1002 local_ip: IpDeviceAddr::new_from_socket_ip_addr(local_ip),
1003 remote_ip,
1004 proto: IpProto::Tcp.into(),
1005 options: &ip_options,
1006 },
1007 ) {
1008 Ok(ip_sock) => ip_sock,
1009 err @ Err(IpSockCreationError::Route(_)) => {
1010 core_ctx.increment_both(listener_id, |counters| &counters.passive_open_no_route_errors);
1011 core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1012 debug!("cannot construct an ip socket to the SYN originator: {:?}, ignoring", err);
1013 return ListenerIncomingSegmentDisposition::NoMatchingSocket;
1014 }
1015 };
1016
1017 let isn = isn.generate(
1018 bindings_ctx.now(),
1019 (ip_sock.local_ip().clone().into(), local_port),
1020 (ip_sock.remote_ip().clone(), remote_port),
1021 );
1022 let timestamp_offset = timestamp_offset.generate::<SocketIpAddr<WireI::Addr>, NonZeroU16>(
1023 bindings_ctx.now(),
1024 (ip_sock.local_ip().clone().into(), local_port),
1025 (ip_sock.remote_ip().clone(), remote_port),
1026 );
1027 let device_mms = match core_ctx.get_mms(bindings_ctx, &ip_sock, &socket_options.ip_options) {
1028 Ok(mms) => mms,
1029 Err(err) => {
1030 error!("Cannot find a device with large enough MTU for the connection");
1034 core_ctx.increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1035 match err {
1036 MmsError::NoDevice(_) | MmsError::MTUTooSmall(_) => {
1037 return ListenerIncomingSegmentDisposition::FoundSocket;
1038 }
1039 }
1040 }
1041 };
1042 let Some(device_mss) = Mss::from_mms(device_mms) else {
1043 return ListenerIncomingSegmentDisposition::FoundSocket;
1044 };
1045
1046 let mut state = State::Listen(Closed::<Initial>::listen(
1047 isn,
1048 timestamp_offset,
1049 buffer_sizes.clone(),
1050 device_mss,
1051 Mss::default::<WireI>(),
1052 socket_options.user_timeout,
1053 ));
1054
1055 let result = state.on_segment::<_, BC>(
1060 &listener_id.either(),
1064 &TcpCountersRefs::from_ctx(core_ctx, listener_id),
1065 incoming.into(),
1066 bindings_ctx.now(),
1067 &SocketOptions::default(),
1068 false, );
1070 let reply = assert_matches!(
1071 result,
1072 (reply, None, _, NewlyClosed::No ) => reply
1073 );
1074
1075 let result = if matches!(state, State::SynRcvd(_)) {
1076 let poll_send_at = state.poll_send_at().expect("no retrans timer");
1077 let ListenerSharingState { sharing, listening: _ } = *sharing;
1078 let bound_device = ip_sock.device().cloned();
1079
1080 let addr = ConnAddr {
1081 ip: ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) },
1082 device: bound_device,
1083 };
1084
1085 let new_socket = core_ctx.with_demux_mut(|DemuxState { socketmap, .. }| {
1086 if let Some((tw_reuse, conn_addr)) = tw_reuse {
1096 match socketmap.conns_mut().remove(tw_reuse, &conn_addr) {
1097 Ok(()) => {
1098 }
1104 Err(NotFoundError) => {
1105 }
1109 }
1110 }
1111
1112 let accept_queue_clone = accept_queue.clone();
1114 let ip_sock = ip_sock.clone();
1115 let bindings_ctx_moved = &mut *bindings_ctx;
1116 match socketmap.conns_mut().try_insert_with(addr, sharing, move |addr, sharing| {
1117 let conn = make_connection(
1118 Connection {
1119 accept_queue: Some(accept_queue_clone),
1120 state,
1121 ip_sock,
1122 defunct: false,
1123 soft_error: None,
1124 handshake_status: HandshakeStatus::Pending,
1125 },
1126 addr,
1127 );
1128
1129 let (id, primary) = TcpSocketId::new_cyclic(
1130 |weak| {
1131 let mut timer = CC::new_timer(bindings_ctx_moved, weak);
1132 assert_eq!(
1136 bindings_ctx_moved.schedule_timer_instant(poll_send_at, &mut timer),
1137 None
1138 );
1139 TcpSocketStateInner::Bound(BoundSocketState::Connected {
1140 conn,
1141 sharing,
1142 timer,
1143 })
1144 },
1145 socket_options,
1146 );
1147 (make_demux_id(id.clone()), (primary, id))
1148 }) {
1149 Ok((_entry, (primary, id))) => {
1150 accept_queue.push_pending(id);
1153 Some(primary)
1154 }
1155 Err((e, _sharing_state)) => {
1156 assert_matches!(e, InsertError::Exists);
1161 None
1166 }
1167 }
1168 });
1169
1170 match new_socket {
1171 Some(new_socket) => ListenerIncomingSegmentDisposition::NewConnection(new_socket),
1172 None => {
1173 core_ctx
1176 .increment_both(listener_id, |counters| &counters.failed_connection_attempts);
1177 return ListenerIncomingSegmentDisposition::ConflictingConnection;
1178 }
1179 }
1180 } else {
1181 ListenerIncomingSegmentDisposition::FoundSocket
1184 };
1185
1186 if let Some(seg) = reply {
1188 socket::send_tcp_segment(
1189 core_ctx,
1190 bindings_ctx,
1191 Some(&listener_id),
1192 Some(&ip_sock),
1193 incoming_addrs,
1194 seg.into_empty(),
1195 &socket_options.ip_options,
1196 );
1197 }
1198
1199 result
1200}
1201
1202pub(super) fn tcp_serialize_segment<'a, I, P>(
1203 header: &'a SegmentHeader,
1204 data: P,
1205 conn_addr: ConnIpAddr<I::Addr, NonZeroU16, NonZeroU16>,
1206) -> impl TransportPacketSerializer<I, Buffer = EmptyBuf> + Debug + 'a
1207where
1208 I: FilterIpExt,
1209 P: InnerPacketBuilder + Debug + Payload + 'a,
1210{
1211 let SegmentHeader { seq, ack, wnd, control, options, push } = header;
1212 let ConnIpAddr { local: (local_ip, local_port), remote: (remote_ip, remote_port) } = conn_addr;
1213 let mut builder = TcpSegmentBuilder::new(
1214 local_ip.addr(),
1215 remote_ip.addr(),
1216 local_port,
1217 remote_port,
1218 (*seq).into(),
1219 ack.map(Into::into),
1220 u16::from(*wnd),
1221 );
1222 builder.psh(*push);
1223 match control {
1224 None => {}
1225 Some(Control::SYN) => builder.syn(true),
1226 Some(Control::FIN) => builder.fin(true),
1227 Some(Control::RST) => builder.rst(true),
1228 }
1229 TcpSegmentBuilderWithOptions::new(builder, options.builder())
1230 .unwrap_or_else(|TcpOptionsTooLongError| {
1231 panic!("Too many TCP options");
1232 })
1233 .wrap_body(data.into_serializer())
1234}
1235
1236fn run_socket_ingress_filter<I, BC, D>(
1237 bindings_ctx: &BC,
1238 incoming_device: &D,
1239 socket_cookie: SocketCookie,
1240 socket_options: &SocketOptions,
1241 header_info: &impl IpHeaderInfo<I>,
1242 tcp_segment: &TcpSegment<&'_ [u8]>,
1243) -> SocketIngressFilterResult
1244where
1245 I: Ip,
1246 BC: TcpBindingsContext<D>,
1247 D: StrongDeviceIdentifier,
1248{
1249 let [ip_prefix, ip_options] = header_info.as_bytes();
1250 let [tcp_prefix, tcp_options, data] = tcp_segment.as_bytes();
1251 let mut slices = [ip_prefix, ip_options, tcp_prefix, tcp_options, data];
1252 let data = FragmentedByteSlice::new(&mut slices);
1253
1254 bindings_ctx.socket_ops_filter().on_ingress(
1255 I::VERSION,
1256 data,
1257 incoming_device,
1258 socket_cookie,
1259 &socket_options.ip_options.marks,
1260 )
1261}
1262
1263#[cfg(test)]
1264mod test {
1265 use ip_test_macro::ip_test;
1266 use netstack3_base::{
1267 HandshakeOptions, Options, ResetOptions, SackBlocks, SegmentOptions, UnscaledWindowSize,
1268 };
1269 use packet::{ParseBuffer as _, Serializer as _};
1270 use packet_formats::tcp::options::TcpOptions as _;
1271 use test_case::test_case;
1272
1273 use super::*;
1274
1275 trait TestIpExt: netstack3_base::testutil::TestIpExt + FilterIpExt {}
1276 impl<T> TestIpExt for T where T: netstack3_base::testutil::TestIpExt + FilterIpExt {}
1277
1278 const SEQ: SeqNum = SeqNum::new(12345);
1279 const ACK: SeqNum = SeqNum::new(67890);
1280 const FAKE_DATA: &'static [u8] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 0];
1281
1282 #[ip_test(I)]
1283 #[test_case(
1284 Segment::syn(SEQ, UnscaledWindowSize::from(u16::MAX),
1285 HandshakeOptions::default()), &[]
1286 ; "syn")]
1287 #[test_case(
1288 Segment::syn(SEQ, UnscaledWindowSize::from(u16::MAX),
1289 HandshakeOptions {
1290 mss: Some(Mss::new(1440).unwrap()),
1291 ..Default::default() }), &[]
1292 ; "syn with mss")]
1293 #[test_case(
1294 Segment::ack(SEQ, ACK, UnscaledWindowSize::from(u16::MAX), SegmentOptions::default()),
1295 &[]; "ack")]
1296 #[test_case(Segment::with_fake_data(SEQ, ACK, FAKE_DATA), FAKE_DATA; "data")]
1297 #[test_case(Segment::new_assert_no_discard(SegmentHeader {
1298 seq: SEQ,
1299 ack: Some(ACK),
1300 push: true,
1301 wnd: UnscaledWindowSize::from(u16::MAX),
1302 ..Default::default()
1303 },
1304 FAKE_DATA
1305 ), FAKE_DATA; "push")]
1306 fn tcp_serialize_segment<I: TestIpExt>(segment: Segment<&[u8]>, expected_body: &[u8]) {
1307 const SOURCE_PORT: NonZeroU16 = NonZeroU16::new(1111).unwrap();
1308 const DEST_PORT: NonZeroU16 = NonZeroU16::new(2222).unwrap();
1309
1310 let (header, data) = segment.into_parts();
1311 let serializer = super::tcp_serialize_segment::<I, _>(
1312 &header,
1313 data,
1314 ConnIpAddr {
1315 local: (SocketIpAddr::try_from(I::TEST_ADDRS.local_ip).unwrap(), SOURCE_PORT),
1316 remote: (SocketIpAddr::try_from(I::TEST_ADDRS.remote_ip).unwrap(), DEST_PORT),
1317 },
1318 );
1319
1320 let mut serialized = serializer.serialize_vec_outer().unwrap().unwrap_b();
1321 let parsed_segment = serialized
1322 .parse_with::<_, TcpSegment<_>>(TcpParseArgs::new(
1323 *I::TEST_ADDRS.remote_ip,
1324 *I::TEST_ADDRS.local_ip,
1325 ))
1326 .expect("is valid segment");
1327
1328 assert_eq!(parsed_segment.src_port(), SOURCE_PORT);
1329 assert_eq!(parsed_segment.dst_port(), DEST_PORT);
1330 assert_eq!(parsed_segment.seq_num(), u32::from(SEQ));
1331 assert_eq!(parsed_segment.psh(), header.push);
1332 assert_eq!(
1333 UnscaledWindowSize::from(parsed_segment.window_size()),
1334 UnscaledWindowSize::from(u16::MAX)
1335 );
1336
1337 let (mss, window_scale, sack_permitted, sack_blocks, timestamp) = match header.options {
1338 Options::Handshake(HandshakeOptions {
1339 mss,
1340 window_scale,
1341 sack_permitted,
1342 timestamp,
1343 }) => (mss, window_scale, sack_permitted, SackBlocks::EMPTY, timestamp),
1344 Options::Segment(SegmentOptions { timestamp, sack_blocks }) => {
1345 (None, None, false, sack_blocks, timestamp)
1346 }
1347 Options::Reset(ResetOptions { timestamp }) => {
1348 (None, None, false, SackBlocks::EMPTY, timestamp)
1349 }
1350 };
1351 assert_eq!(mss.map(|mss| mss.get()), parsed_segment.options().mss());
1352 assert_eq!(window_scale.map(|ws| ws.get()), parsed_segment.options().window_scale());
1353 assert_eq!(sack_permitted, parsed_segment.options().sack_permitted());
1354 assert_eq!(sack_blocks.as_slice(), parsed_segment.options().sack_blocks());
1355 assert_eq!(
1356 timestamp.as_ref().map(Into::into).as_ref(),
1357 parsed_segment.options().timestamp()
1358 );
1359
1360 assert_eq!(parsed_segment.into_body(), expected_body);
1361 }
1362}