reachability/
eventloop.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! The special-purpose event loop used by the reachability monitor.
6//!
7//! This event loop receives events from netstack. Thsose events are used by the reachability
8//! monitor to infer the connectivity state.
9
10use reachability_handler::ReachabilityState;
11
12use anyhow::{Context as _, anyhow};
13use fidl_fuchsia_net_interfaces_ext::{self as fnet_interfaces_ext, Update as _};
14use fuchsia_async::{self as fasync};
15use fuchsia_inspect::Inspector;
16use fuchsia_inspect::health::Reporter;
17use futures::channel::mpsc;
18use futures::prelude::*;
19use futures::select;
20use log::{debug, error, info, warn};
21use named_timer::NamedTimeoutExt;
22use reachability_core::dig::Dig;
23use reachability_core::fetch::Fetch;
24use reachability_core::ping::Ping;
25use reachability_core::route_table::RouteTable;
26use reachability_core::telemetry::{self, TelemetryEvent, TelemetrySender};
27use reachability_core::{
28    FIDL_TIMEOUT_ID, InterfaceView, Monitor, NeighborCache, NetworkCheckAction, NetworkCheckCookie,
29    NetworkCheckResult, NetworkChecker, NetworkCheckerOutcome, watchdog,
30};
31use reachability_handler::ReachabilityHandler;
32use std::collections::{HashMap, HashSet};
33use std::pin::pin;
34use {
35    fidl_fuchsia_hardware_network as fhardware_network, fidl_fuchsia_net_debug as fnet_debug,
36    fidl_fuchsia_net_interfaces as fnet_interfaces, fidl_fuchsia_net_neighbor as fnet_neighbor,
37    fidl_fuchsia_net_routes as fnet_routes, fidl_fuchsia_net_routes_ext as fnet_routes_ext,
38};
39
40const REPORT_PERIOD: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(60);
41const PROBE_PERIOD: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(60);
42const FIDL_TIMEOUT: zx::MonotonicDuration = zx::MonotonicDuration::from_seconds(90);
43
44struct SystemDispatcher;
45
46#[async_trait::async_trait]
47impl watchdog::SystemDispatcher for SystemDispatcher {
48    type DeviceDiagnostics = DeviceDiagnosticsProvider;
49
50    async fn log_debug_info(&self) -> Result<(), watchdog::Error> {
51        let diagnostics =
52            fuchsia_component::client::connect_to_protocol::<fnet_debug::DiagnosticsMarker>()
53                .map_err(|e| {
54                    error!(e:? = e; "failed to connect to protocol");
55                    watchdog::Error::NotSupported
56                })?;
57        diagnostics
58            .log_debug_info_to_syslog()
59            .map_err(watchdog::Error::Fidl)
60            .on_timeout_named(&FIDL_TIMEOUT_ID, FIDL_TIMEOUT, || Err(watchdog::Error::Timeout))
61            .await
62    }
63
64    fn get_device_diagnostics(
65        &self,
66        interface: u64,
67    ) -> Result<Self::DeviceDiagnostics, watchdog::Error> {
68        let interfaces_debug =
69            fuchsia_component::client::connect_to_protocol::<fnet_debug::InterfacesMarker>()
70                .map_err(|e| {
71                    error!(e:? = e; "failed to connect to protocol");
72                    watchdog::Error::NotSupported
73                })?;
74        let (port, server_end) = fidl::endpoints::create_proxy();
75        interfaces_debug.get_port(interface, server_end)?;
76
77        let (diagnostics, server_end) = fidl::endpoints::create_proxy();
78        port.get_diagnostics(server_end)?;
79
80        Ok(DeviceDiagnosticsProvider { interface, diagnostics, port })
81    }
82}
83
84struct DeviceDiagnosticsProvider {
85    interface: u64,
86    diagnostics: fhardware_network::DiagnosticsProxy,
87    port: fhardware_network::PortProxy,
88}
89
90#[async_trait::async_trait]
91impl watchdog::DeviceDiagnosticsProvider for DeviceDiagnosticsProvider {
92    async fn get_counters(&self) -> Result<watchdog::DeviceCounters, watchdog::Error> {
93        self.port
94            .get_counters()
95            .map_err(watchdog::Error::Fidl)
96            .and_then(
97                |fhardware_network::PortGetCountersResponse { rx_frames, tx_frames, .. }| match (
98                    rx_frames, tx_frames,
99                ) {
100                    (Some(rx_frames), Some(tx_frames)) => {
101                        futures::future::ok(watchdog::DeviceCounters { rx_frames, tx_frames })
102                    }
103                    (None, Some(_)) | (Some(_), None) | (None, None) => {
104                        error!(iface = self.interface; "missing information from port counters");
105                        futures::future::err(watchdog::Error::NotSupported)
106                    }
107                },
108            )
109            .on_timeout_named(&FIDL_TIMEOUT_ID, FIDL_TIMEOUT, || Err(watchdog::Error::Timeout))
110            .await
111    }
112
113    async fn log_debug_info(&self) -> Result<(), watchdog::Error> {
114        self.diagnostics
115            .log_debug_info_to_syslog()
116            .map_err(watchdog::Error::Fidl)
117            .on_timeout_named(&FIDL_TIMEOUT_ID, FIDL_TIMEOUT, || Err(watchdog::Error::Timeout))
118            .await
119    }
120}
121
122type Watchdog = watchdog::Watchdog<SystemDispatcher>;
123
124async fn handle_network_check_message<'a>(
125    netcheck_futures: &mut futures::stream::FuturesUnordered<
126        futures::future::BoxFuture<'a, (NetworkCheckCookie, NetworkCheckResult)>,
127    >,
128    msg: Option<(NetworkCheckAction, NetworkCheckCookie)>,
129) {
130    let (action, cookie) = msg
131        .context("network check receiver unexpectedly closed")
132        .unwrap_or_else(|err| exit_with_anyhow_error(err));
133    match action {
134        NetworkCheckAction::Ping(parameters) => {
135            netcheck_futures.push(Box::pin(async move {
136                let success = reachability_core::ping::Pinger
137                    .ping(&parameters.interface_name, parameters.addr)
138                    .await;
139                (cookie, NetworkCheckResult::Ping { parameters, success })
140            }));
141        }
142        NetworkCheckAction::ResolveDns(parameters) => {
143            netcheck_futures.push(Box::pin(async move {
144                let ips = reachability_core::dig::Digger::new()
145                    .dig(&parameters.interface_name, &parameters.domain)
146                    .await;
147                (cookie, NetworkCheckResult::ResolveDns { parameters, ips })
148            }));
149        }
150        NetworkCheckAction::Fetch(parameters) => {
151            netcheck_futures.push(Box::pin(async move {
152                let status = reachability_core::fetch::Fetcher
153                    .fetch(
154                        &parameters.interface_name,
155                        &parameters.domain,
156                        &parameters.path,
157                        &parameters.ip,
158                    )
159                    .await;
160                (cookie, NetworkCheckResult::Fetch { parameters, status })
161            }));
162        }
163    }
164}
165
166/// The event loop.
167pub struct EventLoop {
168    monitor: Monitor,
169    handler: ReachabilityHandler,
170    watchdog: Watchdog,
171    interface_properties: HashMap<
172        u64,
173        fnet_interfaces_ext::PropertiesAndState<(), fnet_interfaces_ext::DefaultInterest>,
174    >,
175    neighbor_cache: NeighborCache,
176    routes: RouteTable,
177    telemetry_sender: Option<TelemetrySender>,
178    network_check_receiver: mpsc::UnboundedReceiver<(NetworkCheckAction, NetworkCheckCookie)>,
179    inspector: &'static Inspector,
180}
181
182impl EventLoop {
183    /// `new` returns an `EventLoop` instance.
184    pub fn new(
185        monitor: Monitor,
186        handler: ReachabilityHandler,
187        network_check_receiver: mpsc::UnboundedReceiver<(NetworkCheckAction, NetworkCheckCookie)>,
188        inspector: &'static Inspector,
189    ) -> Self {
190        fuchsia_inspect::component::health().set_starting_up();
191        EventLoop {
192            monitor,
193            handler,
194            watchdog: Watchdog::new(),
195            interface_properties: Default::default(),
196            neighbor_cache: Default::default(),
197            routes: Default::default(),
198            telemetry_sender: None,
199            network_check_receiver,
200            inspector,
201        }
202    }
203
204    /// `run` starts the event loop.
205    pub async fn run(&mut self) -> Result<(), anyhow::Error> {
206        use fuchsia_component::client::connect_to_protocol;
207
208        let cobalt_svc = fuchsia_component::client::connect_to_protocol::<
209            fidl_fuchsia_metrics::MetricEventLoggerFactoryMarker,
210        >()
211        .context("connect to metrics service")
212        .unwrap_or_else(|err| exit_with_anyhow_error(err));
213
214        let cobalt_proxy = match telemetry::create_metrics_logger(cobalt_svc).await {
215            Ok(proxy) => proxy,
216            Err(e) => {
217                warn!("Metrics logging is unavailable: {}", e);
218
219                // If it is not possible to acquire a metrics logging proxy, create a disconnected
220                // proxy and attempt to serve the policy API with metrics disabled.
221                let (proxy, _) = fidl::endpoints::create_proxy::<
222                    fidl_fuchsia_metrics::MetricEventLoggerMarker,
223                >();
224                proxy
225            }
226        };
227
228        let telemetry_inspect_node = self.inspector.root().create_child("telemetry");
229        let (telemetry_sender, telemetry_fut) =
230            telemetry::serve_telemetry(cobalt_proxy, telemetry_inspect_node);
231        let telemetry_fut = telemetry_fut.fuse();
232        self.telemetry_sender = Some(telemetry_sender.clone());
233        self.monitor.set_telemetry_sender(telemetry_sender);
234
235        let if_watcher_stream = {
236            let interface_state = connect_to_protocol::<fnet_interfaces::StateMarker>()
237                .context("network_manager failed to connect to interface state")
238                .unwrap_or_else(|err| exit_with_anyhow_error(err));
239            fnet_interfaces_ext::event_stream_from_state(&interface_state, Default::default())
240                .context("get interface event stream")
241                .unwrap_or_else(|err| exit_with_anyhow_error(err))
242                .fuse()
243        };
244
245        let neigh_watcher_stream = {
246            let view = connect_to_protocol::<fnet_neighbor::ViewMarker>()
247                .context("failed to connect to neighbor view")
248                .unwrap_or_else(|err| exit_with_anyhow_error(err));
249            let (proxy, server_end) =
250                fidl::endpoints::create_proxy::<fnet_neighbor::EntryIteratorMarker>();
251            let () = view
252                .open_entry_iterator(server_end, &fnet_neighbor::EntryIteratorOptions::default())
253                .context("failed to open EntryIterator")
254                .unwrap_or_else(|err| exit_with_anyhow_error(err));
255            futures::stream::try_unfold(proxy, |proxy| {
256                proxy.get_next().map_ok(|e| {
257                    Some((
258                        futures::stream::iter(e.into_iter().map(Result::<_, fidl::Error>::Ok)),
259                        proxy,
260                    ))
261                })
262            })
263            .try_flatten()
264            .fuse()
265        };
266
267        let ipv4_route_event_stream = {
268            let state_v4 = connect_to_protocol::<fnet_routes::StateV4Marker>()
269                .context("failed to connect to fuchsia.net.routes/StateV4")
270                .unwrap_or_else(|err| exit_with_anyhow_error(err));
271            fnet_routes_ext::event_stream_from_state(&state_v4)
272                .context("failed to initialize a `WatcherV4` client")
273                .unwrap_or_else(|err| exit_with_anyhow_error(err))
274                .fuse()
275        };
276        let ipv6_route_event_stream = {
277            let state_v6 = connect_to_protocol::<fnet_routes::StateV6Marker>()
278                .context("failed to connect to fuchsia.net.routes/StateV6")
279                .unwrap_or_else(|err| exit_with_anyhow_error(err));
280            fnet_routes_ext::event_stream_from_state(&state_v6)
281                .context("failed to initialize a `WatcherV6` client")
282                .unwrap_or_else(|err| exit_with_anyhow_error(err))
283                .fuse()
284        };
285
286        let mut probe_futures = futures::stream::FuturesUnordered::new();
287        let mut netcheck_futures = futures::stream::FuturesUnordered::new();
288        let report_stream = fasync::Interval::new(REPORT_PERIOD).fuse();
289
290        let mut if_watcher_stream = pin!(if_watcher_stream);
291        let mut report_stream = pin!(report_stream);
292        let mut neigh_watcher_stream = pin!(neigh_watcher_stream);
293        let mut ipv4_route_event_stream = pin!(ipv4_route_event_stream);
294        let mut ipv6_route_event_stream = pin!(ipv6_route_event_stream);
295        let mut telemetry_fut = pin!(telemetry_fut);
296
297        // Establish the current routing table state.
298        let (v4_routes, v6_routes) = futures::join!(
299            fnet_routes_ext::collect_routes_until_idle::<_, HashSet<_>>(
300                ipv4_route_event_stream.by_ref(),
301            ),
302            fnet_routes_ext::collect_routes_until_idle::<_, HashSet<_>>(
303                ipv6_route_event_stream.by_ref(),
304            )
305        );
306        self.routes = RouteTable::new_with_existing_routes(
307            v4_routes
308                .context("get existing IPv4 routes")
309                .unwrap_or_else(|err| exit_with_anyhow_error(err)),
310            v6_routes
311                .context("get existing IPv6 routes")
312                .unwrap_or_else(|err| exit_with_anyhow_error(err)),
313        );
314
315        debug!("starting event loop");
316
317        fuchsia_inspect::component::health().set_ok();
318
319        loop {
320            select! {
321                if_watcher_res = if_watcher_stream.try_next() => {
322                    if let Some(id) = self.handle_interface_watcher_result(if_watcher_res).await {
323                        probe_futures.push(fasync::Interval::new(PROBE_PERIOD).map(move |()| id).into_future());
324                    }
325                },
326                neigh_res = neigh_watcher_stream.try_next() => {
327                    let event = neigh_res
328                        .unwrap_or_else(|err| exit_with_anyhow_error(
329                            anyhow!("neighbor watcher event stream fidl error: {err}")
330                        ))
331                        .unwrap_or_else(|| exit_with_anyhow_error(
332                            anyhow!("neighbor event stream ended")
333                        ));
334                    self.neighbor_cache.process_neighbor_event(event);
335                }
336                route_v4_res = ipv4_route_event_stream.try_next() => {
337                    let event = route_v4_res
338                        .unwrap_or_else(|err| exit_with_anyhow_error(
339                            anyhow!("ipv4 route watch error: {err}")
340                        ))
341                        .unwrap_or_else(|| exit_with_anyhow_error(
342                            anyhow!("ipv4 route event stream ended")
343                        ));
344                    self.handle_route_watcher_event(event);
345                }
346                route_v6_res = ipv6_route_event_stream.try_next() => {
347                    let event = route_v6_res
348                        .unwrap_or_else(|err| exit_with_anyhow_error(
349                            anyhow!("ipv6 route watch error: {err}")
350                        ))
351                        .unwrap_or_else(|| exit_with_anyhow_error(
352                            anyhow!("ipv6 route event stream ended")
353                        ));
354                    self.handle_route_watcher_event(event);
355                }
356                report = report_stream.next() => {
357                    let () = report
358                        .context("periodic timer for reporting unexpectedly ended")
359                        .unwrap_or_else(|err| exit_with_anyhow_error(err));
360                    let () = self.monitor.report_state();
361                },
362                probe = probe_futures.select_next_some() => {
363                    match probe {
364                        (Some(id), stream) => {
365                            if let Some(fnet_interfaces_ext::PropertiesAndState { properties, state: _ }) = self.interface_properties.get(&id) {
366                                let () = Self::begin_network_check(
367                                    &mut self.monitor,
368                                    &mut self.watchdog,
369                                    &mut self.handler,
370                                    properties,
371                                    &self.routes,
372                                    &self.neighbor_cache
373                                ).await;
374
375                                let () = probe_futures.push(stream.into_future());
376                            }
377                        }
378                        (None, _) => {
379                            exit_with_anyhow_error(anyhow!(
380                                "probe timer for probing reachability unexpectedly ended"
381                            ));
382                        }
383                    }
384                },
385                msg = self.network_check_receiver.next() => {
386                    handle_network_check_message(&mut netcheck_futures, msg).await;
387                },
388                netcheck_res = netcheck_futures.select_next_some() => {
389                    self.handle_netcheck_response(netcheck_res).await;
390                },
391                telemetry_res = telemetry_fut => exit_with_anyhow_error(
392                    anyhow!("unexpectedly stopped serving telemetry: {telemetry_res:?}")
393                ),
394            }
395        }
396    }
397
398    async fn handle_interface_watcher_result(
399        &mut self,
400        if_watcher_res: Result<
401            Option<fnet_interfaces_ext::EventWithInterest<fnet_interfaces_ext::DefaultInterest>>,
402            fidl::Error,
403        >,
404    ) -> Option<u64> {
405        let event = if_watcher_res
406            .unwrap_or_else(|err| {
407                exit_with_anyhow_error(anyhow!("interface watcher event stream fidl error: {err}"))
408            })
409            .unwrap_or_else(|| {
410                exit_with_anyhow_error(anyhow!("interface watcher stream unexpectedly ended"))
411            });
412
413        // Only proceed with tracking interfaces in `interface_properties` if
414        // the interface should be monitored.
415        let should_monitor_interface = match event.inner() {
416            fnet_interfaces::Event::Existing(properties)
417            | fnet_interfaces::Event::Added(properties) => Self::should_monitor_interface(
418                properties
419                    .port_class
420                    .clone()
421                    .expect("non-Changed Events should have port_class present"),
422            ),
423            // Changed and Removed events are preceded by a previous Existing
424            // or Added event that add the interface to `interface_properties`.
425            // When the interface does not exist in the map, don't process the
426            // Changed or Removed event.
427            fnet_interfaces::Event::Changed(fnet_interfaces::Properties { id, .. }) => self
428                .interface_properties
429                .contains_key(&id.expect("Changed events must include the interface id")),
430            fnet_interfaces::Event::Removed(id) => self.interface_properties.contains_key(id),
431            fnet_interfaces::Event::Idle(_) => true,
432        };
433        if !should_monitor_interface {
434            return None;
435        }
436
437        let discovered_id = self
438            .handle_interface_watcher_event(event)
439            .await
440            .unwrap_or_else(|err| exit_with_anyhow_error(err));
441        if let Some(id) = discovered_id {
442            if let Some(telemetry_sender) = &self.telemetry_sender {
443                let has_default_ipv4_route =
444                    self.interface_properties.values().any(|p| p.properties.has_default_ipv4_route);
445                let has_default_ipv6_route =
446                    self.interface_properties.values().any(|p| p.properties.has_default_ipv6_route);
447                telemetry_sender.send(TelemetryEvent::NetworkConfig {
448                    has_default_ipv4_route,
449                    has_default_ipv6_route,
450                });
451            }
452            return Some(id);
453        }
454        None
455    }
456
457    async fn handle_interface_watcher_event(
458        &mut self,
459        event: fnet_interfaces_ext::EventWithInterest<fnet_interfaces_ext::DefaultInterest>,
460    ) -> Result<Option<u64>, anyhow::Error> {
461        match self
462            .interface_properties
463            .update(event)
464            .context("failed to update interface properties map with watcher event")?
465        {
466            fnet_interfaces_ext::UpdateResult::Added { properties, state: _ }
467            | fnet_interfaces_ext::UpdateResult::Existing { properties, state: _ } => {
468                let id = properties.id;
469                debug!("setting timer for interface {}", id);
470
471                let () = Self::begin_network_check(
472                    &mut self.monitor,
473                    &mut self.watchdog,
474                    &mut self.handler,
475                    properties,
476                    &self.routes,
477                    &self.neighbor_cache,
478                )
479                .await;
480
481                return Ok(Some(id.get()));
482            }
483            fnet_interfaces_ext::UpdateResult::Changed {
484                previous:
485                    fnet_interfaces::Properties {
486                        online,
487                        addresses,
488                        has_default_ipv4_route,
489                        has_default_ipv6_route,
490                        ..
491                    },
492                current:
493                    properties @ fnet_interfaces_ext::Properties {
494                        addresses: current_addresses,
495                        id: _,
496                        name: _,
497                        port_class: _,
498                        online: _,
499                        has_default_ipv4_route: _,
500                        has_default_ipv6_route: _,
501                        port_identity_koid: _,
502                    },
503                state: _,
504            } => {
505                // NOTE: Filter changed events to only changes that might affect
506                // reachability. This acts as a guard against too many
507                // reachability checks in case of fuchsia.net.interfaces adding
508                // more fields.
509                if online.is_some()
510                    || has_default_ipv4_route.is_some()
511                    || has_default_ipv6_route.is_some()
512                    || addresses.is_some_and(|addresses| {
513                        let previous = addresses
514                            .iter()
515                            .filter_map(|fnet_interfaces::Address { addr, .. }| addr.as_ref());
516                        let current = current_addresses.iter().map(
517                            |fnet_interfaces_ext::Address {
518                                 addr,
519                                 valid_until: _,
520                                 preferred_lifetime_info: _,
521                                 assignment_state,
522                             }| {
523                                assert_eq!(
524                                    *assignment_state,
525                                    fnet_interfaces::AddressAssignmentState::Assigned
526                                );
527                                addr
528                            },
529                        );
530                        previous.ne(current)
531                    })
532                {
533                    let () = Self::begin_network_check(
534                        &mut self.monitor,
535                        &mut self.watchdog,
536                        &mut self.handler,
537                        properties,
538                        &self.routes,
539                        &self.neighbor_cache,
540                    )
541                    .await;
542                }
543            }
544            fnet_interfaces_ext::UpdateResult::Removed(
545                fnet_interfaces_ext::PropertiesAndState { properties, state: () },
546            ) => {
547                self.watchdog.handle_interface_removed(properties.id.get());
548                self.monitor.handle_interface_removed(properties);
549            }
550            fnet_interfaces_ext::UpdateResult::NoChange => {}
551        }
552        Ok(None)
553    }
554
555    /// Determine whether an interface should be monitored or not.
556    fn should_monitor_interface(port_class: fnet_interfaces::PortClass) -> bool {
557        match port_class {
558            fnet_interfaces::PortClass::Loopback(_)
559            | fnet_interfaces::PortClass::Blackhole(_)
560            | fnet_interfaces::PortClass::Device(fhardware_network::PortClass::Lowpan) => false,
561            fnet_interfaces::PortClass::Device(fhardware_network::PortClass::Virtual)
562            | fnet_interfaces::PortClass::Device(fhardware_network::PortClass::Ethernet)
563            | fnet_interfaces::PortClass::Device(fhardware_network::PortClass::WlanClient)
564            | fnet_interfaces::PortClass::Device(fhardware_network::PortClass::WlanAp)
565            | fnet_interfaces::PortClass::Device(fhardware_network::PortClass::Ppp)
566            | fnet_interfaces::PortClass::Device(fhardware_network::PortClass::Bridge) => true,
567            fnet_interfaces::PortClass::Device(
568                fhardware_network::PortClass::__SourceBreaking { unknown_ordinal },
569            ) => {
570                log::warn!("unknown fhardware_network::PortClass ordinal {unknown_ordinal:?}");
571                false
572            }
573            fnet_interfaces::PortClass::__SourceBreaking { unknown_ordinal } => {
574                log::warn!("unknown fnet_interfaces::PortClass ordinal {unknown_ordinal:?}");
575                false
576            }
577        }
578    }
579
580    /// Handles events observed by the route watchers by adding/removing routes
581    /// from the underlying `RouteTable`.
582    ///
583    /// # Panics
584    ///
585    /// Panics if the given event is unexpected (e.g. not an add or remove).
586    pub fn handle_route_watcher_event<I: net_types::ip::Ip>(
587        &mut self,
588        event: fnet_routes_ext::Event<I>,
589    ) {
590        match event {
591            fnet_routes_ext::Event::Added(route) => {
592                if !self.routes.add_route(route) {
593                    error!("Received add event for already existing route: {:?}", route)
594                }
595            }
596            fnet_routes_ext::Event::Removed(route) => {
597                if !self.routes.remove_route(&route) {
598                    error!("Received removed event for non-existing route: {:?}", route)
599                }
600            }
601            // Note that we don't expect to observe any existing events, because
602            // the route watchers were drained of existing events prior to
603            // starting the event loop.
604            fnet_routes_ext::Event::Existing(_)
605            | fnet_routes_ext::Event::Idle
606            | fnet_routes_ext::Event::Unknown => {
607                panic!("route watcher observed unexpected event: {:?}", event)
608            }
609        }
610    }
611
612    async fn begin_network_check(
613        monitor: &mut Monitor,
614        watchdog: &mut Watchdog,
615        handler: &mut ReachabilityHandler,
616        properties: &fnet_interfaces_ext::Properties<fnet_interfaces_ext::DefaultInterest>,
617        routes: &RouteTable,
618        neighbor_cache: &NeighborCache,
619    ) {
620        let view = InterfaceView {
621            properties,
622            routes,
623            neighbors: neighbor_cache.get_interface_neighbors(properties.id.get()),
624        };
625
626        // TODO(https://fxbug.dev/42074495): Move watchdog into its own future in the eventloop to prevent
627        // network check reliance on the watchdog completing.
628        let () = watchdog
629            .check_interface_state(zx::MonotonicInstant::get(), &SystemDispatcher {}, view)
630            .await;
631
632        let (system_internet, system_gateway, system_dns, system_http) = {
633            match monitor.begin(view) {
634                Ok(NetworkCheckerOutcome::MustResume) => return,
635                Ok(NetworkCheckerOutcome::Complete) => {
636                    let monitor_state = monitor.state();
637                    (
638                        monitor_state.system_has_internet(),
639                        monitor_state.system_has_gateway(),
640                        monitor_state.system_has_dns(),
641                        monitor_state.system_has_http(),
642                    )
643                }
644                Err(e) => {
645                    info!("begin network check error: {:?}", e);
646                    return;
647                }
648            }
649        };
650
651        handler
652            .replace_state(ReachabilityState {
653                internet_available: system_internet,
654                gateway_reachable: system_gateway,
655                dns_active: system_dns,
656                http_active: system_http,
657            })
658            .await;
659    }
660
661    // TODO(https://fxbug.dev/42076412): handle_netcheck_response and handle_network_check_message are missing
662    // tests because they reply on NetworkCheckCookie, which cannot be created in the event loop.
663    async fn handle_netcheck_response(
664        &mut self,
665        (cookie, result): (NetworkCheckCookie, NetworkCheckResult),
666    ) {
667        match self.monitor.resume(cookie, result) {
668            Ok(NetworkCheckerOutcome::MustResume) => {}
669            Ok(NetworkCheckerOutcome::Complete) => {
670                let (system_internet, system_gateway, system_dns, system_http) = {
671                    let monitor_state = self.monitor.state();
672                    (
673                        monitor_state.system_has_internet(),
674                        monitor_state.system_has_gateway(),
675                        monitor_state.system_has_dns(),
676                        monitor_state.system_has_http(),
677                    )
678                };
679
680                self.handler
681                    .replace_state(ReachabilityState {
682                        internet_available: system_internet,
683                        gateway_reachable: system_gateway,
684                        dns_active: system_dns,
685                        http_active: system_http,
686                    })
687                    .await;
688            }
689            Err(e) => error!("resume network check error: {:?}", e),
690        }
691    }
692}
693
694/// If we encounter an unrecoverable state, log an error and exit.
695//
696// TODO(https://fxbug.dev/42070352): add a test that works as intended.
697fn exit_with_anyhow_error(cause: anyhow::Error) -> ! {
698    error!(cause:%; "exiting due to error");
699    std::process::exit(1);
700}
701
702#[cfg(test)]
703mod tests {
704    use super::*;
705    #[allow(unused)]
706    use assert_matches::assert_matches as _;
707    use fidl_fuchsia_net::{IpAddress, Ipv4Address, Ipv6Address, Subnet};
708    use fidl_fuchsia_net_interfaces::{Address, AddressAssignmentState, Event, Properties};
709    use net_declare::{std_ip_v4, std_ip_v6};
710
711    fn create_eventloop() -> EventLoop {
712        let handler = ReachabilityHandler::new();
713        let inspector = fuchsia_inspect::component::inspector();
714        let (sender, receiver) =
715            futures::channel::mpsc::unbounded::<(NetworkCheckAction, NetworkCheckCookie)>();
716        let mut monitor = Monitor::new(sender).expect("failed to create reachability monitor");
717        let () = monitor.set_inspector(inspector);
718
719        return EventLoop::new(monitor, handler, receiver, inspector);
720    }
721
722    #[fuchsia::test]
723    async fn test_handle_interface_watcher_result_ipv4() {
724        let mut event_loop = create_eventloop();
725
726        let v4_subnet = Subnet {
727            addr: IpAddress::Ipv4(Ipv4Address { addr: std_ip_v4!("192.0.2.1").octets() }),
728            prefix_len: 16,
729        };
730
731        let addr = Address {
732            addr: Some(v4_subnet),
733            assignment_state: Some(AddressAssignmentState::Assigned),
734            ..Default::default()
735        };
736
737        let mut props = Properties {
738            id: Some(12345),
739            addresses: Some(vec![addr]),
740            online: Some(true),
741            port_class: Some(fnet_interfaces::PortClass::Device(
742                fidl_fuchsia_hardware_network::PortClass::Ethernet,
743            )),
744            has_default_ipv4_route: Some(true),
745            has_default_ipv6_route: Some(true),
746            name: Some("IPv4 Reachability Test Interface".to_string()),
747            ..Default::default()
748        };
749
750        let event_res = Ok(Some(Event::Existing(props.clone()).into()));
751        assert_eq!(event_loop.handle_interface_watcher_result(event_res).await, Some(12345));
752
753        props.id = Some(54321);
754        let event_res = Ok(Some(Event::Added(props.clone()).into()));
755        assert_eq!(event_loop.handle_interface_watcher_result(event_res).await, Some(54321));
756    }
757
758    #[fuchsia::test]
759    async fn test_handle_interface_watcher_result_ipv6() {
760        let mut event_loop = create_eventloop();
761
762        let v6_subnet = Subnet {
763            addr: IpAddress::Ipv6(Ipv6Address { addr: std_ip_v6!("2001:db8::1").octets() }),
764            prefix_len: 16,
765        };
766
767        let addr = Address {
768            addr: Some(v6_subnet),
769            assignment_state: Some(AddressAssignmentState::Assigned),
770            ..Default::default()
771        };
772
773        let mut props = Properties {
774            id: Some(12345),
775            addresses: Some(vec![addr]),
776            online: Some(true),
777            port_class: Some(fnet_interfaces::PortClass::Device(
778                fidl_fuchsia_hardware_network::PortClass::Ethernet,
779            )),
780            has_default_ipv4_route: Some(true),
781            has_default_ipv6_route: Some(true),
782            name: Some("IPv6 Reachability Test Interface".to_string()),
783            ..Default::default()
784        };
785
786        let event_res = Ok(Some(Event::Existing(props.clone()).into()));
787        assert_eq!(event_loop.handle_interface_watcher_result(event_res).await, Some(12345));
788
789        props.id = Some(54321);
790        let event_res = Ok(Some(Event::Added(props.clone()).into()));
791        assert_eq!(event_loop.handle_interface_watcher_result(event_res).await, Some(54321));
792    }
793}