Skip to main content

fidl_fuchsia_net_sockets_ext/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
//! Extensions for the fuchsia.net.sockets FIDL library.
6
7#![warn(
8    missing_docs,
9    unreachable_patterns,
10    clippy::useless_conversion,
11    clippy::redundant_clone,
12    clippy::precedence
13)]
14
15use fidl_fuchsia_net as fnet;
16use fidl_fuchsia_net_ext::{IntoExt, Marks};
17use fidl_fuchsia_net_matchers as fnet_matchers;
18use fidl_fuchsia_net_matchers_ext as fnet_matchers_ext;
19use fidl_fuchsia_net_sockets as fnet_sockets;
20use fidl_fuchsia_net_tcp as fnet_tcp;
21use fidl_fuchsia_net_udp as fnet_udp;
22use futures::{Stream, TryStreamExt as _};
23use net_types::ip::{self, GenericOverIp, Ip, IpInvariant, Ipv4, Ipv6};
24use thiserror::Error;
25
26/// An extension type for [`fnet_sockets::IpSocketMatcher`].
27#[derive(Debug, Clone, PartialEq, Eq)]
28pub enum IpSocketMatcher {
29    /// Matches against the IP version of the socket.
30    Family(ip::IpVersion),
31    /// Matches against the source address of the socket.
32    SrcAddr(fnet_matchers_ext::BoundAddress),
33    /// Matches against the destination address of the socket.
34    DstAddr(fnet_matchers_ext::BoundAddress),
35    /// Matches against transport protocol fields of the socket.
36    Proto(fnet_matchers_ext::SocketTransportProtocol),
37    /// Matches against the (bound, i.e. SO_BINDTODEVICE) interface of the
38    /// socket.
39    BoundInterface(fnet_matchers_ext::BoundInterface),
40    /// Matches against the cookie of the socket (i.e. SO_COOKIE)
41    Cookie(fnet_matchers::SocketCookie),
42    /// Matches against one mark of the socket.
43    Mark(fnet_matchers_ext::MarkInDomain),
44}
45
46/// Errors returned by the conversion from [`fnet_sockets::IpSocketMatcher`]
47/// to [`IpSocketMatcher`].
48#[derive(Debug, PartialEq, Error)]
49pub enum IpSocketMatcherError {
50    /// A union type was unknown.
51    #[error("got unexpected union variant: {0}")]
52    UnknownUnionVariant(u64),
53    /// An error was encountered when converting one of the address matchers.
54    #[error("address matcher conversion failure: {0}")]
55    Address(fnet_matchers_ext::BoundAddressError),
56    /// An error was encountered when converting the transport protocol
57    /// matcher.
58    #[error("protocol matcher conversion failure: {0}")]
59    TransportProtocol(fnet_matchers_ext::SocketTransportProtocolError),
60    /// An error was encountered while converting the interface matcher.
61    #[error("bound interface matcher conversion failure: {0}")]
62    BoundInterface(fnet_matchers_ext::BoundInterfaceError),
63    /// An error was encountered when converting one of the mark matchers.
64    #[error("mark matcher conversion failure: {0}")]
65    Mark(fnet_matchers_ext::MarkInDomainError),
66}
67
68impl TryFrom<fnet_sockets::IpSocketMatcher> for IpSocketMatcher {
69    type Error = IpSocketMatcherError;
70
71    fn try_from(matcher: fnet_sockets::IpSocketMatcher) -> Result<Self, Self::Error> {
72        match matcher {
73            fnet_sockets::IpSocketMatcher::Family(ip_version) => {
74                Ok(Self::Family(ip_version.into_ext()))
75            }
76            fnet_sockets::IpSocketMatcher::SrcAddr(addr) => {
77                Ok(Self::SrcAddr(addr.try_into().map_err(|e| IpSocketMatcherError::Address(e))?))
78            }
79            fnet_sockets::IpSocketMatcher::DstAddr(addr) => {
80                Ok(Self::DstAddr(addr.try_into().map_err(|e| IpSocketMatcherError::Address(e))?))
81            }
82            fnet_sockets::IpSocketMatcher::Proto(proto) => Ok(Self::Proto(
83                proto.try_into().map_err(|e| IpSocketMatcherError::TransportProtocol(e))?,
84            )),
85            fnet_sockets::IpSocketMatcher::BoundInterface(bound_interface) => {
86                Ok(Self::BoundInterface(
87                    bound_interface
88                        .try_into()
89                        .map_err(|e| IpSocketMatcherError::BoundInterface(e))?,
90                ))
91            }
92            fnet_sockets::IpSocketMatcher::Cookie(cookie) => Ok(Self::Cookie(cookie)),
93            fnet_sockets::IpSocketMatcher::Mark(mark) => {
94                Ok(Self::Mark(mark.try_into().map_err(|e| IpSocketMatcherError::Mark(e))?))
95            }
96            fnet_sockets::IpSocketMatcher::__SourceBreaking { unknown_ordinal } => {
97                Err(IpSocketMatcherError::UnknownUnionVariant(unknown_ordinal))
98            }
99        }
100    }
101}
102
103impl From<IpSocketMatcher> for fnet_sockets::IpSocketMatcher {
104    fn from(value: IpSocketMatcher) -> Self {
105        match value {
106            IpSocketMatcher::Family(ip_version) => {
107                fnet_sockets::IpSocketMatcher::Family(ip_version.into_ext())
108            }
109            IpSocketMatcher::SrcAddr(address) => {
110                fnet_sockets::IpSocketMatcher::SrcAddr(address.into())
111            }
112            IpSocketMatcher::DstAddr(address) => {
113                fnet_sockets::IpSocketMatcher::DstAddr(address.into())
114            }
115            IpSocketMatcher::Proto(socket_transport_protocol) => {
116                fnet_sockets::IpSocketMatcher::Proto(socket_transport_protocol.into())
117            }
118            IpSocketMatcher::BoundInterface(mark) => {
119                fnet_sockets::IpSocketMatcher::BoundInterface(mark.into())
120            }
121            IpSocketMatcher::Cookie(socket_cookie) => {
122                fnet_sockets::IpSocketMatcher::Cookie(socket_cookie)
123            }
124            IpSocketMatcher::Mark(mark) => fnet_sockets::IpSocketMatcher::Mark(mark.into()),
125        }
126    }
127}
128
129/// Extension type for [`fnet_sockets::IpSocketState`].
130#[derive(Debug, PartialEq, Eq, Clone)]
131pub enum IpSocketState {
132    /// IPv4 socket state.
133    V4(IpSocketStateSpecific<Ipv4>),
134    /// IPv6 socket state.
135    V6(IpSocketStateSpecific<Ipv6>),
136}
137
138/// Error type for [`IpSocketState`] conversion.
139#[derive(Debug, Error, PartialEq)]
140pub enum IpSocketStateError {
141    /// Missing a required field.
142    #[error("missing field: {0}")]
143    MissingField(&'static str),
144    /// The socket address version does not match the expected version.
145    #[error("version mismatch")]
146    VersionMismatch,
147    /// The transport state is invalid.
148    #[error("transport state error: {0}")]
149    Transport(IpSocketTransportStateError),
150}
151
152impl TryFrom<fnet_sockets::IpSocketState> for IpSocketState {
153    type Error = IpSocketStateError;
154
155    fn try_from(value: fnet_sockets::IpSocketState) -> Result<Self, Self::Error> {
156        fn convert_address<I: Ip>(addr: fnet::IpAddress) -> Result<I::Addr, IpSocketStateError> {
157            I::map_ip::<_, Option<I::Addr>>(
158                IpInvariant(addr.into_ext()),
159                |IpInvariant(addr)| match addr {
160                    net_types::ip::IpAddr::V4(addr) => Some(addr),
161                    _ => None,
162                },
163                |IpInvariant(addr)| match addr {
164                    net_types::ip::IpAddr::V6(addr) => Some(addr),
165                    _ => None,
166                },
167            )
168            .ok_or(IpSocketStateError::VersionMismatch)
169        }
170
171        fn to_ip_socket_specific<I: Ip>(
172            src_addr: Option<fnet::IpAddress>,
173            dst_addr: Option<fnet::IpAddress>,
174            cookie: u64,
175            marks: fnet::Marks,
176            transport: fnet_sockets::IpSocketTransportState,
177        ) -> Result<IpSocketStateSpecific<I>, IpSocketStateError> {
178            let src_addr: Option<I::Addr> = src_addr.map(convert_address::<I>).transpose()?;
179            let dst_addr: Option<I::Addr> = dst_addr.map(convert_address::<I>).transpose()?;
180
181            Ok(IpSocketStateSpecific {
182                src_addr,
183                dst_addr,
184                cookie,
185                marks: marks.into(),
186                transport: transport.try_into().map_err(IpSocketStateError::Transport)?,
187            })
188        }
189
190        let fnet_sockets::IpSocketState {
191            family,
192            src_addr,
193            dst_addr,
194            cookie,
195            marks,
196            transport,
197            __source_breaking,
198        } = value;
199
200        let family = family.ok_or(IpSocketStateError::MissingField("family"))?;
201        let cookie = cookie.ok_or(IpSocketStateError::MissingField("cookie"))?;
202        let marks = marks.ok_or(IpSocketStateError::MissingField("marks"))?;
203        let transport = transport.ok_or(IpSocketStateError::MissingField("transport"))?;
204
205        match family {
206            fnet::IpVersion::V4 => Ok(IpSocketState::V4(to_ip_socket_specific(
207                src_addr, dst_addr, cookie, marks, transport,
208            )?)),
209            fnet::IpVersion::V6 => Ok(IpSocketState::V6(to_ip_socket_specific(
210                src_addr, dst_addr, cookie, marks, transport,
211            )?)),
212        }
213    }
214}
215
216impl From<IpSocketState> for fnet_sockets::IpSocketState {
217    fn from(state: IpSocketState) -> Self {
218        match state {
219            IpSocketState::V4(state) => state.into(),
220            IpSocketState::V6(state) => state.into(),
221        }
222    }
223}
224
225/// Lowest-level socket state information that ensures all fields are for the
226/// same IP version.
227#[derive(Debug, PartialEq, Eq, Clone, GenericOverIp)]
228#[generic_over_ip(I, Ip)]
229pub struct IpSocketStateSpecific<I: Ip> {
230    /// The source address of the socket.
231    pub src_addr: Option<I::Addr>,
232    /// The destination address of the socket.
233    pub dst_addr: Option<I::Addr>,
234    /// The cookie of the socket.
235    pub cookie: u64,
236    /// The marks of the socket.
237    pub marks: Marks,
238    /// The transport state of the socket.
239    pub transport: IpSocketTransportState,
240}
241
242impl<I: Ip> From<IpSocketStateSpecific<I>> for fnet_sockets::IpSocketState {
243    fn from(value: IpSocketStateSpecific<I>) -> Self {
244        let IpSocketStateSpecific { src_addr, dst_addr, cookie, marks, transport } = value;
245
246        fnet_sockets::IpSocketState {
247            family: Some(I::VERSION.into_ext()),
248            src_addr: src_addr.map(|a| net_types::ip::IpAddr::from(a).into_ext()),
249            dst_addr: dst_addr.map(|a| net_types::ip::IpAddr::from(a).into_ext()),
250            cookie: Some(cookie),
251            marks: Some(marks.into()),
252            transport: Some(transport.into()),
253            __source_breaking: fidl::marker::SourceBreaking,
254        }
255    }
256}
257
258/// Extension type for [`fnet_sockets::IpSocketTransportState`].
259#[derive(Debug, PartialEq, Eq, Clone)]
260pub enum IpSocketTransportState {
261    /// TCP socket state.
262    Tcp(IpSocketTcpState),
263    /// UDP socket state.
264    Udp(IpSocketUdpState),
265}
266
267/// Error type for [`IpSocketTransportState`] conversion.
268#[derive(Debug, PartialEq, Error)]
269pub enum IpSocketTransportStateError {
270    /// Error converting a TCP socket state.
271    #[error("tcp validation error: {0}")]
272    Tcp(IpSocketTcpStateError),
273    /// Error converting a UDP socket state.
274    #[error("udp validation error: {0}")]
275    Udp(IpSocketUdpStateError),
276    /// A union type was unknown.
277    #[error("got unexpected union variant: {0}")]
278    UnknownUnionVariant(u64),
279}
280
281impl TryFrom<fnet_sockets::IpSocketTransportState> for IpSocketTransportState {
282    type Error = IpSocketTransportStateError;
283
284    fn try_from(value: fnet_sockets::IpSocketTransportState) -> Result<Self, Self::Error> {
285        match value {
286            fnet_sockets::IpSocketTransportState::Tcp(tcp) => Ok(IpSocketTransportState::Tcp(
287                tcp.try_into().map_err(IpSocketTransportStateError::Tcp)?,
288            )),
289            fnet_sockets::IpSocketTransportState::Udp(udp) => Ok(IpSocketTransportState::Udp(
290                udp.try_into().map_err(IpSocketTransportStateError::Udp)?,
291            )),
292            fnet_sockets::IpSocketTransportState::__SourceBreaking { unknown_ordinal } => {
293                Err(IpSocketTransportStateError::UnknownUnionVariant(unknown_ordinal))
294            }
295        }
296    }
297}
298
299impl From<IpSocketTransportState> for fnet_sockets::IpSocketTransportState {
300    fn from(state: IpSocketTransportState) -> Self {
301        match state {
302            IpSocketTransportState::Tcp(tcp) => {
303                fnet_sockets::IpSocketTransportState::Tcp(tcp.into())
304            }
305            IpSocketTransportState::Udp(udp) => {
306                fnet_sockets::IpSocketTransportState::Udp(udp.into())
307            }
308        }
309    }
310}
311
312/// Extension type for [`fnet_sockets::IpSocketTcpState`].
313#[derive(Debug, PartialEq, Eq, Clone)]
314pub struct IpSocketTcpState {
315    /// The source port of the socket.
316    pub src_port: Option<u16>,
317    /// The destination port of the socket.
318    pub dst_port: Option<u16>,
319    /// The TCP state machine state for the socket.
320    pub state: fnet_tcp::State,
321    /// Extended TCP information if the TCP_INFO extension was requested.
322    pub tcp_info: Option<TcpInfo>,
323}
324
325/// Error type for [`IpSocketTcpState`] conversion.
326#[derive(Debug, PartialEq, Error)]
327pub enum IpSocketTcpStateError {
328    /// Missing a required field.
329    #[error("missing field: {0}")]
330    MissingField(&'static str),
331    /// Error converting a [`TcpInfo`].
332    #[error("tcp info error: {0}")]
333    TcpInfo(TcpInfoError),
334}
335
336impl TryFrom<fnet_sockets::IpSocketTcpState> for IpSocketTcpState {
337    type Error = IpSocketTcpStateError;
338
339    fn try_from(value: fnet_sockets::IpSocketTcpState) -> Result<Self, Self::Error> {
340        let fnet_sockets::IpSocketTcpState {
341            src_port,
342            dst_port,
343            state,
344            tcp_info,
345            __source_breaking,
346        } = value;
347
348        let state = state.ok_or(IpSocketTcpStateError::MissingField("state"))?;
349
350        Ok(IpSocketTcpState {
351            src_port,
352            dst_port,
353            state,
354            tcp_info: tcp_info
355                .map(|t| t.try_into())
356                .transpose()
357                .map_err(|e| IpSocketTcpStateError::TcpInfo(e))?,
358        })
359    }
360}
361
362impl From<IpSocketTcpState> for fnet_sockets::IpSocketTcpState {
363    fn from(state: IpSocketTcpState) -> Self {
364        let IpSocketTcpState { src_port, dst_port, state, tcp_info } = state;
365        fnet_sockets::IpSocketTcpState {
366            src_port,
367            dst_port,
368            state: Some(state),
369            tcp_info: tcp_info.map(Into::into),
370            __source_breaking: fidl::marker::SourceBreaking,
371        }
372    }
373}
374
375/// Extension type for [`fnet_tcp::Info`].
376#[derive(Debug, PartialEq, Eq, Clone)]
377pub struct TcpInfo {
378    /// The state of the TCP connection.
379    pub state: fnet_tcp::State,
380    /// The congestion control state of the TCP connection.
381    pub ca_state: fnet_tcp::CongestionControlState,
382    /// The retransmission timeout of the TCP connection in microseconds.
383    pub rto_usec: Option<u32>,
384    /// The time since the most recent data was sent on the connection in milliseconds.
385    pub tcpi_last_data_sent_msec: Option<u32>,
386    /// The time since the most recent ACK was received in milliseconds.
387    pub tcpi_last_ack_recv_msec: Option<u32>,
388    /// The estimated smoothed roundtrip time in microseconds.
389    pub rtt_usec: Option<u32>,
390    /// The smoothed mean deviation of the roundtrip time in microseconds.
391    pub rtt_var_usec: Option<u32>,
392    /// The sending slow start threshold in segments.
393    pub snd_ssthresh: u32,
394    /// The current sending congestion window in segments.
395    pub snd_cwnd: u32,
396    /// The total number of retransmissions.
397    pub tcpi_total_retrans: u32,
398    /// The total number of segments sent.
399    pub tcpi_segs_out: u64,
400    /// The total number of segments received.
401    pub tcpi_segs_in: u64,
402    /// Whether reordering has been seen on the connection.
403    pub reorder_seen: bool,
404}
405
406/// Error type for [`TcpInfo`] conversion.
407#[derive(Debug, PartialEq, Error)]
408pub enum TcpInfoError {
409    /// Missing a required field.
410    #[error("missing field: {0}")]
411    MissingField(&'static str),
412}
413
414impl TryFrom<fnet_tcp::Info> for TcpInfo {
415    type Error = TcpInfoError;
416
417    fn try_from(value: fnet_tcp::Info) -> Result<Self, Self::Error> {
418        let fnet_tcp::Info {
419            state,
420            ca_state,
421            rto_usec,
422            tcpi_last_data_sent_msec,
423            tcpi_last_ack_recv_msec,
424            rtt_usec,
425            rtt_var_usec,
426            snd_ssthresh,
427            snd_cwnd,
428            tcpi_total_retrans,
429            tcpi_segs_out,
430            tcpi_segs_in,
431            reorder_seen,
432            __source_breaking,
433        } = value;
434
435        Ok(TcpInfo {
436            state: state.ok_or(TcpInfoError::MissingField("state"))?,
437            ca_state: ca_state.ok_or(TcpInfoError::MissingField("ca_state"))?,
438            rto_usec,
439            tcpi_last_data_sent_msec,
440            tcpi_last_ack_recv_msec,
441            rtt_usec,
442            rtt_var_usec,
443            snd_ssthresh: snd_ssthresh.ok_or(TcpInfoError::MissingField("snd_ssthresh"))?,
444            snd_cwnd: snd_cwnd.ok_or(TcpInfoError::MissingField("snd_cwnd"))?,
445            tcpi_total_retrans: tcpi_total_retrans
446                .ok_or(TcpInfoError::MissingField("tcpi_total_retrans"))?,
447            tcpi_segs_out: tcpi_segs_out.ok_or(TcpInfoError::MissingField("tcpi_segs_out"))?,
448            tcpi_segs_in: tcpi_segs_in.ok_or(TcpInfoError::MissingField("tcpi_segs_in"))?,
449            reorder_seen: reorder_seen.ok_or(TcpInfoError::MissingField("reorder_seen"))?,
450        })
451    }
452}
453
454impl From<TcpInfo> for fnet_tcp::Info {
455    fn from(info: TcpInfo) -> Self {
456        let TcpInfo {
457            state,
458            ca_state,
459            rto_usec,
460            tcpi_last_data_sent_msec,
461            tcpi_last_ack_recv_msec,
462            rtt_usec,
463            rtt_var_usec,
464            snd_ssthresh,
465            snd_cwnd,
466            tcpi_total_retrans,
467            tcpi_segs_out,
468            tcpi_segs_in,
469            reorder_seen,
470        } = info;
471        fnet_tcp::Info {
472            state: Some(state),
473            ca_state: Some(ca_state),
474            rto_usec: rto_usec,
475            tcpi_last_data_sent_msec,
476            tcpi_last_ack_recv_msec,
477            rtt_usec: rtt_usec,
478            rtt_var_usec: rtt_var_usec,
479            snd_ssthresh: Some(snd_ssthresh),
480            snd_cwnd: Some(snd_cwnd),
481            tcpi_total_retrans: Some(tcpi_total_retrans),
482            tcpi_segs_out: Some(tcpi_segs_out),
483            tcpi_segs_in: Some(tcpi_segs_in),
484            reorder_seen: Some(reorder_seen),
485            __source_breaking: fidl::marker::SourceBreaking,
486        }
487    }
488}
489
490/// Extension type for [`fnet_sockets::IpSocketUdpState`].
491#[derive(Debug, PartialEq, Eq, Clone)]
492pub struct IpSocketUdpState {
493    /// The source port of the socket.
494    pub src_port: Option<u16>,
495    /// The destination port of the socket.
496    pub dst_port: Option<u16>,
497    /// The UDP pseudo-state machine state for the socket.
498    pub state: fnet_udp::State,
499}
500
501/// Error type for [`IpSocketUdpState`] conversion.
502#[derive(Debug, PartialEq, Error)]
503pub enum IpSocketUdpStateError {
504    /// Missing a required field.
505    #[error("missing field: {0}")]
506    MissingField(&'static str),
507}
508
509impl TryFrom<fnet_sockets::IpSocketUdpState> for IpSocketUdpState {
510    type Error = IpSocketUdpStateError;
511
512    fn try_from(value: fnet_sockets::IpSocketUdpState) -> Result<Self, Self::Error> {
513        let fnet_sockets::IpSocketUdpState { src_port, dst_port, state, __source_breaking } = value;
514
515        let state = state.ok_or(IpSocketUdpStateError::MissingField("state"))?;
516
517        Ok(IpSocketUdpState { src_port, dst_port, state })
518    }
519}
520
521impl From<IpSocketUdpState> for fnet_sockets::IpSocketUdpState {
522    fn from(state: IpSocketUdpState) -> Self {
523        let IpSocketUdpState { src_port, dst_port, state } = state;
524        fnet_sockets::IpSocketUdpState {
525            src_port,
526            dst_port,
527            state: Some(state),
528            __source_breaking: fidl::marker::SourceBreaking,
529        }
530    }
531}
532
533/// Errors returned by [`iterate_ip`]
534#[derive(Debug, Error)]
535pub enum IterateIpError {
536    /// The specified matcher was the first invalid one.
537    #[error("invalid matcher at position {0}")]
538    InvalidMatcher(usize),
539    /// An unknown response was received on the call to `Diagnostics.IterateIp`
540    #[error("unknown ordinal on Diagnostics.IterateIp call: {0}")]
541    UnknownOrdinal(u64),
542    /// A low-level FIDL error was encountered on the call to
543    /// `Diagnostics.IterateIp`.
544    #[error("fidl error during Diagnostics.IterateIp call: {0}")]
545    Fidl(fidl::Error),
546}
547
548impl From<fidl::Error> for IterateIpError {
549    fn from(e: fidl::Error) -> Self {
550        IterateIpError::Fidl(e)
551    }
552}
553
554/// Errors returned by the stream returned from [`iterate_ip`].
555#[derive(Debug, Error)]
556pub enum IpIteratorError {
557    /// The netstack returned an empty batch of sockets
558    #[error("received empty batch of sockets")]
559    EmptyBatch,
560    /// A low-level FIDL error was encountered on the call to
561    /// `Diagnostics.IterateIp`.
562    #[error("fidl error during Diagnostics.IterateIp call: {0}")]
563    Fidl(fidl::Error),
564    /// An error was encountered while converting a socket state.
565    #[error("error converting socket state: {0}")]
566    Conversion(IpSocketStateError),
567}
568
569impl From<fidl::Error> for IpIteratorError {
570    fn from(e: fidl::Error) -> Self {
571        IpIteratorError::Fidl(e)
572    }
573}
574
575/// Send a request to `Diagnostics.IterateIp` and drive the resulting
576/// `IpIterator`.
577///
578/// `IpIterator` returns a series of batches of sockets matching the query, the
579/// returned stream flattens those batches into individual sockets. If an error
580/// is encuontered during iteration, it is returned and iteration halts.
581//
582// TODO(https://github.com/rust-lang/rust/issues/130043): Remove types from the
583// precise capturing clause on the stream.
584pub async fn iterate_ip<M, I>(
585    diagnostics: &fnet_sockets::DiagnosticsProxy,
586    extensions: fnet_sockets::Extensions,
587    matchers: M,
588) -> Result<impl Stream<Item = Result<IpSocketState, IpIteratorError>> + use<M, I>, IterateIpError>
589where
590    M: IntoIterator<Item = I>,
591    I: Into<fnet_sockets::IpSocketMatcher>,
592{
593    let (proxy, server_end) = fidl::endpoints::create_proxy::<fnet_sockets::IpIteratorMarker>();
594    match diagnostics
595        .iterate_ip(
596            server_end,
597            extensions,
598            &matchers.into_iter().map(Into::into).collect::<Vec<_>>()[..],
599        )
600        .await?
601    {
602        fnet_sockets::IterateIpResult::Ok(_empty) => Ok(()),
603        fnet_sockets::IterateIpResult::InvalidMatcher(fnet_sockets::InvalidMatcher { index }) => {
604            Err(IterateIpError::InvalidMatcher(index as usize))
605        }
606        fnet_sockets::IterateIpResult::__SourceBreaking { unknown_ordinal } => {
607            Err(IterateIpError::UnknownOrdinal(unknown_ordinal))
608        }
609    }?;
610
611    Ok(futures::stream::try_unfold((proxy, true), |(proxy, has_more)| async move {
612        if !has_more {
613            return Ok(None);
614        }
615
616        let (batch, has_more) = proxy.next().await?;
617        if batch.is_empty() && has_more {
618            Err(IpIteratorError::EmptyBatch)
619        } else {
620            Ok(Some((
621                futures::stream::iter(
622                    batch
623                        .into_iter()
624                        .map(|s| s.try_into().map_err(|e| IpIteratorError::Conversion(e))),
625                ),
626                (proxy, has_more),
627            )))
628        }
629    })
630    .try_flatten())
631}
632
633/// Errors returned by [`disconnect_ip`]
634#[derive(Debug, Error)]
635pub enum DisconnectIpError {
636    /// The specified matcher was the first invalid one.
637    #[error("invalid matcher at position {0}")]
638    InvalidMatcher(usize),
639    /// Specified matchers would a priori match all sockets.
640    #[error("matchers were unconstrained")]
641    UnconstrainedMatchers,
642    /// An unknown response was received on the call to `Control.DisconnectIp`
643    #[error("unknown ordinal on Control.DisconnectIp call: {0}")]
644    UnknownOrdinal(u64),
645    /// A low-level FIDL error was encountered on the call to
646    /// `Control.DisconnectIp`.
647    #[error("fidl error during Control.DisconnectIp call: {0}")]
648    Fidl(fidl::Error),
649}
650
651/// Send a request to `Control.DisconnectIp` with the provided matchers.
652pub async fn disconnect_ip<M, I>(
653    control: &fnet_sockets::ControlProxy,
654    matchers: M,
655) -> Result<usize, DisconnectIpError>
656where
657    M: IntoIterator<Item = I>,
658    I: Into<fnet_sockets::IpSocketMatcher>,
659{
660    match control
661        .disconnect_ip(&fnet_sockets::ControlDisconnectIpRequest {
662            matchers: Some(matchers.into_iter().map(Into::into).collect()),
663            __source_breaking: fidl::marker::SourceBreaking,
664        })
665        .await
666    {
667        Ok(r) => match r {
668            fnet_sockets::DisconnectIpResult::Ok(fnet_sockets::DisconnectIpResponse {
669                disconnected,
670            }) => {
671                // Unwrap is safe because usize is always at least u32.
672                Ok(disconnected.try_into().unwrap())
673            }
674            fnet_sockets::DisconnectIpResult::InvalidMatcher(fnet_sockets::InvalidMatcher {
675                index,
676            }) => {
677                // Unwrap is safe because usize is always at least u32.
678                Err(DisconnectIpError::InvalidMatcher(index.try_into().unwrap()))
679            }
680            fnet_sockets::DisconnectIpResult::UnconstrainedMatchers(fnet_sockets::Empty) => {
681                Err(DisconnectIpError::UnconstrainedMatchers)
682            }
683            fnet_sockets::DisconnectIpResult::__SourceBreaking { unknown_ordinal } => {
684                Err(DisconnectIpError::UnknownOrdinal(unknown_ordinal))
685            }
686        },
687        Err(e) => Err(DisconnectIpError::Fidl(e)),
688    }
689}
690
691#[cfg(test)]
692mod tests {
693    use super::*;
694
695    use std::num::NonZeroU64;
696
697    use assert_matches::assert_matches;
698    use fidl_fuchsia_net as fnet;
699    use fidl_fuchsia_net_tcp as fnet_tcp;
700    use futures::{FutureExt as _, StreamExt as _, future, pin_mut};
701    use net_declare::{fidl_ip, fidl_subnet, net_ip_v4, net_ip_v6};
702    use test_case::test_case;
703
704    #[test_case(
705        fnet_sockets::IpSocketMatcher::Family(fnet::IpVersion::V4),
706        IpSocketMatcher::Family(ip::IpVersion::V4);
707        "FamilyIpv4"
708    )]
709    #[test_case(
710        fnet_sockets::IpSocketMatcher::Family(fnet::IpVersion::V6),
711        IpSocketMatcher::Family(ip::IpVersion::V6);
712        "FamilyIpv6"
713    )]
714    #[test_case(
715        fnet_sockets::IpSocketMatcher::SrcAddr(fnet_matchers::BoundAddress::Bound(
716            fnet_matchers::Address {
717                matcher: fnet_matchers::AddressMatcherType::Subnet(fidl_subnet!("192.0.2.0/24")),
718                invert: true,
719            }
720        )),
721        IpSocketMatcher::SrcAddr(fnet_matchers_ext::BoundAddress::Bound(
722            fnet_matchers_ext::Address {
723                matcher: fnet_matchers_ext::AddressMatcherType::Subnet(
724                    fnet_matchers_ext::Subnet::try_from(fidl_subnet!("192.0.2.0/24")).unwrap()
725                ),
726                invert: true,
727            }
728        ));
729        "SrcAddr"
730    )]
731    #[test_case(
732        fnet_sockets::IpSocketMatcher::DstAddr(fnet_matchers::BoundAddress::Bound(
733            fnet_matchers::Address {
734                matcher: fnet_matchers::AddressMatcherType::Subnet(fidl_subnet!("2001:db8::/32")),
735                invert: false,
736            }
737        )),
738        IpSocketMatcher::DstAddr(fnet_matchers_ext::BoundAddress::Bound(
739            fnet_matchers_ext::Address {
740                matcher: fnet_matchers_ext::AddressMatcherType::Subnet(
741                    fnet_matchers_ext::Subnet::try_from(fidl_subnet!("2001:db8::/32")).unwrap()
742                ),
743                invert: false,
744            }
745        ));
746        "DstAddr"
747    )]
748    #[test_case(
749        fnet_sockets::IpSocketMatcher::Proto(fnet_matchers::SocketTransportProtocol::Tcp(
750            fnet_matchers::TcpSocket::Empty(fnet_matchers::Empty)
751        )),
752        IpSocketMatcher::Proto(fnet_matchers_ext::SocketTransportProtocol::Tcp(
753            fnet_matchers_ext::TcpSocket::Empty
754        ));
755        "ProtoTcp"
756    )]
757    #[test_case(
758        fnet_sockets::IpSocketMatcher::Proto(fnet_matchers::SocketTransportProtocol::Udp(
759            fnet_matchers::UdpSocket::Empty(fnet_matchers::Empty)
760        )),
761        IpSocketMatcher::Proto(fnet_matchers_ext::SocketTransportProtocol::Udp(
762            fnet_matchers_ext::UdpSocket::Empty
763        ));
764        "ProtoUdp"
765    )]
766    #[test_case(
767        fnet_sockets::IpSocketMatcher::BoundInterface(fnet_matchers::BoundInterface::Unbound(
768            fnet_matchers::Empty
769        )),
770        IpSocketMatcher::BoundInterface(fnet_matchers_ext::BoundInterface::Unbound);
771        "BoundInterfaceUnbound"
772    )]
773    #[test_case(
774        fnet_sockets::IpSocketMatcher::BoundInterface(fnet_matchers::BoundInterface::Bound(
775            fnet_matchers::Interface::Id(1)
776        )),
777        IpSocketMatcher::BoundInterface(fnet_matchers_ext::BoundInterface::Bound(
778            fnet_matchers_ext::Interface::Id(NonZeroU64::new(1).unwrap())
779        ));
780        "BoundInterfaceBound"
781    )]
782    #[test_case(
783        fnet_sockets::IpSocketMatcher::Cookie(fnet_matchers::SocketCookie {
784            cookie: 12345,
785            invert: false,
786        }),
787        IpSocketMatcher::Cookie(fnet_matchers::SocketCookie {
788            cookie: 12345,
789            invert: false,
790        });
791        "Cookie"
792    )]
793    #[test_case(
794        fnet_sockets::IpSocketMatcher::Mark(fnet_matchers::MarkInDomain {
795            domain: fnet::MarkDomain::Mark1,
796            mark: fnet_matchers::Mark::Unmarked(fnet_matchers::Unmarked),
797        }),
798        IpSocketMatcher::Mark(fnet_matchers_ext::MarkInDomain {
799            domain: fnet::MarkDomain::Mark1,
800            mark: fnet_matchers_ext::Mark::Unmarked,
801        });
802        "Mark"
803    )]
804    #[test_case(
805        fnet_sockets::IpSocketMatcher::SrcAddr(fnet_matchers::BoundAddress::Unbound(fnet_matchers::Empty)),
806        IpSocketMatcher::SrcAddr(fnet_matchers_ext::BoundAddress::Unbound);
807        "SrcAddrUnbound"
808    )]
809    #[test_case(
810        fnet_sockets::IpSocketMatcher::DstAddr(fnet_matchers::BoundAddress::Unbound(fnet_matchers::Empty)),
811        IpSocketMatcher::DstAddr(fnet_matchers_ext::BoundAddress::Unbound);
812        "DstAddrUnbound"
813    )]
814    #[test_case(
815        fnet_sockets::IpSocketMatcher::Proto(fnet_matchers::SocketTransportProtocol::Tcp(
816            fnet_matchers::TcpSocket::SrcPort(fnet_matchers::BoundPort::Unbound(fnet_matchers::Empty))
817        )),
818        IpSocketMatcher::Proto(fnet_matchers_ext::SocketTransportProtocol::Tcp(
819            fnet_matchers_ext::TcpSocket::SrcPort(fnet_matchers_ext::BoundPort::Unbound)
820        ));
821        "ProtoTcpSrcPortUnbound"
822    )]
823    #[test_case(
824        fnet_sockets::IpSocketMatcher::Proto(fnet_matchers::SocketTransportProtocol::Tcp(
825            fnet_matchers::TcpSocket::DstPort(fnet_matchers::BoundPort::Unbound(fnet_matchers::Empty))
826        )),
827        IpSocketMatcher::Proto(fnet_matchers_ext::SocketTransportProtocol::Tcp(
828            fnet_matchers_ext::TcpSocket::DstPort(fnet_matchers_ext::BoundPort::Unbound)
829        ));
830        "ProtoTcpDstPortUnbound"
831    )]
832    #[test_case(
833        fnet_sockets::IpSocketMatcher::Proto(fnet_matchers::SocketTransportProtocol::Udp(
834            fnet_matchers::UdpSocket::SrcPort(fnet_matchers::BoundPort::Unbound(fnet_matchers::Empty))
835        )),
836        IpSocketMatcher::Proto(fnet_matchers_ext::SocketTransportProtocol::Udp(
837            fnet_matchers_ext::UdpSocket::SrcPort(fnet_matchers_ext::BoundPort::Unbound)
838        ));
839        "ProtoUdpSrcPortUnbound"
840    )]
841    #[test_case(
842        fnet_sockets::IpSocketMatcher::Proto(fnet_matchers::SocketTransportProtocol::Udp(
843            fnet_matchers::UdpSocket::DstPort(fnet_matchers::BoundPort::Unbound(fnet_matchers::Empty))
844        )),
845        IpSocketMatcher::Proto(fnet_matchers_ext::SocketTransportProtocol::Udp(
846            fnet_matchers_ext::UdpSocket::DstPort(fnet_matchers_ext::BoundPort::Unbound)
847        ));
848        "ProtoUdpDstPortUnbound"
849    )]
850    #[test_case(
851        fnet_tcp::Info {
852            state: Some(fnet_tcp::State::Established),
853            ca_state: Some(fnet_tcp::CongestionControlState::Open),
854            rto_usec: Some(1),
855            tcpi_last_data_sent_msec: Some(2),
856            tcpi_last_ack_recv_msec: Some(3),
857            rtt_usec: Some(4),
858            rtt_var_usec: Some(5),
859            snd_ssthresh: Some(6),
860            snd_cwnd: Some(7),
861            tcpi_total_retrans: Some(8),
862            tcpi_segs_out: Some(9),
863            tcpi_segs_in: Some(10),
864            reorder_seen: Some(true),
865            __source_breaking: fidl::marker::SourceBreaking,
866        },
867        TcpInfo {
868            state: fnet_tcp::State::Established,
869            ca_state: fnet_tcp::CongestionControlState::Open,
870            rto_usec: Some(1),
871            tcpi_last_data_sent_msec: Some(2),
872            tcpi_last_ack_recv_msec: Some(3),
873            rtt_usec: Some(4),
874            rtt_var_usec: Some(5),
875            snd_ssthresh: 6,
876            snd_cwnd: 7,
877            tcpi_total_retrans: 8,
878            tcpi_segs_out: 9,
879            tcpi_segs_in: 10,
880            reorder_seen: true,
881        };
882        "TcpInfo"
883    )]
884    #[test_case(
885        fnet_sockets::IpSocketState {
886            family: Some(fnet::IpVersion::V4),
887            src_addr: Some(fidl_ip!("192.168.1.1")),
888            dst_addr: Some(fidl_ip!("192.168.1.2")),
889            cookie: Some(1234),
890            marks: Some(fnet::Marks {
891                mark_1: Some(1111),
892                mark_2: None,
893                __source_breaking: fidl::marker::SourceBreaking,
894            }),
895            transport: Some(fnet_sockets::IpSocketTransportState::Tcp(
896                fnet_sockets::IpSocketTcpState {
897                    src_port: Some(1111),
898                    dst_port: Some(2222),
899                    state: Some(fnet_tcp::State::Established),
900                    tcp_info: None,
901                    __source_breaking: fidl::marker::SourceBreaking,
902                },
903            )),
904            __source_breaking: fidl::marker::SourceBreaking,
905        },
906        IpSocketState::V4(IpSocketStateSpecific {
907            src_addr: Some(net_ip_v4!("192.168.1.1")),
908            dst_addr: Some(net_ip_v4!("192.168.1.2")),
909            cookie: 1234,
910            marks: fnet::Marks {
911                mark_1: Some(1111),
912                mark_2: None,
913                __source_breaking: fidl::marker::SourceBreaking,
914            }.into(),
915            transport: IpSocketTransportState::Tcp(IpSocketTcpState {
916                src_port: Some(1111),
917                dst_port: Some(2222),
918                state: fnet_tcp::State::Established,
919                tcp_info: None,
920            }),
921        });
922        "IpSocketStateV4"
923    )]
924    #[test_case(
925        fnet_sockets::IpSocketState {
926            family: Some(fnet::IpVersion::V6),
927            src_addr: Some(fidl_ip!("2001:db8::1")),
928            dst_addr: Some(fidl_ip!("2001:db8::2")),
929            cookie: Some(1234),
930            marks: Some(fnet::Marks {
931                mark_1: Some(1111),
932                mark_2: None,
933                __source_breaking: fidl::marker::SourceBreaking,
934            }),
935            transport: Some(fnet_sockets::IpSocketTransportState::Udp(
936                fnet_sockets::IpSocketUdpState {
937                    src_port: Some(3333),
938                    dst_port: Some(4444),
939                    state: Some(fnet_udp::State::Connected),
940                    __source_breaking: fidl::marker::SourceBreaking,
941                },
942            )),
943            __source_breaking: fidl::marker::SourceBreaking,
944        },
945        IpSocketState::V6(IpSocketStateSpecific {
946            src_addr: Some(net_ip_v6!("2001:db8::1")),
947            dst_addr: Some(net_ip_v6!("2001:db8::2")),
948            cookie: 1234,
949            marks: fnet::Marks {
950                mark_1: Some(1111),
951                mark_2: None,
952                __source_breaking: fidl::marker::SourceBreaking,
953            }.into(),
954            transport: IpSocketTransportState::Udp(IpSocketUdpState {
955                src_port: Some(3333),
956                dst_port: Some(4444),
957                state: fnet_udp::State::Connected,
958            }),
959        });
960        "IpSocketStateV6"
961    )]
// Round-trip check shared by the `test_case` table above.
//
// `fidl_type` must convert into exactly `local_type` through the fallible
// `TryFrom` direction, and `local_type` must convert back into exactly
// `fidl_type` through the infallible `From` direction. Either mismatch
// panics via `assert_eq!`.
fn convert_from_fidl_and_back<F, E>(fidl_type: F, local_type: E)
where
    E: TryFrom<F> + Clone + std::fmt::Debug + PartialEq,
    <E as TryFrom<F>>::Error: std::fmt::Debug + PartialEq,
    F: From<E> + Clone + std::fmt::Debug + PartialEq,
{
    // Forward direction. Both inputs are cloned because they are needed again
    // for the reverse check below.
    let forward = E::try_from(fidl_type.clone());
    assert_eq!(forward, Ok(local_type.clone()));

    // Reverse direction. `local_type` is consumed here; it has no later use.
    let backward = F::from(local_type);
    assert_eq!(backward, fidl_type);
}
971
972    #[test_case(
973        fnet_sockets::IpSocketMatcher::__SourceBreaking { unknown_ordinal: 100 } =>
974            Err(IpSocketMatcherError::UnknownUnionVariant(100));
975        "UnknownUnionVariant"
976    )]
977    #[test_case(
978        fnet_sockets::IpSocketMatcher::SrcAddr(fnet_matchers::BoundAddress::Bound(
979            fnet_matchers::Address {
980                matcher: fnet_matchers::AddressMatcherType::__SourceBreaking { unknown_ordinal: 100 },
981                invert: false,
982            }
983        )) => Err(IpSocketMatcherError::Address(fnet_matchers_ext::BoundAddressError::Address(
984            fnet_matchers_ext::AddressError::AddressMatcherType(
985                fnet_matchers_ext::AddressMatcherTypeError::UnknownUnionVariant
986            )
987        )));
988        "AddressError"
989    )]
990    #[test_case(
991        fnet_sockets::IpSocketMatcher::Proto(
992            fnet_matchers::SocketTransportProtocol::__SourceBreaking { unknown_ordinal: 100 }
993        ) => Err(IpSocketMatcherError::TransportProtocol(
994            fnet_matchers_ext::SocketTransportProtocolError::UnknownUnionVariant(100)
995        ));
996        "TransportProtocolError"
997    )]
998    #[test_case(
999        fnet_sockets::IpSocketMatcher::BoundInterface(
1000            fnet_matchers::BoundInterface::__SourceBreaking { unknown_ordinal: 100 }
1001        ) => Err(IpSocketMatcherError::BoundInterface(
1002            fnet_matchers_ext::BoundInterfaceError::UnknownUnionVariant(100)
1003        ));
1004        "BoundInterfaceError"
1005    )]
1006    #[test_case(
1007        fnet_sockets::IpSocketMatcher::Mark(fnet_matchers::MarkInDomain {
1008            domain: fnet::MarkDomain::Mark1,
1009            mark: fnet_matchers::Mark::__SourceBreaking { unknown_ordinal: 100 },
1010        }) => Err(IpSocketMatcherError::Mark(
1011            fnet_matchers_ext::MarkInDomainError::Mark(
1012                fnet_matchers_ext::MarkError::UnknownUnionVariant(100)
1013            )
1014        ));
1015        "MarkError"
1016    )]
1017    fn ip_socket_matcher_try_from_error(
1018        fidl: fnet_sockets::IpSocketMatcher,
1019    ) -> Result<IpSocketMatcher, IpSocketMatcherError> {
1020        IpSocketMatcher::try_from(fidl)
1021    }
1022
1023    #[test_case(
1024        fnet_sockets::IpSocketState {
1025            family: None,
1026            src_addr: Some(fidl_ip!("192.168.1.1")),
1027            dst_addr: Some(fidl_ip!("192.168.1.2")),
1028            cookie: Some(1234),
1029            marks: Some(fnet::Marks {
1030                mark_1: Some(1111),
1031                mark_2: None,
1032                __source_breaking: fidl::marker::SourceBreaking,
1033            }),
1034            transport: Some(fnet_sockets::IpSocketTransportState::Tcp(
1035                fnet_sockets::IpSocketTcpState {
1036                    src_port: Some(1111),
1037                    dst_port: Some(2222),
1038                    state: Some(fnet_tcp::State::Established),
1039                    tcp_info: None,
1040                    __source_breaking: fidl::marker::SourceBreaking,
1041                },
1042            )),
1043            __source_breaking: fidl::marker::SourceBreaking,
1044        } => Err(IpSocketStateError::MissingField("family"));
1045        "MissingFamily"
1046    )]
1047    #[test_case(
1048        fnet_sockets::IpSocketState {
1049            family: Some(fnet::IpVersion::V4),
1050            src_addr: Some(fidl_ip!("192.168.1.1")),
1051            dst_addr: Some(fidl_ip!("192.168.1.2")),
1052            cookie: None,
1053            marks: Some(fnet::Marks {
1054                mark_1: Some(1111),
1055                mark_2: None,
1056                __source_breaking: fidl::marker::SourceBreaking,
1057            }),
1058            transport: Some(fnet_sockets::IpSocketTransportState::Tcp(
1059                fnet_sockets::IpSocketTcpState {
1060                    src_port: Some(1111),
1061                    dst_port: Some(2222),
1062                    state: Some(fnet_tcp::State::Established),
1063                    tcp_info: None,
1064                    __source_breaking: fidl::marker::SourceBreaking,
1065                },
1066            )),
1067            __source_breaking: fidl::marker::SourceBreaking,
1068        } => Err(IpSocketStateError::MissingField("cookie"));
1069        "MissingCookie"
1070    )]
1071    #[test_case(
1072        fnet_sockets::IpSocketState {
1073            family: Some(fnet::IpVersion::V4),
1074            src_addr: Some(fidl_ip!("192.168.1.1")),
1075            dst_addr: Some(fidl_ip!("192.168.1.2")),
1076            cookie: Some(1234),
1077            marks: None,
1078            transport: Some(fnet_sockets::IpSocketTransportState::Tcp(
1079                fnet_sockets::IpSocketTcpState {
1080                    src_port: Some(1111),
1081                    dst_port: Some(2222),
1082                    state: Some(fnet_tcp::State::Established),
1083                    tcp_info: None,
1084                    __source_breaking: fidl::marker::SourceBreaking,
1085                },
1086            )),
1087            __source_breaking: fidl::marker::SourceBreaking,
1088        } => Err(IpSocketStateError::MissingField("marks"));
1089        "MissingMarks"
1090    )]
1091    #[test_case(
1092        fnet_sockets::IpSocketState {
1093            family: Some(fnet::IpVersion::V4),
1094            src_addr: Some(fidl_ip!("192.168.1.1")),
1095            dst_addr: Some(fidl_ip!("192.168.1.2")),
1096            cookie: Some(1234),
1097            marks: Some(fnet::Marks {
1098                mark_1: Some(1111),
1099                mark_2: None,
1100                __source_breaking: fidl::marker::SourceBreaking,
1101            }),
1102            transport: None,
1103            __source_breaking: fidl::marker::SourceBreaking,
1104        } => Err(IpSocketStateError::MissingField("transport"));
1105        "MissingTransport"
1106    )]
1107    #[test_case(
1108        fnet_sockets::IpSocketState {
1109            family: Some(fnet::IpVersion::V4),
1110            src_addr: Some(fidl_ip!("192.168.1.1")),
1111            dst_addr: Some(fidl_ip!("2001:db8::2")),
1112            cookie: Some(1234),
1113            marks: Some(fnet::Marks {
1114                mark_1: Some(1111),
1115                mark_2: None,
1116                __source_breaking: fidl::marker::SourceBreaking,
1117            }),
1118            transport: Some(fnet_sockets::IpSocketTransportState::Tcp(
1119                fnet_sockets::IpSocketTcpState {
1120                    src_port: Some(1111),
1121                    dst_port: Some(2222),
1122                    state: Some(fnet_tcp::State::Established),
1123                    tcp_info: None,
1124                    __source_breaking: fidl::marker::SourceBreaking,
1125                },
1126            )),
1127            __source_breaking: fidl::marker::SourceBreaking,
1128        } => Err(IpSocketStateError::VersionMismatch);
1129        "VersionMismatchV4"
1130    )]
1131    #[test_case(
1132        fnet_sockets::IpSocketState {
1133            family: Some(fnet::IpVersion::V6),
1134            src_addr: Some(fidl_ip!("192.168.1.1")),
1135            dst_addr: Some(fidl_ip!("2001:db8::2")),
1136            cookie: Some(1234),
1137            marks: Some(fnet::Marks {
1138                mark_1: Some(1111),
1139                mark_2: None,
1140                __source_breaking: fidl::marker::SourceBreaking,
1141            }),
1142            transport: Some(fnet_sockets::IpSocketTransportState::Tcp(
1143                fnet_sockets::IpSocketTcpState {
1144                    src_port: Some(1111),
1145                    dst_port: Some(2222),
1146                    state: Some(fnet_tcp::State::Established),
1147                    tcp_info: None,
1148                    __source_breaking: fidl::marker::SourceBreaking,
1149                },
1150            )),
1151            __source_breaking: fidl::marker::SourceBreaking,
1152        } => Err(IpSocketStateError::VersionMismatch);
1153        "VersionMismatchV6"
1154    )]
1155    #[test_case(
1156        fnet_sockets::IpSocketState {
1157            family: Some(fnet::IpVersion::V4),
1158            src_addr: Some(fidl_ip!("192.168.1.1")),
1159            dst_addr: Some(fidl_ip!("192.168.1.2")),
1160            cookie: Some(1234),
1161            marks: Some(fnet::Marks {
1162                mark_1: Some(1111),
1163                mark_2: None,
1164                __source_breaking: fidl::marker::SourceBreaking,
1165            }),
1166            transport: Some(fnet_sockets::IpSocketTransportState::Tcp(
1167                fnet_sockets::IpSocketTcpState {
1168                    src_port: Some(1111),
1169                    dst_port: Some(2222),
1170                    state: None,
1171                    tcp_info: None,
1172                    __source_breaking: fidl::marker::SourceBreaking,
1173                },
1174            )),
1175            __source_breaking: fidl::marker::SourceBreaking,
1176        } => Err(IpSocketStateError::Transport(IpSocketTransportStateError::Tcp(
1177                IpSocketTcpStateError::MissingField("state"),
1178        )));
1179        "MissingTcpState"
1180    )]
1181    #[test_case(
1182        fnet_sockets::IpSocketState {
1183            family: Some(fnet::IpVersion::V6),
1184            src_addr: Some(fidl_ip!("2001:db8::1")),
1185            dst_addr: Some(fidl_ip!("2001:db8::2")),
1186            cookie: Some(1234),
1187            marks: Some(fnet::Marks {
1188                mark_1: Some(1111),
1189                mark_2: None,
1190                __source_breaking: fidl::marker::SourceBreaking,
1191            }),
1192            transport: Some(fnet_sockets::IpSocketTransportState::Udp(
1193                fnet_sockets::IpSocketUdpState {
1194                    src_port: Some(3333),
1195                    dst_port: Some(4444),
1196                    state: None,
1197                    __source_breaking: fidl::marker::SourceBreaking,
1198                },
1199            )),
1200            __source_breaking: fidl::marker::SourceBreaking,
1201        } => Err(IpSocketStateError::Transport(IpSocketTransportStateError::Udp(
1202                IpSocketUdpStateError::MissingField("state"),
1203        )));
1204        "MissingUdpState"
1205    )]
1206    fn ip_socket_state_try_from_error(
1207        fidl: fnet_sockets::IpSocketState,
1208    ) -> Result<IpSocketState, IpSocketStateError> {
1209        IpSocketState::try_from(fidl)
1210    }
1211
1212    #[fuchsia_async::run_singlethreaded(test)]
1213    async fn iterate_ip_diagnostics_iterate_ip_error() {
1214        async fn serve_matcher_error(req: fnet_sockets::DiagnosticsRequest) {
1215            match req {
1216                fnet_sockets::DiagnosticsRequest::IterateIp {
1217                    s: _,
1218                    extensions: _,
1219                    matchers: _,
1220                    responder,
1221                } => responder
1222                    .send(&fnet_sockets::IterateIpResult::InvalidMatcher(
1223                        fnet_sockets::InvalidMatcher { index: 0 },
1224                    ))
1225                    .unwrap(),
1226            };
1227        }
1228
1229        let (diagnostics, diagnostics_server_end) =
1230            fidl::endpoints::create_proxy::<fnet_sockets::DiagnosticsMarker>();
1231
1232        let (mut diagnostics_request_stream, _control_handle) =
1233            diagnostics_server_end.into_stream_and_control_handle();
1234        let server_fut = diagnostics_request_stream
1235            .next()
1236            .then(|req| {
1237                serve_matcher_error(req.expect("Request stream ended unexpectedly").unwrap())
1238            })
1239            .fuse();
1240        let client_fut = iterate_ip::<[IpSocketMatcher; 0], _>(
1241            &diagnostics,
1242            fnet_sockets::Extensions::empty(),
1243            [],
1244        );
1245
1246        pin_mut!(server_fut);
1247        pin_mut!(client_fut);
1248
1249        let ((), resp) = future::join(server_fut, client_fut).await;
1250
1251        assert_matches!(
1252            // Discard the stream because it can't be formatted.
1253            resp.map(|_| ()),
1254            Err(IterateIpError::InvalidMatcher(0))
1255        );
1256    }
1257
1258    #[fuchsia_async::run_singlethreaded(test)]
1259    async fn iterate_ip_next_error() {
1260        async fn serve_matcher(req: fnet_sockets::DiagnosticsRequest) {
1261            match req {
1262                fnet_sockets::DiagnosticsRequest::IterateIp {
1263                    s,
1264                    extensions: _,
1265                    matchers: _,
1266                    responder,
1267                } => {
1268                    s.close_with_epitaph(zx_status::Status::PEER_CLOSED).unwrap();
1269                    responder.send(&fnet_sockets::IterateIpResult::Ok(fnet_sockets::Empty)).unwrap()
1270                }
1271            }
1272        }
1273
1274        let (diagnostics, diagnostics_server_end) =
1275            fidl::endpoints::create_proxy::<fnet_sockets::DiagnosticsMarker>();
1276
1277        let (mut diagnostics_request_stream, _control_handle) =
1278            diagnostics_server_end.into_stream_and_control_handle();
1279        let server_fut = diagnostics_request_stream
1280            .next()
1281            .then(|req| serve_matcher(req.expect("Request stream ended unexpectedly").unwrap()))
1282            .fuse();
1283
1284        let client_fut = iterate_ip::<[IpSocketMatcher; 0], _>(
1285            &diagnostics,
1286            fnet_sockets::Extensions::empty(),
1287            [],
1288        );
1289
1290        let ((), resp) = future::join(server_fut, client_fut).await;
1291        let stream = resp.unwrap();
1292        pin_mut!(stream);
1293
1294        assert_matches!(
1295            stream.try_next().await,
1296            Err(IpIteratorError::Fidl(fidl::Error::ClientChannelClosed { .. }))
1297        );
1298    }
1299
1300    #[fuchsia_async::run_singlethreaded(test)]
1301    async fn iterate_ip_empty_batch() {
1302        async fn serve_matcher(req: fnet_sockets::DiagnosticsRequest) {
1303            match req {
1304                fnet_sockets::DiagnosticsRequest::IterateIp {
1305                    s,
1306                    extensions: _,
1307                    matchers: _,
1308                    responder,
1309                } => {
1310                    responder
1311                        .send(&fnet_sockets::IterateIpResult::Ok(fnet_sockets::Empty))
1312                        .unwrap();
1313
1314                    let (mut stream, _control) = s.into_stream_and_control_handle();
1315                    match stream.next().await.unwrap().unwrap() {
1316                        fidl_fuchsia_net_sockets::IpIteratorRequest::Next { responder } => {
1317                            // Send an empty batch but indicate there's more to come.
1318                            responder.send(&[], true).unwrap();
1319                        }
1320                        fidl_fuchsia_net_sockets::IpIteratorRequest::_UnknownMethod { .. } => {
1321                            unreachable!()
1322                        }
1323                    }
1324                }
1325            }
1326        }
1327
1328        let (diagnostics, diagnostics_server_end) =
1329            fidl::endpoints::create_proxy::<fnet_sockets::DiagnosticsMarker>();
1330
1331        let (mut diagnostics_request_stream, _control_handle) =
1332            diagnostics_server_end.into_stream_and_control_handle();
1333        let server_fut = diagnostics_request_stream
1334            .next()
1335            .then(|req| serve_matcher(req.expect("Request stream ended unexpectedly").unwrap()))
1336            .fuse();
1337
1338        let client_fut = async {
1339            let stream = iterate_ip::<[IpSocketMatcher; 0], _>(
1340                &diagnostics,
1341                fnet_sockets::Extensions::empty(),
1342                [],
1343            )
1344            .await
1345            .unwrap();
1346            pin_mut!(stream);
1347            stream.try_next().await
1348        };
1349
1350        let ((), resp) = future::join(server_fut, client_fut).await;
1351        assert_matches!(resp, Err(IpIteratorError::EmptyBatch));
1352    }
1353
1354    #[fuchsia_async::run_singlethreaded(test)]
1355    async fn iterate_ip_success() {
1356        let socket_1 = fnet_sockets::IpSocketState {
1357            family: Some(fnet::IpVersion::V4),
1358            src_addr: Some(fidl_ip!("192.168.1.1")),
1359            dst_addr: Some(fidl_ip!("192.168.1.2")),
1360            cookie: Some(1234),
1361            marks: Some(fnet::Marks {
1362                mark_1: Some(1111),
1363                mark_2: None,
1364                __source_breaking: fidl::marker::SourceBreaking,
1365            }),
1366            transport: Some(fnet_sockets::IpSocketTransportState::Tcp(
1367                fnet_sockets::IpSocketTcpState {
1368                    src_port: Some(1111),
1369                    dst_port: Some(2222),
1370                    state: Some(fnet_tcp::State::Established),
1371                    tcp_info: None,
1372                    __source_breaking: fidl::marker::SourceBreaking,
1373                },
1374            )),
1375            __source_breaking: fidl::marker::SourceBreaking,
1376        };
1377
1378        let socket_2 = fnet_sockets::IpSocketState {
1379            family: Some(fnet::IpVersion::V4),
1380            src_addr: Some(fidl_ip!("192.168.8.1")),
1381            dst_addr: Some(fidl_ip!("192.168.8.2")),
1382            cookie: Some(9876),
1383            marks: Some(fnet::Marks {
1384                mark_1: None,
1385                mark_2: Some(2222),
1386                __source_breaking: fidl::marker::SourceBreaking,
1387            }),
1388            transport: Some(fnet_sockets::IpSocketTransportState::Tcp(
1389                fnet_sockets::IpSocketTcpState {
1390                    src_port: Some(3333),
1391                    dst_port: Some(4444),
1392                    state: Some(fnet_tcp::State::TimeWait),
1393                    tcp_info: None,
1394                    __source_breaking: fidl::marker::SourceBreaking,
1395                },
1396            )),
1397            __source_breaking: fidl::marker::SourceBreaking,
1398        };
1399
1400        let socket_3 = fnet_sockets::IpSocketState {
1401            family: Some(fnet::IpVersion::V6),
1402            src_addr: Some(fidl_ip!("2001:db8::1")),
1403            dst_addr: Some(fidl_ip!("2001:db8::2")),
1404            cookie: Some(5678),
1405            marks: Some(fnet::Marks {
1406                mark_1: None,
1407                mark_2: None,
1408                __source_breaking: fidl::marker::SourceBreaking,
1409            }),
1410            transport: Some(fnet_sockets::IpSocketTransportState::Tcp(
1411                fnet_sockets::IpSocketTcpState {
1412                    src_port: Some(5555),
1413                    dst_port: Some(6666),
1414                    state: Some(fnet_tcp::State::TimeWait),
1415                    tcp_info: None,
1416                    __source_breaking: fidl::marker::SourceBreaking,
1417                },
1418            )),
1419            __source_breaking: fidl::marker::SourceBreaking,
1420        };
1421
1422        let (diagnostics, diagnostics_server_end) =
1423            fidl::endpoints::create_proxy::<fnet_sockets::DiagnosticsMarker>();
1424
1425        let serve_matcher = async |req: fnet_sockets::DiagnosticsRequest| {
1426            let responses = &[vec![socket_1.clone()], vec![socket_2.clone(), socket_3.clone()]];
1427
1428            match req {
1429                fnet_sockets::DiagnosticsRequest::IterateIp {
1430                    s,
1431                    extensions: _,
1432                    matchers: _,
1433                    responder,
1434                } => {
1435                    responder
1436                        .send(&fnet_sockets::IterateIpResult::Ok(fnet_sockets::Empty))
1437                        .unwrap();
1438
1439                    let (mut stream, _control) = s.into_stream_and_control_handle();
1440                    for (i, resp) in responses.iter().enumerate() {
1441                        match stream.next().await.unwrap().unwrap() {
1442                            fidl_fuchsia_net_sockets::IpIteratorRequest::Next { responder } => {
1443                                let has_more = i < responses.len() - 1;
1444                                responder.send(&resp, has_more).unwrap();
1445                            }
1446                            fidl_fuchsia_net_sockets::IpIteratorRequest::_UnknownMethod {
1447                                ..
1448                            } => {
1449                                unreachable!()
1450                            }
1451                        }
1452                    }
1453                }
1454            };
1455        };
1456
1457        let (mut diagnostics_request_stream, _control_handle) =
1458            diagnostics_server_end.into_stream_and_control_handle();
1459
1460        let server_fut = diagnostics_request_stream
1461            .next()
1462            .then(|req| serve_matcher(req.expect("Request stream ended unexpectedly").unwrap()))
1463            .fuse();
1464
1465        let client_fut = async {
1466            iterate_ip::<[IpSocketMatcher; 0], _>(
1467                &diagnostics,
1468                fnet_sockets::Extensions::empty(),
1469                [],
1470            )
1471            .await
1472            .unwrap()
1473            .try_collect::<Vec<_>>()
1474            .await
1475            .unwrap()
1476        };
1477
1478        let ((), sockets) = future::join(server_fut, client_fut).await;
1479        assert_eq!(
1480            sockets,
1481            vec![
1482                socket_1.clone().try_into().unwrap(),
1483                socket_2.clone().try_into().unwrap(),
1484                socket_3.clone().try_into().unwrap()
1485            ]
1486        );
1487    }
1488}