Skip to main content

omaha_client/
state_machine.rs

1// Copyright 2019 The Fuchsia Authors
2//
3// Licensed under a BSD-style license <LICENSE-BSD>, Apache License, Version 2.0
4// <LICENSE-APACHE or https://www.apache.org/licenses/LICENSE-2.0>, or the MIT
5// license <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your option.
6// This file may not be copied, modified, or distributed except according to
7// those terms.
8
9use crate::{
10    app_set::{AppSet, AppSetExt as _},
11    async_generator,
12    common::{App, CheckOptions, CheckTiming},
13    configuration::Config,
14    cup_ecdsa::{CupDecorationError, CupVerificationError, Cupv2Handler, RequestMetadata},
15    http_request::{self, HttpRequest},
16    installer::{AppInstallResult, Installer, Plan},
17    metrics::{ClockType, Metrics, MetricsReporter, UpdateCheckFailureReason},
18    policy::{CheckDecision, PolicyEngine, UpdateDecision},
19    protocol::{
20        self,
21        request::{Event, EventErrorCode, EventResult, EventType, GUID, InstallSource},
22        response::{OmahaStatus, Response, UpdateCheck, parse_json_response},
23    },
24    request_builder::{self, RequestBuilder, RequestParams},
25    storage::{Storage, StorageExt},
26    time::{ComplexTime, PartialComplexTime, TimeSource, Timer},
27};
28
29use anyhow::anyhow;
30use futures::{
31    channel::{mpsc, oneshot},
32    future::{self, BoxFuture, Fuse},
33    lock::Mutex,
34    prelude::*,
35    select,
36};
37use http::{Response as HttpResponse, response::Parts};
38use log::{error, info, warn};
39use p256::ecdsa::DerSignature;
40use std::{
41    cmp::min,
42    collections::HashMap,
43    convert::TryInto,
44    rc::Rc,
45    str::Utf8Error,
46    time::{Duration, Instant, SystemTime},
47};
48use thiserror::Error;
49
50pub mod update_check;
51
52mod builder;
53pub use builder::StateMachineBuilder;
54
55mod observer;
56use observer::StateMachineProgressObserver;
57pub use observer::{InstallProgress, StateMachineEvent};
58
59const INSTALL_PLAN_ID: &str = "install_plan_id";
60const UPDATE_FIRST_SEEN_TIME: &str = "update_first_seen_time";
61const UPDATE_FINISH_TIME: &str = "update_finish_time";
62const TARGET_VERSION: &str = "target_version";
63const CONSECUTIVE_FAILED_INSTALL_ATTEMPTS: &str = "consecutive_failed_install_attempts";
64// How long do we wait after not allowed to reboot to check again.
65const CHECK_REBOOT_ALLOWED_INTERVAL: Duration = Duration::from_secs(30 * 60);
66// This header contains the number of seconds client must not contact server again.
67const X_RETRY_AFTER: &str = "X-Retry-After";
68// How many requests we will make to Omaha before giving up.
69const MAX_OMAHA_REQUEST_ATTEMPTS: u64 = 3;
70
71/// This is the core state machine for a client's update check.  It is instantiated and used to
72/// perform update checks over time or to perform a single update check process.
73#[derive(Debug)]
74pub struct StateMachine<PE, HR, IN, TM, MR, ST, AS, CH>
75where
76    PE: PolicyEngine,
77    HR: HttpRequest,
78    IN: Installer,
79    TM: Timer,
80    MR: MetricsReporter,
81    ST: Storage,
82    AS: AppSet,
83{
84    /// The immutable configuration of the client itself.
85    config: Config,
86
87    policy_engine: PE,
88
89    http: HR,
90
91    installer: IN,
92
93    timer: TM,
94
95    time_source: PE::TimeSource,
96
97    metrics_reporter: MR,
98
99    storage_ref: Rc<Mutex<ST>>,
100
101    /// Context for update check.
102    context: update_check::Context,
103
104    /// The list of apps used for update check.
105    /// When locking both storage and app_set, make sure to always lock storage first.
106    app_set: Rc<Mutex<AS>>,
107
108    cup_handler: Option<CH>,
109}
110
111#[derive(Copy, Clone, Debug, Eq, PartialEq)]
112pub enum State {
113    Idle,
114    CheckingForUpdates(InstallSource),
115    ErrorCheckingForUpdate,
116    NoUpdateAvailable,
117    InstallationDeferredByPolicy,
118    InstallingUpdate,
119    WaitingForReboot,
120    InstallationError,
121}
122
123/// This is the set of errors that can occur when making a request to Omaha.  This is an internal
124/// collection of error types.
125#[derive(Error, Debug)]
126pub enum OmahaRequestError {
127    #[error("Unexpected JSON error constructing update check")]
128    Json(#[from] serde_json::Error),
129
130    #[error("Error building update check HTTP request")]
131    HttpBuilder(#[from] http::Error),
132
133    #[error("Error decorating outgoing request with CUPv2 parameters")]
134    CupDecoration(#[from] CupDecorationError),
135
136    #[error("Error validating incoming response with CUPv2 protocol")]
137    CupValidation(#[from] CupVerificationError),
138
139    // TODO: This still contains hyper user error which should be split out.
140    #[error("HTTP transport error performing update check")]
141    HttpTransport(#[from] http_request::Error),
142
143    #[error("HTTP error performing update check: {0}")]
144    HttpStatus(hyper::StatusCode),
145}
146
147impl From<request_builder::Error> for OmahaRequestError {
148    fn from(err: request_builder::Error) -> Self {
149        match err {
150            request_builder::Error::Json(e) => OmahaRequestError::Json(e),
151            request_builder::Error::Http(e) => OmahaRequestError::HttpBuilder(e),
152            request_builder::Error::Cup(e) => OmahaRequestError::CupDecoration(e),
153        }
154    }
155}
156
157impl From<http::StatusCode> for OmahaRequestError {
158    fn from(sc: http::StatusCode) -> Self {
159        OmahaRequestError::HttpStatus(sc)
160    }
161}
162
163/// This is the set of errors that can occur when parsing the response body from Omaha.  This is an
164/// internal collection of error types.
165#[derive(Error, Debug)]
166pub enum ResponseParseError {
167    #[error("Response was not valid UTF-8")]
168    Utf8(#[from] Utf8Error),
169
170    #[error("Unexpected JSON error parsing update check response")]
171    Json(#[from] serde_json::Error),
172}
173
174#[derive(Error, Debug)]
175pub enum UpdateCheckError {
176    #[error("Error checking with Omaha")]
177    OmahaRequest(#[from] OmahaRequestError),
178
179    #[error("Error parsing Omaha response")]
180    ResponseParser(#[from] ResponseParseError),
181
182    #[error("Unable to create an install plan")]
183    InstallPlan(#[source] anyhow::Error),
184}
185
186/// A handle to interact with the state machine running in another task.
187#[derive(Clone)]
188pub struct ControlHandle(mpsc::Sender<ControlRequest>);
189
190/// Error indicating that the state machine task no longer exists.
191#[derive(Debug, Clone, Error, PartialEq, Eq)]
192#[error("state machine dropped before all its control handles")]
193pub struct StateMachineGone;
194
195impl From<mpsc::SendError> for StateMachineGone {
196    fn from(_: mpsc::SendError) -> Self {
197        StateMachineGone
198    }
199}
200
201impl From<oneshot::Canceled> for StateMachineGone {
202    fn from(_: oneshot::Canceled) -> Self {
203        StateMachineGone
204    }
205}
206
207enum ControlRequest {
208    StartUpdateCheck {
209        options: CheckOptions,
210        responder: oneshot::Sender<StartUpdateCheckResponse>,
211    },
212}
213
214/// Responses to a request to start an update check now.
215#[derive(Debug, Clone, PartialEq, Eq)]
216pub enum StartUpdateCheckResponse {
217    /// The state machine was idle and the request triggered an update check.
218    Started,
219
220    /// The state machine was already processing an update check and ignored this request and
221    /// options.
222    AlreadyRunning,
223
224    /// The update check was throttled by policy.
225    Throttled,
226}
227
228impl ControlHandle {
229    /// Ask the state machine to start an update check with the provided options, returning whether
230    /// or not the state machine started a check or was already running one.
231    pub async fn start_update_check(
232        &mut self,
233        options: CheckOptions,
234    ) -> Result<StartUpdateCheckResponse, StateMachineGone> {
235        let (responder, receive_response) = oneshot::channel();
236        self.0
237            .send(ControlRequest::StartUpdateCheck { options, responder })
238            .await?;
239        Ok(receive_response.await?)
240    }
241}
242
243#[derive(Debug)]
244enum RebootAfterUpdate<T> {
245    Needed(T),
246    NotNeeded,
247}
248
249impl<PE, HR, IN, TM, MR, ST, AS, IR, PL, CH> StateMachine<PE, HR, IN, TM, MR, ST, AS, CH>
250where
251    PE: PolicyEngine<InstallResult = IR, InstallPlan = PL>,
252    HR: HttpRequest,
253    IN: Installer<InstallResult = IR, InstallPlan = PL>,
254    TM: Timer,
255    MR: MetricsReporter,
256    ST: Storage,
257    AS: AppSet,
258    CH: Cupv2Handler,
259    IR: 'static + Send,
260    PL: Plan,
261{
262    /// Ask policy engine for the next update check time and update the context and yield event.
263    async fn update_next_update_time(
264        &mut self,
265        co: &mut async_generator::Yield<StateMachineEvent>,
266    ) -> CheckTiming {
267        let apps = self.app_set.lock().await.get_apps();
268        let timing = self
269            .policy_engine
270            .compute_next_update_time(&apps, &self.context.schedule, &self.context.state)
271            .await;
272        self.context.schedule.next_update_time = Some(timing);
273
274        co.yield_(StateMachineEvent::ScheduleChange(self.context.schedule))
275            .await;
276        info!("Calculated check timing: {}", timing);
277        timing
278    }
279
280    /// Return a future that will wait until the given check timing.
281    async fn make_wait_to_next_check(
282        &mut self,
283        check_timing: CheckTiming,
284    ) -> Fuse<BoxFuture<'static, ()>> {
285        if let Some(minimum_wait) = check_timing.minimum_wait {
286            // If there's a minimum wait, also wait at least that long, by joining the two
287            // timers so that both need to be true (in case `next_update_time` turns out to be
288            // very close to now)
289            future::join(
290                self.timer.wait_for(minimum_wait),
291                self.timer.wait_until(check_timing.time),
292            )
293            .map(|_| ())
294            .boxed()
295            .fuse()
296        } else {
297            // Otherwise just setup the timer for the waiting until the next time.  This is a
298            // wait until either the monotonic or wall times have passed.
299            self.timer.wait_until(check_timing.time).fuse()
300        }
301    }
302
303    async fn run(
304        mut self,
305        mut control: mpsc::Receiver<ControlRequest>,
306        mut co: async_generator::Yield<StateMachineEvent>,
307    ) {
308        {
309            let app_set = self.app_set.lock().await;
310            if !app_set.all_valid() {
311                error!(
312                    "App set not valid, not starting state machine: {:#?}",
313                    app_set.get_apps()
314                );
315                return;
316            }
317        }
318
319        let state_machine_start_monotonic_time = self.time_source.now_in_monotonic();
320
321        let mut should_report_waited_for_reboot_duration = false;
322
323        let update_finish_time = {
324            let storage = self.storage_ref.lock().await;
325            let update_finish_time = storage.get_time(UPDATE_FINISH_TIME).await;
326            if update_finish_time.is_some()
327                && let Some(target_version) = storage.get_string(TARGET_VERSION).await
328                && target_version == self.config.os.version
329            {
330                should_report_waited_for_reboot_duration = true;
331            }
332            update_finish_time
333        };
334
335        loop {
336            info!("Initial context: {:?}", self.context);
337
338            if should_report_waited_for_reboot_duration {
339                match self.report_waited_for_reboot_duration(
340                    update_finish_time.unwrap(),
341                    state_machine_start_monotonic_time,
342                    self.time_source.now(),
343                ) {
344                    Ok(()) => {
345                        // If the report was successful, don't try again on the next loop.
346                        should_report_waited_for_reboot_duration = false;
347
348                        let mut storage = self.storage_ref.lock().await;
349                        storage.remove_or_log(UPDATE_FINISH_TIME).await;
350                        storage.remove_or_log(TARGET_VERSION).await;
351                        storage.commit_or_log().await;
352                    }
353                    Err(e) => {
354                        warn!(
355                            "Couldn't report wait for reboot duration: {:#}, will try again",
356                            e
357                        );
358                    }
359                }
360            }
361
362            let (mut options, responder) = {
363                let check_timing = self.update_next_update_time(&mut co).await;
364                let mut wait_to_next_check = self.make_wait_to_next_check(check_timing).await;
365
366                // Wait for either the next check time or a request to start an update check.  Use
367                // the default check options with the timed check, or those sent with a request.
368                select! {
369                    () = wait_to_next_check => (CheckOptions::default(), None),
370                    ControlRequest::StartUpdateCheck{options, responder} = control.select_next_some() => {
371                        (options, Some(responder))
372                    }
373                }
374            };
375
376            let reboot_after_update = {
377                let apps = self.app_set.lock().await.get_apps();
378                info!(
379                    "Checking to see if an update check is allowed at this time for {:?}",
380                    apps
381                );
382                let decision = self
383                    .policy_engine
384                    .update_check_allowed(
385                        &apps,
386                        &self.context.schedule,
387                        &self.context.state,
388                        &options,
389                    )
390                    .await;
391
392                info!("The update check decision is: {:?}", decision);
393
394                let request_params = match decision {
395                    // Positive results, will continue with the update check process
396                    CheckDecision::Ok(rp) | CheckDecision::OkUpdateDeferred(rp) => rp,
397
398                    // Negative results, exit early
399                    CheckDecision::TooSoon
400                    | CheckDecision::ThrottledByPolicy
401                    | CheckDecision::DeniedByPolicy => {
402                        info!("The update check is not allowed at this time.");
403                        if let Some(responder) = responder {
404                            let _ = responder.send(StartUpdateCheckResponse::Throttled);
405                        }
406                        continue;
407                    }
408                };
409                if let Some(responder) = responder {
410                    let _ = responder.send(StartUpdateCheckResponse::Started);
411                }
412
413                // "start" the update check itself (well, create the future that is the update check)
414                let update_check = self.start_update_check(request_params, &mut co).fuse();
415                futures::pin_mut!(update_check);
416
417                // Wait for the update check to complete, handling any control requests that come in
418                // during the check.
419                loop {
420                    select! {
421                        update_check_result = update_check => break update_check_result,
422                        ControlRequest::StartUpdateCheck{
423                            options: new_options,
424                            responder
425                        } = control.select_next_some() => {
426                            if new_options.source == InstallSource::OnDemand {
427                                info!("Got on demand update check request, ensuring ongoing check is on demand");
428                                // TODO(63180): merge CheckOptions in Policy, not here.
429                                options.source = InstallSource::OnDemand;
430                            }
431
432                            let _ = responder.send(StartUpdateCheckResponse::AlreadyRunning);
433                        }
434                    }
435                }
436            };
437
438            if let RebootAfterUpdate::Needed(install_result) = reboot_after_update {
439                Self::yield_state(State::WaitingForReboot, &mut co).await;
440                self.wait_for_reboot(options, &mut control, install_result, &mut co)
441                    .await;
442            }
443
444            Self::yield_state(State::Idle, &mut co).await;
445        }
446    }
447
448    async fn wait_for_reboot(
449        &mut self,
450        mut options: CheckOptions,
451        control: &mut mpsc::Receiver<ControlRequest>,
452        install_result: IN::InstallResult,
453        co: &mut async_generator::Yield<StateMachineEvent>,
454    ) {
455        if !self
456            .policy_engine
457            .reboot_allowed(&options, &install_result)
458            .await
459        {
460            let wait_to_see_if_reboot_allowed =
461                self.timer.wait_for(CHECK_REBOOT_ALLOWED_INTERVAL).fuse();
462            futures::pin_mut!(wait_to_see_if_reboot_allowed);
463
464            let check_timing = self.update_next_update_time(co).await;
465            let wait_to_next_ping = self.make_wait_to_next_check(check_timing).await;
466            futures::pin_mut!(wait_to_next_ping);
467
468            loop {
469                // Wait for either the next time to check if reboot allowed or the next
470                // ping time or a request to start an update check.
471
472                select! {
473                    () = wait_to_see_if_reboot_allowed => {
474                        if self.policy_engine.reboot_allowed(&options, &install_result).await {
475                            break;
476                        }
477                        info!("Reboot not allowed at the moment, will try again in 30 minutes...");
478                        wait_to_see_if_reboot_allowed.set(
479                            self.timer.wait_for(CHECK_REBOOT_ALLOWED_INTERVAL).fuse()
480                        );
481                    },
482                    () = wait_to_next_ping => {
483                        self.ping_omaha(co).await;
484                        let check_timing = self.update_next_update_time(co).await;
485                        wait_to_next_ping.set(self.make_wait_to_next_check(check_timing).await);
486                    },
487                    ControlRequest::StartUpdateCheck{
488                        options: new_options,
489                        responder
490                    } = control.select_next_some() => {
491                        let _ = responder.send(StartUpdateCheckResponse::AlreadyRunning);
492                        if new_options.source == InstallSource::OnDemand {
493                            info!("Waiting for reboot, but ensuring that InstallSource is OnDemand");
494                            options.source = InstallSource::OnDemand;
495
496                            if self.policy_engine.reboot_allowed(&options, &install_result).await {
497                                info!("Upgraded update check request to on demand, policy allowed reboot");
498                                break;
499                            }
500                        };
501                    }
502                }
503            }
504        }
505        info!("Rebooting the system at the end of a successful update");
506        if let Err(e) = self.installer.perform_reboot().await {
507            error!("Unable to reboot the system: {}", e);
508        }
509    }
510
511    /// Report the duration the previous boot waited to reboot based on the update finish time in
512    /// storage, and the current time. Does not report a metric if there's an inconsistency in the
513    /// times stored or computed, i.e. if the reboot time is later than the current time.
514    /// Returns an error if time seems incorrect, e.g. update_finish_time is in the future.
515    fn report_waited_for_reboot_duration(
516        &mut self,
517        update_finish_time: SystemTime,
518        state_machine_start_monotonic_time: Instant,
519        now: ComplexTime,
520    ) -> Result<(), anyhow::Error> {
521        // If `update_finish_time` is in the future we don't have correct time, try again
522        // on the next loop.
523        let update_finish_time_to_now =
524            now.wall_duration_since(update_finish_time).map_err(|e| {
525                anyhow!(
526                    "Update finish time later than now, can't report waited for reboot duration,
527                    update finish time: {:?}, now: {:?}, error: {:?}",
528                    update_finish_time,
529                    now,
530                    e,
531                )
532            })?;
533
534        // It might take a while for us to get here, but we only want to report the
535        // time from update finish to state machine start after reboot, so we subtract
536        // the duration since then using monotonic time.
537
538        // We only want to report this metric if we can actually compute it.
539        // If for whatever reason the clock was wrong on the previous boot, or monotonic
540        // time is going backwards, better not to report this metric than to report an
541        // incorrect default value.
542        let state_machine_start_to_now = now
543            .mono
544            .checked_duration_since(state_machine_start_monotonic_time)
545            .ok_or_else(|| {
546                error!("Monotonic time appears to have gone backwards");
547                anyhow!(
548                    "State machine start later than now, can't report waited for reboot duration. \
549                    State machine start: {:?}, now: {:?}",
550                    state_machine_start_monotonic_time,
551                    now.mono,
552                )
553            })?;
554
555        let waited_for_reboot_duration = update_finish_time_to_now
556            .checked_sub(state_machine_start_to_now)
557            .ok_or_else(|| {
558                anyhow!(
559                    "Can't report waiting for reboot duration, update finish time to now smaller \
560                    than state machine start to now. Update finish time to now: {:?}, state \
561                    machine start to now: {:?}",
562                    update_finish_time_to_now,
563                    state_machine_start_to_now,
564                )
565            })?;
566
567        info!(
568            "Waited {} seconds for reboot.",
569            waited_for_reboot_duration.as_secs()
570        );
571        self.report_metrics(Metrics::WaitedForRebootDuration(waited_for_reboot_duration));
572        Ok(())
573    }
574
575    /// Report update check interval based on the last check time stored in storage.
576    /// It will also persist the new last check time to storage.
577    async fn report_check_interval(&mut self, install_source: InstallSource) {
578        let now = self.time_source.now();
579
580        match self.context.schedule.last_update_check_time {
581            // This is our first run; report the interval between that time and now,
582            // and update the context with the complex time.
583            Some(PartialComplexTime::Wall(t)) => match now.wall_duration_since(t) {
584                Ok(interval) => self.report_metrics(Metrics::UpdateCheckInterval {
585                    interval,
586                    clock: ClockType::Wall,
587                    install_source,
588                }),
589                Err(e) => warn!("Last check time is in the future: {}", e),
590            },
591
592            // We've reported an update check before, or we at least have a
593            // PartialComplexTime with a monotonic component. Report our interval
594            // between these Instants. (N.B. strictly speaking, we should only
595            // ever have a PCT::Complex here.)
596            Some(PartialComplexTime::Complex(t)) => match now.mono.checked_duration_since(t.mono) {
597                Some(interval) => self.report_metrics(Metrics::UpdateCheckInterval {
598                    interval,
599                    clock: ClockType::Monotonic,
600                    install_source,
601                }),
602                None => error!("Monotonic time in the past"),
603            },
604
605            // No last check time in storage, and no big deal. We'll continue from
606            // monotonic time from now on. This is the only place other than loading
607            // context from storage where the time can be set, so it's either unset
608            // because no storage, or a complex time. No need to match
609            // Some(PartialComplexTime::Monotonic)
610            _ => {}
611        }
612
613        self.context.schedule.last_update_check_time = now.into();
614    }
615
616    /// Perform update check and handle the result, including updating the update check context
617    /// and cohort.
618    /// Returns whether reboot is needed after the update.
619    async fn start_update_check(
620        &mut self,
621        request_params: RequestParams,
622        co: &mut async_generator::Yield<StateMachineEvent>,
623    ) -> RebootAfterUpdate<IN::InstallResult> {
624        let apps = self.app_set.lock().await.get_apps();
625        let result = self.perform_update_check(request_params, apps, co).await;
626
627        let (result, reboot_after_update) = match result {
628            Ok((result, reboot_after_update)) => {
629                info!("Update check result: {:?}", result);
630                // Update check succeeded, update |last_update_time|.
631                self.context.schedule.last_update_time = Some(self.time_source.now().into());
632
633                // Determine if any app failed to install, or we had a successful update.
634                let install_success =
635                    result.app_responses.iter().fold(None, |result, app| {
636                        match (result, &app.result) {
637                            (_, update_check::Action::InstallPlanExecutionError) => Some(false),
638                            (None, update_check::Action::Updated) => Some(true),
639                            (result, _) => result,
640                        }
641                    });
642
643                // Update check succeeded, reset |consecutive_failed_update_checks| to 0 and
644                // report metrics.
645                self.report_attempts_to_successful_check(true).await;
646
647                self.app_set
648                    .lock()
649                    .await
650                    .update_from_omaha(&result.app_responses);
651
652                // Only report |attempts_to_successful_install| if we get an error trying to
653                // install, or we succeed to install an update without error.
654                if let Some(success) = install_success {
655                    self.report_attempts_to_successful_install(success).await;
656                }
657
658                (Ok(result), reboot_after_update)
659                // TODO: update consecutive_proxied_requests
660            }
661            Err(error) => {
662                error!("Update check failed: {:?}", error);
663
664                let failure_reason = match &error {
665                    UpdateCheckError::ResponseParser(_) | UpdateCheckError::InstallPlan(_) => {
666                        // We talked to Omaha, update |last_update_time|.
667                        self.context.schedule.last_update_time =
668                            Some(self.time_source.now().into());
669
670                        UpdateCheckFailureReason::Omaha
671                    }
672                    UpdateCheckError::OmahaRequest(request_error) => match request_error {
673                        OmahaRequestError::Json(_)
674                        | OmahaRequestError::HttpBuilder(_)
675                        | OmahaRequestError::CupDecoration(_)
676                        | OmahaRequestError::CupValidation(_) => UpdateCheckFailureReason::Internal,
677                        OmahaRequestError::HttpTransport(_) | OmahaRequestError::HttpStatus(_) => {
678                            UpdateCheckFailureReason::Network
679                        }
680                    },
681                };
682                self.report_metrics(Metrics::UpdateCheckFailureReason(failure_reason));
683
684                self.report_attempts_to_successful_check(false).await;
685                (Err(error), RebootAfterUpdate::NotNeeded)
686            }
687        };
688
689        co.yield_(StateMachineEvent::ScheduleChange(self.context.schedule))
690            .await;
691        co.yield_(StateMachineEvent::ProtocolStateChange(
692            self.context.state.clone(),
693        ))
694        .await;
695        co.yield_(StateMachineEvent::UpdateCheckResult(result))
696            .await;
697
698        self.persist_data().await;
699
700        reboot_after_update
701    }
702
703    // Update self.context.state.consecutive_failed_update_checks and report the metric if
704    // `success`. Does not persist the value to storage, but rather relies on the caller.
705    async fn report_attempts_to_successful_check(&mut self, success: bool) {
706        let attempts = self.context.state.consecutive_failed_update_checks + 1;
707        if success {
708            self.context.state.consecutive_failed_update_checks = 0;
709            self.report_metrics(Metrics::AttemptsToSuccessfulCheck(attempts as u64));
710        } else {
711            self.context.state.consecutive_failed_update_checks = attempts;
712        }
713    }
714
715    /// Update `CONSECUTIVE_FAILED_INSTALL_ATTEMPTS` in storage and report the metrics if
716    /// `success`. Does not commit the change to storage.
717    async fn report_attempts_to_successful_install(&mut self, success: bool) {
718        let storage_ref = self.storage_ref.clone();
719        let mut storage = storage_ref.lock().await;
720        let attempts = storage
721            .get_int(CONSECUTIVE_FAILED_INSTALL_ATTEMPTS)
722            .await
723            .unwrap_or(0)
724            + 1;
725
726        self.report_metrics(Metrics::AttemptsToSuccessfulInstall {
727            count: attempts as u64,
728            successful: success,
729        });
730
731        if success {
732            storage
733                .remove_or_log(CONSECUTIVE_FAILED_INSTALL_ATTEMPTS)
734                .await;
735        } else if let Err(e) = storage
736            .set_int(CONSECUTIVE_FAILED_INSTALL_ATTEMPTS, attempts)
737            .await
738        {
739            error!(
740                "Unable to persist {}: {}",
741                CONSECUTIVE_FAILED_INSTALL_ATTEMPTS, e
742            );
743        }
744    }
745
746    /// Persist all necessary data to storage.
747    async fn persist_data(&self) {
748        let mut storage = self.storage_ref.lock().await;
749        self.context.persist(&mut *storage).await;
750        self.app_set.lock().await.persist(&mut *storage).await;
751
752        storage.commit_or_log().await;
753    }
754
755    /// This function constructs the chain of async futures needed to perform all of the async tasks
756    /// that comprise an update check.
757    async fn perform_update_check(
758        &mut self,
759        request_params: RequestParams,
760        apps: Vec<App>,
761        co: &mut async_generator::Yield<StateMachineEvent>,
762    ) -> Result<(update_check::Response, RebootAfterUpdate<IN::InstallResult>), UpdateCheckError>
763    {
764        Self::yield_state(State::CheckingForUpdates(request_params.source), co).await;
765
766        self.report_check_interval(request_params.source).await;
767
768        // Construct a request for the app(s).
769        let config = self.config.clone();
770        let mut request_builder = RequestBuilder::new(&config, &request_params);
771        for app in &apps {
772            request_builder = request_builder.add_update_check(app).add_ping(app);
773        }
774        let session_id = GUID::new();
775        request_builder = request_builder.session_id(session_id.clone());
776
777        let mut omaha_request_attempt = 1;
778
779        // Attempt in an loop of up to MAX_OMAHA_REQUEST_ATTEMPTS to communicate with Omaha.
780        // exit the loop early on success or an error that isn't related to a transport issue.
781        let loop_result = loop {
782            // Mark the start time for the request to omaha.
783            let omaha_check_start_time = self.time_source.now_in_monotonic();
784            request_builder = request_builder.request_id(GUID::new());
785            let result = self
786                .do_omaha_request_and_update_context(&request_builder, co)
787                .await;
788
789            // Report the response time of the omaha request.
790            {
791                // don't use Instant::elapsed(), it doesn't use the right TimeSource, and can panic!
792                // as a result
793                let now = self.time_source.now_in_monotonic();
794                let duration = now.checked_duration_since(omaha_check_start_time);
795
796                if let Some(response_time) = duration {
797                    self.report_metrics(Metrics::UpdateCheckResponseTime {
798                        response_time,
799                        successful: result.is_ok(),
800                    });
801                } else {
802                    // If this happens, it's a bug.
803                    error!(
804                        "now: {:?}, is before omaha_check_start_time: {:?}",
805                        now, omaha_check_start_time
806                    );
807                }
808            }
809
810            match result {
811                Ok(res) => {
812                    break Ok(res);
813                }
814                Err(OmahaRequestError::Json(e)) => {
815                    error!("Unable to construct request body! {:?}", e);
816                    Self::yield_state(State::ErrorCheckingForUpdate, co).await;
817                    break Err(UpdateCheckError::OmahaRequest(e.into()));
818                }
819                Err(OmahaRequestError::HttpBuilder(e)) => {
820                    error!("Unable to construct HTTP request! {:?}", e);
821                    Self::yield_state(State::ErrorCheckingForUpdate, co).await;
822                    break Err(UpdateCheckError::OmahaRequest(e.into()));
823                }
824                Err(OmahaRequestError::CupDecoration(e)) => {
825                    error!(
826                        "Unable to decorate HTTP request with CUPv2 parameters! {:?}",
827                        e
828                    );
829                    Self::yield_state(State::ErrorCheckingForUpdate, co).await;
830                    break Err(UpdateCheckError::OmahaRequest(e.into()));
831                }
832                Err(OmahaRequestError::CupValidation(e)) => {
833                    error!(
834                        "Unable to validate HTTP response with CUPv2 parameters! {:?}",
835                        e
836                    );
837                    Self::yield_state(State::ErrorCheckingForUpdate, co).await;
838                    break Err(UpdateCheckError::OmahaRequest(e.into()));
839                }
840                Err(OmahaRequestError::HttpTransport(e)) => {
841                    warn!("Unable to contact Omaha: {:?}", e);
842                    // Don't retry if the error was caused by user code, which means we weren't
843                    // using the library correctly.
844                    if omaha_request_attempt >= MAX_OMAHA_REQUEST_ATTEMPTS
845                        || e.is_user()
846                        || self.context.state.server_dictated_poll_interval.is_some()
847                    {
848                        Self::yield_state(State::ErrorCheckingForUpdate, co).await;
849                        break Err(UpdateCheckError::OmahaRequest(e.into()));
850                    }
851                }
852                Err(OmahaRequestError::HttpStatus(e)) => {
853                    warn!("Unable to contact Omaha: {:?}", e);
854                    if omaha_request_attempt >= MAX_OMAHA_REQUEST_ATTEMPTS
855                        || self.context.state.server_dictated_poll_interval.is_some()
856                    {
857                        Self::yield_state(State::ErrorCheckingForUpdate, co).await;
858                        break Err(UpdateCheckError::OmahaRequest(e.into()));
859                    }
860                }
861            }
862
863            // TODO(https://fxbug.dev/42117854): Move this to Policy.
864            // Randomized exponential backoff of 1, 2, & 4 seconds, +/- 500ms.
865            let backoff_time_secs = 1 << (omaha_request_attempt - 1);
866            let backoff_time = randomize(backoff_time_secs * 1000, 1000);
867            info!("Waiting {} ms before retrying...", backoff_time);
868            self.timer
869                .wait_for(Duration::from_millis(backoff_time))
870                .await;
871
872            omaha_request_attempt += 1;
873        };
874
875        self.report_metrics(Metrics::RequestsPerCheck {
876            count: omaha_request_attempt,
877            successful: loop_result.is_ok(),
878        });
879
880        let (_parts, data, request_metadata, signature) = loop_result?;
881
882        let response = match Self::parse_omaha_response(&data) {
883            Ok(res) => res,
884            Err(err) => {
885                warn!("Unable to parse Omaha response: {:?}", err);
886                Self::yield_state(State::ErrorCheckingForUpdate, co).await;
887                self.report_omaha_event_and_update_context(
888                    &request_params,
889                    Event::error(EventErrorCode::ParseResponse),
890                    &apps,
891                    &session_id,
892                    &apps.iter().map(|app| (app.id.clone(), None)).collect(),
893                    None,
894                    co,
895                )
896                .await;
897                return Err(UpdateCheckError::ResponseParser(err));
898            }
899        };
900
901        info!("result: {:?}", response);
902
903        co.yield_(StateMachineEvent::OmahaServerResponse(response.clone()))
904            .await;
905
906        let statuses = Self::get_app_update_statuses(&response);
907        for (app_id, status) in &statuses {
908            // TODO:  Report or metric statuses other than 'no-update' and 'ok'
909            info!("Omaha update check status: {} => {:?}", app_id, status);
910        }
911
912        let apps_with_update: Vec<_> = response
913            .apps
914            .iter()
915            .filter(|app| {
916                matches!(
917                    app.update_check,
918                    Some(UpdateCheck {
919                        status: OmahaStatus::Ok,
920                        ..
921                    })
922                )
923            })
924            .collect();
925
926        if apps_with_update.is_empty() {
927            // A successful, no-update, check
928
929            Self::yield_state(State::NoUpdateAvailable, co).await;
930            Self::make_not_updated_result(response, update_check::Action::NoUpdate)
931        } else {
932            info!(
933                "At least one app has an update, proceeding to build and process an Install Plan"
934            );
935            // A map from app id to the new version of the app, if an app has no update, then it
936            // won't appear in this map, if an app has update but there's no version in the omaha
937            // response, then its entry will be None.
938            let next_versions: HashMap<String, Option<String>> = apps_with_update
939                .iter()
940                .map(|app| (app.id.clone(), app.get_manifest_version()))
941                .collect();
942            let install_plan = match self
943                .installer
944                .try_create_install_plan(
945                    &request_params,
946                    request_metadata.as_ref(),
947                    &response,
948                    data,
949                    signature.map(|s| s.as_bytes().to_vec()),
950                )
951                .await
952            {
953                Ok(plan) => plan,
954                Err(e) => {
955                    error!("Unable to construct install plan! {}", e);
956                    Self::yield_state(State::InstallingUpdate, co).await;
957                    Self::yield_state(State::InstallationError, co).await;
958                    self.report_omaha_event_and_update_context(
959                        &request_params,
960                        Event::error(EventErrorCode::ConstructInstallPlan),
961                        &apps,
962                        &session_id,
963                        &next_versions,
964                        None,
965                        co,
966                    )
967                    .await;
968                    return Err(UpdateCheckError::InstallPlan(e.into()));
969                }
970            };
971
972            info!("Validating Install Plan with Policy");
973            let install_plan_decision = self.policy_engine.update_can_start(&install_plan).await;
974            match install_plan_decision {
975                UpdateDecision::Ok => {
976                    info!("Proceeding with install plan.");
977                }
978                UpdateDecision::DeferredByPolicy => {
979                    info!("Install plan was deferred by Policy.");
980                    // Report "error" to Omaha (as this is an event that needs reporting as the
981                    // install isn't starting immediately.
982                    let event = Event {
983                        event_type: EventType::UpdateComplete,
984                        event_result: EventResult::UpdateDeferred,
985                        ..Event::default()
986                    };
987                    self.report_omaha_event_and_update_context(
988                        &request_params,
989                        event,
990                        &apps,
991                        &session_id,
992                        &next_versions,
993                        None,
994                        co,
995                    )
996                    .await;
997
998                    Self::yield_state(State::InstallationDeferredByPolicy, co).await;
999
1000                    return Self::make_not_updated_result(
1001                        response,
1002                        update_check::Action::DeferredByPolicy,
1003                    );
1004                }
1005                UpdateDecision::DeniedByPolicy => {
1006                    warn!("Install plan was denied by Policy, see Policy logs for reasoning");
1007                    self.report_omaha_event_and_update_context(
1008                        &request_params,
1009                        Event::error(EventErrorCode::DeniedByPolicy),
1010                        &apps,
1011                        &session_id,
1012                        &next_versions,
1013                        None,
1014                        co,
1015                    )
1016                    .await;
1017
1018                    return Self::make_not_updated_result(
1019                        response,
1020                        update_check::Action::DeniedByPolicy,
1021                    );
1022                }
1023            }
1024
1025            Self::yield_state(State::InstallingUpdate, co).await;
1026            self.report_omaha_event_and_update_context(
1027                &request_params,
1028                Event::success(EventType::UpdateDownloadStarted),
1029                &apps,
1030                &session_id,
1031                &next_versions,
1032                None,
1033                co,
1034            )
1035            .await;
1036
1037            let install_plan_id = install_plan.id();
1038            let update_start_time = self.time_source.now_in_walltime();
1039            let update_first_seen_time = self
1040                .record_update_first_seen_time(&install_plan_id, update_start_time)
1041                .await;
1042
1043            let (send, mut recv) = mpsc::channel(0);
1044            let observer = StateMachineProgressObserver(send);
1045            let perform_install = async {
1046                let result = self
1047                    .installer
1048                    .perform_install(&install_plan, Some(&observer))
1049                    .await;
1050                // Drop observer so that we can stop waiting for the next progress.
1051                drop(observer);
1052                result
1053            };
1054            let yield_progress = async {
1055                while let Some(progress) = recv.next().await {
1056                    co.yield_(StateMachineEvent::InstallProgressChange(progress))
1057                        .await;
1058                }
1059            };
1060
1061            let ((install_result, mut app_install_results), ()) =
1062                future::join(perform_install, yield_progress).await;
1063            let no_apps_failed = app_install_results.iter().all(|result| {
1064                matches!(
1065                    result,
1066                    AppInstallResult::Installed | AppInstallResult::Deferred
1067                )
1068            });
1069            let update_finish_time = self.time_source.now_in_walltime();
1070            let install_duration = match update_finish_time.duration_since(update_start_time) {
1071                Ok(duration) => {
1072                    let metrics = if no_apps_failed {
1073                        Metrics::SuccessfulUpdateDuration(duration)
1074                    } else {
1075                        Metrics::FailedUpdateDuration(duration)
1076                    };
1077                    self.report_metrics(metrics);
1078                    Some(duration)
1079                }
1080                Err(e) => {
1081                    warn!("Update start time is in the future: {}", e);
1082                    None
1083                }
1084            };
1085
1086            let config = self.config.clone();
1087            let mut request_builder = RequestBuilder::new(&config, &request_params);
1088            let mut events = vec![];
1089            let mut installed_apps = vec![];
1090            for (response_app, app_install_result) in
1091                apps_with_update.iter().zip(&app_install_results)
1092            {
1093                match apps.iter().find(|app| app.id == response_app.id) {
1094                    Some(app) => {
1095                        let event = match app_install_result {
1096                            AppInstallResult::Installed => {
1097                                installed_apps.push(app);
1098                                Event::success(EventType::UpdateDownloadFinished)
1099                            }
1100                            AppInstallResult::Deferred => Event {
1101                                event_type: EventType::UpdateComplete,
1102                                event_result: EventResult::UpdateDeferred,
1103                                ..Event::default()
1104                            },
1105                            AppInstallResult::Failed(_) => {
1106                                Event::error(EventErrorCode::Installation)
1107                            }
1108                        };
1109                        let event = Event {
1110                            previous_version: Some(app.version.to_string()),
1111                            next_version: response_app.get_manifest_version(),
1112                            download_time_ms: install_duration
1113                                .and_then(|d| d.as_millis().try_into().ok()),
1114                            ..event
1115                        };
1116                        request_builder = request_builder.add_event(app, event.clone());
1117                        events.push(event);
1118                    }
1119                    None => {
1120                        error!("unknown app id in omaha response: {:?}", response_app.id);
1121                    }
1122                }
1123            }
1124            request_builder = request_builder
1125                .session_id(session_id.clone())
1126                .request_id(GUID::new());
1127            if let Err(e) = self
1128                .do_omaha_request_and_update_context(&request_builder, co)
1129                .await
1130            {
1131                for event in events {
1132                    self.report_metrics(Metrics::OmahaEventLost(event));
1133                }
1134                warn!("Unable to report event to Omaha: {:?}", e);
1135            }
1136
1137            // TODO: Verify downloaded update if needed.
1138
1139            // For apps that successfully installed, we need to report an extra `UpdateComplete` event.
1140            if !installed_apps.is_empty() {
1141                self.report_omaha_event_and_update_context(
1142                    &request_params,
1143                    Event::success(EventType::UpdateComplete),
1144                    installed_apps,
1145                    &session_id,
1146                    &next_versions,
1147                    install_duration,
1148                    co,
1149                )
1150                .await;
1151            }
1152
1153            let mut errors = vec![];
1154            let daystart = response.daystart;
1155            let app_responses = response
1156                .apps
1157                .into_iter()
1158                .map(|app| update_check::AppResponse {
1159                    app_id: app.id,
1160                    cohort: app.cohort,
1161                    user_counting: daystart.clone().into(),
1162                    result: match app.update_check {
1163                        Some(UpdateCheck {
1164                            status: OmahaStatus::Ok,
1165                            ..
1166                        }) => match app_install_results.remove(0) {
1167                            AppInstallResult::Installed => update_check::Action::Updated,
1168                            AppInstallResult::Deferred => update_check::Action::DeferredByPolicy,
1169                            AppInstallResult::Failed(e) => {
1170                                errors.push(e);
1171                                update_check::Action::InstallPlanExecutionError
1172                            }
1173                        },
1174                        _ => update_check::Action::NoUpdate,
1175                    },
1176                })
1177                .collect();
1178
1179            if !errors.is_empty() {
1180                for e in errors {
1181                    co.yield_(StateMachineEvent::InstallerError(Some(Box::new(e))))
1182                        .await;
1183                }
1184                Self::yield_state(State::InstallationError, co).await;
1185
1186                return Ok((
1187                    update_check::Response { app_responses },
1188                    RebootAfterUpdate::NotNeeded,
1189                ));
1190            }
1191
1192            match update_finish_time.duration_since(update_first_seen_time) {
1193                Ok(duration) => {
1194                    self.report_metrics(Metrics::SuccessfulUpdateFromFirstSeen(duration))
1195                }
1196                Err(e) => warn!("Update first seen time is in the future: {}", e),
1197            }
1198            {
1199                let mut storage = self.storage_ref.lock().await;
1200                if let Err(e) = storage
1201                    .set_time(UPDATE_FINISH_TIME, update_finish_time)
1202                    .await
1203                {
1204                    error!("Unable to persist {}: {}", UPDATE_FINISH_TIME, e);
1205                }
1206                let app_set = self.app_set.lock().await;
1207                let system_app_id = app_set.get_system_app_id();
1208                // If not found then this is not a system update, so no need to write target version.
1209                if let Some(next_version) = next_versions.get(system_app_id) {
1210                    let target_version = next_version.as_deref().unwrap_or_else(|| {
1211                        error!("Target version string not found in Omaha response.");
1212                        "UNKNOWN"
1213                    });
1214                    if let Err(e) = storage.set_string(TARGET_VERSION, target_version).await {
1215                        error!("Unable to persist {}: {}", TARGET_VERSION, e);
1216                    }
1217                }
1218                storage.commit_or_log().await;
1219            }
1220
1221            let reboot_after_update = if self.policy_engine.reboot_needed(&install_plan).await {
1222                RebootAfterUpdate::Needed(install_result)
1223            } else {
1224                RebootAfterUpdate::NotNeeded
1225            };
1226
1227            Ok((
1228                update_check::Response { app_responses },
1229                reboot_after_update,
1230            ))
1231        }
1232    }
1233
1234    /// Report the given |event| to Omaha, errors occurred during reporting are logged but not
1235    /// acted on.
1236    #[allow(clippy::too_many_arguments)]
1237    async fn report_omaha_event_and_update_context<'a>(
1238        &'a mut self,
1239        request_params: &'a RequestParams,
1240        event: Event,
1241        apps: impl IntoIterator<Item = &App>,
1242        session_id: &GUID,
1243        next_versions: &HashMap<String, Option<String>>,
1244        install_duration: Option<Duration>,
1245        co: &mut async_generator::Yield<StateMachineEvent>,
1246    ) {
1247        let config = self.config.clone();
1248        let mut request_builder = RequestBuilder::new(&config, request_params);
1249        for app in apps {
1250            // Skip apps with no update.
1251            if let Some(next_version) = next_versions.get(&app.id) {
1252                let event = Event {
1253                    previous_version: Some(app.version.to_string()),
1254                    next_version: next_version.clone(),
1255                    download_time_ms: install_duration.and_then(|d| d.as_millis().try_into().ok()),
1256                    ..event.clone()
1257                };
1258                request_builder = request_builder.add_event(app, event);
1259            }
1260        }
1261        request_builder = request_builder
1262            .session_id(session_id.clone())
1263            .request_id(GUID::new());
1264        if let Err(e) = self
1265            .do_omaha_request_and_update_context(&request_builder, co)
1266            .await
1267        {
1268            self.report_metrics(Metrics::OmahaEventLost(event));
1269            warn!("Unable to report event to Omaha: {:?}", e);
1270        }
1271    }
1272
1273    /// Sends a ping to Omaha and updates context and app_set.
1274    async fn ping_omaha(&mut self, co: &mut async_generator::Yield<StateMachineEvent>) {
1275        let apps = self.app_set.lock().await.get_apps();
1276        let request_params = RequestParams {
1277            source: InstallSource::ScheduledTask,
1278            use_configured_proxies: true,
1279            disable_updates: false,
1280            offer_update_if_same_version: false,
1281        };
1282        let config = self.config.clone();
1283        let mut request_builder = RequestBuilder::new(&config, &request_params);
1284        for app in &apps {
1285            request_builder = request_builder.add_ping(app);
1286        }
1287        request_builder = request_builder
1288            .session_id(GUID::new())
1289            .request_id(GUID::new());
1290
1291        let (_parts, data, _request_metadata, _signature) = match self
1292            .do_omaha_request_and_update_context(&request_builder, co)
1293            .await
1294        {
1295            Ok(res) => res,
1296            Err(e) => {
1297                error!("Ping Omaha failed: {:#}", anyhow!(e));
1298                self.context.state.consecutive_failed_update_checks += 1;
1299                self.persist_data().await;
1300                return;
1301            }
1302        };
1303
1304        let response = match Self::parse_omaha_response(&data) {
1305            Ok(res) => res,
1306            Err(e) => {
1307                error!("Unable to parse Omaha response: {:#}", anyhow!(e));
1308                self.context.state.consecutive_failed_update_checks += 1;
1309                self.persist_data().await;
1310                return;
1311            }
1312        };
1313
1314        self.context.state.consecutive_failed_update_checks = 0;
1315
1316        // Even though this is a ping, we should still update the last_update_time for
1317        // policy to compute the next ping time.
1318        self.context.schedule.last_update_time = Some(self.time_source.now().into());
1319        co.yield_(StateMachineEvent::ScheduleChange(self.context.schedule))
1320            .await;
1321
1322        let app_responses = Self::make_app_responses(response, update_check::Action::NoUpdate);
1323        self.app_set.lock().await.update_from_omaha(&app_responses);
1324
1325        self.persist_data().await;
1326    }
1327
1328    /// Make an http request to Omaha, and collect the response into an error or a blob of bytes
1329    /// that can be parsed.
1330    ///
1331    /// Given the http client and the request build, this makes the http request, and then coalesces
1332    /// the various errors into a single error type for easier error handling by the make process
1333    /// flow.
1334    ///
1335    /// This function also converts an HTTP error response into an Error, to divert those into the
1336    /// error handling paths instead of the Ok() path.
1337    ///
1338    /// If a valid X-Retry-After header is found in the response, this function will update the
1339    /// server dictated poll interval in context.
1340    async fn do_omaha_request_and_update_context<'a>(
1341        &'a mut self,
1342        builder: &RequestBuilder<'a>,
1343        co: &mut async_generator::Yield<StateMachineEvent>,
1344    ) -> Result<
1345        (
1346            Parts,
1347            Vec<u8>,
1348            Option<RequestMetadata>,
1349            Option<DerSignature>,
1350        ),
1351        OmahaRequestError,
1352    > {
1353        let (request, request_metadata) = builder.build(self.cup_handler.as_ref())?;
1354        let response = Self::make_request(&mut self.http, request).await?;
1355
1356        let signature: Option<DerSignature> = if let (Some(handler), Some(metadata)) =
1357            (self.cup_handler.as_ref(), &request_metadata)
1358        {
1359            let signature = handler
1360                .verify_response(metadata, &response, metadata.public_key_id)
1361                .map_err(|e| {
1362                    error!("Could not verify response: {:?}", e);
1363                    e
1364                })?;
1365            Some(signature)
1366        } else {
1367            None
1368        };
1369
1370        let (parts, body) = response.into_parts();
1371
1372        // Clients MUST respect this header even if paired with non-successful HTTP response code.
1373        let server_dictated_poll_interval = parts.headers.get(X_RETRY_AFTER).and_then(|header| {
1374            match header
1375                .to_str()
1376                .map_err(|e| anyhow!(e))
1377                .and_then(|s| s.parse::<u64>().map_err(|e| anyhow!(e)))
1378            {
1379                Ok(seconds) => {
1380                    // Servers SHOULD NOT send a value in excess of 86400 (24 hours), and clients
1381                    // SHOULD treat values greater than 86400 as 86400.
1382                    Some(Duration::from_secs(min(seconds, 86400)))
1383                }
1384                Err(e) => {
1385                    error!("Unable to parse {} header: {:#}", X_RETRY_AFTER, e);
1386                    None
1387                }
1388            }
1389        });
1390        if self.context.state.server_dictated_poll_interval != server_dictated_poll_interval {
1391            self.context.state.server_dictated_poll_interval = server_dictated_poll_interval;
1392            co.yield_(StateMachineEvent::ProtocolStateChange(
1393                self.context.state.clone(),
1394            ))
1395            .await;
1396            let mut storage = self.storage_ref.lock().await;
1397            self.context.persist(&mut *storage).await;
1398            storage.commit_or_log().await;
1399        }
1400        if !parts.status.is_success() {
1401            // Convert HTTP failure responses into Errors.
1402            Err(OmahaRequestError::HttpStatus(parts.status))
1403        } else {
1404            // Pass successful responses to the caller.
1405            info!("Omaha HTTP response: {}", parts.status);
1406            Ok((parts, body, request_metadata, signature))
1407        }
1408    }
1409
1410    /// Make an http request and collect the response body into a Vec of bytes.
1411    ///
1412    /// Specifically, this takes the body of the response and concatenates it into a single Vec of
1413    /// bytes so that any errors in receiving it can be captured immediately, instead of needing to
1414    /// handle them as part of parsing the response body.
1415    async fn make_request(
1416        http_client: &mut HR,
1417        request: http::Request<hyper::Body>,
1418    ) -> Result<HttpResponse<Vec<u8>>, http_request::Error> {
1419        info!("Making http request to: {}", request.uri());
1420        http_client.request(request).await.map_err(|err| {
1421            warn!("Unable to perform request: {}", err);
1422            err
1423        })
1424    }
1425
1426    /// This method takes the response bytes from Omaha, and converts them into a protocol::Response
1427    /// struct, returning all of the various errors that can occur in that process as a consolidated
1428    /// error enum.
1429    fn parse_omaha_response(data: &[u8]) -> Result<Response, ResponseParseError> {
1430        parse_json_response(data).map_err(ResponseParseError::Json)
1431    }
1432
1433    /// Utility to extract pairs of app id => omaha status response, to make it easier to ask
1434    /// questions about the response.
1435    fn get_app_update_statuses(response: &Response) -> Vec<(&str, &OmahaStatus)> {
1436        response
1437            .apps
1438            .iter()
1439            .filter_map(|app| {
1440                app.update_check
1441                    .as_ref()
1442                    .map(|u| (app.id.as_str(), &u.status))
1443            })
1444            .collect()
1445    }
1446
1447    /// Utility to take a set of protocol::response::Apps and then construct a set of AppResponse
1448    /// from the update check based on those app IDs.
1449    ///
1450    /// TODO(https://fxbug.dev/42170288): Change the Policy and Installer to return a set of results, one for
1451    ///                        each app ID, then make this match that.
1452    fn make_app_responses(
1453        response: protocol::response::Response,
1454        action: update_check::Action,
1455    ) -> Vec<update_check::AppResponse> {
1456        let daystart = response.daystart;
1457        response
1458            .apps
1459            .into_iter()
1460            .map(|app| update_check::AppResponse {
1461                app_id: app.id,
1462                cohort: app.cohort,
1463                user_counting: daystart.clone().into(),
1464                result: action.clone(),
1465            })
1466            .collect()
1467    }
1468
1469    /// Make an Ok result for `perform_update_check()` when update wasn't installed/failed.
1470    fn make_not_updated_result(
1471        response: protocol::response::Response,
1472        action: update_check::Action,
1473    ) -> Result<(update_check::Response, RebootAfterUpdate<IN::InstallResult>), UpdateCheckError>
1474    {
1475        Ok((
1476            update_check::Response {
1477                app_responses: Self::make_app_responses(response, action),
1478            },
1479            RebootAfterUpdate::NotNeeded,
1480        ))
1481    }
1482
1483    /// Send the state to the observer.
1484    async fn yield_state(state: State, co: &mut async_generator::Yield<StateMachineEvent>) {
1485        co.yield_(StateMachineEvent::StateChange(state)).await;
1486    }
1487
1488    fn report_metrics(&mut self, metrics: Metrics) {
1489        if let Err(err) = self.metrics_reporter.report_metrics(metrics) {
1490            warn!("Unable to report metrics: {:?}", err);
1491        }
1492    }
1493
1494    async fn record_update_first_seen_time(
1495        &mut self,
1496        install_plan_id: &str,
1497        now: SystemTime,
1498    ) -> SystemTime {
1499        let mut storage = self.storage_ref.lock().await;
1500        let previous_id = storage.get_string(INSTALL_PLAN_ID).await;
1501        if let Some(previous_id) = previous_id
1502            && previous_id == install_plan_id
1503        {
1504            return storage
1505                .get_time(UPDATE_FIRST_SEEN_TIME)
1506                .await
1507                .unwrap_or(now);
1508        }
1509        // Update INSTALL_PLAN_ID and UPDATE_FIRST_SEEN_TIME for new update.
1510        if let Err(e) = storage.set_string(INSTALL_PLAN_ID, install_plan_id).await {
1511            error!("Unable to persist {}: {}", INSTALL_PLAN_ID, e);
1512            return now;
1513        }
1514        if let Err(e) = storage.set_time(UPDATE_FIRST_SEEN_TIME, now).await {
1515            error!("Unable to persist {}: {}", UPDATE_FIRST_SEEN_TIME, e);
1516            let _ = storage.remove(INSTALL_PLAN_ID).await;
1517            return now;
1518        }
1519        storage.commit_or_log().await;
1520        now
1521    }
1522}
1523
1524/// Return a random number in [n - range / 2, n - range / 2 + range).
1525fn randomize(n: u64, range: u64) -> u64 {
1526    n - range / 2 + rand::random::<u64>() % range
1527}
1528
1529#[cfg(test)]
1530impl<PE, HR, IN, TM, MR, ST, AS, IR, PL, CH> StateMachine<PE, HR, IN, TM, MR, ST, AS, CH>
1531where
1532    PE: PolicyEngine<InstallResult = IR, InstallPlan = PL>,
1533    HR: HttpRequest,
1534    IN: Installer<InstallResult = IR, InstallPlan = PL>,
1535    TM: Timer,
1536    MR: MetricsReporter,
1537    ST: Storage,
1538    AS: AppSet,
1539    CH: Cupv2Handler,
1540    IR: 'static + Send,
1541    PL: Plan,
1542{
1543    /// Run perform_update_check once, returning the update check result.
1544    async fn oneshot(
1545        &mut self,
1546        request_params: RequestParams,
1547    ) -> Result<(update_check::Response, RebootAfterUpdate<IN::InstallResult>), UpdateCheckError>
1548    {
1549        let apps = self.app_set.lock().await.get_apps();
1550
1551        async_generator::generate(move |mut co| async move {
1552            self.perform_update_check(request_params, apps, &mut co)
1553                .await
1554        })
1555        .into_complete()
1556        .await
1557    }
1558
1559    /// Run start_upate_check once, discarding its states.
1560    async fn run_once(&mut self) {
1561        let request_params = RequestParams::default();
1562
1563        async_generator::generate(move |mut co| async move {
1564            self.start_update_check(request_params, &mut co).await;
1565        })
1566        .map(|_| ())
1567        .collect::<()>()
1568        .await;
1569    }
1570}
1571
1572#[cfg(test)]
1573mod tests {
1574    use super::update_check::{
1575        Action, CONSECUTIVE_FAILED_UPDATE_CHECKS, LAST_UPDATE_TIME, SERVER_DICTATED_POLL_INTERVAL,
1576    };
1577    use super::*;
1578    use crate::{
1579        app_set::VecAppSet,
1580        common::{
1581            App, CheckOptions, PersistedApp, ProtocolState, UpdateCheckSchedule, UserCounting,
1582        },
1583        configuration::Updater,
1584        cup_ecdsa::test_support::{MockCupv2Handler, make_cup_handler_for_test},
1585        http_request::mock::MockHttpRequest,
1586        installer::{
1587            ProgressObserver,
1588            stub::{StubInstallErrors, StubInstaller, StubPlan},
1589        },
1590        metrics::MockMetricsReporter,
1591        policy::{MockPolicyEngine, StubPolicyEngine},
1592        protocol::{Cohort, request::OS, response},
1593        storage::MemStorage,
1594        time::{
1595            MockTimeSource, PartialComplexTime,
1596            timers::{BlockingTimer, MockTimer, RequestedWait},
1597        },
1598        version::Version,
1599    };
1600    use assert_matches::assert_matches;
1601    use futures::executor::{LocalPool, block_on};
1602    use futures::future::LocalBoxFuture;
1603    use futures::task::LocalSpawnExt;
1604    use pretty_assertions::assert_eq;
1605    use serde_json::json;
1606    use std::cell::RefCell;
1607    use std::time::Duration;
1608
1609    fn make_test_app_set() -> Rc<Mutex<VecAppSet>> {
1610        Rc::new(Mutex::new(VecAppSet::new(vec![
1611            App::builder()
1612                .id("{00000000-0000-0000-0000-000000000001}")
1613                .version([1, 2, 3, 4])
1614                .cohort(Cohort::new("stable-channel"))
1615                .build(),
1616        ])))
1617    }
1618
1619    fn make_update_available_response() -> HttpResponse<Vec<u8>> {
1620        let response = json!({"response":{
1621          "server": "prod",
1622          "protocol": "3.0",
1623          "app": [{
1624            "appid": "{00000000-0000-0000-0000-000000000001}",
1625            "status": "ok",
1626            "updatecheck": {
1627              "status": "ok"
1628            }
1629          }],
1630        }});
1631        HttpResponse::new(serde_json::to_vec(&response).unwrap())
1632    }
1633
1634    fn make_noupdate_httpresponse() -> Vec<u8> {
1635        serde_json::to_vec(
1636            &(json!({"response":{
1637              "server": "prod",
1638              "protocol": "3.0",
1639              "app": [{
1640                "appid": "{00000000-0000-0000-0000-000000000001}",
1641                "status": "ok",
1642                "updatecheck": {
1643                  "status": "noupdate"
1644                }
1645              }]
1646            }})),
1647        )
1648        .unwrap()
1649    }
1650
1651    // Assert that the last request made to |http| is equal to the request built by
1652    // |request_builder|.
1653    async fn assert_request<'a>(http: &MockHttpRequest, request_builder: RequestBuilder<'a>) {
1654        let cup_handler = make_cup_handler_for_test();
1655        let (request, _request_metadata) = request_builder.build(Some(&cup_handler)).unwrap();
1656        let body = hyper::body::to_bytes(request).await.unwrap();
1657        // Compare string instead of Vec<u8> for easier debugging.
1658        let body_str = String::from_utf8_lossy(&body);
1659        http.assert_body_str(&body_str).await;
1660    }
1661
1662    #[test]
1663    fn run_simple_check_with_noupdate_result() {
1664        block_on(async {
1665            let http = MockHttpRequest::new(HttpResponse::new(make_noupdate_httpresponse()));
1666
1667            StateMachineBuilder::new_stub()
1668                .http(http)
1669                .oneshot(RequestParams::default())
1670                .await
1671                .unwrap();
1672
1673            info!("update check complete!");
1674        });
1675    }
1676
1677    #[test]
1678    fn test_cohort_returned_with_noupdate_result() {
1679        block_on(async {
1680            let response = json!({"response":{
1681              "server": "prod",
1682              "protocol": "3.0",
1683              "app": [{
1684                "appid": "{00000000-0000-0000-0000-000000000001}",
1685                "status": "ok",
1686                "cohort": "1",
1687                "cohortname": "stable-channel",
1688                "updatecheck": {
1689                  "status": "noupdate"
1690                }
1691              }]
1692            }});
1693            let response = serde_json::to_vec(&response).unwrap();
1694            let http = MockHttpRequest::new(HttpResponse::new(response));
1695
1696            let (response, reboot_after_update) = StateMachineBuilder::new_stub()
1697                .http(http)
1698                .oneshot(RequestParams::default())
1699                .await
1700                .unwrap();
1701            assert_eq!(
1702                "{00000000-0000-0000-0000-000000000001}",
1703                response.app_responses[0].app_id
1704            );
1705            assert_eq!(Some("1".into()), response.app_responses[0].cohort.id);
1706            assert_eq!(
1707                Some("stable-channel".into()),
1708                response.app_responses[0].cohort.name
1709            );
1710            assert_eq!(None, response.app_responses[0].cohort.hint);
1711
1712            assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);
1713        });
1714    }
1715
1716    #[test]
1717    fn test_cohort_returned_with_update_result() {
1718        block_on(async {
1719            let response = json!({"response":{
1720              "server": "prod",
1721              "protocol": "3.0",
1722              "app": [{
1723                "appid": "{00000000-0000-0000-0000-000000000001}",
1724                "status": "ok",
1725                "cohort": "1",
1726                "cohortname": "stable-channel",
1727                "updatecheck": {
1728                  "status": "ok"
1729                }
1730              }]
1731            }});
1732            let response = serde_json::to_vec(&response).unwrap();
1733            let http = MockHttpRequest::new(HttpResponse::new(response));
1734
1735            let (response, reboot_after_update) = StateMachineBuilder::new_stub()
1736                .http(http)
1737                .oneshot(RequestParams::default())
1738                .await
1739                .unwrap();
1740            assert_eq!(
1741                "{00000000-0000-0000-0000-000000000001}",
1742                response.app_responses[0].app_id
1743            );
1744            assert_eq!(Some("1".into()), response.app_responses[0].cohort.id);
1745            assert_eq!(
1746                Some("stable-channel".into()),
1747                response.app_responses[0].cohort.name
1748            );
1749            assert_eq!(None, response.app_responses[0].cohort.hint);
1750
1751            assert_matches!(reboot_after_update, RebootAfterUpdate::Needed(()));
1752        });
1753    }
1754
1755    #[test]
1756    fn test_report_parse_response_error() {
1757        block_on(async {
1758            let http = MockHttpRequest::new(HttpResponse::new("invalid response".into()));
1759
1760            let mut state_machine = StateMachineBuilder::new_stub().http(http).build().await;
1761
1762            let response = state_machine.oneshot(RequestParams::default()).await;
1763            assert_matches!(response, Err(UpdateCheckError::ResponseParser(_)));
1764
1765            let request_params = RequestParams::default();
1766            let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
1767            let event = Event {
1768                previous_version: Some("1.2.3.4".to_string()),
1769                ..Event::error(EventErrorCode::ParseResponse)
1770            };
1771            let apps = state_machine.app_set.lock().await.get_apps();
1772            request_builder = request_builder
1773                .add_event(&apps[0], event)
1774                .session_id(GUID::from_u128(0))
1775                .request_id(GUID::from_u128(2));
1776            assert_request(&state_machine.http, request_builder).await;
1777        });
1778    }
1779
1780    #[test]
1781    fn test_report_construct_install_plan_error() {
1782        block_on(async {
1783            let response = json!({"response":{
1784              "server": "prod",
1785              "protocol": "4.0",
1786              "app": [{
1787                "appid": "{00000000-0000-0000-0000-000000000001}",
1788                "status": "ok",
1789                "updatecheck": {
1790                  "status": "ok"
1791                }
1792              }],
1793            }});
1794            let response = serde_json::to_vec(&response).unwrap();
1795            let http = MockHttpRequest::new(HttpResponse::new(response));
1796
1797            let mut state_machine = StateMachineBuilder::new_stub().http(http).build().await;
1798
1799            let response = state_machine.oneshot(RequestParams::default()).await;
1800            assert_matches!(response, Err(UpdateCheckError::InstallPlan(_)));
1801
1802            let request_params = RequestParams::default();
1803            let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
1804            let event = Event {
1805                previous_version: Some("1.2.3.4".to_string()),
1806                ..Event::error(EventErrorCode::ConstructInstallPlan)
1807            };
1808            let apps = state_machine.app_set.lock().await.get_apps();
1809            request_builder = request_builder
1810                .add_event(&apps[0], event)
1811                .session_id(GUID::from_u128(0))
1812                .request_id(GUID::from_u128(2));
1813            assert_request(&state_machine.http, request_builder).await;
1814        });
1815    }
1816
1817    #[test]
1818    fn test_report_installation_error() {
1819        block_on(async {
1820            let response = json!({"response":{
1821              "server": "prod",
1822              "protocol": "3.0",
1823              "app": [{
1824                "appid": "{00000000-0000-0000-0000-000000000001}",
1825                "status": "ok",
1826                "updatecheck": {
1827                  "status": "ok",
1828                  "manifest": {
1829                      "version": "5.6.7.8",
1830                      "actions": {
1831                          "action": [],
1832                      },
1833                      "packages": {
1834                          "package": [],
1835                      },
1836                  }
1837                }
1838              }],
1839            }});
1840            let response = serde_json::to_vec(&response).unwrap();
1841            let http = MockHttpRequest::new(HttpResponse::new(response));
1842
1843            let mut state_machine = StateMachineBuilder::new_stub()
1844                .http(http)
1845                .installer(StubInstaller { should_fail: true })
1846                .build()
1847                .await;
1848
1849            let (response, reboot_after_update) = state_machine
1850                .oneshot(RequestParams::default())
1851                .await
1852                .unwrap();
1853            assert_eq!(
1854                Action::InstallPlanExecutionError,
1855                response.app_responses[0].result
1856            );
1857            assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);
1858
1859            let request_params = RequestParams::default();
1860            let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
1861            let event = Event {
1862                previous_version: Some("1.2.3.4".to_string()),
1863                next_version: Some("5.6.7.8".to_string()),
1864                download_time_ms: Some(0),
1865                ..Event::error(EventErrorCode::Installation)
1866            };
1867            let apps = state_machine.app_set.lock().await.get_apps();
1868            request_builder = request_builder
1869                .add_event(&apps[0], event)
1870                .session_id(GUID::from_u128(0))
1871                .request_id(GUID::from_u128(3));
1872            assert_request(&state_machine.http, request_builder).await;
1873        });
1874    }
1875
1876    #[test]
1877    fn test_report_installation_error_multi_app() {
1878        block_on(async {
1879            // Intentionally made the app order in response and app_set different.
1880            let response = json!({"response":{
1881              "server": "prod",
1882              "protocol": "3.0",
1883              "app": [{
1884                "appid": "appid_3",
1885                "status": "ok",
1886                "updatecheck": {
1887                  "status": "ok",
1888                  "manifest": {
1889                      "version": "5.6.7.8",
1890                      "actions": {
1891                          "action": [],
1892                      },
1893                      "packages": {
1894                          "package": [],
1895                      },
1896                  }
1897                }
1898              },{
1899                "appid": "appid_1",
1900                "status": "ok",
1901                "updatecheck": {
1902                  "status": "ok",
1903                  "manifest": {
1904                      "version": "1.2.3.4",
1905                      "actions": {
1906                          "action": [],
1907                      },
1908                      "packages": {
1909                          "package": [],
1910                      },
1911                  }
1912                }
1913              },{
1914                "appid": "appid_2",
1915                "status": "ok",
1916                "updatecheck": {
1917                  "status": "noupdate",
1918                }
1919              }],
1920            }});
1921            let response = serde_json::to_vec(&response).unwrap();
1922            let mut http = MockHttpRequest::new(HttpResponse::new(response));
1923            http.add_response(HttpResponse::new(vec![]));
1924            let app_set = VecAppSet::new(vec![
1925                App::builder().id("appid_1").version([1, 2, 3, 3]).build(),
1926                App::builder().id("appid_2").version([9, 9, 9, 9]).build(),
1927                App::builder().id("appid_3").version([5, 6, 7, 7]).build(),
1928            ]);
1929            let app_set = Rc::new(Mutex::new(app_set));
1930            let (send_install, mut recv_install) = mpsc::channel(0);
1931
1932            let mut state_machine = StateMachineBuilder::new_stub()
1933                .app_set(Rc::clone(&app_set))
1934                .http(http)
1935                .installer(BlockingInstaller {
1936                    on_install: send_install,
1937                    on_reboot: None,
1938                })
1939                .build()
1940                .await;
1941
1942            let recv_install_fut = async move {
1943                let unblock_install = recv_install.next().await.unwrap();
1944                unblock_install
1945                    .send(vec![
1946                        AppInstallResult::Deferred,
1947                        AppInstallResult::Installed,
1948                    ])
1949                    .unwrap();
1950            };
1951
1952            let (oneshot_result, ()) = future::join(
1953                state_machine.oneshot(RequestParams::default()),
1954                recv_install_fut,
1955            )
1956            .await;
1957            let (response, reboot_after_update) = oneshot_result.unwrap();
1958
1959            assert_eq!("appid_3", response.app_responses[0].app_id);
1960            assert_eq!(Action::DeferredByPolicy, response.app_responses[0].result);
1961            assert_eq!("appid_1", response.app_responses[1].app_id);
1962            assert_eq!(Action::Updated, response.app_responses[1].result);
1963            assert_eq!("appid_2", response.app_responses[2].app_id);
1964            assert_eq!(Action::NoUpdate, response.app_responses[2].result);
1965            assert_matches!(reboot_after_update, RebootAfterUpdate::Needed(()));
1966
1967            let request_params = RequestParams::default();
1968            let apps = app_set.lock().await.get_apps();
1969
1970            let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
1971            let event = Event {
1972                previous_version: Some("1.2.3.3".to_string()),
1973                next_version: Some("1.2.3.4".to_string()),
1974                download_time_ms: Some(0),
1975                ..Event::success(EventType::UpdateComplete)
1976            };
1977            request_builder = request_builder
1978                .add_event(&apps[0], event)
1979                .session_id(GUID::from_u128(0))
1980                .request_id(GUID::from_u128(4));
1981            assert_request(&state_machine.http, request_builder).await;
1982
1983            let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
1984            let event1 = Event {
1985                previous_version: Some("1.2.3.3".to_string()),
1986                next_version: Some("1.2.3.4".to_string()),
1987                download_time_ms: Some(0),
1988                ..Event::success(EventType::UpdateDownloadFinished)
1989            };
1990            let event2 = Event {
1991                previous_version: Some("5.6.7.7".to_string()),
1992                next_version: Some("5.6.7.8".to_string()),
1993                download_time_ms: Some(0),
1994                event_type: EventType::UpdateComplete,
1995                event_result: EventResult::UpdateDeferred,
1996                ..Event::default()
1997            };
1998            request_builder = request_builder
1999                .add_event(&apps[2], event2)
2000                .add_event(&apps[0], event1)
2001                .session_id(GUID::from_u128(0))
2002                .request_id(GUID::from_u128(3));
2003            assert_request(&state_machine.http, request_builder).await;
2004        });
2005    }
2006
2007    // Test that our observer can see when there's an installation error, and that it gets
2008    // the right error type.
2009    #[test]
2010    fn test_observe_installation_error() {
2011        block_on(async {
2012            let http = MockHttpRequest::new(make_update_available_response());
2013
2014            let actual_errors = StateMachineBuilder::new_stub()
2015                .http(http)
2016                .installer(StubInstaller { should_fail: true })
2017                .oneshot_check()
2018                .await
2019                .filter_map(|event| {
2020                    future::ready(match event {
2021                        StateMachineEvent::InstallerError(Some(e)) => {
2022                            Some(*e.downcast::<StubInstallErrors>().unwrap())
2023                        }
2024                        _ => None,
2025                    })
2026                })
2027                .collect::<Vec<StubInstallErrors>>()
2028                .await;
2029
2030            let expected_errors = vec![StubInstallErrors::Failed];
2031            assert_eq!(actual_errors, expected_errors);
2032        });
2033    }
2034
2035    #[test]
2036    fn test_report_deferred_by_policy() {
2037        block_on(async {
2038            let http = MockHttpRequest::new(make_update_available_response());
2039
2040            let policy_engine = MockPolicyEngine {
2041                update_decision: UpdateDecision::DeferredByPolicy,
2042                ..MockPolicyEngine::default()
2043            };
2044            let mut state_machine = StateMachineBuilder::new_stub()
2045                .policy_engine(policy_engine)
2046                .http(http)
2047                .build()
2048                .await;
2049
2050            let (response, reboot_after_update) = state_machine
2051                .oneshot(RequestParams::default())
2052                .await
2053                .unwrap();
2054            assert_eq!(Action::DeferredByPolicy, response.app_responses[0].result);
2055            assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);
2056
2057            let request_params = RequestParams::default();
2058            let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
2059            let event = Event {
2060                event_type: EventType::UpdateComplete,
2061                event_result: EventResult::UpdateDeferred,
2062                previous_version: Some("1.2.3.4".to_string()),
2063                ..Event::default()
2064            };
2065            let apps = state_machine.app_set.lock().await.get_apps();
2066            request_builder = request_builder
2067                .add_event(&apps[0], event)
2068                .session_id(GUID::from_u128(0))
2069                .request_id(GUID::from_u128(2));
2070            assert_request(&state_machine.http, request_builder).await;
2071        });
2072    }
2073
2074    #[test]
2075    fn test_report_denied_by_policy() {
2076        block_on(async {
2077            let response = make_update_available_response();
2078            let http = MockHttpRequest::new(response);
2079            let policy_engine = MockPolicyEngine {
2080                update_decision: UpdateDecision::DeniedByPolicy,
2081                ..MockPolicyEngine::default()
2082            };
2083
2084            let mut state_machine = StateMachineBuilder::new_stub()
2085                .policy_engine(policy_engine)
2086                .http(http)
2087                .build()
2088                .await;
2089
2090            let (response, reboot_after_update) = state_machine
2091                .oneshot(RequestParams::default())
2092                .await
2093                .unwrap();
2094            assert_eq!(Action::DeniedByPolicy, response.app_responses[0].result);
2095            assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);
2096
2097            let request_params = RequestParams::default();
2098            let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
2099            let event = Event {
2100                previous_version: Some("1.2.3.4".to_string()),
2101                ..Event::error(EventErrorCode::DeniedByPolicy)
2102            };
2103            let apps = state_machine.app_set.lock().await.get_apps();
2104            request_builder = request_builder
2105                .add_event(&apps[0], event)
2106                .session_id(GUID::from_u128(0))
2107                .request_id(GUID::from_u128(2));
2108            assert_request(&state_machine.http, request_builder).await;
2109        });
2110    }
2111
2112    #[test]
2113    fn test_wait_timer() {
2114        let mut pool = LocalPool::new();
2115        let mock_time = MockTimeSource::new_from_now();
2116        let next_update_time = mock_time.now() + Duration::from_secs(111);
2117        let (timer, mut timers) = BlockingTimer::new();
2118        let policy_engine = MockPolicyEngine {
2119            check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
2120            time_source: mock_time,
2121            ..MockPolicyEngine::default()
2122        };
2123
2124        let (_ctl, state_machine) = pool.run_until(
2125            StateMachineBuilder::new_stub()
2126                .policy_engine(policy_engine)
2127                .timer(timer)
2128                .start(),
2129        );
2130
2131        pool.spawner()
2132            .spawn_local(state_machine.map(|_| ()).collect())
2133            .unwrap();
2134
2135        // With otherwise stub implementations, the pool stalls when a timer is awaited.  Dropping
2136        // the state machine will panic if any timer durations were not used.
2137        let blocked_timer = pool.run_until(timers.next()).unwrap();
2138        assert_eq!(
2139            blocked_timer.requested_wait(),
2140            RequestedWait::Until(next_update_time.into())
2141        );
2142    }
2143
2144    #[test]
2145    fn test_cohort_and_user_counting_updates_are_used_in_subsequent_requests() {
2146        block_on(async {
2147            let response = json!({"response":{
2148                "server": "prod",
2149                "protocol": "3.0",
2150                "daystart": {
2151                  "elapsed_days": 1234567,
2152                  "elapsed_seconds": 3645
2153                },
2154                "app": [{
2155                  "appid": "{00000000-0000-0000-0000-000000000001}",
2156                  "status": "ok",
2157                  "cohort": "1",
2158                  "cohortname": "stable-channel",
2159                  "updatecheck": {
2160                    "status": "noupdate"
2161                  }
2162                }]
2163            }});
2164            let response = serde_json::to_vec(&response).unwrap();
2165            let mut http = MockHttpRequest::new(HttpResponse::new(response.clone()));
2166            http.add_response(HttpResponse::new(response));
2167            let apps = make_test_app_set();
2168
2169            let mut state_machine = StateMachineBuilder::new_stub()
2170                .http(http)
2171                .app_set(apps.clone())
2172                .build()
2173                .await;
2174
2175            // Run it the first time.
2176            state_machine.run_once().await;
2177
2178            let apps = apps.lock().await.get_apps();
2179            assert_eq!(Some("1".to_string()), apps[0].cohort.id);
2180            assert_eq!(None, apps[0].cohort.hint);
2181            assert_eq!(Some("stable-channel".to_string()), apps[0].cohort.name);
2182            assert_eq!(
2183                UserCounting::ClientRegulatedByDate(Some(1234567)),
2184                apps[0].user_counting
2185            );
2186
2187            // Run it the second time.
2188            state_machine.run_once().await;
2189
2190            let request_params = RequestParams::default();
2191            let expected_request_builder =
2192                RequestBuilder::new(&state_machine.config, &request_params)
2193                    .add_update_check(&apps[0])
2194                    .add_ping(&apps[0])
2195                    .session_id(GUID::from_u128(2))
2196                    .request_id(GUID::from_u128(3));
2197            // Check that the second update check used the new app.
2198            assert_request(&state_machine.http, expected_request_builder).await;
2199        });
2200    }
2201
2202    #[test]
2203    fn test_user_counting_returned() {
2204        block_on(async {
2205            let response = json!({"response":{
2206            "server": "prod",
2207            "protocol": "3.0",
2208            "daystart": {
2209              "elapsed_days": 1234567,
2210              "elapsed_seconds": 3645
2211            },
2212            "app": [{
2213              "appid": "{00000000-0000-0000-0000-000000000001}",
2214              "status": "ok",
2215              "cohort": "1",
2216              "cohortname": "stable-channel",
2217              "updatecheck": {
2218                "status": "noupdate"
2219                  }
2220              }]
2221            }});
2222            let response = serde_json::to_vec(&response).unwrap();
2223            let http = MockHttpRequest::new(HttpResponse::new(response));
2224
2225            let (response, reboot_after_update) = StateMachineBuilder::new_stub()
2226                .http(http)
2227                .oneshot(RequestParams::default())
2228                .await
2229                .unwrap();
2230
2231            assert_eq!(
2232                UserCounting::ClientRegulatedByDate(Some(1234567)),
2233                response.app_responses[0].user_counting
2234            );
2235            assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);
2236        });
2237    }
2238
2239    #[test]
2240    fn test_observe_state() {
2241        block_on(async {
2242            let actual_states = StateMachineBuilder::new_stub()
2243                .oneshot_check()
2244                .await
2245                .filter_map(|event| {
2246                    future::ready(match event {
2247                        StateMachineEvent::StateChange(state) => Some(state),
2248                        _ => None,
2249                    })
2250                })
2251                .collect::<Vec<State>>()
2252                .await;
2253
2254            let expected_states = vec![
2255                State::CheckingForUpdates(InstallSource::ScheduledTask),
2256                State::ErrorCheckingForUpdate,
2257            ];
2258            assert_eq!(actual_states, expected_states);
2259        });
2260    }
2261
2262    #[test]
2263    fn test_observe_schedule() {
2264        block_on(async {
2265            let mock_time = MockTimeSource::new_from_now();
2266            let actual_schedules = StateMachineBuilder::new_stub()
2267                .policy_engine(StubPolicyEngine::new(&mock_time))
2268                .oneshot_check()
2269                .await
2270                .filter_map(|event| {
2271                    future::ready(match event {
2272                        StateMachineEvent::ScheduleChange(schedule) => Some(schedule),
2273                        _ => None,
2274                    })
2275                })
2276                .collect::<Vec<UpdateCheckSchedule>>()
2277                .await;
2278
2279            // The resultant schedule should only contain the timestamp of the above update check.
2280            let expected_schedule = UpdateCheckSchedule::builder()
2281                .last_update_time(mock_time.now())
2282                .last_update_check_time(mock_time.now())
2283                .build();
2284
2285            assert_eq!(actual_schedules, vec![expected_schedule]);
2286        });
2287    }
2288
2289    #[test]
2290    fn test_observe_protocol_state() {
2291        block_on(async {
2292            let actual_protocol_states = StateMachineBuilder::new_stub()
2293                .oneshot_check()
2294                .await
2295                .filter_map(|event| {
2296                    future::ready(match event {
2297                        StateMachineEvent::ProtocolStateChange(state) => Some(state),
2298                        _ => None,
2299                    })
2300                })
2301                .collect::<Vec<ProtocolState>>()
2302                .await;
2303
2304            let expected_protocol_state = ProtocolState {
2305                consecutive_failed_update_checks: 1,
2306                ..ProtocolState::default()
2307            };
2308
2309            assert_eq!(actual_protocol_states, vec![expected_protocol_state]);
2310        });
2311    }
2312
2313    #[test]
2314    fn test_observe_omaha_server_response() {
2315        block_on(async {
2316            let response = json!({"response":{
2317              "server": "prod",
2318              "protocol": "3.0",
2319              "app": [{
2320                "appid": "{00000000-0000-0000-0000-000000000001}",
2321                "status": "ok",
2322                "cohort": "1",
2323                "cohortname": "stable-channel",
2324                "updatecheck": {
2325                  "status": "noupdate"
2326                }
2327              }]
2328            }});
2329            let response = serde_json::to_vec(&response).unwrap();
2330            let expected_omaha_response = response::parse_json_response(&response).unwrap();
2331            let http = MockHttpRequest::new(HttpResponse::new(response));
2332
2333            let actual_omaha_response = StateMachineBuilder::new_stub()
2334                .http(http)
2335                .oneshot_check()
2336                .await
2337                .filter_map(|event| {
2338                    future::ready(match event {
2339                        StateMachineEvent::OmahaServerResponse(response) => Some(response),
2340                        _ => None,
2341                    })
2342                })
2343                .collect::<Vec<response::Response>>()
2344                .await;
2345
2346            assert_eq!(actual_omaha_response, vec![expected_omaha_response]);
2347        });
2348    }
2349
2350    #[test]
2351    fn test_metrics_report_omaha_event_lost() {
2352        block_on(async {
2353            // This is sufficient to trigger a lost Omaha event as oneshot triggers an
2354            // update check, which gets the invalid response (but hasn't checked the
2355            // validity yet). This invalid response still contains an OK status, resulting
2356            // in the UpdateCheckResponseTime and RequestsPerCheck events being generated
2357            // reporting success.
2358            //
2359            // The response is then parsed and found to be incorrect; this parse error is
2360            // attempted to be sent back to Omaha as an event with the ParseResponse error
2361            // associated. However, the MockHttpRequest has already consumed the one
2362            // response it knew how to give; this event is reported via HTTP, but is "lost"
2363            // because the mock responds with a 500 error when it has no responses left to
2364            // return.
2365            //
2366            // That finally results in the OmahaEventLost.
2367            let http = MockHttpRequest::new(HttpResponse::new("invalid response".into()));
2368            let mut metrics_reporter = MockMetricsReporter::new();
2369            let _response = StateMachineBuilder::new_stub()
2370                .http(http)
2371                .metrics_reporter(&mut metrics_reporter)
2372                .oneshot(RequestParams::default())
2373                .await;
2374
2375            // FIXME(https://github.com/rust-lang/rustfmt/issues/4530) rustfmt doesn't wrap slice
2376            // patterns yet.
2377            #[rustfmt::skip]
2378            assert_matches!(
2379                metrics_reporter.metrics.as_slice(),
2380                [
2381                    Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
2382                    Metrics::RequestsPerCheck { count: 1, successful: true },
2383                    Metrics::OmahaEventLost(Event {
2384                        event_type: EventType::UpdateComplete,
2385                        event_result: EventResult::Error,
2386                        errorcode: Some(EventErrorCode::ParseResponse),
2387                        previous_version: None,
2388                        next_version: None,
2389                        download_time_ms: None,
2390                    })
2391                ]
2392            );
2393        });
2394    }
2395
2396    #[test]
2397    fn test_metrics_report_update_check_response_time() {
2398        block_on(async {
2399            let mut metrics_reporter = MockMetricsReporter::new();
2400            let _response = StateMachineBuilder::new_stub()
2401                .metrics_reporter(&mut metrics_reporter)
2402                .oneshot(RequestParams::default())
2403                .await;
2404
2405            // FIXME(https://github.com/rust-lang/rustfmt/issues/4530) rustfmt doesn't wrap slice
2406            // patterns yet.
2407            #[rustfmt::skip]
2408            assert_matches!(
2409                metrics_reporter.metrics.as_slice(),
2410                [
2411                    Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
2412                    Metrics::RequestsPerCheck { count: 1, successful: true },
2413                ]
2414            );
2415        });
2416    }
2417
2418    #[test]
2419    fn test_metrics_report_update_check_response_time_on_failure() {
2420        block_on(async {
2421            let mut metrics_reporter = MockMetricsReporter::new();
2422            let mut http = MockHttpRequest::default();
2423
2424            for _ in 0..MAX_OMAHA_REQUEST_ATTEMPTS {
2425                http.add_error(http_request::mock_errors::make_transport_error());
2426            }
2427
2428            // Note: we exit the update loop before we fetch the successful result, so we never see
2429            // this result.
2430            http.add_response(hyper::Response::default());
2431
2432            let _response = StateMachineBuilder::new_stub()
2433                .http(http)
2434                .metrics_reporter(&mut metrics_reporter)
2435                .oneshot(RequestParams::default())
2436                .await;
2437
2438            // FIXME(https://github.com/rust-lang/rustfmt/issues/4530) rustfmt doesn't wrap slice
2439            // patterns yet.
2440            #[rustfmt::skip]
2441            assert_matches!(
2442                metrics_reporter.metrics.as_slice(),
2443                [
2444                    Metrics::UpdateCheckResponseTime { response_time: _, successful: false },
2445                    Metrics::UpdateCheckResponseTime { response_time: _, successful: false },
2446                    Metrics::UpdateCheckResponseTime { response_time: _, successful: false },
2447                    Metrics::RequestsPerCheck { count: 3, successful: false },
2448                ]
2449            );
2450        });
2451    }
2452
2453    #[test]
2454    fn test_metrics_report_update_check_response_time_on_failure_followed_by_success() {
2455        block_on(async {
2456            let mut metrics_reporter = MockMetricsReporter::new();
2457            let mut http = MockHttpRequest::default();
2458
2459            for _ in 0..MAX_OMAHA_REQUEST_ATTEMPTS - 1 {
2460                http.add_error(http_request::mock_errors::make_transport_error());
2461            }
2462            http.add_response(hyper::Response::default());
2463
2464            let _response = StateMachineBuilder::new_stub()
2465                .http(http)
2466                .metrics_reporter(&mut metrics_reporter)
2467                .oneshot(RequestParams::default())
2468                .await;
2469
2470            // FIXME(https://github.com/rust-lang/rustfmt/issues/4530) rustfmt doesn't wrap slice
2471            // patterns yet.
2472            #[rustfmt::skip]
2473            assert_matches!(
2474                metrics_reporter.metrics.as_slice(),
2475                [
2476                    Metrics::UpdateCheckResponseTime { response_time: _, successful: false },
2477                    Metrics::UpdateCheckResponseTime { response_time: _, successful: false },
2478                    Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
2479                    Metrics::RequestsPerCheck { count: 3, successful: true },
2480                    Metrics::OmahaEventLost(Event {
2481                        event_type: EventType::UpdateComplete,
2482                        event_result: EventResult::Error,
2483                        errorcode: Some(EventErrorCode::ParseResponse),
2484                        previous_version: None,
2485                        next_version: None,
2486                        download_time_ms: None
2487                    }),
2488                ]
2489            );
2490        });
2491    }
2492
2493    #[test]
2494    fn test_metrics_report_requests_per_check() {
2495        block_on(async {
2496            let mut metrics_reporter = MockMetricsReporter::new();
2497            let _response = StateMachineBuilder::new_stub()
2498                .metrics_reporter(&mut metrics_reporter)
2499                .oneshot(RequestParams::default())
2500                .await;
2501
2502            assert!(
2503                metrics_reporter
2504                    .metrics
2505                    .contains(&Metrics::RequestsPerCheck {
2506                        count: 1,
2507                        successful: true
2508                    })
2509            );
2510        });
2511    }
2512
2513    #[test]
2514    fn test_metrics_report_requests_per_check_on_failure_followed_by_success() {
2515        block_on(async {
2516            let mut metrics_reporter = MockMetricsReporter::new();
2517            let mut http = MockHttpRequest::default();
2518
2519            for _ in 0..MAX_OMAHA_REQUEST_ATTEMPTS - 1 {
2520                http.add_error(http_request::mock_errors::make_transport_error());
2521            }
2522
2523            http.add_response(hyper::Response::default());
2524
2525            let _response = StateMachineBuilder::new_stub()
2526                .http(http)
2527                .metrics_reporter(&mut metrics_reporter)
2528                .oneshot(RequestParams::default())
2529                .await;
2530
2531            assert!(!metrics_reporter.metrics.is_empty());
2532            assert!(
2533                metrics_reporter
2534                    .metrics
2535                    .contains(&Metrics::RequestsPerCheck {
2536                        count: MAX_OMAHA_REQUEST_ATTEMPTS,
2537                        successful: true
2538                    })
2539            );
2540        });
2541    }
2542
2543    #[test]
2544    fn test_metrics_report_requests_per_check_on_failure() {
2545        block_on(async {
2546            let mut metrics_reporter = MockMetricsReporter::new();
2547            let mut http = MockHttpRequest::default();
2548
2549            for _ in 0..MAX_OMAHA_REQUEST_ATTEMPTS {
2550                http.add_error(http_request::mock_errors::make_transport_error());
2551            }
2552
2553            // Note we will give up before we get this successful request.
2554            http.add_response(hyper::Response::default());
2555
2556            let _response = StateMachineBuilder::new_stub()
2557                .http(http)
2558                .metrics_reporter(&mut metrics_reporter)
2559                .oneshot(RequestParams::default())
2560                .await;
2561
2562            assert!(!metrics_reporter.metrics.is_empty());
2563            assert!(
2564                metrics_reporter
2565                    .metrics
2566                    .contains(&Metrics::RequestsPerCheck {
2567                        count: MAX_OMAHA_REQUEST_ATTEMPTS,
2568                        successful: false
2569                    })
2570            );
2571        });
2572    }
2573
2574    #[test]
2575    fn test_requests_per_check_backoff_with_mock_timer() {
2576        block_on(async {
2577            let mut timer = MockTimer::new();
2578            timer.expect_for_range(Duration::from_millis(500), Duration::from_millis(1500));
2579            timer.expect_for_range(Duration::from_millis(1500), Duration::from_millis(2500));
2580            let requested_waits = timer.get_requested_waits_view();
2581            let response = StateMachineBuilder::new_stub()
2582                .http(MockHttpRequest::empty())
2583                .timer(timer)
2584                .oneshot(RequestParams::default())
2585                .await;
2586
2587            let waits = requested_waits.borrow();
2588            assert_eq!(waits.len(), 2);
2589            assert_matches!(
2590                waits[0],
2591                RequestedWait::For(d) if d >= Duration::from_millis(500) && d <= Duration::from_millis(1500)
2592            );
2593            assert_matches!(
2594                waits[1],
2595                RequestedWait::For(d) if d >= Duration::from_millis(1500) && d <= Duration::from_millis(2500)
2596            );
2597
2598            assert_matches!(
2599                response,
2600                Err(UpdateCheckError::OmahaRequest(
2601                    OmahaRequestError::HttpStatus(_)
2602                ))
2603            );
2604        });
2605    }
2606
2607    #[test]
2608    fn test_metrics_report_update_check_failure_reason_omaha() {
2609        block_on(async {
2610            let mut metrics_reporter = MockMetricsReporter::new();
2611            let mut state_machine = StateMachineBuilder::new_stub()
2612                .metrics_reporter(&mut metrics_reporter)
2613                .build()
2614                .await;
2615
2616            state_machine.run_once().await;
2617
2618            assert!(
2619                metrics_reporter
2620                    .metrics
2621                    .contains(&Metrics::UpdateCheckFailureReason(
2622                        UpdateCheckFailureReason::Omaha
2623                    ))
2624            );
2625        });
2626    }
2627
2628    #[test]
2629    fn test_metrics_report_update_check_failure_reason_network() {
2630        block_on(async {
2631            let mut metrics_reporter = MockMetricsReporter::new();
2632            let mut state_machine = StateMachineBuilder::new_stub()
2633                .http(MockHttpRequest::empty())
2634                .metrics_reporter(&mut metrics_reporter)
2635                .build()
2636                .await;
2637
2638            state_machine.run_once().await;
2639
2640            assert!(
2641                metrics_reporter
2642                    .metrics
2643                    .contains(&Metrics::UpdateCheckFailureReason(
2644                        UpdateCheckFailureReason::Network
2645                    ))
2646            );
2647        });
2648    }
2649
2650    #[test]
2651    fn test_persist_last_update_time() {
2652        block_on(async {
2653            let storage = Rc::new(Mutex::new(MemStorage::new()));
2654
2655            StateMachineBuilder::new_stub()
2656                .storage(Rc::clone(&storage))
2657                .oneshot_check()
2658                .await
2659                .map(|_| ())
2660                .collect::<()>()
2661                .await;
2662
2663            let storage = storage.lock().await;
2664            storage.get_int(LAST_UPDATE_TIME).await.unwrap();
2665            assert!(storage.committed());
2666        });
2667    }
2668
2669    #[test]
2670    fn test_persist_server_dictated_poll_interval() {
2671        block_on(async {
2672            let response = HttpResponse::builder()
2673                .header(X_RETRY_AFTER, 1234)
2674                .body(make_noupdate_httpresponse())
2675                .unwrap();
2676            let http = MockHttpRequest::new(response);
2677            let storage = Rc::new(Mutex::new(MemStorage::new()));
2678
2679            let mut state_machine = StateMachineBuilder::new_stub()
2680                .http(http)
2681                .storage(Rc::clone(&storage))
2682                .build()
2683                .await;
2684            state_machine
2685                .oneshot(RequestParams::default())
2686                .await
2687                .unwrap();
2688
2689            assert_eq!(
2690                state_machine.context.state.server_dictated_poll_interval,
2691                Some(Duration::from_secs(1234))
2692            );
2693
2694            let storage = storage.lock().await;
2695            assert_eq!(
2696                storage.get_int(SERVER_DICTATED_POLL_INTERVAL).await,
2697                Some(1234000000)
2698            );
2699            assert!(storage.committed());
2700        });
2701    }
2702
2703    #[test]
2704    fn test_persist_server_dictated_poll_interval_http_error() {
2705        block_on(async {
2706            let response = HttpResponse::builder()
2707                .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
2708                .header(X_RETRY_AFTER, 1234)
2709                .body(vec![])
2710                .unwrap();
2711            let http = MockHttpRequest::new(response);
2712            let storage = Rc::new(Mutex::new(MemStorage::new()));
2713
2714            let mut state_machine = StateMachineBuilder::new_stub()
2715                .http(http)
2716                .storage(Rc::clone(&storage))
2717                .build()
2718                .await;
2719            assert_matches!(
2720                state_machine.oneshot(RequestParams::default()).await,
2721                Err(UpdateCheckError::OmahaRequest(
2722                    OmahaRequestError::HttpStatus(_)
2723                ))
2724            );
2725
2726            assert_eq!(
2727                state_machine.context.state.server_dictated_poll_interval,
2728                Some(Duration::from_secs(1234))
2729            );
2730
2731            let storage = storage.lock().await;
2732            assert_eq!(
2733                storage.get_int(SERVER_DICTATED_POLL_INTERVAL).await,
2734                Some(1234000000)
2735            );
2736            assert!(storage.committed());
2737        });
2738    }
2739
2740    #[test]
2741    fn test_persist_server_dictated_poll_interval_max_duration() {
2742        block_on(async {
2743            let response = HttpResponse::builder()
2744                .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
2745                .header(X_RETRY_AFTER, 123456789)
2746                .body(vec![])
2747                .unwrap();
2748            let http = MockHttpRequest::new(response);
2749            let storage = Rc::new(Mutex::new(MemStorage::new()));
2750
2751            let mut state_machine = StateMachineBuilder::new_stub()
2752                .http(http)
2753                .storage(Rc::clone(&storage))
2754                .build()
2755                .await;
2756            assert_matches!(
2757                state_machine.oneshot(RequestParams::default()).await,
2758                Err(UpdateCheckError::OmahaRequest(
2759                    OmahaRequestError::HttpStatus(_)
2760                ))
2761            );
2762
2763            assert_eq!(
2764                state_machine.context.state.server_dictated_poll_interval,
2765                Some(Duration::from_secs(86400))
2766            );
2767
2768            let storage = storage.lock().await;
2769            assert_eq!(
2770                storage.get_int(SERVER_DICTATED_POLL_INTERVAL).await,
2771                Some(86400000000)
2772            );
2773            assert!(storage.committed());
2774        });
2775    }
2776
2777    #[test]
2778    fn test_server_dictated_poll_interval_with_transport_error_no_retry() {
2779        block_on(async {
2780            let mut http = MockHttpRequest::empty();
2781            http.add_error(http_request::mock_errors::make_transport_error());
2782            let mut storage = MemStorage::new();
2783            let _ = storage.set_int(SERVER_DICTATED_POLL_INTERVAL, 1234000000);
2784            let _ = storage.commit();
2785            let storage = Rc::new(Mutex::new(storage));
2786
2787            let mut state_machine = StateMachineBuilder::new_stub()
2788                .http(http)
2789                .storage(Rc::clone(&storage))
2790                .build()
2791                .await;
2792            // This verifies that state machine does not retry because MockHttpRequest will only
2793            // return the transport error on the first request, any additional requests will get
2794            // HttpStatus error.
2795            assert_matches!(
2796                state_machine.oneshot(RequestParams::default()).await,
2797                Err(UpdateCheckError::OmahaRequest(
2798                    OmahaRequestError::HttpTransport(_)
2799                ))
2800            );
2801
2802            assert_eq!(
2803                state_machine.context.state.server_dictated_poll_interval,
2804                Some(Duration::from_secs(1234))
2805            );
2806        });
2807    }
2808
2809    #[test]
2810    fn test_persist_app() {
2811        block_on(async {
2812            let storage = Rc::new(Mutex::new(MemStorage::new()));
2813            let app_set = make_test_app_set();
2814
2815            StateMachineBuilder::new_stub()
2816                .storage(Rc::clone(&storage))
2817                .app_set(app_set.clone())
2818                .oneshot_check()
2819                .await
2820                .map(|_| ())
2821                .collect::<()>()
2822                .await;
2823
2824            let storage = storage.lock().await;
2825            let apps = app_set.lock().await.get_apps();
2826            storage.get_string(&apps[0].id).await.unwrap();
2827            assert!(storage.committed());
2828        });
2829    }
2830
2831    #[test]
2832    fn test_load_last_update_time() {
2833        block_on(async {
2834            let mut storage = MemStorage::new();
2835            let mut mock_time = MockTimeSource::new_from_now();
2836            mock_time.truncate_submicrosecond_walltime();
2837            let last_update_time = mock_time.now_in_walltime() - Duration::from_secs(999);
2838            storage
2839                .set_time(LAST_UPDATE_TIME, last_update_time)
2840                .await
2841                .unwrap();
2842
2843            let state_machine = StateMachineBuilder::new_stub()
2844                .policy_engine(StubPolicyEngine::new(&mock_time))
2845                .storage(Rc::new(Mutex::new(storage)))
2846                .build()
2847                .await;
2848
2849            assert_eq!(
2850                state_machine.context.schedule.last_update_time.unwrap(),
2851                PartialComplexTime::Wall(last_update_time)
2852            );
2853        });
2854    }
2855
2856    #[test]
2857    fn test_load_server_dictated_poll_interval() {
2858        block_on(async {
2859            let mut storage = MemStorage::new();
2860            storage
2861                .set_int(SERVER_DICTATED_POLL_INTERVAL, 56789)
2862                .await
2863                .unwrap();
2864
2865            let state_machine = StateMachineBuilder::new_stub()
2866                .storage(Rc::new(Mutex::new(storage)))
2867                .build()
2868                .await;
2869
2870            assert_eq!(
2871                Some(Duration::from_micros(56789)),
2872                state_machine.context.state.server_dictated_poll_interval
2873            );
2874        });
2875    }
2876
2877    #[test]
2878    fn test_load_app() {
2879        block_on(async {
2880            let app_set = VecAppSet::new(vec![
2881                App::builder()
2882                    .id("{00000000-0000-0000-0000-000000000001}")
2883                    .version([1, 2, 3, 4])
2884                    .build(),
2885            ]);
2886            let mut storage = MemStorage::new();
2887            let persisted_app = PersistedApp {
2888                cohort: Cohort {
2889                    id: Some("cohort_id".to_string()),
2890                    hint: Some("test_channel".to_string()),
2891                    name: None,
2892                },
2893                user_counting: UserCounting::ClientRegulatedByDate(Some(22222)),
2894            };
2895            let json = serde_json::to_string(&persisted_app).unwrap();
2896            let apps = app_set.get_apps();
2897            storage.set_string(&apps[0].id, &json).await.unwrap();
2898
2899            let app_set = Rc::new(Mutex::new(app_set));
2900
2901            let _state_machine = StateMachineBuilder::new_stub()
2902                .storage(Rc::new(Mutex::new(storage)))
2903                .app_set(Rc::clone(&app_set))
2904                .build()
2905                .await;
2906
2907            let apps = app_set.lock().await.get_apps();
2908            assert_eq!(persisted_app.cohort, apps[0].cohort);
2909            assert_eq!(
2910                UserCounting::ClientRegulatedByDate(Some(22222)),
2911                apps[0].user_counting
2912            );
2913        });
2914    }
2915
2916    #[test]
2917    fn test_report_check_interval_with_no_storage() {
2918        block_on(async {
2919            let mut mock_time = MockTimeSource::new_from_now();
2920            let mut state_machine = StateMachineBuilder::new_stub()
2921                .policy_engine(StubPolicyEngine::new(mock_time.clone()))
2922                .metrics_reporter(MockMetricsReporter::new())
2923                .build()
2924                .await;
2925
2926            state_machine
2927                .report_check_interval(InstallSource::ScheduledTask)
2928                .await;
2929            // No metrics should be reported because no LAST_UPDATE_TIME in storage.
2930            assert!(state_machine.metrics_reporter.metrics.is_empty());
2931
2932            // A second update check should report metrics.
2933            let interval = Duration::from_micros(999999);
2934            mock_time.advance(interval);
2935
2936            state_machine
2937                .report_check_interval(InstallSource::ScheduledTask)
2938                .await;
2939
2940            assert_eq!(
2941                state_machine.metrics_reporter.metrics,
2942                vec![Metrics::UpdateCheckInterval {
2943                    interval,
2944                    clock: ClockType::Monotonic,
2945                    install_source: InstallSource::ScheduledTask,
2946                }]
2947            );
2948        });
2949    }
2950
2951    #[test]
2952    fn test_report_check_interval_mono_transition() {
2953        block_on(async {
2954            let mut mock_time = MockTimeSource::new_from_now();
2955            let mut state_machine = StateMachineBuilder::new_stub()
2956                .policy_engine(StubPolicyEngine::new(mock_time.clone()))
2957                .metrics_reporter(MockMetricsReporter::new())
2958                .build()
2959                .await;
2960
2961            // Make sure that, provided a wall time, we get an initial report
2962            // using the wall time.
2963            let initial_duration = Duration::from_secs(999);
2964            let initial_time = mock_time.now_in_walltime() - initial_duration;
2965            state_machine.context.schedule.last_update_check_time =
2966                Some(PartialComplexTime::Wall(initial_time));
2967            state_machine
2968                .report_check_interval(InstallSource::ScheduledTask)
2969                .await;
2970
2971            // Advance one more time, and this time we should see a monotonic delta.
2972            let interval = Duration::from_micros(999999);
2973            mock_time.advance(interval);
2974            state_machine
2975                .report_check_interval(InstallSource::ScheduledTask)
2976                .await;
2977
2978            // One final time, to demonstrate monotonic time edges to
2979            // monotonic time.
2980            mock_time.advance(interval);
2981            state_machine
2982                .report_check_interval(InstallSource::ScheduledTask)
2983                .await;
2984            assert_eq!(
2985                state_machine.metrics_reporter.metrics,
2986                vec![
2987                    Metrics::UpdateCheckInterval {
2988                        interval: initial_duration,
2989                        clock: ClockType::Wall,
2990                        install_source: InstallSource::ScheduledTask,
2991                    },
2992                    Metrics::UpdateCheckInterval {
2993                        interval,
2994                        clock: ClockType::Monotonic,
2995                        install_source: InstallSource::ScheduledTask,
2996                    },
2997                    Metrics::UpdateCheckInterval {
2998                        interval,
2999                        clock: ClockType::Monotonic,
3000                        install_source: InstallSource::ScheduledTask,
3001                    },
3002                ]
3003            );
3004        });
3005    }
3006
3007    #[derive(Debug)]
3008    pub struct TestInstaller {
3009        reboot_called: Rc<RefCell<bool>>,
3010        install_fails: usize,
3011        mock_time: MockTimeSource,
3012    }
3013    struct TestInstallerBuilder {
3014        install_fails: usize,
3015        mock_time: MockTimeSource,
3016    }
3017    impl TestInstaller {
3018        fn builder(mock_time: MockTimeSource) -> TestInstallerBuilder {
3019            TestInstallerBuilder {
3020                install_fails: 0,
3021                mock_time,
3022            }
3023        }
3024    }
3025    impl TestInstallerBuilder {
3026        fn add_install_fail(mut self) -> Self {
3027            self.install_fails += 1;
3028            self
3029        }
3030        fn build(self) -> TestInstaller {
3031            TestInstaller {
3032                reboot_called: Rc::new(RefCell::new(false)),
3033                install_fails: self.install_fails,
3034                mock_time: self.mock_time,
3035            }
3036        }
3037    }
3038    const INSTALL_DURATION: Duration = Duration::from_micros(98765433);
3039
3040    impl Installer for TestInstaller {
3041        type InstallPlan = StubPlan;
3042        type Error = StubInstallErrors;
3043        type InstallResult = ();
3044
3045        fn perform_install<'a>(
3046            &'a mut self,
3047            _install_plan: &StubPlan,
3048            observer: Option<&'a dyn ProgressObserver>,
3049        ) -> LocalBoxFuture<'a, (Self::InstallResult, Vec<AppInstallResult<Self::Error>>)> {
3050            if self.install_fails > 0 {
3051                self.install_fails -= 1;
3052                future::ready((
3053                    (),
3054                    vec![AppInstallResult::Failed(StubInstallErrors::Failed)],
3055                ))
3056                .boxed()
3057            } else {
3058                self.mock_time.advance(INSTALL_DURATION);
3059                async move {
3060                    if let Some(observer) = observer {
3061                        observer.receive_progress(None, 0.0, None, None).await;
3062                        observer.receive_progress(None, 0.3, None, None).await;
3063                        observer.receive_progress(None, 0.9, None, None).await;
3064                        observer.receive_progress(None, 1.0, None, None).await;
3065                    }
3066                    ((), vec![AppInstallResult::Installed])
3067                }
3068                .boxed_local()
3069            }
3070        }
3071
3072        fn perform_reboot(&mut self) -> LocalBoxFuture<'_, Result<(), anyhow::Error>> {
3073            self.reboot_called.replace(true);
3074            future::ready(Ok(())).boxed_local()
3075        }
3076
3077        fn try_create_install_plan<'a>(
3078            &'a self,
3079            _request_params: &'a RequestParams,
3080            _request_metadata: Option<&'a RequestMetadata>,
3081            _response: &'a Response,
3082            _response_bytes: Vec<u8>,
3083            _ecdsa_signature: Option<Vec<u8>>,
3084        ) -> LocalBoxFuture<'a, Result<Self::InstallPlan, Self::Error>> {
3085            future::ready(Ok(StubPlan)).boxed_local()
3086        }
3087    }
3088
3089    #[test]
3090    fn test_report_successful_update_duration() {
3091        block_on(async {
3092            let http = MockHttpRequest::new(make_update_available_response());
3093            let storage = Rc::new(Mutex::new(MemStorage::new()));
3094
3095            let mut mock_time = MockTimeSource::new_from_now();
3096            mock_time.truncate_submicrosecond_walltime();
3097            let now = mock_time.now();
3098
3099            let update_completed_time = now + INSTALL_DURATION;
3100            let expected_update_duration = update_completed_time.wall_duration_since(now).unwrap();
3101
3102            let first_seen_time = now - Duration::from_micros(1000);
3103
3104            let expected_duration_since_first_seen = update_completed_time
3105                .wall_duration_since(first_seen_time)
3106                .unwrap();
3107
3108            let mut state_machine = StateMachineBuilder::new_stub()
3109                .http(http)
3110                .installer(TestInstaller::builder(mock_time.clone()).build())
3111                .policy_engine(StubPolicyEngine::new(mock_time.clone()))
3112                .metrics_reporter(MockMetricsReporter::new())
3113                .storage(Rc::clone(&storage))
3114                .build()
3115                .await;
3116
3117            {
3118                let mut storage = storage.lock().await;
3119                storage.set_string(INSTALL_PLAN_ID, "").await.unwrap();
3120                storage
3121                    .set_time(UPDATE_FIRST_SEEN_TIME, first_seen_time)
3122                    .await
3123                    .unwrap();
3124                storage.commit().await.unwrap();
3125            }
3126
3127            state_machine.run_once().await;
3128
3129            #[rustfmt::skip]
3130            assert_matches!(
3131                state_machine.metrics_reporter.metrics.as_slice(),
3132                [
3133                    Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
3134                    Metrics::RequestsPerCheck { count: 1, successful: true },
3135                    Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadStarted, event_result: EventResult::Success, .. }),
3136                    Metrics::SuccessfulUpdateDuration(install_duration),
3137                    Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadFinished, event_result: EventResult::Success, .. }),
3138                    Metrics::OmahaEventLost(Event { event_type: EventType::UpdateComplete, event_result: EventResult::Success, .. }),
3139                    Metrics::SuccessfulUpdateFromFirstSeen(duration_since_first_seen),
3140                    Metrics::AttemptsToSuccessfulCheck(1),
3141                    Metrics::AttemptsToSuccessfulInstall { count: 1, successful: true },
3142                ]
3143                if
3144                    *install_duration == expected_update_duration &&
3145                    *duration_since_first_seen == expected_duration_since_first_seen
3146            );
3147        });
3148    }
3149
3150    #[test]
3151    fn test_report_failed_update_duration() {
3152        block_on(async {
3153            let http = MockHttpRequest::new(make_update_available_response());
3154            let mut state_machine = StateMachineBuilder::new_stub()
3155                .http(http)
3156                .installer(StubInstaller { should_fail: true })
3157                .metrics_reporter(MockMetricsReporter::new())
3158                .build()
3159                .await;
3160            // clock::mock::set(time::i64_to_time(123456789));
3161
3162            state_machine.run_once().await;
3163
3164            assert!(
3165                state_machine
3166                    .metrics_reporter
3167                    .metrics
3168                    .contains(&Metrics::FailedUpdateDuration(Duration::from_micros(0)))
3169            );
3170        });
3171    }
3172
3173    #[test]
3174    fn test_record_update_first_seen_time() {
3175        block_on(async {
3176            let storage = Rc::new(Mutex::new(MemStorage::new()));
3177            let mut state_machine = StateMachineBuilder::new_stub()
3178                .storage(Rc::clone(&storage))
3179                .build()
3180                .await;
3181
3182            let mut mock_time = MockTimeSource::new_from_now();
3183            mock_time.truncate_submicrosecond_walltime();
3184            let now = mock_time.now_in_walltime();
3185            assert_eq!(
3186                state_machine.record_update_first_seen_time("id", now).await,
3187                now
3188            );
3189            {
3190                let storage = storage.lock().await;
3191                assert_eq!(
3192                    storage.get_string(INSTALL_PLAN_ID).await,
3193                    Some("id".to_string())
3194                );
3195                assert_eq!(storage.get_time(UPDATE_FIRST_SEEN_TIME).await, Some(now));
3196                assert_eq!(storage.len(), 2);
3197                assert!(storage.committed());
3198            }
3199
3200            mock_time.advance(Duration::from_secs(1000));
3201            let now2 = mock_time.now_in_walltime();
3202            assert_eq!(
3203                state_machine
3204                    .record_update_first_seen_time("id", now2)
3205                    .await,
3206                now
3207            );
3208            {
3209                let storage = storage.lock().await;
3210                assert_eq!(
3211                    storage.get_string(INSTALL_PLAN_ID).await,
3212                    Some("id".to_string())
3213                );
3214                assert_eq!(storage.get_time(UPDATE_FIRST_SEEN_TIME).await, Some(now));
3215                assert_eq!(storage.len(), 2);
3216                assert!(storage.committed());
3217            }
3218            assert_eq!(
3219                state_machine
3220                    .record_update_first_seen_time("id2", now2)
3221                    .await,
3222                now2
3223            );
3224            {
3225                let storage = storage.lock().await;
3226                assert_eq!(
3227                    storage.get_string(INSTALL_PLAN_ID).await,
3228                    Some("id2".to_string())
3229                );
3230                assert_eq!(storage.get_time(UPDATE_FIRST_SEEN_TIME).await, Some(now2));
3231                assert_eq!(storage.len(), 2);
3232                assert!(storage.committed());
3233            }
3234        });
3235    }
3236
3237    #[test]
3238    fn test_report_attempts_to_successful_check() {
3239        block_on(async {
3240            let storage = Rc::new(Mutex::new(MemStorage::new()));
3241            let mut state_machine = StateMachineBuilder::new_stub()
3242                .installer(StubInstaller { should_fail: true })
3243                .metrics_reporter(MockMetricsReporter::new())
3244                .storage(Rc::clone(&storage))
3245                .build()
3246                .await;
3247
3248            state_machine
3249                .report_attempts_to_successful_check(true)
3250                .await;
3251
3252            // consecutive_failed_update_attempts should be zero (there were no previous failures)
3253            // but we should record an attempt in metrics
3254            assert_eq!(
3255                state_machine.context.state.consecutive_failed_update_checks,
3256                0
3257            );
3258            assert_eq!(
3259                state_machine.metrics_reporter.metrics,
3260                vec![Metrics::AttemptsToSuccessfulCheck(1)]
3261            );
3262
3263            state_machine
3264                .report_attempts_to_successful_check(false)
3265                .await;
3266            assert_eq!(
3267                state_machine.context.state.consecutive_failed_update_checks,
3268                1
3269            );
3270
3271            state_machine
3272                .report_attempts_to_successful_check(false)
3273                .await;
3274            assert_eq!(
3275                state_machine.context.state.consecutive_failed_update_checks,
3276                2
3277            );
3278
3279            // consecutive_failed_update_attempts should be reset to zero on success
3280            // but we should record the previous number of failed attempts (2) + 1 in metrics
3281            state_machine
3282                .report_attempts_to_successful_check(true)
3283                .await;
3284            assert_eq!(
3285                state_machine.context.state.consecutive_failed_update_checks,
3286                0
3287            );
3288            assert_eq!(
3289                state_machine.metrics_reporter.metrics,
3290                vec![
3291                    Metrics::AttemptsToSuccessfulCheck(1),
3292                    Metrics::AttemptsToSuccessfulCheck(3)
3293                ]
3294            );
3295        });
3296    }
3297
3298    #[test]
3299    fn test_ping_omaha_updates_consecutive_failed_update_checks_and_persists() {
3300        block_on(async {
3301            let mut http = MockHttpRequest::empty();
3302            http.add_error(http_request::mock_errors::make_transport_error());
3303            http.add_response(HttpResponse::new(vec![]));
3304            let response = json!({"response":{
3305              "server": "prod",
3306              "protocol": "3.0",
3307              "app": [{
3308                "appid": "{00000000-0000-0000-0000-000000000001}",
3309                "status": "ok",
3310              }],
3311            }});
3312            let response = serde_json::to_vec(&response).unwrap();
3313            http.add_response(HttpResponse::new(response));
3314
3315            let storage = Rc::new(Mutex::new(MemStorage::new()));
3316
3317            // Start out with a value in storage...
3318            {
3319                let mut storage = storage.lock().await;
3320                let _ = storage.set_int(CONSECUTIVE_FAILED_UPDATE_CHECKS, 1);
3321                let _ = storage.commit();
3322            }
3323
3324            let mut state_machine = StateMachineBuilder::new_stub()
3325                .storage(Rc::clone(&storage))
3326                .http(http)
3327                .build()
3328                .await;
3329
3330            async_generator::generate(move |mut co| async move {
3331                // Failed ping increases `consecutive_failed_update_checks`, adding the value from
3332                // storage.
3333                state_machine.ping_omaha(&mut co).await;
3334                assert_eq!(
3335                    state_machine.context.state.consecutive_failed_update_checks,
3336                    2
3337                );
3338                {
3339                    let storage = storage.lock().await;
3340                    assert_eq!(
3341                        storage.get_int(CONSECUTIVE_FAILED_UPDATE_CHECKS).await,
3342                        Some(2)
3343                    );
3344                }
3345
3346                state_machine.ping_omaha(&mut co).await;
3347                assert_eq!(
3348                    state_machine.context.state.consecutive_failed_update_checks,
3349                    3
3350                );
3351                {
3352                    let storage = storage.lock().await;
3353                    assert_eq!(
3354                        storage.get_int(CONSECUTIVE_FAILED_UPDATE_CHECKS).await,
3355                        Some(3)
3356                    );
3357                }
3358
3359                // Successful ping resets `consecutive_failed_update_checks`.
3360                state_machine.ping_omaha(&mut co).await;
3361                assert_eq!(
3362                    state_machine.context.state.consecutive_failed_update_checks,
3363                    0
3364                );
3365                {
3366                    let storage = storage.lock().await;
3367                    assert_eq!(
3368                        storage.get_int(CONSECUTIVE_FAILED_UPDATE_CHECKS).await,
3369                        None
3370                    );
3371                }
3372            })
3373            .into_complete()
3374            .await;
3375        });
3376    }
3377
3378    #[test]
3379    fn test_report_attempts_to_successful_install() {
3380        block_on(async {
3381            let http = MockHttpRequest::new(make_update_available_response());
3382            let storage = Rc::new(Mutex::new(MemStorage::new()));
3383
3384            let mock_time = MockTimeSource::new_from_now();
3385
3386            let mut state_machine = StateMachineBuilder::new_stub()
3387                .http(http)
3388                .installer(TestInstaller::builder(mock_time.clone()).build())
3389                .policy_engine(StubPolicyEngine::new(mock_time.clone()))
3390                .metrics_reporter(MockMetricsReporter::new())
3391                .storage(Rc::clone(&storage))
3392                .build()
3393                .await;
3394
3395            state_machine.run_once().await;
3396
3397            // FIXME(https://github.com/rust-lang/rustfmt/issues/4530) rustfmt doesn't wrap slice
3398            // patterns yet.
3399            #[rustfmt::skip]
3400            assert_matches!(
3401                state_machine.metrics_reporter.metrics.as_slice(),
3402                [
3403                    Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
3404                    Metrics::RequestsPerCheck { count: 1, successful: true },
3405                    Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadStarted, event_result: EventResult::Success, .. }),
3406                    Metrics::SuccessfulUpdateDuration(_),
3407                    Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadFinished, event_result: EventResult::Success, .. }),
3408                    Metrics::OmahaEventLost(Event { event_type: EventType::UpdateComplete, event_result: EventResult::Success, .. }),
3409                    Metrics::SuccessfulUpdateFromFirstSeen(_),
3410                    Metrics::AttemptsToSuccessfulCheck(1),
3411                    Metrics::AttemptsToSuccessfulInstall { count: 1, successful: true },
3412                ]
3413            );
3414        });
3415    }
3416
3417    #[test]
3418    fn test_report_attempts_to_successful_install_fails_then_succeeds() {
3419        block_on(async {
3420            let mut http = MockHttpRequest::new(make_update_available_response());
3421            // Responses to events. This first batch corresponds to the install failure, so these
3422            // should be the update download started, and another for a failed install.
3423            // `Event::error(EventErrorCode::Installation)`.
3424            http.add_response(HttpResponse::new(vec![]));
3425            http.add_response(HttpResponse::new(vec![]));
3426
3427            // Respond to the next request.
3428            http.add_response(make_update_available_response());
3429            // Responses to events. This corresponds to the update download started, and the other
3430            // for a successful install.
3431            http.add_response(HttpResponse::new(vec![]));
3432            http.add_response(HttpResponse::new(vec![]));
3433
3434            let storage = Rc::new(Mutex::new(MemStorage::new()));
3435            let mock_time = MockTimeSource::new_from_now();
3436
3437            let mut state_machine = StateMachineBuilder::new_stub()
3438                .http(http)
3439                .installer(
3440                    TestInstaller::builder(mock_time.clone())
3441                        .add_install_fail()
3442                        .build(),
3443                )
3444                .policy_engine(StubPolicyEngine::new(mock_time.clone()))
3445                .metrics_reporter(MockMetricsReporter::new())
3446                .storage(Rc::clone(&storage))
3447                .build()
3448                .await;
3449
3450            state_machine.run_once().await;
3451            state_machine.run_once().await;
3452
3453            // FIXME(https://github.com/rust-lang/rustfmt/issues/4530) rustfmt doesn't wrap slice
3454            // patterns yet.
3455            #[rustfmt::skip]
3456            assert_matches!(
3457                state_machine.metrics_reporter.metrics.as_slice(),
3458                [
3459                    Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
3460                    Metrics::RequestsPerCheck { count: 1, successful: true },
3461                    Metrics::FailedUpdateDuration(_),
3462                    Metrics::AttemptsToSuccessfulCheck(1),
3463                    Metrics::AttemptsToSuccessfulInstall { count: 1, successful: false },
3464                    Metrics::UpdateCheckInterval { .. },
3465                    Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
3466                    Metrics::RequestsPerCheck { count: 1, successful: true },
3467                    Metrics::SuccessfulUpdateDuration(_),
3468                    Metrics::OmahaEventLost(Event { .. }),
3469                    Metrics::SuccessfulUpdateFromFirstSeen(_),
3470                    Metrics::AttemptsToSuccessfulCheck(1),
3471                    Metrics::AttemptsToSuccessfulInstall { count: 2, successful: true }
3472                ]
3473            );
3474        });
3475    }
3476
3477    #[test]
3478    fn test_report_attempts_to_successful_install_does_not_report_for_no_update() {
3479        block_on(async {
3480            let response = json!({"response":{
3481              "server": "prod",
3482              "protocol": "3.0",
3483              "app": [{
3484                "appid": "{00000000-0000-0000-0000-000000000001}",
3485                "status": "ok",
3486                "updatecheck": {
3487                  "status": "noupdate",
3488                  "info": "no update for you"
3489                }
3490              }],
3491            }});
3492            let response = serde_json::to_vec(&response).unwrap();
3493            let http = MockHttpRequest::new(HttpResponse::new(response.clone()));
3494
3495            let storage = Rc::new(Mutex::new(MemStorage::new()));
3496            let mock_time = MockTimeSource::new_from_now();
3497
3498            let mut state_machine = StateMachineBuilder::new_stub()
3499                .http(http)
3500                .installer(TestInstaller::builder(mock_time.clone()).build())
3501                .policy_engine(StubPolicyEngine::new(mock_time.clone()))
3502                .metrics_reporter(MockMetricsReporter::new())
3503                .storage(Rc::clone(&storage))
3504                .build()
3505                .await;
3506
3507            state_machine.run_once().await;
3508
3509            // FIXME(https://github.com/rust-lang/rustfmt/issues/4530) rustfmt doesn't wrap slice
3510            // patterns yet.
3511            #[rustfmt::skip]
3512            assert_matches!(
3513                state_machine.metrics_reporter.metrics.as_slice(),
3514                [
3515                    Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
3516                    Metrics::RequestsPerCheck { count: 1, successful: true },
3517                    Metrics::AttemptsToSuccessfulCheck(1),
3518                ]
3519            );
3520        });
3521    }
3522
3523    #[test]
3524    fn test_successful_update_triggers_reboot() {
3525        let mut pool = LocalPool::new();
3526        let spawner = pool.spawner();
3527
3528        let http = MockHttpRequest::new(make_update_available_response());
3529        let mock_time = MockTimeSource::new_from_now();
3530        let next_update_time = mock_time.now();
3531        let (timer, mut timers) = BlockingTimer::new();
3532
3533        let installer = TestInstaller::builder(mock_time.clone()).build();
3534        let reboot_called = Rc::clone(&installer.reboot_called);
3535        let (_ctl, state_machine) = pool.run_until(
3536            StateMachineBuilder::new_stub()
3537                .http(http)
3538                .installer(installer)
3539                .policy_engine(StubPolicyEngine::new(mock_time))
3540                .timer(timer)
3541                .start(),
3542        );
3543        let observer = TestObserver::default();
3544        spawner
3545            .spawn_local(observer.observe(state_machine))
3546            .unwrap();
3547
3548        let blocked_timer = pool.run_until(timers.next()).unwrap();
3549        assert_eq!(
3550            blocked_timer.requested_wait(),
3551            RequestedWait::Until(next_update_time.into())
3552        );
3553        blocked_timer.unblock();
3554        pool.run_until_stalled();
3555
3556        assert!(*reboot_called.borrow());
3557    }
3558
3559    #[test]
3560    fn test_skip_reboot_if_not_needed() {
3561        let mut pool = LocalPool::new();
3562        let spawner = pool.spawner();
3563
3564        let http = MockHttpRequest::new(make_update_available_response());
3565        let mock_time = MockTimeSource::new_from_now();
3566        let next_update_time = mock_time.now();
3567        let reboot_check_options_received = Rc::new(RefCell::new(vec![]));
3568        let policy_engine = MockPolicyEngine {
3569            reboot_check_options_received: Rc::clone(&reboot_check_options_received),
3570            check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
3571            time_source: mock_time.clone(),
3572            reboot_needed: Rc::new(RefCell::new(false)),
3573            ..MockPolicyEngine::default()
3574        };
3575        let (timer, mut timers) = BlockingTimer::new();
3576
3577        let installer = TestInstaller::builder(mock_time).build();
3578        let reboot_called = Rc::clone(&installer.reboot_called);
3579        let (_ctl, state_machine) = pool.run_until(
3580            StateMachineBuilder::new_stub()
3581                .http(http)
3582                .installer(installer)
3583                .policy_engine(policy_engine)
3584                .timer(timer)
3585                .start(),
3586        );
3587        let observer = TestObserver::default();
3588        spawner
3589            .spawn_local(observer.observe(state_machine))
3590            .unwrap();
3591
3592        let blocked_timer = pool.run_until(timers.next()).unwrap();
3593        assert_eq!(
3594            blocked_timer.requested_wait(),
3595            RequestedWait::Until(next_update_time.into())
3596        );
3597        blocked_timer.unblock();
3598        pool.run_until_stalled();
3599
3600        assert_eq!(
3601            observer.take_states(),
3602            vec![
3603                State::CheckingForUpdates(InstallSource::ScheduledTask),
3604                State::InstallingUpdate,
3605                State::Idle
3606            ]
3607        );
3608
3609        assert_eq!(*reboot_check_options_received.borrow(), vec![]);
3610        assert!(!*reboot_called.borrow());
3611    }
3612
3613    #[test]
3614    fn test_failed_update_does_not_trigger_reboot() {
3615        let mut pool = LocalPool::new();
3616        let spawner = pool.spawner();
3617
3618        let http = MockHttpRequest::new(make_update_available_response());
3619        let mock_time = MockTimeSource::new_from_now();
3620        let next_update_time = mock_time.now();
3621        let (timer, mut timers) = BlockingTimer::new();
3622
3623        let installer = TestInstaller::builder(mock_time.clone())
3624            .add_install_fail()
3625            .build();
3626        let reboot_called = Rc::clone(&installer.reboot_called);
3627        let (_ctl, state_machine) = pool.run_until(
3628            StateMachineBuilder::new_stub()
3629                .http(http)
3630                .installer(installer)
3631                .policy_engine(StubPolicyEngine::new(mock_time))
3632                .timer(timer)
3633                .start(),
3634        );
3635        let observer = TestObserver::default();
3636        spawner
3637            .spawn_local(observer.observe(state_machine))
3638            .unwrap();
3639
3640        let blocked_timer = pool.run_until(timers.next()).unwrap();
3641        assert_eq!(
3642            blocked_timer.requested_wait(),
3643            RequestedWait::Until(next_update_time.into())
3644        );
3645        blocked_timer.unblock();
3646        pool.run_until_stalled();
3647
3648        assert!(!*reboot_called.borrow());
3649    }
3650
3651    // Verify that if we are in the middle of checking for or applying an update, a new OnDemand
3652    // update check request will "upgrade" the inflight check request to behave as if it was
3653    // OnDemand. In particular, this should cause an immediate reboot.
3654    #[test]
3655    fn test_reboots_immediately_if_user_initiated_update_requests_occurs_during_install() {
3656        let mut pool = LocalPool::new();
3657        let spawner = pool.spawner();
3658
3659        let http = MockHttpRequest::new(make_update_available_response());
3660        let mock_time = MockTimeSource::new_from_now();
3661
3662        let (send_install, mut recv_install) = mpsc::channel(0);
3663        let (send_reboot, mut recv_reboot) = mpsc::channel(0);
3664        let reboot_check_options_received = Rc::new(RefCell::new(vec![]));
3665        let policy_engine = MockPolicyEngine {
3666            reboot_check_options_received: Rc::clone(&reboot_check_options_received),
3667            check_timing: Some(CheckTiming::builder().time(mock_time.now()).build()),
3668            ..MockPolicyEngine::default()
3669        };
3670
3671        let (mut ctl, state_machine) = pool.run_until(
3672            StateMachineBuilder::new_stub()
3673                .http(http)
3674                .installer(BlockingInstaller {
3675                    on_install: send_install,
3676                    on_reboot: Some(send_reboot),
3677                })
3678                .policy_engine(policy_engine)
3679                .start(),
3680        );
3681
3682        let observer = TestObserver::default();
3683        spawner
3684            .spawn_local(observer.observe(state_machine))
3685            .unwrap();
3686
3687        let unblock_install = pool.run_until(recv_install.next()).unwrap();
3688        pool.run_until_stalled();
3689        assert_eq!(
3690            observer.take_states(),
3691            vec![
3692                State::CheckingForUpdates(InstallSource::ScheduledTask),
3693                State::InstallingUpdate
3694            ]
3695        );
3696
3697        pool.run_until(async {
3698            assert_eq!(
3699                ctl.start_update_check(CheckOptions {
3700                    source: InstallSource::OnDemand
3701                })
3702                .await,
3703                Ok(StartUpdateCheckResponse::AlreadyRunning)
3704            );
3705        });
3706
3707        pool.run_until_stalled();
3708        assert_eq!(observer.take_states(), vec![]);
3709
3710        unblock_install
3711            .send(vec![AppInstallResult::Installed])
3712            .unwrap();
3713        pool.run_until_stalled();
3714        assert_eq!(observer.take_states(), vec![State::WaitingForReboot]);
3715
3716        let unblock_reboot = pool.run_until(recv_reboot.next()).unwrap();
3717        pool.run_until_stalled();
3718        unblock_reboot.send(Ok(())).unwrap();
3719
3720        // Make sure when we checked whether we could reboot, it was from an OnDemand source
3721        assert_eq!(
3722            *reboot_check_options_received.borrow(),
3723            vec![CheckOptions {
3724                source: InstallSource::OnDemand
3725            }]
3726        );
3727    }
3728
3729    // Verifies that if the state machine is done with an install and waiting for a reboot, and a
3730    // user-initiated UpdateCheckRequest comes in, we reboot immediately.
3731    #[test]
3732    fn test_reboots_immediately_when_check_now_comes_in_during_wait() {
3733        let mut pool = LocalPool::new();
3734        let spawner = pool.spawner();
3735
3736        let mut http = MockHttpRequest::new(make_update_available_response());
3737        // Responses to events.
3738        http.add_response(HttpResponse::new(vec![]));
3739        http.add_response(HttpResponse::new(vec![]));
3740        http.add_response(HttpResponse::new(vec![]));
3741        // Response to the ping.
3742        http.add_response(make_update_available_response());
3743        let mut mock_time = MockTimeSource::new_from_now();
3744        mock_time.truncate_submicrosecond_walltime();
3745        let next_update_time = mock_time.now() + Duration::from_secs(1000);
3746        let (timer, mut timers) = BlockingTimer::new();
3747        let reboot_allowed = Rc::new(RefCell::new(false));
3748        let reboot_check_options_received = Rc::new(RefCell::new(vec![]));
3749        let policy_engine = MockPolicyEngine {
3750            time_source: mock_time.clone(),
3751            reboot_allowed: Rc::clone(&reboot_allowed),
3752            check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
3753            reboot_check_options_received: Rc::clone(&reboot_check_options_received),
3754            ..MockPolicyEngine::default()
3755        };
3756        let installer = TestInstaller::builder(mock_time.clone()).build();
3757        let reboot_called = Rc::clone(&installer.reboot_called);
3758        let storage_ref = Rc::new(Mutex::new(MemStorage::new()));
3759        let apps = make_test_app_set();
3760
3761        let (mut ctl, state_machine) = pool.run_until(
3762            StateMachineBuilder::new_stub()
3763                .app_set(apps)
3764                .http(http)
3765                .installer(installer)
3766                .policy_engine(policy_engine)
3767                .timer(timer)
3768                .storage(Rc::clone(&storage_ref))
3769                .start(),
3770        );
3771
3772        let observer = TestObserver::default();
3773        spawner
3774            .spawn_local(observer.observe(state_machine))
3775            .unwrap();
3776
3777        // The first wait before update check.
3778        let blocked_timer = pool.run_until(timers.next()).unwrap();
3779        assert_eq!(
3780            blocked_timer.requested_wait(),
3781            RequestedWait::Until(next_update_time.into())
3782        );
3783        blocked_timer.unblock();
3784        pool.run_until_stalled();
3785
3786        // The timers for reboot and ping, even though the order should be deterministic, but that
3787        // is an implementation detail, the test should still pass if that order changes.
3788        let blocked_timer1 = pool.run_until(timers.next()).unwrap();
3789        let blocked_timer2 = pool.run_until(timers.next()).unwrap();
3790        let (wait_for_reboot_timer, _wait_for_next_ping_timer) =
3791            match blocked_timer1.requested_wait() {
3792                RequestedWait::For(_) => (blocked_timer1, blocked_timer2),
3793                RequestedWait::Until(_) => (blocked_timer2, blocked_timer1),
3794            };
3795        // This is the timer waiting for next reboot_allowed check.
3796        assert_eq!(
3797            wait_for_reboot_timer.requested_wait(),
3798            RequestedWait::For(CHECK_REBOOT_ALLOWED_INTERVAL)
3799        );
3800
3801        // If we send an update check request that's from a user (source == OnDemand), we should
3802        // short-circuit the wait for reboot, and update immediately.
3803        assert!(!*reboot_called.borrow());
3804        *reboot_allowed.borrow_mut() = true;
3805        pool.run_until(async {
3806            assert_eq!(
3807                ctl.start_update_check(CheckOptions {
3808                    source: InstallSource::OnDemand
3809                })
3810                .await,
3811                Ok(StartUpdateCheckResponse::AlreadyRunning)
3812            );
3813        });
3814        pool.run_until_stalled();
3815        assert!(*reboot_called.borrow());
3816
3817        // Check that we got one check for reboot from a Scheduled Task (the start of the wait),
3818        // and then another came in with OnDemand, as we "upgraded it" with our OnDemand check
3819        // request
3820        assert_eq!(
3821            *reboot_check_options_received.borrow(),
3822            vec![
3823                CheckOptions {
3824                    source: InstallSource::ScheduledTask
3825                },
3826                CheckOptions {
3827                    source: InstallSource::OnDemand
3828                },
3829            ]
3830        );
3831    }
3832
3833    // Verifies that if reboot is not allowed, state machine will send pings to Omaha while waiting
3834    // for reboot, and it will reply AlreadyRunning to any StartUpdateCheck requests, and when it's
3835    // finally time to reboot, it will trigger reboot.
3836    #[test]
3837    fn test_wait_for_reboot() {
3838        let mut pool = LocalPool::new();
3839        let spawner = pool.spawner();
3840
3841        let mut http = MockHttpRequest::new(make_update_available_response());
3842        // Responses to events.
3843        http.add_response(HttpResponse::new(vec![]));
3844        http.add_response(HttpResponse::new(vec![]));
3845        http.add_response(HttpResponse::new(vec![]));
3846        // Response to the ping.
3847        http.add_response(make_update_available_response());
3848        let ping_request_viewer = MockHttpRequest::from_request_cell(http.get_request_cell());
3849        let mut mock_time = MockTimeSource::new_from_now();
3850        mock_time.truncate_submicrosecond_walltime();
3851        let next_update_time = mock_time.now() + Duration::from_secs(1000);
3852        let (timer, mut timers) = BlockingTimer::new();
3853        let reboot_allowed = Rc::new(RefCell::new(false));
3854        let policy_engine = MockPolicyEngine {
3855            time_source: mock_time.clone(),
3856            reboot_allowed: Rc::clone(&reboot_allowed),
3857            check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
3858            ..MockPolicyEngine::default()
3859        };
3860        let installer = TestInstaller::builder(mock_time.clone()).build();
3861        let reboot_called = Rc::clone(&installer.reboot_called);
3862        let storage_ref = Rc::new(Mutex::new(MemStorage::new()));
3863        let apps = make_test_app_set();
3864
3865        let (mut ctl, state_machine) = pool.run_until(
3866            StateMachineBuilder::new_stub()
3867                .app_set(apps.clone())
3868                .http(http)
3869                .installer(installer)
3870                .policy_engine(policy_engine)
3871                .timer(timer)
3872                .storage(Rc::clone(&storage_ref))
3873                .start(),
3874        );
3875
3876        let observer = TestObserver::default();
3877        spawner
3878            .spawn_local(observer.observe(state_machine))
3879            .unwrap();
3880
3881        // The first wait before update check.
3882        let blocked_timer = pool.run_until(timers.next()).unwrap();
3883        assert_eq!(
3884            blocked_timer.requested_wait(),
3885            RequestedWait::Until(next_update_time.into())
3886        );
3887        blocked_timer.unblock();
3888        pool.run_until_stalled();
3889
3890        // The timers for reboot and ping, even though the order should be deterministic, but that
3891        // is an implementation detail, the test should still pass if that order changes.
3892        let blocked_timer1 = pool.run_until(timers.next()).unwrap();
3893        let blocked_timer2 = pool.run_until(timers.next()).unwrap();
3894        let (wait_for_reboot_timer, wait_for_next_ping_timer) =
3895            match blocked_timer1.requested_wait() {
3896                RequestedWait::For(_) => (blocked_timer1, blocked_timer2),
3897                RequestedWait::Until(_) => (blocked_timer2, blocked_timer1),
3898            };
3899        // This is the timer waiting for next reboot_allowed check.
3900        assert_eq!(
3901            wait_for_reboot_timer.requested_wait(),
3902            RequestedWait::For(CHECK_REBOOT_ALLOWED_INTERVAL)
3903        );
3904        // This is the timer waiting for the next ping.
3905        assert_eq!(
3906            wait_for_next_ping_timer.requested_wait(),
3907            RequestedWait::Until(next_update_time.into())
3908        );
3909        // Unblock the ping.
3910        mock_time.advance(Duration::from_secs(1000));
3911        wait_for_next_ping_timer.unblock();
3912        pool.run_until_stalled();
3913
3914        // Verify that it sends a ping.
3915        let config = crate::configuration::test_support::config_generator();
3916        let request_params = RequestParams::default();
3917
3918        let apps = pool.run_until(apps.lock()).get_apps();
3919        let mut expected_request_builder = RequestBuilder::new(&config, &request_params)
3920            // 0: session id for update check
3921            // 1: request id for update check
3922            // 2-4: request id for events
3923            .session_id(GUID::from_u128(5))
3924            .request_id(GUID::from_u128(6));
3925        for app in &apps {
3926            expected_request_builder = expected_request_builder.add_ping(app);
3927        }
3928        pool.run_until(assert_request(
3929            &ping_request_viewer,
3930            expected_request_builder,
3931        ));
3932
3933        pool.run_until(async {
3934            assert_eq!(
3935                ctl.start_update_check(CheckOptions::default()).await,
3936                Ok(StartUpdateCheckResponse::AlreadyRunning)
3937            );
3938        });
3939
3940        // Last update time is updated in storage.
3941        pool.run_until(async {
3942            let storage = storage_ref.lock().await;
3943            let context = update_check::Context::load(&*storage).await;
3944            assert_eq!(
3945                context.schedule.last_update_time,
3946                Some(mock_time.now_in_walltime().into())
3947            );
3948        });
3949
3950        // State machine should be waiting for the next ping.
3951        let wait_for_next_ping_timer = pool.run_until(timers.next()).unwrap();
3952        assert_eq!(
3953            wait_for_next_ping_timer.requested_wait(),
3954            RequestedWait::Until(next_update_time.into())
3955        );
3956
3957        // Let state machine check reboot_allowed again, but still don't allow it.
3958        wait_for_reboot_timer.unblock();
3959        pool.run_until_stalled();
3960        assert!(!*reboot_called.borrow());
3961
3962        // State machine should be waiting for the next reboot.
3963        let wait_for_reboot_timer = pool.run_until(timers.next()).unwrap();
3964        assert_eq!(
3965            wait_for_reboot_timer.requested_wait(),
3966            RequestedWait::For(CHECK_REBOOT_ALLOWED_INTERVAL)
3967        );
3968
3969        // Time for a second ping.
3970        wait_for_next_ping_timer.unblock();
3971        pool.run_until_stalled();
3972
3973        // Verify that it sends another ping.
3974        let mut expected_request_builder = RequestBuilder::new(&config, &request_params)
3975            .session_id(GUID::from_u128(7))
3976            .request_id(GUID::from_u128(8));
3977        for app in &apps {
3978            expected_request_builder = expected_request_builder.add_ping(app);
3979        }
3980        pool.run_until(assert_request(
3981            &ping_request_viewer,
3982            expected_request_builder,
3983        ));
3984
3985        assert!(!*reboot_called.borrow());
3986
3987        // Now allow reboot.
3988        *reboot_called.borrow_mut() = true;
3989        wait_for_reboot_timer.unblock();
3990        pool.run_until_stalled();
3991        assert!(*reboot_called.borrow());
3992    }
3993
3994    #[derive(Debug)]
3995    struct BlockingInstaller {
3996        on_install: mpsc::Sender<oneshot::Sender<Vec<AppInstallResult<StubInstallErrors>>>>,
3997        on_reboot: Option<mpsc::Sender<oneshot::Sender<Result<(), anyhow::Error>>>>,
3998    }
3999
4000    impl Installer for BlockingInstaller {
4001        type InstallPlan = StubPlan;
4002        type Error = StubInstallErrors;
4003        type InstallResult = ();
4004
4005        fn perform_install(
4006            &mut self,
4007            _install_plan: &StubPlan,
4008            _observer: Option<&dyn ProgressObserver>,
4009        ) -> LocalBoxFuture<'_, (Self::InstallResult, Vec<AppInstallResult<Self::Error>>)> {
4010            let (send, recv) = oneshot::channel();
4011            let send_fut = self.on_install.send(send);
4012
4013            async move {
4014                send_fut.await.unwrap();
4015                ((), recv.await.unwrap())
4016            }
4017            .boxed_local()
4018        }
4019
4020        fn perform_reboot(&mut self) -> LocalBoxFuture<'_, Result<(), anyhow::Error>> {
4021            match &mut self.on_reboot {
4022                Some(on_reboot) => {
4023                    let (send, recv) = oneshot::channel();
4024                    let send_fut = on_reboot.send(send);
4025
4026                    async move {
4027                        send_fut.await.unwrap();
4028                        recv.await.unwrap()
4029                    }
4030                    .boxed_local()
4031                }
4032                None => future::ready(Ok(())).boxed_local(),
4033            }
4034        }
4035
4036        fn try_create_install_plan<'a>(
4037            &'a self,
4038            _request_params: &'a RequestParams,
4039            _request_metadata: Option<&'a RequestMetadata>,
4040            _response: &'a Response,
4041            _response_bytes: Vec<u8>,
4042            _ecdsa_signature: Option<Vec<u8>>,
4043        ) -> LocalBoxFuture<'a, Result<Self::InstallPlan, Self::Error>> {
4044            future::ready(Ok(StubPlan)).boxed_local()
4045        }
4046    }
4047
4048    #[derive(Debug, Default)]
4049    struct TestObserver {
4050        states: Rc<RefCell<Vec<State>>>,
4051    }
4052
4053    impl TestObserver {
4054        fn observe<T: Stream<Item = StateMachineEvent>>(
4055            &self,
4056            s: T,
4057        ) -> impl Future<Output = ()> + use<T> {
4058            let states = Rc::clone(&self.states);
4059            async move {
4060                futures::pin_mut!(s);
4061                while let Some(event) = s.next().await {
4062                    if let StateMachineEvent::StateChange(state) = event {
4063                        states.borrow_mut().push(state);
4064                    }
4065                }
4066            }
4067        }
4068
4069        fn observe_until_terminal<T: Stream<Item = StateMachineEvent>>(
4070            &self,
4071            s: T,
4072        ) -> impl Future<Output = ()> + use<T> {
4073            let states = Rc::clone(&self.states);
4074            async move {
4075                futures::pin_mut!(s);
4076                while let Some(event) = s.next().await {
4077                    if let StateMachineEvent::StateChange(state) = event {
4078                        states.borrow_mut().push(state);
4079                        match state {
4080                            State::Idle | State::WaitingForReboot => return,
4081                            _ => {}
4082                        }
4083                    }
4084                }
4085            }
4086        }
4087
4088        fn take_states(&self) -> Vec<State> {
4089            std::mem::take(&mut *self.states.borrow_mut())
4090        }
4091    }
4092
4093    #[test]
4094    fn test_start_update_during_update_replies_with_in_progress() {
4095        let mut pool = LocalPool::new();
4096        let spawner = pool.spawner();
4097
4098        let http = MockHttpRequest::new(make_update_available_response());
4099        let (send_install, mut recv_install) = mpsc::channel(0);
4100        let (mut ctl, state_machine) = pool.run_until(
4101            StateMachineBuilder::new_stub()
4102                .http(http)
4103                .installer(BlockingInstaller {
4104                    on_install: send_install,
4105                    on_reboot: None,
4106                })
4107                .start(),
4108        );
4109
4110        let observer = TestObserver::default();
4111        spawner
4112            .spawn_local(observer.observe_until_terminal(state_machine))
4113            .unwrap();
4114
4115        let unblock_install = pool.run_until(recv_install.next()).unwrap();
4116        pool.run_until_stalled();
4117        assert_eq!(
4118            observer.take_states(),
4119            vec![
4120                State::CheckingForUpdates(InstallSource::ScheduledTask),
4121                State::InstallingUpdate
4122            ]
4123        );
4124
4125        pool.run_until(async {
4126            assert_eq!(
4127                ctl.start_update_check(CheckOptions::default()).await,
4128                Ok(StartUpdateCheckResponse::AlreadyRunning)
4129            );
4130        });
4131        pool.run_until_stalled();
4132        assert_eq!(observer.take_states(), vec![]);
4133
4134        unblock_install
4135            .send(vec![AppInstallResult::Installed])
4136            .unwrap();
4137        pool.run_until_stalled();
4138
4139        assert_eq!(observer.take_states(), vec![State::WaitingForReboot]);
4140    }
4141
4142    #[test]
4143    fn test_start_update_during_timer_starts_update() {
4144        let mut pool = LocalPool::new();
4145        let spawner = pool.spawner();
4146
4147        let mut mock_time = MockTimeSource::new_from_now();
4148        let next_update_time = mock_time.now() + Duration::from_secs(321);
4149
4150        let (timer, mut timers) = BlockingTimer::new();
4151        let policy_engine = MockPolicyEngine {
4152            check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
4153            time_source: mock_time.clone(),
4154            ..MockPolicyEngine::default()
4155        };
4156        let (mut ctl, state_machine) = pool.run_until(
4157            StateMachineBuilder::new_stub()
4158                .policy_engine(policy_engine)
4159                .timer(timer)
4160                .start(),
4161        );
4162
4163        let observer = TestObserver::default();
4164        spawner
4165            .spawn_local(observer.observe(state_machine))
4166            .unwrap();
4167
4168        let blocked_timer = pool.run_until(timers.next()).unwrap();
4169        assert_eq!(
4170            blocked_timer.requested_wait(),
4171            RequestedWait::Until(next_update_time.into())
4172        );
4173        mock_time.advance(Duration::from_secs(200));
4174        assert_eq!(observer.take_states(), vec![]);
4175
4176        // Nothing happens while the timer is waiting.
4177        pool.run_until_stalled();
4178        assert_eq!(observer.take_states(), vec![]);
4179
4180        blocked_timer.unblock();
4181        let blocked_timer = pool.run_until(timers.next()).unwrap();
4182        assert_eq!(
4183            blocked_timer.requested_wait(),
4184            RequestedWait::Until(next_update_time.into())
4185        );
4186        assert_eq!(
4187            observer.take_states(),
4188            vec![
4189                State::CheckingForUpdates(InstallSource::ScheduledTask),
4190                State::ErrorCheckingForUpdate,
4191                State::Idle
4192            ]
4193        );
4194
4195        // Unless a control signal to start an update check comes in.
4196        pool.run_until(async {
4197            assert_eq!(
4198                ctl.start_update_check(CheckOptions::default()).await,
4199                Ok(StartUpdateCheckResponse::Started)
4200            );
4201        });
4202        pool.run_until_stalled();
4203        assert_eq!(
4204            observer.take_states(),
4205            vec![
4206                State::CheckingForUpdates(InstallSource::ScheduledTask),
4207                State::ErrorCheckingForUpdate,
4208                State::Idle
4209            ]
4210        );
4211    }
4212
4213    #[test]
4214    fn test_start_update_check_returns_throttled() {
4215        let mut pool = LocalPool::new();
4216        let spawner = pool.spawner();
4217
4218        let mut mock_time = MockTimeSource::new_from_now();
4219        let next_update_time = mock_time.now() + Duration::from_secs(321);
4220
4221        let (timer, mut timers) = BlockingTimer::new();
4222        let policy_engine = MockPolicyEngine {
4223            check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
4224            time_source: mock_time.clone(),
4225            check_decision: CheckDecision::ThrottledByPolicy,
4226            ..MockPolicyEngine::default()
4227        };
4228        let (mut ctl, state_machine) = pool.run_until(
4229            StateMachineBuilder::new_stub()
4230                .policy_engine(policy_engine)
4231                .timer(timer)
4232                .start(),
4233        );
4234
4235        let observer = TestObserver::default();
4236        spawner
4237            .spawn_local(observer.observe(state_machine))
4238            .unwrap();
4239
4240        let blocked_timer = pool.run_until(timers.next()).unwrap();
4241        assert_eq!(
4242            blocked_timer.requested_wait(),
4243            RequestedWait::Until(next_update_time.into())
4244        );
4245        mock_time.advance(Duration::from_secs(200));
4246        assert_eq!(observer.take_states(), vec![]);
4247
4248        pool.run_until(async {
4249            assert_eq!(
4250                ctl.start_update_check(CheckOptions::default()).await,
4251                Ok(StartUpdateCheckResponse::Throttled)
4252            );
4253        });
4254        pool.run_until_stalled();
4255        assert_eq!(observer.take_states(), vec![]);
4256    }
4257
4258    #[test]
4259    fn test_progress_observer() {
4260        block_on(async {
4261            let http = MockHttpRequest::new(make_update_available_response());
4262            let mock_time = MockTimeSource::new_from_now();
4263            let progresses = StateMachineBuilder::new_stub()
4264                .http(http)
4265                .installer(TestInstaller::builder(mock_time.clone()).build())
4266                .policy_engine(StubPolicyEngine::new(mock_time))
4267                .oneshot_check()
4268                .await
4269                .filter_map(|event| {
4270                    future::ready(match event {
4271                        StateMachineEvent::InstallProgressChange(InstallProgress { progress }) => {
4272                            Some(progress)
4273                        }
4274                        _ => None,
4275                    })
4276                })
4277                .collect::<Vec<f32>>()
4278                .await;
4279            assert_eq!(progresses, [0.0, 0.3, 0.9, 1.0]);
4280        });
4281    }
4282
4283    #[test]
4284    // A scenario in which
4285    // (now_in_monotonic - state_machine_start_in_monotonic) > (update_finish_time - now_in_wall)
4286    // should not panic.
4287    fn test_report_waited_for_reboot_duration_doesnt_panic_on_wrong_current_time() {
4288        block_on(async {
4289            let metrics_reporter = MockMetricsReporter::new();
4290
4291            let state_machine_start_monotonic = Instant::now();
4292            let update_finish_time = SystemTime::now();
4293
4294            // Set the monotonic increase in time larger than the wall time increase since the end
4295            // of the last update.
4296            // This can happen if we don't have a reliable current wall time.
4297            let now_wall = update_finish_time + Duration::from_secs(1);
4298            let now_monotonic = state_machine_start_monotonic + Duration::from_secs(10);
4299
4300            let mut state_machine = StateMachineBuilder::new_stub()
4301                .metrics_reporter(metrics_reporter)
4302                .build()
4303                .await;
4304
4305            // Time has advanced monotonically since we noted the start of the state machine for
4306            // longer than the wall time difference between update finish time and now.
4307            // This computation should currently overflow.
4308            state_machine
4309                .report_waited_for_reboot_duration(
4310                    update_finish_time,
4311                    state_machine_start_monotonic,
4312                    ComplexTime {
4313                        wall: now_wall,
4314                        mono: now_monotonic,
4315                    },
4316                )
4317                .expect_err("should overflow and error out");
4318
4319            // We should have reported no metrics
4320            assert!(state_machine.metrics_reporter.metrics.is_empty());
4321        });
4322    }
4323
4324    #[test]
4325    fn test_report_waited_for_reboot_duration() {
4326        let mut pool = LocalPool::new();
4327        let spawner = pool.spawner();
4328
4329        let response = json!({"response": {
4330            "server": "prod",
4331            "protocol": "3.0",
4332            "app": [{
4333            "appid": "{00000000-0000-0000-0000-000000000001}",
4334            "status": "ok",
4335            "updatecheck": {
4336                "status": "ok",
4337                "manifest": {
4338                    "version": "1.2.3.5",
4339                    "actions": {
4340                        "action": [],
4341                    },
4342                    "packages": {
4343                        "package": [],
4344                    },
4345                }
4346            }
4347            }],
4348        }});
4349        let response = serde_json::to_vec(&response).unwrap();
4350        let http = MockHttpRequest::new(HttpResponse::new(response));
4351        let mut mock_time = MockTimeSource::new_from_now();
4352        mock_time.truncate_submicrosecond_walltime();
4353        let storage = Rc::new(Mutex::new(MemStorage::new()));
4354
4355        // Do one update.
4356        assert_matches!(
4357            pool.run_until(
4358                StateMachineBuilder::new_stub()
4359                    .http(http)
4360                    .policy_engine(StubPolicyEngine::new(mock_time.clone()))
4361                    .storage(Rc::clone(&storage))
4362                    .oneshot(RequestParams::default())
4363            ),
4364            Ok(_)
4365        );
4366
4367        mock_time.advance(Duration::from_secs(999));
4368
4369        // Execute state machine `run()`, simulating that we already rebooted.
4370        let config = Config {
4371            updater: Updater {
4372                name: "updater".to_string(),
4373                version: Version::from([0, 1]),
4374            },
4375            os: OS {
4376                version: "1.2.3.5".to_string(),
4377                ..OS::default()
4378            },
4379            service_url: "http://example.com/".to_string(),
4380            omaha_public_keys: None,
4381        };
4382        let metrics_reporter = Rc::new(RefCell::new(MockMetricsReporter::new()));
4383        let (_ctl, state_machine) = pool.run_until(
4384            StateMachineBuilder::new_stub()
4385                .config(config)
4386                .metrics_reporter(Rc::clone(&metrics_reporter))
4387                .policy_engine(StubPolicyEngine::new(mock_time.clone()))
4388                .storage(Rc::clone(&storage))
4389                .timer(MockTimer::new())
4390                .start(),
4391        );
4392
4393        // Move state machine forward using observer.
4394        let observer = TestObserver::default();
4395        spawner
4396            .spawn_local(observer.observe(state_machine))
4397            .unwrap();
4398        pool.run_until_stalled();
4399
4400        assert_eq!(
4401            metrics_reporter
4402                .borrow()
4403                .metrics
4404                .iter()
4405                .filter(|m| matches!(m, Metrics::WaitedForRebootDuration(_)))
4406                .collect::<Vec<_>>(),
4407            vec![&Metrics::WaitedForRebootDuration(Duration::from_secs(999))]
4408        );
4409
4410        // Verify that storage is cleaned up.
4411        pool.run_until(async {
4412            let storage = storage.lock().await;
4413            assert_eq!(storage.get_time(UPDATE_FINISH_TIME).await, None);
4414            assert_eq!(storage.get_string(TARGET_VERSION).await, None);
4415            assert!(storage.committed());
4416        })
4417    }
4418
4419    // The same as |run_simple_check_with_noupdate_result|, but with CUPv2 protocol validation.
4420    #[test]
4421    fn run_cup_but_decoration_error() {
4422        block_on(async {
4423            let http = MockHttpRequest::new(HttpResponse::new(make_noupdate_httpresponse()));
4424
4425            let stub_cup_handler = MockCupv2Handler::new().set_decoration_error(|| {
4426                Some(CupDecorationError::ParseError(
4427                    "".parse::<http::Uri>().unwrap_err(),
4428                ))
4429            });
4430
4431            assert_matches!(
4432                StateMachineBuilder::new_stub()
4433                    .http(http)
4434                    .cup_handler(Some(stub_cup_handler))
4435                    .oneshot(RequestParams::default())
4436                    .await,
4437                Err(UpdateCheckError::OmahaRequest(
4438                    OmahaRequestError::CupDecoration(CupDecorationError::ParseError(_))
4439                ))
4440            );
4441
4442            info!("update check complete!");
4443        });
4444    }
4445
4446    #[test]
4447    fn run_cup_but_verification_error() {
4448        block_on(async {
4449            let http = MockHttpRequest::new(HttpResponse::new(make_noupdate_httpresponse()));
4450
4451            let stub_cup_handler = MockCupv2Handler::new()
4452                .set_verification_error(|| Some(CupVerificationError::EtagHeaderMissing));
4453
4454            assert_matches!(
4455                StateMachineBuilder::new_stub()
4456                    .http(http)
4457                    .cup_handler(Some(stub_cup_handler))
4458                    .oneshot(RequestParams::default())
4459                    .await,
4460                Err(UpdateCheckError::OmahaRequest(
4461                    OmahaRequestError::CupValidation(CupVerificationError::EtagHeaderMissing)
4462                ))
4463            );
4464
4465            info!("update check complete!");
4466        });
4467    }
4468
4469    #[test]
4470    fn run_cup_valid() {
4471        block_on(async {
4472            let http = MockHttpRequest::new(HttpResponse::new(make_noupdate_httpresponse()));
4473
4474            assert_matches!(
4475                StateMachineBuilder::new_stub()
4476                    .http(http)
4477                    // Default stub_cup_handler, which is permissive.
4478                    .oneshot(RequestParams::default())
4479                    .await,
4480                Ok(_)
4481            );
4482
4483            info!("update check complete!");
4484        });
4485    }
4486}