Skip to main content

netlink/
neighbors.rs

1// Copyright 2026 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! A module for managing neighbor information by receiving RTM_*NEIGH Netlink
6//! messages and maintaining neighbor table state from Netstack.
7
8use std::collections::{BTreeMap, HashMap, HashSet};
9use std::net::IpAddr;
10use std::num::NonZeroU64;
11
12use crate::Errno;
13use crate::client::InternalClient;
14use crate::logging::{log_debug, log_warn};
15use crate::messaging::Sender;
16use crate::protocol_family::ProtocolFamily;
17use crate::protocol_family::route::NetlinkRoute;
18use crate::util::respond_to_completer;
19use derivative::Derivative;
20use futures::StreamExt as _;
21use futures::channel::oneshot;
22use net_types::ip::IpVersion;
23use netlink_packet_core::{
24    NLM_F_APPEND, NLM_F_CREATE, NLM_F_EXCL, NLM_F_MULTIPART, NLM_F_REPLACE, NetlinkMessage,
25};
26use netlink_packet_route::neighbour::{
27    NeighbourAddress, NeighbourAttribute, NeighbourFlags, NeighbourHeader, NeighbourMessage,
28    NeighbourState,
29};
30use netlink_packet_route::route::RouteType;
31use netlink_packet_route::{AddressFamily, RouteNetlinkMessage};
32use thiserror::Error;
33
34use {
35    fidl_fuchsia_net as fnet, fidl_fuchsia_net_ext as fnet_ext,
36    fidl_fuchsia_net_interfaces_ext as fnet_interfaces_ext,
37    fidl_fuchsia_net_neighbor as fnet_neighbor, fidl_fuchsia_net_neighbor_ext as fnet_neighbor_ext,
38};
39
/// NetlinkNeighborMessage conversion related errors.
///
/// Produced when converting an [`fnet_neighbor_ext::Entry`] into a
/// [`NetlinkNeighborMessage`].
#[derive(Debug, PartialEq)]
pub(crate) enum NetlinkNeighborMessageConversionError {
    /// Interface id could not be downcast to fit into the expected u32.
    ///
    /// Carries the interface id that failed to convert.
    InvalidInterfaceId(u64),
}
46
/// A wrapper type for the netlink_packet_route `NeighbourMessage` to enable conversions
/// from [`fnet_neighbor_ext::Entry`].
///
/// The wrapped message is the single tuple field.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct NetlinkNeighborMessage(pub(crate) NeighbourMessage);
51
52impl NetlinkNeighborMessage {
53    pub(crate) fn optionally_from(
54        neighbor: fnet_neighbor_ext::Entry,
55    ) -> Option<NetlinkNeighborMessage> {
56        match neighbor.try_into() {
57            Ok(message) => Some(message),
58            Err(NetlinkNeighborMessageConversionError::InvalidInterfaceId(id)) => {
59                log_warn!("Invalid interface id found in neighbor table entry: {}", id);
60                None
61            }
62        }
63    }
64
65    /// Wrap the inner [`NeighbourMessage`] in an [`RtnlMessage::NewNeighbour`].
66    pub(crate) fn into_rtnl_new_neighbor(
67        self,
68        sequence_number: u32,
69        is_dump: bool,
70    ) -> NetlinkMessage<RouteNetlinkMessage> {
71        let NetlinkNeighborMessage(message) = self;
72        let mut msg: NetlinkMessage<RouteNetlinkMessage> =
73            RouteNetlinkMessage::NewNeighbour(message).into();
74        msg.header.sequence_number = sequence_number;
75        if is_dump {
76            msg.header.flags |= NLM_F_MULTIPART;
77        }
78        msg.finalize();
79        msg
80    }
81}
82
83impl TryFrom<fnet_neighbor_ext::Entry> for NetlinkNeighborMessage {
84    type Error = NetlinkNeighborMessageConversionError;
85
86    fn try_from(
87        neighbor: fnet_neighbor_ext::Entry,
88    ) -> Result<NetlinkNeighborMessage, NetlinkNeighborMessageConversionError> {
89        let mut header = NeighbourHeader::default();
90        let fnet_ext::IpAddress(addr) = neighbor.neighbor.into();
91        header.family = match addr {
92            IpAddr::V4(_) => AddressFamily::Inet,
93            IpAddr::V6(_) => AddressFamily::Inet6,
94        };
95        header.ifindex = neighbor.interface.get().try_into().map_err(|_| {
96            NetlinkNeighborMessageConversionError::InvalidInterfaceId(neighbor.interface.get())
97        })?;
98        header.state = match neighbor.state {
99            fnet_neighbor::EntryState::Delay => NeighbourState::Delay,
100            fnet_neighbor::EntryState::Incomplete => NeighbourState::Incomplete,
101            fnet_neighbor::EntryState::Probe => NeighbourState::Probe,
102            fnet_neighbor::EntryState::Reachable => NeighbourState::Reachable,
103            fnet_neighbor::EntryState::Stale => NeighbourState::Stale,
104            fnet_neighbor::EntryState::Static => NeighbourState::Permanent,
105            fnet_neighbor::EntryState::Unreachable => NeighbourState::Failed,
106        };
107        // Unlike Linux, Netstack3 only keeps unicast addresses in its neighbor
108        // tables so there's no need to derive this from the address and/or
109        // interface properties.
110        header.kind = RouteType::Unicast;
111
112        let mut attributes = vec![];
113        attributes.push(NeighbourAttribute::Destination(match addr {
114            IpAddr::V4(addr) => addr.into(),
115            IpAddr::V6(addr) => addr.into(),
116        }));
117        if let Some(mac) = neighbor.mac {
118            attributes.push(NeighbourAttribute::LinkLocalAddress(mac.octets.into()));
119        }
120        // TODO(https://fxbug.dev/488135156): Include the `CacheInfo` attribute
121        // with the last update time set.
122
123        let mut msg = NeighbourMessage::default();
124        msg.header = header;
125        msg.attributes = attributes;
126        Ok(NetlinkNeighborMessage(msg))
127    }
128}
129
130fn neighbor_fidl_ip(
131    family: AddressFamily,
132    address: Option<&NeighbourAddress>,
133) -> Result<fnet::IpAddress, RequestError> {
134    match family {
135        AddressFamily::Inet => match address {
136            Some(NeighbourAddress::Inet(addr)) => Ok(fnet_ext::IpAddress(IpAddr::V4(*addr)).into()),
137            Some(_) => Err(RequestError::AddressFamilyMismatch(family)),
138            None => Err(RequestError::MissingIpAddress),
139        },
140        AddressFamily::Inet6 => match address {
141            Some(NeighbourAddress::Inet6(addr)) => {
142                Ok(fnet_ext::IpAddress(IpAddr::V6(*addr)).into())
143            }
144            Some(_) => Err(RequestError::AddressFamilyMismatch(family)),
145            None => Err(RequestError::MissingIpAddress),
146        },
147        _ => Err(RequestError::InvalidAddressFamily(family)),
148    }
149}
150
/// Arguments for an RTM_GETNEIGH [`Request`].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(crate) enum GetNeighborArgs {
    /// Dump the neighbor table, optionally filtered by IP version and/or
    /// interface. A `None` filter matches every neighbor.
    Dump { ip_version: Option<IpVersion>, interface: Option<NonZeroU64> },
    /// Look up the single neighbor with address `ip` on `interface`.
    Get { ip: fnet::IpAddress, interface: NonZeroU64 },
}
157
158impl GetNeighborArgs {
159    // Attempts to convert a netlink_packet_route `NeighbourMessage` into
160    // `GetNeighborArgs`.
161    pub(crate) fn try_from_rtnl_neighbor(
162        message: &NeighbourMessage,
163        is_dump: bool,
164    ) -> Result<Self, RequestError> {
165        if is_dump {
166            Self::dump_request_from_rtnl_neighbor(message)
167                .inspect_err(|e| log_debug!("{e} in dump neighbors request"))
168        } else {
169            Self::get_request_from_rtnl_neighbor(message)
170                .inspect_err(|e| log_debug!("{e} in get neighbors request"))
171        }
172    }
173
174    fn dump_request_from_rtnl_neighbor(message: &NeighbourMessage) -> Result<Self, RequestError> {
175        let NeighbourHeader { family, flags, .. } = &message.header;
176        if flags.contains(NeighbourFlags::Proxy) {
177            // Netstack3 does not support ARP/NDP proxying.
178            // TODO(https://fxbug.dev/42111873): Support ARP/NDP proxying.
179            log_warn!("unsupported Proxy flag in dump neighbors request");
180            return Err(RequestError::UnsupportedFlags(*flags));
181        }
182        // TODO(https://fxbug.dev/456508664): Support strict validation of dump
183        // requests.
184        let ip_version = match family {
185            AddressFamily::Unspec => None,
186            AddressFamily::Inet => Some(IpVersion::V4),
187            AddressFamily::Inet6 => Some(IpVersion::V6),
188            family => {
189                return Err(RequestError::InvalidAddressFamily(*family));
190            }
191        };
192        // Note that the interface index is pulled from the attribute here,
193        // whereas it's pulled from the header for get requests. This is
194        // intentional, in order to maintain consistency with Linux's behavior.
195        let interface = message
196            .attributes
197            .iter()
198            .find_map(|attr| match attr {
199                NeighbourAttribute::IfIndex(ifindex) => Some(u64::from(*ifindex).try_into()),
200                _ => None,
201            })
202            .transpose()
203            // 0 is treated as a lack of filter.
204            .unwrap_or(None);
205        Ok(GetNeighborArgs::Dump { ip_version, interface })
206    }
207
208    fn get_request_from_rtnl_neighbor(message: &NeighbourMessage) -> Result<Self, RequestError> {
209        let NeighbourHeader { ifindex, family, state, flags, kind } = &message.header;
210        if *state != NeighbourState::None {
211            return Err(RequestError::InvalidState {
212                actual: *state,
213                expected: NeighbourState::None,
214            });
215        }
216        if *kind != RouteType::Unspec {
217            return Err(RequestError::InvalidKind(*kind));
218        }
219        if flags.intersects(!NeighbourFlags::Proxy) {
220            return Err(RequestError::InvalidFlags(*flags));
221        }
222        if flags.contains(NeighbourFlags::Proxy) {
223            // Netstack3 does not support ARP/NDP proxying.
224            // TODO(https://fxbug.dev/42111873): Support ARP/NDP proxying.
225            log_warn!("unsupported Proxy flag in get neighbor request");
226            return Err(RequestError::UnsupportedFlags(*flags));
227        }
228
229        let (address, unsupported) = message.attributes.iter().fold(
230            (None, false),
231            |(address_acc, unsupported_acc), attr| {
232                match attr {
233                    NeighbourAttribute::Destination(addr) => {
234                        // Note: In the event the Destination attribute is
235                        // provided multiple times, keep the first.
236                        (address_acc.or(Some(addr)), unsupported_acc)
237                    }
238                    _ => {
239                        if !unsupported_acc {
240                            // Only log for the first invalid attribute to avoid spamming.
241                            log_warn!(
242                                "unsupported request attribute: {attr:?} in get neighbor\
243                                request; only `DST` is supported"
244                            );
245                        }
246                        (address_acc, true)
247                    }
248                }
249            },
250        );
251        if unsupported {
252            return Err(RequestError::InvalidAttribute);
253        }
254        let ip = neighbor_fidl_ip(*family, address)?;
255        // Note that the interface index is pulled from the header here, whereas
256        // it's pulled from the attribute for dump requests. This is
257        // intentional, in order to maintain consistency with Linux's behavior.
258        let interface =
259            u64::from(*ifindex).try_into().map_err(|_| RequestError::MissingInterface)?;
260        Ok(GetNeighborArgs::Get { ip, interface })
261    }
262}
263
/// Arguments for an RTM_NEWNEIGH [`Request`].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(crate) enum NewNeighborArgs {
    /// Add a static neighbor entry for `ip` with link address `mac` on
    /// `interface`.
    CreateStatic { ip: fnet::IpAddress, interface: NonZeroU64, mac: fnet::MacAddress },
    /// Trigger a probe of the existing neighbor `ip` on `interface`.
    ProbeExisting { ip: fnet::IpAddress, interface: NonZeroU64 },
}
270
271impl NewNeighborArgs {
272    // Attempts to convert a netlink_packet_route `NeighbourMessage` into
273    // `NewNeighborArgs`.
274    pub(crate) fn try_from_rtnl_neighbor(
275        message: &NeighbourMessage,
276        netlink_flags: u16,
277    ) -> Result<Self, RequestError> {
278        Self::try_from_rtnl_neighbor_internal(message, netlink_flags)
279            .inspect_err(|e| log_debug!("{e} in new neighbor request"))
280    }
281
282    fn try_from_rtnl_neighbor_internal(
283        message: &NeighbourMessage,
284        netlink_flags: u16,
285    ) -> Result<Self, RequestError> {
286        let NeighbourHeader { ifindex, family, flags, state, .. } = &message.header;
287        if flags.contains(NeighbourFlags::Proxy) {
288            // Netstack3 does not support ARP/NDP proxying.
289            // TODO(https://fxbug.dev/42111873): Support ARP/NDP proxying.
290            log_warn!("unsupported Proxy flag in new neighbor request");
291            return Err(RequestError::UnsupportedFlags(*flags));
292        }
293
294        // Read common attributes required for identifying the neighbor.
295
296        let (ip_addr, ll_addr) =
297            message.attributes.iter().fold((None, None), |acc @ (ip, ll), attr| match attr {
298                // Note: In the event an attribute is provided multiple times,
299                // keep the first value.
300                NeighbourAttribute::Destination(addr) => (ip.or(Some(addr)), ll),
301                NeighbourAttribute::LinkLocalAddress(addr) => (ip, ll.or(Some(addr))),
302                _ => acc,
303            });
304        let ip = neighbor_fidl_ip(*family, ip_addr)?;
305        let interface =
306            u64::from(*ifindex).try_into().map_err(|_| RequestError::MissingInterface)?;
307
308        // Determine the specific operation and check operation-specific
309        // attributes.
310
311        let new_neighbor_flags = NLM_F_CREATE | NLM_F_REPLACE | NLM_F_EXCL | NLM_F_APPEND;
312        let set_flags = netlink_flags & new_neighbor_flags;
313        if set_flags == NLM_F_REPLACE {
314            // If the caller only specified `NLM_F_REPLACE`, the only case
315            // Netstack3 supports is triggering an immediate neighbor probe.
316            if *state != NeighbourState::Probe {
317                return Err(RequestError::InvalidState {
318                    actual: *state,
319                    expected: NeighbourState::Probe,
320                });
321            }
322            // Setting the link address while triggering a neighbor probe is
323            // unsupported.
324            if ll_addr.is_some() {
325                return Err(RequestError::InvalidAttribute);
326            }
327            Ok(NewNeighborArgs::ProbeExisting { interface, ip })
328        } else if set_flags == (NLM_F_CREATE | NLM_F_REPLACE) {
329            // If the caller specified `NLM_F_CREATE`, Netstack3 only supports
330            // addition of static neighbors.
331            if *state != NeighbourState::Permanent {
332                return Err(RequestError::InvalidState {
333                    actual: *state,
334                    expected: NeighbourState::Permanent,
335                });
336            }
337            let mac = ll_addr.ok_or(RequestError::MissingMacAddress).and_then(|mac| {
338                mac.clone()
339                    .try_into()
340                    .map_err(|_| RequestError::InvalidMacAddress)
341                    .map(|octets| fnet::MacAddress { octets })
342            })?;
343            Ok(NewNeighborArgs::CreateStatic { interface, ip, mac })
344        } else {
345            Err(RequestError::UnsupportedOperation)
346        }
347    }
348}
349
/// Arguments for an RTM_DELNEIGH [`Request`].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(crate) struct DelNeighborArgs {
    /// IP address of the neighbor to remove.
    pub(crate) ip: fnet::IpAddress,
    /// Interface the neighbor is associated with.
    pub(crate) interface: NonZeroU64,
}
356
357impl DelNeighborArgs {
358    // Attempts to convert a netlink_packet_route `NeighbourMessage` into
359    // `DelNeighborArgs`.
360    pub(crate) fn try_from_rtnl_neighbor(message: &NeighbourMessage) -> Result<Self, RequestError> {
361        Self::try_from_rtnl_neighbor_internal(message)
362            .inspect_err(|e| log_debug!("{e} in del neighbor request"))
363    }
364
365    fn try_from_rtnl_neighbor_internal(message: &NeighbourMessage) -> Result<Self, RequestError> {
366        let NeighbourHeader { ifindex, family, flags, .. } = &message.header;
367        if flags.contains(NeighbourFlags::Proxy) {
368            // Netstack3 does not support ARP/NDP proxying.
369            // TODO(https://fxbug.dev/42111873): Support ARP/NDP proxying.
370            log_warn!("unsupported Proxy flag in del neighbor request");
371            return Err(RequestError::UnsupportedFlags(*flags));
372        }
373
374        let address = message.attributes.iter().find_map(|attr| match attr {
375            NeighbourAttribute::Destination(addr) => Some(addr),
376            _ => None,
377        });
378        let ip = neighbor_fidl_ip(*family, address)?;
379        let interface =
380            u64::from(*ifindex).try_into().map_err(|_| RequestError::MissingInterface)?;
381        Ok(Self { interface, ip })
382    }
383}
384
/// [`Request`] arguments associated with neighbors.
///
/// One variant per supported neighbor message type.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(crate) enum NeighborRequestArgs {
    /// RTM_GETNEIGH
    Get(GetNeighborArgs),
    /// RTM_NEWNEIGH
    New(NewNeighborArgs),
    /// RTM_DELNEIGH
    Del(DelNeighborArgs),
}
395
/// An error encountered while handling a [`Request`].
///
/// Covers both failures to parse a request message and failures reported
/// while carrying the request out. Each variant converts to an [`Errno`].
#[derive(Copy, Clone, Debug, PartialEq, Eq, Error)]
pub(crate) enum RequestError {
    /// Invalid state in neighbor header.
    #[error("invalid state; expected={expected:?}, actual={actual:?}")]
    InvalidState { actual: NeighbourState, expected: NeighbourState },
    /// Invalid kind in neighbor header.
    #[error("invalid kind: {0:?}")]
    InvalidKind(RouteType),
    /// Invalid flags in neighbor header.
    #[error("invalid flags: {0:?}")]
    InvalidFlags(NeighbourFlags),
    /// Unsupported flags.
    #[error("unsupported flags: {0:?}")]
    UnsupportedFlags(NeighbourFlags),
    /// Invalid address family.
    #[error("invalid address family: {0:?}")]
    InvalidAddressFamily(AddressFamily),
    /// Address family in request header doesn't match family of address.
    // In practice this should never be encountered:
    // `NeighbourAddress::parse_with_param` parses the address based on the
    // address family from the header, and a discrepancy between the expected
    // and actual address length results in a parsing failure.
    #[error("address family mismatch; expected={0:?}")]
    AddressFamilyMismatch(AddressFamily),
    /// Request doesn't specify required neighbor IP address.
    #[error("missing required `DST` attribute")]
    MissingIpAddress,
    /// Request doesn't specify required neighbor MAC address.
    #[error("missing required `LLADDR` attribute")]
    MissingMacAddress,
    /// Request doesn't specify required interface.
    #[error("missing required interface")]
    MissingInterface,
    /// Request specifies invalid attribute(s).
    #[error("invalid request attribute")]
    InvalidAttribute,
    /// No such neighbor.
    #[error("no such neighbor")]
    NeighborNotFound,
    /// Interface not found.
    #[error("no such interface")]
    InterfaceNotFound,
    /// Invalid neighbor IP address.
    #[error("invalid neighbor IP address")]
    InvalidIpAddress,
    /// Invalid neighbor MAC address.
    #[error("invalid neighbor MAC address")]
    InvalidMacAddress,
    /// Interface not supported.
    #[error("interface not supported")]
    InterfaceUnsupported,
    /// Neighbor link address unknown.
    #[error("link address unknown")]
    LinkAddressUnknown,
    /// Operation not supported.
    #[error("unsupported operation")]
    UnsupportedOperation,
}
455
456impl From<RequestError> for Errno {
457    fn from(value: RequestError) -> Self {
458        match value {
459            RequestError::InvalidState { .. } => Errno::EINVAL,
460            RequestError::InvalidKind(_) => Errno::EINVAL,
461            RequestError::InvalidFlags(_) => Errno::EINVAL,
462            RequestError::UnsupportedFlags(_) => Errno::ENOTSUP,
463            RequestError::InvalidAddressFamily(_) => Errno::EAFNOSUPPORT,
464            RequestError::AddressFamilyMismatch(_) => Errno::EINVAL,
465            RequestError::MissingIpAddress => Errno::EINVAL,
466            RequestError::MissingMacAddress => Errno::EINVAL,
467            RequestError::MissingInterface => Errno::EINVAL,
468            RequestError::InvalidAttribute => Errno::EINVAL,
469            RequestError::NeighborNotFound => Errno::ENOENT,
470            RequestError::InterfaceNotFound => Errno::ENODEV,
471            RequestError::InvalidIpAddress => Errno::EINVAL,
472            RequestError::InvalidMacAddress => Errno::EINVAL,
473            RequestError::InterfaceUnsupported => Errno::ENOTSUP,
474            RequestError::LinkAddressUnknown => Errno::EINVAL,
475            RequestError::UnsupportedOperation => Errno::ENOTSUP,
476        }
477    }
478}
479
480impl From<fnet_neighbor::ControllerError> for RequestError {
481    fn from(value: fnet_neighbor::ControllerError) -> Self {
482        use fnet_neighbor::ControllerError;
483        match value {
484            ControllerError::InterfaceNotFound => RequestError::InterfaceNotFound,
485            ControllerError::InterfaceNotSupported => RequestError::InterfaceUnsupported,
486            ControllerError::InvalidIpAddress => RequestError::InvalidIpAddress,
487            ControllerError::MacAddressNotUnicast => RequestError::InvalidMacAddress,
488            ControllerError::NeighborNotFound => RequestError::NeighborNotFound,
489            ControllerError::LinkAddressUnknown => RequestError::LinkAddressUnknown,
490            ControllerError::__SourceBreaking { unknown_ordinal: e } => {
491                panic!("encountered unknown controller error: {e:?}")
492            }
493        }
494    }
495}
496
/// Trait abstracting the ability to check if an interface exists.
///
/// Neighbor request handling accepts any implementation of this trait rather
/// than a concrete interface table.
pub(crate) trait LookupIfInterfaceExists {
    /// Returns whether an interface exists at the provided index.
    fn exists(&self, interface: NonZeroU64) -> bool;
}
502
/// Interface properties and state, keyed by a `u64` that the
/// [`LookupIfInterfaceExists`] implementation treats as the interface index.
type InterfaceMap = BTreeMap<
    u64,
    fnet_interfaces_ext::PropertiesAndState<
        crate::interfaces::InterfaceState,
        fnet_interfaces_ext::AllInterest,
    >,
>;
510
511impl LookupIfInterfaceExists for InterfaceMap {
512    fn exists(&self, interface: NonZeroU64) -> bool {
513        self.contains_key(&interface.get())
514    }
515}
516
/// A request associated with neighbors.
///
/// `S` is the sender type used to deliver responses to the requesting client.
// NOTE(review): `Debug(bound = "")` presumably derives `Debug` without
// requiring `S: Debug` -- confirm against the `derivative` documentation.
#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
pub(crate) struct Request<S: Sender<<NetlinkRoute as ProtocolFamily>::Response>> {
    /// The resource and operation-specific argument(s) for this request.
    pub args: NeighborRequestArgs,
    /// The request's sequence number.
    ///
    /// This value will be copied verbatim into any message sent as a result of
    /// this request.
    pub sequence_number: u32,
    /// The client that made the request.
    pub client: InternalClient<NetlinkRoute, S>,
    /// A completer that will have the result of the request sent over.
    pub completer: oneshot::Sender<Result<(), RequestError>>,
}
533
/// A subset of `NeighborRequestArgs`, containing only `Request` types that can be pending.
///
/// RTM_GETNEIGH has no counterpart here.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum PendingNeighborRequestArgs {
    /// RTM_NEWNEIGH
    New(NewNeighborArgs),
    /// RTM_DELNEIGH
    Del(DelNeighborArgs),
}
542
/// A neighbor request that has not yet been completed.
// NOTE(review): presumably completed once the matching neighbor event is
// observed; confirm against the code that consumes this type.
#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
pub(crate) struct PendingNeighborRequest<S: Sender<<NetlinkRoute as ProtocolFamily>::Response>> {
    /// The arguments of the request that is still pending.
    request_args: PendingNeighborRequestArgs,
    /// The client that made the request.
    client: InternalClient<NetlinkRoute, S>,
    /// A completer that will have the result of the request sent over.
    completer: oneshot::Sender<Result<(), RequestError>>,
}
550
/// Errors related to handling neighbor events from Netstack.
///
/// Each variant carries the entry or event that triggered it.
#[derive(Debug, Error, PartialEq)]
pub(crate) enum HandleWatchEventError {
    /// An event indicated a neighbor was removed that was not previously known.
    #[error("Netstack reported removal of an unknown neighbor: {0:?}")]
    UnknownNeighborRemoved(fnet_neighbor_ext::Entry),
    /// An event indicated a neighbor was changed that was not previously known.
    #[error("Netstack reported change of an unknown neighbor: {0:?}")]
    UnknownNeighborChanged(fnet_neighbor_ext::Entry),
    /// An event indicated a neighbor was added that conflicts with a known
    /// neighbor.
    #[error(
        "Netstack reported addition of a neighbor that already exists: \
        existing={existing:?}, new={new:?}"
    )]
    ConflictingNeighborAdded { existing: fnet_neighbor_ext::Entry, new: fnet_neighbor_ext::Entry },
    /// An `Existing` or `Idle` event was received after collecting the initial
    /// neighbors from the event stream.
    #[error("Netstack reported unexpected event: {0:?}")]
    UnexpectedEventReceived(fnet_neighbor_ext::Event),
}
572
/// Key identifying an entry in the worker's neighbor table: the interface
/// together with the neighbor's IP address.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
struct NeighborKey {
    /// Interface the neighbor is associated with.
    interface: NonZeroU64,
    /// IP address of the neighbor.
    neighbor: fnet::IpAddress,
}
578
579impl From<&fnet_neighbor_ext::Entry> for NeighborKey {
580    fn from(
581        fnet_neighbor_ext::Entry { interface, neighbor, .. }: &fnet_neighbor_ext::Entry,
582    ) -> NeighborKey {
583        NeighborKey { interface: *interface, neighbor: *neighbor }
584    }
585}
586
/// Handles asynchronous work related to RTM_*NEIGH messages.
///
/// Can respond to RTM_*NEIGH message requests.
pub(crate) struct NeighborsWorker {
    /// Local copy of the neighbor table, updated from neighbor watcher events.
    neighbor_table: HashMap<NeighborKey, fnet_neighbor_ext::Entry>,
    /// Proxy used to add, probe and remove neighbor entries.
    neighbors_controller: fnet_neighbor::ControllerProxy,
}
594
595impl NeighborsWorker {
596    /// Create the Netlink Neighbors Worker.
597    ///
598    /// Panics if the existing neighbors cannot be retrieved from
599    /// `neighbors_view` or if the response contains conflicting neighbors.
600    pub(crate) async fn create(
601        neighbors_view: &fnet_neighbor::ViewProxy,
602        neighbors_controller: fnet_neighbor::ControllerProxy,
603    ) -> (
604        Self,
605        impl futures::Stream<
606            Item = Result<fnet_neighbor_ext::Event, fnet_neighbor_ext::EntryIteratorError>,
607        > + Unpin
608        + 'static,
609    ) {
610        let mut neighbor_event_stream = Box::pin(
611            fnet_neighbor_ext::event_stream_from_view(neighbors_view)
612                .expect("connecting to fuchsia.net.neighbors.View FIDL should succeed"),
613        );
614        let existing_neighbors: HashSet<fnet_neighbor_ext::Entry> =
615            fnet_neighbor_ext::collect_neighbors_until_idle(neighbor_event_stream.by_ref())
616                .await
617                .expect("determining existing neighbors should succeed");
618        let existing_count = existing_neighbors.len();
619        let neighbor_table = existing_neighbors
620            .into_iter()
621            .map(|e| (NeighborKey::from(&e), e))
622            .collect::<HashMap<_, _>>();
623        assert_eq!(
624            neighbor_table.len(),
625            existing_count,
626            "conflicting existing entry in neighbor table"
627        );
628        (Self { neighbor_table, neighbors_controller }, neighbor_event_stream)
629    }
630
631    pub(crate) fn handle_neighbor_watcher_event(
632        &mut self,
633        event: fnet_neighbor_ext::Event,
634    ) -> Result<(), HandleWatchEventError> {
635        match event {
636            fnet_neighbor_ext::Event::Removed(entry) => {
637                match self.neighbor_table.remove(&(&entry).into()) {
638                    Some(_) => Ok(()),
639                    None => Err(HandleWatchEventError::UnknownNeighborRemoved(entry)),
640                }
641            }
642            fnet_neighbor_ext::Event::Added(entry) => {
643                match self.neighbor_table.insert((&entry).into(), entry.clone()) {
644                    Some(existing) => Err(HandleWatchEventError::ConflictingNeighborAdded {
645                        existing,
646                        new: entry,
647                    }),
648                    None => Ok(()),
649                }
650            }
651            fnet_neighbor_ext::Event::Changed(entry) => {
652                match self.neighbor_table.insert((&entry).into(), entry.clone()) {
653                    Some(_) => Ok(()),
654                    None => Err(HandleWatchEventError::UnknownNeighborChanged(entry)),
655                }
656            }
657            e @ fnet_neighbor_ext::Event::Existing(_) | e @ fnet_neighbor_ext::Event::Idle => {
658                Err(HandleWatchEventError::UnexpectedEventReceived(e))
659            }
660        }
661    }
662
663    pub(crate) async fn handle_request<S: Sender<<NetlinkRoute as ProtocolFamily>::Response>>(
664        &mut self,
665        Request { args, mut client, sequence_number, completer }: Request<S>,
666        interface_lookup: &impl LookupIfInterfaceExists,
667    ) -> Option<PendingNeighborRequest<S>> {
668        enum HandleResult {
669            Done(Result<(), RequestError>),
670            Pending(PendingNeighborRequestArgs),
671        }
672        let result = match args {
673            NeighborRequestArgs::Get(args) => match args {
674                GetNeighborArgs::Dump { ip_version, interface } => {
675                    self.neighbor_table
676                        .values()
677                        .filter(|n| {
678                            ip_version.map_or(true, |ip_version| match n.neighbor {
679                                fnet::IpAddress::Ipv4(_) => ip_version == IpVersion::V4,
680                                fnet::IpAddress::Ipv6(_) => ip_version == IpVersion::V6,
681                            })
682                        })
683                        .filter(|n| interface.map_or(true, |i| n.interface == i))
684                        .filter_map(|e| NetlinkNeighborMessage::optionally_from(e.clone()))
685                        .for_each(|m| {
686                            client.send_unicast(m.into_rtnl_new_neighbor(sequence_number, true));
687                        });
688                    HandleResult::Done(Ok(()))
689                }
690                GetNeighborArgs::Get { ip, interface } => {
691                    let neighbor = self
692                        .neighbor_table
693                        .get(&NeighborKey { interface, neighbor: ip })
694                        .map(|e| NetlinkNeighborMessage::optionally_from(e.clone()))
695                        .flatten();
696                    match neighbor {
697                        Some(msg) => {
698                            client.send_unicast(msg.into_rtnl_new_neighbor(sequence_number, false));
699                            HandleResult::Done(Ok(()))
700                        }
701                        None => {
702                            let err = if interface_lookup.exists(interface) {
703                                RequestError::NeighborNotFound
704                            } else {
705                                RequestError::InterfaceNotFound
706                            };
707                            HandleResult::Done(Err(err))
708                        }
709                    }
710                }
711            },
712            NeighborRequestArgs::New(args) => match args {
713                args @ NewNeighborArgs::CreateStatic { ip, interface, mac } => {
714                    let response = self
715                        .neighbors_controller
716                        .add_entry(interface.get(), &ip, &mac)
717                        .await
718                        .expect("sent neighbor controller request");
719                    match response {
720                        Ok(_) => HandleResult::Pending(PendingNeighborRequestArgs::New(args)),
721                        Err(e) => HandleResult::Done(Err(e.into())),
722                    }
723                }
724                args @ NewNeighborArgs::ProbeExisting { ip, interface } => {
725                    let response = self
726                        .neighbors_controller
727                        .probe_entry(interface.get(), &ip)
728                        .await
729                        .expect("sent neighbor controller request");
730                    match response {
731                        Ok(_) => HandleResult::Pending(PendingNeighborRequestArgs::New(args)),
732                        Err(e) => HandleResult::Done(Err(e.into())),
733                    }
734                }
735            },
736            NeighborRequestArgs::Del(args @ DelNeighborArgs { interface, ip }) => {
737                let response = self
738                    .neighbors_controller
739                    .remove_entry(interface.get(), &ip)
740                    .await
741                    .expect("sent neighbor controller request");
742                match response {
743                    Ok(_) => HandleResult::Pending(PendingNeighborRequestArgs::Del(args)),
744                    Err(e) => HandleResult::Done(Err(e.into())),
745                }
746            }
747        };
748
749        match result {
750            HandleResult::Done(result) => {
751                log_debug!("handled request {args:?} from {client} with result = {result:?}");
752                respond_to_completer(client, completer, result, args);
753                None
754            }
755            HandleResult::Pending(request_args) => {
756                log_debug!("pending request {args:?} from {client}");
757                Some(PendingNeighborRequest { request_args, client, completer })
758            }
759        }
760    }
761
762    /// Checks whether a `PendingRequest` can be marked completed given the
763    /// current state of the worker. If so, notifies the request's completer and
764    /// returns `None`. If not, returns the `PendingRequest` as `Some`.
765    ///
766    /// TODO(https://fxbug.dev/488124265): Use synchronization primitives to
767    /// more robustly match requests to their corresponding watch events.
768    pub(crate) fn handle_pending_request<S: Sender<<NetlinkRoute as ProtocolFamily>::Response>>(
769        &self,
770        pending_neighbor_request: PendingNeighborRequest<S>,
771    ) -> Option<PendingNeighborRequest<S>> {
772        let PendingNeighborRequest { request_args, client: _, completer: _ } =
773            &pending_neighbor_request;
774
775        let done = match request_args {
776            PendingNeighborRequestArgs::New(NewNeighborArgs::ProbeExisting { ip, interface }) => {
777                // Assuming the `ProbeEntry` call succeeds, this is guaranteed
778                // to complete eventually, despite the fact that Netstack does
779                // not generate a `Changed` event if the neighbor is already in
780                // the expected state.
781                //
782                // If the neighbor was not in the expected state when Netstack
783                // processed the request, then a `Changed` event is generated,
784                // and since this method is called after each event that Netlink
785                // processes, this condition must eventually be true.
786                //
787                // If the neighbor *was* in the expected state when Netstack
788                // processed the request, then no `Changed` event is generated,
789                // but it's necessarily true that the immediately preceding
790                // `Added` or `Changed` event for the neighbor must contain the
791                // expected state.
792                //
793                // Here there are two cases to consider: Netlink has either
794                // already processed that event, or not.
795                //
796                // In the former case, the fact that there cannot have been
797                // intervening events means that this check will succeed on the
798                // call to this method that immediately follows the Controller
799                // request in the event loop.
800                //
801                // In the latter case, the event will be processed in a later
802                // iteration of the event loop, at which point this condition
803                // will become true.
804                self.neighbor_table
805                    .get(&NeighborKey { interface: *interface, neighbor: *ip })
806                    .map_or(false, |entry| entry.state == fnet_neighbor::EntryState::Probe)
807            }
808            PendingNeighborRequestArgs::New(NewNeighborArgs::CreateStatic {
809                ip,
810                interface,
811                mac,
812            }) => {
813                // It's also true here that Netstack does not generate an event
814                // if the neighbor is already in the expected state, but this is
815                // nevertheless guaranteed to complete eventually by the same
816                // logic as above.
817                self.neighbor_table
818                    .get(&NeighborKey { interface: *interface, neighbor: *ip })
819                    .map_or(false, |entry| {
820                        entry.mac.is_some_and(|m| m == *mac)
821                            && entry.state == fnet_neighbor::EntryState::Static
822                    })
823            }
824            PendingNeighborRequestArgs::Del(DelNeighborArgs { ip, interface }) => !self
825                .neighbor_table
826                .contains_key(&NeighborKey { interface: *interface, neighbor: *ip }),
827        };
828
829        if done {
830            log_debug!("completed pending request; req = {pending_neighbor_request:?}");
831            let PendingNeighborRequest { request_args, client, completer } =
832                pending_neighbor_request;
833
834            respond_to_completer(client, completer, Ok(()), request_args);
835            None
836        } else {
837            // Put the pending request back so that it can be handled later.
838            log_debug!("pending request not done yet; req = {pending_neighbor_request:?}");
839            Some(pending_neighbor_request)
840        }
841    }
842}
843
844#[cfg(test)]
845mod tests {
846    use crate::client::ClientTable;
847    use crate::client::testutil::{CLIENT_ID_1, new_fake_client};
848    use crate::interfaces::testutil::FakeInterfacesHandler;
849    use crate::messaging::testutil::FakeSender;
850    use crate::route_eventloop::{
851        EventLoopComponent, EventLoopInputs, EventLoopSpec, EventLoopState, IncludedWorkers,
852        Optional, Required, UnifiedRequest,
853    };
854
855    use super::*;
856
857    use assert_matches::assert_matches;
858    use fidl_fuchsia_net_neighbor::ViewRequest;
859    use fidl_fuchsia_net_neighbor_ext::testutil::EventSpec;
860    use futures::channel::mpsc;
861    use futures::{FutureExt as _, SinkExt as _, TryStreamExt as _, pin_mut};
862    use maplit::hashset;
863    use net_declare::{fidl_ip, std_ip_v4, std_ip_v6};
864    use netlink_packet_core::NetlinkPayload;
865    use netlink_packet_route::neighbour::{NeighbourAddress, NeighbourFlags};
866    use std::collections::HashSet;
867    use test_case::test_case;
868    use {
869        fidl_fuchsia_net as fnet, fidl_fuchsia_net_interfaces as fnet_interfaces,
870        fidl_fuchsia_net_root as fnet_root,
871    };
872
873    fn valid_neighbor_entry() -> fnet_neighbor_ext::Entry {
874        fnet_neighbor_ext::Entry {
875            interface: NonZeroU64::new(1).unwrap(),
876            neighbor: fidl_ip!("192.168.0.1"),
877            state: fnet_neighbor::EntryState::Reachable,
878            mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
879            updated_at: 123456,
880        }
881    }
882
883    #[test]
884    fn netlink_neighbor_message_from_entry_invalid_iface_id() {
885        let entry = fnet_neighbor_ext::Entry {
886            interface: NonZeroU64::new(u64::MAX).unwrap(),
887            ..valid_neighbor_entry()
888        };
889
890        assert_eq!(
891            NetlinkNeighborMessage::try_from(entry),
892            Err(NetlinkNeighborMessageConversionError::InvalidInterfaceId(u64::MAX))
893        );
894    }
895
896    #[test]
897    fn netlink_neighbor_message_from_entry_valid_iface_id() {
898        assert_matches!(
899            NetlinkNeighborMessage::try_from(fnet_neighbor_ext::Entry {
900                interface: NonZeroU64::new(1).unwrap(),
901                ..valid_neighbor_entry()
902            }),
903            Ok(NetlinkNeighborMessage(NeighbourMessage {
904                header: NeighbourHeader { ifindex: 1, .. },
905                ..
906            }))
907        );
908    }
909
910    #[test_case(fnet_neighbor::EntryState::Delay, NeighbourState::Delay; "delay")]
911    #[test_case(fnet_neighbor::EntryState::Incomplete, NeighbourState::Incomplete; "incomplete")]
912    #[test_case(fnet_neighbor::EntryState::Probe, NeighbourState::Probe; "probe")]
913    #[test_case(fnet_neighbor::EntryState::Reachable, NeighbourState::Reachable; "reachable")]
914    #[test_case(fnet_neighbor::EntryState::Stale, NeighbourState::Stale; "stale")]
915    #[test_case(fnet_neighbor::EntryState::Static, NeighbourState::Permanent; "permanent")]
916    #[test_case(fnet_neighbor::EntryState::Unreachable, NeighbourState::Failed; "failed")]
917    fn netlink_neighbor_message_from_entry_state_converted(
918        fidl_state: fnet_neighbor::EntryState,
919        expected: NeighbourState,
920    ) {
921        assert_matches!(
922            NetlinkNeighborMessage::try_from(fnet_neighbor_ext::Entry {
923                state: fidl_state,
924                ..valid_neighbor_entry()
925            }),
926            Ok(NetlinkNeighborMessage(NeighbourMessage {
927                header: NeighbourHeader { state, .. },
928                ..
929            })) if state == expected
930        );
931    }
932
933    #[test]
934    fn netlink_neighbor_message_from_entry_ipv4() {
935        let fidl_entry = fnet_neighbor_ext::Entry {
936            neighbor: fidl_ip!("192.168.0.1"),
937            ..valid_neighbor_entry()
938        };
939        let NetlinkNeighborMessage(message) =
940            fidl_entry.try_into().expect("should be able to convert valid neighbor entry");
941
942        assert_eq!(message.header.family, AddressFamily::Inet);
943        let expected_address: NeighbourAddress = std_ip_v4!("192.168.0.1").into();
944        assert_matches!(
945            &message.attributes[..],
946            [
947                NeighbourAttribute::Destination(address),
948                NeighbourAttribute::LinkLocalAddress(_)
949            ] if *address == expected_address
950        );
951    }
952
953    #[test]
954    fn netlink_neighbor_message_from_entry_ipv6() {
955        let fidl_entry =
956            fnet_neighbor_ext::Entry { neighbor: fidl_ip!("fe80::1"), ..valid_neighbor_entry() };
957        let NetlinkNeighborMessage(message) =
958            fidl_entry.try_into().expect("should be able to convert valid neighbor entry");
959
960        assert_eq!(message.header.family, AddressFamily::Inet6);
961        let expected_address: NeighbourAddress = std_ip_v6!("fe80::1").into();
962        assert_matches!(
963            &message.attributes[..],
964            [
965                NeighbourAttribute::Destination(address),
966                NeighbourAttribute::LinkLocalAddress(_)
967            ] if *address == expected_address
968        );
969    }
970
971    #[test]
972    fn netlink_neighbor_message_from_entry_address_link_local_present() {
973        let fidl_entry = fnet_neighbor_ext::Entry {
974            mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
975            ..valid_neighbor_entry()
976        };
977        let NetlinkNeighborMessage(message) =
978            fidl_entry.try_into().expect("should be able to convert valid neighbor entry");
979
980        assert_matches!(
981            &message.attributes[..],
982            [
983                NeighbourAttribute::Destination(_),
984                NeighbourAttribute::LinkLocalAddress(addr)
985            ] if addr == &[0, 1, 2, 3, 4, 5]
986        );
987    }
988
989    #[test]
990    fn netlink_neighbor_message_from_entry_address_link_local_absent() {
991        let fidl_entry = fnet_neighbor_ext::Entry { mac: None, ..valid_neighbor_entry() };
992        let NetlinkNeighborMessage(message) =
993            fidl_entry.try_into().expect("should be able to convert valid neighbor entry");
994
995        assert_matches!(&message.attributes[..], [NeighbourAttribute::Destination(_)]);
996    }
997
998    #[test]
999    fn netlink_neighbor_message_optionally_from_failure() {
1000        assert_eq!(
1001            NetlinkNeighborMessage::optionally_from(fnet_neighbor_ext::Entry {
1002                interface: NonZeroU64::new(u64::MAX).unwrap(),
1003                ..valid_neighbor_entry()
1004            }),
1005            None
1006        );
1007    }
1008
1009    #[test]
1010    fn netlink_neighbor_message_optionally_from_success() {
1011        let fidl_entry = fnet_neighbor_ext::Entry {
1012            interface: NonZeroU64::new(1).unwrap(),
1013            neighbor: fidl_ip!("192.168.0.1"),
1014            state: fnet_neighbor::EntryState::Reachable,
1015            mac: None,
1016            updated_at: 123456,
1017        };
1018
1019        let mut expected_message = NeighbourMessage::default();
1020        expected_message.header = NeighbourHeader {
1021            ifindex: 1,
1022            family: AddressFamily::Inet,
1023            state: NeighbourState::Reachable,
1024            flags: NeighbourFlags::empty(),
1025            kind: RouteType::Unicast,
1026        };
1027        expected_message.attributes =
1028            vec![NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into())];
1029
1030        assert_eq!(
1031            NetlinkNeighborMessage::optionally_from(fidl_entry),
1032            Some(NetlinkNeighborMessage(expected_message))
1033        );
1034    }
1035
1036    #[test]
1037    fn netlink_neighbor_message_into_rtnl_new_neighbor() {
1038        let message: NetlinkNeighborMessage = valid_neighbor_entry()
1039            .try_into()
1040            .expect("should be able to convert valid neighbor entry");
1041        let NetlinkNeighborMessage(payload) = &message;
1042
1043        let expected_payload =
1044            NetlinkPayload::InnerMessage(RouteNetlinkMessage::NewNeighbour(payload.clone()));
1045
1046        let result = message.clone().into_rtnl_new_neighbor(1, true);
1047        assert_eq!(result.payload, expected_payload);
1048        assert_eq!(result.header.sequence_number, 1);
1049        assert_eq!(result.header.flags & NLM_F_MULTIPART, NLM_F_MULTIPART);
1050
1051        let result = message.into_rtnl_new_neighbor(1, false);
1052        assert_eq!(result.payload, expected_payload);
1053        assert_ne!(result.header.flags & NLM_F_MULTIPART, NLM_F_MULTIPART);
1054    }
1055
1056    #[test]
1057    fn neighbor_keyed_by_interface_and_ip() {
1058        let entry = fnet_neighbor_ext::Entry {
1059            interface: NonZeroU64::new(1).unwrap(),
1060            neighbor: fidl_ip!("192.168.0.1"),
1061            mac: None,
1062            state: fnet_neighbor::EntryState::Reachable,
1063            updated_at: 123456,
1064        };
1065
1066        let same_iface_and_ip = fnet_neighbor_ext::Entry {
1067            mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
1068            state: fnet_neighbor::EntryState::Stale,
1069            updated_at: 654321,
1070            ..entry
1071        };
1072        assert_eq!(NeighborKey::from(&entry), NeighborKey::from(&same_iface_and_ip));
1073
1074        let different_iface =
1075            fnet_neighbor_ext::Entry { interface: NonZeroU64::new(2).unwrap(), ..entry };
1076        assert_ne!(NeighborKey::from(&entry), NeighborKey::from(&different_iface));
1077
1078        let different_ip = fnet_neighbor_ext::Entry { neighbor: fidl_ip!("192.168.0.2"), ..entry };
1079        assert_ne!(NeighborKey::from(&entry), NeighborKey::from(&different_ip));
1080
1081        let different_iface_and_ip = fnet_neighbor_ext::Entry {
1082            interface: NonZeroU64::new(2).unwrap(),
1083            neighbor: fidl_ip!("192.168.0.2"),
1084            ..entry
1085        };
1086        assert_ne!(NeighborKey::from(&entry), NeighborKey::from(&different_iface_and_ip));
1087    }
1088
1089    #[fuchsia::test]
1090    #[should_panic(expected = "determining existing neighbors should succeed")]
1091    async fn neighbors_worker_create_panics_on_view_protocol_error() {
1092        let (controller, _controller_server_end) =
1093            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1094        let (view, view_server_end) = fidl::endpoints::create_proxy::<fnet_neighbor::ViewMarker>();
1095        // Close the channel without responding.
1096        drop(view_server_end);
1097
1098        let (_worker, _remaining) = NeighborsWorker::create(&view, controller).await;
1099    }
1100
1101    #[fuchsia::test]
1102    #[should_panic(expected = "determining existing neighbors should succeed")]
1103    async fn neighbors_worker_create_panics_on_event_stream_error() {
1104        let (controller, _controller_server_end) =
1105            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1106        let (view, view_server_end) = fidl::endpoints::create_proxy::<fnet_neighbor::ViewMarker>();
1107        let mut view_request_stream = view_server_end.into_stream();
1108
1109        let entry_iter_fut = view_request_stream
1110            .next()
1111            .then(|req| {
1112                match req
1113                    .expect("View request_stream unexpectedly ended")
1114                    .expect("failed to receive `OpenEntryIterator` request")
1115                {
1116                    ViewRequest::OpenEntryIterator { it, .. } => {
1117                        // Close the channel without responding.
1118                        drop(it);
1119                        futures::future::ready(())
1120                    }
1121                }
1122            })
1123            .fuse();
1124
1125        let worker_fut = NeighborsWorker::create(&view, controller);
1126
1127        let ((), (_worker, _remaining)) = futures::join!(entry_iter_fut, worker_fut);
1128    }
1129
1130    #[fuchsia::test]
1131    #[should_panic(expected = "conflicting existing entry")]
1132    async fn neighbors_worker_create_panics_on_conflicting_entry() {
1133        let events: Vec<_> = [
1134            // Create two neighbors with the same `NeighborKey` but differing
1135            // fields; truly duplicate entries are ignored.
1136            fnet_neighbor_ext::Entry {
1137                state: fnet_neighbor::EntryState::Reachable,
1138                ..valid_neighbor_entry()
1139            },
1140            fnet_neighbor_ext::Entry {
1141                state: fnet_neighbor::EntryState::Stale,
1142                ..valid_neighbor_entry()
1143            },
1144        ]
1145        .into_iter()
1146        .map(Into::into)
1147        .map(fnet_neighbor::EntryIteratorItem::Existing)
1148        .chain(std::iter::once(fnet_neighbor::EntryIteratorItem::Idle(fnet_neighbor::IdleEvent)))
1149        .collect();
1150        let batches = vec![events];
1151        let (view, server_fut) =
1152            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(batches));
1153
1154        let (controller, _controller_server_end) =
1155            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1156        let worker_fut = NeighborsWorker::create(&view, controller);
1157
1158        let ((), (_worker, _remaining)) = futures::join!(server_fut, worker_fut);
1159    }
1160
1161    #[fuchsia::test]
1162    async fn neighbors_worker_create_success() {
1163        use fnet_neighbor_ext::testutil::EventSpec::*;
1164        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1165            Existing(1),
1166            Existing(2),
1167            Existing(3),
1168            Idle,
1169            Added(4),
1170        ]);
1171        let (view, server_fut) =
1172            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![
1173                events.clone(),
1174            ]));
1175
1176        let (controller, _controller_server_end) =
1177            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1178        let worker_fut = NeighborsWorker::create(&view, controller);
1179
1180        let ((), (worker, event_stream)) = futures::join!(server_fut, worker_fut);
1181
1182        let remaining_events: Vec<_> = event_stream.collect().await;
1183        assert_matches!(
1184            &remaining_events[..],
1185            [
1186                Ok(fnet_neighbor_ext::Event::Added(_)),
1187                Err(fnet_neighbor_ext::EntryIteratorError::Fidl(
1188                    fidl::Error::ClientChannelClosed { .. }
1189                ))
1190            ]
1191        );
1192
1193        for event in events {
1194            match event {
1195                fnet_neighbor::EntryIteratorItem::Existing(fidl_entry) => {
1196                    let entry: fnet_neighbor_ext::Entry = fidl_entry.try_into().unwrap();
1197                    assert_eq!(worker.neighbor_table.get(&(&entry).into()), Some(&entry));
1198                }
1199                _ => {}
1200            }
1201        }
1202    }
1203
1204    #[test_case(
1205        EventSpec::Added(2),
1206        |e| matches!(e, HandleWatchEventError::ConflictingNeighborAdded { .. });
1207        "conflicting added"
1208    )]
1209    #[test_case(
1210        EventSpec::Removed(4),
1211        |e| matches!(e, HandleWatchEventError::UnknownNeighborRemoved(_));
1212        "unknown removed"
1213    )]
1214    #[test_case(
1215        EventSpec::Changed(4),
1216        |e| matches!(e, HandleWatchEventError::UnknownNeighborChanged(_));
1217        "unknown changed"
1218    )]
1219    #[test_case(
1220        EventSpec::Existing(4),
1221        |e| matches!(e, HandleWatchEventError::UnexpectedEventReceived(_));
1222        "existing after initial collection"
1223    )]
1224    #[test_case(
1225        EventSpec::Idle,
1226        |e| matches!(e, HandleWatchEventError::UnexpectedEventReceived(_));
1227        "idle after initial collection"
1228    )]
1229    #[fuchsia::test]
1230    async fn neighbors_worker_handle_watch_event_failure(
1231        spec: EventSpec,
1232        error_matcher: fn(&HandleWatchEventError) -> bool,
1233    ) {
1234        use fnet_neighbor_ext::testutil::EventSpec::*;
1235        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1236            Existing(1),
1237            Existing(2),
1238            Existing(3),
1239            Idle,
1240            spec,
1241        ]);
1242        let (view, server_fut) =
1243            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![
1244                events.clone(),
1245            ]));
1246
1247        let (controller, _controller_server_end) =
1248            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1249        let worker_fut = NeighborsWorker::create(&view, controller);
1250
1251        let ((), (mut worker, event_stream)) = futures::join!(server_fut, worker_fut);
1252
1253        let remaining_events: Vec<_> = event_stream.collect().await;
1254        assert_eq!(remaining_events.len(), 2);
1255        match &remaining_events[0] {
1256            Ok(event) => {
1257                assert_matches!(
1258                    worker.handle_neighbor_watcher_event(event.clone()),
1259                    Err(error) if error_matcher(&error)
1260                );
1261            }
1262            _ => panic!("expected bad event in stream"),
1263        }
1264        match &remaining_events[1] {
1265            Err(fnet_neighbor_ext::EntryIteratorError::Fidl(
1266                fidl::Error::ClientChannelClosed { .. },
1267            )) => {}
1268            _ => panic!("expected PEER_CLOSED error at end of stream"),
1269        }
1270    }
1271
1272    #[fuchsia::test]
1273    async fn neighbors_worker_handle_added_event() {
1274        use fnet_neighbor_ext::testutil::EventSpec::*;
1275        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1276            Existing(1),
1277            Existing(2),
1278            Existing(3),
1279            Idle,
1280            Added(4),
1281        ]);
1282        let (view, server_fut) =
1283            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![
1284                events.clone(),
1285            ]));
1286
1287        let (controller, _controller_server_end) =
1288            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1289        let worker_fut = NeighborsWorker::create(&view, controller);
1290
1291        let ((), (mut worker, event_stream)) = futures::join!(server_fut, worker_fut);
1292
1293        let remaining_events: Vec<_> = event_stream.collect().await;
1294        assert_eq!(remaining_events.len(), 2);
1295        match &remaining_events[0] {
1296            Ok(e @ fnet_neighbor_ext::Event::Added(entry)) => {
1297                let key = NeighborKey::from(entry);
1298                assert_eq!(worker.neighbor_table.get(&key), None);
1299                assert_matches!(worker.handle_neighbor_watcher_event(e.clone()), Ok(_));
1300                assert_eq!(worker.neighbor_table.get(&key), Some(entry));
1301            }
1302            _ => panic!("expected Added event in stream"),
1303        }
1304        match &remaining_events[1] {
1305            Err(fnet_neighbor_ext::EntryIteratorError::Fidl(
1306                fidl::Error::ClientChannelClosed { .. },
1307            )) => {}
1308            _ => panic!("expected PEER_CLOSED error at end of stream"),
1309        }
1310    }
1311
1312    #[fuchsia::test]
1313    async fn neighbors_worker_handle_removed_event() {
1314        use fnet_neighbor_ext::testutil::EventSpec::*;
1315        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1316            Existing(1),
1317            Existing(2),
1318            Existing(3),
1319            Idle,
1320            Removed(2),
1321        ]);
1322        let (view, server_fut) =
1323            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![
1324                events.clone(),
1325            ]));
1326
1327        let (controller, _controller_server_end) =
1328            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1329        let worker_fut = NeighborsWorker::create(&view, controller);
1330
1331        let ((), (mut worker, event_stream)) = futures::join!(server_fut, worker_fut);
1332
1333        let remaining_events: Vec<_> = event_stream.collect().await;
1334        assert_eq!(remaining_events.len(), 2);
1335        match &remaining_events[0] {
1336            Ok(e @ fnet_neighbor_ext::Event::Removed(entry)) => {
1337                let key = NeighborKey::from(entry);
1338                assert_eq!(worker.neighbor_table.get(&key), Some(entry));
1339                assert_matches!(worker.handle_neighbor_watcher_event(e.clone()), Ok(_));
1340                assert_eq!(worker.neighbor_table.get(&key), None);
1341            }
1342            _ => panic!("expected Removed event in stream"),
1343        }
1344        match &remaining_events[1] {
1345            Err(fnet_neighbor_ext::EntryIteratorError::Fidl(
1346                fidl::Error::ClientChannelClosed { .. },
1347            )) => {}
1348            _ => panic!("expected PEER_CLOSED error at end of stream"),
1349        }
1350    }
1351
1352    #[fuchsia::test]
1353    async fn neighbors_worker_handle_changed_event() {
1354        use fnet_neighbor_ext::testutil::EventSpec::*;
1355        let mut events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1356            Existing(1),
1357            Existing(2),
1358            Existing(3),
1359            Idle,
1360            Changed(2),
1361        ]);
1362        match &mut events[1] {
1363            fnet_neighbor::EntryIteratorItem::Existing(entry) => {
1364                entry.updated_at = Some(1234);
1365            }
1366            _ => panic!("expected Existing event in stream"),
1367        }
1368        match &mut events[4] {
1369            fnet_neighbor::EntryIteratorItem::Changed(entry) => {
1370                entry.updated_at = Some(5678);
1371            }
1372            _ => panic!("expected Changed event in stream"),
1373        }
1374
1375        let (view, server_fut) =
1376            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![
1377                events.clone(),
1378            ]));
1379
1380        let (controller, _controller_server_end) =
1381            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1382        let worker_fut = NeighborsWorker::create(&view, controller);
1383
1384        let ((), (mut worker, event_stream)) = futures::join!(server_fut, worker_fut);
1385
1386        let remaining_events: Vec<_> = event_stream.collect().await;
1387        assert_eq!(remaining_events.len(), 2);
1388        match &remaining_events[0] {
1389            Ok(e @ fnet_neighbor_ext::Event::Changed(entry)) => {
1390                let key = NeighborKey::from(entry);
1391                assert_matches!(
1392                    worker.neighbor_table.get(&key),
1393                    Some(fnet_neighbor_ext::Entry { updated_at: 1234, .. })
1394                );
1395                assert_matches!(worker.handle_neighbor_watcher_event(e.clone()), Ok(_));
1396                assert_matches!(
1397                    worker.neighbor_table.get(&key),
1398                    Some(fnet_neighbor_ext::Entry { updated_at: 5678, .. })
1399                );
1400            }
1401            _ => panic!("expected Changed event in stream"),
1402        }
1403        match &remaining_events[1] {
1404            Err(fnet_neighbor_ext::EntryIteratorError::Fidl(
1405                fidl::Error::ClientChannelClosed { .. },
1406            )) => {}
1407            _ => panic!("expected PEER_CLOSED error at end of stream"),
1408        }
1409    }
1410
1411    impl LookupIfInterfaceExists for HashSet<u64> {
1412        fn exists(&self, idx: NonZeroU64) -> bool {
1413            self.contains(&idx.get())
1414        }
1415    }
1416
1417    #[test_case(HashSet::new(), RequestError::InterfaceNotFound; "interface does not exist")]
1418    #[test_case(
1419        hashset!{1}, RequestError::NeighborNotFound;
1420        "interface exists"
1421    )]
1422    #[fuchsia::test]
1423    async fn neighbors_worker_handle_get_neighbor_not_found(
1424        interface_lookup: HashSet<u64>,
1425        expected_error: RequestError,
1426    ) {
1427        let (mut sender_sink, client, _async_work_drain_task) =
1428            new_fake_client(CLIENT_ID_1, vec![]);
1429        let (completer, completer_rcv) = oneshot::channel();
1430        let request = Request {
1431            args: NeighborRequestArgs::Get(GetNeighborArgs::Get {
1432                ip: fidl_ip!("192.168.0.1"),
1433                interface: NonZeroU64::new(1).unwrap(),
1434            }),
1435            sequence_number: 1,
1436            client,
1437            completer,
1438        };
1439
1440        let events: Vec<_> = [
1441            fnet_neighbor_ext::Entry {
1442                interface: NonZeroU64::new(2).unwrap(),
1443                neighbor: fidl_ip!("192.168.0.1"),
1444                ..valid_neighbor_entry()
1445            },
1446            fnet_neighbor_ext::Entry {
1447                interface: NonZeroU64::new(1).unwrap(),
1448                neighbor: fidl_ip!("fe80::2"),
1449                ..valid_neighbor_entry()
1450            },
1451        ]
1452        .into_iter()
1453        .map(Into::into)
1454        .map(fnet_neighbor::EntryIteratorItem::Existing)
1455        .chain(std::iter::once(fnet_neighbor::EntryIteratorItem::Idle(fnet_neighbor::IdleEvent)))
1456        .collect();
1457
1458        let batches = vec![events];
1459        let (view, server_fut) =
1460            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(batches));
1461
1462        let (controller, _controller_server_end) =
1463            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1464        let worker_fut = NeighborsWorker::create(&view, controller);
1465        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
1466
1467        assert_matches!(
1468            worker.handle_request(request, &interface_lookup).await,
1469            None // No pending work expected.
1470        );
1471
1472        let result = completer_rcv.await.expect("completer channel should not be closed");
1473        assert_matches!(result, Err(e) if e == expected_error);
1474        assert_eq!(&sender_sink.take_messages()[..], &[]);
1475    }
1476
1477    #[test_case(
1478        GetNeighborArgs::Dump{ ip_version: None, interface: None },
1479        &[1, 2, 3, 4];
1480        "dump all"
1481    )]
1482    #[test_case(
1483        GetNeighborArgs::Dump{ ip_version: Some(IpVersion::V4), interface: None },
1484        &[1, 3];
1485        "dump ipv4 only"
1486    )]
1487    #[test_case(
1488        GetNeighborArgs::Dump{ ip_version: Some(IpVersion::V6), interface: None },
1489        &[2, 4];
1490        "dump ipv6 only"
1491    )]
1492    #[test_case(
1493        GetNeighborArgs::Dump{
1494            ip_version: Some(IpVersion::V6),
1495            interface: Some(NonZeroU64::new(4).unwrap())
1496        },
1497        &[4];
1498        "dump interface 4 ipv6"
1499    )]
1500    #[test_case(
1501        GetNeighborArgs::Dump{
1502            ip_version: Some(IpVersion::V4),
1503            interface: Some(NonZeroU64::new(4).unwrap())
1504        },
1505        &[];
1506        "dump interface 4 ipv4"
1507    )]
1508    #[test_case(
1509        GetNeighborArgs::Get{ ip: fidl_ip!("192.168.0.1"), interface: NonZeroU64::new(1).unwrap() },
1510        &[1];
1511        "get ipv4"
1512    )]
1513    #[test_case(
1514        GetNeighborArgs::Get{ ip: fidl_ip!("fe80::2"), interface: NonZeroU64::new(2).unwrap() },
1515        &[2];
1516        "get ipv6"
1517    )]
1518    #[fuchsia::test]
1519    async fn neighbors_worker_handle_get_request(
1520        get_args: GetNeighborArgs,
1521        expected_ifindexes: &[u32],
1522    ) {
1523        let (mut sender_sink, client, _async_work_drain_task) =
1524            new_fake_client(CLIENT_ID_1, vec![]);
1525        let (completer, completer_rcv) = oneshot::channel();
1526        let request = Request {
1527            args: NeighborRequestArgs::Get(get_args),
1528            sequence_number: 1,
1529            client,
1530            completer,
1531        };
1532
1533        let events: Vec<_> = [
1534            fnet_neighbor_ext::Entry {
1535                interface: NonZeroU64::new(1).unwrap(),
1536                neighbor: fidl_ip!("192.168.0.1"),
1537                ..valid_neighbor_entry()
1538            },
1539            fnet_neighbor_ext::Entry {
1540                interface: NonZeroU64::new(2).unwrap(),
1541                neighbor: fidl_ip!("fe80::2"),
1542                ..valid_neighbor_entry()
1543            },
1544            fnet_neighbor_ext::Entry {
1545                interface: NonZeroU64::new(3).unwrap(),
1546                neighbor: fidl_ip!("192.168.0.3"),
1547                ..valid_neighbor_entry()
1548            },
1549            fnet_neighbor_ext::Entry {
1550                interface: NonZeroU64::new(4).unwrap(),
1551                neighbor: fidl_ip!("fe80::4"),
1552                ..valid_neighbor_entry()
1553            },
1554        ]
1555        .into_iter()
1556        .map(Into::into)
1557        .map(fnet_neighbor::EntryIteratorItem::Existing)
1558        .chain(std::iter::once(fnet_neighbor::EntryIteratorItem::Idle(fnet_neighbor::IdleEvent)))
1559        .collect();
1560
1561        let batches = vec![events];
1562        let (view, server_fut) =
1563            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(batches));
1564
1565        let (controller, _controller_server_end) =
1566            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1567        let worker_fut = NeighborsWorker::create(&view, controller);
1568        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
1569
1570        assert_matches!(
1571            worker.handle_request(request, &BTreeMap::new()).await,
1572            None // No pending work expected.
1573        );
1574
1575        completer_rcv
1576            .await
1577            .expect("completer channel should not be closed")
1578            .expect("request handling result should be OK");
1579
1580        let mut ifindexes_seen = Vec::new();
1581        for sent_message in sender_sink.take_messages() {
1582            match sent_message.message.payload {
1583                NetlinkPayload::InnerMessage(RouteNetlinkMessage::NewNeighbour(
1584                    NeighbourMessage { header: NeighbourHeader { ifindex, .. }, .. },
1585                )) => {
1586                    ifindexes_seen.push(ifindex);
1587                }
1588                _ => panic!("unexpected message sent"),
1589            }
1590        }
1591        ifindexes_seen.sort();
1592        assert_eq!(&ifindexes_seen[..], expected_ifindexes);
1593    }
1594
1595    #[fuchsia::test]
1596    async fn neighbors_worker_handle_del_request_controller_error() {
1597        let (_sender_sink, client, _async_work_drain_task) = new_fake_client(CLIENT_ID_1, vec![]);
1598        let (completer, completer_rcv) = oneshot::channel();
1599        let request = Request {
1600            args: NeighborRequestArgs::Del(DelNeighborArgs {
1601                ip: fidl_ip!("192.168.0.1"),
1602                interface: NonZeroU64::new(1).unwrap(),
1603            }),
1604            sequence_number: 1,
1605            client,
1606            completer,
1607        };
1608
1609        let events = {
1610            use fnet_neighbor_ext::testutil::EventSpec::*;
1611            fnet_neighbor_ext::testutil::generate_events_from_spec(&[Idle])
1612        };
1613        let (view, server_fut) =
1614            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![events]));
1615        let (controller, mut controller_request_stream) =
1616            fidl::endpoints::create_proxy_and_stream::<fnet_neighbor::ControllerMarker>();
1617        let worker_fut = NeighborsWorker::create(&view, controller);
1618        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
1619
1620        let interfaces = BTreeMap::new();
1621        let handle_request_fut = worker.handle_request(request, &interfaces);
1622        let controller_fut = async {
1623            match controller_request_stream
1624                .next()
1625                .await
1626                .expect("controller stream should not be closed")
1627                .expect("failed to receive controller request")
1628            {
1629                fnet_neighbor::ControllerRequest::RemoveEntry {
1630                    interface: _,
1631                    neighbor: _,
1632                    responder,
1633                } => {
1634                    responder
1635                        .send(Err(fnet_neighbor::ControllerError::InterfaceNotFound))
1636                        .expect("failed to send error response");
1637                }
1638                _ => panic!("unexpected controller request"),
1639            }
1640        };
1641
1642        let (handle_result, ()) = futures::join!(handle_request_fut, controller_fut);
1643        assert_matches!(handle_result, None);
1644
1645        let result = completer_rcv.await.expect("completer channel should not be closed");
1646        assert_matches!(result, Err(RequestError::InterfaceNotFound));
1647    }
1648
1649    #[fuchsia::test]
1650    async fn neighbors_worker_handle_del_request_success() {
1651        let (_sender_sink, client, _async_work_drain_task) = new_fake_client(CLIENT_ID_1, vec![]);
1652        let (completer, _completer_rcv) = oneshot::channel();
1653        let args =
1654            DelNeighborArgs { ip: fidl_ip!("192.168.0.1"), interface: NonZeroU64::new(1).unwrap() };
1655        let request =
1656            Request { args: NeighborRequestArgs::Del(args), sequence_number: 1, client, completer };
1657
1658        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1659            fnet_neighbor_ext::testutil::EventSpec::Idle,
1660        ]);
1661        let batches = vec![events];
1662        let (view, server_fut) =
1663            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(batches));
1664        let (controller, mut controller_request_stream) =
1665            fidl::endpoints::create_proxy_and_stream::<fnet_neighbor::ControllerMarker>();
1666        let worker_fut = NeighborsWorker::create(&view, controller);
1667        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
1668
1669        let interfaces = BTreeMap::new();
1670        let handle_request_fut = worker.handle_request(request, &interfaces);
1671        let controller_fut = async {
1672            match controller_request_stream
1673                .next()
1674                .await
1675                .expect("controller stream should not be closed")
1676                .expect("failed to receive controller request")
1677            {
1678                fnet_neighbor::ControllerRequest::RemoveEntry {
1679                    interface: _,
1680                    neighbor: _,
1681                    responder,
1682                } => {
1683                    responder.send(Ok(())).expect("failed to send success response");
1684                }
1685                _ => panic!("unexpected controller request"),
1686            }
1687        };
1688
1689        let (handle_result, ()) = futures::join!(handle_request_fut, controller_fut);
1690        let pending = handle_result.expect("expected pending request");
1691        assert_eq!(pending.request_args, PendingNeighborRequestArgs::Del(args));
1692    }
1693
1694    #[fuchsia::test]
1695    async fn neighbors_worker_handle_pending_del_request() {
1696        let (_sender_sink, client, _async_work_drain_task) = new_fake_client(CLIENT_ID_1, vec![]);
1697        let (completer, mut completer_rcv) = oneshot::channel();
1698        let args =
1699            DelNeighborArgs { ip: fidl_ip!("192.168.0.1"), interface: NonZeroU64::new(1).unwrap() };
1700        let pending = PendingNeighborRequest {
1701            request_args: PendingNeighborRequestArgs::Del(args),
1702            client,
1703            completer,
1704        };
1705
1706        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1707            fnet_neighbor_ext::testutil::EventSpec::Idle,
1708        ]);
1709        let batches = vec![events];
1710        let (view, server_fut) =
1711            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(batches));
1712        let (controller, _controller_server_end) =
1713            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1714        let worker_fut = NeighborsWorker::create(&view, controller);
1715        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
1716
1717        let key = NeighborKey { interface: args.interface, neighbor: args.ip };
1718        let _ = worker.neighbor_table.insert(
1719            key,
1720            fnet_neighbor_ext::Entry {
1721                interface: args.interface,
1722                neighbor: args.ip,
1723                ..valid_neighbor_entry()
1724            },
1725        );
1726
1727        // Still present, should remain pending.
1728        let pending = worker.handle_pending_request(pending).expect("expected pending");
1729        assert_matches!(completer_rcv.try_recv(), Ok(None)); // Completer still blocked.
1730
1731        // Remove from table, should complete.
1732        let _ = worker.neighbor_table.remove(&key);
1733        assert_matches!(worker.handle_pending_request(pending), None);
1734
1735        let result = completer_rcv.try_recv().expect("completer channel should not be closed");
1736        assert_matches!(result, Some(Ok(())));
1737    }
1738
1739    #[fuchsia::test]
1740    async fn neighbors_worker_handle_create_static_request_controller_error() {
1741        let (_sender_sink, client, _async_work_drain_task) = new_fake_client(CLIENT_ID_1, vec![]);
1742        let (completer, completer_rcv) = oneshot::channel();
1743        let args = NewNeighborArgs::CreateStatic {
1744            ip: fidl_ip!("192.168.0.1"),
1745            interface: NonZeroU64::new(1).unwrap(),
1746            mac: fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] },
1747        };
1748        let request =
1749            Request { args: NeighborRequestArgs::New(args), sequence_number: 1, client, completer };
1750
1751        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1752            fnet_neighbor_ext::testutil::EventSpec::Idle,
1753        ]);
1754        let batches = vec![events];
1755        let (view, server_fut) =
1756            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(batches));
1757        let (controller, mut controller_request_stream) =
1758            fidl::endpoints::create_proxy_and_stream::<fnet_neighbor::ControllerMarker>();
1759        let worker_fut = NeighborsWorker::create(&view, controller);
1760        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
1761
1762        let interfaces = BTreeMap::new();
1763        let handle_request_fut = worker.handle_request(request, &interfaces);
1764        let controller_fut = async {
1765            match controller_request_stream
1766                .next()
1767                .await
1768                .expect("controller stream should not be closed")
1769                .expect("failed to receive controller request")
1770            {
1771                fnet_neighbor::ControllerRequest::AddEntry {
1772                    interface: _,
1773                    neighbor: _,
1774                    mac: _,
1775                    responder,
1776                } => {
1777                    responder
1778                        .send(Err(fnet_neighbor::ControllerError::InterfaceNotFound))
1779                        .expect("failed to send error response");
1780                }
1781                _ => panic!("unexpected controller request"),
1782            }
1783        };
1784
1785        let (handle_result, ()) = futures::join!(handle_request_fut, controller_fut);
1786        assert_matches!(handle_result, None);
1787
1788        let result = completer_rcv.await.expect("completer channel should not be closed");
1789        assert_matches!(result, Err(RequestError::InterfaceNotFound));
1790    }
1791
1792    #[fuchsia::test]
1793    async fn neighbors_worker_handle_create_static_request_success() {
1794        let (_sender_sink, client, _async_work_drain_task) = new_fake_client(CLIENT_ID_1, vec![]);
1795        let (completer, _completer_rcv) = oneshot::channel();
1796        let args = NewNeighborArgs::CreateStatic {
1797            ip: fidl_ip!("192.168.0.1"),
1798            interface: NonZeroU64::new(1).unwrap(),
1799            mac: fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] },
1800        };
1801        let request =
1802            Request { args: NeighborRequestArgs::New(args), sequence_number: 1, client, completer };
1803
1804        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1805            fnet_neighbor_ext::testutil::EventSpec::Idle,
1806        ]);
1807        let (view, server_fut) =
1808            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![events]));
1809        let (controller, mut controller_request_stream) =
1810            fidl::endpoints::create_proxy_and_stream::<fnet_neighbor::ControllerMarker>();
1811        let worker_fut = NeighborsWorker::create(&view, controller);
1812        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
1813
1814        let interfaces = BTreeMap::new();
1815        let handle_request_fut = worker.handle_request(request, &interfaces);
1816        let controller_fut = async {
1817            match controller_request_stream
1818                .next()
1819                .await
1820                .expect("controller stream should not be closed")
1821                .expect("failed to receive controller request")
1822            {
1823                fnet_neighbor::ControllerRequest::AddEntry {
1824                    interface: _,
1825                    neighbor: _,
1826                    mac: _,
1827                    responder,
1828                } => {
1829                    responder.send(Ok(())).expect("failed to send success response");
1830                }
1831                _ => panic!("unexpected controller request"),
1832            }
1833        };
1834
1835        let (handle_result, ()) = futures::join!(handle_request_fut, controller_fut);
1836        let pending = handle_result.expect("expected pending request");
1837        assert_eq!(pending.request_args, PendingNeighborRequestArgs::New(args));
1838    }
1839
1840    #[fuchsia::test]
1841    async fn neighbors_worker_handle_probe_request_controller_error() {
1842        let (_sender_sink, client, _async_work_drain_task) = new_fake_client(CLIENT_ID_1, vec![]);
1843        let (completer, completer_rcv) = oneshot::channel();
1844        let args = NewNeighborArgs::ProbeExisting {
1845            ip: fidl_ip!("192.168.0.1"),
1846            interface: NonZeroU64::new(1).unwrap(),
1847        };
1848        let request =
1849            Request { args: NeighborRequestArgs::New(args), sequence_number: 1, client, completer };
1850
1851        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1852            fnet_neighbor_ext::testutil::EventSpec::Idle,
1853        ]);
1854        let (view, server_fut) =
1855            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![events]));
1856        let (controller, mut controller_request_stream) =
1857            fidl::endpoints::create_proxy_and_stream::<fnet_neighbor::ControllerMarker>();
1858        let worker_fut = NeighborsWorker::create(&view, controller);
1859        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
1860
1861        let interfaces = BTreeMap::new();
1862        let handle_request_fut = worker.handle_request(request, &interfaces);
1863        let controller_fut = async {
1864            match controller_request_stream
1865                .next()
1866                .await
1867                .expect("controller stream should not be closed")
1868                .expect("failed to receive controller request")
1869            {
1870                fnet_neighbor::ControllerRequest::ProbeEntry {
1871                    interface: _,
1872                    neighbor: _,
1873                    responder,
1874                } => {
1875                    responder
1876                        .send(Err(fnet_neighbor::ControllerError::InterfaceNotFound))
1877                        .expect("failed to send error response");
1878                }
1879                _ => panic!("unexpected controller request"),
1880            }
1881        };
1882
1883        let (handle_result, ()) = futures::join!(handle_request_fut, controller_fut);
1884        assert_matches!(handle_result, None);
1885
1886        let result = completer_rcv.await.expect("completer channel should not be closed");
1887        assert_matches!(result, Err(RequestError::InterfaceNotFound));
1888    }
1889
1890    #[fuchsia::test]
1891    async fn neighbors_worker_handle_probe_request_success() {
1892        let (_sender_sink, client, _async_work_drain_task) = new_fake_client(CLIENT_ID_1, vec![]);
1893        let (completer, _completer_rcv) = oneshot::channel();
1894        let args = NewNeighborArgs::ProbeExisting {
1895            ip: fidl_ip!("192.168.0.1"),
1896            interface: NonZeroU64::new(1).unwrap(),
1897        };
1898        let request =
1899            Request { args: NeighborRequestArgs::New(args), sequence_number: 1, client, completer };
1900
1901        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1902            fnet_neighbor_ext::testutil::EventSpec::Idle,
1903        ]);
1904        let (view, server_fut) =
1905            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![events]));
1906        let (controller, mut controller_request_stream) =
1907            fidl::endpoints::create_proxy_and_stream::<fnet_neighbor::ControllerMarker>();
1908        let worker_fut = NeighborsWorker::create(&view, controller);
1909        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
1910
1911        let interfaces = BTreeMap::new();
1912        let handle_request_fut = worker.handle_request(request, &interfaces);
1913        let controller_fut = async {
1914            match controller_request_stream
1915                .next()
1916                .await
1917                .expect("controller stream should not be closed")
1918                .expect("failed to receive controller request")
1919            {
1920                fnet_neighbor::ControllerRequest::ProbeEntry {
1921                    interface: _,
1922                    neighbor: _,
1923                    responder,
1924                } => {
1925                    responder.send(Ok(())).expect("failed to send success response");
1926                }
1927                _ => panic!("unexpected controller request"),
1928            }
1929        };
1930
1931        let (handle_result, ()) = futures::join!(handle_request_fut, controller_fut);
1932        let pending = handle_result.expect("expected pending request");
1933        assert_eq!(pending.request_args, PendingNeighborRequestArgs::New(args));
1934    }
1935
1936    #[fuchsia::test]
1937    async fn neighbors_worker_handle_pending_create_static_request() {
1938        let (_sender_sink, client, _async_work_drain_task) = new_fake_client(CLIENT_ID_1, vec![]);
1939        let (completer, mut completer_rcv) = oneshot::channel();
1940        let neighbor = valid_neighbor_entry();
1941        let args = NewNeighborArgs::CreateStatic {
1942            ip: neighbor.neighbor,
1943            interface: neighbor.interface,
1944            mac: neighbor.mac.unwrap(),
1945        };
1946        let pending = PendingNeighborRequest {
1947            request_args: PendingNeighborRequestArgs::New(args),
1948            client,
1949            completer,
1950        };
1951
1952        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1953            fnet_neighbor_ext::testutil::EventSpec::Idle,
1954        ]);
1955        let (view, server_fut) =
1956            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![events]));
1957        let (controller, _controller_server_end) =
1958            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1959        let worker_fut = NeighborsWorker::create(&view, controller);
1960        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
1961
1962        let key = NeighborKey { interface: neighbor.interface, neighbor: neighbor.neighbor };
1963
1964        // Not present, should remain pending.
1965        let pending = worker.handle_pending_request(pending).expect("expected pending");
1966        assert_matches!(completer_rcv.try_recv(), Ok(None));
1967
1968        // Insert with wrong MAC, should remain pending.
1969        let _ = worker.neighbor_table.insert(
1970            key,
1971            fnet_neighbor_ext::Entry {
1972                mac: Some(fnet::MacAddress { octets: [0, 0, 0, 0, 0, 0] }),
1973                state: fnet_neighbor::EntryState::Static,
1974                ..neighbor
1975            },
1976        );
1977        let pending = worker.handle_pending_request(pending).expect("expected pending");
1978        assert_matches!(completer_rcv.try_recv(), Ok(None));
1979
1980        // Insert with wrong state, should remain pending.
1981        let _ = worker.neighbor_table.insert(
1982            key,
1983            fnet_neighbor_ext::Entry {
1984                mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
1985                state: fnet_neighbor::EntryState::Reachable,
1986                ..neighbor
1987            },
1988        );
1989        let pending = worker.handle_pending_request(pending).expect("expected pending");
1990        assert_matches!(completer_rcv.try_recv(), Ok(None));
1991
1992        // Insert correct entry, should complete.
1993        let _ = worker.neighbor_table.insert(
1994            key,
1995            fnet_neighbor_ext::Entry {
1996                mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
1997                state: fnet_neighbor::EntryState::Static,
1998                ..neighbor
1999            },
2000        );
2001        assert_matches!(worker.handle_pending_request(pending), None);
2002
2003        let result = completer_rcv.try_recv().expect("completer channel should not be closed");
2004        assert_matches!(result, Some(Ok(())));
2005    }
2006
2007    #[fuchsia::test]
2008    async fn neighbors_worker_handle_pending_probe_request() {
2009        let (_sender_sink, client, _async_work_drain_task) = new_fake_client(CLIENT_ID_1, vec![]);
2010        let (completer, mut completer_rcv) = oneshot::channel();
2011        let neighbor = valid_neighbor_entry();
2012        let args =
2013            NewNeighborArgs::ProbeExisting { ip: neighbor.neighbor, interface: neighbor.interface };
2014        let pending = PendingNeighborRequest {
2015            request_args: PendingNeighborRequestArgs::New(args),
2016            client,
2017            completer,
2018        };
2019
2020        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
2021            fnet_neighbor_ext::testutil::EventSpec::Idle,
2022        ]);
2023        let (view, server_fut) =
2024            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![events]));
2025        let (controller, _controller_server_end) =
2026            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
2027        let worker_fut = NeighborsWorker::create(&view, controller);
2028        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
2029
2030        let key = NeighborKey { interface: neighbor.interface, neighbor: neighbor.neighbor };
2031
2032        // Not present, should remain pending.
2033        let pending = worker.handle_pending_request(pending).expect("expected pending");
2034        assert_matches!(completer_rcv.try_recv(), Ok(None));
2035
2036        // Insert with wrong state, should remain pending.
2037        let _ = worker.neighbor_table.insert(
2038            key,
2039            fnet_neighbor_ext::Entry { state: fnet_neighbor::EntryState::Reachable, ..neighbor },
2040        );
2041        let pending = worker.handle_pending_request(pending).expect("expected pending");
2042        assert_matches!(completer_rcv.try_recv(), Ok(None));
2043
2044        // Insert correct entry, should complete.
2045        let _ = worker.neighbor_table.insert(
2046            key,
2047            fnet_neighbor_ext::Entry { state: fnet_neighbor::EntryState::Probe, ..neighbor },
2048        );
2049        assert_matches!(worker.handle_pending_request(pending), None);
2050
2051        let result = completer_rcv.try_recv().expect("completer channel should not be closed");
2052        assert_matches!(result, Some(Ok(())));
2053    }
2054
2055    #[test_case(
2056        false,
2057        NeighbourHeader {
2058            ifindex: 1,
2059            family: AddressFamily::Inet,
2060            ..Default::default()
2061        },
2062        &[
2063            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2064        ] => Ok(GetNeighborArgs::Get {
2065            ip: fidl_ip!("192.168.0.1"),
2066            interface: NonZeroU64::new(1).unwrap(),
2067        });
2068        "get ipv4 success"
2069    )]
2070    #[test_case(
2071        false,
2072        NeighbourHeader {
2073            ifindex: 1,
2074            family: AddressFamily::Inet6,
2075            ..Default::default()
2076        },
2077        &[
2078            NeighbourAttribute::Destination(std_ip_v6!("fe80::1").into()),
2079        ] => Ok(GetNeighborArgs::Get {
2080            ip: fidl_ip!("fe80::1"),
2081            interface: NonZeroU64::new(1).unwrap(),
2082        });
2083        "get ipv6 success"
2084    )]
2085    #[test_case(
2086        false,
2087        NeighbourHeader {
2088            ifindex: 1,
2089            family: AddressFamily::Inet,
2090            state: NeighbourState::Reachable,
2091            ..Default::default()
2092        },
2093        &[
2094            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2095        ] => Err(RequestError::InvalidState {
2096            actual: NeighbourState::Reachable, expected: NeighbourState::None
2097        });
2098        "get invalid request state"
2099    )]
2100    #[test_case(
2101        false,
2102        NeighbourHeader {
2103            ifindex: 1,
2104            family: AddressFamily::Inet,
2105            kind: RouteType::Broadcast,
2106            ..Default::default()
2107        },
2108        &[
2109            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2110        ] => Err(RequestError::InvalidKind(RouteType::Broadcast));
2111        "get invalid request kind"
2112    )]
2113    #[test_case(
2114        false,
2115        NeighbourHeader {
2116            ifindex: 1,
2117            family: AddressFamily::Inet,
2118            flags: NeighbourFlags::Router,
2119            ..Default::default()
2120        },
2121        &[
2122            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2123        ] => Err(RequestError::InvalidFlags(NeighbourFlags::Router));
2124        "get invalid request flag"
2125    )]
2126    #[test_case(
2127        false,
2128        NeighbourHeader {
2129            ifindex: 1,
2130            family: AddressFamily::Inet,
2131            flags: NeighbourFlags::Proxy,
2132            ..Default::default()
2133        },
2134        &[
2135            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2136        ] => Err(RequestError::UnsupportedFlags(NeighbourFlags::Proxy));
2137        "get unsupported request flag"
2138    )]
2139    #[test_case(
2140        false,
2141        NeighbourHeader {
2142            ifindex: 1,
2143            family: AddressFamily::Inet6,
2144            ..Default::default()
2145        },
2146        &[
2147            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2148        ] => Err(RequestError::AddressFamilyMismatch(AddressFamily::Inet6));
2149        "get address family mismatch"
2150    )]
2151    #[test_case(
2152        false,
2153        NeighbourHeader {
2154            family: AddressFamily::Inet,
2155            ..Default::default()
2156        },
2157        &[
2158            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2159        ] => Err(RequestError::MissingInterface);
2160        "get interface unspecified"
2161    )]
2162    #[test_case(
2163        false,
2164        NeighbourHeader {
2165            ifindex: 1,
2166            family: AddressFamily::Inet,
2167            ..Default::default()
2168        },
2169        &[] => Err(RequestError::MissingIpAddress);
2170        "get destination unspecified"
2171    )]
2172    #[test_case(
2173        false,
2174        NeighbourHeader {
2175            ifindex: 1,
2176            family: AddressFamily::Inet,
2177            ..Default::default()
2178        },
2179        &[
2180            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2181            NeighbourAttribute::LinkLocalAddress(vec![0, 1, 2, 3, 4, 5]),
2182        ] => Err(RequestError::InvalidAttribute);
2183        "get invalid attribute"
2184    )]
2185    #[test_case(
2186        true,
2187        NeighbourHeader::default(),
2188        &[] => Ok(GetNeighborArgs::Dump {
2189            ip_version: None,
2190            interface: None,
2191        });
2192        "dump all"
2193    )]
2194    #[test_case(
2195        true,
2196        NeighbourHeader {
2197            family: AddressFamily::Inet,
2198            ..Default::default()
2199        },
2200        &[] => Ok(GetNeighborArgs::Dump {
2201            ip_version: Some(IpVersion::V4),
2202            interface: None,
2203        });
2204        "dump ipv4 only"
2205    )]
2206    #[test_case(
2207        true,
2208        NeighbourHeader {
2209            family: AddressFamily::Inet6,
2210            ..Default::default()
2211        },
2212        &[] => Ok(GetNeighborArgs::Dump {
2213            ip_version: Some(IpVersion::V6),
2214            interface: None,
2215        });
2216        "dump ipv6 only"
2217    )]
2218    #[test_case(
2219        true,
2220        NeighbourHeader::default(),
2221        &[
2222            NeighbourAttribute::IfIndex(1),
2223        ] => Ok(GetNeighborArgs::Dump {
2224            ip_version: None,
2225            interface: Some(NonZeroU64::new(1).unwrap()),
2226        });
2227        "dump interface 1"
2228    )]
2229    #[test_case(
2230        true,
2231        NeighbourHeader::default(),
2232        &[
2233            NeighbourAttribute::IfIndex(0),
2234        ] => Ok(GetNeighborArgs::Dump {
2235            ip_version: None,
2236            interface: None,
2237        });
2238        "dump interface 0 treated as all interfaces"
2239    )]
2240    #[test_case(
2241        true,
2242        NeighbourHeader {
2243            family: AddressFamily::Local,
2244            ..Default::default()
2245        },
2246        &[] => Err(RequestError::InvalidAddressFamily(AddressFamily::Local));
2247        "dump invalid address family"
2248    )]
2249    #[test_case(
2250        true,
2251        NeighbourHeader::default(),
2252        &[
2253            NeighbourAttribute::LinkLocalAddress(vec![0, 1, 2, 3, 4, 5]),
2254        ] => Ok(GetNeighborArgs::Dump {
2255            ip_version: None,
2256            interface: None,
2257        });
2258        "dump unsupported attribute ignored (non-strict)"
2259    )]
2260    #[test_case(
2261        true,
2262        NeighbourHeader {
2263            flags: NeighbourFlags::Proxy,
2264            ..Default::default()
2265        },
2266        &[] => Err(RequestError::UnsupportedFlags(NeighbourFlags::Proxy));
2267        "dump unsupported request flag"
2268    )]
2269    #[fuchsia::test]
2270    fn get_neighbor_args_try_from_rtnl_neighbor(
2271        is_dump: bool,
2272        header: NeighbourHeader,
2273        attrs: &[NeighbourAttribute],
2274    ) -> Result<GetNeighborArgs, RequestError> {
2275        let mut message = NeighbourMessage::default();
2276        message.header = header;
2277        message.attributes = attrs.to_vec();
2278        GetNeighborArgs::try_from_rtnl_neighbor(&message, is_dump)
2279    }
2280
2281    #[test_case(
2282        NeighbourHeader {
2283            ifindex: 1,
2284            family: AddressFamily::Inet,
2285            ..Default::default()
2286        },
2287        &[
2288            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2289        ] => Ok(DelNeighborArgs {
2290            ip: fidl_ip!("192.168.0.1"),
2291            interface: NonZeroU64::new(1).unwrap(),
2292        });
2293        "del ipv4 success"
2294    )]
2295    #[test_case(
2296        NeighbourHeader {
2297            ifindex: 1,
2298            family: AddressFamily::Inet6,
2299            ..Default::default()
2300        },
2301        &[
2302            NeighbourAttribute::Destination(std_ip_v6!("fe80::1").into()),
2303        ] => Ok(DelNeighborArgs {
2304            ip: fidl_ip!("fe80::1"),
2305            interface: NonZeroU64::new(1).unwrap(),
2306        });
2307        "del ipv6 success"
2308    )]
2309    #[test_case(
2310        NeighbourHeader {
2311            ifindex: 1,
2312            family: AddressFamily::Inet,
2313            flags: NeighbourFlags::Proxy,
2314            ..Default::default()
2315        },
2316        &[
2317            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2318        ] => Err(RequestError::UnsupportedFlags(NeighbourFlags::Proxy));
2319        "del unsupported request flag"
2320    )]
2321    #[test_case(
2322        NeighbourHeader {
2323            family: AddressFamily::Inet,
2324            ..Default::default()
2325        },
2326        &[
2327            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2328        ] => Err(RequestError::MissingInterface);
2329        "del interface unspecified"
2330    )]
2331    #[test_case(
2332        NeighbourHeader {
2333            ifindex: 1,
2334            family: AddressFamily::Inet,
2335            ..Default::default()
2336        },
2337        &[] => Err(RequestError::MissingIpAddress);
2338        "del destination unspecified"
2339    )]
2340    #[fuchsia::test]
2341    fn del_neighbor_args_try_from_rtnl_neighbor(
2342        header: NeighbourHeader,
2343        attrs: &[NeighbourAttribute],
2344    ) -> Result<DelNeighborArgs, RequestError> {
2345        let mut message = NeighbourMessage::default();
2346        message.header = header;
2347        message.attributes = attrs.to_vec();
2348        DelNeighborArgs::try_from_rtnl_neighbor(&message)
2349    }
2350
2351    #[test_case(
2352        NLM_F_CREATE | NLM_F_REPLACE,
2353        NeighbourHeader {
2354            ifindex: 1,
2355            family: AddressFamily::Inet,
2356            state: NeighbourState::Permanent,
2357            ..Default::default()
2358        },
2359        &[
2360            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2361            NeighbourAttribute::LinkLocalAddress(vec![0, 1, 2, 3, 4, 5]),
2362        ] => Ok(NewNeighborArgs::CreateStatic {
2363            ip: fidl_ip!("192.168.0.1"),
2364            interface: NonZeroU64::new(1).unwrap(),
2365            mac: fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] },
2366        });
2367        "create static ipv4 success"
2368    )]
2369    #[test_case(
2370        NLM_F_CREATE | NLM_F_REPLACE,
2371        NeighbourHeader {
2372            ifindex: 1,
2373            family: AddressFamily::Inet6,
2374            state: NeighbourState::Permanent,
2375            ..Default::default()
2376        },
2377        &[
2378            NeighbourAttribute::Destination(std_ip_v6!("fe80::1").into()),
2379            NeighbourAttribute::LinkLocalAddress(vec![0, 1, 2, 3, 4, 5]),
2380        ] => Ok(NewNeighborArgs::CreateStatic {
2381            ip: fidl_ip!("fe80::1"),
2382            interface: NonZeroU64::new(1).unwrap(),
2383            mac: fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] },
2384        });
2385        "create static ipv6 success"
2386    )]
2387    #[test_case(
2388        NLM_F_REPLACE,
2389        NeighbourHeader {
2390            ifindex: 1,
2391            family: AddressFamily::Inet,
2392            state: NeighbourState::Probe,
2393            ..Default::default()
2394        },
2395        &[
2396            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2397        ] => Ok(NewNeighborArgs::ProbeExisting {
2398            ip: fidl_ip!("192.168.0.1"),
2399            interface: NonZeroU64::new(1).unwrap(),
2400        });
2401        "probe existing ipv4 success"
2402    )]
2403    #[test_case(
2404        NLM_F_REPLACE,
2405        NeighbourHeader {
2406            ifindex: 1,
2407            family: AddressFamily::Inet6,
2408            state: NeighbourState::Probe,
2409            ..Default::default()
2410        },
2411        &[
2412            NeighbourAttribute::Destination(std_ip_v6!("fe80::1").into()),
2413        ] => Ok(NewNeighborArgs::ProbeExisting {
2414            ip: fidl_ip!("fe80::1"),
2415            interface: NonZeroU64::new(1).unwrap(),
2416        });
2417        "probe existing ipv6 success"
2418    )]
2419    #[test_case(
2420        NLM_F_REPLACE,
2421        NeighbourHeader {
2422            ifindex: 1,
2423            family: AddressFamily::Inet,
2424            flags: NeighbourFlags::Proxy,
2425            ..Default::default()
2426        },
2427        &[
2428            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2429        ] => Err(RequestError::UnsupportedFlags(NeighbourFlags::Proxy));
2430        "unsupported proxy flag"
2431    )]
2432    #[test_case(
2433        NLM_F_CREATE | NLM_F_REPLACE,
2434        NeighbourHeader {
2435            ifindex: 1,
2436            family: AddressFamily::Inet,
2437            state: NeighbourState::Reachable,
2438            ..Default::default()
2439        },
2440        &[
2441            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2442            NeighbourAttribute::LinkLocalAddress(vec![0, 1, 2, 3, 4, 5]),
2443        ] => Err(RequestError::InvalidState {
2444            actual: NeighbourState::Reachable,
2445            expected: NeighbourState::Permanent,
2446        });
2447        "create invalid state"
2448    )]
2449    #[test_case(
2450        NLM_F_REPLACE,
2451        NeighbourHeader {
2452            ifindex: 1,
2453            family: AddressFamily::Inet,
2454            state: NeighbourState::Permanent,
2455            ..Default::default()
2456        },
2457        &[
2458            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2459        ] => Err(RequestError::InvalidState {
2460            actual: NeighbourState::Permanent,
2461            expected: NeighbourState::Probe,
2462        });
2463        "probe invalid state"
2464    )]
2465    #[test_case(
2466        NLM_F_CREATE | NLM_F_REPLACE,
2467        NeighbourHeader {
2468            ifindex: 1,
2469            family: AddressFamily::Inet,
2470            state: NeighbourState::Permanent,
2471            ..Default::default()
2472        },
2473        &[
2474            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2475        ] => Err(RequestError::MissingMacAddress);
2476        "create missing mac address"
2477    )]
2478    #[test_case(
2479        NLM_F_EXCL,
2480        NeighbourHeader {
2481            ifindex: 1,
2482            family: AddressFamily::Inet,
2483            ..Default::default()
2484        },
2485        &[
2486            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2487        ] => Err(RequestError::UnsupportedOperation);
2488        "unsupported operation flags"
2489    )]
2490    #[fuchsia::test]
2491    fn new_neighbor_args_try_from_rtnl_neighbor(
2492        netlink_flags: u16,
2493        header: NeighbourHeader,
2494        attrs: &[NeighbourAttribute],
2495    ) -> Result<NewNeighborArgs, RequestError> {
2496        let mut message = NeighbourMessage::default();
2497        message.header = header;
2498        message.attributes = attrs.to_vec();
2499        NewNeighborArgs::try_from_rtnl_neighbor(&message, netlink_flags)
2500    }
2501
2502    #[test_case(
2503        RequestError::InvalidState {
2504            actual: NeighbourState::Reachable, expected:NeighbourState::None
2505        } => Errno::EINVAL;
2506        "invalid state"
2507    )]
2508    #[test_case(RequestError::InvalidKind(RouteType::Broadcast) => Errno::EINVAL; "invalid kind")]
2509    #[test_case(
2510        RequestError::InvalidFlags(NeighbourFlags::Router) => Errno::EINVAL;
2511        "invalid flags"
2512    )]
2513    #[test_case(
2514        RequestError::UnsupportedFlags(NeighbourFlags::Proxy) => Errno::ENOTSUP;
2515        "unsupported flags"
2516    )]
2517    #[test_case(
2518        RequestError::InvalidAddressFamily(AddressFamily::Local) => Errno::EAFNOSUPPORT;
2519        "invalid address family"
2520    )]
2521    #[test_case(
2522        RequestError::AddressFamilyMismatch(AddressFamily::Inet6) => Errno::EINVAL;
2523        "address family mismatch"
2524    )]
2525    #[test_case(RequestError::MissingInterface => Errno::EINVAL; "interface unspecified")]
2526    #[test_case(RequestError::MissingIpAddress => Errno::EINVAL; "destination unspecified")]
2527    #[test_case(RequestError::InvalidAttribute => Errno::EINVAL; "invalid attribute")]
2528    #[test_case(RequestError::NeighborNotFound => Errno::ENOENT; "neighbor not found")]
2529    #[test_case(RequestError::InterfaceNotFound => Errno::ENODEV; "interface not found")]
2530    #[test_case(RequestError::InvalidIpAddress => Errno::EINVAL; "invalid IP address")]
2531    #[test_case(RequestError::InvalidMacAddress => Errno::EINVAL; "invalid MAC address")]
2532    #[test_case(RequestError::InterfaceUnsupported => Errno::ENOTSUP; "unsupported interface")]
2533    #[fuchsia::test]
2534    fn request_error_into_errno(error: RequestError) -> Errno {
2535        error.into()
2536    }
2537
2538    enum NeighborsAndInterfaces {}
2539    impl EventLoopSpec for NeighborsAndInterfaces {
2540        type NeighborWorker = Required;
2541
2542        type InterfacesProxy = Required;
2543        type InterfacesStateProxy = Required;
2544        type InterfacesHandler = Required;
2545        type RouteClients = Required;
2546        type InterfacesWorker = Required;
2547
2548        type V4RoutesState = Optional;
2549        type V6RoutesState = Optional;
2550        type V4RoutesSetProvider = Optional;
2551        type V6RoutesSetProvider = Optional;
2552        type V4RouteTableProvider = Optional;
2553        type V6RouteTableProvider = Optional;
2554
2555        type RoutesV4Worker = Optional;
2556        type RoutesV6Worker = Optional;
2557        type RuleV4Worker = Optional;
2558        type RuleV6Worker = Optional;
2559        type NduseroptWorker = Optional;
2560    }
2561
    /// Sequence number placed in the `sequence_number` field of the requests
    /// that the event-loop tests in this module construct.
    const TEST_SEQUENCE_NUMBER: u32 = 1234;
2563
    /// Everything a test needs to drive the event loop produced by
    /// `build_event_loop`.
    struct EventLoopSetup {
        /// The initialized event loop state under test.
        event_loop: EventLoopState<
            FakeInterfacesHandler,
            FakeSender<RouteNetlinkMessage>,
            NeighborsAndInterfaces,
        >,
        /// Sending half of the channel whose receiving half is handed to the
        /// event loop as its unified request stream.
        request_sink: mpsc::Sender<UnifiedRequest<FakeSender<RouteNetlinkMessage>>>,
        /// Server side of the `fuchsia.net.neighbor` controller proxy given to
        /// the event loop; tests read the controller requests from it.
        neighbors_controller_request_stream: fnet_neighbor::ControllerRequestStream,
        /// Sender for batches of neighbor events delivered to the fake neighbor
        /// view.
        neighbor_event_sink: mpsc::UnboundedSender<Vec<fnet_neighbor::EntryIteratorItem>>,
        /// Sender for the interface events used to answer the fake interface
        /// watcher's `Watch` requests.
        interface_event_sink: mpsc::UnboundedSender<fnet_interfaces::Event>,
    }
2575
2576    async fn build_event_loop(
2577        scope: &fuchsia_async::Scope,
2578        neighbor_events: &[EventSpec],
2579    ) -> EventLoopSetup {
2580        let included_workers = IncludedWorkers {
2581            routes_v4: EventLoopComponent::Absent(Optional),
2582            routes_v6: EventLoopComponent::Absent(Optional),
2583            interfaces: EventLoopComponent::Present(()),
2584            rules_v4: EventLoopComponent::Absent(Optional),
2585            rules_v6: EventLoopComponent::Absent(Optional),
2586            neighbors: EventLoopComponent::Present(()),
2587            nduseropt: EventLoopComponent::Absent(Optional),
2588        };
2589        let (request_sink, request_stream) = mpsc::channel(1);
2590
2591        // Configure fake neighbor watch events.
2592
2593        let (neighbors_controller, neighbors_controller_request_stream) =
2594            fidl::endpoints::create_proxy_and_stream::<fnet_neighbor::ControllerMarker>();
2595
2596        let (neighbors_view, neighbor_event_sink) = {
2597            let events = fnet_neighbor_ext::testutil::generate_events_from_spec(neighbor_events);
2598            let (event_sender, event_receiver) = mpsc::unbounded();
2599            event_sender.unbounded_send(events).expect("failed to send events");
2600            let (neighbors_view, neighbors_fut) =
2601                fnet_neighbor_ext::testutil::create_fake_view(event_receiver);
2602            let _join_handle = scope.spawn(neighbors_fut);
2603            (neighbors_view, event_sender)
2604        };
2605
2606        // Configure fake interface watch events.
2607
2608        let (interfaces_handler, _interfaces_handler_sink) =
2609            crate::interfaces::testutil::FakeInterfacesHandler::new();
2610        let (interfaces_proxy, _interfaces) =
2611            fidl::endpoints::create_proxy::<fnet_root::InterfacesMarker>();
2612        let (interfaces_state_proxy, interfaces_state) =
2613            fidl::endpoints::create_proxy::<fnet_interfaces::StateMarker>();
2614        let route_clients = ClientTable::default();
2615
2616        let interface_event_sink = {
2617            let if_stream = interfaces_state.into_stream();
2618            let if_watcher_stream = if_stream
2619                .and_then(|req| match req {
2620                    fnet_interfaces::StateRequest::GetWatcher {
2621                        options: _,
2622                        watcher,
2623                        control_handle: _,
2624                    } => futures::future::ready(Ok(watcher.into_stream())),
2625                })
2626                .try_flatten()
2627                .map(|res| res.expect("watcher stream error"));
2628            let (event_sender, event_receiver) = mpsc::unbounded();
2629            event_sender
2630                .unbounded_send(fnet_interfaces::Event::Idle(fnet_interfaces::Empty))
2631                .expect("failed to send event");
2632            let interfaces_fut =
2633                if_watcher_stream.zip(event_receiver).for_each(|(req, update)| async move {
2634                    match req {
2635                        fnet_interfaces::WatcherRequest::Watch { responder } => {
2636                            responder.send(&update).expect("send watch response")
2637                        }
2638                    }
2639                });
2640            let _join_handle = scope.spawn(interfaces_fut);
2641            event_sender
2642        };
2643
2644        // Set up the event loop.
2645
2646        let (_async_work_sink, async_work_receiver) = mpsc::unbounded();
2647        let base_inputs: EventLoopInputs<
2648            FakeInterfacesHandler,
2649            FakeSender<RouteNetlinkMessage>,
2650            NeighborsAndInterfaces,
2651        > = EventLoopInputs {
2652            neighbors_view: EventLoopComponent::Present(neighbors_view),
2653            neighbors_controller: EventLoopComponent::Present(neighbors_controller),
2654
2655            route_clients: EventLoopComponent::Present(route_clients),
2656            interfaces_handler: EventLoopComponent::Present(interfaces_handler),
2657            interfaces_proxy: EventLoopComponent::Present(interfaces_proxy),
2658            interfaces_state_proxy: EventLoopComponent::Present(interfaces_state_proxy),
2659
2660            async_work_receiver,
2661
2662            v4_routes_state: EventLoopComponent::Absent(Optional),
2663            v6_routes_state: EventLoopComponent::Absent(Optional),
2664            v4_main_route_table: EventLoopComponent::Absent(Optional),
2665            v6_main_route_table: EventLoopComponent::Absent(Optional),
2666            v4_route_table_provider: EventLoopComponent::Absent(Optional),
2667            v6_route_table_provider: EventLoopComponent::Absent(Optional),
2668            v4_rule_table: EventLoopComponent::Absent(Optional),
2669            v6_rule_table: EventLoopComponent::Absent(Optional),
2670            ndp_option_watcher_provider: EventLoopComponent::Absent(Optional),
2671
2672            unified_request_stream: request_stream,
2673        };
2674
2675        let event_loop = base_inputs.initialize(included_workers).await;
2676        EventLoopSetup {
2677            event_loop,
2678            request_sink,
2679            neighbors_controller_request_stream,
2680            neighbor_event_sink,
2681            interface_event_sink,
2682        }
2683    }
2684
2685    #[fuchsia::test]
2686    async fn event_loop_with_watch_events_and_get_request() {
2687        let scope = fuchsia_async::Scope::new();
2688        use fnet_neighbor_ext::testutil::EventSpec::*;
2689        let EventLoopSetup {
2690            mut event_loop,
2691            mut request_sink,
2692            neighbors_controller_request_stream: _,
2693            neighbor_event_sink,
2694            interface_event_sink,
2695        } = build_event_loop(&scope, &[Existing(1), Existing(2), Existing(3), Idle, Added(4)])
2696            .await;
2697
2698        // Wait for `Added` event to be processed.
2699        event_loop.run_one_step_in_tests().await;
2700
2701        // Send a dump request and check the response.
2702
2703        let (mut response_sink, neighbor_client, async_work_drain_task) =
2704            crate::client::testutil::new_fake_client::<NetlinkRoute>(
2705                crate::client::testutil::CLIENT_ID_1,
2706                [],
2707            );
2708        let _join_handle = scope.spawn(async_work_drain_task);
2709
2710        let (completer, waiter) = oneshot::channel();
2711        let get_request: UnifiedRequest<FakeSender<RouteNetlinkMessage>> =
2712            UnifiedRequest::NeighborsRequest(Request {
2713                args: NeighborRequestArgs::Get(GetNeighborArgs::Dump {
2714                    ip_version: None,
2715                    interface: None,
2716                }),
2717                sequence_number: TEST_SEQUENCE_NUMBER,
2718                client: neighbor_client.clone(),
2719                completer,
2720            });
2721        request_sink.send(get_request).await.unwrap();
2722
2723        // Wait for client request to be processed.
2724        event_loop.run_one_step_in_tests().await;
2725        assert_matches!(waiter.await.unwrap(), Ok(()));
2726
2727        let responses = response_sink.take_messages();
2728        assert_eq!(responses.len(), 4); // 3 existing + 1 added.
2729        for response in responses {
2730            assert_matches!(
2731                response.message.payload,
2732                NetlinkPayload::InnerMessage(RouteNetlinkMessage::NewNeighbour(_))
2733            );
2734        }
2735
2736        neighbor_event_sink.close_channel();
2737        interface_event_sink.close_channel();
2738        drop(neighbor_client);
2739        scope.join().await;
2740    }
2741
2742    #[fuchsia::test]
2743    async fn event_loop_with_watch_events_and_delete_request() {
2744        let scope = fuchsia_async::Scope::new();
2745        let neighbor_events = {
2746            use fnet_neighbor_ext::testutil::EventSpec::*;
2747            vec![Existing(1), Existing(2), Idle]
2748        };
2749        let EventLoopSetup {
2750            mut event_loop,
2751            mut request_sink,
2752            neighbors_controller_request_stream,
2753            neighbor_event_sink,
2754            interface_event_sink,
2755        } = build_event_loop(&scope, &neighbor_events).await;
2756
2757        let fnet_neighbor::EntryIteratorItem::Existing(to_delete) =
2758            fnet_neighbor_ext::testutil::generate_event_from_spec(&neighbor_events[0])
2759        else {
2760            panic!("unexpected event")
2761        };
2762        let to_delete: fnet_neighbor_ext::Entry =
2763            to_delete.try_into().expect("extension conversion failed");
2764
2765        let (mut response_sink, neighbor_client, async_work_drain_task) =
2766            crate::client::testutil::new_fake_client::<NetlinkRoute>(
2767                crate::client::testutil::CLIENT_ID_1,
2768                [],
2769            );
2770        let _join_handle = scope.spawn(async_work_drain_task);
2771
2772        // Send an RTM_DELNEIGH request for an existing neighbor.
2773
2774        let (completer, waiter) = oneshot::channel();
2775        let waiter = waiter.fuse();
2776        pin_mut!(waiter);
2777
2778        let del_request: UnifiedRequest<FakeSender<RouteNetlinkMessage>> =
2779            UnifiedRequest::NeighborsRequest(Request {
2780                args: NeighborRequestArgs::Del(DelNeighborArgs {
2781                    ip: to_delete.neighbor,
2782                    interface: to_delete.interface,
2783                }),
2784                sequence_number: TEST_SEQUENCE_NUMBER,
2785                client: neighbor_client.clone(),
2786                completer,
2787            });
2788        request_sink.send(del_request).await.expect("failed to send delete request");
2789
2790        // Handle the expected Controller.RemoveEntry request and verify that
2791        // the RTM_DELNEIGH request is still pending.
2792
2793        let controller_req_fut = neighbors_controller_request_stream
2794            .into_future()
2795            .then(async move |(req, _rest)| {
2796                match req
2797                    .expect("Controller request stream unexpectedly ended")
2798                    .expect("failed to receive `Controller` request")
2799                {
2800                    fnet_neighbor::ControllerRequest::RemoveEntry {
2801                        interface,
2802                        neighbor,
2803                        responder,
2804                        ..
2805                    } => {
2806                        assert_eq!(interface, to_delete.interface.get());
2807                        assert_eq!(neighbor, to_delete.neighbor);
2808                        responder.send(Ok(())).expect("failed to respond to RemoveEntry");
2809                    }
2810                    _ => panic!("unexpected controller request"),
2811                }
2812            })
2813            .fuse();
2814        let _join_handle = scope.spawn(controller_req_fut);
2815        event_loop.run_one_step_in_tests().await;
2816
2817        assert_matches!(waiter.as_mut().now_or_never(), None);
2818        assert_eq!(response_sink.take_messages().len(), 0);
2819
2820        // Send an unrelated neighbor watch event and verify that the request is
2821        // still pending.
2822
2823        {
2824            use fnet_neighbor_ext::testutil::EventSpec::*;
2825            neighbor_event_sink
2826                .unbounded_send(fnet_neighbor_ext::testutil::generate_events_from_spec(&[Added(3)]))
2827                .expect("failed to send event");
2828        }
2829
2830        event_loop.run_one_step_in_tests().await;
2831        assert_matches!(waiter.as_mut().now_or_never(), None);
2832        assert_eq!(response_sink.take_messages().len(), 0);
2833
2834        // Send a neighbor watch event indicating successful removal and verify
2835        // that the request is completed.
2836
2837        {
2838            use fnet_neighbor_ext::testutil::EventSpec::*;
2839            neighbor_event_sink
2840                .unbounded_send(fnet_neighbor_ext::testutil::generate_events_from_spec(&[Removed(
2841                    1,
2842                )]))
2843                .expect("failed to send event");
2844        }
2845
2846        event_loop.run_one_step_in_tests().await;
2847        assert_matches!(waiter.await.expect("waiter channel should not be closed"), Ok(()));
2848        // The event loop & worker aren't responsible for the final response to
2849        // the client (either none, or ACK if requested), so there's nothing
2850        // more to check here.
2851
2852        neighbor_event_sink.close_channel();
2853        interface_event_sink.close_channel();
2854        drop(neighbor_client);
2855        scope.join().await;
2856    }
2857
2858    #[fuchsia::test]
2859    async fn event_loop_with_watch_events_create_static_neighbor() {
2860        let scope = fuchsia_async::Scope::new();
2861        let neighbor_events = vec![fnet_neighbor_ext::testutil::EventSpec::Idle];
2862        let EventLoopSetup {
2863            mut event_loop,
2864            mut request_sink,
2865            neighbors_controller_request_stream,
2866            neighbor_event_sink,
2867            interface_event_sink,
2868        } = build_event_loop(&scope, &neighbor_events).await;
2869
2870        let (mut response_sink, neighbor_client, async_work_drain_task) =
2871            crate::client::testutil::new_fake_client::<NetlinkRoute>(
2872                crate::client::testutil::CLIENT_ID_1,
2873                [],
2874            );
2875        let _join_handle = scope.spawn(async_work_drain_task);
2876
2877        // Create expected entry but without sending neighbor event.
2878
2879        let (event, entry) = {
2880            use fnet_neighbor_ext::testutil::EventSpec::*;
2881            let mut added = fnet_neighbor_ext::testutil::generate_event_from_spec(&Added(1));
2882            let fnet_neighbor::EntryIteratorItem::Added(to_add) = &mut added else {
2883                panic!("unexpected event")
2884            };
2885            to_add.state = Some(fnet_neighbor::EntryState::Static);
2886            let entry: fnet_neighbor_ext::Entry =
2887                to_add.clone().try_into().expect("extension conversion failed");
2888            (added, entry)
2889        };
2890
2891        // Send an RTM_NEWNEIGH request for an existing neighbor.
2892
2893        let (completer, waiter) = oneshot::channel();
2894        let waiter = waiter.fuse();
2895        pin_mut!(waiter);
2896
2897        let create_request: UnifiedRequest<FakeSender<RouteNetlinkMessage>> =
2898            UnifiedRequest::NeighborsRequest(Request {
2899                args: NeighborRequestArgs::New(NewNeighborArgs::CreateStatic {
2900                    ip: entry.neighbor,
2901                    interface: entry.interface,
2902                    mac: entry.mac.unwrap(),
2903                }),
2904                sequence_number: TEST_SEQUENCE_NUMBER,
2905                client: neighbor_client.clone(),
2906                completer,
2907            });
2908        request_sink.send(create_request).await.expect("failed to send create request");
2909
2910        // Handle the expected Controller.AddEntry request and verify that the
2911        // RTM_NEWNEIGH request is still pending.
2912
2913        let controller_req_fut = neighbors_controller_request_stream
2914            .into_future()
2915            .then(async move |(req, _rest)| {
2916                match req
2917                    .expect("Controller request stream unexpectedly ended")
2918                    .expect("failed to receive `Controller` request")
2919                {
2920                    fnet_neighbor::ControllerRequest::AddEntry {
2921                        interface,
2922                        neighbor,
2923                        mac,
2924                        responder,
2925                    } => {
2926                        assert_eq!(interface, entry.interface.get());
2927                        assert_eq!(neighbor, entry.neighbor);
2928                        assert_eq!(mac, entry.mac.unwrap());
2929                        responder.send(Ok(())).expect("failed to respond to AddEntry");
2930                    }
2931                    _ => panic!("unexpected controller request"),
2932                }
2933            })
2934            .fuse();
2935        let _join_handle = scope.spawn(controller_req_fut);
2936
2937        event_loop.run_one_step_in_tests().await;
2938        assert_matches!(waiter.as_mut().now_or_never(), None);
2939        assert_eq!(response_sink.take_messages().len(), 0);
2940
2941        // Send an unrelated neighbor watch event and verify that the request is
2942        // still pending.
2943
2944        {
2945            use fnet_neighbor_ext::testutil::EventSpec::*;
2946            neighbor_event_sink
2947                .unbounded_send(fnet_neighbor_ext::testutil::generate_events_from_spec(&[Added(3)]))
2948                .expect("failed to send event");
2949        }
2950
2951        event_loop.run_one_step_in_tests().await;
2952        assert_matches!(waiter.as_mut().now_or_never(), None);
2953        assert_eq!(response_sink.take_messages().len(), 0);
2954
2955        // Send a neighbor watch event indicating successful creation and verify
2956        // that the request is completed.
2957
2958        neighbor_event_sink.unbounded_send(vec![event]).expect("failed to send event");
2959
2960        event_loop.run_one_step_in_tests().await;
2961        assert_matches!(waiter.await.expect("waiter channel should not be closed"), Ok(()));
2962        // The event loop & worker aren't responsible for the final response to
2963        // the client (either none, or ACK if requested), so there's nothing
2964        // more to check here.
2965
2966        neighbor_event_sink.close_channel();
2967        interface_event_sink.close_channel();
2968        drop(neighbor_client);
2969        scope.join().await;
2970    }
2971
2972    #[fuchsia::test]
2973    async fn event_loop_with_watch_events_create_already_existing_succeeds_without_event() {
2974        let scope = fuchsia_async::Scope::new();
2975        let neighbor_events = vec![fnet_neighbor_ext::testutil::EventSpec::Idle];
2976        let EventLoopSetup {
2977            mut event_loop,
2978            mut request_sink,
2979            neighbors_controller_request_stream,
2980            neighbor_event_sink,
2981            interface_event_sink,
2982        } = build_event_loop(&scope, &neighbor_events).await;
2983
2984        let (mut response_sink, neighbor_client, async_work_drain_task) =
2985            crate::client::testutil::new_fake_client::<NetlinkRoute>(
2986                crate::client::testutil::CLIENT_ID_1,
2987                [],
2988            );
2989        let _join_handle = scope.spawn(async_work_drain_task);
2990
2991        // Report existing static entry.
2992
2993        let (event, entry) = {
2994            use fnet_neighbor_ext::testutil::EventSpec::*;
2995            let mut added = fnet_neighbor_ext::testutil::generate_event_from_spec(&Added(1));
2996            let fnet_neighbor::EntryIteratorItem::Added(to_add) = &mut added else {
2997                panic!("unexpected event")
2998            };
2999            to_add.state = Some(fnet_neighbor::EntryState::Static);
3000            let entry: fnet_neighbor_ext::Entry =
3001                to_add.clone().try_into().expect("extension conversion failed");
3002            (added, entry)
3003        };
3004        neighbor_event_sink.unbounded_send(vec![event]).expect("failed to send event");
3005        event_loop.run_one_step_in_tests().await;
3006
3007        // Send an RTM_NEWNEIGH request for an existing neighbor.
3008
3009        let (completer, waiter) = oneshot::channel();
3010        let waiter = waiter.fuse();
3011        pin_mut!(waiter);
3012
3013        let create_request: UnifiedRequest<FakeSender<RouteNetlinkMessage>> =
3014            UnifiedRequest::NeighborsRequest(Request {
3015                args: NeighborRequestArgs::New(NewNeighborArgs::CreateStatic {
3016                    ip: entry.neighbor,
3017                    interface: entry.interface,
3018                    mac: entry.mac.unwrap(),
3019                }),
3020                sequence_number: TEST_SEQUENCE_NUMBER,
3021                client: neighbor_client.clone(),
3022                completer,
3023            });
3024        request_sink.send(create_request).await.expect("failed to send create request");
3025
3026        // Handle the expected Controller.AddEntry request and verify that the
3027        // RTM_NEWNEIGH request is not pending.
3028
3029        let controller_req_fut = neighbors_controller_request_stream
3030            .into_future()
3031            .then(async move |(req, _rest)| {
3032                match req
3033                    .expect("Controller request stream unexpectedly ended")
3034                    .expect("failed to receive `Controller` request")
3035                {
3036                    fnet_neighbor::ControllerRequest::AddEntry {
3037                        interface,
3038                        neighbor,
3039                        mac,
3040                        responder,
3041                    } => {
3042                        assert_eq!(interface, entry.interface.get());
3043                        assert_eq!(neighbor, entry.neighbor);
3044                        assert_eq!(mac, entry.mac.unwrap());
3045                        responder.send(Ok(())).expect("failed to respond to AddEntry");
3046                    }
3047                    _ => panic!("unexpected controller request"),
3048                }
3049            })
3050            .fuse();
3051        let _join_handle = scope.spawn(controller_req_fut);
3052
3053        event_loop.run_one_step_in_tests().await;
3054        assert_matches!(waiter.await.expect("waiter channel should not be closed"), Ok(()));
3055        assert_eq!(response_sink.take_messages().len(), 0);
3056        // The event loop & worker aren't responsible for the final response to
3057        // the client (either none, or ACK if requested), so there's nothing
3058        // more to check here.
3059
3060        neighbor_event_sink.close_channel();
3061        interface_event_sink.close_channel();
3062        drop(neighbor_client);
3063        scope.join().await;
3064    }
3065
3066    #[fuchsia::test]
3067    async fn event_loop_with_watch_events_probe_neighbor() {
3068        let scope = fuchsia_async::Scope::new();
3069        let neighbor_events = vec![fnet_neighbor_ext::testutil::EventSpec::Idle];
3070        let EventLoopSetup {
3071            mut event_loop,
3072            mut request_sink,
3073            neighbors_controller_request_stream,
3074            neighbor_event_sink,
3075            interface_event_sink,
3076        } = build_event_loop(&scope, &neighbor_events).await;
3077
3078        let (mut response_sink, neighbor_client, async_work_drain_task) =
3079            crate::client::testutil::new_fake_client::<NetlinkRoute>(
3080                crate::client::testutil::CLIENT_ID_1,
3081                [],
3082            );
3083        let _join_handle = scope.spawn(async_work_drain_task);
3084
3085        // Report existing entry not in `Probe` state.
3086
3087        let (event, entry) = {
3088            use fnet_neighbor_ext::testutil::EventSpec::*;
3089            let mut added = fnet_neighbor_ext::testutil::generate_event_from_spec(&Added(1));
3090            let fnet_neighbor::EntryIteratorItem::Added(to_add) = &mut added else {
3091                panic!("unexpected event")
3092            };
3093            to_add.state = Some(fnet_neighbor::EntryState::Reachable);
3094            let entry: fnet_neighbor_ext::Entry =
3095                to_add.clone().try_into().expect("extension conversion failed");
3096            (added, entry)
3097        };
3098        neighbor_event_sink.unbounded_send(vec![event]).expect("failed to send event");
3099        event_loop.run_one_step_in_tests().await;
3100
3101        // Send an RTM_NEWNEIGH request to probe the existing neighbor.
3102
3103        let (completer, waiter) = oneshot::channel();
3104        let waiter = waiter.fuse();
3105        pin_mut!(waiter);
3106
3107        let probe_request: UnifiedRequest<FakeSender<RouteNetlinkMessage>> =
3108            UnifiedRequest::NeighborsRequest(Request {
3109                args: NeighborRequestArgs::New(NewNeighborArgs::ProbeExisting {
3110                    ip: entry.neighbor,
3111                    interface: entry.interface,
3112                }),
3113                sequence_number: TEST_SEQUENCE_NUMBER,
3114                client: neighbor_client.clone(),
3115                completer,
3116            });
3117        request_sink.send(probe_request).await.expect("failed to send create request");
3118
3119        // Handle the expected Controller.ProbeEntry request and verify that the
3120        // RTM_NEWNEIGH request is still pending.
3121
3122        let controller_req_fut = neighbors_controller_request_stream
3123            .into_future()
3124            .then(async move |(req, _rest)| {
3125                match req
3126                    .expect("Controller request stream unexpectedly ended")
3127                    .expect("failed to receive `Controller` request")
3128                {
3129                    fnet_neighbor::ControllerRequest::ProbeEntry {
3130                        interface,
3131                        neighbor,
3132                        responder,
3133                    } => {
3134                        assert_eq!(interface, entry.interface.get());
3135                        assert_eq!(neighbor, entry.neighbor);
3136                        responder.send(Ok(())).expect("failed to respond to ProbeEntry");
3137                    }
3138                    _ => panic!("unexpected controller request"),
3139                }
3140            })
3141            .fuse();
3142        let _join_handle = scope.spawn(controller_req_fut);
3143
3144        event_loop.run_one_step_in_tests().await;
3145        assert_matches!(waiter.as_mut().now_or_never(), None);
3146        assert_eq!(response_sink.take_messages().len(), 0);
3147
3148        // Send an unrelated neighbor watch event and verify that the request is
3149        // still pending.
3150
3151        {
3152            use fnet_neighbor_ext::testutil::EventSpec::*;
3153            neighbor_event_sink
3154                .unbounded_send(fnet_neighbor_ext::testutil::generate_events_from_spec(&[Added(3)]))
3155                .expect("failed to send event");
3156        }
3157
3158        event_loop.run_one_step_in_tests().await;
3159        assert_matches!(waiter.as_mut().now_or_never(), None);
3160        assert_eq!(response_sink.take_messages().len(), 0);
3161
3162        // Send a neighbor watch event indicating successful transition to
3163        // `Probe` and verify that the request is completed.
3164
3165        {
3166            let mut changed = entry.clone();
3167            changed.state = fnet_neighbor::EntryState::Probe;
3168            let changed_event = fnet_neighbor::EntryIteratorItem::Changed(changed.into());
3169            neighbor_event_sink.unbounded_send(vec![changed_event]).expect("failed to send event");
3170        }
3171
3172        event_loop.run_one_step_in_tests().await;
3173        assert_matches!(waiter.await.expect("waiter channel should not be closed"), Ok(()));
3174        // The event loop & worker aren't responsible for the final response to
3175        // the client (either none, or ACK if requested), so there's nothing
3176        // more to check here.
3177
3178        neighbor_event_sink.close_channel();
3179        interface_event_sink.close_channel();
3180        drop(neighbor_client);
3181        scope.join().await;
3182    }
3183
3184    #[fuchsia::test]
3185    async fn event_loop_with_watch_events_probe_already_probing_succeeds_without_event() {
3186        let scope = fuchsia_async::Scope::new();
3187        let neighbor_events = vec![fnet_neighbor_ext::testutil::EventSpec::Idle];
3188        let EventLoopSetup {
3189            mut event_loop,
3190            mut request_sink,
3191            neighbors_controller_request_stream,
3192            neighbor_event_sink,
3193            interface_event_sink,
3194        } = build_event_loop(&scope, &neighbor_events).await;
3195
3196        let (mut response_sink, neighbor_client, async_work_drain_task) =
3197            crate::client::testutil::new_fake_client::<NetlinkRoute>(
3198                crate::client::testutil::CLIENT_ID_1,
3199                [],
3200            );
3201        let _join_handle = scope.spawn(async_work_drain_task);
3202
3203        // Report existing `Probe` entry.
3204
3205        let (event, entry) = {
3206            use fnet_neighbor_ext::testutil::EventSpec::*;
3207            let mut added = fnet_neighbor_ext::testutil::generate_event_from_spec(&Added(1));
3208            let fnet_neighbor::EntryIteratorItem::Added(to_add) = &mut added else {
3209                panic!("unexpected event")
3210            };
3211            to_add.state = Some(fnet_neighbor::EntryState::Probe);
3212            let entry: fnet_neighbor_ext::Entry =
3213                to_add.clone().try_into().expect("extension conversion failed");
3214            (added, entry)
3215        };
3216        neighbor_event_sink.unbounded_send(vec![event]).expect("failed to send event");
3217        event_loop.run_one_step_in_tests().await;
3218
3219        // Send an RTM_NEWNEIGH request for the neighbor in `Probe` state.
3220
3221        let (completer, waiter) = oneshot::channel();
3222        let waiter = waiter.fuse();
3223        pin_mut!(waiter);
3224
3225        let probe_request: UnifiedRequest<FakeSender<RouteNetlinkMessage>> =
3226            UnifiedRequest::NeighborsRequest(Request {
3227                args: NeighborRequestArgs::New(NewNeighborArgs::ProbeExisting {
3228                    ip: entry.neighbor,
3229                    interface: entry.interface,
3230                }),
3231                sequence_number: TEST_SEQUENCE_NUMBER,
3232                client: neighbor_client.clone(),
3233                completer,
3234            });
3235        request_sink.send(probe_request).await.expect("failed to send create request");
3236
3237        // Handle the expected Controller.ProbeEntry request and verify that the
3238        // RTM_NEWNEIGH request is not pending.
3239
3240        let controller_req_fut = neighbors_controller_request_stream
3241            .into_future()
3242            .then(async move |(req, _rest)| {
3243                match req
3244                    .expect("Controller request stream unexpectedly ended")
3245                    .expect("failed to receive `Controller` request")
3246                {
3247                    fnet_neighbor::ControllerRequest::ProbeEntry {
3248                        interface,
3249                        neighbor,
3250                        responder,
3251                    } => {
3252                        assert_eq!(interface, entry.interface.get());
3253                        assert_eq!(neighbor, entry.neighbor);
3254                        responder.send(Ok(())).expect("failed to respond to ProbeEntry");
3255                    }
3256                    _ => panic!("unexpected controller request"),
3257                }
3258            })
3259            .fuse();
3260        let _join_handle = scope.spawn(controller_req_fut);
3261
3262        event_loop.run_one_step_in_tests().await;
3263        assert_matches!(waiter.await.expect("waiter channel should not be closed"), Ok(()));
3264        assert_eq!(response_sink.take_messages().len(), 0);
3265        // The event loop & worker aren't responsible for the final response to
3266        // the client (either none, or ACK if requested), so there's nothing
3267        // more to check here.
3268
3269        neighbor_event_sink.close_channel();
3270        interface_event_sink.close_channel();
3271        drop(neighbor_client);
3272        scope.join().await;
3273    }
3274}