stack_migration/
main.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5mod rollback;
6
7use std::pin::{pin, Pin};
8
9use cobalt_client::traits::AsEventCode as _;
10use fuchsia_async::Task;
11use fuchsia_component::server::{ServiceFs, ServiceFsDir};
12use fuchsia_inspect::Property as _;
13use futures::channel::mpsc;
14use futures::{Stream, StreamExt as _};
15use log::{error, info, warn};
16use networking_metrics_registry::networking_metrics_registry as metrics_registry;
17use {
18    fidl_fuchsia_metrics as fmetrics, fidl_fuchsia_net_stackmigrationdeprecated as fnet_migration,
19    fidl_fuchsia_power_internal as fpower,
20};
21
22const DEFAULT_NETSTACK: NetstackVersion = NetstackVersion::Netstack2;
23
24#[derive(Debug, Clone, Copy, serde::Deserialize, serde::Serialize, Eq, PartialEq)]
25enum NetstackVersion {
26    Netstack2,
27    Netstack3,
28}
29
30impl NetstackVersion {
31    fn inspect_uint_value(&self) -> u64 {
32        match self {
33            Self::Netstack2 => 2,
34            Self::Netstack3 => 3,
35        }
36    }
37
38    fn optional_inspect_uint_value(o: &Option<Self>) -> u64 {
39        o.as_ref().map(Self::inspect_uint_value).unwrap_or(0)
40    }
41}
42
43impl From<fnet_migration::NetstackVersion> for NetstackVersion {
44    fn from(value: fnet_migration::NetstackVersion) -> Self {
45        match value {
46            fnet_migration::NetstackVersion::Netstack2 => NetstackVersion::Netstack2,
47            fnet_migration::NetstackVersion::Netstack3 => NetstackVersion::Netstack3,
48        }
49    }
50}
51
52impl From<NetstackVersion> for fnet_migration::NetstackVersion {
53    fn from(value: NetstackVersion) -> Self {
54        match value {
55            NetstackVersion::Netstack2 => fnet_migration::NetstackVersion::Netstack2,
56            NetstackVersion::Netstack3 => fnet_migration::NetstackVersion::Netstack3,
57        }
58    }
59}
60
61impl From<NetstackVersion> for Box<fnet_migration::VersionSetting> {
62    fn from(value: NetstackVersion) -> Self {
63        Box::new(fnet_migration::VersionSetting { version: value.into() })
64    }
65}
66
67#[derive(Debug, PartialEq)]
68enum RollbackNetstackVersion {
69    Netstack2,
70    // The automated setting requested Netstack3, but the persisted state
71    // indicates that the previous boot had too many health check failure.
72    // Forcibly use Netstack2.
73    ForceNetstack2,
74    Netstack3,
75}
76
77impl RollbackNetstackVersion {
78    // Convert into a `NetstackVersion`, while honoring the forced setting.
79    fn version(&self) -> NetstackVersion {
80        match self {
81            Self::Netstack2 | Self::ForceNetstack2 => NetstackVersion::Netstack2,
82            Self::Netstack3 => NetstackVersion::Netstack3,
83        }
84    }
85
86    // Convert into a `NetstackVersion`, while ignoring the forced setting.
87    fn version_ignoring_force(&self) -> NetstackVersion {
88        match self {
89            Self::Netstack2 => NetstackVersion::Netstack2,
90            Self::Netstack3 | Self::ForceNetstack2 => NetstackVersion::Netstack3,
91        }
92    }
93}
94
95impl From<NetstackVersion> for RollbackNetstackVersion {
96    fn from(version: NetstackVersion) -> Self {
97        match version {
98            NetstackVersion::Netstack2 => RollbackNetstackVersion::Netstack2,
99            NetstackVersion::Netstack3 => RollbackNetstackVersion::Netstack3,
100        }
101    }
102}
103
104#[derive(Default, Debug, serde::Deserialize, serde::Serialize)]
105#[cfg_attr(test, derive(Eq, PartialEq))]
106struct Persisted {
107    automated: Option<NetstackVersion>,
108    user: Option<NetstackVersion>,
109    rollback: Option<rollback::Persisted>,
110}
111
112impl Persisted {
113    fn load<R: std::io::Read>(r: R) -> Self {
114        serde_json::from_reader(std::io::BufReader::new(r)).unwrap_or_else(|e| {
115            error!("error loading persisted config {e:?}, using defaults");
116            Persisted::default()
117        })
118    }
119
120    fn save<W: std::io::Write>(&self, w: W) {
121        serde_json::to_writer(w, self).unwrap_or_else(|e: serde_json::Error| {
122            error!("error persisting configuration {self:?}: {e:?}")
123        })
124    }
125
126    // Determine the desired NetstackVersion based on the persisted values
127    fn desired_netstack_version(&self) -> RollbackNetstackVersion {
128        match self {
129            Persisted { user: Some(user), automated: _, rollback: _ } => (*user).into(),
130            Persisted {
131                user: None,
132                rollback: Some(rollback::Persisted::HealthcheckFailures(failures)),
133                automated: Some(NetstackVersion::Netstack3),
134            } if *failures >= rollback::MAX_FAILED_HEALTHCHECKS => {
135                RollbackNetstackVersion::ForceNetstack2
136            }
137            Persisted { user: None, automated: Some(automated), rollback: _ } => {
138                (*automated).into()
139            }
140            // Use the default version if nothing is set.
141            Persisted { user: None, automated: None, rollback: _ } => DEFAULT_NETSTACK.into(),
142        }
143    }
144}
145
146enum ServiceRequest {
147    Control(fnet_migration::ControlRequest),
148    State(fnet_migration::StateRequest),
149}
150
151struct Migration<P, CR> {
152    current_boot: RollbackNetstackVersion,
153    persisted: Persisted,
154    persistence: P,
155    collaborative_reboot: CollaborativeReboot<CR>,
156}
157
/// Abstraction over the storage that holds the persisted settings.
trait PersistenceProvider {
    /// Sink that serialized settings are written into.
    type Writer: std::io::Write;
    /// Source that serialized settings are read from.
    type Reader: std::io::Read;

    /// Opens the storage for writing.
    fn open_writer(&mut self) -> std::io::Result<Self::Writer>;
    /// Opens the storage for reading.
    fn open_reader(&self) -> std::io::Result<Self::Reader>;
}
165
166struct DataPersistenceProvider {}
167
168const PERSISTED_FILE_PATH: &'static str = "/data/config.json";
169
170impl PersistenceProvider for DataPersistenceProvider {
171    type Writer = std::fs::File;
172    type Reader = std::fs::File;
173
174    fn open_writer(&mut self) -> std::io::Result<Self::Writer> {
175        std::fs::File::create(PERSISTED_FILE_PATH)
176    }
177
178    fn open_reader(&self) -> std::io::Result<Self::Reader> {
179        std::fs::File::open(PERSISTED_FILE_PATH)
180    }
181}
182
183struct CollaborativeReboot<CR> {
184    scheduler: CR,
185    /// `Some(<cancellation_token>)` if there's an outstanding collaborative
186    /// reboot scheduled.
187    scheduled_req: Option<zx::EventPair>,
188}
189
190impl<CR: CollaborativeRebootScheduler> CollaborativeReboot<CR> {
191    /// Schedules a collaborative reboot.
192    ///
193    /// No-Op if there's already a reboot scheduled.
194    async fn schedule(&mut self) {
195        let Self { scheduler, scheduled_req } = self;
196        if scheduled_req.is_some() {
197            // We already have an outstanding request.
198            return;
199        }
200
201        info!("Scheduling collaborative reboot");
202        let (mine, theirs) = zx::EventPair::create();
203        *scheduled_req = Some(mine);
204        scheduler
205            .schedule(fpower::CollaborativeRebootReason::NetstackMigration, Some(theirs))
206            .await;
207    }
208
209    /// Cancels the currently scheduled collaborative Reboot.
210    ///
211    /// No-Op if there's none scheduled.
212    fn cancel(&mut self) {
213        if let Some(cancel) = self.scheduled_req.take() {
214            info!("Canceling collaborative reboot request. It's no longer necessary.");
215            // Dropping the eventpair cancels the request.
216            std::mem::drop(cancel);
217        }
218    }
219}
220
221/// An abstraction over the `fpower::CollaborativeRebootScheduler` FIDL API.
222trait CollaborativeRebootScheduler {
223    async fn schedule(
224        &mut self,
225        reason: fpower::CollaborativeRebootReason,
226        cancel: Option<zx::EventPair>,
227    );
228}
229
230/// An implementation of `CollaborativeRebootScheduler` that connects to the
231/// API over FIDL.
232struct Scheduler {}
233
234impl CollaborativeRebootScheduler for Scheduler {
235    async fn schedule(
236        &mut self,
237        reason: fpower::CollaborativeRebootReason,
238        cancel: Option<zx::EventPair>,
239    ) {
240        let proxy = match fuchsia_component::client::connect_to_protocol::<
241            fpower::CollaborativeRebootSchedulerMarker,
242        >() {
243            Ok(proxy) => proxy,
244            Err(e) => {
245                error!("Failed to connect to collaborative reboot scheduler: {e:?}");
246                return;
247            }
248        };
249        match proxy.schedule_reboot(reason, cancel).await {
250            Ok(()) => {}
251            Err(e) => error!("Failed to schedule collaborative reboot: {e:?}"),
252        }
253    }
254}
255
256impl<P: PersistenceProvider, CR: CollaborativeRebootScheduler> Migration<P, CR> {
257    fn new(persistence: P, cr_scheduler: CR) -> Self {
258        let persisted = persistence.open_reader().map(Persisted::load).unwrap_or_else(|e| {
259            warn!("could not open persistence reader: {e:?}. using defaults");
260            Persisted::default()
261        });
262        let current_boot = persisted.desired_netstack_version();
263
264        if current_boot == RollbackNetstackVersion::ForceNetstack2 {
265            warn!(
266                "Previous boot failed to migrate to Netstack3. \
267                Ignoring automated setting and forcibly using Netstack2."
268            );
269        }
270
271        Self {
272            current_boot,
273            persisted,
274            persistence,
275            collaborative_reboot: CollaborativeReboot {
276                scheduler: cr_scheduler,
277                scheduled_req: None,
278            },
279        }
280    }
281
282    fn persist(&mut self) {
283        let Self { current_boot: _, persisted, persistence, collaborative_reboot: _ } = self;
284        let w = match persistence.open_writer() {
285            Ok(w) => w,
286            Err(e) => {
287                error!("failed to open writer to persist settings: {e:?}");
288                return;
289            }
290        };
291        persisted.save(w);
292    }
293
294    fn map_version_setting(
295        version: Option<Box<fnet_migration::VersionSetting>>,
296    ) -> Option<NetstackVersion> {
297        version.map(|v| {
298            let fnet_migration::VersionSetting { version } = &*v;
299            (*version).into()
300        })
301    }
302
303    async fn update_collaborative_reboot(&mut self) {
304        let Self { current_boot, persisted, persistence: _, collaborative_reboot } = self;
305        if persisted.desired_netstack_version().version() != current_boot.version() {
306            // When the current boot differs from our desired version, schedule
307            // a reboot (if there's not already one).
308            collaborative_reboot.schedule().await
309        } else {
310            // When the current_boot matches our desired version, we no longer
311            // need reboot. Cancel the outstanding request (if any)
312            collaborative_reboot.cancel()
313        }
314    }
315
316    async fn update_rollback_state(&mut self, new_state: rollback::Persisted) {
317        if self.persisted.rollback != Some(new_state) {
318            self.persisted.rollback = Some(new_state);
319            self.update_collaborative_reboot().await;
320            self.persist();
321        }
322    }
323
324    async fn handle_control_request(
325        &mut self,
326        req: fnet_migration::ControlRequest,
327    ) -> Result<(), fidl::Error> {
328        match req {
329            fnet_migration::ControlRequest::SetAutomatedNetstackVersion { version, responder } => {
330                let version = Self::map_version_setting(version);
331                let Self {
332                    current_boot: _,
333                    persisted: Persisted { automated, user: _, rollback: _ },
334                    persistence: _,
335                    collaborative_reboot: _,
336                } = self;
337                if version != *automated {
338                    info!("automated netstack version switched to {version:?}");
339                    *automated = version;
340                    self.persist();
341                    self.update_collaborative_reboot().await;
342                }
343                responder.send()
344            }
345            fnet_migration::ControlRequest::SetUserNetstackVersion { version, responder } => {
346                let version = Self::map_version_setting(version);
347                let Self {
348                    current_boot: _,
349                    persisted: Persisted { automated: _, user, rollback: _ },
350                    persistence: _,
351                    collaborative_reboot: _,
352                } = self;
353                if version != *user {
354                    info!("user netstack version switched to {version:?}");
355                    *user = version;
356                    self.persist();
357                    self.update_collaborative_reboot().await;
358                }
359                responder.send()
360            }
361        }
362    }
363
364    fn handle_state_request(&self, req: fnet_migration::StateRequest) -> Result<(), fidl::Error> {
365        let Migration {
366            current_boot,
367            persisted: Persisted { user, automated, rollback: _ },
368            persistence: _,
369            collaborative_reboot: _,
370        } = self;
371        match req {
372            fnet_migration::StateRequest::GetNetstackVersion { responder } => {
373                responder.send(&fnet_migration::InEffectVersion {
374                    current_boot: current_boot.version().into(),
375                    user: (*user).map(Into::into),
376                    automated: (*automated).map(Into::into),
377                })
378            }
379        }
380    }
381
382    async fn handle_request(&mut self, req: ServiceRequest) -> Result<(), fidl::Error> {
383        match req {
384            ServiceRequest::Control(r) => self.handle_control_request(r).await,
385            ServiceRequest::State(r) => self.handle_state_request(r),
386        }
387    }
388}
389
390struct InspectNodes {
391    automated_setting: fuchsia_inspect::UintProperty,
392    user_setting: fuchsia_inspect::UintProperty,
393    rollback_state: fuchsia_inspect::StringProperty,
394}
395
396impl InspectNodes {
397    fn new<P, CR>(inspector: &fuchsia_inspect::Inspector, m: &Migration<P, CR>) -> Self {
398        let root = inspector.root();
399        let Migration { current_boot, persisted: Persisted { automated, user, rollback }, .. } = m;
400        let automated_setting = root.create_uint(
401            "automated_setting",
402            NetstackVersion::optional_inspect_uint_value(automated),
403        );
404        let user_setting =
405            root.create_uint("user_setting", NetstackVersion::optional_inspect_uint_value(user));
406
407        let rollback_state = root.create_string("rollback_state", format!("{rollback:?}"));
408
409        // The current boot version is immutable, record it once instead of
410        // keeping track of a property node.
411        root.record_uint("current_boot", current_boot.version().inspect_uint_value());
412        root.record_bool(
413            "forced_netstack2",
414            *current_boot == RollbackNetstackVersion::ForceNetstack2,
415        );
416
417        Self { automated_setting, user_setting, rollback_state }
418    }
419
420    fn update<P, CR>(&self, m: &Migration<P, CR>) {
421        let Migration { persisted: Persisted { automated, user, rollback }, .. } = m;
422        let Self { automated_setting, user_setting, rollback_state } = self;
423        automated_setting.set(NetstackVersion::optional_inspect_uint_value(automated));
424        user_setting.set(NetstackVersion::optional_inspect_uint_value(user));
425        rollback_state.set(&format!("{rollback:?}"));
426    }
427}
428
429/// Wraps communication with metrics (cobalt) server.
430struct MetricsLogger {
431    logger: Option<fmetrics::MetricEventLoggerProxy>,
432}
433
434impl MetricsLogger {
435    async fn new() -> Self {
436        let (logger, server_end) =
437            fidl::endpoints::create_proxy::<fmetrics::MetricEventLoggerMarker>();
438
439        let factory = match fuchsia_component::client::connect_to_protocol::<
440            fmetrics::MetricEventLoggerFactoryMarker,
441        >() {
442            Ok(f) => f,
443            Err(e) => {
444                warn!("can't connect to logger factory {e:?}");
445                return Self { logger: None };
446            }
447        };
448
449        match factory
450            .create_metric_event_logger(
451                &fmetrics::ProjectSpec {
452                    customer_id: Some(metrics_registry::CUSTOMER_ID),
453                    project_id: Some(metrics_registry::PROJECT_ID),
454                    ..Default::default()
455                },
456                server_end,
457            )
458            .await
459        {
460            Ok(Ok(())) => Self { logger: Some(logger) },
461            Ok(Err(e)) => {
462                warn!("can't create event logger {e:?}");
463                Self { logger: None }
464            }
465            Err(e) => {
466                warn!("error connecting to metric event logger {e:?}");
467                Self { logger: None }
468            }
469        }
470    }
471
472    /// Logs metrics from `migration` to the metrics server.
473    async fn log_metrics<P, CR>(&self, migration: &Migration<P, CR>) {
474        let logger = if let Some(logger) = self.logger.as_ref() {
475            logger
476        } else {
477            // Silently don't log metrics if we didn't manage to create a
478            // logger, warnings are emitted upon creation.
479            return;
480        };
481
482        let current_boot = match migration.current_boot {
483            RollbackNetstackVersion::Netstack2 | RollbackNetstackVersion::ForceNetstack2 => {
484                metrics_registry::StackMigrationCurrentBootMetricDimensionNetstackVersion::Netstack2
485            }
486            RollbackNetstackVersion::Netstack3 => {
487                metrics_registry::StackMigrationCurrentBootMetricDimensionNetstackVersion::Netstack3
488            }
489        }
490        .as_event_code();
491        let user = match migration.persisted.user {
492            None => metrics_registry::StackMigrationUserSettingMetricDimensionNetstackVersion::NoSelection,
493            Some(NetstackVersion::Netstack2) => metrics_registry::StackMigrationUserSettingMetricDimensionNetstackVersion::Netstack2,
494            Some(NetstackVersion::Netstack3) => metrics_registry::StackMigrationUserSettingMetricDimensionNetstackVersion::Netstack3,
495        }
496        .as_event_code();
497        let automated = match migration.persisted.automated {
498            None => metrics_registry::StackMigrationAutomatedSettingMetricDimensionNetstackVersion::NoSelection,
499            Some(NetstackVersion::Netstack2) => metrics_registry::StackMigrationAutomatedSettingMetricDimensionNetstackVersion::Netstack2,
500            Some(NetstackVersion::Netstack3) => metrics_registry::StackMigrationAutomatedSettingMetricDimensionNetstackVersion::Netstack3,
501        }.as_event_code();
502        let rollback_state = compute_state_metric(migration).as_event_code();
503        for (metric_id, event_code) in [
504            (metrics_registry::STACK_MIGRATION_CURRENT_BOOT_METRIC_ID, current_boot),
505            (metrics_registry::STACK_MIGRATION_USER_SETTING_METRIC_ID, user),
506            (metrics_registry::STACK_MIGRATION_AUTOMATED_SETTING_METRIC_ID, automated),
507            (metrics_registry::STACK_MIGRATION_STATE_METRIC_ID, rollback_state),
508        ] {
509            let occurrence_count = 1;
510            logger
511                .log_occurrence(metric_id, occurrence_count, &[event_code][..])
512                .await
513                .map(|r| {
514                    r.unwrap_or_else(|e| warn!("error reported logging metric {metric_id} {e:?}"))
515                })
516                .unwrap_or_else(|fidl_error| {
517                    warn!("error logging metric {metric_id} {fidl_error:?}")
518                });
519        }
520    }
521}
522
523fn compute_state_metric<P, CR>(
524    migration: &Migration<P, CR>,
525) -> metrics_registry::StackMigrationStateMetricDimensionMigrationState {
526    use metrics_registry::StackMigrationStateMetricDimensionMigrationState as state_metric;
527    let Migration {
528        current_boot,
529        persisted: Persisted { automated, user: _, rollback },
530        persistence: _,
531        collaborative_reboot: _,
532    } = migration;
533
534    match (current_boot, automated, rollback) {
535        (RollbackNetstackVersion::Netstack2, Some(NetstackVersion::Netstack2) | None, _) => {
536            state_metric::NotStarted
537        }
538        (RollbackNetstackVersion::Netstack2, Some(NetstackVersion::Netstack3), _) => {
539            state_metric::Scheduled
540        }
541        (RollbackNetstackVersion::ForceNetstack2, _, _) => state_metric::RolledBack,
542        (RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack2) | None, _) => {
543            state_metric::Canceled
544        }
545        (RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack3), None) => {
546            state_metric::InProgress
547        }
548        (
549            RollbackNetstackVersion::Netstack3,
550            Some(NetstackVersion::Netstack3),
551            Some(rollback::Persisted::HealthcheckFailures(f)),
552        ) => {
553            if *f >= rollback::MAX_FAILED_HEALTHCHECKS {
554                state_metric::Failed
555            } else {
556                state_metric::InProgress
557            }
558        }
559        (
560            RollbackNetstackVersion::Netstack3,
561            Some(NetstackVersion::Netstack3),
562            Some(rollback::Persisted::Success),
563        ) => state_metric::Success,
564    }
565}
566
567#[fuchsia::main]
568pub async fn main() {
569    info!("running netstack migration service");
570
571    let mut fs = ServiceFs::new();
572    let _: &mut ServiceFsDir<'_, _> = fs
573        .dir("svc")
574        .add_fidl_service(|rs: fnet_migration::ControlRequestStream| {
575            rs.map(|req| req.map(ServiceRequest::Control)).left_stream()
576        })
577        .add_fidl_service(|rs: fnet_migration::StateRequestStream| {
578            rs.map(|req| req.map(ServiceRequest::State)).right_stream()
579        });
580    let _: &mut ServiceFs<_> =
581        fs.take_and_serve_directory_handle().expect("failed to take out directory handle");
582
583    let mut migration = Migration::new(DataPersistenceProvider {}, Scheduler {});
584    main_inner(
585        &mut migration,
586        fs.fuse().flatten_unordered(None),
587        rollback::FidlHttpFetcher::new(),
588        rollback::new_healthcheck_stream(),
589    )
590    .await
591}
592
593async fn main_inner<
594    P: PersistenceProvider,
595    CR: CollaborativeRebootScheduler,
596    H: rollback::HttpFetcher + Send + 'static,
597    T: Stream<Item = ()> + Send + 'static,
598    SR: Stream<Item = Result<ServiceRequest, fidl::Error>>,
599>(
600    migration: &mut Migration<P, CR>,
601    service_request_stream: SR,
602    http_fetcher: H,
603    healthcheck_tick: T,
604) {
605    let inspector = fuchsia_inspect::component::inspector();
606    let _inspect_server =
607        inspect_runtime::publish(inspector, inspect_runtime::PublishOptions::default())
608            .expect("failed to serve inspector");
609    let inspect_nodes = InspectNodes::new(inspector, &migration);
610
611    let metrics_logger = MetricsLogger::new().await;
612
613    let (desired_version_sender, desired_version_receiver) = mpsc::unbounded();
614    let (rollback_state_sender, rollback_state_receiver) = mpsc::unbounded();
615    let rollback_state =
616        rollback::State::new(migration.persisted.rollback, migration.current_boot.version());
617    // Update rollback persistence immediately in case the device reboots before
618    // the rollback module has time to send an asynchronous update. This is
619    // required for correctness if Netstack3 is crashing on startup, or in the
620    // following case:
621    //
622    // 1. Device fails to migrate to Netstack3 and persists
623    //    HealthcheckFailures(MAX_FAILED_HEALTHCHECKS), which will force
624    //    Netstack2 on subsequent boots.
625    // 2. Device reboots into Netstack2, sees that it should be running
626    //    Netstack3, and schedules a reboot without clearing the failures.
627    // 3. Device reboots back into Netstack3 and sees that it should schedule
628    //    a reboot because the persisted failures are above the limit.
629    migration.update_rollback_state(rollback_state.persisted()).await;
630
631    Task::spawn(async move {
632        rollback::run(
633            rollback_state,
634            http_fetcher,
635            desired_version_receiver,
636            rollback_state_sender,
637            pin!(healthcheck_tick),
638        )
639        .await
640    })
641    .detach();
642
643    enum Action {
644        ServiceRequest(Result<ServiceRequest, fidl::Error>),
645        LogMetrics,
646        UpdateRollbackState(rollback::Persisted),
647    }
648
649    let metrics_logging_interval = fuchsia_async::MonotonicDuration::from_hours(1);
650    let mut stream: futures::stream::SelectAll<Pin<Box<dyn Stream<Item = Action>>>> =
651        futures::stream::SelectAll::new();
652
653    // Always log metrics once on startup then periodically log new values so
654    // the aggregation window always contains one sample of the current
655    // settings.
656    stream.push(Box::pin(Box::new(
657        futures::stream::once(futures::future::ready(()))
658            .chain(fuchsia_async::Interval::new(metrics_logging_interval))
659            .map(|()| Action::LogMetrics),
660    )));
661    stream.push(Box::pin(Box::new(Box::pin(service_request_stream.map(Action::ServiceRequest)))));
662    stream.push(Box::pin(rollback_state_receiver.map(|state| Action::UpdateRollbackState(state))));
663
664    while let Some(action) = stream.next().await {
665        match action {
666            Action::ServiceRequest(req) => {
667                let result = match req {
668                    Ok(req) => migration.handle_request(req).await,
669                    Err(e) => Err(e),
670                };
671                // Always update inspector state after handling a request.
672                inspect_nodes.update(&migration);
673
674                // Send the desired netstack version to the rollback mechanism,
675                // but ignore the "forced" setting. The "forced" setting comes
676                // from the rollback mechanism, and sending that signal back
677                // into it would cause a the mechanism to incorrectly detect
678                // a cancelation.
679                match desired_version_sender.unbounded_send(
680                    migration.persisted.desired_netstack_version().version_ignoring_force(),
681                ) {
682                    Ok(()) => (),
683                    Err(e) => {
684                        error!("error sending update to rollback module: {:?}", e);
685                    }
686                }
687
688                match result {
689                    Ok(()) => (),
690                    Err(e) => {
691                        if !e.is_closed() {
692                            error!("error processing FIDL request {:?}", e)
693                        }
694                    }
695                }
696            }
697            Action::LogMetrics => {
698                metrics_logger.log_metrics(&migration).await;
699            }
700            Action::UpdateRollbackState(new_state) => {
701                migration.update_rollback_state(new_state).await;
702                // Always update inspector state when the rollback state
703                // changes.
704                inspect_nodes.update(&migration);
705            }
706        }
707    }
708}
709
710#[cfg(test)]
711mod tests {
712    use super::*;
713    use assert_matches::assert_matches;
714    use async_utils::event::{Event, EventWait};
715    use diagnostics_assertions::assert_data_tree;
716    use fidl::Peered as _;
717    use fidl_fuchsia_net_http as fnet_http;
718    use fuchsia_async::TimeoutExt;
719    use futures::FutureExt;
720    use std::cell::RefCell;
721    use std::rc::Rc;
722    use std::time::Duration;
723    use test_case::test_case;
724
725    #[derive(Default, Clone)]
726    struct InMemory {
727        file: Rc<RefCell<Option<Vec<u8>>>>,
728    }
729
730    impl InMemory {
731        fn with_persisted(p: Persisted) -> Self {
732            let mut s = Self::default();
733            p.save(s.open_writer().unwrap());
734            s
735        }
736    }
737
738    impl PersistenceProvider for InMemory {
739        type Writer = Self;
740        type Reader = std::io::Cursor<Vec<u8>>;
741
742        fn open_writer(&mut self) -> std::io::Result<Self::Writer> {
743            *self.file.borrow_mut() = Some(Vec::new());
744            Ok(self.clone())
745        }
746
747        fn open_reader(&self) -> std::io::Result<Self::Reader> {
748            self.file
749                .borrow()
750                .clone()
751                .map(std::io::Cursor::new)
752                .ok_or(std::io::ErrorKind::NotFound.into())
753        }
754    }
755
756    impl std::io::Write for InMemory {
757        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
758            let r = self.file.borrow_mut().as_mut().expect("no file open").write(buf);
759            r
760        }
761
762        fn flush(&mut self) -> std::io::Result<()> {
763            Ok(())
764        }
765    }
766
767    struct NoCollaborativeReboot;
768
769    impl CollaborativeRebootScheduler for NoCollaborativeReboot {
770        async fn schedule(
771            &mut self,
772            _reason: fpower::CollaborativeRebootReason,
773            _cancel: Option<zx::EventPair>,
774        ) {
775            panic!("unexpectedly attempted to schedule a collaborative reboot");
776        }
777    }
778
779    #[derive(Default)]
780    struct FakeCollaborativeReboot {
781        req: Option<zx::EventPair>,
782    }
783
784    impl CollaborativeRebootScheduler for FakeCollaborativeReboot {
785        async fn schedule(
786            &mut self,
787            reason: fpower::CollaborativeRebootReason,
788            cancel: Option<zx::EventPair>,
789        ) {
790            assert_eq!(reason, fpower::CollaborativeRebootReason::NetstackMigration);
791            let cancel = cancel.expect("cancellation signal must be provided");
792            assert_eq!(self.req.replace(cancel), None, "attempted to schedule multiple reboots");
793        }
794    }
795
796    fn serve_migration<P: PersistenceProvider, CR: CollaborativeRebootScheduler>(
797        migration: Migration<P, CR>,
798    ) -> (
799        impl futures::Future<Output = Migration<P, CR>>,
800        fnet_migration::ControlProxy,
801        fnet_migration::StateProxy,
802    ) {
803        let (control, control_server) =
804            fidl::endpoints::create_proxy_and_stream::<fnet_migration::ControlMarker>();
805        let (state, state_server) =
806            fidl::endpoints::create_proxy_and_stream::<fnet_migration::StateMarker>();
807
808        let fut = {
809            let control =
810                control_server.map(|req| ServiceRequest::Control(req.expect("control error")));
811            let state = state_server.map(|req| ServiceRequest::State(req.expect("state error")));
812            futures::stream::select(control, state).fold(migration, |mut migration, req| async {
813                migration.handle_request(req).await.expect("handling request");
814                migration
815            })
816        };
817        (fut, control, state)
818    }
819
820    #[test_case(Persisted{
821        user: Some(NetstackVersion::Netstack2),
822        automated: None,
823        rollback: None,
824    }; "user_netstack2")]
825    #[test_case(Persisted{
826        user: Some(NetstackVersion::Netstack3),
827        automated: None,
828        rollback: None,
829    }; "user_netstack3")]
830    #[test_case(Persisted{
831        user: None,
832        automated: None,
833        rollback: None,
834    }; "none")]
835    #[test_case(Persisted{
836        user: None,
837        automated: Some(NetstackVersion::Netstack2),
838        rollback: None,
839    }; "automated_netstack2")]
840    #[test_case(Persisted{
841        user: None,
842        automated: Some(NetstackVersion::Netstack3),
843        rollback: None,
844    }; "automated_netstack3")]
845    #[test_case(Persisted{
846        user: None,
847        automated: None,
848        rollback: Some(rollback::Persisted::Success),
849    }; "rollback_success")]
850    #[test_case(Persisted{
851        user: Some(NetstackVersion::Netstack2),
852        automated: Some(NetstackVersion::Netstack3),
853        rollback: Some(rollback::Persisted::HealthcheckFailures(5)),
854    }; "all")]
855    #[fuchsia::test(add_test_attr = false)]
856    fn persist_save_load(v: Persisted) {
857        let mut m = InMemory::default();
858        v.save(m.open_writer().unwrap());
859        assert_eq!(Persisted::load(m.open_reader().unwrap()), v);
860    }
861
862    #[fuchsia::test]
863    fn uses_defaults_if_no_persistence() {
864        let m = Migration::new(InMemory::default(), NoCollaborativeReboot);
865        let Migration {
866            current_boot,
867            persisted: Persisted { user, automated, rollback: _ },
868            persistence: _,
869            collaborative_reboot: _,
870        } = m;
871        assert_eq!(current_boot.version(), DEFAULT_NETSTACK);
872        assert_eq!(user, None);
873        assert_eq!(automated, None);
874    }
875
876    #[test_case(
877        None, Some(NetstackVersion::Netstack3), None, NetstackVersion::Netstack3;
878        "automated_ns3")]
879    #[test_case(
880        None, Some(NetstackVersion::Netstack2), None, NetstackVersion::Netstack2;
881        "automated_ns2")]
882    #[test_case(
883        Some(NetstackVersion::Netstack3),
884        Some(NetstackVersion::Netstack2),
885        Some(rollback::Persisted::HealthcheckFailures(rollback::MAX_FAILED_HEALTHCHECKS)),
886        NetstackVersion::Netstack3;
887        "user_ns3_override")]
888    #[test_case(
889        Some(NetstackVersion::Netstack2),
890        Some(NetstackVersion::Netstack3),
891        Some(rollback::Persisted::Success),
892        NetstackVersion::Netstack2;
893        "user_ns2_override")]
894    #[test_case(
895        Some(NetstackVersion::Netstack2),
896        None,
897        None,
898        NetstackVersion::Netstack2; "user_ns2")]
899    #[test_case(
900        Some(NetstackVersion::Netstack3),
901        None,
902        None,
903        NetstackVersion::Netstack3; "user_ns3")]
904    #[test_case(
905        None,
906        Some(NetstackVersion::Netstack3),
907        Some(rollback::Persisted::HealthcheckFailures(rollback::MAX_FAILED_HEALTHCHECKS)),
908        NetstackVersion::Netstack2; "rollback_to_ns2")]
909    #[test_case(None, None, None, DEFAULT_NETSTACK; "default")]
910    #[fuchsia::test]
911    async fn get_netstack_version(
912        p_user: Option<NetstackVersion>,
913        p_automated: Option<NetstackVersion>,
914        p_rollback: Option<rollback::Persisted>,
915        expect: NetstackVersion,
916    ) {
917        let m = Migration::new(
918            InMemory::with_persisted(Persisted {
919                user: p_user,
920                automated: p_automated,
921                rollback: p_rollback,
922            }),
923            NoCollaborativeReboot,
924        );
925        let Migration {
926            current_boot,
927            persisted: Persisted { user, automated, rollback: _ },
928            persistence: _,
929            collaborative_reboot: _,
930        } = &m;
931        assert_eq!(current_boot.version(), expect);
932        assert_eq!(*user, p_user);
933        assert_eq!(*automated, p_automated);
934
935        let (serve, _, state) = serve_migration(m);
936        let fut = async move {
937            let fnet_migration::InEffectVersion { current_boot, user, automated } =
938                state.get_netstack_version().await.expect("get netstack version");
939            let expect = expect.into();
940            let p_user = p_user.map(Into::into);
941            let p_automated = p_automated.map(Into::into);
942            assert_eq!(current_boot, expect);
943            assert_eq!(user, p_user);
944            assert_eq!(automated, p_automated);
945        };
946        let (_, ()): (Migration<_, _>, _) = futures::future::join(serve, fut).await;
947    }
948
    /// Selects which `Control` method a parameterized test uses to change the
    /// netstack version setting.
    #[derive(Debug, Copy, Clone)]
    enum SetMechanism {
        /// Use `set_user_netstack_version`.
        User,
        /// Use `set_automated_netstack_version`.
        Automated,
    }
954
955    #[test_case(SetMechanism::User, NetstackVersion::Netstack2; "set_user_ns2")]
956    #[test_case(SetMechanism::User, NetstackVersion::Netstack3; "set_user_ns3")]
957    #[test_case(SetMechanism::Automated, NetstackVersion::Netstack2; "set_automated_ns2")]
958    #[test_case(SetMechanism::Automated, NetstackVersion::Netstack3; "set_automated_ns3")]
959    #[fuchsia::test]
960    async fn set_netstack_version(mechanism: SetMechanism, set_version: NetstackVersion) {
961        let m = Migration::new(
962            InMemory::with_persisted(Default::default()),
963            FakeCollaborativeReboot::default(),
964        );
965        let (serve, control, _) = serve_migration(m);
966        let fut = async move {
967            let setting = fnet_migration::VersionSetting { version: set_version.into() };
968            match mechanism {
969                SetMechanism::User => control
970                    .set_user_netstack_version(Some(&setting))
971                    .await
972                    .expect("set user netstack version"),
973                SetMechanism::Automated => control
974                    .set_automated_netstack_version(Some(&setting))
975                    .await
976                    .expect("set automated netstack version"),
977            }
978        };
979        let (migration, ()) = futures::future::join(serve, fut).await;
980
981        let validate_versions = |m: &Migration<_, _>, current| {
982            let Migration {
983                current_boot,
984                persisted: Persisted { user, automated, rollback: _ },
985                persistence: _,
986                collaborative_reboot: _,
987            } = m;
988            assert_eq!(current_boot.version(), current);
989            match mechanism {
990                SetMechanism::User => {
991                    assert_eq!(*user, Some(set_version));
992                    assert_eq!(*automated, None);
993                }
994                SetMechanism::Automated => {
995                    assert_eq!(*user, None);
996                    assert_eq!(*automated, Some(set_version));
997                }
998            }
999        };
1000
1001        validate_versions(&migration, DEFAULT_NETSTACK);
1002        let cr_req = &migration.collaborative_reboot.scheduler.req;
1003        match (mechanism, set_version) {
1004            (_, NetstackVersion::Netstack3) => {
1005                assert_eq!(
1006                    Ok(false),
1007                    cr_req.as_ref().expect("there should be a request").is_closed()
1008                )
1009            }
1010            _ => assert_eq!(cr_req, &None),
1011        }
1012
1013        // Check that the setting was properly persisted.
1014        let migration =
1015            Migration::new(migration.persistence, migration.collaborative_reboot.scheduler);
1016        validate_versions(&migration, set_version);
1017    }
1018
1019    #[fuchsia::test]
1020    async fn update_rollback_state() {
1021        let mut migration = Migration::new(
1022            InMemory::with_persisted(Persisted {
1023                automated: Some(NetstackVersion::Netstack3),
1024                user: None,
1025                rollback: None,
1026            }),
1027            FakeCollaborativeReboot::default(),
1028        );
1029
1030        assert_eq!(migration.current_boot.version(), NetstackVersion::Netstack3);
1031        assert!(migration.collaborative_reboot.scheduler.req.is_none());
1032
1033        // The first update shouldn't schedule a reboot because we haven't
1034        // passed the healthcheck threshold yet.
1035        migration.update_rollback_state(rollback::Persisted::HealthcheckFailures(1)).await;
1036        assert_matches!(
1037            migration.persisted.rollback,
1038            Some(rollback::Persisted::HealthcheckFailures(1))
1039        );
1040        assert!(migration.collaborative_reboot.scheduler.req.is_none());
1041
1042        // This second update should schedule a reboot because we've passed
1043        // the healthcheck limit and want to roll back to Netstack2.
1044        migration
1045            .update_rollback_state(rollback::Persisted::HealthcheckFailures(
1046                rollback::MAX_FAILED_HEALTHCHECKS,
1047            ))
1048            .await;
1049        assert_matches!(
1050            migration.persisted.rollback,
1051            Some(rollback::Persisted::HealthcheckFailures(rollback::MAX_FAILED_HEALTHCHECKS))
1052        );
1053        assert_eq!(
1054            migration
1055                .collaborative_reboot
1056                .scheduler
1057                .req
1058                .as_ref()
1059                .expect("reboot was not scheduled")
1060                .is_closed()
1061                .unwrap(),
1062            false
1063        );
1064
1065        // This emulates seeing a healthcheck success before rebooting, in which
1066        // case we should see the reboot get canceled.
1067        migration.update_rollback_state(rollback::Persisted::Success).await;
1068        assert_matches!(migration.persisted.rollback, Some(rollback::Persisted::Success));
1069        assert!(migration
1070            .collaborative_reboot
1071            .scheduler
1072            .req
1073            .as_ref()
1074            .unwrap()
1075            .is_closed()
1076            .unwrap());
1077
1078        // Ensure that the changes were persisted successfully.
1079        let migration =
1080            Migration::new(migration.persistence, migration.collaborative_reboot.scheduler);
1081        assert_matches!(migration.persisted.rollback, Some(rollback::Persisted::Success));
1082    }
1083
1084    #[test_case(SetMechanism::User, Some(NetstackVersion::Netstack2), true)]
1085    #[test_case(SetMechanism::User, Some(NetstackVersion::Netstack3), false)]
1086    #[test_case(SetMechanism::User, None, false)]
1087    #[test_case(SetMechanism::Automated, Some(NetstackVersion::Netstack2), true)]
1088    #[test_case(SetMechanism::Automated, Some(NetstackVersion::Netstack3), false)]
1089    #[test_case(SetMechanism::Automated, None, true)]
1090    #[fuchsia::test]
1091    async fn cancel_collaborative_reboot(
1092        mechanism: SetMechanism,
1093        version: Option<NetstackVersion>,
1094        expect_canceled: bool,
1095    ) {
1096        let migration = Migration::new(
1097            InMemory::with_persisted(Persisted { user: None, automated: None, rollback: None }),
1098            FakeCollaborativeReboot::default(),
1099        );
1100
1101        // Start of by updating the automated setting to Netstack3; this ensures
1102        // their is a pending request to cancel.
1103        let (serve, control, _) = serve_migration(migration);
1104        let fut = async move {
1105            control
1106                .set_automated_netstack_version(Some(&fnet_migration::VersionSetting {
1107                    version: fnet_migration::NetstackVersion::Netstack3,
1108                }))
1109                .await
1110                .expect("set automated netstack version");
1111        };
1112        let (migration, ()) = futures::future::join(serve, fut).await;
1113        let cancel = migration
1114            .collaborative_reboot
1115            .scheduler
1116            .req
1117            .as_ref()
1118            .expect("there should be a request");
1119        assert_eq!(Ok(false), cancel.is_closed());
1120
1121        // Update the setting based on the test parameters
1122        let (serve, control, _) = serve_migration(migration);
1123        let fut = async move {
1124            let setting = version.map(|v| fnet_migration::VersionSetting { version: v.into() });
1125            match mechanism {
1126                SetMechanism::User => control
1127                    .set_user_netstack_version(setting.as_ref())
1128                    .await
1129                    .expect("set user netstack version"),
1130                SetMechanism::Automated => control
1131                    .set_automated_netstack_version(setting.as_ref())
1132                    .await
1133                    .expect("set automated netstack version"),
1134            }
1135        };
1136        let (migration, ()) = futures::future::join(serve, fut).await;
1137
1138        let cancel = migration
1139            .collaborative_reboot
1140            .scheduler
1141            .req
1142            .as_ref()
1143            .expect("there should be a request");
1144        assert_eq!(Ok(expect_canceled), cancel.is_closed());
1145    }
1146
1147    #[test_case(SetMechanism::User)]
1148    #[test_case(SetMechanism::Automated)]
1149    #[fuchsia::test]
1150    async fn clear_netstack_version(mechanism: SetMechanism) {
1151        const PREVIOUS_VERSION: NetstackVersion = NetstackVersion::Netstack2;
1152        let m = Migration::new(
1153            InMemory::with_persisted(Persisted {
1154                user: Some(PREVIOUS_VERSION),
1155                automated: Some(PREVIOUS_VERSION),
1156                rollback: None,
1157            }),
1158            NoCollaborativeReboot,
1159        );
1160        let (serve, control, _) = serve_migration(m);
1161        let fut = async move {
1162            match mechanism {
1163                SetMechanism::User => control
1164                    .set_user_netstack_version(None)
1165                    .await
1166                    .expect("set user netstack version"),
1167                SetMechanism::Automated => control
1168                    .set_automated_netstack_version(None)
1169                    .await
1170                    .expect("set automated netstack version"),
1171            }
1172        };
1173        let (migration, ()) = futures::future::join(serve, fut).await;
1174
1175        let validate_versions = |m: &Migration<_, _>| {
1176            let Migration {
1177                current_boot,
1178                persisted: Persisted { user, automated, rollback: _ },
1179                persistence: _,
1180                collaborative_reboot: _,
1181            } = m;
1182            assert_eq!(current_boot.version(), PREVIOUS_VERSION);
1183            match mechanism {
1184                SetMechanism::User => {
1185                    assert_eq!(*user, None);
1186                    assert_eq!(*automated, Some(PREVIOUS_VERSION));
1187                }
1188                SetMechanism::Automated => {
1189                    assert_eq!(*user, Some(PREVIOUS_VERSION));
1190                    assert_eq!(*automated, None);
1191                }
1192            }
1193        };
1194
1195        validate_versions(&migration);
1196        // Check that the setting was properly persisted.
1197        let migration =
1198            Migration::new(migration.persistence, migration.collaborative_reboot.scheduler);
1199        validate_versions(&migration);
1200    }
1201
    // Verifies the inspect tree published by `InspectNodes`, both right after
    // creation and after `update` is called with changed persisted settings.
    #[fuchsia::test]
    fn inspect() {
        let mut m = Migration::new(
            InMemory::with_persisted(Persisted {
                user: Some(NetstackVersion::Netstack2),
                automated: Some(NetstackVersion::Netstack3),
                rollback: None,
            }),
            NoCollaborativeReboot,
        );
        let inspector = fuchsia_inspect::component::inspector();
        let nodes = InspectNodes::new(inspector, &m);
        // Versions are exported as integers (2 or 3); see
        // `NetstackVersion::inspect_uint_value`.
        assert_data_tree!(inspector,
            root: {
                current_boot: 2u64,
                user_setting: 2u64,
                automated_setting: 3u64,
                rollback_state: "None",
                forced_netstack2: false,
            }
        );

        // Only the persisted settings are replaced here; `current_boot` is
        // left untouched.
        m.persisted =
            Persisted { user: None, automated: Some(NetstackVersion::Netstack2), rollback: None };
        nodes.update(&m);
        // An unset setting is exported as 0; see
        // `NetstackVersion::optional_inspect_uint_value`.
        assert_data_tree!(inspector,
            root: {
                current_boot: 2u64,
                user_setting: 0u64,
                automated_setting: 2u64,
                rollback_state: "None",
                forced_netstack2: false,
            }
        );
    }
1237
    // Verifies how rollback state is reflected in the inspect tree as the
    // persisted rollback value changes.
    #[fuchsia::test]
    fn inspect_rollback() {
        let mut m = Migration::new(
            InMemory::with_persisted(Persisted {
                user: None,
                automated: Some(NetstackVersion::Netstack3),
                rollback: Some(rollback::Persisted::HealthcheckFailures(
                    rollback::MAX_FAILED_HEALTHCHECKS,
                )),
            }),
            NoCollaborativeReboot,
        );
        let inspector = fuchsia_inspect::component::inspector();
        let nodes = InspectNodes::new(inspector, &m);
        // NOTE(review): the literal 5 below presumably mirrors
        // `rollback::MAX_FAILED_HEALTHCHECKS`; confirm, and keep the expected
        // strings in sync if that constant changes.
        assert_data_tree!(inspector,
            root: {
                current_boot: 2u64,
                user_setting: 0u64,
                automated_setting: 3u64,
                rollback_state: "Some(HealthcheckFailures(5))",
                forced_netstack2: true,
            }
        );

        // One more failure than the maximum.
        m.persisted.rollback =
            Some(rollback::Persisted::HealthcheckFailures(rollback::MAX_FAILED_HEALTHCHECKS + 1));
        nodes.update(&m);
        assert_data_tree!(inspector,
            root: {
                current_boot: 2u64,
                user_setting: 0u64,
                automated_setting: 3u64,
                rollback_state: "Some(HealthcheckFailures(6))",
                forced_netstack2: true,
            }
        );
        // Only the persisted rollback value is changed to `Success`;
        // `current_boot` is not modified, and the expected tree still reports
        // `forced_netstack2: true`.
        m.persisted.rollback = Some(rollback::Persisted::Success);
        nodes.update(&m);
        assert_data_tree!(inspector,
            root: {
                current_boot: 2u64,
                user_setting: 0u64,
                automated_setting: 3u64,
                rollback_state: "Some(Success)",
                forced_netstack2: true,
            }
        );
    }
1286
1287    #[test_case::test_matrix(
1288    [
1289        (RollbackNetstackVersion::Netstack2, metrics_registry::StackMigrationCurrentBootMetricDimensionNetstackVersion::Netstack2),
1290        (RollbackNetstackVersion::Netstack3, metrics_registry::StackMigrationCurrentBootMetricDimensionNetstackVersion::Netstack3),
1291    ],
1292    [
1293        (None, metrics_registry::StackMigrationUserSettingMetricDimensionNetstackVersion::NoSelection),
1294        (Some(NetstackVersion::Netstack2), metrics_registry::StackMigrationUserSettingMetricDimensionNetstackVersion::Netstack2),
1295        (Some(NetstackVersion::Netstack3), metrics_registry::StackMigrationUserSettingMetricDimensionNetstackVersion::Netstack3),
1296    ],
1297    [
1298        (None, metrics_registry::StackMigrationAutomatedSettingMetricDimensionNetstackVersion::NoSelection),
1299        (Some(NetstackVersion::Netstack2), metrics_registry::StackMigrationAutomatedSettingMetricDimensionNetstackVersion::Netstack2),
1300        (Some(NetstackVersion::Netstack3), metrics_registry::StackMigrationAutomatedSettingMetricDimensionNetstackVersion::Netstack3),
1301    ]
1302    )]
1303    #[fuchsia::test]
1304    async fn metrics_logger(
1305        current_boot: (
1306            RollbackNetstackVersion,
1307            metrics_registry::StackMigrationCurrentBootMetricDimensionNetstackVersion,
1308        ),
1309        user: (
1310            Option<NetstackVersion>,
1311            metrics_registry::StackMigrationUserSettingMetricDimensionNetstackVersion,
1312        ),
1313        automated: (
1314            Option<NetstackVersion>,
1315            metrics_registry::StackMigrationAutomatedSettingMetricDimensionNetstackVersion,
1316        ),
1317    ) {
1318        let (current_boot, current_boot_expect) = current_boot;
1319        let (user, user_expect) = user;
1320        let (automated, automated_expect) = automated;
1321        let mut m = Migration::new(
1322            InMemory::with_persisted(Persisted { user, automated, rollback: None }),
1323            NoCollaborativeReboot,
1324        );
1325        m.current_boot = current_boot;
1326        let (logger, mut logger_stream) =
1327            fidl::endpoints::create_proxy_and_stream::<fmetrics::MetricEventLoggerMarker>();
1328
1329        let metrics_logger = MetricsLogger { logger: Some(logger) };
1330
1331        let ((), ()) = futures::future::join(metrics_logger.log_metrics(&m), async {
1332            let expect = [
1333                (
1334                    metrics_registry::STACK_MIGRATION_CURRENT_BOOT_METRIC_ID,
1335                    Some(current_boot_expect.as_event_code()),
1336                ),
1337                (
1338                    metrics_registry::STACK_MIGRATION_USER_SETTING_METRIC_ID,
1339                    Some(user_expect.as_event_code()),
1340                ),
1341                (
1342                    metrics_registry::STACK_MIGRATION_AUTOMATED_SETTING_METRIC_ID,
1343                    Some(automated_expect.as_event_code()),
1344                ),
1345                (
1346                    metrics_registry::STACK_MIGRATION_STATE_METRIC_ID,
1347                    // Note: The rollback state doesn't have a flat expectation.
1348                    // Don't assert on its value here, and instead we directly
1349                    // test it in a separate test case.
1350                    None,
1351                ),
1352            ];
1353            for (id, ev) in expect {
1354                let (metric, occurences, codes, responder) = logger_stream
1355                    .next()
1356                    .await
1357                    .unwrap()
1358                    .unwrap()
1359                    .into_log_occurrence()
1360                    .expect("bad request");
1361                assert_eq!(metric, id);
1362                assert_eq!(occurences, 1);
1363                if let Some(ev) = ev {
1364                    assert_eq!(codes, vec![ev]);
1365                }
1366                responder.send(Ok(())).unwrap();
1367            }
1368        })
1369        .await;
1370    }
1371
1372    #[test_case(
1373        RollbackNetstackVersion::Netstack2, None, None =>
1374        metrics_registry::StackMigrationStateMetricDimensionMigrationState::NotStarted;
1375        "not_started_none"
1376    )]
1377    #[test_case(
1378        RollbackNetstackVersion::Netstack2, Some(NetstackVersion::Netstack2), None =>
1379        metrics_registry::StackMigrationStateMetricDimensionMigrationState::NotStarted;
1380        "not_started_ns2"
1381    )]
1382    #[test_case(
1383        RollbackNetstackVersion::Netstack2, Some(NetstackVersion::Netstack3), None =>
1384        metrics_registry::StackMigrationStateMetricDimensionMigrationState::Scheduled;
1385        "scheduled"
1386    )]
1387    #[test_case(
1388        RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack3), None =>
1389        metrics_registry::StackMigrationStateMetricDimensionMigrationState::InProgress;
1390        "in_progress_none"
1391    )]
1392    #[test_case(
1393        RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack3),
1394        Some(rollback::Persisted::HealthcheckFailures(rollback::MAX_FAILED_HEALTHCHECKS - 1)) =>
1395        metrics_registry::StackMigrationStateMetricDimensionMigrationState::InProgress;
1396        "in_progress_some"
1397    )]
1398    #[test_case(
1399        RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack3),
1400        Some(rollback::Persisted::HealthcheckFailures(rollback::MAX_FAILED_HEALTHCHECKS)) =>
1401        metrics_registry::StackMigrationStateMetricDimensionMigrationState::Failed;
1402        "failed_exact"
1403    )]
1404    #[test_case(
1405        RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack3),
1406        Some(rollback::Persisted::HealthcheckFailures(rollback::MAX_FAILED_HEALTHCHECKS + 1)) =>
1407        metrics_registry::StackMigrationStateMetricDimensionMigrationState::Failed;
1408        "failed_more"
1409    )]
1410    #[test_case(
1411        RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack3),
1412        Some(rollback::Persisted::Success) =>
1413        metrics_registry::StackMigrationStateMetricDimensionMigrationState::Success;
1414        "success"
1415    )]
1416    #[test_case(
1417        RollbackNetstackVersion::Netstack3, None, None =>
1418        metrics_registry::StackMigrationStateMetricDimensionMigrationState::Canceled;
1419        "canceled_none"
1420    )]
1421    #[test_case(
1422        RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack2), None =>
1423        metrics_registry::StackMigrationStateMetricDimensionMigrationState::Canceled;
1424        "canceled_ns2"
1425    )]
1426    #[test_case(
1427        RollbackNetstackVersion::ForceNetstack2, Some(NetstackVersion::Netstack3), None =>
1428        metrics_registry::StackMigrationStateMetricDimensionMigrationState::RolledBack;
1429        "rolled_back"
1430    )]
1431    #[fuchsia::test]
1432    fn test_state_metric(
1433        current_boot: RollbackNetstackVersion,
1434        automated: Option<NetstackVersion>,
1435        rollback: Option<rollback::Persisted>,
1436    ) -> metrics_registry::StackMigrationStateMetricDimensionMigrationState {
1437        let mut migration = Migration::new(
1438            InMemory::with_persisted(Persisted { user: None, automated, rollback }),
1439            NoCollaborativeReboot,
1440        );
1441        migration.current_boot = current_boot;
1442        compute_state_metric(&migration)
1443    }
1444
    /// An in-memory mock-persistence that triggers an event once the target
    /// state has been persisted.
    #[derive(Clone)]
    struct AwaitPersisted {
        /// Contents of the mock file, or `None` if no writer has been opened
        /// yet. Shared between this value and its clones, which are what
        /// `open_writer` hands out.
        file: Rc<RefCell<Option<Vec<u8>>>>,
        /// Serialized bytes that `file` is compared against after each write.
        target: Vec<u8>,
        /// Signaled when the contents of `file` equal `target`.
        event: Event,
    }
1453
1454    impl AwaitPersisted {
1455        fn with_persisted(start: Persisted, target: &Persisted) -> (Self, EventWait) {
1456            let event = Event::new();
1457            let wait = event.wait();
1458            let target_bytes = serde_json::to_vec(target).expect("failed to serialize target");
1459            let mut s = Self { file: Default::default(), target: target_bytes, event };
1460            start.save(s.open_writer().unwrap());
1461            (s, wait)
1462        }
1463    }
1464
1465    impl PersistenceProvider for AwaitPersisted {
1466        type Writer = Self;
1467        type Reader = std::io::Cursor<Vec<u8>>;
1468
1469        fn open_writer(&mut self) -> std::io::Result<Self::Writer> {
1470            *self.file.borrow_mut() = Some(Vec::new());
1471            Ok(self.clone())
1472        }
1473
1474        fn open_reader(&self) -> std::io::Result<Self::Reader> {
1475            self.file
1476                .borrow()
1477                .clone()
1478                .map(std::io::Cursor::new)
1479                .ok_or(std::io::ErrorKind::NotFound.into())
1480        }
1481    }
1482
1483    impl std::io::Write for AwaitPersisted {
1484        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1485            let r = self.file.borrow_mut().as_mut().expect("no file open").write(buf);
1486            if self.file.borrow().as_ref().expect("no_file_open") == &self.target {
1487                let _: bool = self.event.signal();
1488            }
1489            r
1490        }
1491
1492        fn flush(&mut self) -> std::io::Result<()> {
1493            Ok(())
1494        }
1495    }
1496
1497    #[fuchsia::test]
1498    async fn migrate_to_ns3_success() {
1499        let start =
1500            Persisted { user: None, automated: Some(NetstackVersion::Netstack3), rollback: None };
1501        let target = Persisted {
1502            user: None,
1503            automated: Some(NetstackVersion::Netstack3),
1504            rollback: Some(rollback::Persisted::Success),
1505        };
1506
1507        let (persistence, mut wait) = AwaitPersisted::with_persisted(start, &target);
1508        let mut migration = Migration::new(persistence, NoCollaborativeReboot);
1509        // No service requests.
1510        let service_request_stream = futures::stream::pending();
1511        // A health check that always succeeds.
1512        let mock_healthcheck = rollback::testutil::MockHttpRequester(|| {
1513            Ok(fnet_http::Response { error: None, status_code: Some(204), ..Default::default() })
1514        });
1515        let healthcheck_tick = futures::stream::once(futures::future::ready(()));
1516
1517        {
1518            let main_fut = main_inner(
1519                &mut migration,
1520                service_request_stream,
1521                mock_healthcheck,
1522                healthcheck_tick,
1523            )
1524            .fuse();
1525            futures::pin_mut!(main_fut);
1526            futures::select!(
1527                () = main_fut => unreachable!("main fut should never exit"),
1528                () = wait => {}
1529            );
1530        }
1531
1532        assert_eq!(migration.persisted.rollback, Some(rollback::Persisted::Success));
1533    }
1534
1535    #[fuchsia::test]
1536    async fn migrate_to_ns3_fails() {
1537        let start =
1538            Persisted { user: None, automated: Some(NetstackVersion::Netstack3), rollback: None };
1539        let target = Persisted {
1540            user: None,
1541            automated: Some(NetstackVersion::Netstack3),
1542            rollback: Some(rollback::Persisted::HealthcheckFailures(
1543                rollback::MAX_FAILED_HEALTHCHECKS,
1544            )),
1545        };
1546
1547        let (persistence, mut wait) = AwaitPersisted::with_persisted(start, &target);
1548        let mut migration = Migration::new(persistence, FakeCollaborativeReboot::default());
1549        // No service requests.
1550        let service_request_stream = futures::stream::pending();
1551        // A health check that always fails.
1552        let mock_healthcheck = rollback::testutil::MockHttpRequester(|| {
1553            Ok(fnet_http::Response { error: None, status_code: Some(500), ..Default::default() })
1554        });
1555        // Use a non-zero interval so that the Healthcheck code doesn't hog the scheduler.
1556        let healthcheck_tick = fuchsia_async::Interval::new(Duration::from_millis(1).into());
1557
1558        {
1559            let main_fut = main_inner(
1560                &mut migration,
1561                service_request_stream,
1562                mock_healthcheck,
1563                healthcheck_tick,
1564            )
1565            .fuse();
1566            futures::pin_mut!(main_fut);
1567            futures::select!(
1568                () = main_fut => unreachable!("main fut should never exit"),
1569                () = wait => {}
1570            );
1571        }
1572
1573        assert_matches!(
1574            migration.persisted.rollback,
1575            Some(rollback::Persisted::HealthcheckFailures(f)) if
1576            f >= rollback::MAX_FAILED_HEALTHCHECKS
1577        );
1578        // Verify a failed migration schedules a collaborative reboot.
1579        let cr_req = &migration.collaborative_reboot.scheduler.req;
1580        assert_eq!(Ok(false), cr_req.as_ref().expect("there should be a request").is_closed())
1581    }
1582
    // Regression test for https://fxbug.dev/395913604.
    //
    // The original bug reset the number of failed healthchecks in persistence
    // from `rollback::MAX_FAILED_HEALTHCHECKS` to 0 whenever an inbound
    // service request was received.
    //
    // Verify this is no longer the case by triggering a failed healthcheck,
    // pushing the total to `rollback::MAX_FAILED_HEALTHCHECKS`, then sending
    // `fuchsia.net.stackmigrationdeprecated/State.GetNetstackVersion` requests.
1592    #[fuchsia::test]
1593    async fn migrate_to_ns3_rollback_regression_test() {
1594        let start = Persisted {
1595            user: None,
1596            automated: Some(NetstackVersion::Netstack3),
1597            rollback: Some(rollback::Persisted::HealthcheckFailures(
1598                rollback::MAX_FAILED_HEALTHCHECKS - 1,
1599            )),
1600        };
1601        let target = Persisted {
1602            user: None,
1603            automated: Some(NetstackVersion::Netstack3),
1604            rollback: Some(rollback::Persisted::HealthcheckFailures(0)),
1605        };
1606
1607        let (persistence, wait) = AwaitPersisted::with_persisted(start, &target);
1608        let mut migration = Migration::new(persistence, FakeCollaborativeReboot::default());
1609        // A health check that always fails.
1610        let mock_healthcheck = rollback::testutil::MockHttpRequester(|| {
1611            Ok(fnet_http::Response { error: None, status_code: Some(500), ..Default::default() })
1612        });
1613        let healthcheck_tick = futures::stream::once(futures::future::ready(()));
1614        // Send "get" requests, to trigger the bug.
1615        let (client, server) =
1616            fidl::endpoints::create_proxy_and_stream::<fnet_migration::StateMarker>();
1617        let service_request_stream = server.map(|r| r.map(ServiceRequest::State));
1618        let client_fut = async move {
1619            // Send multiple get requests, to ensure that at least one would occur after the failed
1620            // healthcheck.
1621            let mut stream = fuchsia_async::Interval::new(Duration::from_millis(1).into());
1622            while let Some(()) = stream.next().await {
1623                let _ =
1624                    client.get_netstack_version().await.expect("failed to get netstack version");
1625            }
1626        }
1627        .fuse();
1628
1629        // If wait were to fire, the bug has occurred. Instead expect a timeout.
1630        // Use 1 second to keep the test runtime short; If CQ has a hiccup and
1631        // pauses execution, we'd see a false negative, which isn't a big deal.
1632        let wait_fut = wait
1633            .map(|()| panic!("unexpectedly observed the persisted healthcheck failures reset to 0"))
1634            .on_timeout(Duration::from_secs(1), || ())
1635            .fuse();
1636
1637        {
1638            let main_fut = main_inner(
1639                &mut migration,
1640                service_request_stream,
1641                mock_healthcheck,
1642                healthcheck_tick,
1643            )
1644            .fuse();
1645            futures::pin_mut!(main_fut);
1646            futures::pin_mut!(client_fut);
1647            futures::pin_mut!(wait_fut);
1648            futures::select!(
1649                () = main_fut => unreachable!("main fut should never exit"),
1650                () = client_fut => unreachable!("client fut should never exit"),
1651                () = wait_fut => {}
1652            );
1653        }
1654    }
1655}