Skip to main content

fidl_fuchsia_net_neighbor_ext/
lib.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Extensions for types in the `fidl_fuchsia_net_neighbor` crate.
6
7#![deny(missing_docs)]
8
9#[cfg(any(test, feature = "testutils"))]
10pub mod testutil;
11
12use async_utils::{fold, stream};
13use fidl_table_validation::*;
14use futures::{Stream, TryStreamExt as _};
15use std::num::NonZeroU64;
16use thiserror::Error;
17use {
18    fidl_fuchsia_net as fnet, fidl_fuchsia_net_ext as fnet_ext,
19    fidl_fuchsia_net_neighbor as fnet_neighbor, zx_types as zx,
20};
21
22#[derive(Debug, Error)]
23enum ConversionError {
24    #[error("interface ID must be non-zero")]
25    ZeroInterface,
26}
27
28struct NonZeroU64Converter;
29
30impl Converter for NonZeroU64Converter {
31    type Fidl = u64;
32    type Validated = NonZeroU64;
33    type Error = ConversionError;
34    fn try_from_fidl(value: Self::Fidl) -> std::result::Result<Self::Validated, Self::Error> {
35        NonZeroU64::new(value).ok_or(ConversionError::ZeroInterface)
36    }
37    fn from_validated(validated: Self::Validated) -> Self::Fidl {
38        validated.get()
39    }
40}
41
42/// Information on a neighboring device in the local network.
43#[derive(Clone, Debug, Eq, Hash, PartialEq, ValidFidlTable)]
44#[fidl_table_src(fnet_neighbor::Entry)]
45#[fidl_table_strict]
46pub struct Entry {
47    /// Identifier for the interface used for communicating with the neighbor.
48    #[fidl_field_type(converter = NonZeroU64Converter)]
49    pub interface: NonZeroU64,
50    /// IP address of the neighbor.
51    pub neighbor: fnet::IpAddress,
52    /// State of the entry within the Neighbor Unreachability Detection (NUD)
53    /// state machine.
54    pub state: fnet_neighbor::EntryState,
55    /// MAC address of the neighboring device's network interface controller.
56    #[fidl_field_type(optional)]
57    pub mac: Option<fnet::MacAddress>,
58    /// Timestamp when this entry has changed `state`.
59    // TODO(https://fxbug.dev/42155335): Replace with zx::MonotonicInstant once there is
60    // support for custom conversion functions.
61    pub updated_at: zx::zx_time_t,
62}
63
64/// Returns a &str suitable for display representing the EntryState parameter.
65pub fn display_entry_state(state: &fnet_neighbor::EntryState) -> &'static str {
66    match state {
67        fnet_neighbor::EntryState::Incomplete => "INCOMPLETE",
68        fnet_neighbor::EntryState::Reachable => "REACHABLE",
69        fnet_neighbor::EntryState::Stale => "STALE",
70        fnet_neighbor::EntryState::Delay => "DELAY",
71        fnet_neighbor::EntryState::Probe => "PROBE",
72        fnet_neighbor::EntryState::Static => "STATIC",
73        fnet_neighbor::EntryState::Unreachable => "UNREACHABLE",
74    }
75}
76
77impl std::fmt::Display for Entry {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
79        let Self { interface, neighbor, mac, state, updated_at: _ } = self;
80        write!(f, "Interface {} | IP {} | MAC ", interface, fnet_ext::IpAddress::from(*neighbor))?;
81        if let Some(mac) = mac {
82            write!(f, "{}", fnet_ext::MacAddress::from(*mac))?;
83        } else {
84            write!(f, "?")?;
85        }
86        write!(f, " | {}", display_entry_state(state))
87    }
88}
89
90/// Neighbor entry events.
91#[derive(Clone, Debug, PartialEq)]
92pub enum Event {
93    /// A neighbor entry that existed prior to watching.
94    Existing(Entry),
95    /// A neighbor entry that was added.
96    Added(Entry),
97    /// A neighbor entry that was changed.
98    Changed(Entry),
99    /// A neighbor entry that was removed.
100    Removed(Entry),
101    /// Sentinel value indicating that all existing entries have been sent.
102    Idle,
103}
104
105impl TryFrom<fnet_neighbor::EntryIteratorItem> for Event {
106    type Error = EntryValidationError;
107
108    fn try_from(value: fnet_neighbor::EntryIteratorItem) -> Result<Self, Self::Error> {
109        match value {
110            fnet_neighbor::EntryIteratorItem::Existing(e) => {
111                Ok(Event::Existing(Entry::try_from(e)?))
112            }
113            fnet_neighbor::EntryIteratorItem::Added(e) => Ok(Event::Added(Entry::try_from(e)?)),
114            fnet_neighbor::EntryIteratorItem::Changed(e) => Ok(Event::Changed(Entry::try_from(e)?)),
115            fnet_neighbor::EntryIteratorItem::Removed(e) => Ok(Event::Removed(Entry::try_from(e)?)),
116            fnet_neighbor::EntryIteratorItem::Idle(_) => Ok(Event::Idle),
117        }
118    }
119}
120
121/// Options for modifying the behavior of `EntryIterator`.
122#[derive(Clone, Debug, Default, Eq, PartialEq, ValidFidlTable)]
123#[fidl_table_src(fnet_neighbor::EntryIteratorOptions)]
124#[fidl_table_strict]
125pub struct EntryIteratorOptions {}
126
127/// Neighbor table entry iterator creation error.
128#[derive(Clone, Debug, Error)]
129#[error("failed to open neighbor entry iterator: {0}")]
130pub struct OpenEntryIteratorError(fidl::Error);
131
132/// Dispatches `OpenEntryIterator` on the view proxy.
133pub fn open_entry_iterator(
134    view_proxy: &fnet_neighbor::ViewProxy,
135    options: EntryIteratorOptions,
136) -> Result<fnet_neighbor::EntryIteratorProxy, OpenEntryIteratorError> {
137    let (neighbor_iter_proxy, entry_iter_server_end) =
138        fidl::endpoints::create_proxy::<fnet_neighbor::EntryIteratorMarker>();
139
140    view_proxy
141        .open_entry_iterator(entry_iter_server_end, &options.into())
142        .map_err(OpenEntryIteratorError)?;
143
144    Ok(neighbor_iter_proxy)
145}
146
147/// Neighbor table entry iterator `GetNext` errors.
148#[derive(Debug, Error)]
149pub enum EntryIteratorError {
150    /// The call to `GetNext` returned a FIDL error.
151    #[error("the call to `GetNext()` failed: {0}")]
152    Fidl(fidl::Error),
153    /// The event returned by `GetNext` encountered a conversion error.
154    #[error("failed to convert event returned by `GetNext()`: {0:?}")]
155    Conversion(EntryValidationError),
156    /// The server returned an empty batch of events.
157    #[error("the call to `GetNext()` returned an empty batch of events")]
158    EmptyEventBatch,
159}
160
161/// [`event_stream_from_view_with_options`] with default [`EntryIteratorOptions`].
162pub fn event_stream_from_view(
163    neighbors_view: &fnet_neighbor::ViewProxy,
164) -> Result<impl Stream<Item = Result<Event, EntryIteratorError>> + 'static, OpenEntryIteratorError>
165{
166    event_stream_from_view_with_options(neighbors_view, Default::default())
167}
168
169/// Connects to the neighbor table entry iterator protocol with
170/// [`EntryIteratorOptions`] and converts the Hanging-Get style API into an
171/// Event stream.
172///
173/// Each call to `GetNext` returns a batch of events, which are flattened into a
174/// single stream. If an error is encountered while calling `GetNext` or while
175/// converting the event, the stream is immediately terminated.
176pub fn event_stream_from_view_with_options(
177    neighbors_view: &fnet_neighbor::ViewProxy,
178    options: EntryIteratorOptions,
179) -> Result<impl Stream<Item = Result<Event, EntryIteratorError>> + 'static, OpenEntryIteratorError>
180{
181    let neighbor_iterator = open_entry_iterator(neighbors_view, options)?;
182    Ok(event_stream_from_iterator(neighbor_iterator))
183}
184
185/// Turns the provided neighbor table entry iterator into an [`Event`] stream by applying
186/// Hanging-Get watch.
187///
188/// Each call to `GetNext` returns a batch of events, which are flattened into a
189/// single stream. If an error is encountered while calling `GetNext` or while
190/// converting the event, the stream is immediately terminated.
191pub fn event_stream_from_iterator(
192    neighbor_iterator: fnet_neighbor::EntryIteratorProxy,
193) -> impl Stream<Item = Result<Event, EntryIteratorError>> {
194    stream::ShortCircuit::new(
195        futures::stream::try_unfold(neighbor_iterator, |iter| async {
196            let events_batch = iter.get_next().await.map_err(EntryIteratorError::Fidl)?;
197            if events_batch.is_empty() {
198                return Err(EntryIteratorError::EmptyEventBatch);
199            }
200            let events_batch = events_batch
201                .into_iter()
202                .map(|event| event.try_into().map_err(EntryIteratorError::Conversion));
203            let event_stream = futures::stream::iter(events_batch);
204            Ok(Some((event_stream, iter)))
205        })
206        // Flatten the stream of event streams into a single event stream.
207        .try_flatten(),
208    )
209}
210
211/// Errors returned by [`collect_neighbors_until_idle`].
212#[derive(Debug, Error)]
213pub enum CollectNeighborsUntilIdleError {
214    /// There was an error in the event stream.
215    #[error("there was an error in the event stream: {0}")]
216    ErrorInStream(EntryIteratorError),
217    /// There was an unexpected event in the event stream. Only `existing` or
218    /// `idle` events are expected.
219    #[error("there was an unexpected event in the event stream: {0:?}")]
220    UnexpectedEvent(Event),
221    /// The event stream unexpectedly ended.
222    #[error("the event stream unexpectedly ended")]
223    StreamEnded,
224}
225
226/// Collects all `existing` events from the stream, stopping once the `idle`
227/// event is observed.
228pub async fn collect_neighbors_until_idle<C: Extend<Entry> + Default>(
229    event_stream: impl futures::Stream<Item = Result<Event, EntryIteratorError>> + Unpin,
230) -> Result<C, CollectNeighborsUntilIdleError> {
231    fold::fold_while(
232        event_stream,
233        Ok(C::default()),
234        |existing_neighbors: Result<C, CollectNeighborsUntilIdleError>, event| {
235            futures::future::ready(match existing_neighbors {
236                Err(_) => {
237                    unreachable!(
238                        "`existing_neighbors` must be `Ok`, because we stop folding on err"
239                    )
240                }
241                Ok(mut existing_neighbors) => match event {
242                    Err(e) => {
243                        fold::FoldWhile::Done(Err(CollectNeighborsUntilIdleError::ErrorInStream(e)))
244                    }
245                    Ok(e) => match e {
246                        Event::Existing(e) => {
247                            existing_neighbors.extend([e]);
248                            fold::FoldWhile::Continue(Ok(existing_neighbors))
249                        }
250                        Event::Idle => fold::FoldWhile::Done(Ok(existing_neighbors)),
251                        e @ Event::Added(_) | e @ Event::Changed(_) | e @ Event::Removed(_) => {
252                            fold::FoldWhile::Done(Err(
253                                CollectNeighborsUntilIdleError::UnexpectedEvent(e),
254                            ))
255                        }
256                    },
257                },
258            })
259        },
260    )
261    .await
262    .short_circuited()
263    .map_err(|_accumulated_thus_far: Result<C, CollectNeighborsUntilIdleError>| {
264        CollectNeighborsUntilIdleError::StreamEnded
265    })?
266}
267
268#[cfg(test)]
269mod tests {
270    use super::*;
271
272    use assert_matches::assert_matches;
273    use futures::{FutureExt, StreamExt as _};
274    use test_case::test_case;
275
276    fn valid_fidl_entry(interface: NonZeroU64) -> fnet_neighbor::Entry {
277        fnet_neighbor::Entry {
278            interface: Some(interface.get()),
279            neighbor: Some(fnet::IpAddress::Ipv4(fnet::Ipv4Address { addr: [192, 168, 0, 1] })),
280            state: Some(fnet_neighbor::EntryState::Reachable),
281            mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
282            updated_at: Some(123456),
283            ..Default::default()
284        }
285    }
286
287    #[test]
288    fn event_try_from_success() {
289        let fidl_entry = valid_fidl_entry(NonZeroU64::new(1).unwrap());
290        let local_entry: Entry = fidl_entry.clone().try_into().unwrap();
291        assert_matches!(
292            fnet_neighbor::EntryIteratorItem::Existing(fidl_entry.clone()).try_into(),
293            Ok(Event::Existing(entry)) if entry == local_entry
294        );
295        assert_matches!(
296            fnet_neighbor::EntryIteratorItem::Added(fidl_entry.clone()).try_into(),
297            Ok(Event::Added(entry)) if entry == local_entry
298        );
299        assert_matches!(
300            fnet_neighbor::EntryIteratorItem::Changed(fidl_entry.clone()).try_into(),
301            Ok(Event::Changed(entry)) if entry == local_entry
302        );
303        assert_matches!(
304            fnet_neighbor::EntryIteratorItem::Removed(fidl_entry.clone()).try_into(),
305            Ok(Event::Removed(entry)) if entry == local_entry
306        );
307        assert_matches!(
308            fnet_neighbor::EntryIteratorItem::Idle(fnet_neighbor::IdleEvent).try_into(),
309            Ok(Event::Idle)
310        );
311    }
312
313    #[test]
314    fn event_try_from_missing_field() {
315        let fidl_entry = fnet_neighbor::Entry {
316            interface: None, // Required field omitted.
317            ..valid_fidl_entry(NonZeroU64::new(1).unwrap())
318        };
319        assert_matches!(
320            Event::try_from(fnet_neighbor::EntryIteratorItem::Existing(fidl_entry.clone())),
321            Err(EntryValidationError::MissingField(_))
322        );
323        assert_matches!(
324            Event::try_from(fnet_neighbor::EntryIteratorItem::Added(fidl_entry.clone())),
325            Err(EntryValidationError::MissingField(_))
326        );
327        assert_matches!(
328            Event::try_from(fnet_neighbor::EntryIteratorItem::Changed(fidl_entry.clone())),
329            Err(EntryValidationError::MissingField(_))
330        );
331        assert_matches!(
332            Event::try_from(fnet_neighbor::EntryIteratorItem::Removed(fidl_entry.clone())),
333            Err(EntryValidationError::MissingField(_))
334        );
335    }
336
337    #[test]
338    fn entry_try_from_zero_interface() {
339        let fidl_entry = fnet_neighbor::Entry {
340            interface: Some(0),
341            ..valid_fidl_entry(NonZeroU64::new(1).unwrap())
342        };
343        assert_matches!(Entry::try_from(fidl_entry), Err(EntryValidationError::InvalidField(_)));
344    }
345
346    // Tests `event_stream_from_view` with various "shapes". The test
347    // parameter is a vec of ranges, where each range corresponds to the batch
348    // of events that will be sent in response to a single call to `GetNext().
349    #[test_case(Vec::new(); "no events")]
350    #[test_case(vec![0..1]; "single_batch_single_event")]
351    #[test_case(vec![0..10]; "single_batch_many_events")]
352    #[test_case(vec![0..10, 10..20, 20..30]; "many_batches_many_events")]
353    #[fuchsia_async::run_singlethreaded(test)]
354    async fn event_stream_from_view_against_shape(test_shape: Vec<std::ops::Range<u8>>) {
355        // Build the event stream based on the `test_shape`. Use a channel
356        // so that the stream stays open until `close_channel` is called later.
357        let (batches_sender, batches_receiver) =
358            futures::channel::mpsc::unbounded::<Vec<fnet_neighbor::EntryIteratorItem>>();
359        for batch_shape in &test_shape {
360            batches_sender
361                .unbounded_send(testutil::generate_events_in_range(batch_shape.clone()))
362                .expect("failed to send event batch");
363        }
364
365        let (view, entry_iter_fut) = testutil::create_fake_view(batches_receiver);
366
367        let event_stream =
368            event_stream_from_view(&view).expect("failed to open entry iterator").fuse();
369
370        futures::pin_mut!(entry_iter_fut, event_stream);
371
372        for batch_shape in test_shape {
373            for event_idx in batch_shape.into_iter() {
374                futures::select! {
375                    () = entry_iter_fut => panic!(
376                        "fake entry iterator implementation unexpectedly finished"
377                    ),
378                    event = event_stream.next() => {
379                        let actual_event = event
380                            .expect("event stream unexpectedly empty")
381                            .expect("error processing event");
382                        let expected_event = testutil::generate_event(event_idx)
383                                .try_into()
384                                .expect("test event is unexpectedly invalid");
385                        assert_eq!(actual_event, expected_event);
386                    }
387                };
388            }
389        }
390
391        // Close `batches_sender` and observe that the `event_stream` ends.
392        batches_sender.close_channel();
393        let ((), mut events) = futures::join!(entry_iter_fut, event_stream.collect::<Vec<_>>());
394        assert_matches!(
395            events.pop(),
396            Some(Err(EntryIteratorError::Fidl(fidl::Error::ClientChannelClosed {
397                status: zx_status::Status::PEER_CLOSED,
398                ..
399            })))
400        );
401        assert_matches!(events[..], []);
402    }
403
404    // Verify that calling `event_stream_from_view` multiple times with the
405    // same `View` proxy, results in independent `EntryIterator` clients.
406    #[fuchsia_async::run_singlethreaded(test)]
407    async fn event_stream_from_view_multiple_iterators() {
408        // Events for 3 iterators. Each receives one batch containing 10 events.
409        let test_data = vec![
410            vec![testutil::generate_events_in_range(0..10)],
411            vec![testutil::generate_events_in_range(10..20)],
412            vec![testutil::generate_events_in_range(20..30)],
413        ];
414
415        // Instantiate the fake EntryIterator implementations.
416        let (view, view_server_end) = fidl::endpoints::create_proxy::<fnet_neighbor::ViewMarker>();
417        let view_request_stream = view_server_end.into_stream();
418        let entry_iters_fut = view_request_stream
419            .zip(futures::stream::iter(test_data.clone()))
420            .for_each_concurrent(std::usize::MAX, |(request, event_data)| {
421                testutil::serve_view_request(
422                    request.expect("failed to receive `OpenEntryIterator` request"),
423                    futures::stream::iter(event_data),
424                )
425            });
426
427        let validate_event_streams_fut =
428            futures::future::join_all(test_data.into_iter().map(|event_data| {
429                let events_fut = event_stream_from_view(&view)
430                    .expect("failed to create entry iterator")
431                    .collect::<std::collections::VecDeque<_>>();
432                events_fut.then(|mut events| {
433                    for expected_event in event_data.into_iter().flatten() {
434                        assert_eq!(
435                            events
436                                .pop_front()
437                                .expect("event_stream unexpectedly empty")
438                                .expect("error processing event"),
439                            expected_event.try_into().expect("test event is unexpectedly invalid"),
440                        );
441                    }
442                    assert_matches!(
443                        events.pop_front(),
444                        Some(Err(EntryIteratorError::Fidl(fidl::Error::ClientChannelClosed {
445                            status: zx_status::Status::PEER_CLOSED,
446                            ..
447                        })))
448                    );
449                    assert_matches!(events.make_contiguous(), []);
450                    futures::future::ready(())
451                })
452            }));
453
454        futures::join!(entry_iters_fut, validate_event_streams_fut);
455    }
456
457    // Verify that failing to convert an event results in an error and closes
458    // the event stream. `trailing_event` and `trailing_batch` control whether
459    // a good event is sent after the bad event, either as part of the same
460    // batch or in a subsequent batch. The test expects this data to be
461    // truncated from the resulting event_stream.
462    #[test_case(false, false; "no_trailing")]
463    #[test_case(true, false; "trailing_event")]
464    #[test_case(false, true; "trailing_batch")]
465    #[test_case(true, true; "trailing_event_and_batch")]
466    #[fuchsia_async::run_singlethreaded(test)]
467    async fn event_stream_from_view_conversion_error(trailing_event: bool, trailing_batch: bool) {
468        let bad_event = fnet_neighbor::EntryIteratorItem::Added(fnet_neighbor::Entry {
469            interface: None, // Required field omitted.
470            neighbor: Some(fnet::IpAddress::Ipv4(fnet::Ipv4Address { addr: [192, 168, 0, 1] })),
471            state: Some(fnet_neighbor::EntryState::Reachable),
472            mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
473            updated_at: Some(123456),
474            ..Default::default()
475        });
476
477        let batch = std::iter::once(bad_event)
478            // Optionally append a known good event to the batch.
479            .chain(trailing_event.then(|| testutil::generate_event(0)).into_iter())
480            .collect::<Vec<_>>();
481        let batches = std::iter::once(batch)
482            // Optionally append a known good batch to the sequence of batches.
483            .chain(trailing_batch.then(|| vec![testutil::generate_event(1)]))
484            .collect::<Vec<_>>();
485
486        // Instantiate the fake entry iterator implementation.
487        let (view, entry_iter_fut) = testutil::create_fake_view(futures::stream::iter(batches));
488
489        let event_stream = event_stream_from_view(&view).expect("failed to connect to view").fuse();
490
491        futures::pin_mut!(entry_iter_fut, event_stream);
492        let ((), events) = futures::join!(entry_iter_fut, event_stream.collect::<Vec<_>>());
493        assert_matches!(&events[..], &[Err(EntryIteratorError::Conversion(_))]);
494    }
495
496    // Verify that iterator returning an empty batch results in an error and
497    // closes the event stream. When `trailing_batch` is true, an additional
498    // "good" batch will be sent after the empty batch; the test expects this
499    // data to be truncated from the resulting event_stream.
500    #[test_case(false; "no_trailing_batch")]
501    #[test_case(true; "trailing_batch")]
502    #[fuchsia_async::run_singlethreaded(test)]
503    async fn event_stream_from_view_empty_batch_error(trailing_batch: bool) {
504        let batches = std::iter::once(Vec::new())
505            // Optionally append a known good batch to the sequence of batches.
506            .chain(trailing_batch.then(|| vec![testutil::generate_event(0)]))
507            .collect::<Vec<_>>();
508
509        // Instantiate the fake EntryIterator implementation.
510        let (view, entry_iter_fut) = testutil::create_fake_view(futures::stream::iter(batches));
511
512        let event_stream =
513            event_stream_from_view(&view).expect("failed to create entry iterator").fuse();
514
515        futures::pin_mut!(entry_iter_fut, event_stream);
516        let ((), events) = futures::join!(entry_iter_fut, event_stream.collect::<Vec<_>>());
517        assert_matches!(&events[..], &[Err(EntryIteratorError::EmptyEventBatch)]);
518    }
519
520    #[fuchsia_async::run_singlethreaded(test)]
521    async fn collect_neighbors_until_idle_error_error_in_stream() {
522        let event = Err(EntryIteratorError::EmptyEventBatch);
523        let event_stream = futures::stream::once(futures::future::ready(event));
524        assert_matches!(
525            collect_neighbors_until_idle::<Vec<_>>(event_stream).await,
526            Err(CollectNeighborsUntilIdleError::ErrorInStream(_))
527        );
528    }
529
530    #[fuchsia_async::run_singlethreaded(test)]
531    async fn collect_neighbors_until_idle_error_unexpected_event() {
532        let event =
533            Ok(Event::Added(valid_fidl_entry(NonZeroU64::new(1).unwrap()).try_into().unwrap()));
534        let event_stream = futures::stream::once(futures::future::ready(event));
535        assert_matches!(
536            collect_neighbors_until_idle::<Vec<_>>(event_stream).await,
537            Err(CollectNeighborsUntilIdleError::UnexpectedEvent(_))
538        );
539    }
540
541    #[fuchsia_async::run_singlethreaded(test)]
542    async fn collect_neighbors_until_idle_error_stream_ended() {
543        let event =
544            Ok(Event::Existing(valid_fidl_entry(NonZeroU64::new(1).unwrap()).try_into().unwrap()));
545        let event_stream = futures::stream::once(futures::future::ready(event));
546        assert_matches!(
547            collect_neighbors_until_idle::<Vec<_>>(event_stream).await,
548            Err(CollectNeighborsUntilIdleError::StreamEnded)
549        );
550    }
551
552    #[fuchsia_async::run_singlethreaded(test)]
553    async fn collect_neighbors_until_idle_success() {
554        let entry: Entry = valid_fidl_entry(NonZeroU64::new(1).unwrap()).try_into().unwrap();
555        let mut event_stream = futures::stream::iter([
556            Ok(Event::Existing(entry.clone())),
557            Ok(Event::Idle),
558            Ok(Event::Added(entry.clone())),
559        ]);
560
561        let existing: Vec<_> = collect_neighbors_until_idle(&mut event_stream)
562            .await
563            .expect("failed to collect existing neighbors");
564        assert_eq!(existing, &[entry.clone()]);
565
566        let trailing_events: Vec<_> = event_stream.collect().await;
567        assert_matches!(
568            &trailing_events[..],
569            [Ok(Event::Added(found_entry))] if *found_entry == entry
570        );
571    }
572}