Skip to main content

fidl_fuchsia_net_neighbor_ext/
lib.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Extensions for types in the `fidl_fuchsia_net_neighbor` crate.
6
7#![deny(missing_docs)]
8
9#[cfg(any(test, feature = "testutils"))]
10pub mod testutil;
11
12use async_utils::{fold, stream};
13use fidl_fuchsia_net_ext as fnet_ext;
14use fidl_table_validation::*;
15use flex_client::ProxyHasDomain;
16use flex_fuchsia_net as fnet;
17use flex_fuchsia_net_neighbor as fnet_neighbor;
18use futures::{Stream, TryStreamExt as _};
19use std::num::NonZeroU64;
20use thiserror::Error;
21use zx_types as zx;
22
/// Error produced when a raw FIDL field value cannot be converted into its
/// validated representation.
#[derive(Debug, Error)]
enum ConversionError {
    /// The interface ID was zero, which cannot be stored in a `NonZeroU64`.
    #[error("interface ID must be non-zero")]
    ZeroInterface,
}
28
29struct NonZeroU64Converter;
30
31impl Converter for NonZeroU64Converter {
32    type Fidl = u64;
33    type Validated = NonZeroU64;
34    type Error = ConversionError;
35    fn try_from_fidl(value: Self::Fidl) -> std::result::Result<Self::Validated, Self::Error> {
36        NonZeroU64::new(value).ok_or(ConversionError::ZeroInterface)
37    }
38    fn from_validated(validated: Self::Validated) -> Self::Fidl {
39        validated.get()
40    }
41}
42
/// Information on a neighboring device in the local network.
///
/// This is the validated counterpart of the `fnet_neighbor::Entry` FIDL
/// table named in `fidl_table_src`; the conversions are derived by
/// `ValidFidlTable`.
#[derive(Clone, Debug, Eq, Hash, PartialEq, ValidFidlTable)]
#[fidl_table_src(fnet_neighbor::Entry)]
#[fidl_table_strict]
pub struct Entry {
    /// Identifier for the interface used for communicating with the neighbor.
    ///
    /// Converted through `NonZeroU64Converter`, which rejects a zero ID.
    #[fidl_field_type(converter = NonZeroU64Converter)]
    pub interface: NonZeroU64,
    /// IP address of the neighbor.
    pub neighbor: fnet::IpAddress,
    /// State of the entry within the Neighbor Unreachability Detection (NUD)
    /// state machine.
    pub state: fnet_neighbor::EntryState,
    /// MAC address of the neighboring device's network interface controller.
    ///
    /// Marked optional for validation, so it may be `None`.
    #[fidl_field_type(optional)]
    pub mac: Option<fnet::MacAddress>,
    /// Timestamp of the most recent change to this entry's `state`.
    // TODO(https://fxbug.dev/42155335): Replace with zx::MonotonicInstant once there is
    // support for custom conversion functions.
    pub updated_at: zx::zx_time_t,
}
64
65/// Returns a &str suitable for display representing the EntryState parameter.
66pub fn display_entry_state(state: &fnet_neighbor::EntryState) -> &'static str {
67    match state {
68        fnet_neighbor::EntryState::Incomplete => "INCOMPLETE",
69        fnet_neighbor::EntryState::Reachable => "REACHABLE",
70        fnet_neighbor::EntryState::Stale => "STALE",
71        fnet_neighbor::EntryState::Delay => "DELAY",
72        fnet_neighbor::EntryState::Probe => "PROBE",
73        fnet_neighbor::EntryState::Static => "STATIC",
74        fnet_neighbor::EntryState::Unreachable => "UNREACHABLE",
75    }
76}
77
78impl std::fmt::Display for Entry {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
80        let Self { interface, neighbor, mac, state, updated_at: _ } = self;
81        write!(f, "Interface {} | IP {} | MAC ", interface, fnet_ext::IpAddress::from(*neighbor))?;
82        if let Some(mac) = mac {
83            write!(f, "{}", fnet_ext::MacAddress::from(*mac))?;
84        } else {
85            write!(f, "?")?;
86        }
87        write!(f, " | {}", display_entry_state(state))
88    }
89}
90
/// Neighbor entry events.
///
/// Validated form of `fnet_neighbor::EntryIteratorItem`; see the `TryFrom`
/// implementation for the conversion.
#[derive(Clone, Debug, PartialEq)]
pub enum Event {
    /// A neighbor entry that existed prior to watching.
    Existing(Entry),
    /// A neighbor entry that was added.
    Added(Entry),
    /// A neighbor entry that was changed.
    Changed(Entry),
    /// A neighbor entry that was removed.
    Removed(Entry),
    /// Sentinel value indicating that all existing entries have been sent.
    Idle,
}
105
106impl TryFrom<fnet_neighbor::EntryIteratorItem> for Event {
107    type Error = EntryValidationError;
108
109    fn try_from(value: fnet_neighbor::EntryIteratorItem) -> Result<Self, Self::Error> {
110        match value {
111            fnet_neighbor::EntryIteratorItem::Existing(e) => {
112                Ok(Event::Existing(Entry::try_from(e)?))
113            }
114            fnet_neighbor::EntryIteratorItem::Added(e) => Ok(Event::Added(Entry::try_from(e)?)),
115            fnet_neighbor::EntryIteratorItem::Changed(e) => Ok(Event::Changed(Entry::try_from(e)?)),
116            fnet_neighbor::EntryIteratorItem::Removed(e) => Ok(Event::Removed(Entry::try_from(e)?)),
117            fnet_neighbor::EntryIteratorItem::Idle(_) => Ok(Event::Idle),
118        }
119    }
120}
121
/// Options for modifying the behavior of `EntryIterator`.
///
/// Validated counterpart of the `fnet_neighbor::EntryIteratorOptions` FIDL
/// table. It currently declares no fields.
#[derive(Clone, Debug, Default, Eq, PartialEq, ValidFidlTable)]
#[fidl_table_src(fnet_neighbor::EntryIteratorOptions)]
#[fidl_table_strict]
pub struct EntryIteratorOptions {}
127
/// Neighbor table entry iterator creation error.
///
/// Wraps the underlying FIDL error, which is included in the display message.
#[derive(Clone, Debug, Error)]
#[error("failed to open neighbor entry iterator: {0}")]
pub struct OpenEntryIteratorError(fidl::Error);
132
133/// Dispatches `OpenEntryIterator` on the view proxy.
134pub fn open_entry_iterator(
135    view_proxy: &fnet_neighbor::ViewProxy,
136    options: EntryIteratorOptions,
137) -> Result<fnet_neighbor::EntryIteratorProxy, OpenEntryIteratorError> {
138    let (neighbor_iter_proxy, entry_iter_server_end) =
139        view_proxy.domain().create_proxy::<fnet_neighbor::EntryIteratorMarker>();
140
141    view_proxy
142        .open_entry_iterator(entry_iter_server_end, &options.into())
143        .map_err(OpenEntryIteratorError)?;
144
145    Ok(neighbor_iter_proxy)
146}
147
/// Neighbor table entry iterator `GetNext` errors.
///
/// These are the error items yielded by the event streams built on top of
/// `GetNext`.
#[derive(Debug, Error)]
pub enum EntryIteratorError {
    /// The call to `GetNext` returned a FIDL error.
    #[error("the call to `GetNext()` failed: {0}")]
    Fidl(fidl::Error),
    /// The event returned by `GetNext` encountered a conversion error.
    // The validation error is rendered with its `Debug` formatting.
    #[error("failed to convert event returned by `GetNext()`: {0:?}")]
    Conversion(EntryValidationError),
    /// The server returned an empty batch of events.
    #[error("the call to `GetNext()` returned an empty batch of events")]
    EmptyEventBatch,
}
161
162/// [`event_stream_from_view_with_options`] with default [`EntryIteratorOptions`].
163pub fn event_stream_from_view(
164    neighbors_view: &fnet_neighbor::ViewProxy,
165) -> Result<impl Stream<Item = Result<Event, EntryIteratorError>> + 'static, OpenEntryIteratorError>
166{
167    event_stream_from_view_with_options(neighbors_view, Default::default())
168}
169
170/// Connects to the neighbor table entry iterator protocol with
171/// [`EntryIteratorOptions`] and converts the Hanging-Get style API into an
172/// Event stream.
173///
174/// Each call to `GetNext` returns a batch of events, which are flattened into a
175/// single stream. If an error is encountered while calling `GetNext` or while
176/// converting the event, the stream is immediately terminated.
177pub fn event_stream_from_view_with_options(
178    neighbors_view: &fnet_neighbor::ViewProxy,
179    options: EntryIteratorOptions,
180) -> Result<impl Stream<Item = Result<Event, EntryIteratorError>> + 'static, OpenEntryIteratorError>
181{
182    let neighbor_iterator = open_entry_iterator(neighbors_view, options)?;
183    Ok(event_stream_from_iterator(neighbor_iterator))
184}
185
186/// Turns the provided neighbor table entry iterator into an [`Event`] stream by applying
187/// Hanging-Get watch.
188///
189/// Each call to `GetNext` returns a batch of events, which are flattened into a
190/// single stream. If an error is encountered while calling `GetNext` or while
191/// converting the event, the stream is immediately terminated.
192pub fn event_stream_from_iterator(
193    neighbor_iterator: fnet_neighbor::EntryIteratorProxy,
194) -> impl Stream<Item = Result<Event, EntryIteratorError>> {
195    stream::ShortCircuit::new(
196        futures::stream::try_unfold(neighbor_iterator, |iter| async {
197            let events_batch = iter.get_next().await.map_err(EntryIteratorError::Fidl)?;
198            if events_batch.is_empty() {
199                return Err(EntryIteratorError::EmptyEventBatch);
200            }
201            let events_batch = events_batch
202                .into_iter()
203                .map(|event| event.try_into().map_err(EntryIteratorError::Conversion));
204            let event_stream = futures::stream::iter(events_batch);
205            Ok(Some((event_stream, iter)))
206        })
207        // Flatten the stream of event streams into a single event stream.
208        .try_flatten(),
209    )
210}
211
/// Errors returned by [`collect_neighbors_until_idle`].
#[derive(Debug, Error)]
pub enum CollectNeighborsUntilIdleError {
    /// There was an error in the event stream.
    #[error("there was an error in the event stream: {0}")]
    ErrorInStream(EntryIteratorError),
    /// There was an unexpected event in the event stream. Only `existing` or
    /// `idle` events are expected.
    // The offending event is rendered with its `Debug` formatting.
    #[error("there was an unexpected event in the event stream: {0:?}")]
    UnexpectedEvent(Event),
    /// The event stream unexpectedly ended, i.e. before an `idle` event was
    /// observed.
    #[error("the event stream unexpectedly ended")]
    StreamEnded,
}
226
227/// Collects all `existing` events from the stream, stopping once the `idle`
228/// event is observed.
229pub async fn collect_neighbors_until_idle<C: Extend<Entry> + Default>(
230    event_stream: impl futures::Stream<Item = Result<Event, EntryIteratorError>> + Unpin,
231) -> Result<C, CollectNeighborsUntilIdleError> {
232    fold::fold_while(
233        event_stream,
234        Ok(C::default()),
235        |existing_neighbors: Result<C, CollectNeighborsUntilIdleError>, event| {
236            futures::future::ready(match existing_neighbors {
237                Err(_) => {
238                    unreachable!(
239                        "`existing_neighbors` must be `Ok`, because we stop folding on err"
240                    )
241                }
242                Ok(mut existing_neighbors) => match event {
243                    Err(e) => {
244                        fold::FoldWhile::Done(Err(CollectNeighborsUntilIdleError::ErrorInStream(e)))
245                    }
246                    Ok(e) => match e {
247                        Event::Existing(e) => {
248                            existing_neighbors.extend([e]);
249                            fold::FoldWhile::Continue(Ok(existing_neighbors))
250                        }
251                        Event::Idle => fold::FoldWhile::Done(Ok(existing_neighbors)),
252                        e @ Event::Added(_) | e @ Event::Changed(_) | e @ Event::Removed(_) => {
253                            fold::FoldWhile::Done(Err(
254                                CollectNeighborsUntilIdleError::UnexpectedEvent(e),
255                            ))
256                        }
257                    },
258                },
259            })
260        },
261    )
262    .await
263    .short_circuited()
264    .map_err(|_accumulated_thus_far: Result<C, CollectNeighborsUntilIdleError>| {
265        CollectNeighborsUntilIdleError::StreamEnded
266    })?
267}
268
269#[cfg(test)]
270mod tests {
271    use super::*;
272
273    use assert_matches::assert_matches;
274    use futures::{FutureExt, StreamExt as _};
275    use test_case::test_case;
276
277    fn valid_fidl_entry(interface: NonZeroU64) -> fnet_neighbor::Entry {
278        fnet_neighbor::Entry {
279            interface: Some(interface.get()),
280            neighbor: Some(fnet::IpAddress::Ipv4(fnet::Ipv4Address { addr: [192, 168, 0, 1] })),
281            state: Some(fnet_neighbor::EntryState::Reachable),
282            mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
283            updated_at: Some(123456),
284            ..Default::default()
285        }
286    }
287
288    #[test]
289    fn event_try_from_success() {
290        let fidl_entry = valid_fidl_entry(NonZeroU64::new(1).unwrap());
291        let local_entry: Entry = fidl_entry.clone().try_into().unwrap();
292        assert_matches!(
293            fnet_neighbor::EntryIteratorItem::Existing(fidl_entry.clone()).try_into(),
294            Ok(Event::Existing(entry)) if entry == local_entry
295        );
296        assert_matches!(
297            fnet_neighbor::EntryIteratorItem::Added(fidl_entry.clone()).try_into(),
298            Ok(Event::Added(entry)) if entry == local_entry
299        );
300        assert_matches!(
301            fnet_neighbor::EntryIteratorItem::Changed(fidl_entry.clone()).try_into(),
302            Ok(Event::Changed(entry)) if entry == local_entry
303        );
304        assert_matches!(
305            fnet_neighbor::EntryIteratorItem::Removed(fidl_entry.clone()).try_into(),
306            Ok(Event::Removed(entry)) if entry == local_entry
307        );
308        assert_matches!(
309            fnet_neighbor::EntryIteratorItem::Idle(fnet_neighbor::IdleEvent).try_into(),
310            Ok(Event::Idle)
311        );
312    }
313
314    #[test]
315    fn event_try_from_missing_field() {
316        let fidl_entry = fnet_neighbor::Entry {
317            interface: None, // Required field omitted.
318            ..valid_fidl_entry(NonZeroU64::new(1).unwrap())
319        };
320        assert_matches!(
321            Event::try_from(fnet_neighbor::EntryIteratorItem::Existing(fidl_entry.clone())),
322            Err(EntryValidationError::MissingField(_))
323        );
324        assert_matches!(
325            Event::try_from(fnet_neighbor::EntryIteratorItem::Added(fidl_entry.clone())),
326            Err(EntryValidationError::MissingField(_))
327        );
328        assert_matches!(
329            Event::try_from(fnet_neighbor::EntryIteratorItem::Changed(fidl_entry.clone())),
330            Err(EntryValidationError::MissingField(_))
331        );
332        assert_matches!(
333            Event::try_from(fnet_neighbor::EntryIteratorItem::Removed(fidl_entry.clone())),
334            Err(EntryValidationError::MissingField(_))
335        );
336    }
337
338    #[test]
339    fn entry_try_from_zero_interface() {
340        let fidl_entry = fnet_neighbor::Entry {
341            interface: Some(0),
342            ..valid_fidl_entry(NonZeroU64::new(1).unwrap())
343        };
344        assert_matches!(Entry::try_from(fidl_entry), Err(EntryValidationError::InvalidField(_)));
345    }
346
347    // Tests `event_stream_from_view` with various "shapes". The test
348    // parameter is a vec of ranges, where each range corresponds to the batch
349    // of events that will be sent in response to a single call to `GetNext().
350    #[test_case(Vec::new(); "no events")]
351    #[test_case(vec![0..1]; "single_batch_single_event")]
352    #[test_case(vec![0..10]; "single_batch_many_events")]
353    #[test_case(vec![0..10, 10..20, 20..30]; "many_batches_many_events")]
354    #[fuchsia_async::run_singlethreaded(test)]
355    async fn event_stream_from_view_against_shape(test_shape: Vec<std::ops::Range<u8>>) {
356        // Build the event stream based on the `test_shape`. Use a channel
357        // so that the stream stays open until `close_channel` is called later.
358        let (batches_sender, batches_receiver) =
359            futures::channel::mpsc::unbounded::<Vec<fnet_neighbor::EntryIteratorItem>>();
360        for batch_shape in &test_shape {
361            batches_sender
362                .unbounded_send(testutil::generate_events_in_range(batch_shape.clone()))
363                .expect("failed to send event batch");
364        }
365
366        let (view, entry_iter_fut) = testutil::create_fake_view(batches_receiver);
367
368        let event_stream =
369            event_stream_from_view(&view).expect("failed to open entry iterator").fuse();
370
371        futures::pin_mut!(entry_iter_fut, event_stream);
372
373        for batch_shape in test_shape {
374            for event_idx in batch_shape.into_iter() {
375                futures::select! {
376                    () = entry_iter_fut => panic!(
377                        "fake entry iterator implementation unexpectedly finished"
378                    ),
379                    event = event_stream.next() => {
380                        let actual_event = event
381                            .expect("event stream unexpectedly empty")
382                            .expect("error processing event");
383                        let expected_event = testutil::generate_event(event_idx)
384                                .try_into()
385                                .expect("test event is unexpectedly invalid");
386                        assert_eq!(actual_event, expected_event);
387                    }
388                };
389            }
390        }
391
392        // Close `batches_sender` and observe that the `event_stream` ends.
393        batches_sender.close_channel();
394        let ((), mut events) = futures::join!(entry_iter_fut, event_stream.collect::<Vec<_>>());
395        assert_matches!(
396            events.pop(),
397            Some(Err(EntryIteratorError::Fidl(fidl::Error::ClientChannelClosed {
398                status: zx_status::Status::PEER_CLOSED,
399                ..
400            })))
401        );
402        assert_matches!(events[..], []);
403    }
404
405    // Verify that calling `event_stream_from_view` multiple times with the
406    // same `View` proxy, results in independent `EntryIterator` clients.
407    #[fuchsia_async::run_singlethreaded(test)]
408    async fn event_stream_from_view_multiple_iterators() {
409        // Events for 3 iterators. Each receives one batch containing 10 events.
410        let test_data = vec![
411            vec![testutil::generate_events_in_range(0..10)],
412            vec![testutil::generate_events_in_range(10..20)],
413            vec![testutil::generate_events_in_range(20..30)],
414        ];
415
416        // Instantiate the fake EntryIterator implementations.
417        let (view, view_server_end) = fidl::endpoints::create_proxy::<fnet_neighbor::ViewMarker>();
418        let view_request_stream = view_server_end.into_stream();
419        let entry_iters_fut = view_request_stream
420            .zip(futures::stream::iter(test_data.clone()))
421            .for_each_concurrent(std::usize::MAX, |(request, event_data)| {
422                testutil::serve_view_request(
423                    request.expect("failed to receive `OpenEntryIterator` request"),
424                    futures::stream::iter(event_data),
425                )
426            });
427
428        let validate_event_streams_fut =
429            futures::future::join_all(test_data.into_iter().map(|event_data| {
430                let events_fut = event_stream_from_view(&view)
431                    .expect("failed to create entry iterator")
432                    .collect::<std::collections::VecDeque<_>>();
433                events_fut.then(|mut events| {
434                    for expected_event in event_data.into_iter().flatten() {
435                        assert_eq!(
436                            events
437                                .pop_front()
438                                .expect("event_stream unexpectedly empty")
439                                .expect("error processing event"),
440                            expected_event.try_into().expect("test event is unexpectedly invalid"),
441                        );
442                    }
443                    assert_matches!(
444                        events.pop_front(),
445                        Some(Err(EntryIteratorError::Fidl(fidl::Error::ClientChannelClosed {
446                            status: zx_status::Status::PEER_CLOSED,
447                            ..
448                        })))
449                    );
450                    assert_matches!(events.make_contiguous(), []);
451                    futures::future::ready(())
452                })
453            }));
454
455        futures::join!(entry_iters_fut, validate_event_streams_fut);
456    }
457
458    // Verify that failing to convert an event results in an error and closes
459    // the event stream. `trailing_event` and `trailing_batch` control whether
460    // a good event is sent after the bad event, either as part of the same
461    // batch or in a subsequent batch. The test expects this data to be
462    // truncated from the resulting event_stream.
463    #[test_case(false, false; "no_trailing")]
464    #[test_case(true, false; "trailing_event")]
465    #[test_case(false, true; "trailing_batch")]
466    #[test_case(true, true; "trailing_event_and_batch")]
467    #[fuchsia_async::run_singlethreaded(test)]
468    async fn event_stream_from_view_conversion_error(trailing_event: bool, trailing_batch: bool) {
469        let bad_event = fnet_neighbor::EntryIteratorItem::Added(fnet_neighbor::Entry {
470            interface: None, // Required field omitted.
471            neighbor: Some(fnet::IpAddress::Ipv4(fnet::Ipv4Address { addr: [192, 168, 0, 1] })),
472            state: Some(fnet_neighbor::EntryState::Reachable),
473            mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
474            updated_at: Some(123456),
475            ..Default::default()
476        });
477
478        let batch = std::iter::once(bad_event)
479            // Optionally append a known good event to the batch.
480            .chain(trailing_event.then(|| testutil::generate_event(0)).into_iter())
481            .collect::<Vec<_>>();
482        let batches = std::iter::once(batch)
483            // Optionally append a known good batch to the sequence of batches.
484            .chain(trailing_batch.then(|| vec![testutil::generate_event(1)]))
485            .collect::<Vec<_>>();
486
487        // Instantiate the fake entry iterator implementation.
488        let (view, entry_iter_fut) = testutil::create_fake_view(futures::stream::iter(batches));
489
490        let event_stream = event_stream_from_view(&view).expect("failed to connect to view").fuse();
491
492        futures::pin_mut!(entry_iter_fut, event_stream);
493        let ((), events) = futures::join!(entry_iter_fut, event_stream.collect::<Vec<_>>());
494        assert_matches!(&events[..], &[Err(EntryIteratorError::Conversion(_))]);
495    }
496
497    // Verify that iterator returning an empty batch results in an error and
498    // closes the event stream. When `trailing_batch` is true, an additional
499    // "good" batch will be sent after the empty batch; the test expects this
500    // data to be truncated from the resulting event_stream.
501    #[test_case(false; "no_trailing_batch")]
502    #[test_case(true; "trailing_batch")]
503    #[fuchsia_async::run_singlethreaded(test)]
504    async fn event_stream_from_view_empty_batch_error(trailing_batch: bool) {
505        let batches = std::iter::once(Vec::new())
506            // Optionally append a known good batch to the sequence of batches.
507            .chain(trailing_batch.then(|| vec![testutil::generate_event(0)]))
508            .collect::<Vec<_>>();
509
510        // Instantiate the fake EntryIterator implementation.
511        let (view, entry_iter_fut) = testutil::create_fake_view(futures::stream::iter(batches));
512
513        let event_stream =
514            event_stream_from_view(&view).expect("failed to create entry iterator").fuse();
515
516        futures::pin_mut!(entry_iter_fut, event_stream);
517        let ((), events) = futures::join!(entry_iter_fut, event_stream.collect::<Vec<_>>());
518        assert_matches!(&events[..], &[Err(EntryIteratorError::EmptyEventBatch)]);
519    }
520
521    #[fuchsia_async::run_singlethreaded(test)]
522    async fn collect_neighbors_until_idle_error_error_in_stream() {
523        let event = Err(EntryIteratorError::EmptyEventBatch);
524        let event_stream = futures::stream::once(futures::future::ready(event));
525        assert_matches!(
526            collect_neighbors_until_idle::<Vec<_>>(event_stream).await,
527            Err(CollectNeighborsUntilIdleError::ErrorInStream(_))
528        );
529    }
530
531    #[fuchsia_async::run_singlethreaded(test)]
532    async fn collect_neighbors_until_idle_error_unexpected_event() {
533        let event =
534            Ok(Event::Added(valid_fidl_entry(NonZeroU64::new(1).unwrap()).try_into().unwrap()));
535        let event_stream = futures::stream::once(futures::future::ready(event));
536        assert_matches!(
537            collect_neighbors_until_idle::<Vec<_>>(event_stream).await,
538            Err(CollectNeighborsUntilIdleError::UnexpectedEvent(_))
539        );
540    }
541
542    #[fuchsia_async::run_singlethreaded(test)]
543    async fn collect_neighbors_until_idle_error_stream_ended() {
544        let event =
545            Ok(Event::Existing(valid_fidl_entry(NonZeroU64::new(1).unwrap()).try_into().unwrap()));
546        let event_stream = futures::stream::once(futures::future::ready(event));
547        assert_matches!(
548            collect_neighbors_until_idle::<Vec<_>>(event_stream).await,
549            Err(CollectNeighborsUntilIdleError::StreamEnded)
550        );
551    }
552
553    #[fuchsia_async::run_singlethreaded(test)]
554    async fn collect_neighbors_until_idle_success() {
555        let entry: Entry = valid_fidl_entry(NonZeroU64::new(1).unwrap()).try_into().unwrap();
556        let mut event_stream = futures::stream::iter([
557            Ok(Event::Existing(entry.clone())),
558            Ok(Event::Idle),
559            Ok(Event::Added(entry.clone())),
560        ]);
561
562        let existing: Vec<_> = collect_neighbors_until_idle(&mut event_stream)
563            .await
564            .expect("failed to collect existing neighbors");
565        assert_eq!(existing, &[entry.clone()]);
566
567        let trailing_events: Vec<_> = event_stream.collect().await;
568        assert_matches!(
569            &trailing_events[..],
570            [Ok(Event::Added(found_entry))] if *found_entry == entry
571        );
572    }
573}