stack_migration/
main.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5mod rollback;
6
7use std::pin::{Pin, pin};
8use std::time::Duration;
9
10use cobalt_client::traits::AsEventCode as _;
11use fuchsia_async::Task;
12use fuchsia_component::server::{ServiceFs, ServiceFsDir};
13use fuchsia_inspect::Property as _;
14use futures::channel::mpsc;
15use futures::{FutureExt as _, Stream, StreamExt as _};
16use log::{error, info, warn};
17use networking_metrics_registry::networking_metrics_registry as metrics_registry;
18use {
19    fidl_fuchsia_metrics as fmetrics, fidl_fuchsia_net_stackmigrationdeprecated as fnet_migration,
20    fidl_fuchsia_power_internal as fpower,
21};
22
/// The Netstack version used when neither a user nor an automated setting has
/// been persisted.
const DEFAULT_NETSTACK: NetstackVersion = NetstackVersion::Netstack3;

/// The number of times a device is allowed to fail to migrate to Netstack3,
/// before abandoning migration attempts during the `CURRENT_EPOCH`.
const MAX_ROLLBACKS_PER_EPOCH: u8 = 3;

/// The "epoch" is an arbitrary interval used to limit the number of attempts a
/// device will make to migrate to Netstack3. Once the epoch increases, the
/// device will resume attempts to migrate to Netstack3.
///
/// Pragmatically the intention is to increment this number every time a Fuchsia
/// Release is cut that contains a known fix to a Netstack3 bug.
const CURRENT_EPOCH: u32 = 2;

/// The number of failed healthchecks at which we begin generating crash
/// reports.
///
/// By setting this to one less than the `rollback::MAX_FAILED_HEALTHCHECKS`, we
/// ensure each rollback event will have two "failed healthcheck" reports from
/// the previous boot (presumably one as the failure count reaches
/// `MAX_FAILED_HEALTHCHECKS - 1` and one as it reaches
/// `MAX_FAILED_HEALTHCHECKS`, assuming the count advances one at a time).
const HEALTHCHECK_FAILURE_THRESHOLD_FOR_CRASH_REPORTS: usize =
    crate::rollback::MAX_FAILED_HEALTHCHECKS - 1;
45
/// A Netstack implementation the device can run.
///
/// Serialized (via serde) as part of the persisted configuration.
#[derive(Debug, Clone, Copy, serde::Deserialize, serde::Serialize, Eq, PartialEq)]
enum NetstackVersion {
    Netstack2,
    Netstack3,
}
51
52impl NetstackVersion {
53    fn inspect_uint_value(&self) -> u64 {
54        match self {
55            Self::Netstack2 => 2,
56            Self::Netstack3 => 3,
57        }
58    }
59
60    fn optional_inspect_uint_value(o: &Option<Self>) -> u64 {
61        o.as_ref().map(Self::inspect_uint_value).unwrap_or(0)
62    }
63}
64
65impl From<fnet_migration::NetstackVersion> for NetstackVersion {
66    fn from(value: fnet_migration::NetstackVersion) -> Self {
67        match value {
68            fnet_migration::NetstackVersion::Netstack2 => NetstackVersion::Netstack2,
69            fnet_migration::NetstackVersion::Netstack3 => NetstackVersion::Netstack3,
70        }
71    }
72}
73
74impl From<NetstackVersion> for fnet_migration::NetstackVersion {
75    fn from(value: NetstackVersion) -> Self {
76        match value {
77            NetstackVersion::Netstack2 => fnet_migration::NetstackVersion::Netstack2,
78            NetstackVersion::Netstack3 => fnet_migration::NetstackVersion::Netstack3,
79        }
80    }
81}
82
83impl From<NetstackVersion> for Box<fnet_migration::VersionSetting> {
84    fn from(value: NetstackVersion) -> Self {
85        Box::new(fnet_migration::VersionSetting { version: value.into() })
86    }
87}
88
/// The Netstack version selected for a boot, recording whether the rollback
/// logic overrode the automated setting.
#[derive(Debug, PartialEq)]
enum RollbackNetstackVersion {
    /// Netstack2, as selected by the user or automated setting (no rollback
    /// override involved).
    Netstack2,
    /// The automated setting requested Netstack3, but the persisted state
    /// indicates that the previous boot failed to migrate to Netstack3 (had too
    /// many health check failures). Forcibly use Netstack2 for this boot.
    ForceNetstack2,
    /// The automated setting requested Netstack3, but we've already failed too
    /// many migration attempts in the `CURRENT_EPOCH`. Forcibly use Netstack2
    /// until the next epoch.
    ForceNetstack2Indefinitely,
    /// Netstack3, as selected by the user setting, the automated setting (with
    /// no rollback in effect), or the default.
    Netstack3,
}
102
103impl RollbackNetstackVersion {
104    // Convert into a `NetstackVersion`, while honoring the forced setting.
105    fn version(&self) -> NetstackVersion {
106        match self {
107            Self::Netstack2 | Self::ForceNetstack2 | Self::ForceNetstack2Indefinitely => {
108                NetstackVersion::Netstack2
109            }
110            Self::Netstack3 => NetstackVersion::Netstack3,
111        }
112    }
113
114    // Convert into a `NetstackVersion`, while ignoring the forced setting.
115    fn version_ignoring_force(&self) -> NetstackVersion {
116        match self {
117            Self::Netstack2 => NetstackVersion::Netstack2,
118            Self::Netstack3 | Self::ForceNetstack2 | Self::ForceNetstack2Indefinitely => {
119                NetstackVersion::Netstack3
120            }
121        }
122    }
123}
124
125impl From<NetstackVersion> for RollbackNetstackVersion {
126    fn from(version: NetstackVersion) -> Self {
127        match version {
128            NetstackVersion::Netstack2 => RollbackNetstackVersion::Netstack2,
129            NetstackVersion::Netstack3 => RollbackNetstackVersion::Netstack3,
130        }
131    }
132}
133
/// Configuration persisted across boots, serialized with serde (as JSON by
/// `Persisted::save`).
#[derive(Default, Debug, serde::Deserialize, serde::Serialize)]
#[cfg_attr(test, derive(Eq, PartialEq))]
struct Persisted {
    /// The version requested via `SetAutomatedNetstackVersion`, if any.
    automated: Option<NetstackVersion>,
    /// The version requested via `SetUserNetstackVersion`, if any. Takes
    /// precedence over `automated`.
    user: Option<NetstackVersion>,
    /// The rollback (healthcheck) state last recorded for the migration, if
    /// any.
    rollback: Option<rollback::Persisted>,
    /// How many migrations have been rolled back, and in which epoch.
    rollback_history: Option<RollbackHistory>,
}
142
143impl Persisted {
144    fn load<R: std::io::Read>(r: R) -> Self {
145        serde_json::from_reader(std::io::BufReader::new(r)).unwrap_or_else(|e| {
146            error!("error loading persisted config {e:?}, using defaults");
147            Persisted::default()
148        })
149    }
150
151    fn save<W: std::io::Write>(&self, w: W) {
152        serde_json::to_writer(w, self).unwrap_or_else(|e: serde_json::Error| {
153            error!("error persisting configuration {self:?}: {e:?}")
154        })
155    }
156
157    // Determine the desired NetstackVersion based on the persisted values
158    fn desired_netstack_version(&self) -> RollbackNetstackVersion {
159        match self {
160            // Always prefer the user setting, if present.
161            Persisted { user: Some(user), automated: _, rollback: _, rollback_history: _ } => {
162                (*user).into()
163            }
164            // If the automated setting is Netstack2, honor it unconditionally.
165            Persisted {
166                user: None,
167                automated: Some(NetstackVersion::Netstack2),
168                rollback: _,
169                rollback_history: _,
170            } => RollbackNetstackVersion::Netstack2,
171            // If the automated setting is Netstack3, we need to first check the
172            // rollback state.
173            Persisted {
174                user: None,
175                automated: Some(NetstackVersion::Netstack3),
176                rollback,
177                rollback_history,
178            } => {
179                let rollback_count = match rollback_history {
180                    None => 0,
181                    Some(RollbackHistory { epoch, count }) => {
182                        if *epoch == CURRENT_EPOCH {
183                            *count
184                        } else {
185                            // Ignore failed rollbacks in an different epoch.
186                            0
187                        }
188                    }
189                };
190                let healthcheck_failure_count = match rollback {
191                    None => 0,
192                    Some(rollback::Persisted::Success) => 0,
193                    Some(rollback::Persisted::HealthcheckFailures(count)) => *count,
194                };
195
196                if rollback_count >= MAX_ROLLBACKS_PER_EPOCH {
197                    // If we've failed all of our allotted migration attempts
198                    // abandon further migration attempts in this epoch.
199                    RollbackNetstackVersion::ForceNetstack2Indefinitely
200                } else if healthcheck_failure_count >= rollback::MAX_FAILED_HEALTHCHECKS {
201                    // If we've failed all the healthchecks in the current
202                    // migration attempt, temporarily rollback to Netstack2.
203                    RollbackNetstackVersion::ForceNetstack2
204                } else {
205                    // Otherwise, proceed as normal with Netstack3.
206                    RollbackNetstackVersion::Netstack3
207                }
208            }
209            // Use the default version if nothing is set.
210            Persisted { user: None, automated: None, rollback: _, rollback_history: _ } => {
211                DEFAULT_NETSTACK.into()
212            }
213        }
214    }
215}
216
/// A history of rollbacks that have occurred on the device.
#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub(crate) struct RollbackHistory {
    /// The epoch during which the rollbacks occurred. A count recorded in an
    /// epoch other than `CURRENT_EPOCH` is disregarded.
    epoch: u32,
    /// The number of rollbacks that took place during `epoch`.
    count: u8,
}
225
226impl Default for RollbackHistory {
227    fn default() -> Self {
228        RollbackHistory { epoch: CURRENT_EPOCH, count: 0 }
229    }
230}
231
/// A request received on either of the served FIDL protocols, so both can be
/// handled from a single merged stream.
enum ServiceRequest {
    /// A request on the migration `Control` protocol.
    Control(fnet_migration::ControlRequest),
    /// A request on the migration `State` protocol.
    State(fnet_migration::StateRequest),
}
236
/// The stack migration state.
///
/// Generic over how state is persisted (`P`), how collaborative reboots are
/// scheduled (`CR`), and how crash reports are filed (`R`), so alternative
/// implementations can be substituted.
struct Migration<P, CR, R> {
    /// The version selected for this boot. It is only assigned in `new`.
    current_boot: RollbackNetstackVersion,
    /// The in-memory copy of the persisted configuration.
    persisted: Persisted,
    /// Where `persisted` is read from and written to.
    persistence: P,
    /// Tracks the outstanding collaborative reboot request, if any.
    collaborative_reboot: CollaborativeReboot<CR>,
    /// Used to file crash reports about failed migrations.
    crash_reporter: R,
}
244
/// Abstracts the storage that backs the persisted configuration.
trait PersistenceProvider {
    /// The sink that the serialized configuration is written to.
    type Writer: std::io::Write;
    /// The source that the serialized configuration is read from.
    type Reader: std::io::Read;

    /// Opens a writer for (re)writing the persisted configuration.
    fn open_writer(&mut self) -> std::io::Result<Self::Writer>;
    /// Opens a reader over the previously persisted configuration.
    fn open_reader(&self) -> std::io::Result<Self::Reader>;
}
252
253fn try_persist<P: PersistenceProvider>(provider: &mut P, data: &Persisted) {
254    let w = match provider.open_writer() {
255        Ok(w) => w,
256        Err(e) => {
257            error!("failed to open writer to persist settings: {e:?}");
258            return;
259        }
260    };
261    data.save(w);
262}
263
264struct DataPersistenceProvider {}
265
266const PERSISTED_FILE_PATH: &'static str = "/data/config.json";
267
268impl PersistenceProvider for DataPersistenceProvider {
269    type Writer = std::fs::File;
270    type Reader = std::fs::File;
271
272    fn open_writer(&mut self) -> std::io::Result<Self::Writer> {
273        std::fs::File::create(PERSISTED_FILE_PATH)
274    }
275
276    fn open_reader(&self) -> std::io::Result<Self::Reader> {
277        std::fs::File::open(PERSISTED_FILE_PATH)
278    }
279}
280
/// Tracks this component's collaborative reboot request.
struct CollaborativeReboot<CR> {
    /// The backend used to submit reboot requests.
    scheduler: CR,
    /// `Some(<cancellation_token>)` if there's an outstanding collaborative
    /// reboot scheduled. Dropping the token cancels the request.
    scheduled_req: Option<zx::EventPair>,
}
287
288impl<CR: CollaborativeRebootScheduler> CollaborativeReboot<CR> {
289    /// Schedules a collaborative reboot.
290    ///
291    /// No-Op if there's already a reboot scheduled.
292    async fn schedule(&mut self) {
293        let Self { scheduler, scheduled_req } = self;
294        if scheduled_req.is_some() {
295            // We already have an outstanding request.
296            return;
297        }
298
299        info!("Scheduling collaborative reboot");
300        let (mine, theirs) = zx::EventPair::create();
301        *scheduled_req = Some(mine);
302        scheduler
303            .schedule(fpower::CollaborativeRebootReason::NetstackMigration, Some(theirs))
304            .await;
305    }
306
307    /// Cancels the currently scheduled collaborative Reboot.
308    ///
309    /// No-Op if there's none scheduled.
310    fn cancel(&mut self) {
311        if let Some(cancel) = self.scheduled_req.take() {
312            info!("Canceling collaborative reboot request. It's no longer necessary.");
313            // Dropping the eventpair cancels the request.
314            std::mem::drop(cancel);
315        }
316    }
317}
318
/// An abstraction over the `fpower::CollaborativeRebootScheduler` FIDL API.
trait CollaborativeRebootScheduler {
    /// Requests a collaborative reboot for `reason`.
    ///
    /// `cancel`, if provided, is the peer of a token held by the caller. The
    /// caller cancels the request by dropping its end. Nothing is returned, so
    /// callers cannot tell whether the request was accepted.
    async fn schedule(
        &mut self,
        reason: fpower::CollaborativeRebootReason,
        cancel: Option<zx::EventPair>,
    );
}
327
328/// An implementation of `CollaborativeRebootScheduler` that connects to the
329/// API over FIDL.
330struct Scheduler {}
331
332impl CollaborativeRebootScheduler for Scheduler {
333    async fn schedule(
334        &mut self,
335        reason: fpower::CollaborativeRebootReason,
336        cancel: Option<zx::EventPair>,
337    ) {
338        let proxy = match fuchsia_component::client::connect_to_protocol::<
339            fpower::CollaborativeRebootSchedulerMarker,
340        >() {
341            Ok(proxy) => proxy,
342            Err(e) => {
343                error!("Failed to connect to collaborative reboot scheduler: {e:?}");
344                return;
345            }
346        };
347        match proxy.schedule_reboot(reason, cancel).await {
348            Ok(()) => {}
349            Err(e) => error!("Failed to schedule collaborative reboot: {e:?}"),
350        }
351    }
352}
353
/// The reason to file a crash report.
#[derive(Debug, Clone, PartialEq, Eq)]
enum CrashReportReason {
    /// A Netstack3 healthcheck failed. It is filed once the failure count
    /// reaches `HEALTHCHECK_FAILURE_THRESHOLD_FOR_CRASH_REPORTS`.
    FailedHealthcheck,
    /// A migration to Netstack3 was rolled back.
    RolledBack,
}
360
/// An abstraction over the `fidl_fuchsia_feedback::CrashReporter` FIDL API.
trait CrashReporter {
    /// The amount of time after startup to wait before filing a rollback
    /// induced crash report.
    const ROLLBACK_CRASH_REPORT_DELAY: Duration;

    /// Files a crash report for `reason`. Nothing is returned, so failures
    /// are not visible to the caller.
    fn file_report(&mut self, reason: CrashReportReason);
}
369
370/// An implementation of `CrashReporter` that connects to the API over FIDL.
371struct Reporter {}
372
373impl CrashReporter for Reporter {
374    const ROLLBACK_CRASH_REPORT_DELAY: Duration = Duration::from_secs(180);
375
376    fn file_report(&mut self, reason: CrashReportReason) {
377        const PROGRAM_NAME: &str = "netstack_migration";
378        const FAILED_HEALTHCHECK_SIGNATURE: &str = "fuchsia-netstack3-healthcheck-failed";
379        const ROLLED_BACK_SIGNATURE: &str = "fuchsia-netstack3-rolled-back";
380
381        info!("Generating a crash report for reason: {reason:?}");
382
383        let proxy = match fuchsia_component::client::connect_to_protocol::<
384            fidl_fuchsia_feedback::CrashReporterMarker,
385        >() {
386            Ok(proxy) => proxy,
387            Err(e) => {
388                error!("Failed to connect to CrashReport proxy: {e:?}");
389                return;
390            }
391        };
392
393        let signature = match reason {
394            CrashReportReason::FailedHealthcheck => FAILED_HEALTHCHECK_SIGNATURE,
395            CrashReportReason::RolledBack => ROLLED_BACK_SIGNATURE,
396        };
397        let report = fidl_fuchsia_feedback::CrashReport {
398            program_name: Some(PROGRAM_NAME.to_string()),
399            program_uptime: Some(zx::MonotonicInstant::get().into_nanos()),
400            crash_signature: Some(signature.to_string()),
401            is_fatal: Some(false),
402            ..Default::default()
403        };
404
405        // File the crash report in a background task. It may take a while and
406        // we don't want to block the migration worker from serving other
407        // operations.
408        Task::spawn(proxy.file_report(report).map(|result| match result {
409            Ok(Ok(status)) => info!("Successfully filed crash report. Status={status:?}"),
410            Ok(Err(e)) => error!("Failed to file crash report: {e:?}"),
411            Err(e) => error!("Failed to request crash report: {e:?}"),
412        }))
413        .detach();
414    }
415}
416
417impl<P: PersistenceProvider, CR: CollaborativeRebootScheduler, R: CrashReporter>
418    Migration<P, CR, R>
419{
420    fn new(mut persistence: P, cr_scheduler: CR, crash_reporter: R) -> Self {
421        let mut persisted = persistence.open_reader().map(Persisted::load).unwrap_or_else(|e| {
422            warn!("could not open persistence reader: {e:?}. using defaults");
423            Persisted::default()
424        });
425
426        // Routine to update the rollback history on startup based on persisted
427        // values from the previous boot.
428        //
429        // This performs two important operations:
430        //   1) reset the failure count if the epoch has advanced, and
431        //   2) increment the failure count if the previous boot failed to
432        //      migrate.
433        let mut new = match persisted.rollback_history {
434            None => RollbackHistory::default(),
435            Some(RollbackHistory { epoch, count }) if epoch != CURRENT_EPOCH => {
436                if count > 0 {
437                    info!("Reseting Rollback History for new epoch");
438                }
439                RollbackHistory::default()
440            }
441            Some(history) => history,
442        };
443        if let Some(rollback::Persisted::HealthcheckFailures(failures)) = persisted.rollback {
444            if failures >= rollback::MAX_FAILED_HEALTHCHECKS {
445                new.count = new.count.saturating_add(1);
446            }
447        }
448
449        if Some(new) != persisted.rollback_history {
450            persisted.rollback_history = Some(new);
451            try_persist(&mut persistence, &persisted);
452        }
453
454        info!("Netstack Migration starting with persisted state: {persisted:?}");
455
456        let current_boot = persisted.desired_netstack_version();
457
458        match current_boot {
459            RollbackNetstackVersion::ForceNetstack2 => {
460                warn!(
461                    "Previous boot failed to migrate to Netstack3. \
462                    Ignoring automated setting and forcibly using Netstack2 \
463                    for the current boot."
464                );
465            }
466            RollbackNetstackVersion::ForceNetstack2Indefinitely => {
467                warn!(
468                    "Previous boot failed to migrate to Netstack3 and the \
469                    rollback limit has been exceeded. Ignoring automated \
470                    setting and forcibly using Netstack2 until the next epoch."
471                );
472            }
473            RollbackNetstackVersion::Netstack2 | RollbackNetstackVersion::Netstack3 => {}
474        }
475
476        Self {
477            current_boot,
478            persisted,
479            persistence,
480            collaborative_reboot: CollaborativeReboot {
481                scheduler: cr_scheduler,
482                scheduled_req: None,
483            },
484            crash_reporter,
485        }
486    }
487
488    fn persist(&mut self) {
489        let Self {
490            current_boot: _,
491            persisted,
492            persistence,
493            collaborative_reboot: _,
494            crash_reporter: _,
495        } = self;
496        try_persist(persistence, persisted);
497    }
498
499    fn map_version_setting(
500        version: Option<Box<fnet_migration::VersionSetting>>,
501    ) -> Option<NetstackVersion> {
502        version.map(|v| {
503            let fnet_migration::VersionSetting { version } = &*v;
504            (*version).into()
505        })
506    }
507
508    async fn update_collaborative_reboot(&mut self) {
509        let Self {
510            current_boot,
511            persisted,
512            persistence: _,
513            collaborative_reboot,
514            crash_reporter: _,
515        } = self;
516        if persisted.desired_netstack_version().version() != current_boot.version() {
517            // When the current boot differs from our desired version, schedule
518            // a reboot (if there's not already one).
519            collaborative_reboot.schedule().await
520        } else {
521            // When the current_boot matches our desired version, we no longer
522            // need reboot. Cancel the outstanding request (if any)
523            collaborative_reboot.cancel()
524        }
525    }
526
527    async fn update_rollback_state(&mut self, new_state: rollback::Persisted) {
528        if self.persisted.rollback != Some(new_state) {
529            self.persisted.rollback = Some(new_state);
530            self.update_collaborative_reboot().await;
531            self.persist();
532
533            if let rollback::Persisted::HealthcheckFailures(f) = new_state {
534                if f >= HEALTHCHECK_FAILURE_THRESHOLD_FOR_CRASH_REPORTS {
535                    self.crash_reporter.file_report(CrashReportReason::FailedHealthcheck);
536                }
537            }
538        }
539    }
540
541    async fn handle_control_request(
542        &mut self,
543        req: fnet_migration::ControlRequest,
544    ) -> Result<(), fidl::Error> {
545        match req {
546            fnet_migration::ControlRequest::SetAutomatedNetstackVersion { version, responder } => {
547                let version = Self::map_version_setting(version);
548                let Self {
549                    current_boot: _,
550                    persisted: Persisted { automated, user: _, rollback: _, rollback_history: _ },
551                    persistence: _,
552                    collaborative_reboot: _,
553                    crash_reporter: _,
554                } = self;
555                if version != *automated {
556                    info!("automated netstack version switched to {version:?}");
557                    *automated = version;
558                    self.persist();
559                    self.update_collaborative_reboot().await;
560                }
561                responder.send()
562            }
563            fnet_migration::ControlRequest::SetUserNetstackVersion { version, responder } => {
564                let version = Self::map_version_setting(version);
565                let Self {
566                    current_boot: _,
567                    persisted: Persisted { automated: _, user, rollback: _, rollback_history: _ },
568                    persistence: _,
569                    collaborative_reboot: _,
570                    crash_reporter: _,
571                } = self;
572                if version != *user {
573                    info!("user netstack version switched to {version:?}");
574                    *user = version;
575                    self.persist();
576                    self.update_collaborative_reboot().await;
577                }
578                responder.send()
579            }
580        }
581    }
582
583    fn handle_state_request(&self, req: fnet_migration::StateRequest) -> Result<(), fidl::Error> {
584        let Migration {
585            current_boot,
586            persisted: Persisted { user, automated, rollback: _, rollback_history: _ },
587            persistence: _,
588            collaborative_reboot: _,
589            crash_reporter: _,
590        } = self;
591        match req {
592            fnet_migration::StateRequest::GetNetstackVersion { responder } => {
593                responder.send(&fnet_migration::InEffectVersion {
594                    current_boot: current_boot.version().into(),
595                    user: (*user).map(Into::into),
596                    automated: (*automated).map(Into::into),
597                })
598            }
599        }
600    }
601
602    async fn handle_request(&mut self, req: ServiceRequest) -> Result<(), fidl::Error> {
603        match req {
604            ServiceRequest::Control(r) => self.handle_control_request(r).await,
605            ServiceRequest::State(r) => self.handle_state_request(r),
606        }
607    }
608}
609
/// Handles to the inspect properties that can change while the component runs.
struct InspectNodes {
    /// The automated setting: `0` if unset, otherwise `2` or `3`.
    automated_setting: fuchsia_inspect::UintProperty,
    /// The user setting: `0` if unset, otherwise `2` or `3`.
    user_setting: fuchsia_inspect::UintProperty,
    /// The `Debug` rendering of the persisted rollback state.
    rollback_state: fuchsia_inspect::StringProperty,
}
615
616impl InspectNodes {
617    fn new<P, CR, F>(inspector: &fuchsia_inspect::Inspector, m: &Migration<P, CR, F>) -> Self {
618        let root = inspector.root();
619        let Migration {
620            current_boot,
621            persisted: Persisted { automated, user, rollback, rollback_history },
622            ..
623        } = m;
624        let automated_setting = root.create_uint(
625            "automated_setting",
626            NetstackVersion::optional_inspect_uint_value(automated),
627        );
628        let user_setting =
629            root.create_uint("user_setting", NetstackVersion::optional_inspect_uint_value(user));
630
631        let rollback_state = root.create_string("rollback_state", format!("{rollback:?}"));
632
633        // Record immutable state once, instead of keeping track of a property node.
634        root.record_uint("current_boot", current_boot.version().inspect_uint_value());
635        let forced_netstack2 = match current_boot {
636            RollbackNetstackVersion::Netstack2 | RollbackNetstackVersion::Netstack3 => false,
637            RollbackNetstackVersion::ForceNetstack2
638            | RollbackNetstackVersion::ForceNetstack2Indefinitely => true,
639        };
640        root.record_bool("forced_netstack2", forced_netstack2);
641        root.record_uint("current_epoch", CURRENT_EPOCH.into());
642        root.record_string("rollback_history", format!("{rollback_history:?}"));
643
644        Self { automated_setting, user_setting, rollback_state }
645    }
646
647    fn update<P, CR, F>(&self, m: &Migration<P, CR, F>) {
648        let Migration {
649            persisted: Persisted { automated, user, rollback, rollback_history: _ },
650            ..
651        } = m;
652        let Self { automated_setting, user_setting, rollback_state } = self;
653        automated_setting.set(NetstackVersion::optional_inspect_uint_value(automated));
654        user_setting.set(NetstackVersion::optional_inspect_uint_value(user));
655        rollback_state.set(&format!("{rollback:?}"));
656    }
657}
658
/// Wraps communication with metrics (cobalt) server.
struct MetricsLogger {
    /// `None` if the logger couldn't be created, in which case metrics are
    /// silently dropped.
    logger: Option<fmetrics::MetricEventLoggerProxy>,
}
663
664impl MetricsLogger {
665    async fn new() -> Self {
666        let (logger, server_end) =
667            fidl::endpoints::create_proxy::<fmetrics::MetricEventLoggerMarker>();
668
669        let factory = match fuchsia_component::client::connect_to_protocol::<
670            fmetrics::MetricEventLoggerFactoryMarker,
671        >() {
672            Ok(f) => f,
673            Err(e) => {
674                warn!("can't connect to logger factory {e:?}");
675                return Self { logger: None };
676            }
677        };
678
679        match factory
680            .create_metric_event_logger(
681                &fmetrics::ProjectSpec {
682                    customer_id: Some(metrics_registry::CUSTOMER_ID),
683                    project_id: Some(metrics_registry::PROJECT_ID),
684                    ..Default::default()
685                },
686                server_end,
687            )
688            .await
689        {
690            Ok(Ok(())) => Self { logger: Some(logger) },
691            Ok(Err(e)) => {
692                warn!("can't create event logger {e:?}");
693                Self { logger: None }
694            }
695            Err(e) => {
696                warn!("error connecting to metric event logger {e:?}");
697                Self { logger: None }
698            }
699        }
700    }
701
702    /// Logs metrics from `migration` to the metrics server.
703    async fn log_metrics<P, CR, F>(&self, migration: &Migration<P, CR, F>) {
704        let logger = if let Some(logger) = self.logger.as_ref() {
705            logger
706        } else {
707            // Silently don't log metrics if we didn't manage to create a
708            // logger, warnings are emitted upon creation.
709            return;
710        };
711
712        let current_boot = match migration.current_boot {
713            RollbackNetstackVersion::Netstack2
714            | RollbackNetstackVersion::ForceNetstack2
715            | RollbackNetstackVersion::ForceNetstack2Indefinitely => {
716                metrics_registry::StackMigrationCurrentBootMetricDimensionNetstackVersion::Netstack2
717            }
718            RollbackNetstackVersion::Netstack3 => {
719                metrics_registry::StackMigrationCurrentBootMetricDimensionNetstackVersion::Netstack3
720            }
721        }
722        .as_event_code();
723        let user = match migration.persisted.user {
724            None => metrics_registry::StackMigrationUserSettingMetricDimensionNetstackVersion::NoSelection,
725            Some(NetstackVersion::Netstack2) => metrics_registry::StackMigrationUserSettingMetricDimensionNetstackVersion::Netstack2,
726            Some(NetstackVersion::Netstack3) => metrics_registry::StackMigrationUserSettingMetricDimensionNetstackVersion::Netstack3,
727        }
728        .as_event_code();
729        let automated = match migration.persisted.automated {
730            None => metrics_registry::StackMigrationAutomatedSettingMetricDimensionNetstackVersion::NoSelection,
731            Some(NetstackVersion::Netstack2) => metrics_registry::StackMigrationAutomatedSettingMetricDimensionNetstackVersion::Netstack2,
732            Some(NetstackVersion::Netstack3) => metrics_registry::StackMigrationAutomatedSettingMetricDimensionNetstackVersion::Netstack3,
733        }.as_event_code();
734        let rollback_state = compute_state_metric(migration).as_event_code();
735        for (metric_id, event_code) in [
736            (metrics_registry::STACK_MIGRATION_CURRENT_BOOT_METRIC_ID, current_boot),
737            (metrics_registry::STACK_MIGRATION_USER_SETTING_METRIC_ID, user),
738            (metrics_registry::STACK_MIGRATION_AUTOMATED_SETTING_METRIC_ID, automated),
739            (metrics_registry::STACK_MIGRATION_STATE_METRIC_ID, rollback_state),
740        ] {
741            let occurrence_count = 1;
742            logger
743                .log_occurrence(metric_id, occurrence_count, &[event_code][..])
744                .await
745                .map(|r| {
746                    r.unwrap_or_else(|e| warn!("error reported logging metric {metric_id} {e:?}"))
747                })
748                .unwrap_or_else(|fidl_error| {
749                    warn!("error logging metric {metric_id} {fidl_error:?}")
750                });
751        }
752    }
753}
754
755fn compute_state_metric<P, CR, F>(
756    migration: &Migration<P, CR, F>,
757) -> metrics_registry::StackMigrationStateMetricDimensionMigrationState {
758    use metrics_registry::StackMigrationStateMetricDimensionMigrationState as state_metric;
759    let Migration {
760        current_boot,
761        persisted: Persisted { automated, user: _, rollback, rollback_history: _ },
762        persistence: _,
763        collaborative_reboot: _,
764        crash_reporter: _,
765    } = migration;
766
767    match (current_boot, automated, rollback) {
768        (RollbackNetstackVersion::Netstack2, Some(NetstackVersion::Netstack2) | None, _) => {
769            state_metric::NotStarted
770        }
771        (RollbackNetstackVersion::Netstack2, Some(NetstackVersion::Netstack3), _) => {
772            state_metric::Scheduled
773        }
774        (RollbackNetstackVersion::ForceNetstack2Indefinitely, _, _) => state_metric::Abandoned,
775        (RollbackNetstackVersion::ForceNetstack2, Some(NetstackVersion::Netstack3), _) => {
776            state_metric::RolledBack
777        }
778        // NB: If a device observes the automated setting switch to Netstack2
779        // while it's current boot is `ForceNetstack2` it won't have any need
780        // to schedule a reboot, and it will perpetually stay in
781        // `ForceNetstack`. Rather that perpetually reporting
782        // `state_metric::RolledBack`, it's more useful from a diagnostics
783        // perspective to consider this device `state_metric::NotStarted`.
784        (RollbackNetstackVersion::ForceNetstack2, Some(NetstackVersion::Netstack2) | None, _) => {
785            state_metric::NotStarted
786        }
787        (RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack2) | None, _) => {
788            state_metric::Canceled
789        }
790        (RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack3), None) => {
791            state_metric::InProgress
792        }
793        (
794            RollbackNetstackVersion::Netstack3,
795            Some(NetstackVersion::Netstack3),
796            Some(rollback::Persisted::HealthcheckFailures(f)),
797        ) => {
798            if *f >= rollback::MAX_FAILED_HEALTHCHECKS {
799                state_metric::Failed
800            } else {
801                state_metric::InProgress
802            }
803        }
804        (
805            RollbackNetstackVersion::Netstack3,
806            Some(NetstackVersion::Netstack3),
807            Some(rollback::Persisted::Success),
808        ) => state_metric::Success,
809    }
810}
811
812#[fuchsia::main]
813pub async fn main() {
814    info!("running netstack migration service");
815
816    let mut fs = ServiceFs::new();
817    let _: &mut ServiceFsDir<'_, _> = fs
818        .dir("svc")
819        .add_fidl_service(|rs: fnet_migration::ControlRequestStream| {
820            rs.map(|req| req.map(ServiceRequest::Control)).left_stream()
821        })
822        .add_fidl_service(|rs: fnet_migration::StateRequestStream| {
823            rs.map(|req| req.map(ServiceRequest::State)).right_stream()
824        });
825    let _: &mut ServiceFs<_> =
826        fs.take_and_serve_directory_handle().expect("failed to take out directory handle");
827
828    let mut migration = Migration::new(DataPersistenceProvider {}, Scheduler {}, Reporter {});
829    main_inner(
830        &mut migration,
831        fs.fuse().flatten_unordered(None),
832        rollback::FidlHttpFetcher::new(),
833        rollback::new_healthcheck_stream(),
834    )
835    .await
836}
837
838async fn main_inner<
839    P: PersistenceProvider,
840    CR: CollaborativeRebootScheduler,
841    R: CrashReporter,
842    H: rollback::HttpFetcher + Send + 'static,
843    T: Stream<Item = ()> + Send + 'static,
844    SR: Stream<Item = Result<ServiceRequest, fidl::Error>>,
845>(
846    migration: &mut Migration<P, CR, R>,
847    service_request_stream: SR,
848    http_fetcher: H,
849    healthcheck_tick: T,
850) {
851    let inspector = fuchsia_inspect::component::inspector();
852    let _inspect_server =
853        inspect_runtime::publish(inspector, inspect_runtime::PublishOptions::default())
854            .expect("failed to serve inspector");
855    let inspect_nodes = InspectNodes::new(inspector, &migration);
856
857    let metrics_logger = MetricsLogger::new().await;
858
859    let (desired_version_sender, desired_version_receiver) = mpsc::unbounded();
860    let (rollback_state_sender, rollback_state_receiver) = mpsc::unbounded();
861
862    // NB: Check if we failed to migrate to NS3 in the previous boot.
863    //
864    // It's important to perform this check before the rollback state is reset
865    // below.
866    let current_boot_is_rollback = match migration.persisted.rollback {
867        Some(rollback::Persisted::Success) | None => false,
868        Some(rollback::Persisted::HealthcheckFailures(f)) => f >= rollback::MAX_FAILED_HEALTHCHECKS,
869    };
870
871    let rollback_state =
872        rollback::State::new(migration.persisted.rollback, migration.current_boot.version());
873
874    // Update rollback persistence immediately in case the device reboots before
875    // the rollback module has time to send an asynchronous update. This is
876    // required for correctness if Netstack3 is crashing on startup, or in the
877    // following case:
878    //
879    // 1. Device fails to migrate to Netstack3 and persists
880    //    HealthcheckFailures(MAX_FAILED_HEALTHCHECKS), which will force
881    //    Netstack2 on subsequent boots.
882    // 2. Device reboots into Netstack2, sees that it should be running
883    //    Netstack3, and schedules a reboot without clearing the failures.
884    // 3. Device reboots back into Netstack3 and sees that it should schedule
885    //    a reboot because the persisted failures are above the limit.
886    migration.update_rollback_state(rollback_state.persisted()).await;
887
888    Task::spawn(async move {
889        rollback::run(
890            rollback_state,
891            http_fetcher,
892            desired_version_receiver,
893            rollback_state_sender,
894            pin!(healthcheck_tick),
895        )
896        .await
897    })
898    .detach();
899
900    enum Action {
901        ServiceRequest(Result<ServiceRequest, fidl::Error>),
902        LogMetrics,
903        UpdateRollbackState(rollback::Persisted),
904        FileRollbackCrashReport,
905    }
906
907    let metrics_logging_interval = fuchsia_async::MonotonicDuration::from_hours(1);
908    let mut stream: futures::stream::SelectAll<Pin<Box<dyn Stream<Item = Action>>>> =
909        futures::stream::SelectAll::new();
910
911    // Always log metrics once on startup then periodically log new values so
912    // the aggregation window always contains one sample of the current
913    // settings.
914    stream.push(Box::pin(Box::new(
915        futures::stream::once(futures::future::ready(()))
916            .chain(fuchsia_async::Interval::new(metrics_logging_interval))
917            .map(|()| Action::LogMetrics),
918    )));
919    stream.push(Box::pin(Box::new(Box::pin(service_request_stream.map(Action::ServiceRequest)))));
920    stream.push(Box::pin(rollback_state_receiver.map(|state| Action::UpdateRollbackState(state))));
921
922    // If the current boot is a rollback, schedule a crash report to be filed
923    // soon. Don't file it right away, as during early boot there is not much
924    // useful diagnostics data available.
925    if current_boot_is_rollback {
926        stream.push(Box::pin(
927            futures::stream::once(fuchsia_async::Timer::new(R::ROLLBACK_CRASH_REPORT_DELAY))
928                .map(|()| Action::FileRollbackCrashReport),
929        ));
930    }
931
932    while let Some(action) = stream.next().await {
933        match action {
934            Action::ServiceRequest(req) => {
935                let result = match req {
936                    Ok(req) => migration.handle_request(req).await,
937                    Err(e) => Err(e),
938                };
939                // Always update inspector state after handling a request.
940                inspect_nodes.update(&migration);
941
942                // Send the desired netstack version to the rollback mechanism,
943                // but ignore the "forced" setting. The "forced" setting comes
944                // from the rollback mechanism, and sending that signal back
945                // into it would cause a the mechanism to incorrectly detect
946                // a cancelation.
947                match desired_version_sender.unbounded_send(
948                    migration.persisted.desired_netstack_version().version_ignoring_force(),
949                ) {
950                    Ok(()) => (),
951                    Err(e) => {
952                        error!("error sending update to rollback module: {:?}", e);
953                    }
954                }
955
956                match result {
957                    Ok(()) => (),
958                    Err(e) => {
959                        if !e.is_closed() {
960                            error!("error processing FIDL request {:?}", e)
961                        }
962                    }
963                }
964            }
965            Action::LogMetrics => {
966                metrics_logger.log_metrics(&migration).await;
967            }
968            Action::UpdateRollbackState(new_state) => {
969                migration.update_rollback_state(new_state).await;
970                // Always update inspector state when the rollback state
971                // changes.
972                inspect_nodes.update(&migration);
973            }
974            Action::FileRollbackCrashReport => {
975                migration.crash_reporter.file_report(CrashReportReason::RolledBack);
976            }
977        }
978    }
979}
980
981#[cfg(test)]
982mod tests {
983    use super::*;
984    use assert_matches::assert_matches;
985    use async_utils::event::{Event, EventWait};
986    use diagnostics_assertions::assert_data_tree;
987    use fidl::Peered as _;
988    use fidl_fuchsia_net_http as fnet_http;
989    use fuchsia_async::TimeoutExt;
990    use futures::FutureExt;
991    use std::cell::RefCell;
992    use std::rc::Rc;
993    use std::time::Duration;
994    use test_case::test_case;
995
996    #[derive(Default, Clone)]
997    struct InMemory {
998        file: Rc<RefCell<Option<Vec<u8>>>>,
999    }
1000
1001    impl InMemory {
1002        fn with_persisted(p: Persisted) -> Self {
1003            let mut s = Self::default();
1004            p.save(s.open_writer().unwrap());
1005            s
1006        }
1007    }
1008
1009    impl PersistenceProvider for InMemory {
1010        type Writer = Self;
1011        type Reader = std::io::Cursor<Vec<u8>>;
1012
1013        fn open_writer(&mut self) -> std::io::Result<Self::Writer> {
1014            *self.file.borrow_mut() = Some(Vec::new());
1015            Ok(self.clone())
1016        }
1017
1018        fn open_reader(&self) -> std::io::Result<Self::Reader> {
1019            self.file
1020                .borrow()
1021                .clone()
1022                .map(std::io::Cursor::new)
1023                .ok_or_else(|| std::io::ErrorKind::NotFound.into())
1024        }
1025    }
1026
1027    impl std::io::Write for InMemory {
1028        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1029            let r = self.file.borrow_mut().as_mut().expect("no file open").write(buf);
1030            r
1031        }
1032
1033        fn flush(&mut self) -> std::io::Result<()> {
1034            Ok(())
1035        }
1036    }
1037
1038    struct NoCollaborativeReboot;
1039
1040    impl CollaborativeRebootScheduler for NoCollaborativeReboot {
1041        async fn schedule(
1042            &mut self,
1043            _reason: fpower::CollaborativeRebootReason,
1044            _cancel: Option<zx::EventPair>,
1045        ) {
1046            panic!("unexpectedly attempted to schedule a collaborative reboot");
1047        }
1048    }
1049
1050    #[derive(Default)]
1051    struct FakeCollaborativeReboot {
1052        req: Option<zx::EventPair>,
1053    }
1054
1055    impl CollaborativeRebootScheduler for FakeCollaborativeReboot {
1056        async fn schedule(
1057            &mut self,
1058            reason: fpower::CollaborativeRebootReason,
1059            cancel: Option<zx::EventPair>,
1060        ) {
1061            assert_eq!(reason, fpower::CollaborativeRebootReason::NetstackMigration);
1062            let cancel = cancel.expect("cancellation signal must be provided");
1063            assert_eq!(self.req.replace(cancel), None, "attempted to schedule multiple reboots");
1064        }
1065    }
1066
1067    struct NoCrashReports;
1068
1069    impl CrashReporter for NoCrashReports {
1070        const ROLLBACK_CRASH_REPORT_DELAY: Duration = Duration::ZERO;
1071        fn file_report(&mut self, reason: CrashReportReason) {
1072            panic!("unexpectedly attemped to file a crash report: {reason:?}")
1073        }
1074    }
1075
1076    #[derive(Default)]
1077    struct FakeCrashReporter {
1078        reports: Vec<CrashReportReason>,
1079    }
1080
1081    impl CrashReporter for FakeCrashReporter {
1082        const ROLLBACK_CRASH_REPORT_DELAY: Duration = Duration::from_millis(1);
1083        fn file_report(&mut self, reason: CrashReportReason) {
1084            self.reports.push(reason)
1085        }
1086    }
1087
1088    /// Signals an `EventWait` once the `target` crash reports have been filed.
1089    struct AwaitCrashReports {
1090        target: Vec<CrashReportReason>,
1091        reports: Vec<CrashReportReason>,
1092        event: Event,
1093    }
1094
1095    impl AwaitCrashReports {
1096        fn new(target: Vec<CrashReportReason>) -> (Self, EventWait) {
1097            let event = Event::new();
1098            let wait = event.wait();
1099            (Self { target, reports: Vec::new(), event }, wait)
1100        }
1101    }
1102
1103    impl CrashReporter for AwaitCrashReports {
1104        const ROLLBACK_CRASH_REPORT_DELAY: Duration = Duration::from_millis(1);
1105        fn file_report(&mut self, reason: CrashReportReason) {
1106            self.reports.push(reason);
1107            if self.reports == self.target {
1108                assert!(self.event.signal());
1109            }
1110        }
1111    }
1112
1113    fn serve_migration<
1114        P: PersistenceProvider,
1115        CR: CollaborativeRebootScheduler,
1116        R: CrashReporter,
1117    >(
1118        migration: Migration<P, CR, R>,
1119    ) -> (
1120        impl futures::Future<Output = Migration<P, CR, R>>,
1121        fnet_migration::ControlProxy,
1122        fnet_migration::StateProxy,
1123    ) {
1124        let (control, control_server) =
1125            fidl::endpoints::create_proxy_and_stream::<fnet_migration::ControlMarker>();
1126        let (state, state_server) =
1127            fidl::endpoints::create_proxy_and_stream::<fnet_migration::StateMarker>();
1128
1129        let fut = {
1130            let control =
1131                control_server.map(|req| ServiceRequest::Control(req.expect("control error")));
1132            let state = state_server.map(|req| ServiceRequest::State(req.expect("state error")));
1133            futures::stream::select(control, state).fold(migration, |mut migration, req| async {
1134                migration.handle_request(req).await.expect("handling request");
1135                migration
1136            })
1137        };
1138        (fut, control, state)
1139    }
1140
1141    #[test_case(Persisted{
1142        user: Some(NetstackVersion::Netstack2),
1143        automated: None,
1144        rollback: None,
1145        rollback_history: None,
1146    }; "user_netstack2")]
1147    #[test_case(Persisted{
1148        user: Some(NetstackVersion::Netstack3),
1149        automated: None,
1150        rollback: None,
1151        rollback_history: None,
1152    }; "user_netstack3")]
1153    #[test_case(Persisted{
1154        user: None,
1155        automated: None,
1156        rollback: None,
1157        rollback_history: None,
1158    }; "none")]
1159    #[test_case(Persisted{
1160        user: None,
1161        automated: Some(NetstackVersion::Netstack2),
1162        rollback: None,
1163        rollback_history: None,
1164    }; "automated_netstack2")]
1165    #[test_case(Persisted{
1166        user: None,
1167        automated: Some(NetstackVersion::Netstack3),
1168        rollback: None,
1169        rollback_history: None,
1170    }; "automated_netstack3")]
1171    #[test_case(Persisted{
1172        user: None,
1173        automated: None,
1174        rollback: Some(rollback::Persisted::Success),
1175        rollback_history: None,
1176    }; "rollback_success")]
1177    #[test_case(Persisted{
1178        user: None,
1179        automated: None,
1180        rollback: Some(rollback::Persisted::HealthcheckFailures(0)),
1181        rollback_history: Some(RollbackHistory {epoch: CURRENT_EPOCH, count: 3}),
1182    }; "rollback_history")]
1183    #[test_case(Persisted{
1184        user: Some(NetstackVersion::Netstack2),
1185        automated: Some(NetstackVersion::Netstack3),
1186        rollback: Some(rollback::Persisted::HealthcheckFailures(5)),
1187        rollback_history: Some(RollbackHistory {epoch: CURRENT_EPOCH, count: 3}),
1188    }; "all")]
1189    #[fuchsia::test(add_test_attr = false)]
1190    fn persist_save_load(v: Persisted) {
1191        let mut m = InMemory::default();
1192        v.save(m.open_writer().unwrap());
1193        assert_eq!(Persisted::load(m.open_reader().unwrap()), v);
1194    }
1195
1196    #[fuchsia::test]
1197    fn uses_defaults_if_no_persistence() {
1198        let m = Migration::new(InMemory::default(), NoCollaborativeReboot, NoCrashReports);
1199        let Migration {
1200            current_boot,
1201            persisted: Persisted { user, automated, rollback: _, rollback_history: _ },
1202            persistence: _,
1203            collaborative_reboot: _,
1204            crash_reporter: _,
1205        } = m;
1206        assert_eq!(current_boot.version(), DEFAULT_NETSTACK);
1207        assert_eq!(user, None);
1208        assert_eq!(automated, None);
1209    }
1210
1211    #[test_case(
1212        None, Some(NetstackVersion::Netstack3), None, None, NetstackVersion::Netstack3;
1213        "automated_ns3")]
1214    #[test_case(
1215        None, Some(NetstackVersion::Netstack2), None, None, NetstackVersion::Netstack2;
1216        "automated_ns2")]
1217    #[test_case(
1218        Some(NetstackVersion::Netstack3),
1219        Some(NetstackVersion::Netstack2),
1220        Some(rollback::Persisted::HealthcheckFailures(rollback::MAX_FAILED_HEALTHCHECKS)),
1221        Some(RollbackHistory{epoch: CURRENT_EPOCH, count: MAX_ROLLBACKS_PER_EPOCH}),
1222        NetstackVersion::Netstack3;
1223        "user_ns3_override")]
1224    #[test_case(
1225        Some(NetstackVersion::Netstack2),
1226        Some(NetstackVersion::Netstack3),
1227        Some(rollback::Persisted::Success),
1228        None,
1229        NetstackVersion::Netstack2;
1230        "user_ns2_override")]
1231    #[test_case(
1232        Some(NetstackVersion::Netstack2),
1233        None,
1234        None,
1235        None,
1236        NetstackVersion::Netstack2; "user_ns2")]
1237    #[test_case(
1238        Some(NetstackVersion::Netstack3),
1239        None,
1240        None,
1241        None,
1242        NetstackVersion::Netstack3; "user_ns3")]
1243    #[test_case(
1244        None,
1245        Some(NetstackVersion::Netstack3),
1246        Some(rollback::Persisted::HealthcheckFailures(rollback::MAX_FAILED_HEALTHCHECKS)),
1247        None,
1248        NetstackVersion::Netstack2; "rollback_to_ns2")]
1249    #[test_case(
1250        None,
1251        Some(NetstackVersion::Netstack3),
1252        Some(rollback::Persisted::HealthcheckFailures(0)),
1253        Some(RollbackHistory{epoch: CURRENT_EPOCH, count: MAX_ROLLBACKS_PER_EPOCH}),
1254        NetstackVersion::Netstack2; "indefinitely_rolled_back_to_ns2")]
1255    #[test_case(
1256        None,
1257        Some(NetstackVersion::Netstack3),
1258        Some(rollback::Persisted::HealthcheckFailures(0)),
1259        Some(RollbackHistory{epoch: CURRENT_EPOCH - 1, count: MAX_ROLLBACKS_PER_EPOCH}),
1260        NetstackVersion::Netstack3; "ignore_rollback_history_from_old_epoch")]
1261    #[test_case(None, None, None, None, DEFAULT_NETSTACK; "default")]
1262    #[fuchsia::test]
1263    async fn get_netstack_version(
1264        p_user: Option<NetstackVersion>,
1265        p_automated: Option<NetstackVersion>,
1266        p_rollback: Option<rollback::Persisted>,
1267        p_history: Option<RollbackHistory>,
1268        expect: NetstackVersion,
1269    ) {
1270        let m = Migration::new(
1271            InMemory::with_persisted(Persisted {
1272                user: p_user,
1273                automated: p_automated,
1274                rollback: p_rollback,
1275                rollback_history: p_history,
1276            }),
1277            NoCollaborativeReboot,
1278            NoCrashReports,
1279        );
1280        let Migration {
1281            current_boot,
1282            persisted: Persisted { user, automated, rollback: _, rollback_history: _ },
1283            persistence: _,
1284            collaborative_reboot: _,
1285            crash_reporter: _,
1286        } = &m;
1287        assert_eq!(current_boot.version(), expect);
1288        assert_eq!(*user, p_user);
1289        assert_eq!(*automated, p_automated);
1290
1291        let (serve, _, state) = serve_migration(m);
1292        let fut = async move {
1293            let fnet_migration::InEffectVersion { current_boot, user, automated } =
1294                state.get_netstack_version().await.expect("get netstack version");
1295            let expect = expect.into();
1296            let p_user = p_user.map(Into::into);
1297            let p_automated = p_automated.map(Into::into);
1298            assert_eq!(current_boot, expect);
1299            assert_eq!(user, p_user);
1300            assert_eq!(automated, p_automated);
1301        };
1302        let (_, ()): (Migration<_, _, _>, _) = futures::future::join(serve, fut).await;
1303    }
1304
1305    #[derive(Debug, Copy, Clone)]
1306    enum SetMechanism {
1307        User,
1308        Automated,
1309    }
1310
1311    #[test_case(SetMechanism::User, NetstackVersion::Netstack2; "set_user_ns2")]
1312    #[test_case(SetMechanism::User, NetstackVersion::Netstack3; "set_user_ns3")]
1313    #[test_case(SetMechanism::Automated, NetstackVersion::Netstack2; "set_automated_ns2")]
1314    #[test_case(SetMechanism::Automated, NetstackVersion::Netstack3; "set_automated_ns3")]
1315    #[fuchsia::test]
1316    async fn set_netstack_version(mechanism: SetMechanism, set_version: NetstackVersion) {
1317        let m = Migration::new(
1318            InMemory::with_persisted(Default::default()),
1319            FakeCollaborativeReboot::default(),
1320            NoCrashReports,
1321        );
1322        let (serve, control, _) = serve_migration(m);
1323        let fut = async move {
1324            let setting = fnet_migration::VersionSetting { version: set_version.into() };
1325            match mechanism {
1326                SetMechanism::User => control
1327                    .set_user_netstack_version(Some(&setting))
1328                    .await
1329                    .expect("set user netstack version"),
1330                SetMechanism::Automated => control
1331                    .set_automated_netstack_version(Some(&setting))
1332                    .await
1333                    .expect("set automated netstack version"),
1334            }
1335        };
1336        let (migration, ()) = futures::future::join(serve, fut).await;
1337
1338        let validate_versions = |m: &Migration<_, _, _>, current| {
1339            let Migration {
1340                current_boot,
1341                persisted: Persisted { user, automated, rollback: _, rollback_history: _ },
1342                persistence: _,
1343                collaborative_reboot: _,
1344                crash_reporter: _,
1345            } = m;
1346            assert_eq!(current_boot.version(), current);
1347            match mechanism {
1348                SetMechanism::User => {
1349                    assert_eq!(*user, Some(set_version));
1350                    assert_eq!(*automated, None);
1351                }
1352                SetMechanism::Automated => {
1353                    assert_eq!(*user, None);
1354                    assert_eq!(*automated, Some(set_version));
1355                }
1356            }
1357        };
1358
1359        validate_versions(&migration, DEFAULT_NETSTACK);
1360        // There should only be a collaborative reboot request if the new
1361        // version differs from the default version.
1362        let cr_req = &migration.collaborative_reboot.scheduler.req;
1363        if set_version != DEFAULT_NETSTACK {
1364            assert_eq!(Ok(false), cr_req.as_ref().expect("there should be a request").is_closed());
1365        } else {
1366            assert_eq!(cr_req, &None);
1367        }
1368
1369        // Check that the setting was properly persisted.
1370        let migration = Migration::new(
1371            migration.persistence,
1372            migration.collaborative_reboot.scheduler,
1373            migration.crash_reporter,
1374        );
1375        validate_versions(&migration, set_version);
1376    }
1377
1378    #[fuchsia::test]
1379    async fn update_rollback_state() {
1380        let mut migration = Migration::new(
1381            InMemory::with_persisted(Persisted {
1382                automated: Some(NetstackVersion::Netstack3),
1383                user: None,
1384                rollback: None,
1385                rollback_history: None,
1386            }),
1387            FakeCollaborativeReboot::default(),
1388            FakeCrashReporter::default(),
1389        );
1390
1391        assert_eq!(migration.current_boot.version(), NetstackVersion::Netstack3);
1392        assert!(migration.collaborative_reboot.scheduler.req.is_none());
1393
1394        // The first update shouldn't schedule a reboot because we haven't
1395        // passed the healthcheck threshold yet.
1396        migration.update_rollback_state(rollback::Persisted::HealthcheckFailures(1)).await;
1397        assert_matches!(
1398            migration.persisted.rollback,
1399            Some(rollback::Persisted::HealthcheckFailures(1))
1400        );
1401        assert!(migration.collaborative_reboot.scheduler.req.is_none());
1402
1403        // This second update should schedule a reboot because we've passed
1404        // the healthcheck limit and want to roll back to Netstack2.
1405        migration
1406            .update_rollback_state(rollback::Persisted::HealthcheckFailures(
1407                rollback::MAX_FAILED_HEALTHCHECKS,
1408            ))
1409            .await;
1410        assert_matches!(
1411            migration.persisted.rollback,
1412            Some(rollback::Persisted::HealthcheckFailures(rollback::MAX_FAILED_HEALTHCHECKS))
1413        );
1414        assert_eq!(
1415            migration
1416                .collaborative_reboot
1417                .scheduler
1418                .req
1419                .as_ref()
1420                .expect("reboot was not scheduled")
1421                .is_closed()
1422                .unwrap(),
1423            false
1424        );
1425
1426        // It should have also filed a crash report
1427        assert_eq!(migration.crash_reporter.reports, vec![CrashReportReason::FailedHealthcheck]);
1428
1429        // This emulates seeing a healthcheck success before rebooting, in which
1430        // case we should see the reboot get canceled.
1431        migration.update_rollback_state(rollback::Persisted::Success).await;
1432        assert_matches!(migration.persisted.rollback, Some(rollback::Persisted::Success));
1433        assert!(
1434            migration.collaborative_reboot.scheduler.req.as_ref().unwrap().is_closed().unwrap()
1435        );
1436
1437        // Ensure that the changes were persisted successfully.
1438        let migration = Migration::new(
1439            migration.persistence,
1440            migration.collaborative_reboot.scheduler,
1441            migration.crash_reporter,
1442        );
1443        assert_matches!(migration.persisted.rollback, Some(rollback::Persisted::Success));
1444    }
1445
1446    #[test_case(SetMechanism::User, Some(NetstackVersion::Netstack2), false)]
1447    #[test_case(SetMechanism::User, Some(NetstackVersion::Netstack3), true)]
1448    #[test_case(SetMechanism::User, None, false)]
1449    #[test_case(SetMechanism::Automated, Some(NetstackVersion::Netstack2), false)]
1450    #[test_case(SetMechanism::Automated, Some(NetstackVersion::Netstack3), true)]
1451    #[test_case(SetMechanism::Automated, None, true)]
1452    #[fuchsia::test]
1453    async fn cancel_collaborative_reboot(
1454        mechanism: SetMechanism,
1455        version: Option<NetstackVersion>,
1456        expect_canceled: bool,
1457    ) {
1458        let migration = Migration::new(
1459            InMemory::with_persisted(Persisted {
1460                user: None,
1461                automated: None,
1462                rollback: None,
1463                rollback_history: None,
1464            }),
1465            FakeCollaborativeReboot::default(),
1466            NoCrashReports,
1467        );
1468
1469        // Start of by updating the automated setting to Netstack2; this ensures
1470        // there is a pending request to cancel.
1471        let (serve, control, _) = serve_migration(migration);
1472        let fut = async move {
1473            control
1474                .set_automated_netstack_version(Some(&fnet_migration::VersionSetting {
1475                    version: fnet_migration::NetstackVersion::Netstack2,
1476                }))
1477                .await
1478                .expect("set automated netstack version");
1479        };
1480        let (migration, ()) = futures::future::join(serve, fut).await;
1481        let cancel = migration
1482            .collaborative_reboot
1483            .scheduler
1484            .req
1485            .as_ref()
1486            .expect("there should be a request");
1487        assert_eq!(Ok(false), cancel.is_closed());
1488
1489        // Update the setting based on the test parameters
1490        let (serve, control, _) = serve_migration(migration);
1491        let fut = async move {
1492            let setting = version.map(|v| fnet_migration::VersionSetting { version: v.into() });
1493            match mechanism {
1494                SetMechanism::User => control
1495                    .set_user_netstack_version(setting.as_ref())
1496                    .await
1497                    .expect("set user netstack version"),
1498                SetMechanism::Automated => control
1499                    .set_automated_netstack_version(setting.as_ref())
1500                    .await
1501                    .expect("set automated netstack version"),
1502            }
1503        };
1504        let (migration, ()) = futures::future::join(serve, fut).await;
1505
1506        let cancel = migration
1507            .collaborative_reboot
1508            .scheduler
1509            .req
1510            .as_ref()
1511            .expect("there should be a request");
1512        assert_eq!(Ok(expect_canceled), cancel.is_closed());
1513    }
1514
1515    #[test_case(SetMechanism::User)]
1516    #[test_case(SetMechanism::Automated)]
1517    #[fuchsia::test]
1518    async fn clear_netstack_version(mechanism: SetMechanism) {
1519        const PREVIOUS_VERSION: NetstackVersion = NetstackVersion::Netstack2;
1520        let m = Migration::new(
1521            InMemory::with_persisted(Persisted {
1522                user: Some(PREVIOUS_VERSION),
1523                automated: Some(PREVIOUS_VERSION),
1524                rollback: None,
1525                rollback_history: None,
1526            }),
1527            NoCollaborativeReboot,
1528            NoCrashReports,
1529        );
1530        let (serve, control, _) = serve_migration(m);
1531        let fut = async move {
1532            match mechanism {
1533                SetMechanism::User => control
1534                    .set_user_netstack_version(None)
1535                    .await
1536                    .expect("set user netstack version"),
1537                SetMechanism::Automated => control
1538                    .set_automated_netstack_version(None)
1539                    .await
1540                    .expect("set automated netstack version"),
1541            }
1542        };
1543        let (migration, ()) = futures::future::join(serve, fut).await;
1544
1545        let validate_versions = |m: &Migration<_, _, _>| {
1546            let Migration {
1547                current_boot,
1548                persisted: Persisted { user, automated, rollback: _, rollback_history: _ },
1549                persistence: _,
1550                collaborative_reboot: _,
1551                crash_reporter: _,
1552            } = m;
1553            assert_eq!(current_boot.version(), PREVIOUS_VERSION);
1554            match mechanism {
1555                SetMechanism::User => {
1556                    assert_eq!(*user, None);
1557                    assert_eq!(*automated, Some(PREVIOUS_VERSION));
1558                }
1559                SetMechanism::Automated => {
1560                    assert_eq!(*user, Some(PREVIOUS_VERSION));
1561                    assert_eq!(*automated, None);
1562                }
1563            }
1564        };
1565
1566        validate_versions(&migration);
1567        // Check that the setting was properly persisted.
1568        let migration = Migration::new(
1569            migration.persistence,
1570            migration.collaborative_reboot.scheduler,
1571            migration.crash_reporter,
1572        );
1573        validate_versions(&migration);
1574    }
1575
1576    #[fuchsia::test]
1577    async fn inspect() {
1578        let mut m = Migration::new(
1579            InMemory::with_persisted(Persisted {
1580                user: Some(NetstackVersion::Netstack2),
1581                automated: Some(NetstackVersion::Netstack3),
1582                rollback: None,
1583                rollback_history: None,
1584            }),
1585            NoCollaborativeReboot,
1586            NoCrashReports,
1587        );
1588        let inspector = fuchsia_inspect::component::inspector();
1589        let nodes = InspectNodes::new(inspector, &m);
1590        assert_data_tree!(inspector,
1591            root: {
1592                current_boot: 2u64,
1593                user_setting: 2u64,
1594                automated_setting: 3u64,
1595                rollback_state: "None",
1596                forced_netstack2: false,
1597                current_epoch: CURRENT_EPOCH,
1598                rollback_history: format!(
1599                    "Some(RollbackHistory {{ epoch: {CURRENT_EPOCH}, count: 0 }})"
1600                ),
1601            }
1602        );
1603
1604        m.persisted = Persisted {
1605            user: None,
1606            automated: Some(NetstackVersion::Netstack2),
1607            rollback: None,
1608            rollback_history: None,
1609        };
1610        nodes.update(&m);
1611        assert_data_tree!(inspector,
1612            root: {
1613                current_boot: 2u64,
1614                user_setting: 0u64,
1615                automated_setting: 2u64,
1616                rollback_state: "None",
1617                forced_netstack2: false,
1618                current_epoch: CURRENT_EPOCH,
1619                rollback_history: format!(
1620                    "Some(RollbackHistory {{ epoch: {CURRENT_EPOCH}, count: 0 }})"
1621                ),
1622            }
1623        );
1624    }
1625
1626    #[fuchsia::test]
1627    async fn inspect_rollback() {
1628        let mut m = Migration::new(
1629            InMemory::with_persisted(Persisted {
1630                user: None,
1631                automated: Some(NetstackVersion::Netstack3),
1632                rollback: Some(rollback::Persisted::HealthcheckFailures(
1633                    rollback::MAX_FAILED_HEALTHCHECKS,
1634                )),
1635                rollback_history: None,
1636            }),
1637            NoCollaborativeReboot,
1638            NoCrashReports,
1639        );
1640        let inspector = fuchsia_inspect::component::inspector();
1641        let nodes = InspectNodes::new(inspector, &m);
1642        assert_data_tree!(inspector,
1643            root: {
1644                current_boot: 2u64,
1645                user_setting: 0u64,
1646                automated_setting: 3u64,
1647                rollback_state: "Some(HealthcheckFailures(5))",
1648                forced_netstack2: true,
1649                current_epoch: CURRENT_EPOCH,
1650                rollback_history: format!(
1651                    "Some(RollbackHistory {{ epoch: {CURRENT_EPOCH}, count: 1 }})"
1652                ),
1653            }
1654        );
1655
1656        m.persisted.rollback =
1657            Some(rollback::Persisted::HealthcheckFailures(rollback::MAX_FAILED_HEALTHCHECKS + 1));
1658        nodes.update(&m);
1659        assert_data_tree!(inspector,
1660            root: {
1661                current_boot: 2u64,
1662                user_setting: 0u64,
1663                automated_setting: 3u64,
1664                rollback_state: "Some(HealthcheckFailures(6))",
1665                forced_netstack2: true,
1666                current_epoch: CURRENT_EPOCH,
1667                rollback_history: format!(
1668                    "Some(RollbackHistory {{ epoch: {CURRENT_EPOCH}, count: 1 }})"
1669                ),
1670            }
1671        );
1672        m.persisted.rollback = Some(rollback::Persisted::Success);
1673        nodes.update(&m);
1674        assert_data_tree!(inspector,
1675            root: {
1676                current_boot: 2u64,
1677                user_setting: 0u64,
1678                automated_setting: 3u64,
1679                rollback_state: "Some(Success)",
1680                forced_netstack2: true,
1681                current_epoch: CURRENT_EPOCH,
1682                rollback_history: format!(
1683                    "Some(RollbackHistory {{ epoch: {CURRENT_EPOCH}, count: 1 }})"
1684                ),
1685            }
1686        );
1687    }
1688
1689    #[test_case(CURRENT_EPOCH, 2, true; "ns2_with_failures_in_current_epoch")]
1690    #[test_case(CURRENT_EPOCH - 1, 3, false; "ns3_with_failures_in_old_epoch")]
1691    #[fuchsia::test]
1692    async fn inspect_rollback_history(epoch: u32, current_boot: u64, forced_netstack2: bool) {
1693        let m = Migration::new(
1694            InMemory::with_persisted(Persisted {
1695                user: None,
1696                automated: Some(NetstackVersion::Netstack3),
1697                rollback: Some(rollback::Persisted::HealthcheckFailures(0)),
1698                rollback_history: Some(RollbackHistory {
1699                    epoch: epoch,
1700                    count: MAX_ROLLBACKS_PER_EPOCH,
1701                }),
1702            }),
1703            NoCollaborativeReboot,
1704            NoCrashReports,
1705        );
1706        let inspector = fuchsia_inspect::component::inspector();
1707        let _nodes = InspectNodes::new(inspector, &m);
1708        assert_data_tree!(inspector,
1709            root: {
1710                current_boot: current_boot,
1711                user_setting: 0u64,
1712                automated_setting: 3u64,
1713                rollback_state: "Some(HealthcheckFailures(0))",
1714                forced_netstack2: forced_netstack2,
1715                current_epoch: CURRENT_EPOCH,
1716                rollback_history: format!(
1717                    "Some(RollbackHistory {{ epoch: {}, count: {} }})",
1718                    CURRENT_EPOCH,
1719                    // Check if the history should have been reset.
1720                    if epoch == CURRENT_EPOCH { MAX_ROLLBACKS_PER_EPOCH } else {0},
1721                ),
1722            }
1723        );
1724    }
1725
1726    #[test_case::test_matrix(
1727    [
1728        (RollbackNetstackVersion::Netstack2, metrics_registry::StackMigrationCurrentBootMetricDimensionNetstackVersion::Netstack2),
1729        (RollbackNetstackVersion::Netstack3, metrics_registry::StackMigrationCurrentBootMetricDimensionNetstackVersion::Netstack3),
1730    ],
1731    [
1732        (None, metrics_registry::StackMigrationUserSettingMetricDimensionNetstackVersion::NoSelection),
1733        (Some(NetstackVersion::Netstack2), metrics_registry::StackMigrationUserSettingMetricDimensionNetstackVersion::Netstack2),
1734        (Some(NetstackVersion::Netstack3), metrics_registry::StackMigrationUserSettingMetricDimensionNetstackVersion::Netstack3),
1735    ],
1736    [
1737        (None, metrics_registry::StackMigrationAutomatedSettingMetricDimensionNetstackVersion::NoSelection),
1738        (Some(NetstackVersion::Netstack2), metrics_registry::StackMigrationAutomatedSettingMetricDimensionNetstackVersion::Netstack2),
1739        (Some(NetstackVersion::Netstack3), metrics_registry::StackMigrationAutomatedSettingMetricDimensionNetstackVersion::Netstack3),
1740    ]
1741    )]
1742    #[fuchsia::test]
1743    async fn metrics_logger(
1744        current_boot: (
1745            RollbackNetstackVersion,
1746            metrics_registry::StackMigrationCurrentBootMetricDimensionNetstackVersion,
1747        ),
1748        user: (
1749            Option<NetstackVersion>,
1750            metrics_registry::StackMigrationUserSettingMetricDimensionNetstackVersion,
1751        ),
1752        automated: (
1753            Option<NetstackVersion>,
1754            metrics_registry::StackMigrationAutomatedSettingMetricDimensionNetstackVersion,
1755        ),
1756    ) {
1757        let (current_boot, current_boot_expect) = current_boot;
1758        let (user, user_expect) = user;
1759        let (automated, automated_expect) = automated;
1760        let mut m = Migration::new(
1761            InMemory::with_persisted(Persisted {
1762                user,
1763                automated,
1764                rollback: None,
1765                rollback_history: None,
1766            }),
1767            NoCollaborativeReboot,
1768            NoCrashReports,
1769        );
1770        m.current_boot = current_boot;
1771        let (logger, mut logger_stream) =
1772            fidl::endpoints::create_proxy_and_stream::<fmetrics::MetricEventLoggerMarker>();
1773
1774        let metrics_logger = MetricsLogger { logger: Some(logger) };
1775
1776        let ((), ()) = futures::future::join(metrics_logger.log_metrics(&m), async {
1777            let expect = [
1778                (
1779                    metrics_registry::STACK_MIGRATION_CURRENT_BOOT_METRIC_ID,
1780                    Some(current_boot_expect.as_event_code()),
1781                ),
1782                (
1783                    metrics_registry::STACK_MIGRATION_USER_SETTING_METRIC_ID,
1784                    Some(user_expect.as_event_code()),
1785                ),
1786                (
1787                    metrics_registry::STACK_MIGRATION_AUTOMATED_SETTING_METRIC_ID,
1788                    Some(automated_expect.as_event_code()),
1789                ),
1790                (
1791                    metrics_registry::STACK_MIGRATION_STATE_METRIC_ID,
1792                    // Note: The rollback state doesn't have a flat expectation.
1793                    // Don't assert on its value here, and instead we directly
1794                    // test it in a separate test case.
1795                    None,
1796                ),
1797            ];
1798            for (id, ev) in expect {
1799                let (metric, occurences, codes, responder) = logger_stream
1800                    .next()
1801                    .await
1802                    .unwrap()
1803                    .unwrap()
1804                    .into_log_occurrence()
1805                    .expect("bad request");
1806                assert_eq!(metric, id);
1807                assert_eq!(occurences, 1);
1808                if let Some(ev) = ev {
1809                    assert_eq!(codes, vec![ev]);
1810                }
1811                responder.send(Ok(())).unwrap();
1812            }
1813        })
1814        .await;
1815    }
1816
1817    #[test_case(
1818        RollbackNetstackVersion::Netstack2, None, None =>
1819        metrics_registry::StackMigrationStateMetricDimensionMigrationState::NotStarted;
1820        "not_started_none"
1821    )]
1822    #[test_case(
1823        RollbackNetstackVersion::Netstack2, Some(NetstackVersion::Netstack2), None =>
1824        metrics_registry::StackMigrationStateMetricDimensionMigrationState::NotStarted;
1825        "not_started_ns2"
1826    )]
1827    #[test_case(
1828        RollbackNetstackVersion::Netstack2, Some(NetstackVersion::Netstack3), None =>
1829        metrics_registry::StackMigrationStateMetricDimensionMigrationState::Scheduled;
1830        "scheduled"
1831    )]
1832    #[test_case(
1833        RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack3), None =>
1834        metrics_registry::StackMigrationStateMetricDimensionMigrationState::InProgress;
1835        "in_progress_none"
1836    )]
1837    #[test_case(
1838        RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack3),
1839        Some(rollback::Persisted::HealthcheckFailures(rollback::MAX_FAILED_HEALTHCHECKS - 1)) =>
1840        metrics_registry::StackMigrationStateMetricDimensionMigrationState::InProgress;
1841        "in_progress_some"
1842    )]
1843    #[test_case(
1844        RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack3),
1845        Some(rollback::Persisted::HealthcheckFailures(rollback::MAX_FAILED_HEALTHCHECKS)) =>
1846        metrics_registry::StackMigrationStateMetricDimensionMigrationState::Failed;
1847        "failed_exact"
1848    )]
1849    #[test_case(
1850        RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack3),
1851        Some(rollback::Persisted::HealthcheckFailures(rollback::MAX_FAILED_HEALTHCHECKS + 1)) =>
1852        metrics_registry::StackMigrationStateMetricDimensionMigrationState::Failed;
1853        "failed_more"
1854    )]
1855    #[test_case(
1856        RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack3),
1857        Some(rollback::Persisted::Success) =>
1858        metrics_registry::StackMigrationStateMetricDimensionMigrationState::Success;
1859        "success"
1860    )]
1861    #[test_case(
1862        RollbackNetstackVersion::Netstack3, None, None =>
1863        metrics_registry::StackMigrationStateMetricDimensionMigrationState::Canceled;
1864        "canceled_none"
1865    )]
1866    #[test_case(
1867        RollbackNetstackVersion::Netstack3, Some(NetstackVersion::Netstack2), None =>
1868        metrics_registry::StackMigrationStateMetricDimensionMigrationState::Canceled;
1869        "canceled_ns2"
1870    )]
1871    #[test_case(
1872        RollbackNetstackVersion::ForceNetstack2, Some(NetstackVersion::Netstack3), None =>
1873        metrics_registry::StackMigrationStateMetricDimensionMigrationState::RolledBack;
1874        "rolled_back"
1875    )]
1876    #[test_case(
1877        RollbackNetstackVersion::ForceNetstack2, Some(NetstackVersion::Netstack2), None =>
1878        metrics_registry::StackMigrationStateMetricDimensionMigrationState::NotStarted;
1879        "rolled_back_becomes_not_started"
1880    )]
1881    #[test_case(
1882        RollbackNetstackVersion::ForceNetstack2Indefinitely, None, None =>
1883        metrics_registry::StackMigrationStateMetricDimensionMigrationState::Abandoned;
1884        "abandoned"
1885    )]
1886    #[fuchsia::test]
1887    fn test_state_metric(
1888        current_boot: RollbackNetstackVersion,
1889        automated: Option<NetstackVersion>,
1890        rollback: Option<rollback::Persisted>,
1891    ) -> metrics_registry::StackMigrationStateMetricDimensionMigrationState {
1892        let mut migration = Migration::new(
1893            InMemory::with_persisted(Persisted {
1894                user: None,
1895                automated,
1896                rollback,
1897                rollback_history: None,
1898            }),
1899            NoCollaborativeReboot,
1900            NoCrashReports,
1901        );
1902        migration.current_boot = current_boot;
1903        compute_state_metric(&migration)
1904    }
1905
1906    /// An in-memory mock-persistence that triggers an event once the target
1907    /// state has been persisted.
1908    #[derive(Clone)]
1909    struct AwaitPersisted {
1910        file: Rc<RefCell<Option<Vec<u8>>>>,
1911        target: Vec<u8>,
1912        event: Event,
1913    }
1914
1915    impl AwaitPersisted {
1916        fn with_persisted(start: Persisted, target: &Persisted) -> (Self, EventWait) {
1917            let event = Event::new();
1918            let wait = event.wait();
1919            let target_bytes = serde_json::to_vec(target).expect("failed to serialize target");
1920            let mut s = Self { file: Default::default(), target: target_bytes, event };
1921            start.save(s.open_writer().unwrap());
1922            (s, wait)
1923        }
1924    }
1925
1926    impl PersistenceProvider for AwaitPersisted {
1927        type Writer = Self;
1928        type Reader = std::io::Cursor<Vec<u8>>;
1929
1930        fn open_writer(&mut self) -> std::io::Result<Self::Writer> {
1931            *self.file.borrow_mut() = Some(Vec::new());
1932            Ok(self.clone())
1933        }
1934
1935        fn open_reader(&self) -> std::io::Result<Self::Reader> {
1936            self.file
1937                .borrow()
1938                .clone()
1939                .map(std::io::Cursor::new)
1940                .ok_or_else(|| std::io::ErrorKind::NotFound.into())
1941        }
1942    }
1943
1944    impl std::io::Write for AwaitPersisted {
1945        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1946            let r = self.file.borrow_mut().as_mut().expect("no file open").write(buf);
1947            if self.file.borrow().as_ref().expect("no_file_open") == &self.target {
1948                let _: bool = self.event.signal();
1949            }
1950            r
1951        }
1952
1953        fn flush(&mut self) -> std::io::Result<()> {
1954            Ok(())
1955        }
1956    }
1957
1958    #[fuchsia::test]
1959    async fn migrate_to_ns3_success() {
1960        let start = Persisted {
1961            user: None,
1962            automated: Some(NetstackVersion::Netstack3),
1963            rollback: None,
1964            rollback_history: None,
1965        };
1966        let target = Persisted {
1967            user: None,
1968            automated: Some(NetstackVersion::Netstack3),
1969            rollback: Some(rollback::Persisted::Success),
1970            rollback_history: Some(RollbackHistory { epoch: CURRENT_EPOCH, count: 0 }),
1971        };
1972
1973        let (persistence, mut wait) = AwaitPersisted::with_persisted(start, &target);
1974        let mut migration = Migration::new(persistence, NoCollaborativeReboot, NoCrashReports);
1975        // No service requests.
1976        let service_request_stream = futures::stream::pending();
1977        // A health check that always succeeds.
1978        let mock_healthcheck = rollback::testutil::MockHttpRequester(|| {
1979            Ok(fnet_http::Response { error: None, status_code: Some(204), ..Default::default() })
1980        });
1981        let healthcheck_tick = futures::stream::once(futures::future::ready(()));
1982
1983        {
1984            let main_fut = main_inner(
1985                &mut migration,
1986                service_request_stream,
1987                mock_healthcheck,
1988                healthcheck_tick,
1989            )
1990            .fuse();
1991            futures::pin_mut!(main_fut);
1992            futures::select!(
1993                () = main_fut => unreachable!("main fut should never exit"),
1994                () = wait => {}
1995            );
1996        }
1997
1998        assert_eq!(migration.persisted.rollback, Some(rollback::Persisted::Success));
1999    }
2000
2001    #[test_case[0; "first_failure"]]
2002    #[test_case[MAX_ROLLBACKS_PER_EPOCH-1; "last_failure"]]
2003    #[fuchsia::test]
2004    async fn migrate_to_ns3_fails(rollbacks_in_history: u8) {
2005        let start = Persisted {
2006            user: None,
2007            automated: Some(NetstackVersion::Netstack3),
2008            rollback: None,
2009            rollback_history: Some(RollbackHistory {
2010                epoch: CURRENT_EPOCH,
2011                count: rollbacks_in_history,
2012            }),
2013        };
2014        let target = Persisted {
2015            user: None,
2016            automated: Some(NetstackVersion::Netstack3),
2017            rollback: Some(rollback::Persisted::HealthcheckFailures(
2018                rollback::MAX_FAILED_HEALTHCHECKS,
2019            )),
2020            rollback_history: Some(RollbackHistory {
2021                epoch: CURRENT_EPOCH,
2022                count: rollbacks_in_history,
2023            }),
2024        };
2025
2026        let (persistence, mut wait) = AwaitPersisted::with_persisted(start, &target);
2027        let mut migration = Migration::new(
2028            persistence,
2029            FakeCollaborativeReboot::default(),
2030            FakeCrashReporter::default(),
2031        );
2032        // No service requests.
2033        let service_request_stream = futures::stream::pending();
2034        // A health check that always fails.
2035        let mock_healthcheck = rollback::testutil::MockHttpRequester(|| {
2036            Ok(fnet_http::Response { error: None, status_code: Some(500), ..Default::default() })
2037        });
2038        // Use a non-zero interval so that the Healthcheck code doesn't hog the scheduler.
2039        let healthcheck_tick = fuchsia_async::Interval::new(Duration::from_millis(1).into());
2040
2041        {
2042            let main_fut = main_inner(
2043                &mut migration,
2044                service_request_stream,
2045                mock_healthcheck,
2046                healthcheck_tick,
2047            )
2048            .fuse();
2049            futures::pin_mut!(main_fut);
2050            futures::select!(
2051                () = main_fut => unreachable!("main fut should never exit"),
2052                () = wait => {}
2053            );
2054        }
2055
2056        let failure_count = assert_matches!(
2057            migration.persisted.rollback,
2058            Some(rollback::Persisted::HealthcheckFailures(f)) => f
2059        );
2060        assert!(
2061            failure_count >= rollback::MAX_FAILED_HEALTHCHECKS,
2062            "failure_count={failure_count}"
2063        );
2064
2065        // Verify a failed migration schedules a collaborative reboot.
2066        let cr_req = &migration.collaborative_reboot.scheduler.req;
2067        assert_eq!(Ok(false), cr_req.as_ref().expect("there should be a request").is_closed());
2068
2069        // Verify crash reports were filed for each failed healthcheck above
2070        // the threshold.
2071        let expected_num_crash_reports =
2072            failure_count - HEALTHCHECK_FAILURE_THRESHOLD_FOR_CRASH_REPORTS + 1;
2073        assert_eq!(
2074            migration.crash_reporter.reports,
2075            vec![CrashReportReason::FailedHealthcheck; expected_num_crash_reports]
2076        )
2077    }
2078
2079    // Regression test for https://fxbug.dev/395913604.
2080    //
2081    // The original bug would reset the number of failed healthchecks in
2082    // persistence from `rollback::MAX_FAILED_HEALTHCHECKS` to 0, if an inbound
2083    // service request was received.
2084    //
2085    // Verify this is no longer the case by triggering a failed healthcheck,
2086    // pushing the total to `rollback::MAX_FAILED_HEALTHCHECKS`, then sending
2087    // a `fuchsia.net.migration/State.GetNetstackVersion` request.
2088    #[fuchsia::test]
2089    async fn migrate_to_ns3_rollback_regression_test() {
2090        let start = Persisted {
2091            user: None,
2092            automated: Some(NetstackVersion::Netstack3),
2093            rollback: Some(rollback::Persisted::HealthcheckFailures(
2094                rollback::MAX_FAILED_HEALTHCHECKS - 1,
2095            )),
2096            rollback_history: None,
2097        };
2098        let target = Persisted {
2099            user: None,
2100            automated: Some(NetstackVersion::Netstack3),
2101            rollback: Some(rollback::Persisted::HealthcheckFailures(0)),
2102            rollback_history: Some(RollbackHistory { epoch: CURRENT_EPOCH, count: 0 }),
2103        };
2104
2105        let (persistence, wait) = AwaitPersisted::with_persisted(start, &target);
2106        let mut migration = Migration::new(
2107            persistence,
2108            FakeCollaborativeReboot::default(),
2109            FakeCrashReporter::default(),
2110        );
2111        // A health check that always fails.
2112        let mock_healthcheck = rollback::testutil::MockHttpRequester(|| {
2113            Ok(fnet_http::Response { error: None, status_code: Some(500), ..Default::default() })
2114        });
2115        let healthcheck_tick = futures::stream::once(futures::future::ready(()));
2116        // Send "get" requests, to trigger the bug.
2117        let (client, server) =
2118            fidl::endpoints::create_proxy_and_stream::<fnet_migration::StateMarker>();
2119        let service_request_stream = server.map(|r| r.map(ServiceRequest::State));
2120        let client_fut = async move {
2121            // Send multiple get requests, to ensure that at least one would occur after the failed
2122            // healthcheck.
2123            let mut stream = fuchsia_async::Interval::new(Duration::from_millis(1).into());
2124            while let Some(()) = stream.next().await {
2125                let _ =
2126                    client.get_netstack_version().await.expect("failed to get netstack version");
2127            }
2128        }
2129        .fuse();
2130
2131        // If wait were to fire, the bug has occurred. Instead expect a timeout.
2132        // Use 1 second to keep the test runtime short; If CQ has a hiccup and
2133        // pauses execution, we'd see a false negative, which isn't a big deal.
2134        let wait_fut = wait
2135            .map(|()| panic!("unexpectedly observed the persisted healthcheck failures reset to 0"))
2136            .on_timeout(Duration::from_secs(1), || ())
2137            .fuse();
2138
2139        {
2140            let main_fut = main_inner(
2141                &mut migration,
2142                service_request_stream,
2143                mock_healthcheck,
2144                healthcheck_tick,
2145            )
2146            .fuse();
2147            futures::pin_mut!(main_fut);
2148            futures::pin_mut!(client_fut);
2149            futures::pin_mut!(wait_fut);
2150            futures::select!(
2151                () = main_fut => unreachable!("main fut should never exit"),
2152                () = client_fut => unreachable!("client fut should never exit"),
2153                () = wait_fut => {}
2154            );
2155        }
2156    }
2157
2158    #[fuchsia::test]
2159    async fn startup_after_first_rollback() {
2160        let start = Persisted {
2161            user: None,
2162            automated: Some(NetstackVersion::Netstack3),
2163            rollback: Some(rollback::Persisted::HealthcheckFailures(
2164                rollback::MAX_FAILED_HEALTHCHECKS,
2165            )),
2166            rollback_history: None,
2167        };
2168        let target = Persisted {
2169            user: None,
2170            automated: Some(NetstackVersion::Netstack3),
2171            rollback: Some(rollback::Persisted::HealthcheckFailures(0)),
2172            rollback_history: Some(RollbackHistory { epoch: CURRENT_EPOCH, count: 1 }),
2173        };
2174
2175        let (persistence, persistence_wait) = AwaitPersisted::with_persisted(start, &target);
2176        let (crash_reporter, crash_reporter_wait) =
2177            AwaitCrashReports::new(vec![CrashReportReason::RolledBack]);
2178        let mut migration =
2179            Migration::new(persistence, FakeCollaborativeReboot::default(), crash_reporter);
2180        // No service requests.
2181        let service_request_stream = futures::stream::pending();
2182        // No Healthchecks.
2183        let healthcheck_tick = futures::stream::pending();
2184        let mock_healthcheck = rollback::testutil::MockHttpRequester(|| {
2185            panic!("This test isn't exercising healthchecks");
2186        });
2187        {
2188            let main_fut = main_inner(
2189                &mut migration,
2190                service_request_stream,
2191                mock_healthcheck,
2192                healthcheck_tick,
2193            )
2194            .fuse();
2195            futures::pin_mut!(main_fut);
2196            let mut wait = futures::future::join(persistence_wait, crash_reporter_wait);
2197            futures::select!(
2198                () = main_fut => unreachable!("main fut should never exit"),
2199                ((), ()) = wait => {}
2200            );
2201        }
2202
2203        // The number of Healthcheck Failures should have been reset in
2204        // preparation for the next migration attempt.
2205        assert_eq!(migration.persisted.rollback, Some(rollback::Persisted::HealthcheckFailures(0)));
2206        // The rollback history should have been updated to reflect this
2207        // failure.
2208        assert_eq!(
2209            migration.persisted.rollback_history,
2210            Some(RollbackHistory { epoch: CURRENT_EPOCH, count: 1 })
2211        );
2212        // A Collaborative Reboot request should be scheduled, so that we can
2213        // attempt the migration again.
2214        let cr_req = &migration.collaborative_reboot.scheduler.req;
2215        assert_eq!(Ok(false), cr_req.as_ref().expect("there should be a request").is_closed());
2216        // The current_boot should reflect that we've rolled back to NS2
2217        // temporarily.
2218        assert_eq!(migration.current_boot, RollbackNetstackVersion::ForceNetstack2);
2219        // A rollback crash report should have been filed.
2220        assert_eq!(migration.crash_reporter.reports, vec![CrashReportReason::RolledBack],);
2221    }
2222
2223    #[fuchsia::test]
2224    async fn startup_after_final_rollback() {
2225        let start = Persisted {
2226            user: None,
2227            automated: Some(NetstackVersion::Netstack3),
2228            rollback: Some(rollback::Persisted::HealthcheckFailures(
2229                rollback::MAX_FAILED_HEALTHCHECKS,
2230            )),
2231            rollback_history: Some(RollbackHistory {
2232                epoch: CURRENT_EPOCH,
2233                count: MAX_ROLLBACKS_PER_EPOCH - 1,
2234            }),
2235        };
2236        let target = Persisted {
2237            user: None,
2238            automated: Some(NetstackVersion::Netstack3),
2239            rollback: Some(rollback::Persisted::HealthcheckFailures(0)),
2240            rollback_history: Some(RollbackHistory {
2241                epoch: CURRENT_EPOCH,
2242                count: MAX_ROLLBACKS_PER_EPOCH,
2243            }),
2244        };
2245
2246        let (persistence, persistence_wait) = AwaitPersisted::with_persisted(start, &target);
2247        let (crash_reporter, crash_reporter_wait) =
2248            AwaitCrashReports::new(vec![CrashReportReason::RolledBack]);
2249        let mut migration =
2250            Migration::new(persistence, FakeCollaborativeReboot::default(), crash_reporter);
2251        // No service requests.
2252        let service_request_stream = futures::stream::pending();
2253        // No Healthchecks.
2254        let healthcheck_tick = futures::stream::pending();
2255        let mock_healthcheck = rollback::testutil::MockHttpRequester(|| {
2256            panic!("This test isn't exercising healthchecks");
2257        });
2258        {
2259            let main_fut = main_inner(
2260                &mut migration,
2261                service_request_stream,
2262                mock_healthcheck,
2263                healthcheck_tick,
2264            )
2265            .fuse();
2266            futures::pin_mut!(main_fut);
2267            let mut wait = futures::future::join(persistence_wait, crash_reporter_wait);
2268            futures::select!(
2269                () = main_fut => unreachable!("main fut should never exit"),
2270                ((), ()) = wait => {}
2271            );
2272        }
2273
2274        // The number of Healthcheck Failures should have been reset in
2275        // preparation for the next migration attempt (in a later epoch).
2276        assert_eq!(migration.persisted.rollback, Some(rollback::Persisted::HealthcheckFailures(0)));
2277        // The rollback history should have been updated to reflect this
2278        // failure.
2279        assert_eq!(
2280            migration.persisted.rollback_history,
2281            Some(RollbackHistory { epoch: CURRENT_EPOCH, count: MAX_ROLLBACKS_PER_EPOCH })
2282        );
2283        // A collaborative Reboot request should NOT be scheduled. We're not
2284        // going to re-attempt the migration until the epoch advances.
2285        assert_eq!(migration.collaborative_reboot.scheduler.req, None);
2286        // The current_boot should reflect that we've rolled back to NS2
2287        // indefinitely.
2288        assert_eq!(migration.current_boot, RollbackNetstackVersion::ForceNetstack2Indefinitely);
2289        // A rollback crash report should have been filed.
2290        assert_eq!(migration.crash_reporter.reports, vec![CrashReportReason::RolledBack]);
2291    }
2292
2293    // This test simulates the device starting up while we were already
2294    // in the `ForceNetstack2Indefinitely` state. E.g. the current boot isn't
2295    // a rollback, rather it's continuing to use Netstack2, as established in
2296    // a previous boot.
2297    #[fuchsia::test]
2298    async fn startup_already_with_max_rollbacks_per_epoch() {
2299        let persistence = InMemory::with_persisted(Persisted {
2300            user: None,
2301            automated: Some(NetstackVersion::Netstack3),
2302            rollback: Some(rollback::Persisted::HealthcheckFailures(0)),
2303            rollback_history: Some(RollbackHistory {
2304                epoch: CURRENT_EPOCH,
2305                count: MAX_ROLLBACKS_PER_EPOCH,
2306            }),
2307        });
2308        // No crash report should get filed because this boot isn't a rollback.
2309        let crash_reporter = NoCrashReports;
2310        let mut migration =
2311            Migration::new(persistence, FakeCollaborativeReboot::default(), crash_reporter);
2312        // No service requests.
2313        let service_request_stream = futures::stream::pending();
2314        // No Healthchecks.
2315        let healthcheck_tick = futures::stream::pending();
2316        let mock_healthcheck = rollback::testutil::MockHttpRequester(|| {
2317            panic!("This test isn't exercising healthchecks");
2318        });
2319
2320        // Drive the main future for a fixed timeout. It will never exit, but
2321        // we want to give the migration machinery enough time to apply any
2322        // changes it wants to make (it shouldn't make any).
2323        main_inner(&mut migration, service_request_stream, mock_healthcheck, healthcheck_tick)
2324            .map(Err)
2325            .on_timeout(Duration::from_secs(1), || Ok(()))
2326            .await
2327            .expect("main fut should never exit");
2328
2329        // None of the persisted state should have changed.
2330        assert_eq!(migration.persisted.rollback, Some(rollback::Persisted::HealthcheckFailures(0)));
2331        assert_eq!(
2332            migration.persisted.rollback_history,
2333            Some(RollbackHistory { epoch: CURRENT_EPOCH, count: MAX_ROLLBACKS_PER_EPOCH })
2334        );
2335        // A collaborative Reboot request should NOT be scheduled.
2336        assert_eq!(migration.collaborative_reboot.scheduler.req, None);
2337        // The current_boot should reflect that we've rolled back to NS2
2338        // indefinitely.
2339        assert_eq!(migration.current_boot, RollbackNetstackVersion::ForceNetstack2Indefinitely);
2340    }
2341
2342    #[fuchsia::test]
2343    async fn startup_with_new_epoch() {
2344        let start = Persisted {
2345            user: None,
2346            automated: Some(NetstackVersion::Netstack3),
2347            rollback: Some(rollback::Persisted::HealthcheckFailures(0)),
2348            rollback_history: Some(RollbackHistory {
2349                epoch: CURRENT_EPOCH - 1,
2350                count: MAX_ROLLBACKS_PER_EPOCH,
2351            }),
2352        };
2353        let target = Persisted {
2354            user: None,
2355            automated: Some(NetstackVersion::Netstack3),
2356            rollback: Some(rollback::Persisted::HealthcheckFailures(1)),
2357            rollback_history: Some(RollbackHistory { epoch: CURRENT_EPOCH, count: 0 }),
2358        };
2359
2360        let (persistence, mut wait) = AwaitPersisted::with_persisted(start, &target);
2361        let mut migration =
2362            Migration::new(persistence, FakeCollaborativeReboot::default(), NoCrashReports);
2363        // No service requests.
2364        let service_request_stream = futures::stream::pending();
2365        // No Healthchecks.
2366        let healthcheck_tick = futures::stream::pending();
2367        let mock_healthcheck = rollback::testutil::MockHttpRequester(|| {
2368            panic!("This test isn't exercising healthchecks");
2369        });
2370        {
2371            let main_fut = main_inner(
2372                &mut migration,
2373                service_request_stream,
2374                mock_healthcheck,
2375                healthcheck_tick,
2376            )
2377            .fuse();
2378            futures::pin_mut!(main_fut);
2379            futures::select!(
2380                () = main_fut => unreachable!("main fut should never exit"),
2381                () = wait => {}
2382            );
2383        }
2384
2385        // The number of Healthcheck Failures should have been incremented,
2386        // indicating the rollback mechanism is trying to migrate to NS3.
2387        assert_eq!(migration.persisted.rollback, Some(rollback::Persisted::HealthcheckFailures(1)));
2388        // The rollback history should have been updated to reflect the new
2389        // epoch.
2390        assert_eq!(
2391            migration.persisted.rollback_history,
2392            Some(RollbackHistory { epoch: CURRENT_EPOCH, count: 0 })
2393        );
2394        // A collaborative Reboot request should NOT be scheduled. We're already
2395        // running Nestack3!
2396        assert_eq!(migration.collaborative_reboot.scheduler.req, None);
2397        // The current_boot should reflect that we're retrying migrations to NS3.
2398        assert_eq!(migration.current_boot, RollbackNetstackVersion::Netstack3);
2399    }
2400
2401    #[fuchsia::test]
2402    async fn noop_rollback_state_update() {
2403        let rollback_state = rollback::Persisted::HealthcheckFailures(
2404            HEALTHCHECK_FAILURE_THRESHOLD_FOR_CRASH_REPORTS,
2405        );
2406        let mut m = Migration::new(
2407            InMemory::with_persisted(Persisted {
2408                user: None,
2409                automated: Some(NetstackVersion::Netstack3),
2410                rollback: Some(rollback_state),
2411                rollback_history: None,
2412            }),
2413            // Expect not to see a Collaborative Reboot or a Crash Report.
2414            NoCollaborativeReboot,
2415            NoCrashReports,
2416        );
2417        m.update_rollback_state(rollback_state).await;
2418    }
2419}