//! Validated Rust types and event-stream helpers layered on top of the FIDL
//! neighbor bindings (`flex_fuchsia_net_neighbor`).

#![deny(missing_docs)]

/// Helpers used by this crate's tests; only compiled under `cfg(test)` or when
/// the `testutils` feature is enabled.
#[cfg(any(test, feature = "testutils"))]
pub mod testutil;
11
12use async_utils::{fold, stream};
13use fidl_fuchsia_net_ext as fnet_ext;
14use fidl_table_validation::*;
15use flex_client::ProxyHasDomain;
16use flex_fuchsia_net as fnet;
17use flex_fuchsia_net_neighbor as fnet_neighbor;
18use futures::{Stream, TryStreamExt as _};
19use std::num::NonZeroU64;
20use thiserror::Error;
21use zx_types as zx;
22
/// Errors produced by the custom field converters defined in this file.
#[derive(Debug, Error)]
enum ConversionError {
    /// The FIDL `u64` interface ID was zero and therefore cannot be stored in
    /// a `NonZeroU64`.
    #[error("interface ID must be non-zero")]
    ZeroInterface,
}
28
/// Marker type whose `Converter` implementation maps a FIDL `u64` to a
/// `NonZeroU64` and back; used for the `interface` field of `Entry`.
struct NonZeroU64Converter;
30
31impl Converter for NonZeroU64Converter {
32 type Fidl = u64;
33 type Validated = NonZeroU64;
34 type Error = ConversionError;
35 fn try_from_fidl(value: Self::Fidl) -> std::result::Result<Self::Validated, Self::Error> {
36 NonZeroU64::new(value).ok_or(ConversionError::ZeroInterface)
37 }
38 fn from_validated(validated: Self::Validated) -> Self::Fidl {
39 validated.get()
40 }
41}
42
/// Validated counterpart of the FIDL table `fnet_neighbor::Entry`.
///
/// The conversions are derived via `ValidFidlTable`. Only `mac` is marked
/// optional, and `interface` must additionally be non-zero.
#[derive(Clone, Debug, Eq, Hash, PartialEq, ValidFidlTable)]
#[fidl_table_src(fnet_neighbor::Entry)]
#[fidl_table_strict]
pub struct Entry {
    /// Non-zero ID of the interface this entry is associated with.
    #[fidl_field_type(converter = NonZeroU64Converter)]
    pub interface: NonZeroU64,
    /// IP address of the neighbor.
    pub neighbor: fnet::IpAddress,
    /// State of this neighbor entry.
    pub state: fnet_neighbor::EntryState,
    /// MAC address of the neighbor, when the FIDL table carries one.
    #[fidl_field_type(optional)]
    pub mac: Option<fnet::MacAddress>,
    /// Raw `zx_time_t` timestamp.
    ///
    /// NOTE(review): presumably the time this entry was last updated; confirm
    /// the clock and exact semantics against the FIDL documentation.
    pub updated_at: zx::zx_time_t,
}
64
65pub fn display_entry_state(state: &fnet_neighbor::EntryState) -> &'static str {
67 match state {
68 fnet_neighbor::EntryState::Incomplete => "INCOMPLETE",
69 fnet_neighbor::EntryState::Reachable => "REACHABLE",
70 fnet_neighbor::EntryState::Stale => "STALE",
71 fnet_neighbor::EntryState::Delay => "DELAY",
72 fnet_neighbor::EntryState::Probe => "PROBE",
73 fnet_neighbor::EntryState::Static => "STATIC",
74 fnet_neighbor::EntryState::Unreachable => "UNREACHABLE",
75 }
76}
77
78impl std::fmt::Display for Entry {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
80 let Self { interface, neighbor, mac, state, updated_at: _ } = self;
81 write!(f, "Interface {} | IP {} | MAC ", interface, fnet_ext::IpAddress::from(*neighbor))?;
82 if let Some(mac) = mac {
83 write!(f, "{}", fnet_ext::MacAddress::from(*mac))?;
84 } else {
85 write!(f, "?")?;
86 }
87 write!(f, " | {}", display_entry_state(state))
88 }
89}
90
/// Validated counterpart of `fnet_neighbor::EntryIteratorItem`.
#[derive(Clone, Debug, PartialEq)]
pub enum Event {
    /// Converted from `EntryIteratorItem::Existing`; these are the entries
    /// that `collect_neighbors_until_idle` gathers before `Idle`.
    Existing(Entry),
    /// Converted from `EntryIteratorItem::Added`.
    Added(Entry),
    /// Converted from `EntryIteratorItem::Changed`.
    Changed(Entry),
    /// Converted from `EntryIteratorItem::Removed`.
    Removed(Entry),
    /// Converted from `EntryIteratorItem::Idle`; carries no entry.
    Idle,
}
105
106impl TryFrom<fnet_neighbor::EntryIteratorItem> for Event {
107 type Error = EntryValidationError;
108
109 fn try_from(value: fnet_neighbor::EntryIteratorItem) -> Result<Self, Self::Error> {
110 match value {
111 fnet_neighbor::EntryIteratorItem::Existing(e) => {
112 Ok(Event::Existing(Entry::try_from(e)?))
113 }
114 fnet_neighbor::EntryIteratorItem::Added(e) => Ok(Event::Added(Entry::try_from(e)?)),
115 fnet_neighbor::EntryIteratorItem::Changed(e) => Ok(Event::Changed(Entry::try_from(e)?)),
116 fnet_neighbor::EntryIteratorItem::Removed(e) => Ok(Event::Removed(Entry::try_from(e)?)),
117 fnet_neighbor::EntryIteratorItem::Idle(_) => Ok(Event::Idle),
118 }
119 }
120}
121
/// Validated counterpart of the FIDL table
/// `fnet_neighbor::EntryIteratorOptions`; it currently has no fields.
#[derive(Clone, Debug, Default, Eq, PartialEq, ValidFidlTable)]
#[fidl_table_src(fnet_neighbor::EntryIteratorOptions)]
#[fidl_table_strict]
pub struct EntryIteratorOptions {}
127
/// Error returned by `open_entry_iterator` when the `OpenEntryIterator` call
/// on the view proxy fails; wraps the underlying `fidl::Error`.
#[derive(Clone, Debug, Error)]
#[error("failed to open neighbor entry iterator: {0}")]
pub struct OpenEntryIteratorError(fidl::Error);
132
133pub fn open_entry_iterator(
135 view_proxy: &fnet_neighbor::ViewProxy,
136 options: EntryIteratorOptions,
137) -> Result<fnet_neighbor::EntryIteratorProxy, OpenEntryIteratorError> {
138 let (neighbor_iter_proxy, entry_iter_server_end) =
139 view_proxy.domain().create_proxy::<fnet_neighbor::EntryIteratorMarker>();
140
141 view_proxy
142 .open_entry_iterator(entry_iter_server_end, &options.into())
143 .map_err(OpenEntryIteratorError)?;
144
145 Ok(neighbor_iter_proxy)
146}
147
/// Errors yielded by the event streams built on an `EntryIterator` proxy.
#[derive(Debug, Error)]
pub enum EntryIteratorError {
    /// The `GetNext()` FIDL call itself failed.
    #[error("the call to `GetNext()` failed: {0}")]
    Fidl(fidl::Error),
    /// An item returned by `GetNext()` failed validation while being
    /// converted into an `Event`.
    #[error("failed to convert event returned by `GetNext()`: {0:?}")]
    Conversion(EntryValidationError),
    /// `GetNext()` returned a batch containing no events.
    #[error("the call to `GetNext()` returned an empty batch of events")]
    EmptyEventBatch,
}
161
162pub fn event_stream_from_view(
164 neighbors_view: &fnet_neighbor::ViewProxy,
165) -> Result<impl Stream<Item = Result<Event, EntryIteratorError>> + 'static, OpenEntryIteratorError>
166{
167 event_stream_from_view_with_options(neighbors_view, Default::default())
168}
169
170pub fn event_stream_from_view_with_options(
178 neighbors_view: &fnet_neighbor::ViewProxy,
179 options: EntryIteratorOptions,
180) -> Result<impl Stream<Item = Result<Event, EntryIteratorError>> + 'static, OpenEntryIteratorError>
181{
182 let neighbor_iterator = open_entry_iterator(neighbors_view, options)?;
183 Ok(event_stream_from_iterator(neighbor_iterator))
184}
185
186pub fn event_stream_from_iterator(
193 neighbor_iterator: fnet_neighbor::EntryIteratorProxy,
194) -> impl Stream<Item = Result<Event, EntryIteratorError>> {
195 stream::ShortCircuit::new(
196 futures::stream::try_unfold(neighbor_iterator, |iter| async {
197 let events_batch = iter.get_next().await.map_err(EntryIteratorError::Fidl)?;
198 if events_batch.is_empty() {
199 return Err(EntryIteratorError::EmptyEventBatch);
200 }
201 let events_batch = events_batch
202 .into_iter()
203 .map(|event| event.try_into().map_err(EntryIteratorError::Conversion));
204 let event_stream = futures::stream::iter(events_batch);
205 Ok(Some((event_stream, iter)))
206 })
207 .try_flatten(),
209 )
210}
211
/// Errors returned by `collect_neighbors_until_idle`.
#[derive(Debug, Error)]
pub enum CollectNeighborsUntilIdleError {
    /// The event stream yielded an error before `Event::Idle` was seen.
    #[error("there was an error in the event stream: {0}")]
    ErrorInStream(EntryIteratorError),
    /// An `Added`, `Changed` or `Removed` event arrived before `Event::Idle`.
    #[error("there was an unexpected event in the event stream: {0:?}")]
    UnexpectedEvent(Event),
    /// The event stream ended before `Event::Idle` was seen.
    #[error("the event stream unexpectedly ended")]
    StreamEnded,
}
226
227pub async fn collect_neighbors_until_idle<C: Extend<Entry> + Default>(
230 event_stream: impl futures::Stream<Item = Result<Event, EntryIteratorError>> + Unpin,
231) -> Result<C, CollectNeighborsUntilIdleError> {
232 fold::fold_while(
233 event_stream,
234 Ok(C::default()),
235 |existing_neighbors: Result<C, CollectNeighborsUntilIdleError>, event| {
236 futures::future::ready(match existing_neighbors {
237 Err(_) => {
238 unreachable!(
239 "`existing_neighbors` must be `Ok`, because we stop folding on err"
240 )
241 }
242 Ok(mut existing_neighbors) => match event {
243 Err(e) => {
244 fold::FoldWhile::Done(Err(CollectNeighborsUntilIdleError::ErrorInStream(e)))
245 }
246 Ok(e) => match e {
247 Event::Existing(e) => {
248 existing_neighbors.extend([e]);
249 fold::FoldWhile::Continue(Ok(existing_neighbors))
250 }
251 Event::Idle => fold::FoldWhile::Done(Ok(existing_neighbors)),
252 e @ Event::Added(_) | e @ Event::Changed(_) | e @ Event::Removed(_) => {
253 fold::FoldWhile::Done(Err(
254 CollectNeighborsUntilIdleError::UnexpectedEvent(e),
255 ))
256 }
257 },
258 },
259 })
260 },
261 )
262 .await
263 .short_circuited()
264 .map_err(|_accumulated_thus_far: Result<C, CollectNeighborsUntilIdleError>| {
265 CollectNeighborsUntilIdleError::StreamEnded
266 })?
267}
268
269#[cfg(test)]
270mod tests {
271 use super::*;
272
273 use assert_matches::assert_matches;
274 use futures::{FutureExt, StreamExt as _};
275 use test_case::test_case;
276
277 fn valid_fidl_entry(interface: NonZeroU64) -> fnet_neighbor::Entry {
278 fnet_neighbor::Entry {
279 interface: Some(interface.get()),
280 neighbor: Some(fnet::IpAddress::Ipv4(fnet::Ipv4Address { addr: [192, 168, 0, 1] })),
281 state: Some(fnet_neighbor::EntryState::Reachable),
282 mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
283 updated_at: Some(123456),
284 ..Default::default()
285 }
286 }
287
288 #[test]
289 fn event_try_from_success() {
290 let fidl_entry = valid_fidl_entry(NonZeroU64::new(1).unwrap());
291 let local_entry: Entry = fidl_entry.clone().try_into().unwrap();
292 assert_matches!(
293 fnet_neighbor::EntryIteratorItem::Existing(fidl_entry.clone()).try_into(),
294 Ok(Event::Existing(entry)) if entry == local_entry
295 );
296 assert_matches!(
297 fnet_neighbor::EntryIteratorItem::Added(fidl_entry.clone()).try_into(),
298 Ok(Event::Added(entry)) if entry == local_entry
299 );
300 assert_matches!(
301 fnet_neighbor::EntryIteratorItem::Changed(fidl_entry.clone()).try_into(),
302 Ok(Event::Changed(entry)) if entry == local_entry
303 );
304 assert_matches!(
305 fnet_neighbor::EntryIteratorItem::Removed(fidl_entry.clone()).try_into(),
306 Ok(Event::Removed(entry)) if entry == local_entry
307 );
308 assert_matches!(
309 fnet_neighbor::EntryIteratorItem::Idle(fnet_neighbor::IdleEvent).try_into(),
310 Ok(Event::Idle)
311 );
312 }
313
314 #[test]
315 fn event_try_from_missing_field() {
316 let fidl_entry = fnet_neighbor::Entry {
317 interface: None, ..valid_fidl_entry(NonZeroU64::new(1).unwrap())
319 };
320 assert_matches!(
321 Event::try_from(fnet_neighbor::EntryIteratorItem::Existing(fidl_entry.clone())),
322 Err(EntryValidationError::MissingField(_))
323 );
324 assert_matches!(
325 Event::try_from(fnet_neighbor::EntryIteratorItem::Added(fidl_entry.clone())),
326 Err(EntryValidationError::MissingField(_))
327 );
328 assert_matches!(
329 Event::try_from(fnet_neighbor::EntryIteratorItem::Changed(fidl_entry.clone())),
330 Err(EntryValidationError::MissingField(_))
331 );
332 assert_matches!(
333 Event::try_from(fnet_neighbor::EntryIteratorItem::Removed(fidl_entry.clone())),
334 Err(EntryValidationError::MissingField(_))
335 );
336 }
337
338 #[test]
339 fn entry_try_from_zero_interface() {
340 let fidl_entry = fnet_neighbor::Entry {
341 interface: Some(0),
342 ..valid_fidl_entry(NonZeroU64::new(1).unwrap())
343 };
344 assert_matches!(Entry::try_from(fidl_entry), Err(EntryValidationError::InvalidField(_)));
345 }
346
347 #[test_case(Vec::new(); "no events")]
351 #[test_case(vec![0..1]; "single_batch_single_event")]
352 #[test_case(vec![0..10]; "single_batch_many_events")]
353 #[test_case(vec![0..10, 10..20, 20..30]; "many_batches_many_events")]
354 #[fuchsia_async::run_singlethreaded(test)]
355 async fn event_stream_from_view_against_shape(test_shape: Vec<std::ops::Range<u8>>) {
356 let (batches_sender, batches_receiver) =
359 futures::channel::mpsc::unbounded::<Vec<fnet_neighbor::EntryIteratorItem>>();
360 for batch_shape in &test_shape {
361 batches_sender
362 .unbounded_send(testutil::generate_events_in_range(batch_shape.clone()))
363 .expect("failed to send event batch");
364 }
365
366 let (view, entry_iter_fut) = testutil::create_fake_view(batches_receiver);
367
368 let event_stream =
369 event_stream_from_view(&view).expect("failed to open entry iterator").fuse();
370
371 futures::pin_mut!(entry_iter_fut, event_stream);
372
373 for batch_shape in test_shape {
374 for event_idx in batch_shape.into_iter() {
375 futures::select! {
376 () = entry_iter_fut => panic!(
377 "fake entry iterator implementation unexpectedly finished"
378 ),
379 event = event_stream.next() => {
380 let actual_event = event
381 .expect("event stream unexpectedly empty")
382 .expect("error processing event");
383 let expected_event = testutil::generate_event(event_idx)
384 .try_into()
385 .expect("test event is unexpectedly invalid");
386 assert_eq!(actual_event, expected_event);
387 }
388 };
389 }
390 }
391
392 batches_sender.close_channel();
394 let ((), mut events) = futures::join!(entry_iter_fut, event_stream.collect::<Vec<_>>());
395 assert_matches!(
396 events.pop(),
397 Some(Err(EntryIteratorError::Fidl(fidl::Error::ClientChannelClosed {
398 status: zx_status::Status::PEER_CLOSED,
399 ..
400 })))
401 );
402 assert_matches!(events[..], []);
403 }
404
405 #[fuchsia_async::run_singlethreaded(test)]
408 async fn event_stream_from_view_multiple_iterators() {
409 let test_data = vec![
411 vec![testutil::generate_events_in_range(0..10)],
412 vec![testutil::generate_events_in_range(10..20)],
413 vec![testutil::generate_events_in_range(20..30)],
414 ];
415
416 let (view, view_server_end) = fidl::endpoints::create_proxy::<fnet_neighbor::ViewMarker>();
418 let view_request_stream = view_server_end.into_stream();
419 let entry_iters_fut = view_request_stream
420 .zip(futures::stream::iter(test_data.clone()))
421 .for_each_concurrent(std::usize::MAX, |(request, event_data)| {
422 testutil::serve_view_request(
423 request.expect("failed to receive `OpenEntryIterator` request"),
424 futures::stream::iter(event_data),
425 )
426 });
427
428 let validate_event_streams_fut =
429 futures::future::join_all(test_data.into_iter().map(|event_data| {
430 let events_fut = event_stream_from_view(&view)
431 .expect("failed to create entry iterator")
432 .collect::<std::collections::VecDeque<_>>();
433 events_fut.then(|mut events| {
434 for expected_event in event_data.into_iter().flatten() {
435 assert_eq!(
436 events
437 .pop_front()
438 .expect("event_stream unexpectedly empty")
439 .expect("error processing event"),
440 expected_event.try_into().expect("test event is unexpectedly invalid"),
441 );
442 }
443 assert_matches!(
444 events.pop_front(),
445 Some(Err(EntryIteratorError::Fidl(fidl::Error::ClientChannelClosed {
446 status: zx_status::Status::PEER_CLOSED,
447 ..
448 })))
449 );
450 assert_matches!(events.make_contiguous(), []);
451 futures::future::ready(())
452 })
453 }));
454
455 futures::join!(entry_iters_fut, validate_event_streams_fut);
456 }
457
458 #[test_case(false, false; "no_trailing")]
464 #[test_case(true, false; "trailing_event")]
465 #[test_case(false, true; "trailing_batch")]
466 #[test_case(true, true; "trailing_event_and_batch")]
467 #[fuchsia_async::run_singlethreaded(test)]
468 async fn event_stream_from_view_conversion_error(trailing_event: bool, trailing_batch: bool) {
469 let bad_event = fnet_neighbor::EntryIteratorItem::Added(fnet_neighbor::Entry {
470 interface: None, neighbor: Some(fnet::IpAddress::Ipv4(fnet::Ipv4Address { addr: [192, 168, 0, 1] })),
472 state: Some(fnet_neighbor::EntryState::Reachable),
473 mac: Some(fnet::MacAddress { octets: [0, 1, 2, 3, 4, 5] }),
474 updated_at: Some(123456),
475 ..Default::default()
476 });
477
478 let batch = std::iter::once(bad_event)
479 .chain(trailing_event.then(|| testutil::generate_event(0)).into_iter())
481 .collect::<Vec<_>>();
482 let batches = std::iter::once(batch)
483 .chain(trailing_batch.then(|| vec![testutil::generate_event(1)]))
485 .collect::<Vec<_>>();
486
487 let (view, entry_iter_fut) = testutil::create_fake_view(futures::stream::iter(batches));
489
490 let event_stream = event_stream_from_view(&view).expect("failed to connect to view").fuse();
491
492 futures::pin_mut!(entry_iter_fut, event_stream);
493 let ((), events) = futures::join!(entry_iter_fut, event_stream.collect::<Vec<_>>());
494 assert_matches!(&events[..], &[Err(EntryIteratorError::Conversion(_))]);
495 }
496
497 #[test_case(false; "no_trailing_batch")]
502 #[test_case(true; "trailing_batch")]
503 #[fuchsia_async::run_singlethreaded(test)]
504 async fn event_stream_from_view_empty_batch_error(trailing_batch: bool) {
505 let batches = std::iter::once(Vec::new())
506 .chain(trailing_batch.then(|| vec![testutil::generate_event(0)]))
508 .collect::<Vec<_>>();
509
510 let (view, entry_iter_fut) = testutil::create_fake_view(futures::stream::iter(batches));
512
513 let event_stream =
514 event_stream_from_view(&view).expect("failed to create entry iterator").fuse();
515
516 futures::pin_mut!(entry_iter_fut, event_stream);
517 let ((), events) = futures::join!(entry_iter_fut, event_stream.collect::<Vec<_>>());
518 assert_matches!(&events[..], &[Err(EntryIteratorError::EmptyEventBatch)]);
519 }
520
521 #[fuchsia_async::run_singlethreaded(test)]
522 async fn collect_neighbors_until_idle_error_error_in_stream() {
523 let event = Err(EntryIteratorError::EmptyEventBatch);
524 let event_stream = futures::stream::once(futures::future::ready(event));
525 assert_matches!(
526 collect_neighbors_until_idle::<Vec<_>>(event_stream).await,
527 Err(CollectNeighborsUntilIdleError::ErrorInStream(_))
528 );
529 }
530
531 #[fuchsia_async::run_singlethreaded(test)]
532 async fn collect_neighbors_until_idle_error_unexpected_event() {
533 let event =
534 Ok(Event::Added(valid_fidl_entry(NonZeroU64::new(1).unwrap()).try_into().unwrap()));
535 let event_stream = futures::stream::once(futures::future::ready(event));
536 assert_matches!(
537 collect_neighbors_until_idle::<Vec<_>>(event_stream).await,
538 Err(CollectNeighborsUntilIdleError::UnexpectedEvent(_))
539 );
540 }
541
542 #[fuchsia_async::run_singlethreaded(test)]
543 async fn collect_neighbors_until_idle_error_stream_ended() {
544 let event =
545 Ok(Event::Existing(valid_fidl_entry(NonZeroU64::new(1).unwrap()).try_into().unwrap()));
546 let event_stream = futures::stream::once(futures::future::ready(event));
547 assert_matches!(
548 collect_neighbors_until_idle::<Vec<_>>(event_stream).await,
549 Err(CollectNeighborsUntilIdleError::StreamEnded)
550 );
551 }
552
553 #[fuchsia_async::run_singlethreaded(test)]
554 async fn collect_neighbors_until_idle_success() {
555 let entry: Entry = valid_fidl_entry(NonZeroU64::new(1).unwrap()).try_into().unwrap();
556 let mut event_stream = futures::stream::iter([
557 Ok(Event::Existing(entry.clone())),
558 Ok(Event::Idle),
559 Ok(Event::Added(entry.clone())),
560 ]);
561
562 let existing: Vec<_> = collect_neighbors_until_idle(&mut event_stream)
563 .await
564 .expect("failed to collect existing neighbors");
565 assert_eq!(existing, &[entry.clone()]);
566
567 let trailing_events: Vec<_> = event_stream.collect().await;
568 assert_matches!(
569 &trailing_events[..],
570 [Ok(Event::Added(found_entry))] if *found_entry == entry
571 );
572 }
573}