//! Extensions for the `fuchsia.net.neighbor` FIDL library: validated
//! counterparts of its tables and helpers for consuming neighbor events as a
//! stream.

#![deny(missing_docs)]
8
// Helpers used by this crate's tests; compiled only for this crate's own
// tests or when the `testutils` feature is enabled.
#[cfg(any(test, feature = "testutils"))]
pub mod testutil;
11
12use async_utils::{fold, stream};
13use fidl_table_validation::*;
14use futures::{Stream, TryStreamExt as _};
15use thiserror::Error;
16use {
17 fidl_fuchsia_net as fnet, fidl_fuchsia_net_ext as fnet_ext,
18 fidl_fuchsia_net_neighbor as fnet_neighbor, zx_types as zx,
19};
20
/// Validated counterpart of the FIDL table [`fnet_neighbor::Entry`].
///
/// The conversion from the FIDL table is generated by `ValidFidlTable`; every
/// field except `mac` must be present in the source table, otherwise the
/// conversion fails with an `EntryValidationError`.
#[derive(Clone, Debug, Eq, Hash, PartialEq, ValidFidlTable)]
#[fidl_table_src(fnet_neighbor::Entry)]
#[fidl_table_strict]
pub struct Entry {
    /// Identifier of the interface the entry belongs to.
    pub interface: u64,
    /// IP address of the neighbor.
    pub neighbor: fnet::IpAddress,
    /// State of the entry, as reported in the FIDL table.
    pub state: fnet_neighbor::EntryState,
    /// MAC address of the neighbor. Optional in the FIDL table; `None` when
    /// the table does not carry it.
    #[fidl_field_type(optional)]
    pub mac: Option<fnet::MacAddress>,
    /// Timestamp associated with the entry. NOTE(review): presumably the time
    /// of the entry's last update; the clock and units are not visible in this
    /// file -- confirm against the FIDL definition.
    pub updated_at: zx::zx_time_t,
}
41
42pub fn display_entry_state(state: &fnet_neighbor::EntryState) -> &'static str {
44 match state {
45 fnet_neighbor::EntryState::Incomplete => "INCOMPLETE",
46 fnet_neighbor::EntryState::Reachable => "REACHABLE",
47 fnet_neighbor::EntryState::Stale => "STALE",
48 fnet_neighbor::EntryState::Delay => "DELAY",
49 fnet_neighbor::EntryState::Probe => "PROBE",
50 fnet_neighbor::EntryState::Static => "STATIC",
51 fnet_neighbor::EntryState::Unreachable => "UNREACHABLE",
52 }
53}
54
55impl std::fmt::Display for Entry {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
57 let Self { interface, neighbor, mac, state, updated_at: _ } = self;
58 write!(f, "Interface {} | IP {} | MAC ", interface, fnet_ext::IpAddress::from(*neighbor))?;
59 if let Some(mac) = mac {
60 write!(f, "{}", fnet_ext::MacAddress::from(*mac))?;
61 } else {
62 write!(f, "?")?;
63 }
64 write!(f, " | {}", display_entry_state(state))
65 }
66}
67
/// Validated counterpart of [`fnet_neighbor::EntryIteratorItem`]: one event
/// produced by a neighbor entry iterator.
#[derive(Clone, Debug, PartialEq)]
pub enum Event {
    /// Converted from `EntryIteratorItem::Existing`. These are the entries
    /// accumulated by [`collect_neighbors_until_idle`].
    Existing(Entry),
    /// Converted from `EntryIteratorItem::Added`.
    Added(Entry),
    /// Converted from `EntryIteratorItem::Changed`.
    Changed(Entry),
    /// Converted from `EntryIteratorItem::Removed`.
    Removed(Entry),
    /// Converted from `EntryIteratorItem::Idle`; carries no entry.
    /// [`collect_neighbors_until_idle`] stops collecting when it sees it.
    Idle,
}
82
83impl TryFrom<fnet_neighbor::EntryIteratorItem> for Event {
84 type Error = EntryValidationError;
85
86 fn try_from(value: fnet_neighbor::EntryIteratorItem) -> Result<Self, Self::Error> {
87 match value {
88 fnet_neighbor::EntryIteratorItem::Existing(e) => {
89 Ok(Event::Existing(Entry::try_from(e)?))
90 }
91 fnet_neighbor::EntryIteratorItem::Added(e) => Ok(Event::Added(Entry::try_from(e)?)),
92 fnet_neighbor::EntryIteratorItem::Changed(e) => Ok(Event::Changed(Entry::try_from(e)?)),
93 fnet_neighbor::EntryIteratorItem::Removed(e) => Ok(Event::Removed(Entry::try_from(e)?)),
94 fnet_neighbor::EntryIteratorItem::Idle(_) => Ok(Event::Idle),
95 }
96 }
97}
98
/// Validated counterpart of the FIDL table
/// [`fnet_neighbor::EntryIteratorOptions`].
///
/// It currently has no fields; `Default::default()` yields the only value.
#[derive(Clone, Debug, Default, Eq, PartialEq, ValidFidlTable)]
#[fidl_table_src(fnet_neighbor::EntryIteratorOptions)]
#[fidl_table_strict]
pub struct EntryIteratorOptions {}
104
/// Error returned by [`open_entry_iterator`] when the `OpenEntryIterator`
/// call on the view proxy fails; wraps the underlying [`fidl::Error`].
#[derive(Clone, Debug, Error)]
#[error("failed to open neighbor entry iterator: {0}")]
pub struct OpenEntryIteratorError(fidl::Error);
109
110pub fn open_entry_iterator(
112 view_proxy: &fnet_neighbor::ViewProxy,
113 options: EntryIteratorOptions,
114) -> Result<fnet_neighbor::EntryIteratorProxy, OpenEntryIteratorError> {
115 let (neighbor_iter_proxy, entry_iter_server_end) =
116 fidl::endpoints::create_proxy::<fnet_neighbor::EntryIteratorMarker>();
117
118 view_proxy
119 .open_entry_iterator(entry_iter_server_end, &options.into())
120 .map_err(OpenEntryIteratorError)?;
121
122 Ok(neighbor_iter_proxy)
123}
124
/// Errors that can be yielded by the event streams created in this crate
/// (see [`event_stream_from_iterator`]).
#[derive(Debug, Error)]
pub enum EntryIteratorError {
    /// The `GetNext()` call on the entry iterator returned a FIDL error.
    #[error("the call to `GetNext()` failed: {0}")]
    Fidl(fidl::Error),
    /// An item returned by `GetNext()` could not be converted into an
    /// [`Event`].
    #[error("failed to convert event returned by `GetNext()`: {0:?}")]
    Conversion(EntryValidationError),
    /// `GetNext()` returned a batch containing no events.
    #[error("the call to `GetNext()` returned an empty batch of events")]
    EmptyEventBatch,
}
138
139pub fn event_stream_from_view(
141 neighbors_view: &fnet_neighbor::ViewProxy,
142) -> Result<impl Stream<Item = Result<Event, EntryIteratorError>> + 'static, OpenEntryIteratorError>
143{
144 event_stream_from_view_with_options(neighbors_view, Default::default())
145}
146
147pub fn event_stream_from_view_with_options(
155 neighbors_view: &fnet_neighbor::ViewProxy,
156 options: EntryIteratorOptions,
157) -> Result<impl Stream<Item = Result<Event, EntryIteratorError>> + 'static, OpenEntryIteratorError>
158{
159 let neighbor_iterator = open_entry_iterator(neighbors_view, options)?;
160 Ok(event_stream_from_iterator(neighbor_iterator))
161}
162
163pub fn event_stream_from_iterator(
170 neighbor_iterator: fnet_neighbor::EntryIteratorProxy,
171) -> impl Stream<Item = Result<Event, EntryIteratorError>> {
172 stream::ShortCircuit::new(
173 futures::stream::try_unfold(neighbor_iterator, |iter| async {
174 let events_batch = iter.get_next().await.map_err(EntryIteratorError::Fidl)?;
175 if events_batch.is_empty() {
176 return Err(EntryIteratorError::EmptyEventBatch);
177 }
178 let events_batch = events_batch
179 .into_iter()
180 .map(|event| event.try_into().map_err(EntryIteratorError::Conversion));
181 let event_stream = futures::stream::iter(events_batch);
182 Ok(Some((event_stream, iter)))
183 })
184 .try_flatten(),
186 )
187}
188
/// Errors returned by [`collect_neighbors_until_idle`].
#[derive(Debug, Error)]
pub enum CollectNeighborsUntilIdleError {
    /// The event stream yielded an error before `Event::Idle` was seen.
    #[error("there was an error in the event stream: {0}")]
    ErrorInStream(EntryIteratorError),
    /// The event stream yielded an `Added`, `Changed` or `Removed` event
    /// before `Event::Idle` was seen.
    #[error("there was an unexpected event in the event stream: {0:?}")]
    UnexpectedEvent(Event),
    /// The event stream ended before `Event::Idle` was seen.
    #[error("the event stream unexpectedly ended")]
    StreamEnded,
}
203
204pub async fn collect_neighbors_until_idle<C: Extend<Entry> + Default>(
207 event_stream: impl futures::Stream<Item = Result<Event, EntryIteratorError>> + Unpin,
208) -> Result<C, CollectNeighborsUntilIdleError> {
209 fold::fold_while(
210 event_stream,
211 Ok(C::default()),
212 |existing_neighbors: Result<C, CollectNeighborsUntilIdleError>, event| {
213 futures::future::ready(match existing_neighbors {
214 Err(_) => {
215 unreachable!(
216 "`existing_neighbors` must be `Ok`, because we stop folding on err"
217 )
218 }
219 Ok(mut existing_neighbors) => match event {
220 Err(e) => {
221 fold::FoldWhile::Done(Err(CollectNeighborsUntilIdleError::ErrorInStream(e)))
222 }
223 Ok(e) => match e {
224 Event::Existing(e) => {
225 existing_neighbors.extend([e]);
226 fold::FoldWhile::Continue(Ok(existing_neighbors))
227 }
228 Event::Idle => fold::FoldWhile::Done(Ok(existing_neighbors)),
229 e @ Event::Added(_) | e @ Event::Changed(_) | e @ Event::Removed(_) => {
230 fold::FoldWhile::Done(Err(
231 CollectNeighborsUntilIdleError::UnexpectedEvent(e),
232 ))
233 }
234 },
235 },
236 })
237 },
238 )
239 .await
240 .short_circuited()
241 .map_err(|_accumulated_thus_far: Result<C, CollectNeighborsUntilIdleError>| {
242 CollectNeighborsUntilIdleError::StreamEnded
243 })?
244}
245
246#[cfg(test)]
247mod tests {
248 use super::*;
249
250 use assert_matches::assert_matches;
251 use futures::{FutureExt, StreamExt as _};
252 use test_case::test_case;
253
254 fn valid_fidl_entry(interface: u64) -> fnet_neighbor::Entry {
255 fnet_neighbor::Entry {
256 interface: Some(interface),
257 neighbor: Some(fnet::IpAddress::Ipv4(fnet::Ipv4Address { addr: [192, 168, 0, 1] })),
258 state: Some(fnet_neighbor::EntryState::Reachable),
259 mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
260 updated_at: Some(123456),
261 ..Default::default()
262 }
263 }
264
265 #[test]
266 fn event_try_from_success() {
267 let fidl_entry = valid_fidl_entry(1);
268 let local_entry: Entry = fidl_entry.clone().try_into().unwrap();
269 assert_matches!(
270 fnet_neighbor::EntryIteratorItem::Existing(fidl_entry.clone()).try_into(),
271 Ok(Event::Existing(entry)) if entry == local_entry
272 );
273 assert_matches!(
274 fnet_neighbor::EntryIteratorItem::Added(fidl_entry.clone()).try_into(),
275 Ok(Event::Added(entry)) if entry == local_entry
276 );
277 assert_matches!(
278 fnet_neighbor::EntryIteratorItem::Changed(fidl_entry.clone()).try_into(),
279 Ok(Event::Changed(entry)) if entry == local_entry
280 );
281 assert_matches!(
282 fnet_neighbor::EntryIteratorItem::Removed(fidl_entry.clone()).try_into(),
283 Ok(Event::Removed(entry)) if entry == local_entry
284 );
285 assert_matches!(
286 fnet_neighbor::EntryIteratorItem::Idle(fnet_neighbor::IdleEvent).try_into(),
287 Ok(Event::Idle)
288 );
289 }
290
291 #[test]
292 fn event_try_from_missing_field() {
293 let fidl_entry = fnet_neighbor::Entry {
294 interface: None, neighbor: Some(fnet::IpAddress::Ipv4(fnet::Ipv4Address { addr: [192, 168, 0, 1] })),
296 state: Some(fnet_neighbor::EntryState::Reachable),
297 mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
298 updated_at: Some(123456),
299 ..Default::default()
300 };
301 assert_matches!(
302 Event::try_from(fnet_neighbor::EntryIteratorItem::Existing(fidl_entry.clone())),
303 Err(EntryValidationError::MissingField(_))
304 );
305 assert_matches!(
306 Event::try_from(fnet_neighbor::EntryIteratorItem::Added(fidl_entry.clone())),
307 Err(EntryValidationError::MissingField(_))
308 );
309 assert_matches!(
310 Event::try_from(fnet_neighbor::EntryIteratorItem::Changed(fidl_entry.clone())),
311 Err(EntryValidationError::MissingField(_))
312 );
313 assert_matches!(
314 Event::try_from(fnet_neighbor::EntryIteratorItem::Removed(fidl_entry.clone())),
315 Err(EntryValidationError::MissingField(_))
316 );
317 }
318
319 #[test_case(Vec::new(); "no events")]
323 #[test_case(vec![0..1]; "single_batch_single_event")]
324 #[test_case(vec![0..10]; "single_batch_many_events")]
325 #[test_case(vec![0..10, 10..20, 20..30]; "many_batches_many_events")]
326 #[fuchsia_async::run_singlethreaded(test)]
327 async fn event_stream_from_view_against_shape(test_shape: Vec<std::ops::Range<u8>>) {
328 let (batches_sender, batches_receiver) =
331 futures::channel::mpsc::unbounded::<Vec<fnet_neighbor::EntryIteratorItem>>();
332 for batch_shape in &test_shape {
333 batches_sender
334 .unbounded_send(testutil::generate_events_in_range(batch_shape.clone()))
335 .expect("failed to send event batch");
336 }
337
338 let (view, entry_iter_fut) = testutil::create_fake_view(batches_receiver);
339
340 let event_stream =
341 event_stream_from_view(&view).expect("failed to open entry iterator").fuse();
342
343 futures::pin_mut!(entry_iter_fut, event_stream);
344
345 for batch_shape in test_shape {
346 for event_idx in batch_shape.into_iter() {
347 futures::select! {
348 () = entry_iter_fut => panic!(
349 "fake entry iterator implementation unexpectedly finished"
350 ),
351 event = event_stream.next() => {
352 let actual_event = event
353 .expect("event stream unexpectedly empty")
354 .expect("error processing event");
355 let expected_event = testutil::generate_event(event_idx)
356 .try_into()
357 .expect("test event is unexpectedly invalid");
358 assert_eq!(actual_event, expected_event);
359 }
360 };
361 }
362 }
363
364 batches_sender.close_channel();
366 let ((), mut events) = futures::join!(entry_iter_fut, event_stream.collect::<Vec<_>>());
367 assert_matches!(
368 events.pop(),
369 Some(Err(EntryIteratorError::Fidl(fidl::Error::ClientChannelClosed {
370 status: zx_status::Status::PEER_CLOSED,
371 ..
372 })))
373 );
374 assert_matches!(events[..], []);
375 }
376
377 #[fuchsia_async::run_singlethreaded(test)]
380 async fn event_stream_from_view_multiple_iterators() {
381 let test_data = vec![
383 vec![testutil::generate_events_in_range(0..10)],
384 vec![testutil::generate_events_in_range(10..20)],
385 vec![testutil::generate_events_in_range(20..30)],
386 ];
387
388 let (view, view_server_end) = fidl::endpoints::create_proxy::<fnet_neighbor::ViewMarker>();
390 let view_request_stream = view_server_end.into_stream();
391 let entry_iters_fut = view_request_stream
392 .zip(futures::stream::iter(test_data.clone()))
393 .for_each_concurrent(std::usize::MAX, |(request, event_data)| {
394 testutil::serve_view_request(
395 request.expect("failed to receive `OpenEntryIterator` request"),
396 futures::stream::iter(event_data),
397 )
398 });
399
400 let validate_event_streams_fut =
401 futures::future::join_all(test_data.into_iter().map(|event_data| {
402 let events_fut = event_stream_from_view(&view)
403 .expect("failed to create entry iterator")
404 .collect::<std::collections::VecDeque<_>>();
405 events_fut.then(|mut events| {
406 for expected_event in event_data.into_iter().flatten() {
407 assert_eq!(
408 events
409 .pop_front()
410 .expect("event_stream unexpectedly empty")
411 .expect("error processing event"),
412 expected_event.try_into().expect("test event is unexpectedly invalid"),
413 );
414 }
415 assert_matches!(
416 events.pop_front(),
417 Some(Err(EntryIteratorError::Fidl(fidl::Error::ClientChannelClosed {
418 status: zx_status::Status::PEER_CLOSED,
419 ..
420 })))
421 );
422 assert_matches!(events.make_contiguous(), []);
423 futures::future::ready(())
424 })
425 }));
426
427 futures::join!(entry_iters_fut, validate_event_streams_fut);
428 }
429
430 #[test_case(false, false; "no_trailing")]
436 #[test_case(true, false; "trailing_event")]
437 #[test_case(false, true; "trailing_batch")]
438 #[test_case(true, true; "trailing_event_and_batch")]
439 #[fuchsia_async::run_singlethreaded(test)]
440 async fn event_stream_from_view_conversion_error(trailing_event: bool, trailing_batch: bool) {
441 let bad_event = fnet_neighbor::EntryIteratorItem::Added(fnet_neighbor::Entry {
442 interface: None, neighbor: Some(fnet::IpAddress::Ipv4(fnet::Ipv4Address { addr: [192, 168, 0, 1] })),
444 state: Some(fnet_neighbor::EntryState::Reachable),
445 mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
446 updated_at: Some(123456),
447 ..Default::default()
448 });
449
450 let batch = std::iter::once(bad_event)
451 .chain(trailing_event.then(|| testutil::generate_event(0)).into_iter())
453 .collect::<Vec<_>>();
454 let batches = std::iter::once(batch)
455 .chain(trailing_batch.then(|| vec![testutil::generate_event(1)]))
457 .collect::<Vec<_>>();
458
459 let (view, entry_iter_fut) = testutil::create_fake_view(futures::stream::iter(batches));
461
462 let event_stream = event_stream_from_view(&view).expect("failed to connect to view").fuse();
463
464 futures::pin_mut!(entry_iter_fut, event_stream);
465 let ((), events) = futures::join!(entry_iter_fut, event_stream.collect::<Vec<_>>());
466 assert_matches!(&events[..], &[Err(EntryIteratorError::Conversion(_))]);
467 }
468
469 #[test_case(false; "no_trailing_batch")]
474 #[test_case(true; "trailing_batch")]
475 #[fuchsia_async::run_singlethreaded(test)]
476 async fn event_stream_from_view_empty_batch_error(trailing_batch: bool) {
477 let batches = std::iter::once(Vec::new())
478 .chain(trailing_batch.then(|| vec![testutil::generate_event(0)]))
480 .collect::<Vec<_>>();
481
482 let (view, entry_iter_fut) = testutil::create_fake_view(futures::stream::iter(batches));
484
485 let event_stream =
486 event_stream_from_view(&view).expect("failed to create entry iterator").fuse();
487
488 futures::pin_mut!(entry_iter_fut, event_stream);
489 let ((), events) = futures::join!(entry_iter_fut, event_stream.collect::<Vec<_>>());
490 assert_matches!(&events[..], &[Err(EntryIteratorError::EmptyEventBatch)]);
491 }
492
493 #[fuchsia_async::run_singlethreaded(test)]
494 async fn collect_neighbors_until_idle_error_error_in_stream() {
495 let event = Err(EntryIteratorError::EmptyEventBatch);
496 let event_stream = futures::stream::once(futures::future::ready(event));
497 assert_matches!(
498 collect_neighbors_until_idle::<Vec<_>>(event_stream).await,
499 Err(CollectNeighborsUntilIdleError::ErrorInStream(_))
500 );
501 }
502
503 #[fuchsia_async::run_singlethreaded(test)]
504 async fn collect_neighbors_until_idle_error_unexpected_event() {
505 let event = Ok(Event::Added(valid_fidl_entry(1).try_into().unwrap()));
506 let event_stream = futures::stream::once(futures::future::ready(event));
507 assert_matches!(
508 collect_neighbors_until_idle::<Vec<_>>(event_stream).await,
509 Err(CollectNeighborsUntilIdleError::UnexpectedEvent(_))
510 );
511 }
512
513 #[fuchsia_async::run_singlethreaded(test)]
514 async fn collect_neighbors_until_idle_error_stream_ended() {
515 let event = Ok(Event::Existing(valid_fidl_entry(1).try_into().unwrap()));
516 let event_stream = futures::stream::once(futures::future::ready(event));
517 assert_matches!(
518 collect_neighbors_until_idle::<Vec<_>>(event_stream).await,
519 Err(CollectNeighborsUntilIdleError::StreamEnded)
520 );
521 }
522
523 #[fuchsia_async::run_singlethreaded(test)]
524 async fn collect_neighbors_until_idle_success() {
525 let entry: Entry = valid_fidl_entry(1).try_into().unwrap();
526 let mut event_stream = futures::stream::iter([
527 Ok(Event::Existing(entry.clone())),
528 Ok(Event::Idle),
529 Ok(Event::Added(entry.clone())),
530 ]);
531
532 let existing: Vec<_> = collect_neighbors_until_idle(&mut event_stream)
533 .await
534 .expect("failed to collect existing neighbors");
535 assert_eq!(existing, &[entry.clone()]);
536
537 let trailing_events: Vec<_> = event_stream.collect().await;
538 assert_matches!(
539 &trailing_events[..],
540 [Ok(Event::Added(found_entry))] if *found_entry == entry
541 );
542 }
543}