1#![deny(missing_docs)]
8
9#[cfg(any(test, feature = "testutils"))]
10pub mod testutil;
11
12use async_utils::{fold, stream};
13use fidl_table_validation::*;
14use futures::{Stream, TryStreamExt as _};
15use std::num::NonZeroU64;
16use thiserror::Error;
17use {
18 fidl_fuchsia_net as fnet, fidl_fuchsia_net_ext as fnet_ext,
19 fidl_fuchsia_net_neighbor as fnet_neighbor, zx_types as zx,
20};
21
22#[derive(Debug, Error)]
23enum ConversionError {
24 #[error("interface ID must be non-zero")]
25 ZeroInterface,
26}
27
28struct NonZeroU64Converter;
29
30impl Converter for NonZeroU64Converter {
31 type Fidl = u64;
32 type Validated = NonZeroU64;
33 type Error = ConversionError;
34 fn try_from_fidl(value: Self::Fidl) -> std::result::Result<Self::Validated, Self::Error> {
35 NonZeroU64::new(value).ok_or(ConversionError::ZeroInterface)
36 }
37 fn from_validated(validated: Self::Validated) -> Self::Fidl {
38 validated.get()
39 }
40}
41
42#[derive(Clone, Debug, Eq, Hash, PartialEq, ValidFidlTable)]
44#[fidl_table_src(fnet_neighbor::Entry)]
45#[fidl_table_strict]
46pub struct Entry {
47 #[fidl_field_type(converter = NonZeroU64Converter)]
49 pub interface: NonZeroU64,
50 pub neighbor: fnet::IpAddress,
52 pub state: fnet_neighbor::EntryState,
55 #[fidl_field_type(optional)]
57 pub mac: Option<fnet::MacAddress>,
58 pub updated_at: zx::zx_time_t,
62}
63
64pub fn display_entry_state(state: &fnet_neighbor::EntryState) -> &'static str {
66 match state {
67 fnet_neighbor::EntryState::Incomplete => "INCOMPLETE",
68 fnet_neighbor::EntryState::Reachable => "REACHABLE",
69 fnet_neighbor::EntryState::Stale => "STALE",
70 fnet_neighbor::EntryState::Delay => "DELAY",
71 fnet_neighbor::EntryState::Probe => "PROBE",
72 fnet_neighbor::EntryState::Static => "STATIC",
73 fnet_neighbor::EntryState::Unreachable => "UNREACHABLE",
74 }
75}
76
77impl std::fmt::Display for Entry {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
79 let Self { interface, neighbor, mac, state, updated_at: _ } = self;
80 write!(f, "Interface {} | IP {} | MAC ", interface, fnet_ext::IpAddress::from(*neighbor))?;
81 if let Some(mac) = mac {
82 write!(f, "{}", fnet_ext::MacAddress::from(*mac))?;
83 } else {
84 write!(f, "?")?;
85 }
86 write!(f, " | {}", display_entry_state(state))
87 }
88}
89
90#[derive(Clone, Debug, PartialEq)]
92pub enum Event {
93 Existing(Entry),
95 Added(Entry),
97 Changed(Entry),
99 Removed(Entry),
101 Idle,
103}
104
105impl TryFrom<fnet_neighbor::EntryIteratorItem> for Event {
106 type Error = EntryValidationError;
107
108 fn try_from(value: fnet_neighbor::EntryIteratorItem) -> Result<Self, Self::Error> {
109 match value {
110 fnet_neighbor::EntryIteratorItem::Existing(e) => {
111 Ok(Event::Existing(Entry::try_from(e)?))
112 }
113 fnet_neighbor::EntryIteratorItem::Added(e) => Ok(Event::Added(Entry::try_from(e)?)),
114 fnet_neighbor::EntryIteratorItem::Changed(e) => Ok(Event::Changed(Entry::try_from(e)?)),
115 fnet_neighbor::EntryIteratorItem::Removed(e) => Ok(Event::Removed(Entry::try_from(e)?)),
116 fnet_neighbor::EntryIteratorItem::Idle(_) => Ok(Event::Idle),
117 }
118 }
119}
120
121#[derive(Clone, Debug, Default, Eq, PartialEq, ValidFidlTable)]
123#[fidl_table_src(fnet_neighbor::EntryIteratorOptions)]
124#[fidl_table_strict]
125pub struct EntryIteratorOptions {}
126
127#[derive(Clone, Debug, Error)]
129#[error("failed to open neighbor entry iterator: {0}")]
130pub struct OpenEntryIteratorError(fidl::Error);
131
132pub fn open_entry_iterator(
134 view_proxy: &fnet_neighbor::ViewProxy,
135 options: EntryIteratorOptions,
136) -> Result<fnet_neighbor::EntryIteratorProxy, OpenEntryIteratorError> {
137 let (neighbor_iter_proxy, entry_iter_server_end) =
138 fidl::endpoints::create_proxy::<fnet_neighbor::EntryIteratorMarker>();
139
140 view_proxy
141 .open_entry_iterator(entry_iter_server_end, &options.into())
142 .map_err(OpenEntryIteratorError)?;
143
144 Ok(neighbor_iter_proxy)
145}
146
147#[derive(Debug, Error)]
149pub enum EntryIteratorError {
150 #[error("the call to `GetNext()` failed: {0}")]
152 Fidl(fidl::Error),
153 #[error("failed to convert event returned by `GetNext()`: {0:?}")]
155 Conversion(EntryValidationError),
156 #[error("the call to `GetNext()` returned an empty batch of events")]
158 EmptyEventBatch,
159}
160
161pub fn event_stream_from_view(
163 neighbors_view: &fnet_neighbor::ViewProxy,
164) -> Result<impl Stream<Item = Result<Event, EntryIteratorError>> + 'static, OpenEntryIteratorError>
165{
166 event_stream_from_view_with_options(neighbors_view, Default::default())
167}
168
169pub fn event_stream_from_view_with_options(
177 neighbors_view: &fnet_neighbor::ViewProxy,
178 options: EntryIteratorOptions,
179) -> Result<impl Stream<Item = Result<Event, EntryIteratorError>> + 'static, OpenEntryIteratorError>
180{
181 let neighbor_iterator = open_entry_iterator(neighbors_view, options)?;
182 Ok(event_stream_from_iterator(neighbor_iterator))
183}
184
185pub fn event_stream_from_iterator(
192 neighbor_iterator: fnet_neighbor::EntryIteratorProxy,
193) -> impl Stream<Item = Result<Event, EntryIteratorError>> {
194 stream::ShortCircuit::new(
195 futures::stream::try_unfold(neighbor_iterator, |iter| async {
196 let events_batch = iter.get_next().await.map_err(EntryIteratorError::Fidl)?;
197 if events_batch.is_empty() {
198 return Err(EntryIteratorError::EmptyEventBatch);
199 }
200 let events_batch = events_batch
201 .into_iter()
202 .map(|event| event.try_into().map_err(EntryIteratorError::Conversion));
203 let event_stream = futures::stream::iter(events_batch);
204 Ok(Some((event_stream, iter)))
205 })
206 .try_flatten(),
208 )
209}
210
211#[derive(Debug, Error)]
213pub enum CollectNeighborsUntilIdleError {
214 #[error("there was an error in the event stream: {0}")]
216 ErrorInStream(EntryIteratorError),
217 #[error("there was an unexpected event in the event stream: {0:?}")]
220 UnexpectedEvent(Event),
221 #[error("the event stream unexpectedly ended")]
223 StreamEnded,
224}
225
226pub async fn collect_neighbors_until_idle<C: Extend<Entry> + Default>(
229 event_stream: impl futures::Stream<Item = Result<Event, EntryIteratorError>> + Unpin,
230) -> Result<C, CollectNeighborsUntilIdleError> {
231 fold::fold_while(
232 event_stream,
233 Ok(C::default()),
234 |existing_neighbors: Result<C, CollectNeighborsUntilIdleError>, event| {
235 futures::future::ready(match existing_neighbors {
236 Err(_) => {
237 unreachable!(
238 "`existing_neighbors` must be `Ok`, because we stop folding on err"
239 )
240 }
241 Ok(mut existing_neighbors) => match event {
242 Err(e) => {
243 fold::FoldWhile::Done(Err(CollectNeighborsUntilIdleError::ErrorInStream(e)))
244 }
245 Ok(e) => match e {
246 Event::Existing(e) => {
247 existing_neighbors.extend([e]);
248 fold::FoldWhile::Continue(Ok(existing_neighbors))
249 }
250 Event::Idle => fold::FoldWhile::Done(Ok(existing_neighbors)),
251 e @ Event::Added(_) | e @ Event::Changed(_) | e @ Event::Removed(_) => {
252 fold::FoldWhile::Done(Err(
253 CollectNeighborsUntilIdleError::UnexpectedEvent(e),
254 ))
255 }
256 },
257 },
258 })
259 },
260 )
261 .await
262 .short_circuited()
263 .map_err(|_accumulated_thus_far: Result<C, CollectNeighborsUntilIdleError>| {
264 CollectNeighborsUntilIdleError::StreamEnded
265 })?
266}
267
#[cfg(test)]
mod tests {
    use super::*;

    use assert_matches::assert_matches;
    use futures::{FutureExt, StreamExt as _};
    use test_case::test_case;

    /// Builds a FIDL entry with every field populated, using `interface` as
    /// the interface identifier.
    fn valid_fidl_entry(interface: NonZeroU64) -> fnet_neighbor::Entry {
        fnet_neighbor::Entry {
            interface: Some(interface.get()),
            neighbor: Some(fnet::IpAddress::Ipv4(fnet::Ipv4Address { addr: [192, 168, 0, 1] })),
            state: Some(fnet_neighbor::EntryState::Reachable),
            mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
            updated_at: Some(123456),
            ..Default::default()
        }
    }

    /// Every FIDL item variant converts into the `Event` variant of the same
    /// name, carrying the validated entry.
    #[test]
    fn event_try_from_success() {
        let fidl_entry = valid_fidl_entry(NonZeroU64::new(1).unwrap());
        let local_entry: Entry = fidl_entry.clone().try_into().unwrap();
        assert_matches!(
            fnet_neighbor::EntryIteratorItem::Existing(fidl_entry.clone()).try_into(),
            Ok(Event::Existing(entry)) if entry == local_entry
        );
        assert_matches!(
            fnet_neighbor::EntryIteratorItem::Added(fidl_entry.clone()).try_into(),
            Ok(Event::Added(entry)) if entry == local_entry
        );
        assert_matches!(
            fnet_neighbor::EntryIteratorItem::Changed(fidl_entry.clone()).try_into(),
            Ok(Event::Changed(entry)) if entry == local_entry
        );
        assert_matches!(
            fnet_neighbor::EntryIteratorItem::Removed(fidl_entry).try_into(),
            Ok(Event::Removed(entry)) if entry == local_entry
        );
        assert_matches!(
            fnet_neighbor::EntryIteratorItem::Idle(fnet_neighbor::IdleEvent).try_into(),
            Ok(Event::Idle)
        );
    }

    /// An entry lacking a required field is rejected for every entry-carrying
    /// item variant.
    #[test]
    fn event_try_from_missing_field() {
        let fidl_entry = fnet_neighbor::Entry {
            interface: None,
            ..valid_fidl_entry(NonZeroU64::new(1).unwrap())
        };
        assert_matches!(
            Event::try_from(fnet_neighbor::EntryIteratorItem::Existing(fidl_entry.clone())),
            Err(EntryValidationError::MissingField(_))
        );
        assert_matches!(
            Event::try_from(fnet_neighbor::EntryIteratorItem::Added(fidl_entry.clone())),
            Err(EntryValidationError::MissingField(_))
        );
        assert_matches!(
            Event::try_from(fnet_neighbor::EntryIteratorItem::Changed(fidl_entry.clone())),
            Err(EntryValidationError::MissingField(_))
        );
        assert_matches!(
            Event::try_from(fnet_neighbor::EntryIteratorItem::Removed(fidl_entry)),
            Err(EntryValidationError::MissingField(_))
        );
    }

    /// An interface identifier of zero is present but invalid.
    #[test]
    fn entry_try_from_zero_interface() {
        let fidl_entry = fnet_neighbor::Entry {
            interface: Some(0),
            ..valid_fidl_entry(NonZeroU64::new(1).unwrap())
        };
        assert_matches!(Entry::try_from(fidl_entry), Err(EntryValidationError::InvalidField(_)));
    }

    /// Feeds batches of the given shape through a fake view and checks that
    /// the stream yields exactly those events, in order, followed by a single
    /// channel-closed error once the fake stops serving.
    #[test_case(Vec::new(); "no_events")]
    #[test_case(vec![0..1]; "single_batch_single_event")]
    #[test_case(vec![0..10]; "single_batch_many_events")]
    #[test_case(vec![0..10, 10..20, 20..30]; "many_batches_many_events")]
    #[fuchsia_async::run_singlethreaded(test)]
    async fn event_stream_from_view_against_shape(test_shape: Vec<std::ops::Range<u8>>) {
        let (batches_sender, batches_receiver) =
            futures::channel::mpsc::unbounded::<Vec<fnet_neighbor::EntryIteratorItem>>();
        for batch_shape in &test_shape {
            batches_sender
                .unbounded_send(testutil::generate_events_in_range(batch_shape.clone()))
                .expect("failed to send event batch");
        }

        let (view, entry_iter_fut) = testutil::create_fake_view(batches_receiver);

        let event_stream =
            event_stream_from_view(&view).expect("failed to open entry iterator").fuse();

        futures::pin_mut!(entry_iter_fut, event_stream);

        for batch_shape in test_shape {
            for event_idx in batch_shape {
                futures::select! {
                    () = entry_iter_fut => panic!(
                        "fake entry iterator implementation unexpectedly finished"
                    ),
                    event = event_stream.next() => {
                        let actual_event = event
                            .expect("event stream unexpectedly empty")
                            .expect("error processing event");
                        let expected_event = testutil::generate_event(event_idx)
                            .try_into()
                            .expect("test event is unexpectedly invalid");
                        assert_eq!(actual_event, expected_event);
                    }
                };
            }
        }

        // Closing the sender lets the fake iterator run to completion; the
        // client side is then expected to see the channel close.
        batches_sender.close_channel();
        let ((), mut events) = futures::join!(entry_iter_fut, event_stream.collect::<Vec<_>>());
        assert_matches!(
            events.pop(),
            Some(Err(EntryIteratorError::Fidl(fidl::Error::ClientChannelClosed {
                status: zx_status::Status::PEER_CLOSED,
                ..
            })))
        );
        assert_matches!(events[..], []);
    }

    /// Opens several iterators on one view and checks that each receives its
    /// own data set.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn event_stream_from_view_multiple_iterators() {
        let test_data = vec![
            vec![testutil::generate_events_in_range(0..10)],
            vec![testutil::generate_events_in_range(10..20)],
            vec![testutil::generate_events_in_range(20..30)],
        ];

        let (view, view_server_end) = fidl::endpoints::create_proxy::<fnet_neighbor::ViewMarker>();
        let view_request_stream = view_server_end.into_stream();
        // `None` means "no concurrency limit": every iterator is served at once.
        let entry_iters_fut = view_request_stream
            .zip(futures::stream::iter(test_data.clone()))
            .for_each_concurrent(None, |(request, event_data)| {
                testutil::serve_view_request(
                    request.expect("failed to receive `OpenEntryIterator` request"),
                    futures::stream::iter(event_data),
                )
            });

        let validate_event_streams_fut =
            futures::future::join_all(test_data.into_iter().map(|event_data| {
                let events_fut = event_stream_from_view(&view)
                    .expect("failed to create entry iterator")
                    .collect::<std::collections::VecDeque<_>>();
                events_fut.then(|mut events| {
                    for expected_event in event_data.into_iter().flatten() {
                        assert_eq!(
                            events
                                .pop_front()
                                .expect("event_stream unexpectedly empty")
                                .expect("error processing event"),
                            expected_event.try_into().expect("test event is unexpectedly invalid"),
                        );
                    }
                    assert_matches!(
                        events.pop_front(),
                        Some(Err(EntryIteratorError::Fidl(fidl::Error::ClientChannelClosed {
                            status: zx_status::Status::PEER_CLOSED,
                            ..
                        })))
                    );
                    assert_matches!(events.make_contiguous(), []);
                    futures::future::ready(())
                })
            }));

        futures::join!(entry_iters_fut, validate_event_streams_fut);
    }

    /// A conversion failure is the only item the stream yields, whether or
    /// not further events or batches were queued behind the bad one.
    #[test_case(false, false; "no_trailing")]
    #[test_case(true, false; "trailing_event")]
    #[test_case(false, true; "trailing_batch")]
    #[test_case(true, true; "trailing_event_and_batch")]
    #[fuchsia_async::run_singlethreaded(test)]
    async fn event_stream_from_view_conversion_error(trailing_event: bool, trailing_batch: bool) {
        // Same values as `valid_fidl_entry`, minus the required `interface`.
        let bad_event = fnet_neighbor::EntryIteratorItem::Added(fnet_neighbor::Entry {
            interface: None,
            ..valid_fidl_entry(NonZeroU64::new(1).unwrap())
        });

        let batch = std::iter::once(bad_event)
            .chain(trailing_event.then(|| testutil::generate_event(0)))
            .collect::<Vec<_>>();
        let batches = std::iter::once(batch)
            .chain(trailing_batch.then(|| vec![testutil::generate_event(1)]))
            .collect::<Vec<_>>();

        let (view, entry_iter_fut) = testutil::create_fake_view(futures::stream::iter(batches));

        let event_stream = event_stream_from_view(&view).expect("failed to connect to view").fuse();

        futures::pin_mut!(entry_iter_fut, event_stream);
        let ((), events) = futures::join!(entry_iter_fut, event_stream.collect::<Vec<_>>());
        assert_matches!(&events[..], &[Err(EntryIteratorError::Conversion(_))]);
    }

    /// An empty batch is the only item the stream yields, whether or not a
    /// further batch was queued behind it.
    #[test_case(false; "no_trailing_batch")]
    #[test_case(true; "trailing_batch")]
    #[fuchsia_async::run_singlethreaded(test)]
    async fn event_stream_from_view_empty_batch_error(trailing_batch: bool) {
        let batches = std::iter::once(Vec::new())
            .chain(trailing_batch.then(|| vec![testutil::generate_event(0)]))
            .collect::<Vec<_>>();

        let (view, entry_iter_fut) = testutil::create_fake_view(futures::stream::iter(batches));

        let event_stream =
            event_stream_from_view(&view).expect("failed to create entry iterator").fuse();

        futures::pin_mut!(entry_iter_fut, event_stream);
        let ((), events) = futures::join!(entry_iter_fut, event_stream.collect::<Vec<_>>());
        assert_matches!(&events[..], &[Err(EntryIteratorError::EmptyEventBatch)]);
    }

    /// An error item in the stream is surfaced as `ErrorInStream`.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn collect_neighbors_until_idle_error_error_in_stream() {
        let event = Err(EntryIteratorError::EmptyEventBatch);
        let event_stream = futures::stream::once(futures::future::ready(event));
        assert_matches!(
            collect_neighbors_until_idle::<Vec<_>>(event_stream).await,
            Err(CollectNeighborsUntilIdleError::ErrorInStream(_))
        );
    }

    /// An `Added` event before `Idle` is surfaced as `UnexpectedEvent`.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn collect_neighbors_until_idle_error_unexpected_event() {
        let event =
            Ok(Event::Added(valid_fidl_entry(NonZeroU64::new(1).unwrap()).try_into().unwrap()));
        let event_stream = futures::stream::once(futures::future::ready(event));
        assert_matches!(
            collect_neighbors_until_idle::<Vec<_>>(event_stream).await,
            Err(CollectNeighborsUntilIdleError::UnexpectedEvent(_))
        );
    }

    /// A stream that finishes without `Idle` is surfaced as `StreamEnded`.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn collect_neighbors_until_idle_error_stream_ended() {
        let event =
            Ok(Event::Existing(valid_fidl_entry(NonZeroU64::new(1).unwrap()).try_into().unwrap()));
        let event_stream = futures::stream::once(futures::future::ready(event));
        assert_matches!(
            collect_neighbors_until_idle::<Vec<_>>(event_stream).await,
            Err(CollectNeighborsUntilIdleError::StreamEnded)
        );
    }

    /// Existing entries are collected up to `Idle`, and events after `Idle`
    /// remain in the stream for the caller.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn collect_neighbors_until_idle_success() {
        let entry: Entry = valid_fidl_entry(NonZeroU64::new(1).unwrap()).try_into().unwrap();
        let mut event_stream = futures::stream::iter([
            Ok(Event::Existing(entry.clone())),
            Ok(Event::Idle),
            Ok(Event::Added(entry.clone())),
        ]);

        let existing: Vec<_> = collect_neighbors_until_idle(&mut event_stream)
            .await
            .expect("failed to collect existing neighbors");
        assert_eq!(existing, &[entry.clone()]);

        let trailing_events: Vec<_> = event_stream.collect().await;
        assert_matches!(
            &trailing_events[..],
            [Ok(Event::Added(found_entry))] if *found_entry == entry
        );
    }
}