Skip to main content

netlink/
neighbors.rs

1// Copyright 2026 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! A module for managing neighbor information by receiving RTM_*NEIGH Netlink
6//! messages and maintaining neighbor table state from Netstack.
7
8use std::collections::{BTreeMap, HashMap, HashSet};
9use std::net::IpAddr;
10use std::num::NonZeroU64;
11
12use crate::Errno;
13use crate::client::{ClientTable, InternalClient};
14use crate::logging::{log_debug, log_warn};
15use crate::messaging::Sender;
16use crate::multicast_groups::ModernGroup;
17use crate::netlink_packet::UNSPECIFIED_SEQUENCE_NUMBER;
18use crate::protocol_family::ProtocolFamily;
19use crate::protocol_family::route::NetlinkRoute;
20use crate::util::respond_to_completer;
21use derivative::Derivative;
22use futures::StreamExt as _;
23use futures::channel::oneshot;
24use linux_uapi::rtnetlink_groups_RTNLGRP_NEIGH;
25use net_types::ip::IpVersion;
26use netlink_packet_core::{
27    NLM_F_APPEND, NLM_F_CREATE, NLM_F_EXCL, NLM_F_MULTIPART, NLM_F_REPLACE, NetlinkMessage,
28};
29use netlink_packet_route::neighbour::{
30    NeighbourAddress, NeighbourAttribute, NeighbourFlags, NeighbourHeader, NeighbourMessage,
31    NeighbourState,
32};
33use netlink_packet_route::route::RouteType;
34use netlink_packet_route::{AddressFamily, RouteNetlinkMessage};
35use thiserror::Error;
36
37use fidl_fuchsia_net as fnet;
38use fidl_fuchsia_net_ext as fnet_ext;
39use fidl_fuchsia_net_interfaces_ext as fnet_interfaces_ext;
40use fidl_fuchsia_net_neighbor as fnet_neighbor;
41use fidl_fuchsia_net_neighbor_ext as fnet_neighbor_ext;
42
43/// NetlinkNeighborMessage conversion related errors.
44#[derive(Debug, PartialEq)]
45pub(crate) enum NetlinkNeighborMessageConversionError {
46    /// Interface id could not be downcasted to fit into the expected u32.
47    InvalidInterfaceId(u64),
48}
49
50/// A wrapper type for the netlink_packet_route `NeighbourMessage` to enable conversions
51/// from [`fnet_neighbor_ext::Entry`].
52#[derive(Clone, Debug, Eq, PartialEq)]
53pub(crate) struct NetlinkNeighborMessage(pub(crate) NeighbourMessage);
54
55impl NetlinkNeighborMessage {
56    pub(crate) fn optionally_from(
57        neighbor: fnet_neighbor_ext::Entry,
58    ) -> Option<NetlinkNeighborMessage> {
59        match neighbor.try_into() {
60            Ok(message) => Some(message),
61            Err(NetlinkNeighborMessageConversionError::InvalidInterfaceId(id)) => {
62                log_warn!("Invalid interface id found in neighbor table entry: {}", id);
63                None
64            }
65        }
66    }
67
68    /// Wrap the inner [`NeighbourMessage`] in an [`RtnlMessage::NewNeighbour`].
69    pub(crate) fn into_rtnl_new_neighbor(
70        self,
71        sequence_number: u32,
72        is_dump: bool,
73    ) -> NetlinkMessage<RouteNetlinkMessage> {
74        let NetlinkNeighborMessage(message) = self;
75        let mut msg: NetlinkMessage<RouteNetlinkMessage> =
76            RouteNetlinkMessage::NewNeighbour(message).into();
77        msg.header.sequence_number = sequence_number;
78        if is_dump {
79            msg.header.flags |= NLM_F_MULTIPART;
80        }
81        msg.finalize();
82        msg
83    }
84
85    /// Wrap the inner [`NeighbourMessage`] in an [`RtnlMessage::DelNeighbour`].
86    pub(crate) fn into_rtnl_del_neighbor(self) -> NetlinkMessage<RouteNetlinkMessage> {
87        let NetlinkNeighborMessage(message) = self;
88        let mut msg: NetlinkMessage<RouteNetlinkMessage> =
89            RouteNetlinkMessage::DelNeighbour(message).into();
90        msg.finalize();
91        msg
92    }
93}
94
95impl TryFrom<fnet_neighbor_ext::Entry> for NetlinkNeighborMessage {
96    type Error = NetlinkNeighborMessageConversionError;
97
98    fn try_from(
99        neighbor: fnet_neighbor_ext::Entry,
100    ) -> Result<NetlinkNeighborMessage, NetlinkNeighborMessageConversionError> {
101        let mut header = NeighbourHeader::default();
102        let fnet_ext::IpAddress(addr) = neighbor.neighbor.into();
103        header.family = match addr {
104            IpAddr::V4(_) => AddressFamily::Inet,
105            IpAddr::V6(_) => AddressFamily::Inet6,
106        };
107        header.ifindex = neighbor.interface.get().try_into().map_err(|_| {
108            NetlinkNeighborMessageConversionError::InvalidInterfaceId(neighbor.interface.get())
109        })?;
110        header.state = match neighbor.state {
111            fnet_neighbor::EntryState::Delay => NeighbourState::Delay,
112            fnet_neighbor::EntryState::Incomplete => NeighbourState::Incomplete,
113            fnet_neighbor::EntryState::Probe => NeighbourState::Probe,
114            fnet_neighbor::EntryState::Reachable => NeighbourState::Reachable,
115            fnet_neighbor::EntryState::Stale => NeighbourState::Stale,
116            fnet_neighbor::EntryState::Static => NeighbourState::Permanent,
117            fnet_neighbor::EntryState::Unreachable => NeighbourState::Failed,
118        };
119        // Unlike Linux, Netstack3 only keeps unicast addresses in its neighbor
120        // tables so there's no need to derive this from the address and/or
121        // interface properties.
122        header.kind = RouteType::Unicast;
123
124        let mut attributes = vec![];
125        attributes.push(NeighbourAttribute::Destination(match addr {
126            IpAddr::V4(addr) => addr.into(),
127            IpAddr::V6(addr) => addr.into(),
128        }));
129        if let Some(mac) = neighbor.mac {
130            attributes.push(NeighbourAttribute::LinkLocalAddress(mac.octets.into()));
131        }
132        // TODO(https://fxbug.dev/488135156): Include the `CacheInfo` attribute
133        // with the last update time set.
134
135        let mut msg = NeighbourMessage::default();
136        msg.header = header;
137        msg.attributes = attributes;
138        Ok(NetlinkNeighborMessage(msg))
139    }
140}
141
142fn neighbor_fidl_ip(
143    family: AddressFamily,
144    address: Option<&NeighbourAddress>,
145) -> Result<fnet::IpAddress, RequestError> {
146    match family {
147        AddressFamily::Inet => match address {
148            Some(NeighbourAddress::Inet(addr)) => Ok(fnet_ext::IpAddress(IpAddr::V4(*addr)).into()),
149            Some(_) => Err(RequestError::AddressFamilyMismatch(family)),
150            None => Err(RequestError::MissingIpAddress),
151        },
152        AddressFamily::Inet6 => match address {
153            Some(NeighbourAddress::Inet6(addr)) => {
154                Ok(fnet_ext::IpAddress(IpAddr::V6(*addr)).into())
155            }
156            Some(_) => Err(RequestError::AddressFamilyMismatch(family)),
157            None => Err(RequestError::MissingIpAddress),
158        },
159        _ => Err(RequestError::InvalidAddressFamily(family)),
160    }
161}
162
163/// Arguments for an RTM_GETNEIGH [`Request`].
164#[derive(Copy, Clone, Debug, PartialEq, Eq)]
165pub(crate) enum GetNeighborArgs {
166    Dump { ip_version: Option<IpVersion>, interface: Option<NonZeroU64> },
167    Get { ip: fnet::IpAddress, interface: NonZeroU64 },
168}
169
170impl GetNeighborArgs {
171    // Attempts to convert a netlink_packet_route `NeighbourMessage` into
172    // `GetNeighborArgs`.
173    pub(crate) fn try_from_rtnl_neighbor(
174        message: &NeighbourMessage,
175        is_dump: bool,
176    ) -> Result<Self, RequestError> {
177        if is_dump {
178            Self::dump_request_from_rtnl_neighbor(message)
179                .inspect_err(|e| log_debug!("{e} in dump neighbors request"))
180        } else {
181            Self::get_request_from_rtnl_neighbor(message)
182                .inspect_err(|e| log_debug!("{e} in get neighbors request"))
183        }
184    }
185
186    fn dump_request_from_rtnl_neighbor(message: &NeighbourMessage) -> Result<Self, RequestError> {
187        let NeighbourHeader { family, flags, .. } = &message.header;
188        if flags.contains(NeighbourFlags::Proxy) {
189            // Netstack3 does not support ARP/NDP proxying.
190            // TODO(https://fxbug.dev/42111873): Support ARP/NDP proxying.
191            log_warn!("unsupported Proxy flag in dump neighbors request");
192            return Err(RequestError::UnsupportedFlags(*flags));
193        }
194        // TODO(https://fxbug.dev/500013355): Support strict validation of dump
195        // requests.
196        let ip_version = match family {
197            AddressFamily::Unspec => None,
198            AddressFamily::Inet => Some(IpVersion::V4),
199            AddressFamily::Inet6 => Some(IpVersion::V6),
200            family => {
201                return Err(RequestError::InvalidAddressFamily(*family));
202            }
203        };
204        // Note that the interface index is pulled from the attribute here,
205        // whereas it's pulled from the header for get requests. This is
206        // intentional, in order to maintain consistency with Linux's behavior.
207        let interface = message
208            .attributes
209            .iter()
210            .find_map(|attr| match attr {
211                NeighbourAttribute::IfIndex(ifindex) => Some(u64::from(*ifindex).try_into()),
212                _ => None,
213            })
214            .transpose()
215            // 0 is treated as a lack of filter.
216            .unwrap_or(None);
217        Ok(GetNeighborArgs::Dump { ip_version, interface })
218    }
219
220    fn get_request_from_rtnl_neighbor(message: &NeighbourMessage) -> Result<Self, RequestError> {
221        let NeighbourHeader { ifindex, family, state, flags, kind } = &message.header;
222        if *state != NeighbourState::None {
223            return Err(RequestError::InvalidState {
224                actual: *state,
225                expected: NeighbourState::None,
226            });
227        }
228        if *kind != RouteType::Unspec {
229            return Err(RequestError::InvalidKind(*kind));
230        }
231        if flags.intersects(!NeighbourFlags::Proxy) {
232            return Err(RequestError::InvalidFlags(*flags));
233        }
234        if flags.contains(NeighbourFlags::Proxy) {
235            // Netstack3 does not support ARP/NDP proxying.
236            // TODO(https://fxbug.dev/42111873): Support ARP/NDP proxying.
237            log_warn!("unsupported Proxy flag in get neighbor request");
238            return Err(RequestError::UnsupportedFlags(*flags));
239        }
240
241        let (address, unsupported) = message.attributes.iter().fold(
242            (None, false),
243            |(address_acc, unsupported_acc), attr| {
244                match attr {
245                    NeighbourAttribute::Destination(addr) => {
246                        // Note: In the event the Destination attribute is
247                        // provided multiple times, keep the first.
248                        (address_acc.or(Some(addr)), unsupported_acc)
249                    }
250                    _ => {
251                        if !unsupported_acc {
252                            // Only log for the first invalid attribute to avoid spamming.
253                            log_warn!(
254                                "unsupported request attribute: {attr:?} in get neighbor\
255                                request; only `DST` is supported"
256                            );
257                        }
258                        (address_acc, true)
259                    }
260                }
261            },
262        );
263        if unsupported {
264            return Err(RequestError::InvalidAttribute);
265        }
266        let ip = neighbor_fidl_ip(*family, address)?;
267        // Note that the interface index is pulled from the header here, whereas
268        // it's pulled from the attribute for dump requests. This is
269        // intentional, in order to maintain consistency with Linux's behavior.
270        let interface =
271            u64::from(*ifindex).try_into().map_err(|_| RequestError::MissingInterface)?;
272        Ok(GetNeighborArgs::Get { ip, interface })
273    }
274}
275
276/// Arguments for an RTM_NEWNEIGH [`Request`].
277#[derive(Copy, Clone, Debug, PartialEq, Eq)]
278pub(crate) enum NewNeighborArgs {
279    CreateStatic { ip: fnet::IpAddress, interface: NonZeroU64, mac: fnet::MacAddress },
280    ProbeExisting { ip: fnet::IpAddress, interface: NonZeroU64 },
281}
282
283impl NewNeighborArgs {
284    // Attempts to convert a netlink_packet_route `NeighbourMessage` into
285    // `NewNeighborArgs`.
286    pub(crate) fn try_from_rtnl_neighbor(
287        message: &NeighbourMessage,
288        netlink_flags: u16,
289    ) -> Result<Self, RequestError> {
290        Self::try_from_rtnl_neighbor_internal(message, netlink_flags)
291            .inspect_err(|e| log_debug!("{e} in new neighbor request"))
292    }
293
294    fn try_from_rtnl_neighbor_internal(
295        message: &NeighbourMessage,
296        netlink_flags: u16,
297    ) -> Result<Self, RequestError> {
298        let NeighbourHeader { ifindex, family, flags, state, .. } = &message.header;
299        if flags.contains(NeighbourFlags::Proxy) {
300            // Netstack3 does not support ARP/NDP proxying.
301            // TODO(https://fxbug.dev/42111873): Support ARP/NDP proxying.
302            log_warn!("unsupported Proxy flag in new neighbor request");
303            return Err(RequestError::UnsupportedFlags(*flags));
304        }
305
306        // Read common attributes required for identifying the neighbor.
307
308        let (ip_addr, ll_addr) =
309            message.attributes.iter().fold((None, None), |acc @ (ip, ll), attr| match attr {
310                // Note: In the event an attribute is provided multiple times,
311                // keep the first value.
312                NeighbourAttribute::Destination(addr) => (ip.or(Some(addr)), ll),
313                NeighbourAttribute::LinkLocalAddress(addr) => (ip, ll.or(Some(addr))),
314                _ => acc,
315            });
316        let ip = neighbor_fidl_ip(*family, ip_addr)?;
317        let interface =
318            u64::from(*ifindex).try_into().map_err(|_| RequestError::MissingInterface)?;
319
320        // Determine the specific operation and check operation-specific
321        // attributes.
322
323        let new_neighbor_flags = NLM_F_CREATE | NLM_F_REPLACE | NLM_F_EXCL | NLM_F_APPEND;
324        let set_flags = netlink_flags & new_neighbor_flags;
325        if set_flags == NLM_F_REPLACE {
326            // If the caller only specified `NLM_F_REPLACE`, the only case
327            // Netstack3 supports is triggering an immediate neighbor probe.
328            if *state != NeighbourState::Probe {
329                return Err(RequestError::InvalidState {
330                    actual: *state,
331                    expected: NeighbourState::Probe,
332                });
333            }
334            // Setting the link address while triggering a neighbor probe is
335            // unsupported.
336            if ll_addr.is_some() {
337                return Err(RequestError::InvalidAttribute);
338            }
339            Ok(NewNeighborArgs::ProbeExisting { interface, ip })
340        } else if set_flags == (NLM_F_CREATE | NLM_F_REPLACE) {
341            // If the caller specified `NLM_F_CREATE`, Netstack3 only supports
342            // addition of static neighbors.
343            if *state != NeighbourState::Permanent {
344                return Err(RequestError::InvalidState {
345                    actual: *state,
346                    expected: NeighbourState::Permanent,
347                });
348            }
349            let mac = ll_addr.ok_or(RequestError::MissingMacAddress).and_then(|mac| {
350                mac.clone()
351                    .try_into()
352                    .map_err(|_| RequestError::InvalidMacAddress)
353                    .map(|octets| fnet::MacAddress { octets })
354            })?;
355            Ok(NewNeighborArgs::CreateStatic { interface, ip, mac })
356        } else {
357            Err(RequestError::UnsupportedOperation)
358        }
359    }
360}
361
362/// Arguments for an RTM_DELNEIGH [`Request`].
363#[derive(Copy, Clone, Debug, PartialEq, Eq)]
364pub(crate) struct DelNeighborArgs {
365    pub(crate) ip: fnet::IpAddress,
366    pub(crate) interface: NonZeroU64,
367}
368
369impl DelNeighborArgs {
370    // Attempts to convert a netlink_packet_route `NeighbourMessage` into
371    // `DelNeighborArgs`.
372    pub(crate) fn try_from_rtnl_neighbor(message: &NeighbourMessage) -> Result<Self, RequestError> {
373        Self::try_from_rtnl_neighbor_internal(message)
374            .inspect_err(|e| log_debug!("{e} in del neighbor request"))
375    }
376
377    fn try_from_rtnl_neighbor_internal(message: &NeighbourMessage) -> Result<Self, RequestError> {
378        let NeighbourHeader { ifindex, family, flags, .. } = &message.header;
379        if flags.contains(NeighbourFlags::Proxy) {
380            // Netstack3 does not support ARP/NDP proxying.
381            // TODO(https://fxbug.dev/42111873): Support ARP/NDP proxying.
382            log_warn!("unsupported Proxy flag in del neighbor request");
383            return Err(RequestError::UnsupportedFlags(*flags));
384        }
385
386        let address = message.attributes.iter().find_map(|attr| match attr {
387            NeighbourAttribute::Destination(addr) => Some(addr),
388            _ => None,
389        });
390        let ip = neighbor_fidl_ip(*family, address)?;
391        let interface =
392            u64::from(*ifindex).try_into().map_err(|_| RequestError::MissingInterface)?;
393        Ok(Self { interface, ip })
394    }
395}
396
397/// [`Request`] arguments associated with neighbors.
398#[derive(Copy, Clone, Debug, PartialEq, Eq)]
399pub(crate) enum NeighborRequestArgs {
400    /// RTM_GETNEIGH
401    Get(GetNeighborArgs),
402    /// RTM_NEWNEIGH
403    New(NewNeighborArgs),
404    /// RTM_DELNEIGH
405    Del(DelNeighborArgs),
406}
407
408/// An error encountered while handling a [`Request`].
409#[derive(Copy, Clone, Debug, PartialEq, Eq, Error)]
410pub(crate) enum RequestError {
411    /// Invalid state in neighbor header.
412    #[error("invalid state; expected={expected:?}, actual={actual:?}")]
413    InvalidState { actual: NeighbourState, expected: NeighbourState },
414    /// Invalid kind in neighbor header.
415    #[error("invalid kind: {0:?}")]
416    InvalidKind(RouteType),
417    /// Invalid flags in neighbor header.
418    #[error("invalid flags: {0:?}")]
419    InvalidFlags(NeighbourFlags),
420    /// Unsupported flags.
421    #[error("unsupported flags: {0:?}")]
422    UnsupportedFlags(NeighbourFlags),
423    /// Invalid address family.
424    #[error("invalid address family: {0:?}")]
425    InvalidAddressFamily(AddressFamily),
426    /// Address family in request header doesn't match family of address.
427    // In practice this should never be encountered:
428    // `NeighbourAddress::parse_with_param` parses the address based on the
429    // address family from the header, and a discrepancy between the expected
430    // and actual address length results in a parsing failure.
431    #[error("address family mismatch; expected={0:?}")]
432    AddressFamilyMismatch(AddressFamily),
433    /// Request doesn't specify required neighbor IP address.
434    #[error("missing required `DST` attribute")]
435    MissingIpAddress,
436    /// Request doesn't specify required neighbor MAC address.
437    #[error("missing required `LLADDR` attribute")]
438    MissingMacAddress,
439    /// Request doesn't specify required interface.
440    #[error("missing required interface")]
441    MissingInterface,
442    /// Request specifies invalid attribute(s).
443    #[error("invalid request attribute")]
444    InvalidAttribute,
445    /// No such neighbor.
446    #[error("no such neighbor")]
447    NeighborNotFound,
448    /// Interface not found.
449    #[error("no such interface")]
450    InterfaceNotFound,
451    /// Invalid neighbor IP address.
452    #[error("invalid neighbor IP address")]
453    InvalidIpAddress,
454    /// Invalid neighbor MAC address.
455    #[error("invalid neighbor MAC address")]
456    InvalidMacAddress,
457    /// Interface not supported.
458    #[error("interface not supported")]
459    InterfaceUnsupported,
460    /// Neighbor link address unknown.
461    #[error("link address unknown")]
462    LinkAddressUnknown,
463    /// Operation not supported.
464    #[error("unsupported operation")]
465    UnsupportedOperation,
466}
467
468impl From<RequestError> for Errno {
469    fn from(value: RequestError) -> Self {
470        match value {
471            RequestError::InvalidState { .. } => Errno::EINVAL,
472            RequestError::InvalidKind(_) => Errno::EINVAL,
473            RequestError::InvalidFlags(_) => Errno::EINVAL,
474            RequestError::UnsupportedFlags(_) => Errno::ENOTSUP,
475            RequestError::InvalidAddressFamily(_) => Errno::EAFNOSUPPORT,
476            RequestError::AddressFamilyMismatch(_) => Errno::EINVAL,
477            RequestError::MissingIpAddress => Errno::EINVAL,
478            RequestError::MissingMacAddress => Errno::EINVAL,
479            RequestError::MissingInterface => Errno::EINVAL,
480            RequestError::InvalidAttribute => Errno::EINVAL,
481            RequestError::NeighborNotFound => Errno::ENOENT,
482            RequestError::InterfaceNotFound => Errno::ENODEV,
483            RequestError::InvalidIpAddress => Errno::EINVAL,
484            RequestError::InvalidMacAddress => Errno::EINVAL,
485            RequestError::InterfaceUnsupported => Errno::ENOTSUP,
486            RequestError::LinkAddressUnknown => Errno::EINVAL,
487            RequestError::UnsupportedOperation => Errno::ENOTSUP,
488        }
489    }
490}
491
492impl From<fnet_neighbor::ControllerError> for RequestError {
493    fn from(value: fnet_neighbor::ControllerError) -> Self {
494        use fnet_neighbor::ControllerError;
495        match value {
496            ControllerError::InterfaceNotFound => RequestError::InterfaceNotFound,
497            ControllerError::InterfaceNotSupported => RequestError::InterfaceUnsupported,
498            ControllerError::InvalidIpAddress => RequestError::InvalidIpAddress,
499            ControllerError::MacAddressNotUnicast => RequestError::InvalidMacAddress,
500            ControllerError::NeighborNotFound => RequestError::NeighborNotFound,
501            ControllerError::LinkAddressUnknown => RequestError::LinkAddressUnknown,
502            ControllerError::__SourceBreaking { unknown_ordinal: e } => {
503                panic!("encountered unknown controller error: {e:?}")
504            }
505        }
506    }
507}
508
509/// Trait abstracting the ability to check if an interface exists.
510pub(crate) trait LookupIfInterfaceExists {
511    /// Returns whether an interface exists at the provided index.
512    fn exists(&self, interface: NonZeroU64) -> bool;
513}
514
515type InterfaceMap = BTreeMap<
516    u64,
517    fnet_interfaces_ext::PropertiesAndState<
518        crate::interfaces::InterfaceState,
519        fnet_interfaces_ext::AllInterest,
520    >,
521>;
522
523impl LookupIfInterfaceExists for InterfaceMap {
524    fn exists(&self, interface: NonZeroU64) -> bool {
525        self.contains_key(&interface.get())
526    }
527}
528
529/// A request associated with neighbors.
530#[derive(Derivative)]
531#[derivative(Debug(bound = ""))]
532pub(crate) struct Request<S: Sender<<NetlinkRoute as ProtocolFamily>::Response>> {
533    /// The resource and operation-specific argument(s) for this request.
534    pub args: NeighborRequestArgs,
535    /// The request's sequence number.
536    ///
537    /// This value will be copied verbatim into any message sent as a result of
538    /// this request.
539    pub sequence_number: u32,
540    /// The client that made the request.
541    pub client: InternalClient<NetlinkRoute, S>,
542    /// A completer that will have the result of the request sent over.
543    pub completer: oneshot::Sender<Result<(), RequestError>>,
544}
545
546/// A subset of `NeighborRequestArgs`, containing only `Request` types that can be pending.
547#[derive(Clone, Debug, PartialEq, Eq)]
548pub(crate) enum PendingNeighborRequestArgs {
549    /// RTM_NEWNEIGH
550    New(NewNeighborArgs),
551    /// RTM_DELNEIGH
552    Del(DelNeighborArgs),
553}
554
555#[derive(Derivative)]
556#[derivative(Debug(bound = ""))]
557pub(crate) struct PendingNeighborRequest<S: Sender<<NetlinkRoute as ProtocolFamily>::Response>> {
558    request_args: PendingNeighborRequestArgs,
559    client: InternalClient<NetlinkRoute, S>,
560    completer: oneshot::Sender<Result<(), RequestError>>,
561}
562
563/// Errors related to handling neighbor events from Netstack.
564#[derive(Debug, Error, PartialEq)]
565pub(crate) enum HandleWatchEventError {
566    /// An event indicated a neighbor was removed that was not previously known.
567    #[error("Netstack reported removal of an unknown neighbor: {0:?}")]
568    UnknownNeighborRemoved(fnet_neighbor_ext::Entry),
569    /// An event indicated a neighbor was changed that was not previously known.
570    #[error("Netstack reported change of an unknown neighbor: {0:?}")]
571    UnknownNeighborChanged(fnet_neighbor_ext::Entry),
572    /// An event indicated a neighbor was added that conflicts with a known
573    /// neighbor.
574    #[error(
575        "Netstack reported addition of a neighbor that already exists: \
576        existing={existing:?}, new={new:?}"
577    )]
578    ConflictingNeighborAdded { existing: fnet_neighbor_ext::Entry, new: fnet_neighbor_ext::Entry },
579    /// An `Existing` or `Idle` event was received after collecting the initial
580    /// neighbors from the event stream.
581    #[error("Netstack reported unexpected event: {0:?}")]
582    UnexpectedEventReceived(fnet_neighbor_ext::Event),
583}
584
585#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
586struct NeighborKey {
587    interface: NonZeroU64,
588    neighbor: fnet::IpAddress,
589}
590
591impl From<&fnet_neighbor_ext::Entry> for NeighborKey {
592    fn from(
593        fnet_neighbor_ext::Entry { interface, neighbor, .. }: &fnet_neighbor_ext::Entry,
594    ) -> NeighborKey {
595        NeighborKey { interface: *interface, neighbor: *neighbor }
596    }
597}
598
599/// Handles asynchronous work related to RTM_*NEIGH messages.
600///
601/// Can respond to RTM_*NEIGH message requests.
602pub(crate) struct NeighborsWorker {
603    neighbor_table: HashMap<NeighborKey, fnet_neighbor_ext::Entry>,
604    neighbors_controller: fnet_neighbor::ControllerProxy,
605}
606
607impl NeighborsWorker {
608    /// Create the Netlink Neighbors Worker.
609    ///
610    /// Panics if the existing neighbors cannot be retrieved from
611    /// `neighbors_view` or if the response contains conflicting neighbors.
612    pub(crate) async fn create(
613        neighbors_view: &fnet_neighbor::ViewProxy,
614        neighbors_controller: fnet_neighbor::ControllerProxy,
615    ) -> (
616        Self,
617        impl futures::Stream<
618            Item = Result<fnet_neighbor_ext::Event, fnet_neighbor_ext::EntryIteratorError>,
619        > + Unpin
620        + 'static,
621    ) {
622        let mut neighbor_event_stream = Box::pin(
623            fnet_neighbor_ext::event_stream_from_view(neighbors_view)
624                .expect("connecting to fuchsia.net.neighbors.View FIDL should succeed"),
625        );
626        let existing_neighbors: HashSet<fnet_neighbor_ext::Entry> =
627            fnet_neighbor_ext::collect_neighbors_until_idle(neighbor_event_stream.by_ref())
628                .await
629                .expect("determining existing neighbors should succeed");
630        let existing_count = existing_neighbors.len();
631        let neighbor_table = existing_neighbors
632            .into_iter()
633            .map(|e| (NeighborKey::from(&e), e))
634            .collect::<HashMap<_, _>>();
635        assert_eq!(
636            neighbor_table.len(),
637            existing_count,
638            "conflicting existing entry in neighbor table"
639        );
640        (Self { neighbor_table, neighbors_controller }, neighbor_event_stream)
641    }
642
643    pub(crate) fn handle_neighbor_watcher_event<
644        S: Sender<<NetlinkRoute as ProtocolFamily>::Response>,
645    >(
646        &mut self,
647        event: fnet_neighbor_ext::Event,
648        clients: &ClientTable<NetlinkRoute, S>,
649    ) -> Result<(), HandleWatchEventError> {
650        let message_for_group = match event {
651            fnet_neighbor_ext::Event::Removed(entry) => {
652                match self.neighbor_table.remove(&(&entry).into()) {
653                    Some(_) => Ok(NetlinkNeighborMessage::optionally_from(entry)
654                        .map(NetlinkNeighborMessage::into_rtnl_del_neighbor)),
655                    None => Err(HandleWatchEventError::UnknownNeighborRemoved(entry)),
656                }
657            }
658            fnet_neighbor_ext::Event::Added(entry) => {
659                match self.neighbor_table.insert((&entry).into(), entry.clone()) {
660                    Some(existing) => Err(HandleWatchEventError::ConflictingNeighborAdded {
661                        existing,
662                        new: entry,
663                    }),
664                    None => Ok(NetlinkNeighborMessage::optionally_from(entry)
665                        .map(|n| n.into_rtnl_new_neighbor(UNSPECIFIED_SEQUENCE_NUMBER, false))),
666                }
667            }
668            fnet_neighbor_ext::Event::Changed(entry) => {
669                match self.neighbor_table.insert((&entry).into(), entry.clone()) {
670                    Some(_) => Ok(NetlinkNeighborMessage::optionally_from(entry)
671                        .map(|n| n.into_rtnl_new_neighbor(UNSPECIFIED_SEQUENCE_NUMBER, false))),
672                    None => Err(HandleWatchEventError::UnknownNeighborChanged(entry)),
673                }
674            }
675            e @ fnet_neighbor_ext::Event::Existing(_) | e @ fnet_neighbor_ext::Event::Idle => {
676                Err(HandleWatchEventError::UnexpectedEventReceived(e))
677            }
678        }?;
679        if let Some(message) = message_for_group {
680            clients.send_message_to_group(message, ModernGroup(rtnetlink_groups_RTNLGRP_NEIGH));
681        }
682        Ok(())
683    }
684
685    pub(crate) async fn handle_request<S: Sender<<NetlinkRoute as ProtocolFamily>::Response>>(
686        &mut self,
687        Request { args, mut client, sequence_number, completer }: Request<S>,
688        interface_lookup: &impl LookupIfInterfaceExists,
689    ) -> Option<PendingNeighborRequest<S>> {
690        enum HandleResult {
691            Done(Result<(), RequestError>),
692            Pending(PendingNeighborRequestArgs),
693        }
694        let result = match args {
695            NeighborRequestArgs::Get(args) => match args {
696                GetNeighborArgs::Dump { ip_version, interface } => {
697                    self.neighbor_table
698                        .values()
699                        .filter(|n| {
700                            ip_version.map_or(true, |ip_version| match n.neighbor {
701                                fnet::IpAddress::Ipv4(_) => ip_version == IpVersion::V4,
702                                fnet::IpAddress::Ipv6(_) => ip_version == IpVersion::V6,
703                            })
704                        })
705                        .filter(|n| interface.map_or(true, |i| n.interface == i))
706                        .filter_map(|e| NetlinkNeighborMessage::optionally_from(e.clone()))
707                        .for_each(|m| {
708                            client.send_unicast(m.into_rtnl_new_neighbor(sequence_number, true));
709                        });
710                    HandleResult::Done(Ok(()))
711                }
712                GetNeighborArgs::Get { ip, interface } => {
713                    let neighbor = self
714                        .neighbor_table
715                        .get(&NeighborKey { interface, neighbor: ip })
716                        .map(|e| NetlinkNeighborMessage::optionally_from(e.clone()))
717                        .flatten();
718                    match neighbor {
719                        Some(msg) => {
720                            client.send_unicast(msg.into_rtnl_new_neighbor(sequence_number, false));
721                            HandleResult::Done(Ok(()))
722                        }
723                        None => {
724                            let err = if interface_lookup.exists(interface) {
725                                RequestError::NeighborNotFound
726                            } else {
727                                RequestError::InterfaceNotFound
728                            };
729                            HandleResult::Done(Err(err))
730                        }
731                    }
732                }
733            },
734            NeighborRequestArgs::New(args) => match args {
735                args @ NewNeighborArgs::CreateStatic { ip, interface, mac } => {
736                    let response = self
737                        .neighbors_controller
738                        .add_entry(interface.get(), &ip, &mac)
739                        .await
740                        .expect("sent neighbor controller request");
741                    match response {
742                        Ok(_) => HandleResult::Pending(PendingNeighborRequestArgs::New(args)),
743                        Err(e) => HandleResult::Done(Err(e.into())),
744                    }
745                }
746                args @ NewNeighborArgs::ProbeExisting { ip, interface } => {
747                    let response = self
748                        .neighbors_controller
749                        .probe_entry(interface.get(), &ip)
750                        .await
751                        .expect("sent neighbor controller request");
752                    match response {
753                        Ok(_) => HandleResult::Pending(PendingNeighborRequestArgs::New(args)),
754                        Err(e) => HandleResult::Done(Err(e.into())),
755                    }
756                }
757            },
758            NeighborRequestArgs::Del(args @ DelNeighborArgs { interface, ip }) => {
759                let response = self
760                    .neighbors_controller
761                    .remove_entry(interface.get(), &ip)
762                    .await
763                    .expect("sent neighbor controller request");
764                match response {
765                    Ok(_) => HandleResult::Pending(PendingNeighborRequestArgs::Del(args)),
766                    Err(e) => HandleResult::Done(Err(e.into())),
767                }
768            }
769        };
770
771        match result {
772            HandleResult::Done(result) => {
773                log_debug!("handled request {args:?} from {client} with result = {result:?}");
774                respond_to_completer(client, completer, result, args);
775                None
776            }
777            HandleResult::Pending(request_args) => {
778                log_debug!("pending request {args:?} from {client}");
779                Some(PendingNeighborRequest { request_args, client, completer })
780            }
781        }
782    }
783
784    /// Checks whether a `PendingRequest` can be marked completed given the
785    /// current state of the worker. If so, notifies the request's completer and
786    /// returns `None`. If not, returns the `PendingRequest` as `Some`.
787    ///
788    /// TODO(https://fxbug.dev/488124265): Use synchronization primitives to
789    /// more robustly match requests to their corresponding watch events.
790    pub(crate) fn handle_pending_request<S: Sender<<NetlinkRoute as ProtocolFamily>::Response>>(
791        &self,
792        pending_neighbor_request: PendingNeighborRequest<S>,
793    ) -> Option<PendingNeighborRequest<S>> {
794        let PendingNeighborRequest { request_args, client: _, completer: _ } =
795            &pending_neighbor_request;
796
797        let done = match request_args {
798            PendingNeighborRequestArgs::New(NewNeighborArgs::ProbeExisting { ip, interface }) => {
799                // Assuming the `ProbeEntry` call succeeds, this is guaranteed
800                // to complete eventually, despite the fact that Netstack does
801                // not generate a `Changed` event if the neighbor is already in
802                // the expected state.
803                //
804                // If the neighbor was not in the expected state when Netstack
805                // processed the request, then a `Changed` event is generated,
806                // and since this method is called after each event that Netlink
807                // processes, this condition must eventually be true.
808                //
809                // If the neighbor *was* in the expected state when Netstack
810                // processed the request, then no `Changed` event is generated,
811                // but it's necessarily true that the immediately preceding
812                // `Added` or `Changed` event for the neighbor must contain the
813                // expected state.
814                //
815                // Here there are two cases to consider: Netlink has either
816                // already processed that event, or not.
817                //
818                // In the former case, the fact that there cannot have been
819                // intervening events means that this check will succeed on the
820                // call to this method that immediately follows the Controller
821                // request in the event loop.
822                //
823                // In the latter case, the event will be processed in a later
824                // iteration of the event loop, at which point this condition
825                // will become true.
826                self.neighbor_table
827                    .get(&NeighborKey { interface: *interface, neighbor: *ip })
828                    .map_or(false, |entry| entry.state == fnet_neighbor::EntryState::Probe)
829            }
830            PendingNeighborRequestArgs::New(NewNeighborArgs::CreateStatic {
831                ip,
832                interface,
833                mac,
834            }) => {
835                // It's also true here that Netstack does not generate an event
836                // if the neighbor is already in the expected state, but this is
837                // nevertheless guaranteed to complete eventually by the same
838                // logic as above.
839                self.neighbor_table
840                    .get(&NeighborKey { interface: *interface, neighbor: *ip })
841                    .map_or(false, |entry| {
842                        entry.mac.is_some_and(|m| m == *mac)
843                            && entry.state == fnet_neighbor::EntryState::Static
844                    })
845            }
846            PendingNeighborRequestArgs::Del(DelNeighborArgs { ip, interface }) => !self
847                .neighbor_table
848                .contains_key(&NeighborKey { interface: *interface, neighbor: *ip }),
849        };
850
851        if done {
852            log_debug!("completed pending request; req = {pending_neighbor_request:?}");
853            let PendingNeighborRequest { request_args, client, completer } =
854                pending_neighbor_request;
855
856            respond_to_completer(client, completer, Ok(()), request_args);
857            None
858        } else {
859            // Put the pending request back so that it can be handled later.
860            log_debug!("pending request not done yet; req = {pending_neighbor_request:?}");
861            Some(pending_neighbor_request)
862        }
863    }
864}
865
866#[cfg(test)]
867mod tests {
868    use crate::client::ClientTable;
869    use crate::client::testutil::{CLIENT_ID_1, CLIENT_ID_2, new_fake_client};
870    use crate::interfaces::testutil::FakeInterfacesHandler;
871    use crate::messaging::testutil::{FakeSender, SentMessage};
872    use crate::route_eventloop::{
873        EventLoopComponent, EventLoopInputs, EventLoopSpec, EventLoopState, IncludedWorkers,
874        Optional, Required, UnifiedRequest,
875    };
876
877    use super::*;
878
879    use assert_matches::assert_matches;
880    use fidl_fuchsia_net as fnet;
881    use fidl_fuchsia_net_interfaces as fnet_interfaces;
882    use fidl_fuchsia_net_neighbor::ViewRequest;
883    use fidl_fuchsia_net_neighbor_ext::testutil::EventSpec;
884    use fidl_fuchsia_net_root as fnet_root;
885    use futures::channel::mpsc;
886    use futures::{FutureExt as _, SinkExt as _, TryStreamExt as _, pin_mut};
887    use maplit::hashset;
888    use net_declare::{fidl_ip, std_ip_v4, std_ip_v6};
889    use netlink_packet_core::NetlinkPayload;
890    use netlink_packet_route::neighbour::{NeighbourAddress, NeighbourFlags};
891    use std::collections::HashSet;
892    use test_case::test_case;
893
894    fn valid_neighbor_entry() -> fnet_neighbor_ext::Entry {
895        fnet_neighbor_ext::Entry {
896            interface: NonZeroU64::new(1).unwrap(),
897            neighbor: fidl_ip!("192.168.0.1"),
898            state: fnet_neighbor::EntryState::Reachable,
899            mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
900            updated_at: 123456,
901        }
902    }
903
904    #[test]
905    fn netlink_neighbor_message_from_entry_invalid_iface_id() {
906        let entry = fnet_neighbor_ext::Entry {
907            interface: NonZeroU64::new(u64::MAX).unwrap(),
908            ..valid_neighbor_entry()
909        };
910
911        assert_eq!(
912            NetlinkNeighborMessage::try_from(entry),
913            Err(NetlinkNeighborMessageConversionError::InvalidInterfaceId(u64::MAX))
914        );
915    }
916
917    #[test]
918    fn netlink_neighbor_message_from_entry_valid_iface_id() {
919        assert_matches!(
920            NetlinkNeighborMessage::try_from(fnet_neighbor_ext::Entry {
921                interface: NonZeroU64::new(1).unwrap(),
922                ..valid_neighbor_entry()
923            }),
924            Ok(NetlinkNeighborMessage(NeighbourMessage {
925                header: NeighbourHeader { ifindex: 1, .. },
926                ..
927            }))
928        );
929    }
930
931    #[test_case(fnet_neighbor::EntryState::Delay, NeighbourState::Delay; "delay")]
932    #[test_case(fnet_neighbor::EntryState::Incomplete, NeighbourState::Incomplete; "incomplete")]
933    #[test_case(fnet_neighbor::EntryState::Probe, NeighbourState::Probe; "probe")]
934    #[test_case(fnet_neighbor::EntryState::Reachable, NeighbourState::Reachable; "reachable")]
935    #[test_case(fnet_neighbor::EntryState::Stale, NeighbourState::Stale; "stale")]
936    #[test_case(fnet_neighbor::EntryState::Static, NeighbourState::Permanent; "permanent")]
937    #[test_case(fnet_neighbor::EntryState::Unreachable, NeighbourState::Failed; "failed")]
938    fn netlink_neighbor_message_from_entry_state_converted(
939        fidl_state: fnet_neighbor::EntryState,
940        expected: NeighbourState,
941    ) {
942        assert_matches!(
943            NetlinkNeighborMessage::try_from(fnet_neighbor_ext::Entry {
944                state: fidl_state,
945                ..valid_neighbor_entry()
946            }),
947            Ok(NetlinkNeighborMessage(NeighbourMessage {
948                header: NeighbourHeader { state, .. },
949                ..
950            })) if state == expected
951        );
952    }
953
954    #[test]
955    fn netlink_neighbor_message_from_entry_ipv4() {
956        let fidl_entry = fnet_neighbor_ext::Entry {
957            neighbor: fidl_ip!("192.168.0.1"),
958            ..valid_neighbor_entry()
959        };
960        let NetlinkNeighborMessage(message) =
961            fidl_entry.try_into().expect("should be able to convert valid neighbor entry");
962
963        assert_eq!(message.header.family, AddressFamily::Inet);
964        let expected_address: NeighbourAddress = std_ip_v4!("192.168.0.1").into();
965        assert_matches!(
966            &message.attributes[..],
967            [
968                NeighbourAttribute::Destination(address),
969                NeighbourAttribute::LinkLocalAddress(_)
970            ] if *address == expected_address
971        );
972    }
973
974    #[test]
975    fn netlink_neighbor_message_from_entry_ipv6() {
976        let fidl_entry =
977            fnet_neighbor_ext::Entry { neighbor: fidl_ip!("fe80::1"), ..valid_neighbor_entry() };
978        let NetlinkNeighborMessage(message) =
979            fidl_entry.try_into().expect("should be able to convert valid neighbor entry");
980
981        assert_eq!(message.header.family, AddressFamily::Inet6);
982        let expected_address: NeighbourAddress = std_ip_v6!("fe80::1").into();
983        assert_matches!(
984            &message.attributes[..],
985            [
986                NeighbourAttribute::Destination(address),
987                NeighbourAttribute::LinkLocalAddress(_)
988            ] if *address == expected_address
989        );
990    }
991
992    #[test]
993    fn netlink_neighbor_message_from_entry_address_link_local_present() {
994        let fidl_entry = fnet_neighbor_ext::Entry {
995            mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
996            ..valid_neighbor_entry()
997        };
998        let NetlinkNeighborMessage(message) =
999            fidl_entry.try_into().expect("should be able to convert valid neighbor entry");
1000
1001        assert_matches!(
1002            &message.attributes[..],
1003            [
1004                NeighbourAttribute::Destination(_),
1005                NeighbourAttribute::LinkLocalAddress(addr)
1006            ] if addr == &[0, 1, 2, 3, 4, 5]
1007        );
1008    }
1009
1010    #[test]
1011    fn netlink_neighbor_message_from_entry_address_link_local_absent() {
1012        let fidl_entry = fnet_neighbor_ext::Entry { mac: None, ..valid_neighbor_entry() };
1013        let NetlinkNeighborMessage(message) =
1014            fidl_entry.try_into().expect("should be able to convert valid neighbor entry");
1015
1016        assert_matches!(&message.attributes[..], [NeighbourAttribute::Destination(_)]);
1017    }
1018
1019    #[test]
1020    fn netlink_neighbor_message_optionally_from_failure() {
1021        assert_eq!(
1022            NetlinkNeighborMessage::optionally_from(fnet_neighbor_ext::Entry {
1023                interface: NonZeroU64::new(u64::MAX).unwrap(),
1024                ..valid_neighbor_entry()
1025            }),
1026            None
1027        );
1028    }
1029
1030    #[test]
1031    fn netlink_neighbor_message_optionally_from_success() {
1032        let fidl_entry = fnet_neighbor_ext::Entry {
1033            interface: NonZeroU64::new(1).unwrap(),
1034            neighbor: fidl_ip!("192.168.0.1"),
1035            state: fnet_neighbor::EntryState::Reachable,
1036            mac: None,
1037            updated_at: 123456,
1038        };
1039
1040        let mut expected_message = NeighbourMessage::default();
1041        expected_message.header = NeighbourHeader {
1042            ifindex: 1,
1043            family: AddressFamily::Inet,
1044            state: NeighbourState::Reachable,
1045            flags: NeighbourFlags::empty(),
1046            kind: RouteType::Unicast,
1047        };
1048        expected_message.attributes =
1049            vec![NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into())];
1050
1051        assert_eq!(
1052            NetlinkNeighborMessage::optionally_from(fidl_entry),
1053            Some(NetlinkNeighborMessage(expected_message))
1054        );
1055    }
1056
1057    #[test]
1058    fn netlink_neighbor_message_into_rtnl_new_neighbor() {
1059        let message: NetlinkNeighborMessage = valid_neighbor_entry()
1060            .try_into()
1061            .expect("should be able to convert valid neighbor entry");
1062        let NetlinkNeighborMessage(payload) = &message;
1063
1064        let expected_payload =
1065            NetlinkPayload::InnerMessage(RouteNetlinkMessage::NewNeighbour(payload.clone()));
1066
1067        let result = message.clone().into_rtnl_new_neighbor(1, true);
1068        assert_eq!(result.payload, expected_payload);
1069        assert_eq!(result.header.sequence_number, 1);
1070        assert_eq!(result.header.flags & NLM_F_MULTIPART, NLM_F_MULTIPART);
1071
1072        let result = message.into_rtnl_new_neighbor(1, false);
1073        assert_eq!(result.payload, expected_payload);
1074        assert_ne!(result.header.flags & NLM_F_MULTIPART, NLM_F_MULTIPART);
1075    }
1076
1077    #[test]
1078    fn neighbor_keyed_by_interface_and_ip() {
1079        let entry = fnet_neighbor_ext::Entry {
1080            interface: NonZeroU64::new(1).unwrap(),
1081            neighbor: fidl_ip!("192.168.0.1"),
1082            mac: None,
1083            state: fnet_neighbor::EntryState::Reachable,
1084            updated_at: 123456,
1085        };
1086
1087        let same_iface_and_ip = fnet_neighbor_ext::Entry {
1088            mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
1089            state: fnet_neighbor::EntryState::Stale,
1090            updated_at: 654321,
1091            ..entry
1092        };
1093        assert_eq!(NeighborKey::from(&entry), NeighborKey::from(&same_iface_and_ip));
1094
1095        let different_iface =
1096            fnet_neighbor_ext::Entry { interface: NonZeroU64::new(2).unwrap(), ..entry };
1097        assert_ne!(NeighborKey::from(&entry), NeighborKey::from(&different_iface));
1098
1099        let different_ip = fnet_neighbor_ext::Entry { neighbor: fidl_ip!("192.168.0.2"), ..entry };
1100        assert_ne!(NeighborKey::from(&entry), NeighborKey::from(&different_ip));
1101
1102        let different_iface_and_ip = fnet_neighbor_ext::Entry {
1103            interface: NonZeroU64::new(2).unwrap(),
1104            neighbor: fidl_ip!("192.168.0.2"),
1105            ..entry
1106        };
1107        assert_ne!(NeighborKey::from(&entry), NeighborKey::from(&different_iface_and_ip));
1108    }
1109
1110    #[fuchsia::test]
1111    #[should_panic(expected = "determining existing neighbors should succeed")]
1112    async fn neighbors_worker_create_panics_on_view_protocol_error() {
1113        let (controller, _controller_server_end) =
1114            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1115        let (view, view_server_end) = fidl::endpoints::create_proxy::<fnet_neighbor::ViewMarker>();
1116        // Close the channel without responding.
1117        drop(view_server_end);
1118
1119        let (_worker, _remaining) = NeighborsWorker::create(&view, controller).await;
1120    }
1121
1122    #[fuchsia::test]
1123    #[should_panic(expected = "determining existing neighbors should succeed")]
1124    async fn neighbors_worker_create_panics_on_event_stream_error() {
1125        let (controller, _controller_server_end) =
1126            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1127        let (view, view_server_end) = fidl::endpoints::create_proxy::<fnet_neighbor::ViewMarker>();
1128        let mut view_request_stream = view_server_end.into_stream();
1129
1130        let entry_iter_fut = view_request_stream
1131            .next()
1132            .then(|req| {
1133                match req
1134                    .expect("View request_stream unexpectedly ended")
1135                    .expect("failed to receive `OpenEntryIterator` request")
1136                {
1137                    ViewRequest::OpenEntryIterator { it, .. } => {
1138                        // Close the channel without responding.
1139                        drop(it);
1140                        futures::future::ready(())
1141                    }
1142                }
1143            })
1144            .fuse();
1145
1146        let worker_fut = NeighborsWorker::create(&view, controller);
1147
1148        let ((), (_worker, _remaining)) = futures::join!(entry_iter_fut, worker_fut);
1149    }
1150
1151    #[fuchsia::test]
1152    #[should_panic(expected = "conflicting existing entry")]
1153    async fn neighbors_worker_create_panics_on_conflicting_entry() {
1154        let events: Vec<_> = [
1155            // Create two neighbors with the same `NeighborKey` but differing
1156            // fields; truly duplicate entries are ignored.
1157            fnet_neighbor_ext::Entry {
1158                state: fnet_neighbor::EntryState::Reachable,
1159                ..valid_neighbor_entry()
1160            },
1161            fnet_neighbor_ext::Entry {
1162                state: fnet_neighbor::EntryState::Stale,
1163                ..valid_neighbor_entry()
1164            },
1165        ]
1166        .into_iter()
1167        .map(Into::into)
1168        .map(fnet_neighbor::EntryIteratorItem::Existing)
1169        .chain(std::iter::once(fnet_neighbor::EntryIteratorItem::Idle(fnet_neighbor::IdleEvent)))
1170        .collect();
1171        let batches = vec![events];
1172        let (view, server_fut) =
1173            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(batches));
1174
1175        let (controller, _controller_server_end) =
1176            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1177        let worker_fut = NeighborsWorker::create(&view, controller);
1178
1179        let ((), (_worker, _remaining)) = futures::join!(server_fut, worker_fut);
1180    }
1181
1182    #[fuchsia::test]
1183    async fn neighbors_worker_create_success() {
1184        use fnet_neighbor_ext::testutil::EventSpec::*;
1185        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1186            Existing(1),
1187            Existing(2),
1188            Existing(3),
1189            Idle,
1190            Added(4),
1191        ]);
1192        let (view, server_fut) =
1193            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![
1194                events.clone(),
1195            ]));
1196
1197        let (controller, _controller_server_end) =
1198            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1199        let worker_fut = NeighborsWorker::create(&view, controller);
1200
1201        let ((), (worker, event_stream)) = futures::join!(server_fut, worker_fut);
1202
1203        let remaining_events: Vec<_> = event_stream.collect().await;
1204        assert_matches!(
1205            &remaining_events[..],
1206            [
1207                Ok(fnet_neighbor_ext::Event::Added(_)),
1208                Err(fnet_neighbor_ext::EntryIteratorError::Fidl(
1209                    fidl::Error::ClientChannelClosed { .. }
1210                ))
1211            ]
1212        );
1213
1214        for event in events {
1215            match event {
1216                fnet_neighbor::EntryIteratorItem::Existing(fidl_entry) => {
1217                    let entry: fnet_neighbor_ext::Entry = fidl_entry.try_into().unwrap();
1218                    assert_eq!(worker.neighbor_table.get(&(&entry).into()), Some(&entry));
1219                }
1220                _ => {}
1221            }
1222        }
1223    }
1224
1225    #[test_case(
1226        EventSpec::Added(2),
1227        |e| matches!(e, HandleWatchEventError::ConflictingNeighborAdded { .. });
1228        "conflicting added"
1229    )]
1230    #[test_case(
1231        EventSpec::Removed(4),
1232        |e| matches!(e, HandleWatchEventError::UnknownNeighborRemoved(_));
1233        "unknown removed"
1234    )]
1235    #[test_case(
1236        EventSpec::Changed(4),
1237        |e| matches!(e, HandleWatchEventError::UnknownNeighborChanged(_));
1238        "unknown changed"
1239    )]
1240    #[test_case(
1241        EventSpec::Existing(4),
1242        |e| matches!(e, HandleWatchEventError::UnexpectedEventReceived(_));
1243        "existing after initial collection"
1244    )]
1245    #[test_case(
1246        EventSpec::Idle,
1247        |e| matches!(e, HandleWatchEventError::UnexpectedEventReceived(_));
1248        "idle after initial collection"
1249    )]
1250    #[fuchsia::test]
1251    async fn neighbors_worker_handle_watch_event_failure(
1252        spec: EventSpec,
1253        error_matcher: fn(&HandleWatchEventError) -> bool,
1254    ) {
1255        use fnet_neighbor_ext::testutil::EventSpec::*;
1256        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1257            Existing(1),
1258            Existing(2),
1259            Existing(3),
1260            Idle,
1261            spec,
1262        ]);
1263        let (view, server_fut) =
1264            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![
1265                events.clone(),
1266            ]));
1267
1268        let (controller, _controller_server_end) =
1269            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1270        let worker_fut = NeighborsWorker::create(&view, controller);
1271
1272        let ((), (mut worker, event_stream)) = futures::join!(server_fut, worker_fut);
1273
1274        let client_table = ClientTable::<NetlinkRoute, FakeSender<_>>::default();
1275
1276        let remaining_events: Vec<_> = event_stream.collect().await;
1277        assert_eq!(remaining_events.len(), 2);
1278        match &remaining_events[0] {
1279            Ok(event) => {
1280                assert_matches!(
1281                    worker.handle_neighbor_watcher_event(event.clone(), &client_table),
1282                    Err(error) if error_matcher(&error)
1283                );
1284            }
1285            _ => panic!("expected bad event in stream"),
1286        }
1287        match &remaining_events[1] {
1288            Err(fnet_neighbor_ext::EntryIteratorError::Fidl(
1289                fidl::Error::ClientChannelClosed { .. },
1290            )) => {}
1291            _ => panic!("expected PEER_CLOSED error at end of stream"),
1292        }
1293    }
1294
1295    #[fuchsia::test]
1296    async fn neighbors_worker_handle_added_event() {
1297        use fnet_neighbor_ext::testutil::EventSpec::*;
1298        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1299            Existing(1),
1300            Existing(2),
1301            Existing(3),
1302            Idle,
1303            Added(4),
1304        ]);
1305        let (view, server_fut) =
1306            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![
1307                events.clone(),
1308            ]));
1309
1310        let (controller, _controller_server_end) =
1311            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1312        let worker_fut = NeighborsWorker::create(&view, controller);
1313
1314        let ((), (mut worker, event_stream)) = futures::join!(server_fut, worker_fut);
1315
1316        let client_table = ClientTable::<NetlinkRoute, FakeSender<_>>::default();
1317
1318        let remaining_events: Vec<_> = event_stream.collect().await;
1319        assert_eq!(remaining_events.len(), 2);
1320        match &remaining_events[0] {
1321            Ok(e @ fnet_neighbor_ext::Event::Added(entry)) => {
1322                let key = NeighborKey::from(entry);
1323                assert_eq!(worker.neighbor_table.get(&key), None);
1324                assert_matches!(
1325                    worker.handle_neighbor_watcher_event(e.clone(), &client_table),
1326                    Ok(_)
1327                );
1328                assert_eq!(worker.neighbor_table.get(&key), Some(entry));
1329            }
1330            _ => panic!("expected Added event in stream"),
1331        }
1332        match &remaining_events[1] {
1333            Err(fnet_neighbor_ext::EntryIteratorError::Fidl(
1334                fidl::Error::ClientChannelClosed { .. },
1335            )) => {}
1336            _ => panic!("expected PEER_CLOSED error at end of stream"),
1337        }
1338    }
1339
1340    #[fuchsia::test]
1341    async fn neighbors_worker_handle_removed_event() {
1342        use fnet_neighbor_ext::testutil::EventSpec::*;
1343        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1344            Existing(1),
1345            Existing(2),
1346            Existing(3),
1347            Idle,
1348            Removed(2),
1349        ]);
1350        let (view, server_fut) =
1351            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![
1352                events.clone(),
1353            ]));
1354
1355        let (controller, _controller_server_end) =
1356            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1357        let worker_fut = NeighborsWorker::create(&view, controller);
1358
1359        let ((), (mut worker, event_stream)) = futures::join!(server_fut, worker_fut);
1360
1361        let client_table = ClientTable::<NetlinkRoute, FakeSender<_>>::default();
1362
1363        let remaining_events: Vec<_> = event_stream.collect().await;
1364        assert_eq!(remaining_events.len(), 2);
1365        match &remaining_events[0] {
1366            Ok(e @ fnet_neighbor_ext::Event::Removed(entry)) => {
1367                let key = NeighborKey::from(entry);
1368                assert_eq!(worker.neighbor_table.get(&key), Some(entry));
1369                assert_matches!(
1370                    worker.handle_neighbor_watcher_event(e.clone(), &client_table),
1371                    Ok(_)
1372                );
1373                assert_eq!(worker.neighbor_table.get(&key), None);
1374            }
1375            _ => panic!("expected Removed event in stream"),
1376        }
1377        match &remaining_events[1] {
1378            Err(fnet_neighbor_ext::EntryIteratorError::Fidl(
1379                fidl::Error::ClientChannelClosed { .. },
1380            )) => {}
1381            _ => panic!("expected PEER_CLOSED error at end of stream"),
1382        }
1383    }
1384
1385    #[fuchsia::test]
1386    async fn neighbors_worker_handle_changed_event() {
1387        use fnet_neighbor_ext::testutil::EventSpec::*;
1388        let mut events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1389            Existing(1),
1390            Existing(2),
1391            Existing(3),
1392            Idle,
1393            Changed(2),
1394        ]);
1395        match &mut events[1] {
1396            fnet_neighbor::EntryIteratorItem::Existing(entry) => {
1397                entry.updated_at = Some(1234);
1398            }
1399            _ => panic!("expected Existing event in stream"),
1400        }
1401        match &mut events[4] {
1402            fnet_neighbor::EntryIteratorItem::Changed(entry) => {
1403                entry.updated_at = Some(5678);
1404            }
1405            _ => panic!("expected Changed event in stream"),
1406        }
1407
1408        let (view, server_fut) =
1409            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![
1410                events.clone(),
1411            ]));
1412
1413        let (controller, _controller_server_end) =
1414            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1415        let worker_fut = NeighborsWorker::create(&view, controller);
1416
1417        let ((), (mut worker, event_stream)) = futures::join!(server_fut, worker_fut);
1418
1419        let client_table = ClientTable::<NetlinkRoute, FakeSender<_>>::default();
1420
1421        let remaining_events: Vec<_> = event_stream.collect().await;
1422        assert_eq!(remaining_events.len(), 2);
1423        match &remaining_events[0] {
1424            Ok(e @ fnet_neighbor_ext::Event::Changed(entry)) => {
1425                let key = NeighborKey::from(entry);
1426                assert_matches!(
1427                    worker.neighbor_table.get(&key),
1428                    Some(fnet_neighbor_ext::Entry { updated_at: 1234, .. })
1429                );
1430                assert_matches!(
1431                    worker.handle_neighbor_watcher_event(e.clone(), &client_table),
1432                    Ok(_)
1433                );
1434                assert_matches!(
1435                    worker.neighbor_table.get(&key),
1436                    Some(fnet_neighbor_ext::Entry { updated_at: 5678, .. })
1437                );
1438            }
1439            _ => panic!("expected Changed event in stream"),
1440        }
1441        match &remaining_events[1] {
1442            Err(fnet_neighbor_ext::EntryIteratorError::Fidl(
1443                fidl::Error::ClientChannelClosed { .. },
1444            )) => {}
1445            _ => panic!("expected PEER_CLOSED error at end of stream"),
1446        }
1447    }
1448
1449    impl LookupIfInterfaceExists for HashSet<u64> {
1450        fn exists(&self, idx: NonZeroU64) -> bool {
1451            self.contains(&idx.get())
1452        }
1453    }
1454
1455    #[test_case(HashSet::new(), RequestError::InterfaceNotFound; "interface does not exist")]
1456    #[test_case(
1457        hashset!{1}, RequestError::NeighborNotFound;
1458        "interface exists"
1459    )]
1460    #[fuchsia::test]
1461    async fn neighbors_worker_handle_get_neighbor_not_found(
1462        interface_lookup: HashSet<u64>,
1463        expected_error: RequestError,
1464    ) {
1465        let (mut sender_sink, client, _async_work_drain_task) =
1466            new_fake_client(CLIENT_ID_1, vec![]);
1467        let (completer, completer_rcv) = oneshot::channel();
1468        let request = Request {
1469            args: NeighborRequestArgs::Get(GetNeighborArgs::Get {
1470                ip: fidl_ip!("192.168.0.1"),
1471                interface: NonZeroU64::new(1).unwrap(),
1472            }),
1473            sequence_number: 1,
1474            client,
1475            completer,
1476        };
1477
1478        let events: Vec<_> = [
1479            fnet_neighbor_ext::Entry {
1480                interface: NonZeroU64::new(2).unwrap(),
1481                neighbor: fidl_ip!("192.168.0.1"),
1482                ..valid_neighbor_entry()
1483            },
1484            fnet_neighbor_ext::Entry {
1485                interface: NonZeroU64::new(1).unwrap(),
1486                neighbor: fidl_ip!("fe80::2"),
1487                ..valid_neighbor_entry()
1488            },
1489        ]
1490        .into_iter()
1491        .map(Into::into)
1492        .map(fnet_neighbor::EntryIteratorItem::Existing)
1493        .chain(std::iter::once(fnet_neighbor::EntryIteratorItem::Idle(fnet_neighbor::IdleEvent)))
1494        .collect();
1495
1496        let batches = vec![events];
1497        let (view, server_fut) =
1498            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(batches));
1499
1500        let (controller, _controller_server_end) =
1501            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1502        let worker_fut = NeighborsWorker::create(&view, controller);
1503        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
1504
1505        assert_matches!(
1506            worker.handle_request(request, &interface_lookup).await,
1507            None // No pending work expected.
1508        );
1509
1510        let result = completer_rcv.await.expect("completer channel should not be closed");
1511        assert_matches!(result, Err(e) if e == expected_error);
1512        assert_eq!(&sender_sink.take_messages()[..], &[]);
1513    }
1514
1515    #[test_case(
1516        GetNeighborArgs::Dump{ ip_version: None, interface: None },
1517        &[1, 2, 3, 4];
1518        "dump all"
1519    )]
1520    #[test_case(
1521        GetNeighborArgs::Dump{ ip_version: Some(IpVersion::V4), interface: None },
1522        &[1, 3];
1523        "dump ipv4 only"
1524    )]
1525    #[test_case(
1526        GetNeighborArgs::Dump{ ip_version: Some(IpVersion::V6), interface: None },
1527        &[2, 4];
1528        "dump ipv6 only"
1529    )]
1530    #[test_case(
1531        GetNeighborArgs::Dump{
1532            ip_version: Some(IpVersion::V6),
1533            interface: Some(NonZeroU64::new(4).unwrap())
1534        },
1535        &[4];
1536        "dump interface 4 ipv6"
1537    )]
1538    #[test_case(
1539        GetNeighborArgs::Dump{
1540            ip_version: Some(IpVersion::V4),
1541            interface: Some(NonZeroU64::new(4).unwrap())
1542        },
1543        &[];
1544        "dump interface 4 ipv4"
1545    )]
1546    #[test_case(
1547        GetNeighborArgs::Get{ ip: fidl_ip!("192.168.0.1"), interface: NonZeroU64::new(1).unwrap() },
1548        &[1];
1549        "get ipv4"
1550    )]
1551    #[test_case(
1552        GetNeighborArgs::Get{ ip: fidl_ip!("fe80::2"), interface: NonZeroU64::new(2).unwrap() },
1553        &[2];
1554        "get ipv6"
1555    )]
1556    #[fuchsia::test]
1557    async fn neighbors_worker_handle_get_request(
1558        get_args: GetNeighborArgs,
1559        expected_ifindexes: &[u32],
1560    ) {
1561        let (mut sender_sink, client, _async_work_drain_task) =
1562            new_fake_client(CLIENT_ID_1, vec![]);
1563        let (completer, completer_rcv) = oneshot::channel();
1564        let request = Request {
1565            args: NeighborRequestArgs::Get(get_args),
1566            sequence_number: 1,
1567            client,
1568            completer,
1569        };
1570
1571        let events: Vec<_> = [
1572            fnet_neighbor_ext::Entry {
1573                interface: NonZeroU64::new(1).unwrap(),
1574                neighbor: fidl_ip!("192.168.0.1"),
1575                ..valid_neighbor_entry()
1576            },
1577            fnet_neighbor_ext::Entry {
1578                interface: NonZeroU64::new(2).unwrap(),
1579                neighbor: fidl_ip!("fe80::2"),
1580                ..valid_neighbor_entry()
1581            },
1582            fnet_neighbor_ext::Entry {
1583                interface: NonZeroU64::new(3).unwrap(),
1584                neighbor: fidl_ip!("192.168.0.3"),
1585                ..valid_neighbor_entry()
1586            },
1587            fnet_neighbor_ext::Entry {
1588                interface: NonZeroU64::new(4).unwrap(),
1589                neighbor: fidl_ip!("fe80::4"),
1590                ..valid_neighbor_entry()
1591            },
1592        ]
1593        .into_iter()
1594        .map(Into::into)
1595        .map(fnet_neighbor::EntryIteratorItem::Existing)
1596        .chain(std::iter::once(fnet_neighbor::EntryIteratorItem::Idle(fnet_neighbor::IdleEvent)))
1597        .collect();
1598
1599        let batches = vec![events];
1600        let (view, server_fut) =
1601            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(batches));
1602
1603        let (controller, _controller_server_end) =
1604            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1605        let worker_fut = NeighborsWorker::create(&view, controller);
1606        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
1607
1608        assert_matches!(
1609            worker.handle_request(request, &BTreeMap::new()).await,
1610            None // No pending work expected.
1611        );
1612
1613        completer_rcv
1614            .await
1615            .expect("completer channel should not be closed")
1616            .expect("request handling result should be OK");
1617
1618        let mut ifindexes_seen = Vec::new();
1619        for sent_message in sender_sink.take_messages() {
1620            match sent_message.message.payload {
1621                NetlinkPayload::InnerMessage(RouteNetlinkMessage::NewNeighbour(
1622                    NeighbourMessage { header: NeighbourHeader { ifindex, .. }, .. },
1623                )) => {
1624                    ifindexes_seen.push(ifindex);
1625                }
1626                _ => panic!("unexpected message sent"),
1627            }
1628        }
1629        ifindexes_seen.sort();
1630        assert_eq!(&ifindexes_seen[..], expected_ifindexes);
1631    }
1632
1633    #[fuchsia::test]
1634    async fn neighbors_worker_handle_del_request_controller_error() {
1635        let (_sender_sink, client, _async_work_drain_task) = new_fake_client(CLIENT_ID_1, vec![]);
1636        let (completer, completer_rcv) = oneshot::channel();
1637        let request = Request {
1638            args: NeighborRequestArgs::Del(DelNeighborArgs {
1639                ip: fidl_ip!("192.168.0.1"),
1640                interface: NonZeroU64::new(1).unwrap(),
1641            }),
1642            sequence_number: 1,
1643            client,
1644            completer,
1645        };
1646
1647        let events = {
1648            use fnet_neighbor_ext::testutil::EventSpec::*;
1649            fnet_neighbor_ext::testutil::generate_events_from_spec(&[Idle])
1650        };
1651        let (view, server_fut) =
1652            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![events]));
1653        let (controller, mut controller_request_stream) =
1654            fidl::endpoints::create_proxy_and_stream::<fnet_neighbor::ControllerMarker>();
1655        let worker_fut = NeighborsWorker::create(&view, controller);
1656        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
1657
1658        let interfaces = BTreeMap::new();
1659        let handle_request_fut = worker.handle_request(request, &interfaces);
1660        let controller_fut = async {
1661            match controller_request_stream
1662                .next()
1663                .await
1664                .expect("controller stream should not be closed")
1665                .expect("failed to receive controller request")
1666            {
1667                fnet_neighbor::ControllerRequest::RemoveEntry {
1668                    interface: _,
1669                    neighbor: _,
1670                    responder,
1671                } => {
1672                    responder
1673                        .send(Err(fnet_neighbor::ControllerError::InterfaceNotFound))
1674                        .expect("failed to send error response");
1675                }
1676                _ => panic!("unexpected controller request"),
1677            }
1678        };
1679
1680        let (handle_result, ()) = futures::join!(handle_request_fut, controller_fut);
1681        assert_matches!(handle_result, None);
1682
1683        let result = completer_rcv.await.expect("completer channel should not be closed");
1684        assert_matches!(result, Err(RequestError::InterfaceNotFound));
1685    }
1686
1687    #[fuchsia::test]
1688    async fn neighbors_worker_handle_del_request_success() {
1689        let (_sender_sink, client, _async_work_drain_task) = new_fake_client(CLIENT_ID_1, vec![]);
1690        let (completer, _completer_rcv) = oneshot::channel();
1691        let args =
1692            DelNeighborArgs { ip: fidl_ip!("192.168.0.1"), interface: NonZeroU64::new(1).unwrap() };
1693        let request =
1694            Request { args: NeighborRequestArgs::Del(args), sequence_number: 1, client, completer };
1695
1696        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1697            fnet_neighbor_ext::testutil::EventSpec::Idle,
1698        ]);
1699        let batches = vec![events];
1700        let (view, server_fut) =
1701            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(batches));
1702        let (controller, mut controller_request_stream) =
1703            fidl::endpoints::create_proxy_and_stream::<fnet_neighbor::ControllerMarker>();
1704        let worker_fut = NeighborsWorker::create(&view, controller);
1705        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
1706
1707        let interfaces = BTreeMap::new();
1708        let handle_request_fut = worker.handle_request(request, &interfaces);
1709        let controller_fut = async {
1710            match controller_request_stream
1711                .next()
1712                .await
1713                .expect("controller stream should not be closed")
1714                .expect("failed to receive controller request")
1715            {
1716                fnet_neighbor::ControllerRequest::RemoveEntry {
1717                    interface: _,
1718                    neighbor: _,
1719                    responder,
1720                } => {
1721                    responder.send(Ok(())).expect("failed to send success response");
1722                }
1723                _ => panic!("unexpected controller request"),
1724            }
1725        };
1726
1727        let (handle_result, ()) = futures::join!(handle_request_fut, controller_fut);
1728        let pending = handle_result.expect("expected pending request");
1729        assert_eq!(pending.request_args, PendingNeighborRequestArgs::Del(args));
1730    }
1731
1732    #[fuchsia::test]
1733    async fn neighbors_worker_handle_pending_del_request() {
1734        let (_sender_sink, client, _async_work_drain_task) = new_fake_client(CLIENT_ID_1, vec![]);
1735        let (completer, mut completer_rcv) = oneshot::channel();
1736        let args =
1737            DelNeighborArgs { ip: fidl_ip!("192.168.0.1"), interface: NonZeroU64::new(1).unwrap() };
1738        let pending = PendingNeighborRequest {
1739            request_args: PendingNeighborRequestArgs::Del(args),
1740            client,
1741            completer,
1742        };
1743
1744        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1745            fnet_neighbor_ext::testutil::EventSpec::Idle,
1746        ]);
1747        let batches = vec![events];
1748        let (view, server_fut) =
1749            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(batches));
1750        let (controller, _controller_server_end) =
1751            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1752        let worker_fut = NeighborsWorker::create(&view, controller);
1753        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
1754
1755        let key = NeighborKey { interface: args.interface, neighbor: args.ip };
1756        let _ = worker.neighbor_table.insert(
1757            key,
1758            fnet_neighbor_ext::Entry {
1759                interface: args.interface,
1760                neighbor: args.ip,
1761                ..valid_neighbor_entry()
1762            },
1763        );
1764
1765        // Still present, should remain pending.
1766        let pending = worker.handle_pending_request(pending).expect("expected pending");
1767        assert_matches!(completer_rcv.try_recv(), Ok(None)); // Completer still blocked.
1768
1769        // Remove from table, should complete.
1770        let _ = worker.neighbor_table.remove(&key);
1771        assert_matches!(worker.handle_pending_request(pending), None);
1772
1773        let result = completer_rcv.try_recv().expect("completer channel should not be closed");
1774        assert_matches!(result, Some(Ok(())));
1775    }
1776
1777    #[fuchsia::test]
1778    async fn neighbors_worker_handle_create_static_request_controller_error() {
1779        let (_sender_sink, client, _async_work_drain_task) = new_fake_client(CLIENT_ID_1, vec![]);
1780        let (completer, completer_rcv) = oneshot::channel();
1781        let args = NewNeighborArgs::CreateStatic {
1782            ip: fidl_ip!("192.168.0.1"),
1783            interface: NonZeroU64::new(1).unwrap(),
1784            mac: fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] },
1785        };
1786        let request =
1787            Request { args: NeighborRequestArgs::New(args), sequence_number: 1, client, completer };
1788
1789        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1790            fnet_neighbor_ext::testutil::EventSpec::Idle,
1791        ]);
1792        let batches = vec![events];
1793        let (view, server_fut) =
1794            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(batches));
1795        let (controller, mut controller_request_stream) =
1796            fidl::endpoints::create_proxy_and_stream::<fnet_neighbor::ControllerMarker>();
1797        let worker_fut = NeighborsWorker::create(&view, controller);
1798        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
1799
1800        let interfaces = BTreeMap::new();
1801        let handle_request_fut = worker.handle_request(request, &interfaces);
1802        let controller_fut = async {
1803            match controller_request_stream
1804                .next()
1805                .await
1806                .expect("controller stream should not be closed")
1807                .expect("failed to receive controller request")
1808            {
1809                fnet_neighbor::ControllerRequest::AddEntry {
1810                    interface: _,
1811                    neighbor: _,
1812                    mac: _,
1813                    responder,
1814                } => {
1815                    responder
1816                        .send(Err(fnet_neighbor::ControllerError::InterfaceNotFound))
1817                        .expect("failed to send error response");
1818                }
1819                _ => panic!("unexpected controller request"),
1820            }
1821        };
1822
1823        let (handle_result, ()) = futures::join!(handle_request_fut, controller_fut);
1824        assert_matches!(handle_result, None);
1825
1826        let result = completer_rcv.await.expect("completer channel should not be closed");
1827        assert_matches!(result, Err(RequestError::InterfaceNotFound));
1828    }
1829
1830    #[fuchsia::test]
1831    async fn neighbors_worker_handle_create_static_request_success() {
1832        let (_sender_sink, client, _async_work_drain_task) = new_fake_client(CLIENT_ID_1, vec![]);
1833        let (completer, _completer_rcv) = oneshot::channel();
1834        let args = NewNeighborArgs::CreateStatic {
1835            ip: fidl_ip!("192.168.0.1"),
1836            interface: NonZeroU64::new(1).unwrap(),
1837            mac: fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] },
1838        };
1839        let request =
1840            Request { args: NeighborRequestArgs::New(args), sequence_number: 1, client, completer };
1841
1842        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1843            fnet_neighbor_ext::testutil::EventSpec::Idle,
1844        ]);
1845        let (view, server_fut) =
1846            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![events]));
1847        let (controller, mut controller_request_stream) =
1848            fidl::endpoints::create_proxy_and_stream::<fnet_neighbor::ControllerMarker>();
1849        let worker_fut = NeighborsWorker::create(&view, controller);
1850        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
1851
1852        let interfaces = BTreeMap::new();
1853        let handle_request_fut = worker.handle_request(request, &interfaces);
1854        let controller_fut = async {
1855            match controller_request_stream
1856                .next()
1857                .await
1858                .expect("controller stream should not be closed")
1859                .expect("failed to receive controller request")
1860            {
1861                fnet_neighbor::ControllerRequest::AddEntry {
1862                    interface: _,
1863                    neighbor: _,
1864                    mac: _,
1865                    responder,
1866                } => {
1867                    responder.send(Ok(())).expect("failed to send success response");
1868                }
1869                _ => panic!("unexpected controller request"),
1870            }
1871        };
1872
1873        let (handle_result, ()) = futures::join!(handle_request_fut, controller_fut);
1874        let pending = handle_result.expect("expected pending request");
1875        assert_eq!(pending.request_args, PendingNeighborRequestArgs::New(args));
1876    }
1877
1878    #[fuchsia::test]
1879    async fn neighbors_worker_handle_probe_request_controller_error() {
1880        let (_sender_sink, client, _async_work_drain_task) = new_fake_client(CLIENT_ID_1, vec![]);
1881        let (completer, completer_rcv) = oneshot::channel();
1882        let args = NewNeighborArgs::ProbeExisting {
1883            ip: fidl_ip!("192.168.0.1"),
1884            interface: NonZeroU64::new(1).unwrap(),
1885        };
1886        let request =
1887            Request { args: NeighborRequestArgs::New(args), sequence_number: 1, client, completer };
1888
1889        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1890            fnet_neighbor_ext::testutil::EventSpec::Idle,
1891        ]);
1892        let (view, server_fut) =
1893            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![events]));
1894        let (controller, mut controller_request_stream) =
1895            fidl::endpoints::create_proxy_and_stream::<fnet_neighbor::ControllerMarker>();
1896        let worker_fut = NeighborsWorker::create(&view, controller);
1897        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
1898
1899        let interfaces = BTreeMap::new();
1900        let handle_request_fut = worker.handle_request(request, &interfaces);
1901        let controller_fut = async {
1902            match controller_request_stream
1903                .next()
1904                .await
1905                .expect("controller stream should not be closed")
1906                .expect("failed to receive controller request")
1907            {
1908                fnet_neighbor::ControllerRequest::ProbeEntry {
1909                    interface: _,
1910                    neighbor: _,
1911                    responder,
1912                } => {
1913                    responder
1914                        .send(Err(fnet_neighbor::ControllerError::InterfaceNotFound))
1915                        .expect("failed to send error response");
1916                }
1917                _ => panic!("unexpected controller request"),
1918            }
1919        };
1920
1921        let (handle_result, ()) = futures::join!(handle_request_fut, controller_fut);
1922        assert_matches!(handle_result, None);
1923
1924        let result = completer_rcv.await.expect("completer channel should not be closed");
1925        assert_matches!(result, Err(RequestError::InterfaceNotFound));
1926    }
1927
1928    #[fuchsia::test]
1929    async fn neighbors_worker_handle_probe_request_success() {
1930        let (_sender_sink, client, _async_work_drain_task) = new_fake_client(CLIENT_ID_1, vec![]);
1931        let (completer, _completer_rcv) = oneshot::channel();
1932        let args = NewNeighborArgs::ProbeExisting {
1933            ip: fidl_ip!("192.168.0.1"),
1934            interface: NonZeroU64::new(1).unwrap(),
1935        };
1936        let request =
1937            Request { args: NeighborRequestArgs::New(args), sequence_number: 1, client, completer };
1938
1939        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1940            fnet_neighbor_ext::testutil::EventSpec::Idle,
1941        ]);
1942        let (view, server_fut) =
1943            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![events]));
1944        let (controller, mut controller_request_stream) =
1945            fidl::endpoints::create_proxy_and_stream::<fnet_neighbor::ControllerMarker>();
1946        let worker_fut = NeighborsWorker::create(&view, controller);
1947        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
1948
1949        let interfaces = BTreeMap::new();
1950        let handle_request_fut = worker.handle_request(request, &interfaces);
1951        let controller_fut = async {
1952            match controller_request_stream
1953                .next()
1954                .await
1955                .expect("controller stream should not be closed")
1956                .expect("failed to receive controller request")
1957            {
1958                fnet_neighbor::ControllerRequest::ProbeEntry {
1959                    interface: _,
1960                    neighbor: _,
1961                    responder,
1962                } => {
1963                    responder.send(Ok(())).expect("failed to send success response");
1964                }
1965                _ => panic!("unexpected controller request"),
1966            }
1967        };
1968
1969        let (handle_result, ()) = futures::join!(handle_request_fut, controller_fut);
1970        let pending = handle_result.expect("expected pending request");
1971        assert_eq!(pending.request_args, PendingNeighborRequestArgs::New(args));
1972    }
1973
1974    #[fuchsia::test]
1975    async fn neighbors_worker_handle_pending_create_static_request() {
1976        let (_sender_sink, client, _async_work_drain_task) = new_fake_client(CLIENT_ID_1, vec![]);
1977        let (completer, mut completer_rcv) = oneshot::channel();
1978        let neighbor = valid_neighbor_entry();
1979        let args = NewNeighborArgs::CreateStatic {
1980            ip: neighbor.neighbor,
1981            interface: neighbor.interface,
1982            mac: neighbor.mac.unwrap(),
1983        };
1984        let pending = PendingNeighborRequest {
1985            request_args: PendingNeighborRequestArgs::New(args),
1986            client,
1987            completer,
1988        };
1989
1990        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
1991            fnet_neighbor_ext::testutil::EventSpec::Idle,
1992        ]);
1993        let (view, server_fut) =
1994            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![events]));
1995        let (controller, _controller_server_end) =
1996            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
1997        let worker_fut = NeighborsWorker::create(&view, controller);
1998        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
1999
2000        let key = NeighborKey { interface: neighbor.interface, neighbor: neighbor.neighbor };
2001
2002        // Not present, should remain pending.
2003        let pending = worker.handle_pending_request(pending).expect("expected pending");
2004        assert_matches!(completer_rcv.try_recv(), Ok(None));
2005
2006        // Insert with wrong MAC, should remain pending.
2007        let _ = worker.neighbor_table.insert(
2008            key,
2009            fnet_neighbor_ext::Entry {
2010                mac: Some(fnet::MacAddress { octets: [0, 0, 0, 0, 0, 0] }),
2011                state: fnet_neighbor::EntryState::Static,
2012                ..neighbor
2013            },
2014        );
2015        let pending = worker.handle_pending_request(pending).expect("expected pending");
2016        assert_matches!(completer_rcv.try_recv(), Ok(None));
2017
2018        // Insert with wrong state, should remain pending.
2019        let _ = worker.neighbor_table.insert(
2020            key,
2021            fnet_neighbor_ext::Entry {
2022                mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
2023                state: fnet_neighbor::EntryState::Reachable,
2024                ..neighbor
2025            },
2026        );
2027        let pending = worker.handle_pending_request(pending).expect("expected pending");
2028        assert_matches!(completer_rcv.try_recv(), Ok(None));
2029
2030        // Insert correct entry, should complete.
2031        let _ = worker.neighbor_table.insert(
2032            key,
2033            fnet_neighbor_ext::Entry {
2034                mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
2035                state: fnet_neighbor::EntryState::Static,
2036                ..neighbor
2037            },
2038        );
2039        assert_matches!(worker.handle_pending_request(pending), None);
2040
2041        let result = completer_rcv.try_recv().expect("completer channel should not be closed");
2042        assert_matches!(result, Some(Ok(())));
2043    }
2044
2045    #[fuchsia::test]
2046    async fn neighbors_worker_handle_pending_probe_request() {
2047        let (_sender_sink, client, _async_work_drain_task) = new_fake_client(CLIENT_ID_1, vec![]);
2048        let (completer, mut completer_rcv) = oneshot::channel();
2049        let neighbor = valid_neighbor_entry();
2050        let args =
2051            NewNeighborArgs::ProbeExisting { ip: neighbor.neighbor, interface: neighbor.interface };
2052        let pending = PendingNeighborRequest {
2053            request_args: PendingNeighborRequestArgs::New(args),
2054            client,
2055            completer,
2056        };
2057
2058        let events = fnet_neighbor_ext::testutil::generate_events_from_spec(&[
2059            fnet_neighbor_ext::testutil::EventSpec::Idle,
2060        ]);
2061        let (view, server_fut) =
2062            fnet_neighbor_ext::testutil::create_fake_view(futures::stream::iter(vec![events]));
2063        let (controller, _controller_server_end) =
2064            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
2065        let worker_fut = NeighborsWorker::create(&view, controller);
2066        let ((), (mut worker, _event_stream)) = futures::join!(server_fut, worker_fut);
2067
2068        let key = NeighborKey { interface: neighbor.interface, neighbor: neighbor.neighbor };
2069
2070        // Not present, should remain pending.
2071        let pending = worker.handle_pending_request(pending).expect("expected pending");
2072        assert_matches!(completer_rcv.try_recv(), Ok(None));
2073
2074        // Insert with wrong state, should remain pending.
2075        let _ = worker.neighbor_table.insert(
2076            key,
2077            fnet_neighbor_ext::Entry { state: fnet_neighbor::EntryState::Reachable, ..neighbor },
2078        );
2079        let pending = worker.handle_pending_request(pending).expect("expected pending");
2080        assert_matches!(completer_rcv.try_recv(), Ok(None));
2081
2082        // Insert correct entry, should complete.
2083        let _ = worker.neighbor_table.insert(
2084            key,
2085            fnet_neighbor_ext::Entry { state: fnet_neighbor::EntryState::Probe, ..neighbor },
2086        );
2087        assert_matches!(worker.handle_pending_request(pending), None);
2088
2089        let result = completer_rcv.try_recv().expect("completer channel should not be closed");
2090        assert_matches!(result, Some(Ok(())));
2091    }
2092
2093    #[test_case(
2094        false,
2095        NeighbourHeader {
2096            ifindex: 1,
2097            family: AddressFamily::Inet,
2098            ..Default::default()
2099        },
2100        &[
2101            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2102        ] => Ok(GetNeighborArgs::Get {
2103            ip: fidl_ip!("192.168.0.1"),
2104            interface: NonZeroU64::new(1).unwrap(),
2105        });
2106        "get ipv4 success"
2107    )]
2108    #[test_case(
2109        false,
2110        NeighbourHeader {
2111            ifindex: 1,
2112            family: AddressFamily::Inet6,
2113            ..Default::default()
2114        },
2115        &[
2116            NeighbourAttribute::Destination(std_ip_v6!("fe80::1").into()),
2117        ] => Ok(GetNeighborArgs::Get {
2118            ip: fidl_ip!("fe80::1"),
2119            interface: NonZeroU64::new(1).unwrap(),
2120        });
2121        "get ipv6 success"
2122    )]
2123    #[test_case(
2124        false,
2125        NeighbourHeader {
2126            ifindex: 1,
2127            family: AddressFamily::Inet,
2128            state: NeighbourState::Reachable,
2129            ..Default::default()
2130        },
2131        &[
2132            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2133        ] => Err(RequestError::InvalidState {
2134            actual: NeighbourState::Reachable, expected: NeighbourState::None
2135        });
2136        "get invalid request state"
2137    )]
2138    #[test_case(
2139        false,
2140        NeighbourHeader {
2141            ifindex: 1,
2142            family: AddressFamily::Inet,
2143            kind: RouteType::Broadcast,
2144            ..Default::default()
2145        },
2146        &[
2147            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2148        ] => Err(RequestError::InvalidKind(RouteType::Broadcast));
2149        "get invalid request kind"
2150    )]
2151    #[test_case(
2152        false,
2153        NeighbourHeader {
2154            ifindex: 1,
2155            family: AddressFamily::Inet,
2156            flags: NeighbourFlags::Router,
2157            ..Default::default()
2158        },
2159        &[
2160            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2161        ] => Err(RequestError::InvalidFlags(NeighbourFlags::Router));
2162        "get invalid request flag"
2163    )]
2164    #[test_case(
2165        false,
2166        NeighbourHeader {
2167            ifindex: 1,
2168            family: AddressFamily::Inet,
2169            flags: NeighbourFlags::Proxy,
2170            ..Default::default()
2171        },
2172        &[
2173            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2174        ] => Err(RequestError::UnsupportedFlags(NeighbourFlags::Proxy));
2175        "get unsupported request flag"
2176    )]
2177    #[test_case(
2178        false,
2179        NeighbourHeader {
2180            ifindex: 1,
2181            family: AddressFamily::Inet6,
2182            ..Default::default()
2183        },
2184        &[
2185            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2186        ] => Err(RequestError::AddressFamilyMismatch(AddressFamily::Inet6));
2187        "get address family mismatch"
2188    )]
2189    #[test_case(
2190        false,
2191        NeighbourHeader {
2192            family: AddressFamily::Inet,
2193            ..Default::default()
2194        },
2195        &[
2196            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2197        ] => Err(RequestError::MissingInterface);
2198        "get interface unspecified"
2199    )]
2200    #[test_case(
2201        false,
2202        NeighbourHeader {
2203            ifindex: 1,
2204            family: AddressFamily::Inet,
2205            ..Default::default()
2206        },
2207        &[] => Err(RequestError::MissingIpAddress);
2208        "get destination unspecified"
2209    )]
2210    #[test_case(
2211        false,
2212        NeighbourHeader {
2213            ifindex: 1,
2214            family: AddressFamily::Inet,
2215            ..Default::default()
2216        },
2217        &[
2218            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2219            NeighbourAttribute::LinkLocalAddress(vec![0, 1, 2, 3, 4, 5]),
2220        ] => Err(RequestError::InvalidAttribute);
2221        "get invalid attribute"
2222    )]
2223    #[test_case(
2224        true,
2225        NeighbourHeader::default(),
2226        &[] => Ok(GetNeighborArgs::Dump {
2227            ip_version: None,
2228            interface: None,
2229        });
2230        "dump all"
2231    )]
2232    #[test_case(
2233        true,
2234        NeighbourHeader {
2235            family: AddressFamily::Inet,
2236            ..Default::default()
2237        },
2238        &[] => Ok(GetNeighborArgs::Dump {
2239            ip_version: Some(IpVersion::V4),
2240            interface: None,
2241        });
2242        "dump ipv4 only"
2243    )]
2244    #[test_case(
2245        true,
2246        NeighbourHeader {
2247            family: AddressFamily::Inet6,
2248            ..Default::default()
2249        },
2250        &[] => Ok(GetNeighborArgs::Dump {
2251            ip_version: Some(IpVersion::V6),
2252            interface: None,
2253        });
2254        "dump ipv6 only"
2255    )]
2256    #[test_case(
2257        true,
2258        NeighbourHeader::default(),
2259        &[
2260            NeighbourAttribute::IfIndex(1),
2261        ] => Ok(GetNeighborArgs::Dump {
2262            ip_version: None,
2263            interface: Some(NonZeroU64::new(1).unwrap()),
2264        });
2265        "dump interface 1"
2266    )]
2267    #[test_case(
2268        true,
2269        NeighbourHeader::default(),
2270        &[
2271            NeighbourAttribute::IfIndex(0),
2272        ] => Ok(GetNeighborArgs::Dump {
2273            ip_version: None,
2274            interface: None,
2275        });
2276        "dump interface 0 treated as all interfaces"
2277    )]
2278    #[test_case(
2279        true,
2280        NeighbourHeader {
2281            family: AddressFamily::Local,
2282            ..Default::default()
2283        },
2284        &[] => Err(RequestError::InvalidAddressFamily(AddressFamily::Local));
2285        "dump invalid address family"
2286    )]
2287    #[test_case(
2288        true,
2289        NeighbourHeader::default(),
2290        &[
2291            NeighbourAttribute::LinkLocalAddress(vec![0, 1, 2, 3, 4, 5]),
2292        ] => Ok(GetNeighborArgs::Dump {
2293            ip_version: None,
2294            interface: None,
2295        });
2296        "dump unsupported attribute ignored (non-strict)"
2297    )]
2298    #[test_case(
2299        true,
2300        NeighbourHeader {
2301            flags: NeighbourFlags::Proxy,
2302            ..Default::default()
2303        },
2304        &[] => Err(RequestError::UnsupportedFlags(NeighbourFlags::Proxy));
2305        "dump unsupported request flag"
2306    )]
2307    #[fuchsia::test]
2308    fn get_neighbor_args_try_from_rtnl_neighbor(
2309        is_dump: bool,
2310        header: NeighbourHeader,
2311        attrs: &[NeighbourAttribute],
2312    ) -> Result<GetNeighborArgs, RequestError> {
2313        let mut message = NeighbourMessage::default();
2314        message.header = header;
2315        message.attributes = attrs.to_vec();
2316        GetNeighborArgs::try_from_rtnl_neighbor(&message, is_dump)
2317    }
2318
2319    #[test_case(
2320        NeighbourHeader {
2321            ifindex: 1,
2322            family: AddressFamily::Inet,
2323            ..Default::default()
2324        },
2325        &[
2326            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2327        ] => Ok(DelNeighborArgs {
2328            ip: fidl_ip!("192.168.0.1"),
2329            interface: NonZeroU64::new(1).unwrap(),
2330        });
2331        "del ipv4 success"
2332    )]
2333    #[test_case(
2334        NeighbourHeader {
2335            ifindex: 1,
2336            family: AddressFamily::Inet6,
2337            ..Default::default()
2338        },
2339        &[
2340            NeighbourAttribute::Destination(std_ip_v6!("fe80::1").into()),
2341        ] => Ok(DelNeighborArgs {
2342            ip: fidl_ip!("fe80::1"),
2343            interface: NonZeroU64::new(1).unwrap(),
2344        });
2345        "del ipv6 success"
2346    )]
2347    #[test_case(
2348        NeighbourHeader {
2349            ifindex: 1,
2350            family: AddressFamily::Inet,
2351            flags: NeighbourFlags::Proxy,
2352            ..Default::default()
2353        },
2354        &[
2355            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2356        ] => Err(RequestError::UnsupportedFlags(NeighbourFlags::Proxy));
2357        "del unsupported request flag"
2358    )]
2359    #[test_case(
2360        NeighbourHeader {
2361            family: AddressFamily::Inet,
2362            ..Default::default()
2363        },
2364        &[
2365            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2366        ] => Err(RequestError::MissingInterface);
2367        "del interface unspecified"
2368    )]
2369    #[test_case(
2370        NeighbourHeader {
2371            ifindex: 1,
2372            family: AddressFamily::Inet,
2373            ..Default::default()
2374        },
2375        &[] => Err(RequestError::MissingIpAddress);
2376        "del destination unspecified"
2377    )]
2378    #[fuchsia::test]
2379    fn del_neighbor_args_try_from_rtnl_neighbor(
2380        header: NeighbourHeader,
2381        attrs: &[NeighbourAttribute],
2382    ) -> Result<DelNeighborArgs, RequestError> {
2383        let mut message = NeighbourMessage::default();
2384        message.header = header;
2385        message.attributes = attrs.to_vec();
2386        DelNeighborArgs::try_from_rtnl_neighbor(&message)
2387    }
2388
2389    #[test_case(
2390        NLM_F_CREATE | NLM_F_REPLACE,
2391        NeighbourHeader {
2392            ifindex: 1,
2393            family: AddressFamily::Inet,
2394            state: NeighbourState::Permanent,
2395            ..Default::default()
2396        },
2397        &[
2398            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2399            NeighbourAttribute::LinkLocalAddress(vec![0, 1, 2, 3, 4, 5]),
2400        ] => Ok(NewNeighborArgs::CreateStatic {
2401            ip: fidl_ip!("192.168.0.1"),
2402            interface: NonZeroU64::new(1).unwrap(),
2403            mac: fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] },
2404        });
2405        "create static ipv4 success"
2406    )]
2407    #[test_case(
2408        NLM_F_CREATE | NLM_F_REPLACE,
2409        NeighbourHeader {
2410            ifindex: 1,
2411            family: AddressFamily::Inet6,
2412            state: NeighbourState::Permanent,
2413            ..Default::default()
2414        },
2415        &[
2416            NeighbourAttribute::Destination(std_ip_v6!("fe80::1").into()),
2417            NeighbourAttribute::LinkLocalAddress(vec![0, 1, 2, 3, 4, 5]),
2418        ] => Ok(NewNeighborArgs::CreateStatic {
2419            ip: fidl_ip!("fe80::1"),
2420            interface: NonZeroU64::new(1).unwrap(),
2421            mac: fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] },
2422        });
2423        "create static ipv6 success"
2424    )]
2425    #[test_case(
2426        NLM_F_REPLACE,
2427        NeighbourHeader {
2428            ifindex: 1,
2429            family: AddressFamily::Inet,
2430            state: NeighbourState::Probe,
2431            ..Default::default()
2432        },
2433        &[
2434            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2435        ] => Ok(NewNeighborArgs::ProbeExisting {
2436            ip: fidl_ip!("192.168.0.1"),
2437            interface: NonZeroU64::new(1).unwrap(),
2438        });
2439        "probe existing ipv4 success"
2440    )]
2441    #[test_case(
2442        NLM_F_REPLACE,
2443        NeighbourHeader {
2444            ifindex: 1,
2445            family: AddressFamily::Inet6,
2446            state: NeighbourState::Probe,
2447            ..Default::default()
2448        },
2449        &[
2450            NeighbourAttribute::Destination(std_ip_v6!("fe80::1").into()),
2451        ] => Ok(NewNeighborArgs::ProbeExisting {
2452            ip: fidl_ip!("fe80::1"),
2453            interface: NonZeroU64::new(1).unwrap(),
2454        });
2455        "probe existing ipv6 success"
2456    )]
2457    #[test_case(
2458        NLM_F_REPLACE,
2459        NeighbourHeader {
2460            ifindex: 1,
2461            family: AddressFamily::Inet,
2462            flags: NeighbourFlags::Proxy,
2463            ..Default::default()
2464        },
2465        &[
2466            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2467        ] => Err(RequestError::UnsupportedFlags(NeighbourFlags::Proxy));
2468        "unsupported proxy flag"
2469    )]
2470    #[test_case(
2471        NLM_F_CREATE | NLM_F_REPLACE,
2472        NeighbourHeader {
2473            ifindex: 1,
2474            family: AddressFamily::Inet,
2475            state: NeighbourState::Reachable,
2476            ..Default::default()
2477        },
2478        &[
2479            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2480            NeighbourAttribute::LinkLocalAddress(vec![0, 1, 2, 3, 4, 5]),
2481        ] => Err(RequestError::InvalidState {
2482            actual: NeighbourState::Reachable,
2483            expected: NeighbourState::Permanent,
2484        });
2485        "create invalid state"
2486    )]
2487    #[test_case(
2488        NLM_F_REPLACE,
2489        NeighbourHeader {
2490            ifindex: 1,
2491            family: AddressFamily::Inet,
2492            state: NeighbourState::Permanent,
2493            ..Default::default()
2494        },
2495        &[
2496            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2497        ] => Err(RequestError::InvalidState {
2498            actual: NeighbourState::Permanent,
2499            expected: NeighbourState::Probe,
2500        });
2501        "probe invalid state"
2502    )]
2503    #[test_case(
2504        NLM_F_CREATE | NLM_F_REPLACE,
2505        NeighbourHeader {
2506            ifindex: 1,
2507            family: AddressFamily::Inet,
2508            state: NeighbourState::Permanent,
2509            ..Default::default()
2510        },
2511        &[
2512            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2513        ] => Err(RequestError::MissingMacAddress);
2514        "create missing mac address"
2515    )]
2516    #[test_case(
2517        NLM_F_EXCL,
2518        NeighbourHeader {
2519            ifindex: 1,
2520            family: AddressFamily::Inet,
2521            ..Default::default()
2522        },
2523        &[
2524            NeighbourAttribute::Destination(std_ip_v4!("192.168.0.1").into()),
2525        ] => Err(RequestError::UnsupportedOperation);
2526        "unsupported operation flags"
2527    )]
2528    #[fuchsia::test]
2529    fn new_neighbor_args_try_from_rtnl_neighbor(
2530        netlink_flags: u16,
2531        header: NeighbourHeader,
2532        attrs: &[NeighbourAttribute],
2533    ) -> Result<NewNeighborArgs, RequestError> {
2534        let mut message = NeighbourMessage::default();
2535        message.header = header;
2536        message.attributes = attrs.to_vec();
2537        NewNeighborArgs::try_from_rtnl_neighbor(&message, netlink_flags)
2538    }
2539
2540    #[test_case(
2541        RequestError::InvalidState {
2542            actual: NeighbourState::Reachable, expected:NeighbourState::None
2543        } => Errno::EINVAL;
2544        "invalid state"
2545    )]
2546    #[test_case(RequestError::InvalidKind(RouteType::Broadcast) => Errno::EINVAL; "invalid kind")]
2547    #[test_case(
2548        RequestError::InvalidFlags(NeighbourFlags::Router) => Errno::EINVAL;
2549        "invalid flags"
2550    )]
2551    #[test_case(
2552        RequestError::UnsupportedFlags(NeighbourFlags::Proxy) => Errno::ENOTSUP;
2553        "unsupported flags"
2554    )]
2555    #[test_case(
2556        RequestError::InvalidAddressFamily(AddressFamily::Local) => Errno::EAFNOSUPPORT;
2557        "invalid address family"
2558    )]
2559    #[test_case(
2560        RequestError::AddressFamilyMismatch(AddressFamily::Inet6) => Errno::EINVAL;
2561        "address family mismatch"
2562    )]
2563    #[test_case(RequestError::MissingInterface => Errno::EINVAL; "interface unspecified")]
2564    #[test_case(RequestError::MissingIpAddress => Errno::EINVAL; "destination unspecified")]
2565    #[test_case(RequestError::InvalidAttribute => Errno::EINVAL; "invalid attribute")]
2566    #[test_case(RequestError::NeighborNotFound => Errno::ENOENT; "neighbor not found")]
2567    #[test_case(RequestError::InterfaceNotFound => Errno::ENODEV; "interface not found")]
2568    #[test_case(RequestError::InvalidIpAddress => Errno::EINVAL; "invalid IP address")]
2569    #[test_case(RequestError::InvalidMacAddress => Errno::EINVAL; "invalid MAC address")]
2570    #[test_case(RequestError::InterfaceUnsupported => Errno::ENOTSUP; "unsupported interface")]
2571    #[fuchsia::test]
2572    fn request_error_into_errno(error: RequestError) -> Errno {
2573        error.into()
2574    }
2575
2576    enum NeighborsAndInterfaces {}
2577    impl EventLoopSpec for NeighborsAndInterfaces {
2578        type NeighborWorker = Required;
2579
2580        type InterfacesProxy = Required;
2581        type InterfacesStateProxy = Required;
2582        type InterfacesHandler = Required;
2583        type RouteClients = Required;
2584        type InterfacesWorker = Required;
2585
2586        type V4RoutesState = Optional;
2587        type V6RoutesState = Optional;
2588        type V4RoutesSetProvider = Optional;
2589        type V6RoutesSetProvider = Optional;
2590        type V4RouteTableProvider = Optional;
2591        type V6RouteTableProvider = Optional;
2592
2593        type RoutesV4Worker = Optional;
2594        type RoutesV6Worker = Optional;
2595        type RuleV4Worker = Optional;
2596        type RuleV6Worker = Optional;
2597        type NduseroptWorker = Optional;
2598    }
2599
2600    const TEST_SEQUENCE_NUMBER: u32 = 1234;
2601
2602    struct EventLoopSetup {
2603        event_loop: EventLoopState<
2604            FakeInterfacesHandler,
2605            FakeSender<RouteNetlinkMessage>,
2606            NeighborsAndInterfaces,
2607        >,
2608        request_sink: mpsc::Sender<UnifiedRequest<FakeSender<RouteNetlinkMessage>>>,
2609        neighbors_controller_request_stream: fnet_neighbor::ControllerRequestStream,
2610        neighbor_event_sink: mpsc::UnboundedSender<Vec<fnet_neighbor::EntryIteratorItem>>,
2611        interface_event_sink: mpsc::UnboundedSender<fnet_interfaces::Event>,
2612    }
2613
2614    async fn build_event_loop(
2615        scope: &fuchsia_async::Scope,
2616        neighbor_events: &[EventSpec],
2617    ) -> EventLoopSetup {
2618        let included_workers = IncludedWorkers {
2619            routes_v4: EventLoopComponent::Absent(Optional),
2620            routes_v6: EventLoopComponent::Absent(Optional),
2621            interfaces: EventLoopComponent::Present(()),
2622            rules_v4: EventLoopComponent::Absent(Optional),
2623            rules_v6: EventLoopComponent::Absent(Optional),
2624            neighbors: EventLoopComponent::Present(()),
2625            nduseropt: EventLoopComponent::Absent(Optional),
2626        };
2627        let (request_sink, request_stream) = mpsc::channel(1);
2628
2629        // Configure fake neighbor watch events.
2630
2631        let (neighbors_controller, neighbors_controller_request_stream) =
2632            fidl::endpoints::create_proxy_and_stream::<fnet_neighbor::ControllerMarker>();
2633
2634        let (neighbors_view, neighbor_event_sink) = {
2635            let events = fnet_neighbor_ext::testutil::generate_events_from_spec(neighbor_events);
2636            let (event_sender, event_receiver) = mpsc::unbounded();
2637            event_sender.unbounded_send(events).expect("failed to send events");
2638            let (neighbors_view, neighbors_fut) =
2639                fnet_neighbor_ext::testutil::create_fake_view(event_receiver);
2640            let _join_handle = scope.spawn(neighbors_fut);
2641            (neighbors_view, event_sender)
2642        };
2643
2644        // Configure fake interface watch events.
2645
2646        let (interfaces_handler, _interfaces_handler_sink) =
2647            crate::interfaces::testutil::FakeInterfacesHandler::new();
2648        let (interfaces_proxy, _interfaces) =
2649            fidl::endpoints::create_proxy::<fnet_root::InterfacesMarker>();
2650        let (interfaces_state_proxy, interfaces_state) =
2651            fidl::endpoints::create_proxy::<fnet_interfaces::StateMarker>();
2652        let route_clients = ClientTable::default();
2653
2654        let interface_event_sink = {
2655            let if_stream = interfaces_state.into_stream();
2656            let if_watcher_stream = if_stream
2657                .and_then(|req| match req {
2658                    fnet_interfaces::StateRequest::GetWatcher {
2659                        options: _,
2660                        watcher,
2661                        control_handle: _,
2662                    } => futures::future::ready(Ok(watcher.into_stream())),
2663                })
2664                .try_flatten()
2665                .map(|res| res.expect("watcher stream error"));
2666            let (event_sender, event_receiver) = mpsc::unbounded();
2667            event_sender
2668                .unbounded_send(fnet_interfaces::Event::Idle(fnet_interfaces::Empty))
2669                .expect("failed to send event");
2670            let interfaces_fut =
2671                if_watcher_stream.zip(event_receiver).for_each(|(req, update)| async move {
2672                    match req {
2673                        fnet_interfaces::WatcherRequest::Watch { responder } => {
2674                            responder.send(&update).expect("send watch response")
2675                        }
2676                    }
2677                });
2678            let _join_handle = scope.spawn(interfaces_fut);
2679            event_sender
2680        };
2681
2682        // Set up the event loop.
2683
2684        let (_async_work_sink, async_work_receiver) = mpsc::unbounded();
2685        let base_inputs: EventLoopInputs<
2686            FakeInterfacesHandler,
2687            FakeSender<RouteNetlinkMessage>,
2688            NeighborsAndInterfaces,
2689        > = EventLoopInputs {
2690            neighbors_view: EventLoopComponent::Present(neighbors_view),
2691            neighbors_controller: EventLoopComponent::Present(neighbors_controller),
2692
2693            route_clients: EventLoopComponent::Present(route_clients),
2694            interfaces_handler: EventLoopComponent::Present(interfaces_handler),
2695            interfaces_proxy: EventLoopComponent::Present(interfaces_proxy),
2696            interfaces_state_proxy: EventLoopComponent::Present(interfaces_state_proxy),
2697
2698            async_work_receiver,
2699
2700            v4_routes_state: EventLoopComponent::Absent(Optional),
2701            v6_routes_state: EventLoopComponent::Absent(Optional),
2702            v4_main_route_table: EventLoopComponent::Absent(Optional),
2703            v6_main_route_table: EventLoopComponent::Absent(Optional),
2704            v4_route_table_provider: EventLoopComponent::Absent(Optional),
2705            v6_route_table_provider: EventLoopComponent::Absent(Optional),
2706            v4_rule_table: EventLoopComponent::Absent(Optional),
2707            v6_rule_table: EventLoopComponent::Absent(Optional),
2708            ndp_option_watcher_provider: EventLoopComponent::Absent(Optional),
2709
2710            unified_request_stream: request_stream,
2711        };
2712
2713        let event_loop = base_inputs.initialize(included_workers).await;
2714        EventLoopSetup {
2715            event_loop,
2716            request_sink,
2717            neighbors_controller_request_stream,
2718            neighbor_event_sink,
2719            interface_event_sink,
2720        }
2721    }
2722
2723    #[fuchsia::test]
2724    async fn event_loop_with_watch_events_and_get_request() {
2725        let scope = fuchsia_async::Scope::new();
2726        use fnet_neighbor_ext::testutil::EventSpec::*;
2727        let EventLoopSetup {
2728            mut event_loop,
2729            mut request_sink,
2730            neighbors_controller_request_stream: _,
2731            neighbor_event_sink,
2732            interface_event_sink,
2733        } = build_event_loop(&scope, &[Existing(1), Existing(2), Existing(3), Idle, Added(4)])
2734            .await;
2735
2736        // Wait for `Added` event to be processed.
2737        event_loop.run_one_step_in_tests().await;
2738
2739        // Send a dump request and check the response.
2740
2741        let (mut response_sink, neighbor_client, async_work_drain_task) =
2742            crate::client::testutil::new_fake_client::<NetlinkRoute>(
2743                crate::client::testutil::CLIENT_ID_1,
2744                [],
2745            );
2746        let _join_handle = scope.spawn(async_work_drain_task);
2747
2748        let (completer, waiter) = oneshot::channel();
2749        let get_request: UnifiedRequest<FakeSender<RouteNetlinkMessage>> =
2750            UnifiedRequest::NeighborsRequest(Request {
2751                args: NeighborRequestArgs::Get(GetNeighborArgs::Dump {
2752                    ip_version: None,
2753                    interface: None,
2754                }),
2755                sequence_number: TEST_SEQUENCE_NUMBER,
2756                client: neighbor_client.clone(),
2757                completer,
2758            });
2759        request_sink.send(get_request).await.unwrap();
2760
2761        // Wait for client request to be processed.
2762        event_loop.run_one_step_in_tests().await;
2763        assert_matches!(waiter.await.unwrap(), Ok(()));
2764
2765        let responses = response_sink.take_messages();
2766        assert_eq!(responses.len(), 4); // 3 existing + 1 added.
2767        for response in responses {
2768            assert_matches!(
2769                response.message.payload,
2770                NetlinkPayload::InnerMessage(RouteNetlinkMessage::NewNeighbour(_))
2771            );
2772        }
2773
2774        neighbor_event_sink.close_channel();
2775        interface_event_sink.close_channel();
2776        drop(neighbor_client);
2777        scope.join().await;
2778    }
2779
2780    #[fuchsia::test]
2781    async fn event_loop_with_watch_events_and_delete_request() {
2782        let scope = fuchsia_async::Scope::new();
2783        let neighbor_events = {
2784            use fnet_neighbor_ext::testutil::EventSpec::*;
2785            vec![Existing(1), Existing(2), Idle]
2786        };
2787        let EventLoopSetup {
2788            mut event_loop,
2789            mut request_sink,
2790            neighbors_controller_request_stream,
2791            neighbor_event_sink,
2792            interface_event_sink,
2793        } = build_event_loop(&scope, &neighbor_events).await;
2794
2795        let fnet_neighbor::EntryIteratorItem::Existing(to_delete) =
2796            fnet_neighbor_ext::testutil::generate_event_from_spec(&neighbor_events[0])
2797        else {
2798            panic!("unexpected event")
2799        };
2800        let to_delete: fnet_neighbor_ext::Entry =
2801            to_delete.try_into().expect("extension conversion failed");
2802
2803        let (mut response_sink, neighbor_client, async_work_drain_task) =
2804            crate::client::testutil::new_fake_client::<NetlinkRoute>(
2805                crate::client::testutil::CLIENT_ID_1,
2806                [],
2807            );
2808        let _join_handle = scope.spawn(async_work_drain_task);
2809
2810        // Send an RTM_DELNEIGH request for an existing neighbor.
2811
2812        let (completer, waiter) = oneshot::channel();
2813        let waiter = waiter.fuse();
2814        pin_mut!(waiter);
2815
2816        let del_request: UnifiedRequest<FakeSender<RouteNetlinkMessage>> =
2817            UnifiedRequest::NeighborsRequest(Request {
2818                args: NeighborRequestArgs::Del(DelNeighborArgs {
2819                    ip: to_delete.neighbor,
2820                    interface: to_delete.interface,
2821                }),
2822                sequence_number: TEST_SEQUENCE_NUMBER,
2823                client: neighbor_client.clone(),
2824                completer,
2825            });
2826        request_sink.send(del_request).await.expect("failed to send delete request");
2827
2828        // Handle the expected Controller.RemoveEntry request and verify that
2829        // the RTM_DELNEIGH request is still pending.
2830
2831        let controller_req_fut = neighbors_controller_request_stream
2832            .into_future()
2833            .then(async move |(req, _rest)| {
2834                match req
2835                    .expect("Controller request stream unexpectedly ended")
2836                    .expect("failed to receive `Controller` request")
2837                {
2838                    fnet_neighbor::ControllerRequest::RemoveEntry {
2839                        interface,
2840                        neighbor,
2841                        responder,
2842                        ..
2843                    } => {
2844                        assert_eq!(interface, to_delete.interface.get());
2845                        assert_eq!(neighbor, to_delete.neighbor);
2846                        responder.send(Ok(())).expect("failed to respond to RemoveEntry");
2847                    }
2848                    _ => panic!("unexpected controller request"),
2849                }
2850            })
2851            .fuse();
2852        let _join_handle = scope.spawn(controller_req_fut);
2853        event_loop.run_one_step_in_tests().await;
2854
2855        assert_matches!(waiter.as_mut().now_or_never(), None);
2856        assert_eq!(response_sink.take_messages().len(), 0);
2857
2858        // Send an unrelated neighbor watch event and verify that the request is
2859        // still pending.
2860
2861        {
2862            use fnet_neighbor_ext::testutil::EventSpec::*;
2863            neighbor_event_sink
2864                .unbounded_send(fnet_neighbor_ext::testutil::generate_events_from_spec(&[Added(3)]))
2865                .expect("failed to send event");
2866        }
2867
2868        event_loop.run_one_step_in_tests().await;
2869        assert_matches!(waiter.as_mut().now_or_never(), None);
2870        assert_eq!(response_sink.take_messages().len(), 0);
2871
2872        // Send a neighbor watch event indicating successful removal and verify
2873        // that the request is completed.
2874
2875        {
2876            use fnet_neighbor_ext::testutil::EventSpec::*;
2877            neighbor_event_sink
2878                .unbounded_send(fnet_neighbor_ext::testutil::generate_events_from_spec(&[Removed(
2879                    1,
2880                )]))
2881                .expect("failed to send event");
2882        }
2883
2884        event_loop.run_one_step_in_tests().await;
2885        assert_matches!(waiter.await.expect("waiter channel should not be closed"), Ok(()));
2886        // The event loop & worker aren't responsible for the final response to
2887        // the client (either none, or ACK if requested), so there's nothing
2888        // more to check here.
2889
2890        neighbor_event_sink.close_channel();
2891        interface_event_sink.close_channel();
2892        drop(neighbor_client);
2893        scope.join().await;
2894    }
2895
2896    #[fuchsia::test]
2897    async fn event_loop_with_watch_events_create_static_neighbor() {
2898        let scope = fuchsia_async::Scope::new();
2899        let neighbor_events = vec![fnet_neighbor_ext::testutil::EventSpec::Idle];
2900        let EventLoopSetup {
2901            mut event_loop,
2902            mut request_sink,
2903            neighbors_controller_request_stream,
2904            neighbor_event_sink,
2905            interface_event_sink,
2906        } = build_event_loop(&scope, &neighbor_events).await;
2907
2908        let (mut response_sink, neighbor_client, async_work_drain_task) =
2909            crate::client::testutil::new_fake_client::<NetlinkRoute>(
2910                crate::client::testutil::CLIENT_ID_1,
2911                [],
2912            );
2913        let _join_handle = scope.spawn(async_work_drain_task);
2914
2915        // Create expected entry but without sending neighbor event.
2916
2917        let (event, entry) = {
2918            use fnet_neighbor_ext::testutil::EventSpec::*;
2919            let mut added = fnet_neighbor_ext::testutil::generate_event_from_spec(&Added(1));
2920            let fnet_neighbor::EntryIteratorItem::Added(to_add) = &mut added else {
2921                panic!("unexpected event")
2922            };
2923            to_add.state = Some(fnet_neighbor::EntryState::Static);
2924            let entry: fnet_neighbor_ext::Entry =
2925                to_add.clone().try_into().expect("extension conversion failed");
2926            (added, entry)
2927        };
2928
2929        // Send an RTM_NEWNEIGH request for an existing neighbor.
2930
2931        let (completer, waiter) = oneshot::channel();
2932        let waiter = waiter.fuse();
2933        pin_mut!(waiter);
2934
2935        let create_request: UnifiedRequest<FakeSender<RouteNetlinkMessage>> =
2936            UnifiedRequest::NeighborsRequest(Request {
2937                args: NeighborRequestArgs::New(NewNeighborArgs::CreateStatic {
2938                    ip: entry.neighbor,
2939                    interface: entry.interface,
2940                    mac: entry.mac.unwrap(),
2941                }),
2942                sequence_number: TEST_SEQUENCE_NUMBER,
2943                client: neighbor_client.clone(),
2944                completer,
2945            });
2946        request_sink.send(create_request).await.expect("failed to send create request");
2947
2948        // Handle the expected Controller.AddEntry request and verify that the
2949        // RTM_NEWNEIGH request is still pending.
2950
2951        let controller_req_fut = neighbors_controller_request_stream
2952            .into_future()
2953            .then(async move |(req, _rest)| {
2954                match req
2955                    .expect("Controller request stream unexpectedly ended")
2956                    .expect("failed to receive `Controller` request")
2957                {
2958                    fnet_neighbor::ControllerRequest::AddEntry {
2959                        interface,
2960                        neighbor,
2961                        mac,
2962                        responder,
2963                    } => {
2964                        assert_eq!(interface, entry.interface.get());
2965                        assert_eq!(neighbor, entry.neighbor);
2966                        assert_eq!(mac, entry.mac.unwrap());
2967                        responder.send(Ok(())).expect("failed to respond to AddEntry");
2968                    }
2969                    _ => panic!("unexpected controller request"),
2970                }
2971            })
2972            .fuse();
2973        let _join_handle = scope.spawn(controller_req_fut);
2974
2975        event_loop.run_one_step_in_tests().await;
2976        assert_matches!(waiter.as_mut().now_or_never(), None);
2977        assert_eq!(response_sink.take_messages().len(), 0);
2978
2979        // Send an unrelated neighbor watch event and verify that the request is
2980        // still pending.
2981
2982        {
2983            use fnet_neighbor_ext::testutil::EventSpec::*;
2984            neighbor_event_sink
2985                .unbounded_send(fnet_neighbor_ext::testutil::generate_events_from_spec(&[Added(3)]))
2986                .expect("failed to send event");
2987        }
2988
2989        event_loop.run_one_step_in_tests().await;
2990        assert_matches!(waiter.as_mut().now_or_never(), None);
2991        assert_eq!(response_sink.take_messages().len(), 0);
2992
2993        // Send a neighbor watch event indicating successful creation and verify
2994        // that the request is completed.
2995
2996        neighbor_event_sink.unbounded_send(vec![event]).expect("failed to send event");
2997
2998        event_loop.run_one_step_in_tests().await;
2999        assert_matches!(waiter.await.expect("waiter channel should not be closed"), Ok(()));
3000        // The event loop & worker aren't responsible for the final response to
3001        // the client (either none, or ACK if requested), so there's nothing
3002        // more to check here.
3003
3004        neighbor_event_sink.close_channel();
3005        interface_event_sink.close_channel();
3006        drop(neighbor_client);
3007        scope.join().await;
3008    }
3009
3010    #[fuchsia::test]
3011    async fn event_loop_with_watch_events_create_already_existing_succeeds_without_event() {
3012        let scope = fuchsia_async::Scope::new();
3013        let neighbor_events = vec![fnet_neighbor_ext::testutil::EventSpec::Idle];
3014        let EventLoopSetup {
3015            mut event_loop,
3016            mut request_sink,
3017            neighbors_controller_request_stream,
3018            neighbor_event_sink,
3019            interface_event_sink,
3020        } = build_event_loop(&scope, &neighbor_events).await;
3021
3022        let (mut response_sink, neighbor_client, async_work_drain_task) =
3023            crate::client::testutil::new_fake_client::<NetlinkRoute>(
3024                crate::client::testutil::CLIENT_ID_1,
3025                [],
3026            );
3027        let _join_handle = scope.spawn(async_work_drain_task);
3028
3029        // Report existing static entry.
3030
3031        let (event, entry) = {
3032            use fnet_neighbor_ext::testutil::EventSpec::*;
3033            let mut added = fnet_neighbor_ext::testutil::generate_event_from_spec(&Added(1));
3034            let fnet_neighbor::EntryIteratorItem::Added(to_add) = &mut added else {
3035                panic!("unexpected event")
3036            };
3037            to_add.state = Some(fnet_neighbor::EntryState::Static);
3038            let entry: fnet_neighbor_ext::Entry =
3039                to_add.clone().try_into().expect("extension conversion failed");
3040            (added, entry)
3041        };
3042        neighbor_event_sink.unbounded_send(vec![event]).expect("failed to send event");
3043        event_loop.run_one_step_in_tests().await;
3044
3045        // Send an RTM_NEWNEIGH request for an existing neighbor.
3046
3047        let (completer, waiter) = oneshot::channel();
3048        let waiter = waiter.fuse();
3049        pin_mut!(waiter);
3050
3051        let create_request: UnifiedRequest<FakeSender<RouteNetlinkMessage>> =
3052            UnifiedRequest::NeighborsRequest(Request {
3053                args: NeighborRequestArgs::New(NewNeighborArgs::CreateStatic {
3054                    ip: entry.neighbor,
3055                    interface: entry.interface,
3056                    mac: entry.mac.unwrap(),
3057                }),
3058                sequence_number: TEST_SEQUENCE_NUMBER,
3059                client: neighbor_client.clone(),
3060                completer,
3061            });
3062        request_sink.send(create_request).await.expect("failed to send create request");
3063
3064        // Handle the expected Controller.AddEntry request and verify that the
3065        // RTM_NEWNEIGH request is not pending.
3066
3067        let controller_req_fut = neighbors_controller_request_stream
3068            .into_future()
3069            .then(async move |(req, _rest)| {
3070                match req
3071                    .expect("Controller request stream unexpectedly ended")
3072                    .expect("failed to receive `Controller` request")
3073                {
3074                    fnet_neighbor::ControllerRequest::AddEntry {
3075                        interface,
3076                        neighbor,
3077                        mac,
3078                        responder,
3079                    } => {
3080                        assert_eq!(interface, entry.interface.get());
3081                        assert_eq!(neighbor, entry.neighbor);
3082                        assert_eq!(mac, entry.mac.unwrap());
3083                        responder.send(Ok(())).expect("failed to respond to AddEntry");
3084                    }
3085                    _ => panic!("unexpected controller request"),
3086                }
3087            })
3088            .fuse();
3089        let _join_handle = scope.spawn(controller_req_fut);
3090
3091        event_loop.run_one_step_in_tests().await;
3092        assert_matches!(waiter.await.expect("waiter channel should not be closed"), Ok(()));
3093        assert_eq!(response_sink.take_messages().len(), 0);
3094        // The event loop & worker aren't responsible for the final response to
3095        // the client (either none, or ACK if requested), so there's nothing
3096        // more to check here.
3097
3098        neighbor_event_sink.close_channel();
3099        interface_event_sink.close_channel();
3100        drop(neighbor_client);
3101        scope.join().await;
3102    }
3103
3104    #[fuchsia::test]
3105    async fn event_loop_with_watch_events_probe_neighbor() {
3106        let scope = fuchsia_async::Scope::new();
3107        let neighbor_events = vec![fnet_neighbor_ext::testutil::EventSpec::Idle];
3108        let EventLoopSetup {
3109            mut event_loop,
3110            mut request_sink,
3111            neighbors_controller_request_stream,
3112            neighbor_event_sink,
3113            interface_event_sink,
3114        } = build_event_loop(&scope, &neighbor_events).await;
3115
3116        let (mut response_sink, neighbor_client, async_work_drain_task) =
3117            crate::client::testutil::new_fake_client::<NetlinkRoute>(
3118                crate::client::testutil::CLIENT_ID_1,
3119                [],
3120            );
3121        let _join_handle = scope.spawn(async_work_drain_task);
3122
3123        // Report existing entry not in `Probe` state.
3124
3125        let (event, entry) = {
3126            use fnet_neighbor_ext::testutil::EventSpec::*;
3127            let mut added = fnet_neighbor_ext::testutil::generate_event_from_spec(&Added(1));
3128            let fnet_neighbor::EntryIteratorItem::Added(to_add) = &mut added else {
3129                panic!("unexpected event")
3130            };
3131            to_add.state = Some(fnet_neighbor::EntryState::Reachable);
3132            let entry: fnet_neighbor_ext::Entry =
3133                to_add.clone().try_into().expect("extension conversion failed");
3134            (added, entry)
3135        };
3136        neighbor_event_sink.unbounded_send(vec![event]).expect("failed to send event");
3137        event_loop.run_one_step_in_tests().await;
3138
3139        // Send an RTM_NEWNEIGH request to probe the existing neighbor.
3140
3141        let (completer, waiter) = oneshot::channel();
3142        let waiter = waiter.fuse();
3143        pin_mut!(waiter);
3144
3145        let probe_request: UnifiedRequest<FakeSender<RouteNetlinkMessage>> =
3146            UnifiedRequest::NeighborsRequest(Request {
3147                args: NeighborRequestArgs::New(NewNeighborArgs::ProbeExisting {
3148                    ip: entry.neighbor,
3149                    interface: entry.interface,
3150                }),
3151                sequence_number: TEST_SEQUENCE_NUMBER,
3152                client: neighbor_client.clone(),
3153                completer,
3154            });
3155        request_sink.send(probe_request).await.expect("failed to send create request");
3156
3157        // Handle the expected Controller.ProbeEntry request and verify that the
3158        // RTM_NEWNEIGH request is still pending.
3159
3160        let controller_req_fut = neighbors_controller_request_stream
3161            .into_future()
3162            .then(async move |(req, _rest)| {
3163                match req
3164                    .expect("Controller request stream unexpectedly ended")
3165                    .expect("failed to receive `Controller` request")
3166                {
3167                    fnet_neighbor::ControllerRequest::ProbeEntry {
3168                        interface,
3169                        neighbor,
3170                        responder,
3171                    } => {
3172                        assert_eq!(interface, entry.interface.get());
3173                        assert_eq!(neighbor, entry.neighbor);
3174                        responder.send(Ok(())).expect("failed to respond to ProbeEntry");
3175                    }
3176                    _ => panic!("unexpected controller request"),
3177                }
3178            })
3179            .fuse();
3180        let _join_handle = scope.spawn(controller_req_fut);
3181
3182        event_loop.run_one_step_in_tests().await;
3183        assert_matches!(waiter.as_mut().now_or_never(), None);
3184        assert_eq!(response_sink.take_messages().len(), 0);
3185
3186        // Send an unrelated neighbor watch event and verify that the request is
3187        // still pending.
3188
3189        {
3190            use fnet_neighbor_ext::testutil::EventSpec::*;
3191            neighbor_event_sink
3192                .unbounded_send(fnet_neighbor_ext::testutil::generate_events_from_spec(&[Added(3)]))
3193                .expect("failed to send event");
3194        }
3195
3196        event_loop.run_one_step_in_tests().await;
3197        assert_matches!(waiter.as_mut().now_or_never(), None);
3198        assert_eq!(response_sink.take_messages().len(), 0);
3199
3200        // Send a neighbor watch event indicating successful transition to
3201        // `Probe` and verify that the request is completed.
3202
3203        {
3204            let mut changed = entry.clone();
3205            changed.state = fnet_neighbor::EntryState::Probe;
3206            let changed_event = fnet_neighbor::EntryIteratorItem::Changed(changed.into());
3207            neighbor_event_sink.unbounded_send(vec![changed_event]).expect("failed to send event");
3208        }
3209
3210        event_loop.run_one_step_in_tests().await;
3211        assert_matches!(waiter.await.expect("waiter channel should not be closed"), Ok(()));
3212        // The event loop & worker aren't responsible for the final response to
3213        // the client (either none, or ACK if requested), so there's nothing
3214        // more to check here.
3215
3216        neighbor_event_sink.close_channel();
3217        interface_event_sink.close_channel();
3218        drop(neighbor_client);
3219        scope.join().await;
3220    }
3221
3222    #[fuchsia::test]
3223    async fn event_loop_with_watch_events_probe_already_probing_succeeds_without_event() {
3224        let scope = fuchsia_async::Scope::new();
3225        let neighbor_events = vec![fnet_neighbor_ext::testutil::EventSpec::Idle];
3226        let EventLoopSetup {
3227            mut event_loop,
3228            mut request_sink,
3229            neighbors_controller_request_stream,
3230            neighbor_event_sink,
3231            interface_event_sink,
3232        } = build_event_loop(&scope, &neighbor_events).await;
3233
3234        let (mut response_sink, neighbor_client, async_work_drain_task) =
3235            crate::client::testutil::new_fake_client::<NetlinkRoute>(
3236                crate::client::testutil::CLIENT_ID_1,
3237                [],
3238            );
3239        let _join_handle = scope.spawn(async_work_drain_task);
3240
3241        // Report existing `Probe` entry.
3242
3243        let (event, entry) = {
3244            use fnet_neighbor_ext::testutil::EventSpec::*;
3245            let mut added = fnet_neighbor_ext::testutil::generate_event_from_spec(&Added(1));
3246            let fnet_neighbor::EntryIteratorItem::Added(to_add) = &mut added else {
3247                panic!("unexpected event")
3248            };
3249            to_add.state = Some(fnet_neighbor::EntryState::Probe);
3250            let entry: fnet_neighbor_ext::Entry =
3251                to_add.clone().try_into().expect("extension conversion failed");
3252            (added, entry)
3253        };
3254        neighbor_event_sink.unbounded_send(vec![event]).expect("failed to send event");
3255        event_loop.run_one_step_in_tests().await;
3256
3257        // Send an RTM_NEWNEIGH request for the neighbor in `Probe` state.
3258
3259        let (completer, waiter) = oneshot::channel();
3260        let waiter = waiter.fuse();
3261        pin_mut!(waiter);
3262
3263        let probe_request: UnifiedRequest<FakeSender<RouteNetlinkMessage>> =
3264            UnifiedRequest::NeighborsRequest(Request {
3265                args: NeighborRequestArgs::New(NewNeighborArgs::ProbeExisting {
3266                    ip: entry.neighbor,
3267                    interface: entry.interface,
3268                }),
3269                sequence_number: TEST_SEQUENCE_NUMBER,
3270                client: neighbor_client.clone(),
3271                completer,
3272            });
3273        request_sink.send(probe_request).await.expect("failed to send create request");
3274
3275        // Handle the expected Controller.ProbeEntry request and verify that the
3276        // RTM_NEWNEIGH request is not pending.
3277
3278        let controller_req_fut = neighbors_controller_request_stream
3279            .into_future()
3280            .then(async move |(req, _rest)| {
3281                match req
3282                    .expect("Controller request stream unexpectedly ended")
3283                    .expect("failed to receive `Controller` request")
3284                {
3285                    fnet_neighbor::ControllerRequest::ProbeEntry {
3286                        interface,
3287                        neighbor,
3288                        responder,
3289                    } => {
3290                        assert_eq!(interface, entry.interface.get());
3291                        assert_eq!(neighbor, entry.neighbor);
3292                        responder.send(Ok(())).expect("failed to respond to ProbeEntry");
3293                    }
3294                    _ => panic!("unexpected controller request"),
3295                }
3296            })
3297            .fuse();
3298        let _join_handle = scope.spawn(controller_req_fut);
3299
3300        event_loop.run_one_step_in_tests().await;
3301        assert_matches!(waiter.await.expect("waiter channel should not be closed"), Ok(()));
3302        assert_eq!(response_sink.take_messages().len(), 0);
3303        // The event loop & worker aren't responsible for the final response to
3304        // the client (either none, or ACK if requested), so there's nothing
3305        // more to check here.
3306
3307        neighbor_event_sink.close_channel();
3308        interface_event_sink.close_channel();
3309        drop(neighbor_client);
3310        scope.join().await;
3311    }
3312
3313    #[fuchsia::test]
3314    async fn neighbors_worker_sends_multicast_updates() {
3315        let (mut sender_sink_member, client_member, _async_work_drain_task_1) =
3316            new_fake_client::<NetlinkRoute>(
3317                CLIENT_ID_1,
3318                [ModernGroup(rtnetlink_groups_RTNLGRP_NEIGH)],
3319            );
3320        let (mut sender_sink_non_member, client_non_member, _async_work_drain_task_2) =
3321            new_fake_client::<NetlinkRoute>(CLIENT_ID_2, []);
3322
3323        let client_table = ClientTable::default();
3324        client_table.add_client(client_member);
3325        client_table.add_client(client_non_member);
3326
3327        let (neighbors_controller, _server_end) =
3328            fidl::endpoints::create_proxy::<fnet_neighbor::ControllerMarker>();
3329        let mut worker = NeighborsWorker { neighbor_table: HashMap::new(), neighbors_controller };
3330
3331        let entry = valid_neighbor_entry();
3332
3333        // Verify that an `Added` event results in an `RTM_NEWNEIGH` multicast
3334        // message.
3335
3336        worker
3337            .handle_neighbor_watcher_event(
3338                fnet_neighbor_ext::Event::Added(entry.clone()),
3339                &client_table,
3340            )
3341            .expect("handle added");
3342
3343        let messages = sender_sink_member.take_messages();
3344        assert_eq!(messages.len(), 1);
3345        let SentMessage { message, group } = &messages[0];
3346        assert_eq!(group, &Some(ModernGroup(rtnetlink_groups_RTNLGRP_NEIGH)));
3347        assert_matches!(
3348            &message.payload,
3349            NetlinkPayload::InnerMessage(RouteNetlinkMessage::NewNeighbour(msg)) => {
3350                assert_eq!(msg.header.ifindex as u64, entry.interface.get());
3351            }
3352        );
3353        assert_eq!(message.header.sequence_number, UNSPECIFIED_SEQUENCE_NUMBER);
3354        assert_eq!(sender_sink_non_member.take_messages().len(), 0);
3355
3356        // Verify that a `Changed` event results in an `RTM_NEWNEIGH` multicast
3357        // message.
3358
3359        let mut new_entry = entry.clone();
3360        new_entry.state = fnet_neighbor::EntryState::Stale;
3361        worker
3362            .handle_neighbor_watcher_event(
3363                fnet_neighbor_ext::Event::Changed(new_entry.clone()),
3364                &client_table,
3365            )
3366            .expect("handle changed");
3367
3368        let messages = sender_sink_member.take_messages();
3369        assert_eq!(messages.len(), 1);
3370        let SentMessage { message, group } = &messages[0];
3371        assert_eq!(group, &Some(ModernGroup(rtnetlink_groups_RTNLGRP_NEIGH)));
3372        assert_matches!(
3373            &message.payload,
3374            NetlinkPayload::InnerMessage(RouteNetlinkMessage::NewNeighbour(msg)) => {
3375                assert_eq!(msg.header.state, NeighbourState::Stale);
3376            }
3377        );
3378        assert_eq!(sender_sink_non_member.take_messages().len(), 0);
3379
3380        // Verify that a `Removed` event results in a `RTM_DELNEIGH` multicast
3381        // message.
3382
3383        worker
3384            .handle_neighbor_watcher_event(
3385                fnet_neighbor_ext::Event::Removed(new_entry.clone()),
3386                &client_table,
3387            )
3388            .expect("handle removed");
3389
3390        let messages = sender_sink_member.take_messages();
3391        assert_eq!(messages.len(), 1);
3392        let SentMessage { message, group } = &messages[0];
3393        assert_eq!(group, &Some(ModernGroup(rtnetlink_groups_RTNLGRP_NEIGH)));
3394        assert_matches!(
3395            &message.payload,
3396            NetlinkPayload::InnerMessage(RouteNetlinkMessage::DelNeighbour(msg)) => {
3397                assert_eq!(msg.header.ifindex as u64, entry.interface.get());
3398            }
3399        );
3400        assert_eq!(sender_sink_non_member.take_messages().len(), 0);
3401    }
3402}