fidl_fuchsia_net_neighbor_ext/
lib.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Extensions for types in the `fidl_fuchsia_net_neighbor` crate.
6
7#![deny(missing_docs)]
8
9#[cfg(any(test, feature = "testutils"))]
10pub mod testutil;
11
12use async_utils::{fold, stream};
13use fidl_table_validation::*;
14use futures::{Stream, TryStreamExt as _};
15use thiserror::Error;
16use {
17    fidl_fuchsia_net as fnet, fidl_fuchsia_net_ext as fnet_ext,
18    fidl_fuchsia_net_neighbor as fnet_neighbor, zx_types as zx,
19};
20
21/// Information on a neighboring device in the local network.
22#[derive(Clone, Debug, Eq, Hash, PartialEq, ValidFidlTable)]
23#[fidl_table_src(fnet_neighbor::Entry)]
24#[fidl_table_strict]
25pub struct Entry {
26    /// Identifier for the interface used for communicating with the neighbor.
27    pub interface: u64,
28    /// IP address of the neighbor.
29    pub neighbor: fnet::IpAddress,
30    /// State of the entry within the Neighbor Unreachability Detection (NUD)
31    /// state machine.
32    pub state: fnet_neighbor::EntryState,
33    /// MAC address of the neighboring device's network interface controller.
34    #[fidl_field_type(optional)]
35    pub mac: Option<fnet::MacAddress>,
36    /// Timestamp when this entry has changed `state`.
37    // TODO(https://fxbug.dev/42155335): Replace with zx::MonotonicInstant once there is
38    // support for custom conversion functions.
39    pub updated_at: zx::zx_time_t,
40}
41
42/// Returns a &str suitable for display representing the EntryState parameter.
43pub fn display_entry_state(state: &fnet_neighbor::EntryState) -> &'static str {
44    match state {
45        fnet_neighbor::EntryState::Incomplete => "INCOMPLETE",
46        fnet_neighbor::EntryState::Reachable => "REACHABLE",
47        fnet_neighbor::EntryState::Stale => "STALE",
48        fnet_neighbor::EntryState::Delay => "DELAY",
49        fnet_neighbor::EntryState::Probe => "PROBE",
50        fnet_neighbor::EntryState::Static => "STATIC",
51        fnet_neighbor::EntryState::Unreachable => "UNREACHABLE",
52    }
53}
54
55impl std::fmt::Display for Entry {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
57        let Self { interface, neighbor, mac, state, updated_at: _ } = self;
58        write!(f, "Interface {} | IP {} | MAC ", interface, fnet_ext::IpAddress::from(*neighbor))?;
59        if let Some(mac) = mac {
60            write!(f, "{}", fnet_ext::MacAddress::from(*mac))?;
61        } else {
62            write!(f, "?")?;
63        }
64        write!(f, " | {}", display_entry_state(state))
65    }
66}
67
68/// Neighbor entry events.
69#[derive(Clone, Debug, PartialEq)]
70pub enum Event {
71    /// A neighbor entry that existed prior to watching.
72    Existing(Entry),
73    /// A neighbor entry that was added.
74    Added(Entry),
75    /// A neighbor entry that was changed.
76    Changed(Entry),
77    /// A neighbor entry that was removed.
78    Removed(Entry),
79    /// Sentinel value indicating that all existing entries have been sent.
80    Idle,
81}
82
83impl TryFrom<fnet_neighbor::EntryIteratorItem> for Event {
84    type Error = EntryValidationError;
85
86    fn try_from(value: fnet_neighbor::EntryIteratorItem) -> Result<Self, Self::Error> {
87        match value {
88            fnet_neighbor::EntryIteratorItem::Existing(e) => {
89                Ok(Event::Existing(Entry::try_from(e)?))
90            }
91            fnet_neighbor::EntryIteratorItem::Added(e) => Ok(Event::Added(Entry::try_from(e)?)),
92            fnet_neighbor::EntryIteratorItem::Changed(e) => Ok(Event::Changed(Entry::try_from(e)?)),
93            fnet_neighbor::EntryIteratorItem::Removed(e) => Ok(Event::Removed(Entry::try_from(e)?)),
94            fnet_neighbor::EntryIteratorItem::Idle(_) => Ok(Event::Idle),
95        }
96    }
97}
98
99/// Options for modifying the behavior of `EntryIterator`.
100#[derive(Clone, Debug, Default, Eq, PartialEq, ValidFidlTable)]
101#[fidl_table_src(fnet_neighbor::EntryIteratorOptions)]
102#[fidl_table_strict]
103pub struct EntryIteratorOptions {}
104
105/// Neighbor table entry iterator creation error.
106#[derive(Clone, Debug, Error)]
107#[error("failed to open neighbor entry iterator: {0}")]
108pub struct OpenEntryIteratorError(fidl::Error);
109
110/// Dispatches `OpenEntryIterator` on the view proxy.
111pub fn open_entry_iterator(
112    view_proxy: &fnet_neighbor::ViewProxy,
113    options: EntryIteratorOptions,
114) -> Result<fnet_neighbor::EntryIteratorProxy, OpenEntryIteratorError> {
115    let (neighbor_iter_proxy, entry_iter_server_end) =
116        fidl::endpoints::create_proxy::<fnet_neighbor::EntryIteratorMarker>();
117
118    view_proxy
119        .open_entry_iterator(entry_iter_server_end, &options.into())
120        .map_err(OpenEntryIteratorError)?;
121
122    Ok(neighbor_iter_proxy)
123}
124
125/// Neighbor table entry iterator `GetNext` errors.
126#[derive(Debug, Error)]
127pub enum EntryIteratorError {
128    /// The call to `GetNext` returned a FIDL error.
129    #[error("the call to `GetNext()` failed: {0}")]
130    Fidl(fidl::Error),
131    /// The event returned by `GetNext` encountered a conversion error.
132    #[error("failed to convert event returned by `GetNext()`: {0:?}")]
133    Conversion(EntryValidationError),
134    /// The server returned an empty batch of events.
135    #[error("the call to `GetNext()` returned an empty batch of events")]
136    EmptyEventBatch,
137}
138
139/// [`event_stream_from_view_with_options`] with default [`EntryIteratorOptions`].
140pub fn event_stream_from_view(
141    neighbors_view: &fnet_neighbor::ViewProxy,
142) -> Result<impl Stream<Item = Result<Event, EntryIteratorError>> + 'static, OpenEntryIteratorError>
143{
144    event_stream_from_view_with_options(neighbors_view, Default::default())
145}
146
147/// Connects to the neighbor table entry iterator protocol with
148/// [`EntryIteratorOptions`] and converts the Hanging-Get style API into an
149/// Event stream.
150///
151/// Each call to `GetNext` returns a batch of events, which are flattened into a
152/// single stream. If an error is encountered while calling `GetNext` or while
153/// converting the event, the stream is immediately terminated.
154pub fn event_stream_from_view_with_options(
155    neighbors_view: &fnet_neighbor::ViewProxy,
156    options: EntryIteratorOptions,
157) -> Result<impl Stream<Item = Result<Event, EntryIteratorError>> + 'static, OpenEntryIteratorError>
158{
159    let neighbor_iterator = open_entry_iterator(neighbors_view, options)?;
160    Ok(event_stream_from_iterator(neighbor_iterator))
161}
162
163/// Turns the provided neighbor table entry iterator into an [`Event`] stream by applying
164/// Hanging-Get watch.
165///
166/// Each call to `GetNext` returns a batch of events, which are flattened into a
167/// single stream. If an error is encountered while calling `GetNext` or while
168/// converting the event, the stream is immediately terminated.
169pub fn event_stream_from_iterator(
170    neighbor_iterator: fnet_neighbor::EntryIteratorProxy,
171) -> impl Stream<Item = Result<Event, EntryIteratorError>> {
172    stream::ShortCircuit::new(
173        futures::stream::try_unfold(neighbor_iterator, |iter| async {
174            let events_batch = iter.get_next().await.map_err(EntryIteratorError::Fidl)?;
175            if events_batch.is_empty() {
176                return Err(EntryIteratorError::EmptyEventBatch);
177            }
178            let events_batch = events_batch
179                .into_iter()
180                .map(|event| event.try_into().map_err(EntryIteratorError::Conversion));
181            let event_stream = futures::stream::iter(events_batch);
182            Ok(Some((event_stream, iter)))
183        })
184        // Flatten the stream of event streams into a single event stream.
185        .try_flatten(),
186    )
187}
188
189/// Errors returned by [`collect_neighbors_until_idle`].
190#[derive(Debug, Error)]
191pub enum CollectNeighborsUntilIdleError {
192    /// There was an error in the event stream.
193    #[error("there was an error in the event stream: {0}")]
194    ErrorInStream(EntryIteratorError),
195    /// There was an unexpected event in the event stream. Only `existing` or
196    /// `idle` events are expected.
197    #[error("there was an unexpected event in the event stream: {0:?}")]
198    UnexpectedEvent(Event),
199    /// The event stream unexpectedly ended.
200    #[error("the event stream unexpectedly ended")]
201    StreamEnded,
202}
203
204/// Collects all `existing` events from the stream, stopping once the `idle`
205/// event is observed.
206pub async fn collect_neighbors_until_idle<C: Extend<Entry> + Default>(
207    event_stream: impl futures::Stream<Item = Result<Event, EntryIteratorError>> + Unpin,
208) -> Result<C, CollectNeighborsUntilIdleError> {
209    fold::fold_while(
210        event_stream,
211        Ok(C::default()),
212        |existing_neighbors: Result<C, CollectNeighborsUntilIdleError>, event| {
213            futures::future::ready(match existing_neighbors {
214                Err(_) => {
215                    unreachable!(
216                        "`existing_neighbors` must be `Ok`, because we stop folding on err"
217                    )
218                }
219                Ok(mut existing_neighbors) => match event {
220                    Err(e) => {
221                        fold::FoldWhile::Done(Err(CollectNeighborsUntilIdleError::ErrorInStream(e)))
222                    }
223                    Ok(e) => match e {
224                        Event::Existing(e) => {
225                            existing_neighbors.extend([e]);
226                            fold::FoldWhile::Continue(Ok(existing_neighbors))
227                        }
228                        Event::Idle => fold::FoldWhile::Done(Ok(existing_neighbors)),
229                        e @ Event::Added(_) | e @ Event::Changed(_) | e @ Event::Removed(_) => {
230                            fold::FoldWhile::Done(Err(
231                                CollectNeighborsUntilIdleError::UnexpectedEvent(e),
232                            ))
233                        }
234                    },
235                },
236            })
237        },
238    )
239    .await
240    .short_circuited()
241    .map_err(|_accumulated_thus_far: Result<C, CollectNeighborsUntilIdleError>| {
242        CollectNeighborsUntilIdleError::StreamEnded
243    })?
244}
245
246#[cfg(test)]
247mod tests {
248    use super::*;
249
250    use assert_matches::assert_matches;
251    use futures::{FutureExt, StreamExt as _};
252    use test_case::test_case;
253
254    fn valid_fidl_entry(interface: u64) -> fnet_neighbor::Entry {
255        fnet_neighbor::Entry {
256            interface: Some(interface),
257            neighbor: Some(fnet::IpAddress::Ipv4(fnet::Ipv4Address { addr: [192, 168, 0, 1] })),
258            state: Some(fnet_neighbor::EntryState::Reachable),
259            mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
260            updated_at: Some(123456),
261            ..Default::default()
262        }
263    }
264
265    #[test]
266    fn event_try_from_success() {
267        let fidl_entry = valid_fidl_entry(1);
268        let local_entry: Entry = fidl_entry.clone().try_into().unwrap();
269        assert_matches!(
270            fnet_neighbor::EntryIteratorItem::Existing(fidl_entry.clone()).try_into(),
271            Ok(Event::Existing(entry)) if entry == local_entry
272        );
273        assert_matches!(
274            fnet_neighbor::EntryIteratorItem::Added(fidl_entry.clone()).try_into(),
275            Ok(Event::Added(entry)) if entry == local_entry
276        );
277        assert_matches!(
278            fnet_neighbor::EntryIteratorItem::Changed(fidl_entry.clone()).try_into(),
279            Ok(Event::Changed(entry)) if entry == local_entry
280        );
281        assert_matches!(
282            fnet_neighbor::EntryIteratorItem::Removed(fidl_entry.clone()).try_into(),
283            Ok(Event::Removed(entry)) if entry == local_entry
284        );
285        assert_matches!(
286            fnet_neighbor::EntryIteratorItem::Idle(fnet_neighbor::IdleEvent).try_into(),
287            Ok(Event::Idle)
288        );
289    }
290
291    #[test]
292    fn event_try_from_missing_field() {
293        let fidl_entry = fnet_neighbor::Entry {
294            interface: None, // Required field omitted.
295            neighbor: Some(fnet::IpAddress::Ipv4(fnet::Ipv4Address { addr: [192, 168, 0, 1] })),
296            state: Some(fnet_neighbor::EntryState::Reachable),
297            mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
298            updated_at: Some(123456),
299            ..Default::default()
300        };
301        assert_matches!(
302            Event::try_from(fnet_neighbor::EntryIteratorItem::Existing(fidl_entry.clone())),
303            Err(EntryValidationError::MissingField(_))
304        );
305        assert_matches!(
306            Event::try_from(fnet_neighbor::EntryIteratorItem::Added(fidl_entry.clone())),
307            Err(EntryValidationError::MissingField(_))
308        );
309        assert_matches!(
310            Event::try_from(fnet_neighbor::EntryIteratorItem::Changed(fidl_entry.clone())),
311            Err(EntryValidationError::MissingField(_))
312        );
313        assert_matches!(
314            Event::try_from(fnet_neighbor::EntryIteratorItem::Removed(fidl_entry.clone())),
315            Err(EntryValidationError::MissingField(_))
316        );
317    }
318
319    // Tests `event_stream_from_view` with various "shapes". The test
320    // parameter is a vec of ranges, where each range corresponds to the batch
321    // of events that will be sent in response to a single call to `GetNext().
322    #[test_case(Vec::new(); "no events")]
323    #[test_case(vec![0..1]; "single_batch_single_event")]
324    #[test_case(vec![0..10]; "single_batch_many_events")]
325    #[test_case(vec![0..10, 10..20, 20..30]; "many_batches_many_events")]
326    #[fuchsia_async::run_singlethreaded(test)]
327    async fn event_stream_from_view_against_shape(test_shape: Vec<std::ops::Range<u8>>) {
328        // Build the event stream based on the `test_shape`. Use a channel
329        // so that the stream stays open until `close_channel` is called later.
330        let (batches_sender, batches_receiver) =
331            futures::channel::mpsc::unbounded::<Vec<fnet_neighbor::EntryIteratorItem>>();
332        for batch_shape in &test_shape {
333            batches_sender
334                .unbounded_send(testutil::generate_events_in_range(batch_shape.clone()))
335                .expect("failed to send event batch");
336        }
337
338        let (view, entry_iter_fut) = testutil::create_fake_view(batches_receiver);
339
340        let event_stream =
341            event_stream_from_view(&view).expect("failed to open entry iterator").fuse();
342
343        futures::pin_mut!(entry_iter_fut, event_stream);
344
345        for batch_shape in test_shape {
346            for event_idx in batch_shape.into_iter() {
347                futures::select! {
348                    () = entry_iter_fut => panic!(
349                        "fake entry iterator implementation unexpectedly finished"
350                    ),
351                    event = event_stream.next() => {
352                        let actual_event = event
353                            .expect("event stream unexpectedly empty")
354                            .expect("error processing event");
355                        let expected_event = testutil::generate_event(event_idx)
356                                .try_into()
357                                .expect("test event is unexpectedly invalid");
358                        assert_eq!(actual_event, expected_event);
359                    }
360                };
361            }
362        }
363
364        // Close `batches_sender` and observe that the `event_stream` ends.
365        batches_sender.close_channel();
366        let ((), mut events) = futures::join!(entry_iter_fut, event_stream.collect::<Vec<_>>());
367        assert_matches!(
368            events.pop(),
369            Some(Err(EntryIteratorError::Fidl(fidl::Error::ClientChannelClosed {
370                status: zx_status::Status::PEER_CLOSED,
371                ..
372            })))
373        );
374        assert_matches!(events[..], []);
375    }
376
377    // Verify that calling `event_stream_from_view` multiple times with the
378    // same `View` proxy, results in independent `EntryIterator` clients.
379    #[fuchsia_async::run_singlethreaded(test)]
380    async fn event_stream_from_view_multiple_iterators() {
381        // Events for 3 iterators. Each receives one batch containing 10 events.
382        let test_data = vec![
383            vec![testutil::generate_events_in_range(0..10)],
384            vec![testutil::generate_events_in_range(10..20)],
385            vec![testutil::generate_events_in_range(20..30)],
386        ];
387
388        // Instantiate the fake EntryIterator implementations.
389        let (view, view_server_end) = fidl::endpoints::create_proxy::<fnet_neighbor::ViewMarker>();
390        let view_request_stream = view_server_end.into_stream();
391        let entry_iters_fut = view_request_stream
392            .zip(futures::stream::iter(test_data.clone()))
393            .for_each_concurrent(std::usize::MAX, |(request, event_data)| {
394                testutil::serve_view_request(
395                    request.expect("failed to receive `OpenEntryIterator` request"),
396                    futures::stream::iter(event_data),
397                )
398            });
399
400        let validate_event_streams_fut =
401            futures::future::join_all(test_data.into_iter().map(|event_data| {
402                let events_fut = event_stream_from_view(&view)
403                    .expect("failed to create entry iterator")
404                    .collect::<std::collections::VecDeque<_>>();
405                events_fut.then(|mut events| {
406                    for expected_event in event_data.into_iter().flatten() {
407                        assert_eq!(
408                            events
409                                .pop_front()
410                                .expect("event_stream unexpectedly empty")
411                                .expect("error processing event"),
412                            expected_event.try_into().expect("test event is unexpectedly invalid"),
413                        );
414                    }
415                    assert_matches!(
416                        events.pop_front(),
417                        Some(Err(EntryIteratorError::Fidl(fidl::Error::ClientChannelClosed {
418                            status: zx_status::Status::PEER_CLOSED,
419                            ..
420                        })))
421                    );
422                    assert_matches!(events.make_contiguous(), []);
423                    futures::future::ready(())
424                })
425            }));
426
427        futures::join!(entry_iters_fut, validate_event_streams_fut);
428    }
429
430    // Verify that failing to convert an event results in an error and closes
431    // the event stream. `trailing_event` and `trailing_batch` control whether
432    // a good event is sent after the bad event, either as part of the same
433    // batch or in a subsequent batch. The test expects this data to be
434    // truncated from the resulting event_stream.
435    #[test_case(false, false; "no_trailing")]
436    #[test_case(true, false; "trailing_event")]
437    #[test_case(false, true; "trailing_batch")]
438    #[test_case(true, true; "trailing_event_and_batch")]
439    #[fuchsia_async::run_singlethreaded(test)]
440    async fn event_stream_from_view_conversion_error(trailing_event: bool, trailing_batch: bool) {
441        let bad_event = fnet_neighbor::EntryIteratorItem::Added(fnet_neighbor::Entry {
442            interface: None, // Required field omitted.
443            neighbor: Some(fnet::IpAddress::Ipv4(fnet::Ipv4Address { addr: [192, 168, 0, 1] })),
444            state: Some(fnet_neighbor::EntryState::Reachable),
445            mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
446            updated_at: Some(123456),
447            ..Default::default()
448        });
449
450        let batch = std::iter::once(bad_event)
451            // Optionally append a known good event to the batch.
452            .chain(trailing_event.then(|| testutil::generate_event(0)).into_iter())
453            .collect::<Vec<_>>();
454        let batches = std::iter::once(batch)
455            // Optionally append a known good batch to the sequence of batches.
456            .chain(trailing_batch.then(|| vec![testutil::generate_event(1)]))
457            .collect::<Vec<_>>();
458
459        // Instantiate the fake entry iterator implementation.
460        let (view, entry_iter_fut) = testutil::create_fake_view(futures::stream::iter(batches));
461
462        let event_stream = event_stream_from_view(&view).expect("failed to connect to view").fuse();
463
464        futures::pin_mut!(entry_iter_fut, event_stream);
465        let ((), events) = futures::join!(entry_iter_fut, event_stream.collect::<Vec<_>>());
466        assert_matches!(&events[..], &[Err(EntryIteratorError::Conversion(_))]);
467    }
468
469    // Verify that iterator returning an empty batch results in an error and
470    // closes the event stream. When `trailing_batch` is true, an additional
471    // "good" batch will be sent after the empty batch; the test expects this
472    // data to be truncated from the resulting event_stream.
473    #[test_case(false; "no_trailing_batch")]
474    #[test_case(true; "trailing_batch")]
475    #[fuchsia_async::run_singlethreaded(test)]
476    async fn event_stream_from_view_empty_batch_error(trailing_batch: bool) {
477        let batches = std::iter::once(Vec::new())
478            // Optionally append a known good batch to the sequence of batches.
479            .chain(trailing_batch.then(|| vec![testutil::generate_event(0)]))
480            .collect::<Vec<_>>();
481
482        // Instantiate the fake EntryIterator implementation.
483        let (view, entry_iter_fut) = testutil::create_fake_view(futures::stream::iter(batches));
484
485        let event_stream =
486            event_stream_from_view(&view).expect("failed to create entry iterator").fuse();
487
488        futures::pin_mut!(entry_iter_fut, event_stream);
489        let ((), events) = futures::join!(entry_iter_fut, event_stream.collect::<Vec<_>>());
490        assert_matches!(&events[..], &[Err(EntryIteratorError::EmptyEventBatch)]);
491    }
492
493    #[fuchsia_async::run_singlethreaded(test)]
494    async fn collect_neighbors_until_idle_error_error_in_stream() {
495        let event = Err(EntryIteratorError::EmptyEventBatch);
496        let event_stream = futures::stream::once(futures::future::ready(event));
497        assert_matches!(
498            collect_neighbors_until_idle::<Vec<_>>(event_stream).await,
499            Err(CollectNeighborsUntilIdleError::ErrorInStream(_))
500        );
501    }
502
503    #[fuchsia_async::run_singlethreaded(test)]
504    async fn collect_neighbors_until_idle_error_unexpected_event() {
505        let event = Ok(Event::Added(valid_fidl_entry(1).try_into().unwrap()));
506        let event_stream = futures::stream::once(futures::future::ready(event));
507        assert_matches!(
508            collect_neighbors_until_idle::<Vec<_>>(event_stream).await,
509            Err(CollectNeighborsUntilIdleError::UnexpectedEvent(_))
510        );
511    }
512
513    #[fuchsia_async::run_singlethreaded(test)]
514    async fn collect_neighbors_until_idle_error_stream_ended() {
515        let event = Ok(Event::Existing(valid_fidl_entry(1).try_into().unwrap()));
516        let event_stream = futures::stream::once(futures::future::ready(event));
517        assert_matches!(
518            collect_neighbors_until_idle::<Vec<_>>(event_stream).await,
519            Err(CollectNeighborsUntilIdleError::StreamEnded)
520        );
521    }
522
523    #[fuchsia_async::run_singlethreaded(test)]
524    async fn collect_neighbors_until_idle_success() {
525        let entry: Entry = valid_fidl_entry(1).try_into().unwrap();
526        let mut event_stream = futures::stream::iter([
527            Ok(Event::Existing(entry.clone())),
528            Ok(Event::Idle),
529            Ok(Event::Added(entry.clone())),
530        ]);
531
532        let existing: Vec<_> = collect_neighbors_until_idle(&mut event_stream)
533            .await
534            .expect("failed to collect existing neighbors");
535        assert_eq!(existing, &[entry.clone()]);
536
537        let trailing_events: Vec<_> = event_stream.collect().await;
538        assert_matches!(
539            &trailing_events[..],
540            [Ok(Event::Added(found_entry))] if *found_entry == entry
541        );
542    }
543}