omaha_client/
state_machine.rs

1// Copyright 2019 The Fuchsia Authors
2//
3// Licensed under a BSD-style license <LICENSE-BSD>, Apache License, Version 2.0
4// <LICENSE-APACHE or https://www.apache.org/licenses/LICENSE-2.0>, or the MIT
5// license <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your option.
6// This file may not be copied, modified, or distributed except according to
7// those terms.
8
9use crate::{
10    app_set::{AppSet, AppSetExt as _},
11    async_generator,
12    common::{App, CheckOptions, CheckTiming},
13    configuration::Config,
14    cup_ecdsa::{CupDecorationError, CupVerificationError, Cupv2Handler, RequestMetadata},
15    http_request::{self, HttpRequest},
16    installer::{AppInstallResult, Installer, Plan},
17    metrics::{ClockType, Metrics, MetricsReporter, UpdateCheckFailureReason},
18    policy::{CheckDecision, PolicyEngine, UpdateDecision},
19    protocol::{
20        self,
21        request::{Event, EventErrorCode, EventResult, EventType, InstallSource, GUID},
22        response::{parse_json_response, OmahaStatus, Response, UpdateCheck},
23    },
24    request_builder::{self, RequestBuilder, RequestParams},
25    storage::{Storage, StorageExt},
26    time::{ComplexTime, PartialComplexTime, TimeSource, Timer},
27};
28
29use anyhow::anyhow;
30use futures::{
31    channel::{mpsc, oneshot},
32    future::{self, BoxFuture, Fuse},
33    lock::Mutex,
34    prelude::*,
35    select,
36};
37use http::{response::Parts, Response as HttpResponse};
38use p256::ecdsa::DerSignature;
39use std::{
40    cmp::min,
41    collections::HashMap,
42    convert::TryInto,
43    rc::Rc,
44    str::Utf8Error,
45    time::{Duration, Instant, SystemTime},
46};
47use thiserror::Error;
48use tracing::{error, info, warn};
49
50pub mod update_check;
51
52mod builder;
53pub use builder::StateMachineBuilder;
54
55mod observer;
56use observer::StateMachineProgressObserver;
57pub use observer::{InstallProgress, StateMachineEvent};
58
// NOTE(review): not referenced in the visible part of this file; presumably a storage key like
// the ones below -- confirm against its users.
const INSTALL_PLAN_ID: &str = "install_plan_id";
// NOTE(review): not referenced in the visible part of this file; presumably a storage key --
// confirm against its users.
const UPDATE_FIRST_SEEN_TIME: &str = "update_first_seen_time";
// Storage key for the time read at state machine start-up to report how long the device waited
// for reboot; removed from storage once that metric has been reported.
const UPDATE_FINISH_TIME: &str = "update_finish_time";
// Storage key for the version string that is compared with `config.os.version` at start-up;
// removed from storage together with `UPDATE_FINISH_TIME`.
const TARGET_VERSION: &str = "target_version";
// Storage key for the integer counter of consecutive failed install attempts.
const CONSECUTIVE_FAILED_INSTALL_ATTEMPTS: &str = "consecutive_failed_install_attempts";
// How long to wait, after the policy engine has refused a reboot, before asking it again.
const CHECK_REBOOT_ALLOWED_INTERVAL: Duration = Duration::from_secs(30 * 60);
// This header contains the number of seconds during which the client must not contact the
// server again.
const X_RETRY_AFTER: &str = "X-Retry-After";
// How many requests we will make to Omaha before giving up.
const MAX_OMAHA_REQUEST_ATTEMPTS: u64 = 3;
70
/// This is the core state machine for a client's update check.  It is instantiated and used to
/// perform update checks over time or to perform a single update check process.
///
/// The type parameters select the pluggable collaborators: policy engine (`PE`), HTTP client
/// (`HR`), installer (`IN`), timer (`TM`), metrics reporter (`MR`), storage (`ST`), app set (`AS`)
/// and CUPv2 handler (`CH`).
#[derive(Debug)]
pub struct StateMachine<PE, HR, IN, TM, MR, ST, AS, CH>
where
    PE: PolicyEngine,
    HR: HttpRequest,
    IN: Installer,
    TM: Timer,
    MR: MetricsReporter,
    ST: Storage,
    AS: AppSet,
{
    /// The immutable configuration of the client itself.
    config: Config,

    /// Consulted for the next update check time, for whether an update check is allowed, and for
    /// whether a reboot is allowed.
    policy_engine: PE,

    /// HTTP client implementation.
    // NOTE(review): its use is outside the visible part of this file; presumably it carries the
    // Omaha requests -- confirm.
    http: HR,

    /// Installer; asked to perform the reboot once an update has finished.
    installer: IN,

    /// Source of the "wait for a duration" / "wait until a time" futures used for scheduling.
    timer: TM,

    /// Source of the current wall-clock and monotonic time.
    time_source: PE::TimeSource,

    /// Destination for reported metrics.
    metrics_reporter: MR,

    /// Shared handle to the persistent storage.
    storage_ref: Rc<Mutex<ST>>,

    /// Context for update check.
    context: update_check::Context,

    /// The list of apps used for update check.
    /// When locking both storage and app_set, make sure to always lock storage first.
    app_set: Rc<Mutex<AS>>,

    /// Optional CUPv2 handler.
    // NOTE(review): its use is outside the visible part of this file.
    cup_handler: Option<CH>,
}
110
/// The externally observable states of the state machine, published to observers through
/// `yield_state`.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum State {
    /// Yielded by `run` at the end of each loop iteration that performed an update check.
    Idle,
    /// Yielded at the start of an update check; carries the install source of the request.
    CheckingForUpdates(InstallSource),
    // NOTE(review): the variants below that have no doc comment are not yielded in the visible
    // part of this file; their meaning is inferred from their names only.
    ErrorCheckingForUpdate,
    NoUpdateAvailable,
    InstallationDeferredByPolicy,
    InstallingUpdate,
    /// Yielded by `run` when the update check reports that a reboot is needed, right before
    /// waiting for the reboot to be allowed.
    WaitingForReboot,
    InstallationError,
}
122
/// This is the set of errors that can occur when making a request to Omaha.  This is an internal
/// collection of error types.
#[derive(Error, Debug)]
pub enum OmahaRequestError {
    /// JSON (de)serialization failed while building the request.
    #[error("Unexpected JSON error constructing update check")]
    Json(#[from] serde_json::Error),

    /// The HTTP request object could not be built.
    #[error("Error building update check HTTP request")]
    HttpBuilder(#[from] http::Error),

    /// Adding the CUPv2 parameters to the outgoing request failed.
    #[error("Error decorating outgoing request with CUPv2 parameters")]
    CupDecoration(#[from] CupDecorationError),

    /// The incoming response failed CUPv2 verification.
    #[error("Error validating incoming response with CUPv2 protocol")]
    CupValidation(#[from] CupVerificationError),

    // TODO: This still contains hyper user error which should be split out.
    /// The HTTP transport reported an error.
    #[error("HTTP transport error performing update check")]
    HttpTransport(#[from] http_request::Error),

    /// An HTTP status code treated as an error; the code is included in the message.
    // This must be the same type as `http::StatusCode`, because the `From<http::StatusCode>`
    // impl in this file constructs this variant directly.
    #[error("HTTP error performing update check: {0}")]
    HttpStatus(hyper::StatusCode),
}
146
147impl From<request_builder::Error> for OmahaRequestError {
148    fn from(err: request_builder::Error) -> Self {
149        match err {
150            request_builder::Error::Json(e) => OmahaRequestError::Json(e),
151            request_builder::Error::Http(e) => OmahaRequestError::HttpBuilder(e),
152            request_builder::Error::Cup(e) => OmahaRequestError::CupDecoration(e),
153        }
154    }
155}
156
157impl From<http::StatusCode> for OmahaRequestError {
158    fn from(sc: http::StatusCode) -> Self {
159        OmahaRequestError::HttpStatus(sc)
160    }
161}
162
/// This is the set of errors that can occur when parsing the response body from Omaha.  This is an
/// internal collection of error types.
#[derive(Error, Debug)]
pub enum ResponseParseError {
    /// The response bytes could not be interpreted as UTF-8 text.
    #[error("Response was not valid UTF-8")]
    Utf8(#[from] Utf8Error),

    /// The response text could not be parsed as the expected JSON.
    #[error("Unexpected JSON error parsing update check response")]
    Json(#[from] serde_json::Error),
}
173
/// The ways in which a whole update check can fail.  `start_update_check` inspects the variant
/// to pick the failure reason that is reported as a metric.
#[derive(Error, Debug)]
pub enum UpdateCheckError {
    /// Making the request to Omaha failed.
    #[error("Error checking with Omaha")]
    OmahaRequest(#[from] OmahaRequestError),

    /// The response from Omaha could not be parsed.
    #[error("Error parsing Omaha response")]
    ResponseParser(#[from] ResponseParseError),

    /// An install plan could not be created; the wrapped error is exposed as the source.
    #[error("Unable to create an install plan")]
    InstallPlan(#[source] anyhow::Error),
}
185
/// A handle to interact with the state machine running in another task.
///
/// Wraps the sending half of the channel whose receiving half is consumed by the state machine's
/// `run` loop.  The handle is `Clone`.
#[derive(Clone)]
pub struct ControlHandle(mpsc::Sender<ControlRequest>);
189
/// Error indicating that the state machine task no longer exists.
///
/// Produced from either a failed send on the control channel or a canceled response channel (see
/// the `From` impls in this file).
#[derive(Debug, Clone, Error, PartialEq, Eq)]
#[error("state machine dropped before all its control handles")]
pub struct StateMachineGone;
194
195impl From<mpsc::SendError> for StateMachineGone {
196    fn from(_: mpsc::SendError) -> Self {
197        StateMachineGone
198    }
199}
200
201impl From<oneshot::Canceled> for StateMachineGone {
202    fn from(_: oneshot::Canceled) -> Self {
203        StateMachineGone
204    }
205}
206
/// Requests that a `ControlHandle` sends to the state machine over the control channel.
enum ControlRequest {
    /// Ask the state machine to start an update check now.
    StartUpdateCheck {
        /// Options for the requested check.
        options: CheckOptions,
        /// One-shot channel on which the state machine sends its answer to this request.
        responder: oneshot::Sender<StartUpdateCheckResponse>,
    },
}
213
/// Responses to a request to start an update check now.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StartUpdateCheckResponse {
    /// The state machine was idle and the request triggered an update check.
    ///
    /// Sent after the policy engine has allowed the check and before the check itself starts.
    Started,

    /// The state machine was already processing an update check and ignored this request and
    /// options.
    ///
    /// Also sent for requests that arrive while waiting for a reboot.  In both situations an
    /// on-demand request still upgrades the install source of the options in use to on-demand.
    AlreadyRunning,

    /// The update check was throttled by policy.
    ///
    /// Sent whenever the policy decision is "too soon", "throttled by policy" or "denied by
    /// policy".
    Throttled,
}
227
228impl ControlHandle {
229    /// Ask the state machine to start an update check with the provided options, returning whether
230    /// or not the state machine started a check or was already running one.
231    pub async fn start_update_check(
232        &mut self,
233        options: CheckOptions,
234    ) -> Result<StartUpdateCheckResponse, StateMachineGone> {
235        let (responder, receive_response) = oneshot::channel();
236        self.0
237            .send(ControlRequest::StartUpdateCheck { options, responder })
238            .await?;
239        Ok(receive_response.await?)
240    }
241}
242
/// Outcome of an update check with respect to rebooting, as returned by `start_update_check`.
#[derive(Debug)]
enum RebootAfterUpdate<T> {
    /// A reboot is needed; carries the value that `run` hands to `wait_for_reboot` as the
    /// install result.
    Needed(T),
    /// No reboot is needed.  This is also what a failed update check reports.
    NotNeeded,
}
248
249impl<PE, HR, IN, TM, MR, ST, AS, IR, PL, CH> StateMachine<PE, HR, IN, TM, MR, ST, AS, CH>
250where
251    PE: PolicyEngine<InstallResult = IR, InstallPlan = PL>,
252    HR: HttpRequest,
253    IN: Installer<InstallResult = IR, InstallPlan = PL>,
254    TM: Timer,
255    MR: MetricsReporter,
256    ST: Storage,
257    AS: AppSet,
258    CH: Cupv2Handler,
259    IR: 'static + Send,
260    PL: Plan,
261{
262    /// Ask policy engine for the next update check time and update the context and yield event.
263    async fn update_next_update_time(
264        &mut self,
265        co: &mut async_generator::Yield<StateMachineEvent>,
266    ) -> CheckTiming {
267        let apps = self.app_set.lock().await.get_apps();
268        let timing = self
269            .policy_engine
270            .compute_next_update_time(&apps, &self.context.schedule, &self.context.state)
271            .await;
272        self.context.schedule.next_update_time = Some(timing);
273
274        co.yield_(StateMachineEvent::ScheduleChange(self.context.schedule))
275            .await;
276        info!("Calculated check timing: {}", timing);
277        timing
278    }
279
280    /// Return a future that will wait until the given check timing.
281    async fn make_wait_to_next_check(
282        &mut self,
283        check_timing: CheckTiming,
284    ) -> Fuse<BoxFuture<'static, ()>> {
285        if let Some(minimum_wait) = check_timing.minimum_wait {
286            // If there's a minimum wait, also wait at least that long, by joining the two
287            // timers so that both need to be true (in case `next_update_time` turns out to be
288            // very close to now)
289            future::join(
290                self.timer.wait_for(minimum_wait),
291                self.timer.wait_until(check_timing.time),
292            )
293            .map(|_| ())
294            .boxed()
295            .fuse()
296        } else {
297            // Otherwise just setup the timer for the waiting until the next time.  This is a
298            // wait until either the monotonic or wall times have passed.
299            self.timer.wait_until(check_timing.time).fuse()
300        }
301    }
302
303    async fn run(
304        mut self,
305        mut control: mpsc::Receiver<ControlRequest>,
306        mut co: async_generator::Yield<StateMachineEvent>,
307    ) {
308        {
309            let app_set = self.app_set.lock().await;
310            if !app_set.all_valid() {
311                error!(
312                    "App set not valid, not starting state machine: {:#?}",
313                    app_set.get_apps()
314                );
315                return;
316            }
317        }
318
319        let state_machine_start_monotonic_time = self.time_source.now_in_monotonic();
320
321        let mut should_report_waited_for_reboot_duration = false;
322
323        let update_finish_time = {
324            let storage = self.storage_ref.lock().await;
325            let update_finish_time = storage.get_time(UPDATE_FINISH_TIME).await;
326            if update_finish_time.is_some() {
327                if let Some(target_version) = storage.get_string(TARGET_VERSION).await {
328                    if target_version == self.config.os.version {
329                        should_report_waited_for_reboot_duration = true;
330                    }
331                }
332            }
333            update_finish_time
334        };
335
336        loop {
337            info!("Initial context: {:?}", self.context);
338
339            if should_report_waited_for_reboot_duration {
340                match self.report_waited_for_reboot_duration(
341                    update_finish_time.unwrap(),
342                    state_machine_start_monotonic_time,
343                    self.time_source.now(),
344                ) {
345                    Ok(()) => {
346                        // If the report was successful, don't try again on the next loop.
347                        should_report_waited_for_reboot_duration = false;
348
349                        let mut storage = self.storage_ref.lock().await;
350                        storage.remove_or_log(UPDATE_FINISH_TIME).await;
351                        storage.remove_or_log(TARGET_VERSION).await;
352                        storage.commit_or_log().await;
353                    }
354                    Err(e) => {
355                        warn!(
356                            "Couldn't report wait for reboot duration: {:#}, will try again",
357                            e
358                        );
359                    }
360                }
361            }
362
363            let (mut options, responder) = {
364                let check_timing = self.update_next_update_time(&mut co).await;
365                let mut wait_to_next_check = self.make_wait_to_next_check(check_timing).await;
366
367                // Wait for either the next check time or a request to start an update check.  Use
368                // the default check options with the timed check, or those sent with a request.
369                select! {
370                    () = wait_to_next_check => (CheckOptions::default(), None),
371                    ControlRequest::StartUpdateCheck{options, responder} = control.select_next_some() => {
372                        (options, Some(responder))
373                    }
374                }
375            };
376
377            let reboot_after_update = {
378                let apps = self.app_set.lock().await.get_apps();
379                info!(
380                    "Checking to see if an update check is allowed at this time for {:?}",
381                    apps
382                );
383                let decision = self
384                    .policy_engine
385                    .update_check_allowed(
386                        &apps,
387                        &self.context.schedule,
388                        &self.context.state,
389                        &options,
390                    )
391                    .await;
392
393                info!("The update check decision is: {:?}", decision);
394
395                let request_params = match decision {
396                    // Positive results, will continue with the update check process
397                    CheckDecision::Ok(rp) | CheckDecision::OkUpdateDeferred(rp) => rp,
398
399                    // Negative results, exit early
400                    CheckDecision::TooSoon
401                    | CheckDecision::ThrottledByPolicy
402                    | CheckDecision::DeniedByPolicy => {
403                        info!("The update check is not allowed at this time.");
404                        if let Some(responder) = responder {
405                            let _ = responder.send(StartUpdateCheckResponse::Throttled);
406                        }
407                        continue;
408                    }
409                };
410                if let Some(responder) = responder {
411                    let _ = responder.send(StartUpdateCheckResponse::Started);
412                }
413
414                // "start" the update check itself (well, create the future that is the update check)
415                let update_check = self.start_update_check(request_params, &mut co).fuse();
416                futures::pin_mut!(update_check);
417
418                // Wait for the update check to complete, handling any control requests that come in
419                // during the check.
420                loop {
421                    select! {
422                        update_check_result = update_check => break update_check_result,
423                        ControlRequest::StartUpdateCheck{
424                            options: new_options,
425                            responder
426                        } = control.select_next_some() => {
427                            if new_options.source == InstallSource::OnDemand {
428                                info!("Got on demand update check request, ensuring ongoing check is on demand");
429                                // TODO(63180): merge CheckOptions in Policy, not here.
430                                options.source = InstallSource::OnDemand;
431                            }
432
433                            let _ = responder.send(StartUpdateCheckResponse::AlreadyRunning);
434                        }
435                    }
436                }
437            };
438
439            if let RebootAfterUpdate::Needed(install_result) = reboot_after_update {
440                Self::yield_state(State::WaitingForReboot, &mut co).await;
441                self.wait_for_reboot(options, &mut control, install_result, &mut co)
442                    .await;
443            }
444
445            Self::yield_state(State::Idle, &mut co).await;
446        }
447    }
448
449    async fn wait_for_reboot(
450        &mut self,
451        mut options: CheckOptions,
452        control: &mut mpsc::Receiver<ControlRequest>,
453        install_result: IN::InstallResult,
454        co: &mut async_generator::Yield<StateMachineEvent>,
455    ) {
456        if !self
457            .policy_engine
458            .reboot_allowed(&options, &install_result)
459            .await
460        {
461            let wait_to_see_if_reboot_allowed =
462                self.timer.wait_for(CHECK_REBOOT_ALLOWED_INTERVAL).fuse();
463            futures::pin_mut!(wait_to_see_if_reboot_allowed);
464
465            let check_timing = self.update_next_update_time(co).await;
466            let wait_to_next_ping = self.make_wait_to_next_check(check_timing).await;
467            futures::pin_mut!(wait_to_next_ping);
468
469            loop {
470                // Wait for either the next time to check if reboot allowed or the next
471                // ping time or a request to start an update check.
472
473                select! {
474                    () = wait_to_see_if_reboot_allowed => {
475                        if self.policy_engine.reboot_allowed(&options, &install_result).await {
476                            break;
477                        }
478                        info!("Reboot not allowed at the moment, will try again in 30 minutes...");
479                        wait_to_see_if_reboot_allowed.set(
480                            self.timer.wait_for(CHECK_REBOOT_ALLOWED_INTERVAL).fuse()
481                        );
482                    },
483                    () = wait_to_next_ping => {
484                        self.ping_omaha(co).await;
485                        let check_timing = self.update_next_update_time(co).await;
486                        wait_to_next_ping.set(self.make_wait_to_next_check(check_timing).await);
487                    },
488                    ControlRequest::StartUpdateCheck{
489                        options: new_options,
490                        responder
491                    } = control.select_next_some() => {
492                        let _ = responder.send(StartUpdateCheckResponse::AlreadyRunning);
493                        if new_options.source == InstallSource::OnDemand {
494                            info!("Waiting for reboot, but ensuring that InstallSource is OnDemand");
495                            options.source = InstallSource::OnDemand;
496
497                            if self.policy_engine.reboot_allowed(&options, &install_result).await {
498                                info!("Upgraded update check request to on demand, policy allowed reboot");
499                                break;
500                            }
501                        };
502                    }
503                }
504            }
505        }
506        info!("Rebooting the system at the end of a successful update");
507        if let Err(e) = self.installer.perform_reboot().await {
508            error!("Unable to reboot the system: {}", e);
509        }
510    }
511
512    /// Report the duration the previous boot waited to reboot based on the update finish time in
513    /// storage, and the current time. Does not report a metric if there's an inconsistency in the
514    /// times stored or computed, i.e. if the reboot time is later than the current time.
515    /// Returns an error if time seems incorrect, e.g. update_finish_time is in the future.
516    fn report_waited_for_reboot_duration(
517        &mut self,
518        update_finish_time: SystemTime,
519        state_machine_start_monotonic_time: Instant,
520        now: ComplexTime,
521    ) -> Result<(), anyhow::Error> {
522        // If `update_finish_time` is in the future we don't have correct time, try again
523        // on the next loop.
524        let update_finish_time_to_now =
525            now.wall_duration_since(update_finish_time).map_err(|e| {
526                anyhow!(
527                    "Update finish time later than now, can't report waited for reboot duration,
528                    update finish time: {:?}, now: {:?}, error: {:?}",
529                    update_finish_time,
530                    now,
531                    e,
532                )
533            })?;
534
535        // It might take a while for us to get here, but we only want to report the
536        // time from update finish to state machine start after reboot, so we subtract
537        // the duration since then using monotonic time.
538
539        // We only want to report this metric if we can actually compute it.
540        // If for whatever reason the clock was wrong on the previous boot, or monotonic
541        // time is going backwards, better not to report this metric than to report an
542        // incorrect default value.
543        let state_machine_start_to_now = now
544            .mono
545            .checked_duration_since(state_machine_start_monotonic_time)
546            .ok_or_else(|| {
547                error!("Monotonic time appears to have gone backwards");
548                anyhow!(
549                    "State machine start later than now, can't report waited for reboot duration. \
550                    State machine start: {:?}, now: {:?}",
551                    state_machine_start_monotonic_time,
552                    now.mono,
553                )
554            })?;
555
556        let waited_for_reboot_duration = update_finish_time_to_now
557            .checked_sub(state_machine_start_to_now)
558            .ok_or_else(|| {
559                anyhow!(
560                    "Can't report waiting for reboot duration, update finish time to now smaller \
561                    than state machine start to now. Update finish time to now: {:?}, state \
562                    machine start to now: {:?}",
563                    update_finish_time_to_now,
564                    state_machine_start_to_now,
565                )
566            })?;
567
568        info!(
569            "Waited {} seconds for reboot.",
570            waited_for_reboot_duration.as_secs()
571        );
572        self.report_metrics(Metrics::WaitedForRebootDuration(waited_for_reboot_duration));
573        Ok(())
574    }
575
576    /// Report update check interval based on the last check time stored in storage.
577    /// It will also persist the new last check time to storage.
578    async fn report_check_interval(&mut self, install_source: InstallSource) {
579        let now = self.time_source.now();
580
581        match self.context.schedule.last_update_check_time {
582            // This is our first run; report the interval between that time and now,
583            // and update the context with the complex time.
584            Some(PartialComplexTime::Wall(t)) => match now.wall_duration_since(t) {
585                Ok(interval) => self.report_metrics(Metrics::UpdateCheckInterval {
586                    interval,
587                    clock: ClockType::Wall,
588                    install_source,
589                }),
590                Err(e) => warn!("Last check time is in the future: {}", e),
591            },
592
593            // We've reported an update check before, or we at least have a
594            // PartialComplexTime with a monotonic component. Report our interval
595            // between these Instants. (N.B. strictly speaking, we should only
596            // ever have a PCT::Complex here.)
597            Some(PartialComplexTime::Complex(t)) => match now.mono.checked_duration_since(t.mono) {
598                Some(interval) => self.report_metrics(Metrics::UpdateCheckInterval {
599                    interval,
600                    clock: ClockType::Monotonic,
601                    install_source,
602                }),
603                None => error!("Monotonic time in the past"),
604            },
605
606            // No last check time in storage, and no big deal. We'll continue from
607            // monotonic time from now on. This is the only place other than loading
608            // context from storage where the time can be set, so it's either unset
609            // because no storage, or a complex time. No need to match
610            // Some(PartialComplexTime::Monotonic)
611            _ => {}
612        }
613
614        self.context.schedule.last_update_check_time = now.into();
615    }
616
617    /// Perform update check and handle the result, including updating the update check context
618    /// and cohort.
619    /// Returns whether reboot is needed after the update.
620    async fn start_update_check(
621        &mut self,
622        request_params: RequestParams,
623        co: &mut async_generator::Yield<StateMachineEvent>,
624    ) -> RebootAfterUpdate<IN::InstallResult> {
625        let apps = self.app_set.lock().await.get_apps();
626        let result = self.perform_update_check(request_params, apps, co).await;
627
628        let (result, reboot_after_update) = match result {
629            Ok((result, reboot_after_update)) => {
630                info!("Update check result: {:?}", result);
631                // Update check succeeded, update |last_update_time|.
632                self.context.schedule.last_update_time = Some(self.time_source.now().into());
633
634                // Determine if any app failed to install, or we had a successful update.
635                let install_success =
636                    result.app_responses.iter().fold(None, |result, app| {
637                        match (result, &app.result) {
638                            (_, update_check::Action::InstallPlanExecutionError) => Some(false),
639                            (None, update_check::Action::Updated) => Some(true),
640                            (result, _) => result,
641                        }
642                    });
643
644                // Update check succeeded, reset |consecutive_failed_update_checks| to 0 and
645                // report metrics.
646                self.report_attempts_to_successful_check(true).await;
647
648                self.app_set
649                    .lock()
650                    .await
651                    .update_from_omaha(&result.app_responses);
652
653                // Only report |attempts_to_successful_install| if we get an error trying to
654                // install, or we succeed to install an update without error.
655                if let Some(success) = install_success {
656                    self.report_attempts_to_successful_install(success).await;
657                }
658
659                (Ok(result), reboot_after_update)
660                // TODO: update consecutive_proxied_requests
661            }
662            Err(error) => {
663                error!("Update check failed: {:?}", error);
664
665                let failure_reason = match &error {
666                    UpdateCheckError::ResponseParser(_) | UpdateCheckError::InstallPlan(_) => {
667                        // We talked to Omaha, update |last_update_time|.
668                        self.context.schedule.last_update_time =
669                            Some(self.time_source.now().into());
670
671                        UpdateCheckFailureReason::Omaha
672                    }
673                    UpdateCheckError::OmahaRequest(request_error) => match request_error {
674                        OmahaRequestError::Json(_)
675                        | OmahaRequestError::HttpBuilder(_)
676                        | OmahaRequestError::CupDecoration(_)
677                        | OmahaRequestError::CupValidation(_) => UpdateCheckFailureReason::Internal,
678                        OmahaRequestError::HttpTransport(_) | OmahaRequestError::HttpStatus(_) => {
679                            UpdateCheckFailureReason::Network
680                        }
681                    },
682                };
683                self.report_metrics(Metrics::UpdateCheckFailureReason(failure_reason));
684
685                self.report_attempts_to_successful_check(false).await;
686                (Err(error), RebootAfterUpdate::NotNeeded)
687            }
688        };
689
690        co.yield_(StateMachineEvent::ScheduleChange(self.context.schedule))
691            .await;
692        co.yield_(StateMachineEvent::ProtocolStateChange(
693            self.context.state.clone(),
694        ))
695        .await;
696        co.yield_(StateMachineEvent::UpdateCheckResult(result))
697            .await;
698
699        self.persist_data().await;
700
701        reboot_after_update
702    }
703
704    // Update self.context.state.consecutive_failed_update_checks and report the metric if
705    // `success`. Does not persist the value to storage, but rather relies on the caller.
706    async fn report_attempts_to_successful_check(&mut self, success: bool) {
707        let attempts = self.context.state.consecutive_failed_update_checks + 1;
708        if success {
709            self.context.state.consecutive_failed_update_checks = 0;
710            self.report_metrics(Metrics::AttemptsToSuccessfulCheck(attempts as u64));
711        } else {
712            self.context.state.consecutive_failed_update_checks = attempts;
713        }
714    }
715
716    /// Update `CONSECUTIVE_FAILED_INSTALL_ATTEMPTS` in storage and report the metrics if
717    /// `success`. Does not commit the change to storage.
718    async fn report_attempts_to_successful_install(&mut self, success: bool) {
719        let storage_ref = self.storage_ref.clone();
720        let mut storage = storage_ref.lock().await;
721        let attempts = storage
722            .get_int(CONSECUTIVE_FAILED_INSTALL_ATTEMPTS)
723            .await
724            .unwrap_or(0)
725            + 1;
726
727        self.report_metrics(Metrics::AttemptsToSuccessfulInstall {
728            count: attempts as u64,
729            successful: success,
730        });
731
732        if success {
733            storage
734                .remove_or_log(CONSECUTIVE_FAILED_INSTALL_ATTEMPTS)
735                .await;
736        } else if let Err(e) = storage
737            .set_int(CONSECUTIVE_FAILED_INSTALL_ATTEMPTS, attempts)
738            .await
739        {
740            error!(
741                "Unable to persist {}: {}",
742                CONSECUTIVE_FAILED_INSTALL_ATTEMPTS, e
743            );
744        }
745    }
746
747    /// Persist all necessary data to storage.
748    async fn persist_data(&self) {
749        let mut storage = self.storage_ref.lock().await;
750        self.context.persist(&mut *storage).await;
751        self.app_set.lock().await.persist(&mut *storage).await;
752
753        storage.commit_or_log().await;
754    }
755
    /// This function constructs the chain of async futures needed to perform all of the async tasks
    /// that comprise an update check.
    ///
    /// The steps, in order:
    /// 1. Send an update check (plus ping) request for every app in `apps` to Omaha, retrying
    ///    with a randomized exponential backoff on HTTP transport/status errors.
    /// 2. Parse the response; if no app has an update (status `OmahaStatus::Ok`), finish with
    ///    `Action::NoUpdate`.
    /// 3. Otherwise ask the installer for an install plan and ask the policy engine whether the
    ///    update can start.
    /// 4. Perform the install while forwarding progress to `co`, report the per-app results to
    ///    Omaha, and persist the update finish time (plus the target version for the system app).
    ///
    /// State changes and other events are yielded through `co` along the way.
    ///
    /// Returns the per-app results together with whether a reboot is needed after the update.
    ///
    /// # Errors
    ///
    /// Returns an `UpdateCheckError` if the Omaha request fails, the response can't be parsed, or
    /// the install plan can't be constructed. Install failures are not an `Err`: they are yielded
    /// as `StateMachineEvent::InstallerError` and show up in the returned response as
    /// `Action::InstallPlanExecutionError`.
    async fn perform_update_check(
        &mut self,
        request_params: RequestParams,
        apps: Vec<App>,
        co: &mut async_generator::Yield<StateMachineEvent>,
    ) -> Result<(update_check::Response, RebootAfterUpdate<IN::InstallResult>), UpdateCheckError>
    {
        Self::yield_state(State::CheckingForUpdates(request_params.source), co).await;

        self.report_check_interval(request_params.source).await;

        // Construct a request for the app(s).
        let config = self.config.clone();
        let mut request_builder = RequestBuilder::new(&config, &request_params);
        for app in &apps {
            request_builder = request_builder.add_update_check(app).add_ping(app);
        }
        // The same session id is reused for every request made as part of this update check.
        let session_id = GUID::new();
        request_builder = request_builder.session_id(session_id.clone());

        // 1-based count of the requests sent to Omaha for this check, reported below as
        // `Metrics::RequestsPerCheck`.
        let mut omaha_request_attempt = 1;

        // Attempt, in a loop of up to MAX_OMAHA_REQUEST_ATTEMPTS iterations, to communicate with
        // Omaha. Exit the loop early on success or on an error that isn't an HTTP transport or
        // HTTP status error.
        let loop_result = loop {
            // Mark the start time for the request to omaha.
            let omaha_check_start_time = self.time_source.now_in_monotonic();
            // Every attempt gets a fresh request id.
            request_builder = request_builder.request_id(GUID::new());
            let result = self
                .do_omaha_request_and_update_context(&request_builder, co)
                .await;

            // Report the response time of the omaha request.
            {
                // don't use Instant::elapsed(), it doesn't use the right TimeSource, and can panic!
                // as a result
                let now = self.time_source.now_in_monotonic();
                let duration = now.checked_duration_since(omaha_check_start_time);

                if let Some(response_time) = duration {
                    self.report_metrics(Metrics::UpdateCheckResponseTime {
                        response_time,
                        successful: result.is_ok(),
                    });
                } else {
                    // If this happens, it's a bug.
                    error!(
                        "now: {:?}, is before omaha_check_start_time: {:?}",
                        now, omaha_check_start_time
                    );
                }
            }

            match result {
                Ok(res) => {
                    break Ok(res);
                }
                Err(OmahaRequestError::Json(e)) => {
                    error!("Unable to construct request body! {:?}", e);
                    Self::yield_state(State::ErrorCheckingForUpdate, co).await;
                    break Err(UpdateCheckError::OmahaRequest(e.into()));
                }
                Err(OmahaRequestError::HttpBuilder(e)) => {
                    error!("Unable to construct HTTP request! {:?}", e);
                    Self::yield_state(State::ErrorCheckingForUpdate, co).await;
                    break Err(UpdateCheckError::OmahaRequest(e.into()));
                }
                Err(OmahaRequestError::CupDecoration(e)) => {
                    error!(
                        "Unable to decorate HTTP request with CUPv2 parameters! {:?}",
                        e
                    );
                    Self::yield_state(State::ErrorCheckingForUpdate, co).await;
                    break Err(UpdateCheckError::OmahaRequest(e.into()));
                }
                Err(OmahaRequestError::CupValidation(e)) => {
                    error!(
                        "Unable to validate HTTP response with CUPv2 parameters! {:?}",
                        e
                    );
                    Self::yield_state(State::ErrorCheckingForUpdate, co).await;
                    break Err(UpdateCheckError::OmahaRequest(e.into()));
                }
                Err(OmahaRequestError::HttpTransport(e)) => {
                    warn!("Unable to contact Omaha: {:?}", e);
                    // Don't retry if the error was caused by user code, which means we weren't
                    // using the library correctly.
                    // Retrying is also skipped once the attempts are used up or when a server
                    // dictated poll interval is set in the context.
                    if omaha_request_attempt >= MAX_OMAHA_REQUEST_ATTEMPTS
                        || e.is_user()
                        || self.context.state.server_dictated_poll_interval.is_some()
                    {
                        Self::yield_state(State::ErrorCheckingForUpdate, co).await;
                        break Err(UpdateCheckError::OmahaRequest(e.into()));
                    }
                }
                Err(OmahaRequestError::HttpStatus(e)) => {
                    warn!("Unable to contact Omaha: {:?}", e);
                    // Same retry limits as for transport errors, minus the user-error check.
                    if omaha_request_attempt >= MAX_OMAHA_REQUEST_ATTEMPTS
                        || self.context.state.server_dictated_poll_interval.is_some()
                    {
                        Self::yield_state(State::ErrorCheckingForUpdate, co).await;
                        break Err(UpdateCheckError::OmahaRequest(e.into()));
                    }
                }
            }

            // Only the retryable error cases reach this point; every other outcome has already
            // left the loop via `break`.

            // TODO(https://fxbug.dev/42117854): Move this to Policy.
            // Randomized exponential backoff of 1, 2, & 4 seconds, +/- 500ms.
            let backoff_time_secs = 1 << (omaha_request_attempt - 1);
            let backoff_time = randomize(backoff_time_secs * 1000, 1000);
            info!("Waiting {} ms before retrying...", backoff_time);
            self.timer
                .wait_for(Duration::from_millis(backoff_time))
                .await;

            omaha_request_attempt += 1;
        };

        self.report_metrics(Metrics::RequestsPerCheck {
            count: omaha_request_attempt,
            successful: loop_result.is_ok(),
        });

        // Bail out if the request ultimately failed; the error state was already yielded inside
        // the loop.
        let (_parts, data, request_metadata, signature) = loop_result?;

        let response = match Self::parse_omaha_response(&data) {
            Ok(res) => res,
            Err(err) => {
                warn!("Unable to parse Omaha response: {:?}", err);
                Self::yield_state(State::ErrorCheckingForUpdate, co).await;
                // Without a parsed response there are no next versions, so every app is reported
                // with a `None` next version.
                self.report_omaha_event_and_update_context(
                    &request_params,
                    Event::error(EventErrorCode::ParseResponse),
                    &apps,
                    &session_id,
                    &apps.iter().map(|app| (app.id.clone(), None)).collect(),
                    None,
                    co,
                )
                .await;
                return Err(UpdateCheckError::ResponseParser(err));
            }
        };

        info!("result: {:?}", response);

        co.yield_(StateMachineEvent::OmahaServerResponse(response.clone()))
            .await;

        let statuses = Self::get_app_update_statuses(&response);
        for (app_id, status) in &statuses {
            // TODO:  Report or metric statuses other than 'no-update' and 'ok'
            info!("Omaha update check status: {} => {:?}", app_id, status);
        }

        // Only apps whose update check status is `OmahaStatus::Ok` have an update to install.
        let apps_with_update: Vec<_> = response
            .apps
            .iter()
            .filter(|app| {
                matches!(
                    app.update_check,
                    Some(UpdateCheck {
                        status: OmahaStatus::Ok,
                        ..
                    })
                )
            })
            .collect();

        if apps_with_update.is_empty() {
            // A successful, no-update, check

            Self::yield_state(State::NoUpdateAvailable, co).await;
            Self::make_not_updated_result(response, update_check::Action::NoUpdate)
        } else {
            info!(
                "At least one app has an update, proceeding to build and process an Install Plan"
            );
            // A map from app id to the new version of the app, if an app has no update, then it
            // won't appear in this map, if an app has update but there's no version in the omaha
            // response, then its entry will be None.
            let next_versions: HashMap<String, Option<String>> = apps_with_update
                .iter()
                .map(|app| (app.id.clone(), app.get_manifest_version()))
                .collect();
            let install_plan = match self
                .installer
                .try_create_install_plan(
                    &request_params,
                    request_metadata.as_ref(),
                    &response,
                    data,
                    signature.map(|s| s.as_bytes().to_vec()),
                )
                .await
            {
                Ok(plan) => plan,
                Err(e) => {
                    error!("Unable to construct install plan! {}", e);
                    // Observers see the install as started and then immediately failed.
                    Self::yield_state(State::InstallingUpdate, co).await;
                    Self::yield_state(State::InstallationError, co).await;
                    self.report_omaha_event_and_update_context(
                        &request_params,
                        Event::error(EventErrorCode::ConstructInstallPlan),
                        &apps,
                        &session_id,
                        &next_versions,
                        None,
                        co,
                    )
                    .await;
                    return Err(UpdateCheckError::InstallPlan(e.into()));
                }
            };

            info!("Validating Install Plan with Policy");
            let install_plan_decision = self.policy_engine.update_can_start(&install_plan).await;
            match install_plan_decision {
                UpdateDecision::Ok => {
                    info!("Proceeding with install plan.");
                }
                UpdateDecision::DeferredByPolicy => {
                    info!("Install plan was deferred by Policy.");
                    // Report "error" to Omaha (as this is an event that needs reporting as the
                    // install isn't starting immediately).
                    let event = Event {
                        event_type: EventType::UpdateComplete,
                        event_result: EventResult::UpdateDeferred,
                        ..Event::default()
                    };
                    self.report_omaha_event_and_update_context(
                        &request_params,
                        event,
                        &apps,
                        &session_id,
                        &next_versions,
                        None,
                        co,
                    )
                    .await;

                    Self::yield_state(State::InstallationDeferredByPolicy, co).await;

                    return Self::make_not_updated_result(
                        response,
                        update_check::Action::DeferredByPolicy,
                    );
                }
                UpdateDecision::DeniedByPolicy => {
                    warn!("Install plan was denied by Policy, see Policy logs for reasoning");
                    self.report_omaha_event_and_update_context(
                        &request_params,
                        Event::error(EventErrorCode::DeniedByPolicy),
                        &apps,
                        &session_id,
                        &next_versions,
                        None,
                        co,
                    )
                    .await;

                    return Self::make_not_updated_result(
                        response,
                        update_check::Action::DeniedByPolicy,
                    );
                }
            }

            // Policy allowed the update: announce the install and tell Omaha the download started.
            Self::yield_state(State::InstallingUpdate, co).await;
            self.report_omaha_event_and_update_context(
                &request_params,
                Event::success(EventType::UpdateDownloadStarted),
                &apps,
                &session_id,
                &next_versions,
                None,
                co,
            )
            .await;

            let install_plan_id = install_plan.id();
            let update_start_time = self.time_source.now_in_walltime();
            let update_first_seen_time = self
                .record_update_first_seen_time(&install_plan_id, update_start_time)
                .await;

            // Progress reported by the installer through `observer` is received by
            // `yield_progress` and forwarded to `co`; both futures are driven together by the
            // `future::join` below.
            let (send, mut recv) = mpsc::channel(0);
            let observer = StateMachineProgressObserver(send);
            let perform_install = async {
                let result = self
                    .installer
                    .perform_install(&install_plan, Some(&observer))
                    .await;
                // Drop observer so that we can stop waiting for the next progress.
                drop(observer);
                result
            };
            let yield_progress = async {
                while let Some(progress) = recv.next().await {
                    co.yield_(StateMachineEvent::InstallProgressChange(progress))
                        .await;
                }
            };

            let ((install_result, mut app_install_results), ()) =
                future::join(perform_install, yield_progress).await;
            // A deferred app install does not count as a failure.
            let no_apps_failed = app_install_results.iter().all(|result| {
                matches!(
                    result,
                    AppInstallResult::Installed | AppInstallResult::Deferred
                )
            });
            let update_finish_time = self.time_source.now_in_walltime();
            // `None` when the wall clock went backwards during the install; in that case no
            // duration metric is reported either.
            let install_duration = match update_finish_time.duration_since(update_start_time) {
                Ok(duration) => {
                    let metrics = if no_apps_failed {
                        Metrics::SuccessfulUpdateDuration(duration)
                    } else {
                        Metrics::FailedUpdateDuration(duration)
                    };
                    self.report_metrics(metrics);
                    Some(duration)
                }
                Err(e) => {
                    warn!("Update start time is in the future: {}", e);
                    None
                }
            };

            // Build a single request that carries one event per app with an update, describing
            // how the install of that app went.
            let config = self.config.clone();
            let mut request_builder = RequestBuilder::new(&config, &request_params);
            let mut events = vec![];
            let mut installed_apps = vec![];
            // NOTE(review): this pairing assumes the installer returns one result per app with an
            // update, in the same order as `apps_with_update` — confirm against the `Installer`
            // contract.
            for (response_app, app_install_result) in
                apps_with_update.iter().zip(&app_install_results)
            {
                match apps.iter().find(|app| app.id == response_app.id) {
                    Some(app) => {
                        let event = match app_install_result {
                            AppInstallResult::Installed => {
                                installed_apps.push(app);
                                Event::success(EventType::UpdateDownloadFinished)
                            }
                            AppInstallResult::Deferred => Event {
                                event_type: EventType::UpdateComplete,
                                event_result: EventResult::UpdateDeferred,
                                ..Event::default()
                            },
                            AppInstallResult::Failed(_) => {
                                Event::error(EventErrorCode::Installation)
                            }
                        };
                        let event = Event {
                            previous_version: Some(app.version.to_string()),
                            next_version: response_app.get_manifest_version(),
                            download_time_ms: install_duration
                                .and_then(|d| d.as_millis().try_into().ok()),
                            ..event
                        };
                        request_builder = request_builder.add_event(app, event.clone());
                        events.push(event);
                    }
                    None => {
                        error!("unknown app id in omaha response: {:?}", response_app.id);
                    }
                }
            }
            request_builder = request_builder
                .session_id(session_id.clone())
                .request_id(GUID::new());
            if let Err(e) = self
                .do_omaha_request_and_update_context(&request_builder, co)
                .await
            {
                // Reporting is best effort: a failure is only recorded as lost-event metrics.
                for event in events {
                    self.report_metrics(Metrics::OmahaEventLost(event));
                }
                warn!("Unable to report event to Omaha: {:?}", e);
            }

            // TODO: Verify downloaded update if needed.

            // For apps that successfully installed, we need to report an extra `UpdateComplete` event.
            if !installed_apps.is_empty() {
                self.report_omaha_event_and_update_context(
                    &request_params,
                    Event::success(EventType::UpdateComplete),
                    installed_apps,
                    &session_id,
                    &next_versions,
                    install_duration,
                    co,
                )
                .await;
            }

            // Convert the Omaha response plus the install results into per-app responses,
            // collecting the installer errors on the side.
            let mut errors = vec![];
            let daystart = response.daystart;
            let app_responses = response
                .apps
                .into_iter()
                .map(|app| update_check::AppResponse {
                    app_id: app.id,
                    cohort: app.cohort,
                    user_counting: daystart.clone().into(),
                    result: match app.update_check {
                        // Install results are consumed front to back, one per app with an update.
                        // NOTE(review): `remove(0)` presumably panics if the installer returned
                        // fewer results than there are apps with an update — confirm.
                        Some(UpdateCheck {
                            status: OmahaStatus::Ok,
                            ..
                        }) => match app_install_results.remove(0) {
                            AppInstallResult::Installed => update_check::Action::Updated,
                            AppInstallResult::Deferred => update_check::Action::DeferredByPolicy,
                            AppInstallResult::Failed(e) => {
                                errors.push(e);
                                update_check::Action::InstallPlanExecutionError
                            }
                        },
                        _ => update_check::Action::NoUpdate,
                    },
                })
                .collect();

            if !errors.is_empty() {
                for e in errors {
                    co.yield_(StateMachineEvent::InstallerError(Some(Box::new(e))))
                        .await;
                }
                Self::yield_state(State::InstallationError, co).await;

                // A failed install is still an `Ok` update check result; no reboot is requested
                // and nothing below (finish time, target version) is persisted.
                return Ok((
                    update_check::Response { app_responses },
                    RebootAfterUpdate::NotNeeded,
                ));
            }

            match update_finish_time.duration_since(update_first_seen_time) {
                Ok(duration) => {
                    self.report_metrics(Metrics::SuccessfulUpdateFromFirstSeen(duration))
                }
                Err(e) => warn!("Update first seen time is in the future: {}", e),
            }
            // Persist the finish time and, for a system update, the target version. The block
            // scopes the storage and app set locks.
            {
                let mut storage = self.storage_ref.lock().await;
                if let Err(e) = storage
                    .set_time(UPDATE_FINISH_TIME, update_finish_time)
                    .await
                {
                    error!("Unable to persist {}: {}", UPDATE_FINISH_TIME, e);
                }
                let app_set = self.app_set.lock().await;
                let system_app_id = app_set.get_system_app_id();
                // If not found then this is not a system update, so no need to write target version.
                if let Some(next_version) = next_versions.get(system_app_id) {
                    let target_version = next_version.as_deref().unwrap_or_else(|| {
                        error!("Target version string not found in Omaha response.");
                        "UNKNOWN"
                    });
                    if let Err(e) = storage.set_string(TARGET_VERSION, target_version).await {
                        error!("Unable to persist {}: {}", TARGET_VERSION, e);
                    }
                }
                storage.commit_or_log().await;
            }

            // The policy engine decides whether the installed update requires a reboot.
            let reboot_after_update = if self.policy_engine.reboot_needed(&install_plan).await {
                RebootAfterUpdate::Needed(install_result)
            } else {
                RebootAfterUpdate::NotNeeded
            };

            Ok((
                update_check::Response { app_responses },
                reboot_after_update,
            ))
        }
    }
1234
1235    /// Report the given |event| to Omaha, errors occurred during reporting are logged but not
1236    /// acted on.
1237    #[allow(clippy::too_many_arguments)]
1238    async fn report_omaha_event_and_update_context<'a>(
1239        &'a mut self,
1240        request_params: &'a RequestParams,
1241        event: Event,
1242        apps: impl IntoIterator<Item = &App>,
1243        session_id: &GUID,
1244        next_versions: &HashMap<String, Option<String>>,
1245        install_duration: Option<Duration>,
1246        co: &mut async_generator::Yield<StateMachineEvent>,
1247    ) {
1248        let config = self.config.clone();
1249        let mut request_builder = RequestBuilder::new(&config, request_params);
1250        for app in apps {
1251            // Skip apps with no update.
1252            if let Some(next_version) = next_versions.get(&app.id) {
1253                let event = Event {
1254                    previous_version: Some(app.version.to_string()),
1255                    next_version: next_version.clone(),
1256                    download_time_ms: install_duration.and_then(|d| d.as_millis().try_into().ok()),
1257                    ..event.clone()
1258                };
1259                request_builder = request_builder.add_event(app, event);
1260            }
1261        }
1262        request_builder = request_builder
1263            .session_id(session_id.clone())
1264            .request_id(GUID::new());
1265        if let Err(e) = self
1266            .do_omaha_request_and_update_context(&request_builder, co)
1267            .await
1268        {
1269            self.report_metrics(Metrics::OmahaEventLost(event));
1270            warn!("Unable to report event to Omaha: {:?}", e);
1271        }
1272    }
1273
1274    /// Sends a ping to Omaha and updates context and app_set.
1275    async fn ping_omaha(&mut self, co: &mut async_generator::Yield<StateMachineEvent>) {
1276        let apps = self.app_set.lock().await.get_apps();
1277        let request_params = RequestParams {
1278            source: InstallSource::ScheduledTask,
1279            use_configured_proxies: true,
1280            disable_updates: false,
1281            offer_update_if_same_version: false,
1282        };
1283        let config = self.config.clone();
1284        let mut request_builder = RequestBuilder::new(&config, &request_params);
1285        for app in &apps {
1286            request_builder = request_builder.add_ping(app);
1287        }
1288        request_builder = request_builder
1289            .session_id(GUID::new())
1290            .request_id(GUID::new());
1291
1292        let (_parts, data, _request_metadata, _signature) = match self
1293            .do_omaha_request_and_update_context(&request_builder, co)
1294            .await
1295        {
1296            Ok(res) => res,
1297            Err(e) => {
1298                error!("Ping Omaha failed: {:#}", anyhow!(e));
1299                self.context.state.consecutive_failed_update_checks += 1;
1300                self.persist_data().await;
1301                return;
1302            }
1303        };
1304
1305        let response = match Self::parse_omaha_response(&data) {
1306            Ok(res) => res,
1307            Err(e) => {
1308                error!("Unable to parse Omaha response: {:#}", anyhow!(e));
1309                self.context.state.consecutive_failed_update_checks += 1;
1310                self.persist_data().await;
1311                return;
1312            }
1313        };
1314
1315        self.context.state.consecutive_failed_update_checks = 0;
1316
1317        // Even though this is a ping, we should still update the last_update_time for
1318        // policy to compute the next ping time.
1319        self.context.schedule.last_update_time = Some(self.time_source.now().into());
1320        co.yield_(StateMachineEvent::ScheduleChange(self.context.schedule))
1321            .await;
1322
1323        let app_responses = Self::make_app_responses(response, update_check::Action::NoUpdate);
1324        self.app_set.lock().await.update_from_omaha(&app_responses);
1325
1326        self.persist_data().await;
1327    }
1328
1329    /// Make an http request to Omaha, and collect the response into an error or a blob of bytes
1330    /// that can be parsed.
1331    ///
1332    /// Given the http client and the request build, this makes the http request, and then coalesces
1333    /// the various errors into a single error type for easier error handling by the make process
1334    /// flow.
1335    ///
1336    /// This function also converts an HTTP error response into an Error, to divert those into the
1337    /// error handling paths instead of the Ok() path.
1338    ///
1339    /// If a valid X-Retry-After header is found in the response, this function will update the
1340    /// server dictated poll interval in context.
1341    async fn do_omaha_request_and_update_context<'a>(
1342        &'a mut self,
1343        builder: &RequestBuilder<'a>,
1344        co: &mut async_generator::Yield<StateMachineEvent>,
1345    ) -> Result<
1346        (
1347            Parts,
1348            Vec<u8>,
1349            Option<RequestMetadata>,
1350            Option<DerSignature>,
1351        ),
1352        OmahaRequestError,
1353    > {
1354        let (request, request_metadata) = builder.build(self.cup_handler.as_ref())?;
1355        let response = Self::make_request(&mut self.http, request).await?;
1356
1357        let signature: Option<DerSignature> = if let (Some(handler), Some(metadata)) =
1358            (self.cup_handler.as_ref(), &request_metadata)
1359        {
1360            let signature = handler
1361                .verify_response(metadata, &response, metadata.public_key_id)
1362                .map_err(|e| {
1363                    error!("Could not verify response: {:?}", e);
1364                    e
1365                })?;
1366            Some(signature)
1367        } else {
1368            None
1369        };
1370
1371        let (parts, body) = response.into_parts();
1372
1373        // Clients MUST respect this header even if paired with non-successful HTTP response code.
1374        let server_dictated_poll_interval = parts.headers.get(X_RETRY_AFTER).and_then(|header| {
1375            match header
1376                .to_str()
1377                .map_err(|e| anyhow!(e))
1378                .and_then(|s| s.parse::<u64>().map_err(|e| anyhow!(e)))
1379            {
1380                Ok(seconds) => {
1381                    // Servers SHOULD NOT send a value in excess of 86400 (24 hours), and clients
1382                    // SHOULD treat values greater than 86400 as 86400.
1383                    Some(Duration::from_secs(min(seconds, 86400)))
1384                }
1385                Err(e) => {
1386                    error!("Unable to parse {} header: {:#}", X_RETRY_AFTER, e);
1387                    None
1388                }
1389            }
1390        });
1391        if self.context.state.server_dictated_poll_interval != server_dictated_poll_interval {
1392            self.context.state.server_dictated_poll_interval = server_dictated_poll_interval;
1393            co.yield_(StateMachineEvent::ProtocolStateChange(
1394                self.context.state.clone(),
1395            ))
1396            .await;
1397            let mut storage = self.storage_ref.lock().await;
1398            self.context.persist(&mut *storage).await;
1399            storage.commit_or_log().await;
1400        }
1401        if !parts.status.is_success() {
1402            // Convert HTTP failure responses into Errors.
1403            Err(OmahaRequestError::HttpStatus(parts.status))
1404        } else {
1405            // Pass successful responses to the caller.
1406            info!("Omaha HTTP response: {}", parts.status);
1407            Ok((parts, body, request_metadata, signature))
1408        }
1409    }
1410
1411    /// Make an http request and collect the response body into a Vec of bytes.
1412    ///
1413    /// Specifically, this takes the body of the response and concatenates it into a single Vec of
1414    /// bytes so that any errors in receiving it can be captured immediately, instead of needing to
1415    /// handle them as part of parsing the response body.
1416    async fn make_request(
1417        http_client: &mut HR,
1418        request: http::Request<hyper::Body>,
1419    ) -> Result<HttpResponse<Vec<u8>>, http_request::Error> {
1420        info!("Making http request to: {}", request.uri());
1421        http_client.request(request).await.map_err(|err| {
1422            warn!("Unable to perform request: {}", err);
1423            err
1424        })
1425    }
1426
1427    /// This method takes the response bytes from Omaha, and converts them into a protocol::Response
1428    /// struct, returning all of the various errors that can occur in that process as a consolidated
1429    /// error enum.
1430    fn parse_omaha_response(data: &[u8]) -> Result<Response, ResponseParseError> {
1431        parse_json_response(data).map_err(ResponseParseError::Json)
1432    }
1433
1434    /// Utility to extract pairs of app id => omaha status response, to make it easier to ask
1435    /// questions about the response.
1436    fn get_app_update_statuses(response: &Response) -> Vec<(&str, &OmahaStatus)> {
1437        response
1438            .apps
1439            .iter()
1440            .filter_map(|app| {
1441                app.update_check
1442                    .as_ref()
1443                    .map(|u| (app.id.as_str(), &u.status))
1444            })
1445            .collect()
1446    }
1447
1448    /// Utility to take a set of protocol::response::Apps and then construct a set of AppResponse
1449    /// from the update check based on those app IDs.
1450    ///
1451    /// TODO(https://fxbug.dev/42170288): Change the Policy and Installer to return a set of results, one for
1452    ///                        each app ID, then make this match that.
1453    fn make_app_responses(
1454        response: protocol::response::Response,
1455        action: update_check::Action,
1456    ) -> Vec<update_check::AppResponse> {
1457        let daystart = response.daystart;
1458        response
1459            .apps
1460            .into_iter()
1461            .map(|app| update_check::AppResponse {
1462                app_id: app.id,
1463                cohort: app.cohort,
1464                user_counting: daystart.clone().into(),
1465                result: action.clone(),
1466            })
1467            .collect()
1468    }
1469
1470    /// Make an Ok result for `perform_update_check()` when update wasn't installed/failed.
1471    fn make_not_updated_result(
1472        response: protocol::response::Response,
1473        action: update_check::Action,
1474    ) -> Result<(update_check::Response, RebootAfterUpdate<IN::InstallResult>), UpdateCheckError>
1475    {
1476        Ok((
1477            update_check::Response {
1478                app_responses: Self::make_app_responses(response, action),
1479            },
1480            RebootAfterUpdate::NotNeeded,
1481        ))
1482    }
1483
1484    /// Send the state to the observer.
1485    async fn yield_state(state: State, co: &mut async_generator::Yield<StateMachineEvent>) {
1486        co.yield_(StateMachineEvent::StateChange(state)).await;
1487    }
1488
1489    fn report_metrics(&mut self, metrics: Metrics) {
1490        if let Err(err) = self.metrics_reporter.report_metrics(metrics) {
1491            warn!("Unable to report metrics: {:?}", err);
1492        }
1493    }
1494
1495    async fn record_update_first_seen_time(
1496        &mut self,
1497        install_plan_id: &str,
1498        now: SystemTime,
1499    ) -> SystemTime {
1500        let mut storage = self.storage_ref.lock().await;
1501        let previous_id = storage.get_string(INSTALL_PLAN_ID).await;
1502        if let Some(previous_id) = previous_id {
1503            if previous_id == install_plan_id {
1504                return storage
1505                    .get_time(UPDATE_FIRST_SEEN_TIME)
1506                    .await
1507                    .unwrap_or(now);
1508            }
1509        }
1510        // Update INSTALL_PLAN_ID and UPDATE_FIRST_SEEN_TIME for new update.
1511        if let Err(e) = storage.set_string(INSTALL_PLAN_ID, install_plan_id).await {
1512            error!("Unable to persist {}: {}", INSTALL_PLAN_ID, e);
1513            return now;
1514        }
1515        if let Err(e) = storage.set_time(UPDATE_FIRST_SEEN_TIME, now).await {
1516            error!("Unable to persist {}: {}", UPDATE_FIRST_SEEN_TIME, e);
1517            let _ = storage.remove(INSTALL_PLAN_ID).await;
1518            return now;
1519        }
1520        storage.commit_or_log().await;
1521        now
1522    }
1523}
1524
1525/// Return a random number in [n - range / 2, n - range / 2 + range).
1526fn randomize(n: u64, range: u64) -> u64 {
1527    n - range / 2 + rand::random::<u64>() % range
1528}
1529
#[cfg(test)]
impl<PE, HR, IN, TM, MR, ST, AS, IR, PL, CH> StateMachine<PE, HR, IN, TM, MR, ST, AS, CH>
where
    PE: PolicyEngine<InstallResult = IR, InstallPlan = PL>,
    HR: HttpRequest,
    IN: Installer<InstallResult = IR, InstallPlan = PL>,
    TM: Timer,
    MR: MetricsReporter,
    ST: Storage,
    AS: AppSet,
    CH: Cupv2Handler,
    IR: 'static + Send,
    PL: Plan,
{
    /// Test helper: run `perform_update_check` exactly once against the apps currently in the
    /// app set, and return the update check result.
    async fn oneshot(
        &mut self,
        request_params: RequestParams,
    ) -> Result<(update_check::Response, RebootAfterUpdate<IN::InstallResult>), UpdateCheckError>
    {
        let apps = self.app_set.lock().await.get_apps();

        let generator = async_generator::generate(move |mut co| async move {
            self.perform_update_check(request_params, apps, &mut co)
                .await
        });
        generator.into_complete().await
    }

    /// Test helper: run `start_update_check` once with default request params, discarding every
    /// state it yields.
    async fn run_once(&mut self) {
        let request_params = RequestParams::default();

        let events = async_generator::generate(move |mut co| async move {
            self.start_update_check(request_params, &mut co).await;
        });
        events.map(|_| ()).collect::<()>().await;
    }
}
1572
1573#[cfg(test)]
1574mod tests {
1575    use super::update_check::{
1576        Action, CONSECUTIVE_FAILED_UPDATE_CHECKS, LAST_UPDATE_TIME, SERVER_DICTATED_POLL_INTERVAL,
1577    };
1578    use super::*;
1579    use crate::{
1580        app_set::VecAppSet,
1581        common::{
1582            App, CheckOptions, PersistedApp, ProtocolState, UpdateCheckSchedule, UserCounting,
1583        },
1584        configuration::Updater,
1585        cup_ecdsa::test_support::{make_cup_handler_for_test, MockCupv2Handler},
1586        http_request::mock::MockHttpRequest,
1587        installer::{
1588            stub::{StubInstallErrors, StubInstaller, StubPlan},
1589            ProgressObserver,
1590        },
1591        metrics::MockMetricsReporter,
1592        policy::{MockPolicyEngine, StubPolicyEngine},
1593        protocol::{request::OS, response, Cohort},
1594        storage::MemStorage,
1595        time::{
1596            timers::{BlockingTimer, MockTimer, RequestedWait},
1597            MockTimeSource, PartialComplexTime,
1598        },
1599        version::Version,
1600    };
1601    use assert_matches::assert_matches;
1602    use futures::executor::{block_on, LocalPool};
1603    use futures::future::LocalBoxFuture;
1604    use futures::task::LocalSpawnExt;
1605    use pretty_assertions::assert_eq;
1606    use serde_json::json;
1607    use std::cell::RefCell;
1608    use std::time::Duration;
1609    use tracing::info;
1610
1611    fn make_test_app_set() -> Rc<Mutex<VecAppSet>> {
1612        Rc::new(Mutex::new(VecAppSet::new(vec![App::builder()
1613            .id("{00000000-0000-0000-0000-000000000001}")
1614            .version([1, 2, 3, 4])
1615            .cohort(Cohort::new("stable-channel"))
1616            .build()])))
1617    }
1618
1619    fn make_update_available_response() -> HttpResponse<Vec<u8>> {
1620        let response = json!({"response":{
1621          "server": "prod",
1622          "protocol": "3.0",
1623          "app": [{
1624            "appid": "{00000000-0000-0000-0000-000000000001}",
1625            "status": "ok",
1626            "updatecheck": {
1627              "status": "ok"
1628            }
1629          }],
1630        }});
1631        HttpResponse::new(serde_json::to_vec(&response).unwrap())
1632    }
1633
1634    fn make_noupdate_httpresponse() -> Vec<u8> {
1635        serde_json::to_vec(
1636            &(json!({"response":{
1637              "server": "prod",
1638              "protocol": "3.0",
1639              "app": [{
1640                "appid": "{00000000-0000-0000-0000-000000000001}",
1641                "status": "ok",
1642                "updatecheck": {
1643                  "status": "noupdate"
1644                }
1645              }]
1646            }})),
1647        )
1648        .unwrap()
1649    }
1650
1651    // Assert that the last request made to |http| is equal to the request built by
1652    // |request_builder|.
1653    async fn assert_request<'a>(http: &MockHttpRequest, request_builder: RequestBuilder<'a>) {
1654        let cup_handler = make_cup_handler_for_test();
1655        let (request, _request_metadata) = request_builder.build(Some(&cup_handler)).unwrap();
1656        let body = hyper::body::to_bytes(request).await.unwrap();
1657        // Compare string instead of Vec<u8> for easier debugging.
1658        let body_str = String::from_utf8_lossy(&body);
1659        http.assert_body_str(&body_str).await;
1660    }
1661
1662    #[test]
1663    fn run_simple_check_with_noupdate_result() {
1664        block_on(async {
1665            let http = MockHttpRequest::new(HttpResponse::new(make_noupdate_httpresponse()));
1666
1667            StateMachineBuilder::new_stub()
1668                .http(http)
1669                .oneshot(RequestParams::default())
1670                .await
1671                .unwrap();
1672
1673            info!("update check complete!");
1674        });
1675    }
1676
1677    #[test]
1678    fn test_cohort_returned_with_noupdate_result() {
1679        block_on(async {
1680            let response = json!({"response":{
1681              "server": "prod",
1682              "protocol": "3.0",
1683              "app": [{
1684                "appid": "{00000000-0000-0000-0000-000000000001}",
1685                "status": "ok",
1686                "cohort": "1",
1687                "cohortname": "stable-channel",
1688                "updatecheck": {
1689                  "status": "noupdate"
1690                }
1691              }]
1692            }});
1693            let response = serde_json::to_vec(&response).unwrap();
1694            let http = MockHttpRequest::new(HttpResponse::new(response));
1695
1696            let (response, reboot_after_update) = StateMachineBuilder::new_stub()
1697                .http(http)
1698                .oneshot(RequestParams::default())
1699                .await
1700                .unwrap();
1701            assert_eq!(
1702                "{00000000-0000-0000-0000-000000000001}",
1703                response.app_responses[0].app_id
1704            );
1705            assert_eq!(Some("1".into()), response.app_responses[0].cohort.id);
1706            assert_eq!(
1707                Some("stable-channel".into()),
1708                response.app_responses[0].cohort.name
1709            );
1710            assert_eq!(None, response.app_responses[0].cohort.hint);
1711
1712            assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);
1713        });
1714    }
1715
1716    #[test]
1717    fn test_cohort_returned_with_update_result() {
1718        block_on(async {
1719            let response = json!({"response":{
1720              "server": "prod",
1721              "protocol": "3.0",
1722              "app": [{
1723                "appid": "{00000000-0000-0000-0000-000000000001}",
1724                "status": "ok",
1725                "cohort": "1",
1726                "cohortname": "stable-channel",
1727                "updatecheck": {
1728                  "status": "ok"
1729                }
1730              }]
1731            }});
1732            let response = serde_json::to_vec(&response).unwrap();
1733            let http = MockHttpRequest::new(HttpResponse::new(response));
1734
1735            let (response, reboot_after_update) = StateMachineBuilder::new_stub()
1736                .http(http)
1737                .oneshot(RequestParams::default())
1738                .await
1739                .unwrap();
1740            assert_eq!(
1741                "{00000000-0000-0000-0000-000000000001}",
1742                response.app_responses[0].app_id
1743            );
1744            assert_eq!(Some("1".into()), response.app_responses[0].cohort.id);
1745            assert_eq!(
1746                Some("stable-channel".into()),
1747                response.app_responses[0].cohort.name
1748            );
1749            assert_eq!(None, response.app_responses[0].cohort.hint);
1750
1751            assert_matches!(reboot_after_update, RebootAfterUpdate::Needed(()));
1752        });
1753    }
1754
1755    #[test]
1756    fn test_report_parse_response_error() {
1757        block_on(async {
1758            let http = MockHttpRequest::new(HttpResponse::new("invalid response".into()));
1759
1760            let mut state_machine = StateMachineBuilder::new_stub().http(http).build().await;
1761
1762            let response = state_machine.oneshot(RequestParams::default()).await;
1763            assert_matches!(response, Err(UpdateCheckError::ResponseParser(_)));
1764
1765            let request_params = RequestParams::default();
1766            let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
1767            let event = Event {
1768                previous_version: Some("1.2.3.4".to_string()),
1769                ..Event::error(EventErrorCode::ParseResponse)
1770            };
1771            let apps = state_machine.app_set.lock().await.get_apps();
1772            request_builder = request_builder
1773                .add_event(&apps[0], event)
1774                .session_id(GUID::from_u128(0))
1775                .request_id(GUID::from_u128(2));
1776            assert_request(&state_machine.http, request_builder).await;
1777        });
1778    }
1779
1780    #[test]
1781    fn test_report_construct_install_plan_error() {
1782        block_on(async {
1783            let response = json!({"response":{
1784              "server": "prod",
1785              "protocol": "4.0",
1786              "app": [{
1787                "appid": "{00000000-0000-0000-0000-000000000001}",
1788                "status": "ok",
1789                "updatecheck": {
1790                  "status": "ok"
1791                }
1792              }],
1793            }});
1794            let response = serde_json::to_vec(&response).unwrap();
1795            let http = MockHttpRequest::new(HttpResponse::new(response));
1796
1797            let mut state_machine = StateMachineBuilder::new_stub().http(http).build().await;
1798
1799            let response = state_machine.oneshot(RequestParams::default()).await;
1800            assert_matches!(response, Err(UpdateCheckError::InstallPlan(_)));
1801
1802            let request_params = RequestParams::default();
1803            let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
1804            let event = Event {
1805                previous_version: Some("1.2.3.4".to_string()),
1806                ..Event::error(EventErrorCode::ConstructInstallPlan)
1807            };
1808            let apps = state_machine.app_set.lock().await.get_apps();
1809            request_builder = request_builder
1810                .add_event(&apps[0], event)
1811                .session_id(GUID::from_u128(0))
1812                .request_id(GUID::from_u128(2));
1813            assert_request(&state_machine.http, request_builder).await;
1814        });
1815    }
1816
1817    #[test]
1818    fn test_report_installation_error() {
1819        block_on(async {
1820            let response = json!({"response":{
1821              "server": "prod",
1822              "protocol": "3.0",
1823              "app": [{
1824                "appid": "{00000000-0000-0000-0000-000000000001}",
1825                "status": "ok",
1826                "updatecheck": {
1827                  "status": "ok",
1828                  "manifest": {
1829                      "version": "5.6.7.8",
1830                      "actions": {
1831                          "action": [],
1832                      },
1833                      "packages": {
1834                          "package": [],
1835                      },
1836                  }
1837                }
1838              }],
1839            }});
1840            let response = serde_json::to_vec(&response).unwrap();
1841            let http = MockHttpRequest::new(HttpResponse::new(response));
1842
1843            let mut state_machine = StateMachineBuilder::new_stub()
1844                .http(http)
1845                .installer(StubInstaller { should_fail: true })
1846                .build()
1847                .await;
1848
1849            let (response, reboot_after_update) = state_machine
1850                .oneshot(RequestParams::default())
1851                .await
1852                .unwrap();
1853            assert_eq!(
1854                Action::InstallPlanExecutionError,
1855                response.app_responses[0].result
1856            );
1857            assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);
1858
1859            let request_params = RequestParams::default();
1860            let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
1861            let event = Event {
1862                previous_version: Some("1.2.3.4".to_string()),
1863                next_version: Some("5.6.7.8".to_string()),
1864                download_time_ms: Some(0),
1865                ..Event::error(EventErrorCode::Installation)
1866            };
1867            let apps = state_machine.app_set.lock().await.get_apps();
1868            request_builder = request_builder
1869                .add_event(&apps[0], event)
1870                .session_id(GUID::from_u128(0))
1871                .request_id(GUID::from_u128(3));
1872            assert_request(&state_machine.http, request_builder).await;
1873        });
1874    }
1875
1876    #[test]
1877    fn test_report_installation_error_multi_app() {
1878        block_on(async {
1879            // Intentionally made the app order in response and app_set different.
1880            let response = json!({"response":{
1881              "server": "prod",
1882              "protocol": "3.0",
1883              "app": [{
1884                "appid": "appid_3",
1885                "status": "ok",
1886                "updatecheck": {
1887                  "status": "ok",
1888                  "manifest": {
1889                      "version": "5.6.7.8",
1890                      "actions": {
1891                          "action": [],
1892                      },
1893                      "packages": {
1894                          "package": [],
1895                      },
1896                  }
1897                }
1898              },{
1899                "appid": "appid_1",
1900                "status": "ok",
1901                "updatecheck": {
1902                  "status": "ok",
1903                  "manifest": {
1904                      "version": "1.2.3.4",
1905                      "actions": {
1906                          "action": [],
1907                      },
1908                      "packages": {
1909                          "package": [],
1910                      },
1911                  }
1912                }
1913              },{
1914                "appid": "appid_2",
1915                "status": "ok",
1916                "updatecheck": {
1917                  "status": "noupdate",
1918                }
1919              }],
1920            }});
1921            let response = serde_json::to_vec(&response).unwrap();
1922            let mut http = MockHttpRequest::new(HttpResponse::new(response));
1923            http.add_response(HttpResponse::new(vec![]));
1924            let app_set = VecAppSet::new(vec![
1925                App::builder().id("appid_1").version([1, 2, 3, 3]).build(),
1926                App::builder().id("appid_2").version([9, 9, 9, 9]).build(),
1927                App::builder().id("appid_3").version([5, 6, 7, 7]).build(),
1928            ]);
1929            let app_set = Rc::new(Mutex::new(app_set));
1930            let (send_install, mut recv_install) = mpsc::channel(0);
1931
1932            let mut state_machine = StateMachineBuilder::new_stub()
1933                .app_set(Rc::clone(&app_set))
1934                .http(http)
1935                .installer(BlockingInstaller {
1936                    on_install: send_install,
1937                    on_reboot: None,
1938                })
1939                .build()
1940                .await;
1941
1942            let recv_install_fut = async move {
1943                let unblock_install = recv_install.next().await.unwrap();
1944                unblock_install
1945                    .send(vec![
1946                        AppInstallResult::Deferred,
1947                        AppInstallResult::Installed,
1948                    ])
1949                    .unwrap();
1950            };
1951
1952            let (oneshot_result, ()) = future::join(
1953                state_machine.oneshot(RequestParams::default()),
1954                recv_install_fut,
1955            )
1956            .await;
1957            let (response, reboot_after_update) = oneshot_result.unwrap();
1958
1959            assert_eq!("appid_3", response.app_responses[0].app_id);
1960            assert_eq!(Action::DeferredByPolicy, response.app_responses[0].result);
1961            assert_eq!("appid_1", response.app_responses[1].app_id);
1962            assert_eq!(Action::Updated, response.app_responses[1].result);
1963            assert_eq!("appid_2", response.app_responses[2].app_id);
1964            assert_eq!(Action::NoUpdate, response.app_responses[2].result);
1965            assert_matches!(reboot_after_update, RebootAfterUpdate::Needed(()));
1966
1967            let request_params = RequestParams::default();
1968            let apps = app_set.lock().await.get_apps();
1969
1970            let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
1971            let event = Event {
1972                previous_version: Some("1.2.3.3".to_string()),
1973                next_version: Some("1.2.3.4".to_string()),
1974                download_time_ms: Some(0),
1975                ..Event::success(EventType::UpdateComplete)
1976            };
1977            request_builder = request_builder
1978                .add_event(&apps[0], event)
1979                .session_id(GUID::from_u128(0))
1980                .request_id(GUID::from_u128(4));
1981            assert_request(&state_machine.http, request_builder).await;
1982
1983            let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
1984            let event1 = Event {
1985                previous_version: Some("1.2.3.3".to_string()),
1986                next_version: Some("1.2.3.4".to_string()),
1987                download_time_ms: Some(0),
1988                ..Event::success(EventType::UpdateDownloadFinished)
1989            };
1990            let event2 = Event {
1991                previous_version: Some("5.6.7.7".to_string()),
1992                next_version: Some("5.6.7.8".to_string()),
1993                download_time_ms: Some(0),
1994                event_type: EventType::UpdateComplete,
1995                event_result: EventResult::UpdateDeferred,
1996                ..Event::default()
1997            };
1998            request_builder = request_builder
1999                .add_event(&apps[2], event2)
2000                .add_event(&apps[0], event1)
2001                .session_id(GUID::from_u128(0))
2002                .request_id(GUID::from_u128(3));
2003            assert_request(&state_machine.http, request_builder).await;
2004        });
2005    }
2006
2007    // Test that our observer can see when there's an installation error, and that it gets
2008    // the right error type.
2009    #[test]
2010    fn test_observe_installation_error() {
2011        block_on(async {
2012            let http = MockHttpRequest::new(make_update_available_response());
2013
2014            let actual_errors = StateMachineBuilder::new_stub()
2015                .http(http)
2016                .installer(StubInstaller { should_fail: true })
2017                .oneshot_check()
2018                .await
2019                .filter_map(|event| {
2020                    future::ready(match event {
2021                        StateMachineEvent::InstallerError(Some(e)) => {
2022                            Some(*e.downcast::<StubInstallErrors>().unwrap())
2023                        }
2024                        _ => None,
2025                    })
2026                })
2027                .collect::<Vec<StubInstallErrors>>()
2028                .await;
2029
2030            let expected_errors = vec![StubInstallErrors::Failed];
2031            assert_eq!(actual_errors, expected_errors);
2032        });
2033    }
2034
2035    #[test]
2036    fn test_report_deferred_by_policy() {
2037        block_on(async {
2038            let http = MockHttpRequest::new(make_update_available_response());
2039
2040            let policy_engine = MockPolicyEngine {
2041                update_decision: UpdateDecision::DeferredByPolicy,
2042                ..MockPolicyEngine::default()
2043            };
2044            let mut state_machine = StateMachineBuilder::new_stub()
2045                .policy_engine(policy_engine)
2046                .http(http)
2047                .build()
2048                .await;
2049
2050            let (response, reboot_after_update) = state_machine
2051                .oneshot(RequestParams::default())
2052                .await
2053                .unwrap();
2054            assert_eq!(Action::DeferredByPolicy, response.app_responses[0].result);
2055            assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);
2056
2057            let request_params = RequestParams::default();
2058            let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
2059            let event = Event {
2060                event_type: EventType::UpdateComplete,
2061                event_result: EventResult::UpdateDeferred,
2062                previous_version: Some("1.2.3.4".to_string()),
2063                ..Event::default()
2064            };
2065            let apps = state_machine.app_set.lock().await.get_apps();
2066            request_builder = request_builder
2067                .add_event(&apps[0], event)
2068                .session_id(GUID::from_u128(0))
2069                .request_id(GUID::from_u128(2));
2070            assert_request(&state_machine.http, request_builder).await;
2071        });
2072    }
2073
2074    #[test]
2075    fn test_report_denied_by_policy() {
2076        block_on(async {
2077            let response = make_update_available_response();
2078            let http = MockHttpRequest::new(response);
2079            let policy_engine = MockPolicyEngine {
2080                update_decision: UpdateDecision::DeniedByPolicy,
2081                ..MockPolicyEngine::default()
2082            };
2083
2084            let mut state_machine = StateMachineBuilder::new_stub()
2085                .policy_engine(policy_engine)
2086                .http(http)
2087                .build()
2088                .await;
2089
2090            let (response, reboot_after_update) = state_machine
2091                .oneshot(RequestParams::default())
2092                .await
2093                .unwrap();
2094            assert_eq!(Action::DeniedByPolicy, response.app_responses[0].result);
2095            assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);
2096
2097            let request_params = RequestParams::default();
2098            let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
2099            let event = Event {
2100                previous_version: Some("1.2.3.4".to_string()),
2101                ..Event::error(EventErrorCode::DeniedByPolicy)
2102            };
2103            let apps = state_machine.app_set.lock().await.get_apps();
2104            request_builder = request_builder
2105                .add_event(&apps[0], event)
2106                .session_id(GUID::from_u128(0))
2107                .request_id(GUID::from_u128(2));
2108            assert_request(&state_machine.http, request_builder).await;
2109        });
2110    }
2111
2112    #[test]
2113    fn test_wait_timer() {
2114        let mut pool = LocalPool::new();
2115        let mock_time = MockTimeSource::new_from_now();
2116        let next_update_time = mock_time.now() + Duration::from_secs(111);
2117        let (timer, mut timers) = BlockingTimer::new();
2118        let policy_engine = MockPolicyEngine {
2119            check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
2120            time_source: mock_time,
2121            ..MockPolicyEngine::default()
2122        };
2123
2124        let (_ctl, state_machine) = pool.run_until(
2125            StateMachineBuilder::new_stub()
2126                .policy_engine(policy_engine)
2127                .timer(timer)
2128                .start(),
2129        );
2130
2131        pool.spawner()
2132            .spawn_local(state_machine.map(|_| ()).collect())
2133            .unwrap();
2134
2135        // With otherwise stub implementations, the pool stalls when a timer is awaited.  Dropping
2136        // the state machine will panic if any timer durations were not used.
2137        let blocked_timer = pool.run_until(timers.next()).unwrap();
2138        assert_eq!(
2139            blocked_timer.requested_wait(),
2140            RequestedWait::Until(next_update_time.into())
2141        );
2142    }
2143
2144    #[test]
2145    fn test_cohort_and_user_counting_updates_are_used_in_subsequent_requests() {
2146        block_on(async {
2147            let response = json!({"response":{
2148                "server": "prod",
2149                "protocol": "3.0",
2150                "daystart": {
2151                  "elapsed_days": 1234567,
2152                  "elapsed_seconds": 3645
2153                },
2154                "app": [{
2155                  "appid": "{00000000-0000-0000-0000-000000000001}",
2156                  "status": "ok",
2157                  "cohort": "1",
2158                  "cohortname": "stable-channel",
2159                  "updatecheck": {
2160                    "status": "noupdate"
2161                  }
2162                }]
2163            }});
2164            let response = serde_json::to_vec(&response).unwrap();
2165            let mut http = MockHttpRequest::new(HttpResponse::new(response.clone()));
2166            http.add_response(HttpResponse::new(response));
2167            let apps = make_test_app_set();
2168
2169            let mut state_machine = StateMachineBuilder::new_stub()
2170                .http(http)
2171                .app_set(apps.clone())
2172                .build()
2173                .await;
2174
2175            // Run it the first time.
2176            state_machine.run_once().await;
2177
2178            let apps = apps.lock().await.get_apps();
2179            assert_eq!(Some("1".to_string()), apps[0].cohort.id);
2180            assert_eq!(None, apps[0].cohort.hint);
2181            assert_eq!(Some("stable-channel".to_string()), apps[0].cohort.name);
2182            assert_eq!(
2183                UserCounting::ClientRegulatedByDate(Some(1234567)),
2184                apps[0].user_counting
2185            );
2186
2187            // Run it the second time.
2188            state_machine.run_once().await;
2189
2190            let request_params = RequestParams::default();
2191            let expected_request_builder =
2192                RequestBuilder::new(&state_machine.config, &request_params)
2193                    .add_update_check(&apps[0])
2194                    .add_ping(&apps[0])
2195                    .session_id(GUID::from_u128(2))
2196                    .request_id(GUID::from_u128(3));
2197            // Check that the second update check used the new app.
2198            assert_request(&state_machine.http, expected_request_builder).await;
2199        });
2200    }
2201
2202    #[test]
2203    fn test_user_counting_returned() {
2204        block_on(async {
2205            let response = json!({"response":{
2206            "server": "prod",
2207            "protocol": "3.0",
2208            "daystart": {
2209              "elapsed_days": 1234567,
2210              "elapsed_seconds": 3645
2211            },
2212            "app": [{
2213              "appid": "{00000000-0000-0000-0000-000000000001}",
2214              "status": "ok",
2215              "cohort": "1",
2216              "cohortname": "stable-channel",
2217              "updatecheck": {
2218                "status": "noupdate"
2219                  }
2220              }]
2221            }});
2222            let response = serde_json::to_vec(&response).unwrap();
2223            let http = MockHttpRequest::new(HttpResponse::new(response));
2224
2225            let (response, reboot_after_update) = StateMachineBuilder::new_stub()
2226                .http(http)
2227                .oneshot(RequestParams::default())
2228                .await
2229                .unwrap();
2230
2231            assert_eq!(
2232                UserCounting::ClientRegulatedByDate(Some(1234567)),
2233                response.app_responses[0].user_counting
2234            );
2235            assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);
2236        });
2237    }
2238
2239    #[test]
2240    fn test_observe_state() {
2241        block_on(async {
2242            let actual_states = StateMachineBuilder::new_stub()
2243                .oneshot_check()
2244                .await
2245                .filter_map(|event| {
2246                    future::ready(match event {
2247                        StateMachineEvent::StateChange(state) => Some(state),
2248                        _ => None,
2249                    })
2250                })
2251                .collect::<Vec<State>>()
2252                .await;
2253
2254            let expected_states = vec![
2255                State::CheckingForUpdates(InstallSource::ScheduledTask),
2256                State::ErrorCheckingForUpdate,
2257            ];
2258            assert_eq!(actual_states, expected_states);
2259        });
2260    }
2261
2262    #[test]
2263    fn test_observe_schedule() {
2264        block_on(async {
2265            let mock_time = MockTimeSource::new_from_now();
2266            let actual_schedules = StateMachineBuilder::new_stub()
2267                .policy_engine(StubPolicyEngine::new(&mock_time))
2268                .oneshot_check()
2269                .await
2270                .filter_map(|event| {
2271                    future::ready(match event {
2272                        StateMachineEvent::ScheduleChange(schedule) => Some(schedule),
2273                        _ => None,
2274                    })
2275                })
2276                .collect::<Vec<UpdateCheckSchedule>>()
2277                .await;
2278
2279            // The resultant schedule should only contain the timestamp of the above update check.
2280            let expected_schedule = UpdateCheckSchedule::builder()
2281                .last_update_time(mock_time.now())
2282                .last_update_check_time(mock_time.now())
2283                .build();
2284
2285            assert_eq!(actual_schedules, vec![expected_schedule]);
2286        });
2287    }
2288
2289    #[test]
2290    fn test_observe_protocol_state() {
2291        block_on(async {
2292            let actual_protocol_states = StateMachineBuilder::new_stub()
2293                .oneshot_check()
2294                .await
2295                .filter_map(|event| {
2296                    future::ready(match event {
2297                        StateMachineEvent::ProtocolStateChange(state) => Some(state),
2298                        _ => None,
2299                    })
2300                })
2301                .collect::<Vec<ProtocolState>>()
2302                .await;
2303
2304            let expected_protocol_state = ProtocolState {
2305                consecutive_failed_update_checks: 1,
2306                ..ProtocolState::default()
2307            };
2308
2309            assert_eq!(actual_protocol_states, vec![expected_protocol_state]);
2310        });
2311    }
2312
2313    #[test]
2314    fn test_observe_omaha_server_response() {
2315        block_on(async {
2316            let response = json!({"response":{
2317              "server": "prod",
2318              "protocol": "3.0",
2319              "app": [{
2320                "appid": "{00000000-0000-0000-0000-000000000001}",
2321                "status": "ok",
2322                "cohort": "1",
2323                "cohortname": "stable-channel",
2324                "updatecheck": {
2325                  "status": "noupdate"
2326                }
2327              }]
2328            }});
2329            let response = serde_json::to_vec(&response).unwrap();
2330            let expected_omaha_response = response::parse_json_response(&response).unwrap();
2331            let http = MockHttpRequest::new(HttpResponse::new(response));
2332
2333            let actual_omaha_response = StateMachineBuilder::new_stub()
2334                .http(http)
2335                .oneshot_check()
2336                .await
2337                .filter_map(|event| {
2338                    future::ready(match event {
2339                        StateMachineEvent::OmahaServerResponse(response) => Some(response),
2340                        _ => None,
2341                    })
2342                })
2343                .collect::<Vec<response::Response>>()
2344                .await;
2345
2346            assert_eq!(actual_omaha_response, vec![expected_omaha_response]);
2347        });
2348    }
2349
2350    #[test]
2351    fn test_metrics_report_omaha_event_lost() {
2352        block_on(async {
2353            // This is sufficient to trigger a lost Omaha event as oneshot triggers an
2354            // update check, which gets the invalid response (but hasn't checked the
2355            // validity yet). This invalid response still contains an OK status, resulting
2356            // in the UpdateCheckResponseTime and RequestsPerCheck events being generated
2357            // reporting success.
2358            //
2359            // The response is then parsed and found to be incorrect; this parse error is
2360            // attempted to be sent back to Omaha as an event with the ParseResponse error
2361            // associated. However, the MockHttpRequest has already consumed the one
2362            // response it knew how to give; this event is reported via HTTP, but is "lost"
2363            // because the mock responds with a 500 error when it has no responses left to
2364            // return.
2365            //
2366            // That finally results in the OmahaEventLost.
2367            let http = MockHttpRequest::new(HttpResponse::new("invalid response".into()));
2368            let mut metrics_reporter = MockMetricsReporter::new();
2369            let _response = StateMachineBuilder::new_stub()
2370                .http(http)
2371                .metrics_reporter(&mut metrics_reporter)
2372                .oneshot(RequestParams::default())
2373                .await;
2374
2375            // FIXME(https://github.com/rust-lang/rustfmt/issues/4530) rustfmt doesn't wrap slice
2376            // patterns yet.
2377            #[rustfmt::skip]
2378            assert_matches!(
2379                metrics_reporter.metrics.as_slice(),
2380                [
2381                    Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
2382                    Metrics::RequestsPerCheck { count: 1, successful: true },
2383                    Metrics::OmahaEventLost(Event {
2384                        event_type: EventType::UpdateComplete,
2385                        event_result: EventResult::Error,
2386                        errorcode: Some(EventErrorCode::ParseResponse),
2387                        previous_version: None,
2388                        next_version: None,
2389                        download_time_ms: None,
2390                    })
2391                ]
2392            );
2393        });
2394    }
2395
2396    #[test]
2397    fn test_metrics_report_update_check_response_time() {
2398        block_on(async {
2399            let mut metrics_reporter = MockMetricsReporter::new();
2400            let _response = StateMachineBuilder::new_stub()
2401                .metrics_reporter(&mut metrics_reporter)
2402                .oneshot(RequestParams::default())
2403                .await;
2404
2405            // FIXME(https://github.com/rust-lang/rustfmt/issues/4530) rustfmt doesn't wrap slice
2406            // patterns yet.
2407            #[rustfmt::skip]
2408            assert_matches!(
2409                metrics_reporter.metrics.as_slice(),
2410                [
2411                    Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
2412                    Metrics::RequestsPerCheck { count: 1, successful: true },
2413                ]
2414            );
2415        });
2416    }
2417
2418    #[test]
2419    fn test_metrics_report_update_check_response_time_on_failure() {
2420        block_on(async {
2421            let mut metrics_reporter = MockMetricsReporter::new();
2422            let mut http = MockHttpRequest::default();
2423
2424            for _ in 0..MAX_OMAHA_REQUEST_ATTEMPTS {
2425                http.add_error(http_request::mock_errors::make_transport_error());
2426            }
2427
2428            // Note: we exit the update loop before we fetch the successful result, so we never see
2429            // this result.
2430            http.add_response(hyper::Response::default());
2431
2432            let _response = StateMachineBuilder::new_stub()
2433                .http(http)
2434                .metrics_reporter(&mut metrics_reporter)
2435                .oneshot(RequestParams::default())
2436                .await;
2437
2438            // FIXME(https://github.com/rust-lang/rustfmt/issues/4530) rustfmt doesn't wrap slice
2439            // patterns yet.
2440            #[rustfmt::skip]
2441            assert_matches!(
2442                metrics_reporter.metrics.as_slice(),
2443                [
2444                    Metrics::UpdateCheckResponseTime { response_time: _, successful: false },
2445                    Metrics::UpdateCheckResponseTime { response_time: _, successful: false },
2446                    Metrics::UpdateCheckResponseTime { response_time: _, successful: false },
2447                    Metrics::RequestsPerCheck { count: 3, successful: false },
2448                ]
2449            );
2450        });
2451    }
2452
2453    #[test]
2454    fn test_metrics_report_update_check_response_time_on_failure_followed_by_success() {
2455        block_on(async {
2456            let mut metrics_reporter = MockMetricsReporter::new();
2457            let mut http = MockHttpRequest::default();
2458
2459            for _ in 0..MAX_OMAHA_REQUEST_ATTEMPTS - 1 {
2460                http.add_error(http_request::mock_errors::make_transport_error());
2461            }
2462            http.add_response(hyper::Response::default());
2463
2464            let _response = StateMachineBuilder::new_stub()
2465                .http(http)
2466                .metrics_reporter(&mut metrics_reporter)
2467                .oneshot(RequestParams::default())
2468                .await;
2469
2470            // FIXME(https://github.com/rust-lang/rustfmt/issues/4530) rustfmt doesn't wrap slice
2471            // patterns yet.
2472            #[rustfmt::skip]
2473            assert_matches!(
2474                metrics_reporter.metrics.as_slice(),
2475                [
2476                    Metrics::UpdateCheckResponseTime { response_time: _, successful: false },
2477                    Metrics::UpdateCheckResponseTime { response_time: _, successful: false },
2478                    Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
2479                    Metrics::RequestsPerCheck { count: 3, successful: true },
2480                    Metrics::OmahaEventLost(Event {
2481                        event_type: EventType::UpdateComplete,
2482                        event_result: EventResult::Error,
2483                        errorcode: Some(EventErrorCode::ParseResponse),
2484                        previous_version: None,
2485                        next_version: None,
2486                        download_time_ms: None
2487                    }),
2488                ]
2489            );
2490        });
2491    }
2492
2493    #[test]
2494    fn test_metrics_report_requests_per_check() {
2495        block_on(async {
2496            let mut metrics_reporter = MockMetricsReporter::new();
2497            let _response = StateMachineBuilder::new_stub()
2498                .metrics_reporter(&mut metrics_reporter)
2499                .oneshot(RequestParams::default())
2500                .await;
2501
2502            assert!(metrics_reporter
2503                .metrics
2504                .contains(&Metrics::RequestsPerCheck {
2505                    count: 1,
2506                    successful: true
2507                }));
2508        });
2509    }
2510
2511    #[test]
2512    fn test_metrics_report_requests_per_check_on_failure_followed_by_success() {
2513        block_on(async {
2514            let mut metrics_reporter = MockMetricsReporter::new();
2515            let mut http = MockHttpRequest::default();
2516
2517            for _ in 0..MAX_OMAHA_REQUEST_ATTEMPTS - 1 {
2518                http.add_error(http_request::mock_errors::make_transport_error());
2519            }
2520
2521            http.add_response(hyper::Response::default());
2522
2523            let _response = StateMachineBuilder::new_stub()
2524                .http(http)
2525                .metrics_reporter(&mut metrics_reporter)
2526                .oneshot(RequestParams::default())
2527                .await;
2528
2529            assert!(!metrics_reporter.metrics.is_empty());
2530            assert!(metrics_reporter
2531                .metrics
2532                .contains(&Metrics::RequestsPerCheck {
2533                    count: MAX_OMAHA_REQUEST_ATTEMPTS,
2534                    successful: true
2535                }));
2536        });
2537    }
2538
2539    #[test]
2540    fn test_metrics_report_requests_per_check_on_failure() {
2541        block_on(async {
2542            let mut metrics_reporter = MockMetricsReporter::new();
2543            let mut http = MockHttpRequest::default();
2544
2545            for _ in 0..MAX_OMAHA_REQUEST_ATTEMPTS {
2546                http.add_error(http_request::mock_errors::make_transport_error());
2547            }
2548
2549            // Note we will give up before we get this successful request.
2550            http.add_response(hyper::Response::default());
2551
2552            let _response = StateMachineBuilder::new_stub()
2553                .http(http)
2554                .metrics_reporter(&mut metrics_reporter)
2555                .oneshot(RequestParams::default())
2556                .await;
2557
2558            assert!(!metrics_reporter.metrics.is_empty());
2559            assert!(metrics_reporter
2560                .metrics
2561                .contains(&Metrics::RequestsPerCheck {
2562                    count: MAX_OMAHA_REQUEST_ATTEMPTS,
2563                    successful: false
2564                }));
2565        });
2566    }
2567
2568    #[test]
2569    fn test_requests_per_check_backoff_with_mock_timer() {
2570        block_on(async {
2571            let mut timer = MockTimer::new();
2572            timer.expect_for_range(Duration::from_millis(500), Duration::from_millis(1500));
2573            timer.expect_for_range(Duration::from_millis(1500), Duration::from_millis(2500));
2574            let requested_waits = timer.get_requested_waits_view();
2575            let response = StateMachineBuilder::new_stub()
2576                .http(MockHttpRequest::empty())
2577                .timer(timer)
2578                .oneshot(RequestParams::default())
2579                .await;
2580
2581            let waits = requested_waits.borrow();
2582            assert_eq!(waits.len(), 2);
2583            assert_matches!(
2584                waits[0],
2585                RequestedWait::For(d) if d >= Duration::from_millis(500) && d <= Duration::from_millis(1500)
2586            );
2587            assert_matches!(
2588                waits[1],
2589                RequestedWait::For(d) if d >= Duration::from_millis(1500) && d <= Duration::from_millis(2500)
2590            );
2591
2592            assert_matches!(
2593                response,
2594                Err(UpdateCheckError::OmahaRequest(
2595                    OmahaRequestError::HttpStatus(_)
2596                ))
2597            );
2598        });
2599    }
2600
2601    #[test]
2602    fn test_metrics_report_update_check_failure_reason_omaha() {
2603        block_on(async {
2604            let mut metrics_reporter = MockMetricsReporter::new();
2605            let mut state_machine = StateMachineBuilder::new_stub()
2606                .metrics_reporter(&mut metrics_reporter)
2607                .build()
2608                .await;
2609
2610            state_machine.run_once().await;
2611
2612            assert!(metrics_reporter
2613                .metrics
2614                .contains(&Metrics::UpdateCheckFailureReason(
2615                    UpdateCheckFailureReason::Omaha
2616                )));
2617        });
2618    }
2619
2620    #[test]
2621    fn test_metrics_report_update_check_failure_reason_network() {
2622        block_on(async {
2623            let mut metrics_reporter = MockMetricsReporter::new();
2624            let mut state_machine = StateMachineBuilder::new_stub()
2625                .http(MockHttpRequest::empty())
2626                .metrics_reporter(&mut metrics_reporter)
2627                .build()
2628                .await;
2629
2630            state_machine.run_once().await;
2631
2632            assert!(metrics_reporter
2633                .metrics
2634                .contains(&Metrics::UpdateCheckFailureReason(
2635                    UpdateCheckFailureReason::Network
2636                )));
2637        });
2638    }
2639
2640    #[test]
2641    fn test_persist_last_update_time() {
2642        block_on(async {
2643            let storage = Rc::new(Mutex::new(MemStorage::new()));
2644
2645            StateMachineBuilder::new_stub()
2646                .storage(Rc::clone(&storage))
2647                .oneshot_check()
2648                .await
2649                .map(|_| ())
2650                .collect::<()>()
2651                .await;
2652
2653            let storage = storage.lock().await;
2654            storage.get_int(LAST_UPDATE_TIME).await.unwrap();
2655            assert!(storage.committed());
2656        });
2657    }
2658
2659    #[test]
2660    fn test_persist_server_dictated_poll_interval() {
2661        block_on(async {
2662            let response = HttpResponse::builder()
2663                .header(X_RETRY_AFTER, 1234)
2664                .body(make_noupdate_httpresponse())
2665                .unwrap();
2666            let http = MockHttpRequest::new(response);
2667            let storage = Rc::new(Mutex::new(MemStorage::new()));
2668
2669            let mut state_machine = StateMachineBuilder::new_stub()
2670                .http(http)
2671                .storage(Rc::clone(&storage))
2672                .build()
2673                .await;
2674            state_machine
2675                .oneshot(RequestParams::default())
2676                .await
2677                .unwrap();
2678
2679            assert_eq!(
2680                state_machine.context.state.server_dictated_poll_interval,
2681                Some(Duration::from_secs(1234))
2682            );
2683
2684            let storage = storage.lock().await;
2685            assert_eq!(
2686                storage.get_int(SERVER_DICTATED_POLL_INTERVAL).await,
2687                Some(1234000000)
2688            );
2689            assert!(storage.committed());
2690        });
2691    }
2692
2693    #[test]
2694    fn test_persist_server_dictated_poll_interval_http_error() {
2695        block_on(async {
2696            let response = HttpResponse::builder()
2697                .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
2698                .header(X_RETRY_AFTER, 1234)
2699                .body(vec![])
2700                .unwrap();
2701            let http = MockHttpRequest::new(response);
2702            let storage = Rc::new(Mutex::new(MemStorage::new()));
2703
2704            let mut state_machine = StateMachineBuilder::new_stub()
2705                .http(http)
2706                .storage(Rc::clone(&storage))
2707                .build()
2708                .await;
2709            assert_matches!(
2710                state_machine.oneshot(RequestParams::default()).await,
2711                Err(UpdateCheckError::OmahaRequest(
2712                    OmahaRequestError::HttpStatus(_)
2713                ))
2714            );
2715
2716            assert_eq!(
2717                state_machine.context.state.server_dictated_poll_interval,
2718                Some(Duration::from_secs(1234))
2719            );
2720
2721            let storage = storage.lock().await;
2722            assert_eq!(
2723                storage.get_int(SERVER_DICTATED_POLL_INTERVAL).await,
2724                Some(1234000000)
2725            );
2726            assert!(storage.committed());
2727        });
2728    }
2729
2730    #[test]
2731    fn test_persist_server_dictated_poll_interval_max_duration() {
2732        block_on(async {
2733            let response = HttpResponse::builder()
2734                .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
2735                .header(X_RETRY_AFTER, 123456789)
2736                .body(vec![])
2737                .unwrap();
2738            let http = MockHttpRequest::new(response);
2739            let storage = Rc::new(Mutex::new(MemStorage::new()));
2740
2741            let mut state_machine = StateMachineBuilder::new_stub()
2742                .http(http)
2743                .storage(Rc::clone(&storage))
2744                .build()
2745                .await;
2746            assert_matches!(
2747                state_machine.oneshot(RequestParams::default()).await,
2748                Err(UpdateCheckError::OmahaRequest(
2749                    OmahaRequestError::HttpStatus(_)
2750                ))
2751            );
2752
2753            assert_eq!(
2754                state_machine.context.state.server_dictated_poll_interval,
2755                Some(Duration::from_secs(86400))
2756            );
2757
2758            let storage = storage.lock().await;
2759            assert_eq!(
2760                storage.get_int(SERVER_DICTATED_POLL_INTERVAL).await,
2761                Some(86400000000)
2762            );
2763            assert!(storage.committed());
2764        });
2765    }
2766
2767    #[test]
2768    fn test_server_dictated_poll_interval_with_transport_error_no_retry() {
2769        block_on(async {
2770            let mut http = MockHttpRequest::empty();
2771            http.add_error(http_request::mock_errors::make_transport_error());
2772            let mut storage = MemStorage::new();
2773            let _ = storage.set_int(SERVER_DICTATED_POLL_INTERVAL, 1234000000);
2774            let _ = storage.commit();
2775            let storage = Rc::new(Mutex::new(storage));
2776
2777            let mut state_machine = StateMachineBuilder::new_stub()
2778                .http(http)
2779                .storage(Rc::clone(&storage))
2780                .build()
2781                .await;
2782            // This verifies that state machine does not retry because MockHttpRequest will only
2783            // return the transport error on the first request, any additional requests will get
2784            // HttpStatus error.
2785            assert_matches!(
2786                state_machine.oneshot(RequestParams::default()).await,
2787                Err(UpdateCheckError::OmahaRequest(
2788                    OmahaRequestError::HttpTransport(_)
2789                ))
2790            );
2791
2792            assert_eq!(
2793                state_machine.context.state.server_dictated_poll_interval,
2794                Some(Duration::from_secs(1234))
2795            );
2796        });
2797    }
2798
2799    #[test]
2800    fn test_persist_app() {
2801        block_on(async {
2802            let storage = Rc::new(Mutex::new(MemStorage::new()));
2803            let app_set = make_test_app_set();
2804
2805            StateMachineBuilder::new_stub()
2806                .storage(Rc::clone(&storage))
2807                .app_set(app_set.clone())
2808                .oneshot_check()
2809                .await
2810                .map(|_| ())
2811                .collect::<()>()
2812                .await;
2813
2814            let storage = storage.lock().await;
2815            let apps = app_set.lock().await.get_apps();
2816            storage.get_string(&apps[0].id).await.unwrap();
2817            assert!(storage.committed());
2818        });
2819    }
2820
2821    #[test]
2822    fn test_load_last_update_time() {
2823        block_on(async {
2824            let mut storage = MemStorage::new();
2825            let mut mock_time = MockTimeSource::new_from_now();
2826            mock_time.truncate_submicrosecond_walltime();
2827            let last_update_time = mock_time.now_in_walltime() - Duration::from_secs(999);
2828            storage
2829                .set_time(LAST_UPDATE_TIME, last_update_time)
2830                .await
2831                .unwrap();
2832
2833            let state_machine = StateMachineBuilder::new_stub()
2834                .policy_engine(StubPolicyEngine::new(&mock_time))
2835                .storage(Rc::new(Mutex::new(storage)))
2836                .build()
2837                .await;
2838
2839            assert_eq!(
2840                state_machine.context.schedule.last_update_time.unwrap(),
2841                PartialComplexTime::Wall(last_update_time)
2842            );
2843        });
2844    }
2845
2846    #[test]
2847    fn test_load_server_dictated_poll_interval() {
2848        block_on(async {
2849            let mut storage = MemStorage::new();
2850            storage
2851                .set_int(SERVER_DICTATED_POLL_INTERVAL, 56789)
2852                .await
2853                .unwrap();
2854
2855            let state_machine = StateMachineBuilder::new_stub()
2856                .storage(Rc::new(Mutex::new(storage)))
2857                .build()
2858                .await;
2859
2860            assert_eq!(
2861                Some(Duration::from_micros(56789)),
2862                state_machine.context.state.server_dictated_poll_interval
2863            );
2864        });
2865    }
2866
2867    #[test]
2868    fn test_load_app() {
2869        block_on(async {
2870            let app_set = VecAppSet::new(vec![App::builder()
2871                .id("{00000000-0000-0000-0000-000000000001}")
2872                .version([1, 2, 3, 4])
2873                .build()]);
2874            let mut storage = MemStorage::new();
2875            let persisted_app = PersistedApp {
2876                cohort: Cohort {
2877                    id: Some("cohort_id".to_string()),
2878                    hint: Some("test_channel".to_string()),
2879                    name: None,
2880                },
2881                user_counting: UserCounting::ClientRegulatedByDate(Some(22222)),
2882            };
2883            let json = serde_json::to_string(&persisted_app).unwrap();
2884            let apps = app_set.get_apps();
2885            storage.set_string(&apps[0].id, &json).await.unwrap();
2886
2887            let app_set = Rc::new(Mutex::new(app_set));
2888
2889            let _state_machine = StateMachineBuilder::new_stub()
2890                .storage(Rc::new(Mutex::new(storage)))
2891                .app_set(Rc::clone(&app_set))
2892                .build()
2893                .await;
2894
2895            let apps = app_set.lock().await.get_apps();
2896            assert_eq!(persisted_app.cohort, apps[0].cohort);
2897            assert_eq!(
2898                UserCounting::ClientRegulatedByDate(Some(22222)),
2899                apps[0].user_counting
2900            );
2901        });
2902    }
2903
2904    #[test]
2905    fn test_report_check_interval_with_no_storage() {
2906        block_on(async {
2907            let mut mock_time = MockTimeSource::new_from_now();
2908            let mut state_machine = StateMachineBuilder::new_stub()
2909                .policy_engine(StubPolicyEngine::new(mock_time.clone()))
2910                .metrics_reporter(MockMetricsReporter::new())
2911                .build()
2912                .await;
2913
2914            state_machine
2915                .report_check_interval(InstallSource::ScheduledTask)
2916                .await;
2917            // No metrics should be reported because no LAST_UPDATE_TIME in storage.
2918            assert!(state_machine.metrics_reporter.metrics.is_empty());
2919
2920            // A second update check should report metrics.
2921            let interval = Duration::from_micros(999999);
2922            mock_time.advance(interval);
2923
2924            state_machine
2925                .report_check_interval(InstallSource::ScheduledTask)
2926                .await;
2927
2928            assert_eq!(
2929                state_machine.metrics_reporter.metrics,
2930                vec![Metrics::UpdateCheckInterval {
2931                    interval,
2932                    clock: ClockType::Monotonic,
2933                    install_source: InstallSource::ScheduledTask,
2934                }]
2935            );
2936        });
2937    }
2938
2939    #[test]
2940    fn test_report_check_interval_mono_transition() {
2941        block_on(async {
2942            let mut mock_time = MockTimeSource::new_from_now();
2943            let mut state_machine = StateMachineBuilder::new_stub()
2944                .policy_engine(StubPolicyEngine::new(mock_time.clone()))
2945                .metrics_reporter(MockMetricsReporter::new())
2946                .build()
2947                .await;
2948
2949            // Make sure that, provided a wall time, we get an initial report
2950            // using the wall time.
2951            let initial_duration = Duration::from_secs(999);
2952            let initial_time = mock_time.now_in_walltime() - initial_duration;
2953            state_machine.context.schedule.last_update_check_time =
2954                Some(PartialComplexTime::Wall(initial_time));
2955            state_machine
2956                .report_check_interval(InstallSource::ScheduledTask)
2957                .await;
2958
2959            // Advance one more time, and this time we should see a monotonic delta.
2960            let interval = Duration::from_micros(999999);
2961            mock_time.advance(interval);
2962            state_machine
2963                .report_check_interval(InstallSource::ScheduledTask)
2964                .await;
2965
2966            // One final time, to demonstrate monotonic time edges to
2967            // monotonic time.
2968            mock_time.advance(interval);
2969            state_machine
2970                .report_check_interval(InstallSource::ScheduledTask)
2971                .await;
2972            assert_eq!(
2973                state_machine.metrics_reporter.metrics,
2974                vec![
2975                    Metrics::UpdateCheckInterval {
2976                        interval: initial_duration,
2977                        clock: ClockType::Wall,
2978                        install_source: InstallSource::ScheduledTask,
2979                    },
2980                    Metrics::UpdateCheckInterval {
2981                        interval,
2982                        clock: ClockType::Monotonic,
2983                        install_source: InstallSource::ScheduledTask,
2984                    },
2985                    Metrics::UpdateCheckInterval {
2986                        interval,
2987                        clock: ClockType::Monotonic,
2988                        install_source: InstallSource::ScheduledTask,
2989                    },
2990                ]
2991            );
2992        });
2993    }
2994
    /// Installer test double used by the tests in this module.
    ///
    /// It can be configured to fail a number of installs, advances a mock clock on each
    /// successful install, and records whether a reboot was requested.
    #[derive(Debug)]
    pub struct TestInstaller {
        /// Set to `true` by `perform_reboot`. Held in an `Rc` so a test can keep a handle to it
        /// after the installer has been moved into the state machine.
        reboot_called: Rc<RefCell<bool>>,
        /// Number of upcoming `perform_install` calls that will report a failure.
        install_fails: usize,
        /// Mock clock advanced by `INSTALL_DURATION` on each successful install.
        mock_time: MockTimeSource,
    }
    /// Builder for `TestInstaller`; obtained from `TestInstaller::builder`.
    struct TestInstallerBuilder {
        /// How many installs the built installer should fail before succeeding.
        install_fails: usize,
        /// Mock clock handed to the built installer.
        mock_time: MockTimeSource,
    }
3005    impl TestInstaller {
3006        fn builder(mock_time: MockTimeSource) -> TestInstallerBuilder {
3007            TestInstallerBuilder {
3008                install_fails: 0,
3009                mock_time,
3010            }
3011        }
3012    }
3013    impl TestInstallerBuilder {
3014        fn add_install_fail(mut self) -> Self {
3015            self.install_fails += 1;
3016            self
3017        }
3018        fn build(self) -> TestInstaller {
3019            TestInstaller {
3020                reboot_called: Rc::new(RefCell::new(false)),
3021                install_fails: self.install_fails,
3022                mock_time: self.mock_time,
3023            }
3024        }
3025    }
    /// Amount by which `TestInstaller::perform_install` advances its mock clock when an install
    /// succeeds.
    const INSTALL_DURATION: Duration = Duration::from_micros(98765433);
3027
3028    impl Installer for TestInstaller {
3029        type InstallPlan = StubPlan;
3030        type Error = StubInstallErrors;
3031        type InstallResult = ();
3032
3033        fn perform_install<'a>(
3034            &'a mut self,
3035            _install_plan: &StubPlan,
3036            observer: Option<&'a dyn ProgressObserver>,
3037        ) -> LocalBoxFuture<'a, (Self::InstallResult, Vec<AppInstallResult<Self::Error>>)> {
3038            if self.install_fails > 0 {
3039                self.install_fails -= 1;
3040                future::ready((
3041                    (),
3042                    vec![AppInstallResult::Failed(StubInstallErrors::Failed)],
3043                ))
3044                .boxed()
3045            } else {
3046                self.mock_time.advance(INSTALL_DURATION);
3047                async move {
3048                    if let Some(observer) = observer {
3049                        observer.receive_progress(None, 0.0, None, None).await;
3050                        observer.receive_progress(None, 0.3, None, None).await;
3051                        observer.receive_progress(None, 0.9, None, None).await;
3052                        observer.receive_progress(None, 1.0, None, None).await;
3053                    }
3054                    ((), vec![AppInstallResult::Installed])
3055                }
3056                .boxed_local()
3057            }
3058        }
3059
3060        fn perform_reboot(&mut self) -> LocalBoxFuture<'_, Result<(), anyhow::Error>> {
3061            self.reboot_called.replace(true);
3062            future::ready(Ok(())).boxed_local()
3063        }
3064
3065        fn try_create_install_plan<'a>(
3066            &'a self,
3067            _request_params: &'a RequestParams,
3068            _request_metadata: Option<&'a RequestMetadata>,
3069            _response: &'a Response,
3070            _response_bytes: Vec<u8>,
3071            _ecdsa_signature: Option<Vec<u8>>,
3072        ) -> LocalBoxFuture<'a, Result<Self::InstallPlan, Self::Error>> {
3073            future::ready(Ok(StubPlan)).boxed_local()
3074        }
3075    }
3076
3077    #[test]
3078    fn test_report_successful_update_duration() {
3079        block_on(async {
3080            let http = MockHttpRequest::new(make_update_available_response());
3081            let storage = Rc::new(Mutex::new(MemStorage::new()));
3082
3083            let mut mock_time = MockTimeSource::new_from_now();
3084            mock_time.truncate_submicrosecond_walltime();
3085            let now = mock_time.now();
3086
3087            let update_completed_time = now + INSTALL_DURATION;
3088            let expected_update_duration = update_completed_time.wall_duration_since(now).unwrap();
3089
3090            let first_seen_time = now - Duration::from_micros(1000);
3091
3092            let expected_duration_since_first_seen = update_completed_time
3093                .wall_duration_since(first_seen_time)
3094                .unwrap();
3095
3096            let mut state_machine = StateMachineBuilder::new_stub()
3097                .http(http)
3098                .installer(TestInstaller::builder(mock_time.clone()).build())
3099                .policy_engine(StubPolicyEngine::new(mock_time.clone()))
3100                .metrics_reporter(MockMetricsReporter::new())
3101                .storage(Rc::clone(&storage))
3102                .build()
3103                .await;
3104
3105            {
3106                let mut storage = storage.lock().await;
3107                storage.set_string(INSTALL_PLAN_ID, "").await.unwrap();
3108                storage
3109                    .set_time(UPDATE_FIRST_SEEN_TIME, first_seen_time)
3110                    .await
3111                    .unwrap();
3112                storage.commit().await.unwrap();
3113            }
3114
3115            state_machine.run_once().await;
3116
3117            #[rustfmt::skip]
3118            assert_matches!(
3119                state_machine.metrics_reporter.metrics.as_slice(),
3120                [
3121                    Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
3122                    Metrics::RequestsPerCheck { count: 1, successful: true },
3123                    Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadStarted, event_result: EventResult::Success, .. }),
3124                    Metrics::SuccessfulUpdateDuration(install_duration),
3125                    Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadFinished, event_result: EventResult::Success, .. }),
3126                    Metrics::OmahaEventLost(Event { event_type: EventType::UpdateComplete, event_result: EventResult::Success, .. }),
3127                    Metrics::SuccessfulUpdateFromFirstSeen(duration_since_first_seen),
3128                    Metrics::AttemptsToSuccessfulCheck(1),
3129                    Metrics::AttemptsToSuccessfulInstall { count: 1, successful: true },
3130                ]
3131                if
3132                    *install_duration == expected_update_duration &&
3133                    *duration_since_first_seen == expected_duration_since_first_seen
3134            );
3135        });
3136    }
3137
3138    #[test]
3139    fn test_report_failed_update_duration() {
3140        block_on(async {
3141            let http = MockHttpRequest::new(make_update_available_response());
3142            let mut state_machine = StateMachineBuilder::new_stub()
3143                .http(http)
3144                .installer(StubInstaller { should_fail: true })
3145                .metrics_reporter(MockMetricsReporter::new())
3146                .build()
3147                .await;
3148            // clock::mock::set(time::i64_to_time(123456789));
3149
3150            state_machine.run_once().await;
3151
3152            assert!(state_machine
3153                .metrics_reporter
3154                .metrics
3155                .contains(&Metrics::FailedUpdateDuration(Duration::from_micros(0))));
3156        });
3157    }
3158
3159    #[test]
3160    fn test_record_update_first_seen_time() {
3161        block_on(async {
3162            let storage = Rc::new(Mutex::new(MemStorage::new()));
3163            let mut state_machine = StateMachineBuilder::new_stub()
3164                .storage(Rc::clone(&storage))
3165                .build()
3166                .await;
3167
3168            let mut mock_time = MockTimeSource::new_from_now();
3169            mock_time.truncate_submicrosecond_walltime();
3170            let now = mock_time.now_in_walltime();
3171            assert_eq!(
3172                state_machine.record_update_first_seen_time("id", now).await,
3173                now
3174            );
3175            {
3176                let storage = storage.lock().await;
3177                assert_eq!(
3178                    storage.get_string(INSTALL_PLAN_ID).await,
3179                    Some("id".to_string())
3180                );
3181                assert_eq!(storage.get_time(UPDATE_FIRST_SEEN_TIME).await, Some(now));
3182                assert_eq!(storage.len(), 2);
3183                assert!(storage.committed());
3184            }
3185
3186            mock_time.advance(Duration::from_secs(1000));
3187            let now2 = mock_time.now_in_walltime();
3188            assert_eq!(
3189                state_machine
3190                    .record_update_first_seen_time("id", now2)
3191                    .await,
3192                now
3193            );
3194            {
3195                let storage = storage.lock().await;
3196                assert_eq!(
3197                    storage.get_string(INSTALL_PLAN_ID).await,
3198                    Some("id".to_string())
3199                );
3200                assert_eq!(storage.get_time(UPDATE_FIRST_SEEN_TIME).await, Some(now));
3201                assert_eq!(storage.len(), 2);
3202                assert!(storage.committed());
3203            }
3204            assert_eq!(
3205                state_machine
3206                    .record_update_first_seen_time("id2", now2)
3207                    .await,
3208                now2
3209            );
3210            {
3211                let storage = storage.lock().await;
3212                assert_eq!(
3213                    storage.get_string(INSTALL_PLAN_ID).await,
3214                    Some("id2".to_string())
3215                );
3216                assert_eq!(storage.get_time(UPDATE_FIRST_SEEN_TIME).await, Some(now2));
3217                assert_eq!(storage.len(), 2);
3218                assert!(storage.committed());
3219            }
3220        });
3221    }
3222
3223    #[test]
3224    fn test_report_attempts_to_successful_check() {
3225        block_on(async {
3226            let storage = Rc::new(Mutex::new(MemStorage::new()));
3227            let mut state_machine = StateMachineBuilder::new_stub()
3228                .installer(StubInstaller { should_fail: true })
3229                .metrics_reporter(MockMetricsReporter::new())
3230                .storage(Rc::clone(&storage))
3231                .build()
3232                .await;
3233
3234            state_machine
3235                .report_attempts_to_successful_check(true)
3236                .await;
3237
3238            // consecutive_failed_update_attempts should be zero (there were no previous failures)
3239            // but we should record an attempt in metrics
3240            assert_eq!(
3241                state_machine.context.state.consecutive_failed_update_checks,
3242                0
3243            );
3244            assert_eq!(
3245                state_machine.metrics_reporter.metrics,
3246                vec![Metrics::AttemptsToSuccessfulCheck(1)]
3247            );
3248
3249            state_machine
3250                .report_attempts_to_successful_check(false)
3251                .await;
3252            assert_eq!(
3253                state_machine.context.state.consecutive_failed_update_checks,
3254                1
3255            );
3256
3257            state_machine
3258                .report_attempts_to_successful_check(false)
3259                .await;
3260            assert_eq!(
3261                state_machine.context.state.consecutive_failed_update_checks,
3262                2
3263            );
3264
3265            // consecutive_failed_update_attempts should be reset to zero on success
3266            // but we should record the previous number of failed attempts (2) + 1 in metrics
3267            state_machine
3268                .report_attempts_to_successful_check(true)
3269                .await;
3270            assert_eq!(
3271                state_machine.context.state.consecutive_failed_update_checks,
3272                0
3273            );
3274            assert_eq!(
3275                state_machine.metrics_reporter.metrics,
3276                vec![
3277                    Metrics::AttemptsToSuccessfulCheck(1),
3278                    Metrics::AttemptsToSuccessfulCheck(3)
3279                ]
3280            );
3281        });
3282    }
3283
3284    #[test]
3285    fn test_ping_omaha_updates_consecutive_failed_update_checks_and_persists() {
3286        block_on(async {
3287            let mut http = MockHttpRequest::empty();
3288            http.add_error(http_request::mock_errors::make_transport_error());
3289            http.add_response(HttpResponse::new(vec![]));
3290            let response = json!({"response":{
3291              "server": "prod",
3292              "protocol": "3.0",
3293              "app": [{
3294                "appid": "{00000000-0000-0000-0000-000000000001}",
3295                "status": "ok",
3296              }],
3297            }});
3298            let response = serde_json::to_vec(&response).unwrap();
3299            http.add_response(HttpResponse::new(response));
3300
3301            let storage = Rc::new(Mutex::new(MemStorage::new()));
3302
3303            // Start out with a value in storage...
3304            {
3305                let mut storage = storage.lock().await;
3306                let _ = storage.set_int(CONSECUTIVE_FAILED_UPDATE_CHECKS, 1);
3307                let _ = storage.commit();
3308            }
3309
3310            let mut state_machine = StateMachineBuilder::new_stub()
3311                .storage(Rc::clone(&storage))
3312                .http(http)
3313                .build()
3314                .await;
3315
3316            async_generator::generate(move |mut co| async move {
3317                // Failed ping increases `consecutive_failed_update_checks`, adding the value from
3318                // storage.
3319                state_machine.ping_omaha(&mut co).await;
3320                assert_eq!(
3321                    state_machine.context.state.consecutive_failed_update_checks,
3322                    2
3323                );
3324                {
3325                    let storage = storage.lock().await;
3326                    assert_eq!(
3327                        storage.get_int(CONSECUTIVE_FAILED_UPDATE_CHECKS).await,
3328                        Some(2)
3329                    );
3330                }
3331
3332                state_machine.ping_omaha(&mut co).await;
3333                assert_eq!(
3334                    state_machine.context.state.consecutive_failed_update_checks,
3335                    3
3336                );
3337                {
3338                    let storage = storage.lock().await;
3339                    assert_eq!(
3340                        storage.get_int(CONSECUTIVE_FAILED_UPDATE_CHECKS).await,
3341                        Some(3)
3342                    );
3343                }
3344
3345                // Successful ping resets `consecutive_failed_update_checks`.
3346                state_machine.ping_omaha(&mut co).await;
3347                assert_eq!(
3348                    state_machine.context.state.consecutive_failed_update_checks,
3349                    0
3350                );
3351                {
3352                    let storage = storage.lock().await;
3353                    assert_eq!(
3354                        storage.get_int(CONSECUTIVE_FAILED_UPDATE_CHECKS).await,
3355                        None
3356                    );
3357                }
3358            })
3359            .into_complete()
3360            .await;
3361        });
3362    }
3363
3364    #[test]
3365    fn test_report_attempts_to_successful_install() {
3366        block_on(async {
3367            let http = MockHttpRequest::new(make_update_available_response());
3368            let storage = Rc::new(Mutex::new(MemStorage::new()));
3369
3370            let mock_time = MockTimeSource::new_from_now();
3371
3372            let mut state_machine = StateMachineBuilder::new_stub()
3373                .http(http)
3374                .installer(TestInstaller::builder(mock_time.clone()).build())
3375                .policy_engine(StubPolicyEngine::new(mock_time.clone()))
3376                .metrics_reporter(MockMetricsReporter::new())
3377                .storage(Rc::clone(&storage))
3378                .build()
3379                .await;
3380
3381            state_machine.run_once().await;
3382
3383            // FIXME(https://github.com/rust-lang/rustfmt/issues/4530) rustfmt doesn't wrap slice
3384            // patterns yet.
3385            #[rustfmt::skip]
3386            assert_matches!(
3387                state_machine.metrics_reporter.metrics.as_slice(),
3388                [
3389                    Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
3390                    Metrics::RequestsPerCheck { count: 1, successful: true },
3391                    Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadStarted, event_result: EventResult::Success, .. }),
3392                    Metrics::SuccessfulUpdateDuration(_),
3393                    Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadFinished, event_result: EventResult::Success, .. }),
3394                    Metrics::OmahaEventLost(Event { event_type: EventType::UpdateComplete, event_result: EventResult::Success, .. }),
3395                    Metrics::SuccessfulUpdateFromFirstSeen(_),
3396                    Metrics::AttemptsToSuccessfulCheck(1),
3397                    Metrics::AttemptsToSuccessfulInstall { count: 1, successful: true },
3398                ]
3399            );
3400        });
3401    }
3402
3403    #[test]
3404    fn test_report_attempts_to_successful_install_fails_then_succeeds() {
3405        block_on(async {
3406            let mut http = MockHttpRequest::new(make_update_available_response());
3407            // Responses to events. This first batch corresponds to the install failure, so these
3408            // should be the update download started, and another for a failed install.
3409            // `Event::error(EventErrorCode::Installation)`.
3410            http.add_response(HttpResponse::new(vec![]));
3411            http.add_response(HttpResponse::new(vec![]));
3412
3413            // Respond to the next request.
3414            http.add_response(make_update_available_response());
3415            // Responses to events. This corresponds to the update download started, and the other
3416            // for a successful install.
3417            http.add_response(HttpResponse::new(vec![]));
3418            http.add_response(HttpResponse::new(vec![]));
3419
3420            let storage = Rc::new(Mutex::new(MemStorage::new()));
3421            let mock_time = MockTimeSource::new_from_now();
3422
3423            let mut state_machine = StateMachineBuilder::new_stub()
3424                .http(http)
3425                .installer(
3426                    TestInstaller::builder(mock_time.clone())
3427                        .add_install_fail()
3428                        .build(),
3429                )
3430                .policy_engine(StubPolicyEngine::new(mock_time.clone()))
3431                .metrics_reporter(MockMetricsReporter::new())
3432                .storage(Rc::clone(&storage))
3433                .build()
3434                .await;
3435
3436            state_machine.run_once().await;
3437            state_machine.run_once().await;
3438
3439            // FIXME(https://github.com/rust-lang/rustfmt/issues/4530) rustfmt doesn't wrap slice
3440            // patterns yet.
3441            #[rustfmt::skip]
3442            assert_matches!(
3443                state_machine.metrics_reporter.metrics.as_slice(),
3444                [
3445                    Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
3446                    Metrics::RequestsPerCheck { count: 1, successful: true },
3447                    Metrics::FailedUpdateDuration(_),
3448                    Metrics::AttemptsToSuccessfulCheck(1),
3449                    Metrics::AttemptsToSuccessfulInstall { count: 1, successful: false },
3450                    Metrics::UpdateCheckInterval { .. },
3451                    Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
3452                    Metrics::RequestsPerCheck { count: 1, successful: true },
3453                    Metrics::SuccessfulUpdateDuration(_),
3454                    Metrics::OmahaEventLost(Event { .. }),
3455                    Metrics::SuccessfulUpdateFromFirstSeen(_),
3456                    Metrics::AttemptsToSuccessfulCheck(1),
3457                    Metrics::AttemptsToSuccessfulInstall { count: 2, successful: true }
3458                ]
3459            );
3460        });
3461    }
3462
3463    #[test]
3464    fn test_report_attempts_to_successful_install_does_not_report_for_no_update() {
3465        block_on(async {
3466            let response = json!({"response":{
3467              "server": "prod",
3468              "protocol": "3.0",
3469              "app": [{
3470                "appid": "{00000000-0000-0000-0000-000000000001}",
3471                "status": "ok",
3472                "updatecheck": {
3473                  "status": "noupdate",
3474                  "info": "no update for you"
3475                }
3476              }],
3477            }});
3478            let response = serde_json::to_vec(&response).unwrap();
3479            let http = MockHttpRequest::new(HttpResponse::new(response.clone()));
3480
3481            let storage = Rc::new(Mutex::new(MemStorage::new()));
3482            let mock_time = MockTimeSource::new_from_now();
3483
3484            let mut state_machine = StateMachineBuilder::new_stub()
3485                .http(http)
3486                .installer(TestInstaller::builder(mock_time.clone()).build())
3487                .policy_engine(StubPolicyEngine::new(mock_time.clone()))
3488                .metrics_reporter(MockMetricsReporter::new())
3489                .storage(Rc::clone(&storage))
3490                .build()
3491                .await;
3492
3493            state_machine.run_once().await;
3494
3495            // FIXME(https://github.com/rust-lang/rustfmt/issues/4530) rustfmt doesn't wrap slice
3496            // patterns yet.
3497            #[rustfmt::skip]
3498            assert_matches!(
3499                state_machine.metrics_reporter.metrics.as_slice(),
3500                [
3501                    Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
3502                    Metrics::RequestsPerCheck { count: 1, successful: true },
3503                    Metrics::AttemptsToSuccessfulCheck(1),
3504                ]
3505            );
3506        });
3507    }
3508
3509    #[test]
3510    fn test_successful_update_triggers_reboot() {
3511        let mut pool = LocalPool::new();
3512        let spawner = pool.spawner();
3513
3514        let http = MockHttpRequest::new(make_update_available_response());
3515        let mock_time = MockTimeSource::new_from_now();
3516        let next_update_time = mock_time.now();
3517        let (timer, mut timers) = BlockingTimer::new();
3518
3519        let installer = TestInstaller::builder(mock_time.clone()).build();
3520        let reboot_called = Rc::clone(&installer.reboot_called);
3521        let (_ctl, state_machine) = pool.run_until(
3522            StateMachineBuilder::new_stub()
3523                .http(http)
3524                .installer(installer)
3525                .policy_engine(StubPolicyEngine::new(mock_time))
3526                .timer(timer)
3527                .start(),
3528        );
3529        let observer = TestObserver::default();
3530        spawner
3531            .spawn_local(observer.observe(state_machine))
3532            .unwrap();
3533
3534        let blocked_timer = pool.run_until(timers.next()).unwrap();
3535        assert_eq!(
3536            blocked_timer.requested_wait(),
3537            RequestedWait::Until(next_update_time.into())
3538        );
3539        blocked_timer.unblock();
3540        pool.run_until_stalled();
3541
3542        assert!(*reboot_called.borrow());
3543    }
3544
3545    #[test]
3546    fn test_skip_reboot_if_not_needed() {
3547        let mut pool = LocalPool::new();
3548        let spawner = pool.spawner();
3549
3550        let http = MockHttpRequest::new(make_update_available_response());
3551        let mock_time = MockTimeSource::new_from_now();
3552        let next_update_time = mock_time.now();
3553        let reboot_check_options_received = Rc::new(RefCell::new(vec![]));
3554        let policy_engine = MockPolicyEngine {
3555            reboot_check_options_received: Rc::clone(&reboot_check_options_received),
3556            check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
3557            time_source: mock_time.clone(),
3558            reboot_needed: Rc::new(RefCell::new(false)),
3559            ..MockPolicyEngine::default()
3560        };
3561        let (timer, mut timers) = BlockingTimer::new();
3562
3563        let installer = TestInstaller::builder(mock_time).build();
3564        let reboot_called = Rc::clone(&installer.reboot_called);
3565        let (_ctl, state_machine) = pool.run_until(
3566            StateMachineBuilder::new_stub()
3567                .http(http)
3568                .installer(installer)
3569                .policy_engine(policy_engine)
3570                .timer(timer)
3571                .start(),
3572        );
3573        let observer = TestObserver::default();
3574        spawner
3575            .spawn_local(observer.observe(state_machine))
3576            .unwrap();
3577
3578        let blocked_timer = pool.run_until(timers.next()).unwrap();
3579        assert_eq!(
3580            blocked_timer.requested_wait(),
3581            RequestedWait::Until(next_update_time.into())
3582        );
3583        blocked_timer.unblock();
3584        pool.run_until_stalled();
3585
3586        assert_eq!(
3587            observer.take_states(),
3588            vec![
3589                State::CheckingForUpdates(InstallSource::ScheduledTask),
3590                State::InstallingUpdate,
3591                State::Idle
3592            ]
3593        );
3594
3595        assert_eq!(*reboot_check_options_received.borrow(), vec![]);
3596        assert!(!*reboot_called.borrow());
3597    }
3598
3599    #[test]
3600    fn test_failed_update_does_not_trigger_reboot() {
3601        let mut pool = LocalPool::new();
3602        let spawner = pool.spawner();
3603
3604        let http = MockHttpRequest::new(make_update_available_response());
3605        let mock_time = MockTimeSource::new_from_now();
3606        let next_update_time = mock_time.now();
3607        let (timer, mut timers) = BlockingTimer::new();
3608
3609        let installer = TestInstaller::builder(mock_time.clone())
3610            .add_install_fail()
3611            .build();
3612        let reboot_called = Rc::clone(&installer.reboot_called);
3613        let (_ctl, state_machine) = pool.run_until(
3614            StateMachineBuilder::new_stub()
3615                .http(http)
3616                .installer(installer)
3617                .policy_engine(StubPolicyEngine::new(mock_time))
3618                .timer(timer)
3619                .start(),
3620        );
3621        let observer = TestObserver::default();
3622        spawner
3623            .spawn_local(observer.observe(state_machine))
3624            .unwrap();
3625
3626        let blocked_timer = pool.run_until(timers.next()).unwrap();
3627        assert_eq!(
3628            blocked_timer.requested_wait(),
3629            RequestedWait::Until(next_update_time.into())
3630        );
3631        blocked_timer.unblock();
3632        pool.run_until_stalled();
3633
3634        assert!(!*reboot_called.borrow());
3635    }
3636
3637    // Verify that if we are in the middle of checking for or applying an update, a new OnDemand
3638    // update check request will "upgrade" the inflight check request to behave as if it was
3639    // OnDemand. In particular, this should cause an immediate reboot.
3640    #[test]
3641    fn test_reboots_immediately_if_user_initiated_update_requests_occurs_during_install() {
3642        let mut pool = LocalPool::new();
3643        let spawner = pool.spawner();
3644
3645        let http = MockHttpRequest::new(make_update_available_response());
3646        let mock_time = MockTimeSource::new_from_now();
3647
3648        let (send_install, mut recv_install) = mpsc::channel(0);
3649        let (send_reboot, mut recv_reboot) = mpsc::channel(0);
3650        let reboot_check_options_received = Rc::new(RefCell::new(vec![]));
3651        let policy_engine = MockPolicyEngine {
3652            reboot_check_options_received: Rc::clone(&reboot_check_options_received),
3653            check_timing: Some(CheckTiming::builder().time(mock_time.now()).build()),
3654            ..MockPolicyEngine::default()
3655        };
3656
3657        let (mut ctl, state_machine) = pool.run_until(
3658            StateMachineBuilder::new_stub()
3659                .http(http)
3660                .installer(BlockingInstaller {
3661                    on_install: send_install,
3662                    on_reboot: Some(send_reboot),
3663                })
3664                .policy_engine(policy_engine)
3665                .start(),
3666        );
3667
3668        let observer = TestObserver::default();
3669        spawner
3670            .spawn_local(observer.observe(state_machine))
3671            .unwrap();
3672
3673        let unblock_install = pool.run_until(recv_install.next()).unwrap();
3674        pool.run_until_stalled();
3675        assert_eq!(
3676            observer.take_states(),
3677            vec![
3678                State::CheckingForUpdates(InstallSource::ScheduledTask),
3679                State::InstallingUpdate
3680            ]
3681        );
3682
3683        pool.run_until(async {
3684            assert_eq!(
3685                ctl.start_update_check(CheckOptions {
3686                    source: InstallSource::OnDemand
3687                })
3688                .await,
3689                Ok(StartUpdateCheckResponse::AlreadyRunning)
3690            );
3691        });
3692
3693        pool.run_until_stalled();
3694        assert_eq!(observer.take_states(), vec![]);
3695
3696        unblock_install
3697            .send(vec![AppInstallResult::Installed])
3698            .unwrap();
3699        pool.run_until_stalled();
3700        assert_eq!(observer.take_states(), vec![State::WaitingForReboot]);
3701
3702        let unblock_reboot = pool.run_until(recv_reboot.next()).unwrap();
3703        pool.run_until_stalled();
3704        unblock_reboot.send(Ok(())).unwrap();
3705
3706        // Make sure when we checked whether we could reboot, it was from an OnDemand source
3707        assert_eq!(
3708            *reboot_check_options_received.borrow(),
3709            vec![CheckOptions {
3710                source: InstallSource::OnDemand
3711            }]
3712        );
3713    }
3714
3715    // Verifies that if the state machine is done with an install and waiting for a reboot, and a
3716    // user-initiated UpdateCheckRequest comes in, we reboot immediately.
3717    #[test]
3718    fn test_reboots_immediately_when_check_now_comes_in_during_wait() {
3719        let mut pool = LocalPool::new();
3720        let spawner = pool.spawner();
3721
3722        let mut http = MockHttpRequest::new(make_update_available_response());
3723        // Responses to events.
3724        http.add_response(HttpResponse::new(vec![]));
3725        http.add_response(HttpResponse::new(vec![]));
3726        http.add_response(HttpResponse::new(vec![]));
3727        // Response to the ping.
3728        http.add_response(make_update_available_response());
3729        let mut mock_time = MockTimeSource::new_from_now();
3730        mock_time.truncate_submicrosecond_walltime();
3731        let next_update_time = mock_time.now() + Duration::from_secs(1000);
3732        let (timer, mut timers) = BlockingTimer::new();
3733        let reboot_allowed = Rc::new(RefCell::new(false));
3734        let reboot_check_options_received = Rc::new(RefCell::new(vec![]));
3735        let policy_engine = MockPolicyEngine {
3736            time_source: mock_time.clone(),
3737            reboot_allowed: Rc::clone(&reboot_allowed),
3738            check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
3739            reboot_check_options_received: Rc::clone(&reboot_check_options_received),
3740            ..MockPolicyEngine::default()
3741        };
3742        let installer = TestInstaller::builder(mock_time.clone()).build();
3743        let reboot_called = Rc::clone(&installer.reboot_called);
3744        let storage_ref = Rc::new(Mutex::new(MemStorage::new()));
3745        let apps = make_test_app_set();
3746
3747        let (mut ctl, state_machine) = pool.run_until(
3748            StateMachineBuilder::new_stub()
3749                .app_set(apps)
3750                .http(http)
3751                .installer(installer)
3752                .policy_engine(policy_engine)
3753                .timer(timer)
3754                .storage(Rc::clone(&storage_ref))
3755                .start(),
3756        );
3757
3758        let observer = TestObserver::default();
3759        spawner
3760            .spawn_local(observer.observe(state_machine))
3761            .unwrap();
3762
3763        // The first wait before update check.
3764        let blocked_timer = pool.run_until(timers.next()).unwrap();
3765        assert_eq!(
3766            blocked_timer.requested_wait(),
3767            RequestedWait::Until(next_update_time.into())
3768        );
3769        blocked_timer.unblock();
3770        pool.run_until_stalled();
3771
3772        // The timers for reboot and ping, even though the order should be deterministic, but that
3773        // is an implementation detail, the test should still pass if that order changes.
3774        let blocked_timer1 = pool.run_until(timers.next()).unwrap();
3775        let blocked_timer2 = pool.run_until(timers.next()).unwrap();
3776        let (wait_for_reboot_timer, _wait_for_next_ping_timer) =
3777            match blocked_timer1.requested_wait() {
3778                RequestedWait::For(_) => (blocked_timer1, blocked_timer2),
3779                RequestedWait::Until(_) => (blocked_timer2, blocked_timer1),
3780            };
3781        // This is the timer waiting for next reboot_allowed check.
3782        assert_eq!(
3783            wait_for_reboot_timer.requested_wait(),
3784            RequestedWait::For(CHECK_REBOOT_ALLOWED_INTERVAL)
3785        );
3786
3787        // If we send an update check request that's from a user (source == OnDemand), we should
3788        // short-circuit the wait for reboot, and update immediately.
3789        assert!(!*reboot_called.borrow());
3790        *reboot_allowed.borrow_mut() = true;
3791        pool.run_until(async {
3792            assert_eq!(
3793                ctl.start_update_check(CheckOptions {
3794                    source: InstallSource::OnDemand
3795                })
3796                .await,
3797                Ok(StartUpdateCheckResponse::AlreadyRunning)
3798            );
3799        });
3800        pool.run_until_stalled();
3801        assert!(*reboot_called.borrow());
3802
3803        // Check that we got one check for reboot from a Scheduled Task (the start of the wait),
3804        // and then another came in with OnDemand, as we "upgraded it" with our OnDemand check
3805        // request
3806        assert_eq!(
3807            *reboot_check_options_received.borrow(),
3808            vec![
3809                CheckOptions {
3810                    source: InstallSource::ScheduledTask
3811                },
3812                CheckOptions {
3813                    source: InstallSource::OnDemand
3814                },
3815            ]
3816        );
3817    }
3818
3819    // Verifies that if reboot is not allowed, state machine will send pings to Omaha while waiting
3820    // for reboot, and it will reply AlreadyRunning to any StartUpdateCheck requests, and when it's
3821    // finally time to reboot, it will trigger reboot.
3822    #[test]
3823    fn test_wait_for_reboot() {
3824        let mut pool = LocalPool::new();
3825        let spawner = pool.spawner();
3826
3827        let mut http = MockHttpRequest::new(make_update_available_response());
3828        // Responses to events.
3829        http.add_response(HttpResponse::new(vec![]));
3830        http.add_response(HttpResponse::new(vec![]));
3831        http.add_response(HttpResponse::new(vec![]));
3832        // Response to the ping.
3833        http.add_response(make_update_available_response());
3834        let ping_request_viewer = MockHttpRequest::from_request_cell(http.get_request_cell());
3835        let mut mock_time = MockTimeSource::new_from_now();
3836        mock_time.truncate_submicrosecond_walltime();
3837        let next_update_time = mock_time.now() + Duration::from_secs(1000);
3838        let (timer, mut timers) = BlockingTimer::new();
3839        let reboot_allowed = Rc::new(RefCell::new(false));
3840        let policy_engine = MockPolicyEngine {
3841            time_source: mock_time.clone(),
3842            reboot_allowed: Rc::clone(&reboot_allowed),
3843            check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
3844            ..MockPolicyEngine::default()
3845        };
3846        let installer = TestInstaller::builder(mock_time.clone()).build();
3847        let reboot_called = Rc::clone(&installer.reboot_called);
3848        let storage_ref = Rc::new(Mutex::new(MemStorage::new()));
3849        let apps = make_test_app_set();
3850
3851        let (mut ctl, state_machine) = pool.run_until(
3852            StateMachineBuilder::new_stub()
3853                .app_set(apps.clone())
3854                .http(http)
3855                .installer(installer)
3856                .policy_engine(policy_engine)
3857                .timer(timer)
3858                .storage(Rc::clone(&storage_ref))
3859                .start(),
3860        );
3861
3862        let observer = TestObserver::default();
3863        spawner
3864            .spawn_local(observer.observe(state_machine))
3865            .unwrap();
3866
3867        // The first wait before update check.
3868        let blocked_timer = pool.run_until(timers.next()).unwrap();
3869        assert_eq!(
3870            blocked_timer.requested_wait(),
3871            RequestedWait::Until(next_update_time.into())
3872        );
3873        blocked_timer.unblock();
3874        pool.run_until_stalled();
3875
3876        // The timers for reboot and ping, even though the order should be deterministic, but that
3877        // is an implementation detail, the test should still pass if that order changes.
3878        let blocked_timer1 = pool.run_until(timers.next()).unwrap();
3879        let blocked_timer2 = pool.run_until(timers.next()).unwrap();
3880        let (wait_for_reboot_timer, wait_for_next_ping_timer) =
3881            match blocked_timer1.requested_wait() {
3882                RequestedWait::For(_) => (blocked_timer1, blocked_timer2),
3883                RequestedWait::Until(_) => (blocked_timer2, blocked_timer1),
3884            };
3885        // This is the timer waiting for next reboot_allowed check.
3886        assert_eq!(
3887            wait_for_reboot_timer.requested_wait(),
3888            RequestedWait::For(CHECK_REBOOT_ALLOWED_INTERVAL)
3889        );
3890        // This is the timer waiting for the next ping.
3891        assert_eq!(
3892            wait_for_next_ping_timer.requested_wait(),
3893            RequestedWait::Until(next_update_time.into())
3894        );
3895        // Unblock the ping.
3896        mock_time.advance(Duration::from_secs(1000));
3897        wait_for_next_ping_timer.unblock();
3898        pool.run_until_stalled();
3899
3900        // Verify that it sends a ping.
3901        let config = crate::configuration::test_support::config_generator();
3902        let request_params = RequestParams::default();
3903
3904        let apps = pool.run_until(apps.lock()).get_apps();
3905        let mut expected_request_builder = RequestBuilder::new(&config, &request_params)
3906            // 0: session id for update check
3907            // 1: request id for update check
3908            // 2-4: request id for events
3909            .session_id(GUID::from_u128(5))
3910            .request_id(GUID::from_u128(6));
3911        for app in &apps {
3912            expected_request_builder = expected_request_builder.add_ping(app);
3913        }
3914        pool.run_until(assert_request(
3915            &ping_request_viewer,
3916            expected_request_builder,
3917        ));
3918
3919        pool.run_until(async {
3920            assert_eq!(
3921                ctl.start_update_check(CheckOptions::default()).await,
3922                Ok(StartUpdateCheckResponse::AlreadyRunning)
3923            );
3924        });
3925
3926        // Last update time is updated in storage.
3927        pool.run_until(async {
3928            let storage = storage_ref.lock().await;
3929            let context = update_check::Context::load(&*storage).await;
3930            assert_eq!(
3931                context.schedule.last_update_time,
3932                Some(mock_time.now_in_walltime().into())
3933            );
3934        });
3935
3936        // State machine should be waiting for the next ping.
3937        let wait_for_next_ping_timer = pool.run_until(timers.next()).unwrap();
3938        assert_eq!(
3939            wait_for_next_ping_timer.requested_wait(),
3940            RequestedWait::Until(next_update_time.into())
3941        );
3942
3943        // Let state machine check reboot_allowed again, but still don't allow it.
3944        wait_for_reboot_timer.unblock();
3945        pool.run_until_stalled();
3946        assert!(!*reboot_called.borrow());
3947
3948        // State machine should be waiting for the next reboot.
3949        let wait_for_reboot_timer = pool.run_until(timers.next()).unwrap();
3950        assert_eq!(
3951            wait_for_reboot_timer.requested_wait(),
3952            RequestedWait::For(CHECK_REBOOT_ALLOWED_INTERVAL)
3953        );
3954
3955        // Time for a second ping.
3956        wait_for_next_ping_timer.unblock();
3957        pool.run_until_stalled();
3958
3959        // Verify that it sends another ping.
3960        let mut expected_request_builder = RequestBuilder::new(&config, &request_params)
3961            .session_id(GUID::from_u128(7))
3962            .request_id(GUID::from_u128(8));
3963        for app in &apps {
3964            expected_request_builder = expected_request_builder.add_ping(app);
3965        }
3966        pool.run_until(assert_request(
3967            &ping_request_viewer,
3968            expected_request_builder,
3969        ));
3970
3971        assert!(!*reboot_called.borrow());
3972
3973        // Now allow reboot.
3974        *reboot_called.borrow_mut() = true;
3975        wait_for_reboot_timer.unblock();
3976        pool.run_until_stalled();
3977        assert!(*reboot_called.borrow());
3978    }
3979
3980    #[derive(Debug)]
3981    struct BlockingInstaller {
3982        on_install: mpsc::Sender<oneshot::Sender<Vec<AppInstallResult<StubInstallErrors>>>>,
3983        on_reboot: Option<mpsc::Sender<oneshot::Sender<Result<(), anyhow::Error>>>>,
3984    }
3985
3986    impl Installer for BlockingInstaller {
3987        type InstallPlan = StubPlan;
3988        type Error = StubInstallErrors;
3989        type InstallResult = ();
3990
3991        fn perform_install(
3992            &mut self,
3993            _install_plan: &StubPlan,
3994            _observer: Option<&dyn ProgressObserver>,
3995        ) -> LocalBoxFuture<'_, (Self::InstallResult, Vec<AppInstallResult<Self::Error>>)> {
3996            let (send, recv) = oneshot::channel();
3997            let send_fut = self.on_install.send(send);
3998
3999            async move {
4000                send_fut.await.unwrap();
4001                ((), recv.await.unwrap())
4002            }
4003            .boxed_local()
4004        }
4005
4006        fn perform_reboot(&mut self) -> LocalBoxFuture<'_, Result<(), anyhow::Error>> {
4007            match &mut self.on_reboot {
4008                Some(on_reboot) => {
4009                    let (send, recv) = oneshot::channel();
4010                    let send_fut = on_reboot.send(send);
4011
4012                    async move {
4013                        send_fut.await.unwrap();
4014                        recv.await.unwrap()
4015                    }
4016                    .boxed_local()
4017                }
4018                None => future::ready(Ok(())).boxed_local(),
4019            }
4020        }
4021
4022        fn try_create_install_plan<'a>(
4023            &'a self,
4024            _request_params: &'a RequestParams,
4025            _request_metadata: Option<&'a RequestMetadata>,
4026            _response: &'a Response,
4027            _response_bytes: Vec<u8>,
4028            _ecdsa_signature: Option<Vec<u8>>,
4029        ) -> LocalBoxFuture<'a, Result<Self::InstallPlan, Self::Error>> {
4030            future::ready(Ok(StubPlan)).boxed_local()
4031        }
4032    }
4033
4034    #[derive(Debug, Default)]
4035    struct TestObserver {
4036        states: Rc<RefCell<Vec<State>>>,
4037    }
4038
4039    impl TestObserver {
4040        fn observe(&self, s: impl Stream<Item = StateMachineEvent>) -> impl Future<Output = ()> {
4041            let states = Rc::clone(&self.states);
4042            async move {
4043                futures::pin_mut!(s);
4044                while let Some(event) = s.next().await {
4045                    if let StateMachineEvent::StateChange(state) = event {
4046                        states.borrow_mut().push(state);
4047                    }
4048                }
4049            }
4050        }
4051
4052        fn observe_until_terminal(
4053            &self,
4054            s: impl Stream<Item = StateMachineEvent>,
4055        ) -> impl Future<Output = ()> {
4056            let states = Rc::clone(&self.states);
4057            async move {
4058                futures::pin_mut!(s);
4059                while let Some(event) = s.next().await {
4060                    if let StateMachineEvent::StateChange(state) = event {
4061                        states.borrow_mut().push(state);
4062                        match state {
4063                            State::Idle | State::WaitingForReboot => return,
4064                            _ => {}
4065                        }
4066                    }
4067                }
4068            }
4069        }
4070
4071        fn take_states(&self) -> Vec<State> {
4072            std::mem::take(&mut *self.states.borrow_mut())
4073        }
4074    }
4075
4076    #[test]
4077    fn test_start_update_during_update_replies_with_in_progress() {
4078        let mut pool = LocalPool::new();
4079        let spawner = pool.spawner();
4080
4081        let http = MockHttpRequest::new(make_update_available_response());
4082        let (send_install, mut recv_install) = mpsc::channel(0);
4083        let (mut ctl, state_machine) = pool.run_until(
4084            StateMachineBuilder::new_stub()
4085                .http(http)
4086                .installer(BlockingInstaller {
4087                    on_install: send_install,
4088                    on_reboot: None,
4089                })
4090                .start(),
4091        );
4092
4093        let observer = TestObserver::default();
4094        spawner
4095            .spawn_local(observer.observe_until_terminal(state_machine))
4096            .unwrap();
4097
4098        let unblock_install = pool.run_until(recv_install.next()).unwrap();
4099        pool.run_until_stalled();
4100        assert_eq!(
4101            observer.take_states(),
4102            vec![
4103                State::CheckingForUpdates(InstallSource::ScheduledTask),
4104                State::InstallingUpdate
4105            ]
4106        );
4107
4108        pool.run_until(async {
4109            assert_eq!(
4110                ctl.start_update_check(CheckOptions::default()).await,
4111                Ok(StartUpdateCheckResponse::AlreadyRunning)
4112            );
4113        });
4114        pool.run_until_stalled();
4115        assert_eq!(observer.take_states(), vec![]);
4116
4117        unblock_install
4118            .send(vec![AppInstallResult::Installed])
4119            .unwrap();
4120        pool.run_until_stalled();
4121
4122        assert_eq!(observer.take_states(), vec![State::WaitingForReboot]);
4123    }
4124
4125    #[test]
4126    fn test_start_update_during_timer_starts_update() {
4127        let mut pool = LocalPool::new();
4128        let spawner = pool.spawner();
4129
4130        let mut mock_time = MockTimeSource::new_from_now();
4131        let next_update_time = mock_time.now() + Duration::from_secs(321);
4132
4133        let (timer, mut timers) = BlockingTimer::new();
4134        let policy_engine = MockPolicyEngine {
4135            check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
4136            time_source: mock_time.clone(),
4137            ..MockPolicyEngine::default()
4138        };
4139        let (mut ctl, state_machine) = pool.run_until(
4140            StateMachineBuilder::new_stub()
4141                .policy_engine(policy_engine)
4142                .timer(timer)
4143                .start(),
4144        );
4145
4146        let observer = TestObserver::default();
4147        spawner
4148            .spawn_local(observer.observe(state_machine))
4149            .unwrap();
4150
4151        let blocked_timer = pool.run_until(timers.next()).unwrap();
4152        assert_eq!(
4153            blocked_timer.requested_wait(),
4154            RequestedWait::Until(next_update_time.into())
4155        );
4156        mock_time.advance(Duration::from_secs(200));
4157        assert_eq!(observer.take_states(), vec![]);
4158
4159        // Nothing happens while the timer is waiting.
4160        pool.run_until_stalled();
4161        assert_eq!(observer.take_states(), vec![]);
4162
4163        blocked_timer.unblock();
4164        let blocked_timer = pool.run_until(timers.next()).unwrap();
4165        assert_eq!(
4166            blocked_timer.requested_wait(),
4167            RequestedWait::Until(next_update_time.into())
4168        );
4169        assert_eq!(
4170            observer.take_states(),
4171            vec![
4172                State::CheckingForUpdates(InstallSource::ScheduledTask),
4173                State::ErrorCheckingForUpdate,
4174                State::Idle
4175            ]
4176        );
4177
4178        // Unless a control signal to start an update check comes in.
4179        pool.run_until(async {
4180            assert_eq!(
4181                ctl.start_update_check(CheckOptions::default()).await,
4182                Ok(StartUpdateCheckResponse::Started)
4183            );
4184        });
4185        pool.run_until_stalled();
4186        assert_eq!(
4187            observer.take_states(),
4188            vec![
4189                State::CheckingForUpdates(InstallSource::ScheduledTask),
4190                State::ErrorCheckingForUpdate,
4191                State::Idle
4192            ]
4193        );
4194    }
4195
4196    #[test]
4197    fn test_start_update_check_returns_throttled() {
4198        let mut pool = LocalPool::new();
4199        let spawner = pool.spawner();
4200
4201        let mut mock_time = MockTimeSource::new_from_now();
4202        let next_update_time = mock_time.now() + Duration::from_secs(321);
4203
4204        let (timer, mut timers) = BlockingTimer::new();
4205        let policy_engine = MockPolicyEngine {
4206            check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
4207            time_source: mock_time.clone(),
4208            check_decision: CheckDecision::ThrottledByPolicy,
4209            ..MockPolicyEngine::default()
4210        };
4211        let (mut ctl, state_machine) = pool.run_until(
4212            StateMachineBuilder::new_stub()
4213                .policy_engine(policy_engine)
4214                .timer(timer)
4215                .start(),
4216        );
4217
4218        let observer = TestObserver::default();
4219        spawner
4220            .spawn_local(observer.observe(state_machine))
4221            .unwrap();
4222
4223        let blocked_timer = pool.run_until(timers.next()).unwrap();
4224        assert_eq!(
4225            blocked_timer.requested_wait(),
4226            RequestedWait::Until(next_update_time.into())
4227        );
4228        mock_time.advance(Duration::from_secs(200));
4229        assert_eq!(observer.take_states(), vec![]);
4230
4231        pool.run_until(async {
4232            assert_eq!(
4233                ctl.start_update_check(CheckOptions::default()).await,
4234                Ok(StartUpdateCheckResponse::Throttled)
4235            );
4236        });
4237        pool.run_until_stalled();
4238        assert_eq!(observer.take_states(), vec![]);
4239    }
4240
4241    #[test]
4242    fn test_progress_observer() {
4243        block_on(async {
4244            let http = MockHttpRequest::new(make_update_available_response());
4245            let mock_time = MockTimeSource::new_from_now();
4246            let progresses = StateMachineBuilder::new_stub()
4247                .http(http)
4248                .installer(TestInstaller::builder(mock_time.clone()).build())
4249                .policy_engine(StubPolicyEngine::new(mock_time))
4250                .oneshot_check()
4251                .await
4252                .filter_map(|event| {
4253                    future::ready(match event {
4254                        StateMachineEvent::InstallProgressChange(InstallProgress { progress }) => {
4255                            Some(progress)
4256                        }
4257                        _ => None,
4258                    })
4259                })
4260                .collect::<Vec<f32>>()
4261                .await;
4262            assert_eq!(progresses, [0.0, 0.3, 0.9, 1.0]);
4263        });
4264    }
4265
4266    #[test]
4267    // A scenario in which
4268    // (now_in_monotonic - state_machine_start_in_monotonic) > (update_finish_time - now_in_wall)
4269    // should not panic.
4270    fn test_report_waited_for_reboot_duration_doesnt_panic_on_wrong_current_time() {
4271        block_on(async {
4272            let metrics_reporter = MockMetricsReporter::new();
4273
4274            let state_machine_start_monotonic = Instant::now();
4275            let update_finish_time = SystemTime::now();
4276
4277            // Set the monotonic increase in time larger than the wall time increase since the end
4278            // of the last update.
4279            // This can happen if we don't have a reliable current wall time.
4280            let now_wall = update_finish_time + Duration::from_secs(1);
4281            let now_monotonic = state_machine_start_monotonic + Duration::from_secs(10);
4282
4283            let mut state_machine = StateMachineBuilder::new_stub()
4284                .metrics_reporter(metrics_reporter)
4285                .build()
4286                .await;
4287
4288            // Time has advanced monotonically since we noted the start of the state machine for
4289            // longer than the wall time difference between update finish time and now.
4290            // This computation should currently overflow.
4291            state_machine
4292                .report_waited_for_reboot_duration(
4293                    update_finish_time,
4294                    state_machine_start_monotonic,
4295                    ComplexTime {
4296                        wall: now_wall,
4297                        mono: now_monotonic,
4298                    },
4299                )
4300                .expect_err("should overflow and error out");
4301
4302            // We should have reported no metrics
4303            assert!(state_machine.metrics_reporter.metrics.is_empty());
4304        });
4305    }
4306
4307    #[test]
4308    fn test_report_waited_for_reboot_duration() {
4309        let mut pool = LocalPool::new();
4310        let spawner = pool.spawner();
4311
4312        let response = json!({"response": {
4313            "server": "prod",
4314            "protocol": "3.0",
4315            "app": [{
4316            "appid": "{00000000-0000-0000-0000-000000000001}",
4317            "status": "ok",
4318            "updatecheck": {
4319                "status": "ok",
4320                "manifest": {
4321                    "version": "1.2.3.5",
4322                    "actions": {
4323                        "action": [],
4324                    },
4325                    "packages": {
4326                        "package": [],
4327                    },
4328                }
4329            }
4330            }],
4331        }});
4332        let response = serde_json::to_vec(&response).unwrap();
4333        let http = MockHttpRequest::new(HttpResponse::new(response));
4334        let mut mock_time = MockTimeSource::new_from_now();
4335        mock_time.truncate_submicrosecond_walltime();
4336        let storage = Rc::new(Mutex::new(MemStorage::new()));
4337
4338        // Do one update.
4339        assert_matches!(
4340            pool.run_until(
4341                StateMachineBuilder::new_stub()
4342                    .http(http)
4343                    .policy_engine(StubPolicyEngine::new(mock_time.clone()))
4344                    .storage(Rc::clone(&storage))
4345                    .oneshot(RequestParams::default())
4346            ),
4347            Ok(_)
4348        );
4349
4350        mock_time.advance(Duration::from_secs(999));
4351
4352        // Execute state machine `run()`, simulating that we already rebooted.
4353        let config = Config {
4354            updater: Updater {
4355                name: "updater".to_string(),
4356                version: Version::from([0, 1]),
4357            },
4358            os: OS {
4359                version: "1.2.3.5".to_string(),
4360                ..OS::default()
4361            },
4362            service_url: "http://example.com/".to_string(),
4363            omaha_public_keys: None,
4364        };
4365        let metrics_reporter = Rc::new(RefCell::new(MockMetricsReporter::new()));
4366        let (_ctl, state_machine) = pool.run_until(
4367            StateMachineBuilder::new_stub()
4368                .config(config)
4369                .metrics_reporter(Rc::clone(&metrics_reporter))
4370                .policy_engine(StubPolicyEngine::new(mock_time.clone()))
4371                .storage(Rc::clone(&storage))
4372                .timer(MockTimer::new())
4373                .start(),
4374        );
4375
4376        // Move state machine forward using observer.
4377        let observer = TestObserver::default();
4378        spawner
4379            .spawn_local(observer.observe(state_machine))
4380            .unwrap();
4381        pool.run_until_stalled();
4382
4383        assert_eq!(
4384            metrics_reporter
4385                .borrow()
4386                .metrics
4387                .iter()
4388                .filter(|m| matches!(m, Metrics::WaitedForRebootDuration(_)))
4389                .collect::<Vec<_>>(),
4390            vec![&Metrics::WaitedForRebootDuration(Duration::from_secs(999))]
4391        );
4392
4393        // Verify that storage is cleaned up.
4394        pool.run_until(async {
4395            let storage = storage.lock().await;
4396            assert_eq!(storage.get_time(UPDATE_FINISH_TIME).await, None);
4397            assert_eq!(storage.get_string(TARGET_VERSION).await, None);
4398            assert!(storage.committed());
4399        })
4400    }
4401
4402    // The same as |run_simple_check_with_noupdate_result|, but with CUPv2 protocol validation.
4403    #[test]
4404    fn run_cup_but_decoration_error() {
4405        block_on(async {
4406            let http = MockHttpRequest::new(HttpResponse::new(make_noupdate_httpresponse()));
4407
4408            let stub_cup_handler = MockCupv2Handler::new().set_decoration_error(|| {
4409                Some(CupDecorationError::ParseError(
4410                    "".parse::<http::Uri>().unwrap_err(),
4411                ))
4412            });
4413
4414            assert_matches!(
4415                StateMachineBuilder::new_stub()
4416                    .http(http)
4417                    .cup_handler(Some(stub_cup_handler))
4418                    .oneshot(RequestParams::default())
4419                    .await,
4420                Err(UpdateCheckError::OmahaRequest(
4421                    OmahaRequestError::CupDecoration(CupDecorationError::ParseError(_))
4422                ))
4423            );
4424
4425            info!("update check complete!");
4426        });
4427    }
4428
4429    #[test]
4430    fn run_cup_but_verification_error() {
4431        block_on(async {
4432            let http = MockHttpRequest::new(HttpResponse::new(make_noupdate_httpresponse()));
4433
4434            let stub_cup_handler = MockCupv2Handler::new()
4435                .set_verification_error(|| Some(CupVerificationError::EtagHeaderMissing));
4436
4437            assert_matches!(
4438                StateMachineBuilder::new_stub()
4439                    .http(http)
4440                    .cup_handler(Some(stub_cup_handler))
4441                    .oneshot(RequestParams::default())
4442                    .await,
4443                Err(UpdateCheckError::OmahaRequest(
4444                    OmahaRequestError::CupValidation(CupVerificationError::EtagHeaderMissing)
4445                ))
4446            );
4447
4448            info!("update check complete!");
4449        });
4450    }
4451
4452    #[test]
4453    fn run_cup_valid() {
4454        block_on(async {
4455            let http = MockHttpRequest::new(HttpResponse::new(make_noupdate_httpresponse()));
4456
4457            assert_matches!(
4458                StateMachineBuilder::new_stub()
4459                    .http(http)
4460                    // Default stub_cup_handler, which is permissive.
4461                    .oneshot(RequestParams::default())
4462                    .await,
4463                Ok(_)
4464            );
4465
4466            info!("update check complete!");
4467        });
4468    }
4469}