use crate::{
app_set::{AppSet, AppSetExt as _},
async_generator,
common::{App, CheckOptions, CheckTiming},
configuration::Config,
cup_ecdsa::{CupDecorationError, CupVerificationError, Cupv2Handler, RequestMetadata},
http_request::{self, HttpRequest},
installer::{AppInstallResult, Installer, Plan},
metrics::{ClockType, Metrics, MetricsReporter, UpdateCheckFailureReason},
policy::{CheckDecision, PolicyEngine, UpdateDecision},
protocol::{
self,
request::{Event, EventErrorCode, EventResult, EventType, InstallSource, GUID},
response::{parse_json_response, OmahaStatus, Response, UpdateCheck},
},
request_builder::{self, RequestBuilder, RequestParams},
storage::{Storage, StorageExt},
time::{ComplexTime, PartialComplexTime, TimeSource, Timer},
};
use anyhow::anyhow;
use futures::{
channel::{mpsc, oneshot},
future::{self, BoxFuture, Fuse},
lock::Mutex,
prelude::*,
select,
};
use http::{response::Parts, Response as HttpResponse};
use p256::ecdsa::DerSignature;
use std::{
cmp::min,
collections::HashMap,
convert::TryInto,
rc::Rc,
str::Utf8Error,
time::{Duration, Instant, SystemTime},
};
use thiserror::Error;
use tracing::{error, info, warn};
pub mod update_check;
mod builder;
pub use builder::StateMachineBuilder;
mod observer;
use observer::StateMachineProgressObserver;
pub use observer::{InstallProgress, StateMachineEvent};
const INSTALL_PLAN_ID: &str = "install_plan_id";
const UPDATE_FIRST_SEEN_TIME: &str = "update_first_seen_time";
const UPDATE_FINISH_TIME: &str = "update_finish_time";
const TARGET_VERSION: &str = "target_version";
const CONSECUTIVE_FAILED_INSTALL_ATTEMPTS: &str = "consecutive_failed_install_attempts";
const CHECK_REBOOT_ALLOWED_INTERVAL: Duration = Duration::from_secs(30 * 60);
const X_RETRY_AFTER: &str = "X-Retry-After";
const MAX_OMAHA_REQUEST_ATTEMPTS: u64 = 3;
#[derive(Debug)]
pub struct StateMachine<PE, HR, IN, TM, MR, ST, AS, CH>
where
PE: PolicyEngine,
HR: HttpRequest,
IN: Installer,
TM: Timer,
MR: MetricsReporter,
ST: Storage,
AS: AppSet,
{
config: Config,
policy_engine: PE,
http: HR,
installer: IN,
timer: TM,
time_source: PE::TimeSource,
metrics_reporter: MR,
storage_ref: Rc<Mutex<ST>>,
context: update_check::Context,
app_set: Rc<Mutex<AS>>,
cup_handler: Option<CH>,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum State {
Idle,
CheckingForUpdates(InstallSource),
ErrorCheckingForUpdate,
NoUpdateAvailable,
InstallationDeferredByPolicy,
InstallingUpdate,
WaitingForReboot,
InstallationError,
}
#[derive(Error, Debug)]
pub enum OmahaRequestError {
#[error("Unexpected JSON error constructing update check")]
Json(#[from] serde_json::Error),
#[error("Error building update check HTTP request")]
HttpBuilder(#[from] http::Error),
#[error("Error decorating outgoing request with CUPv2 parameters")]
CupDecoration(#[from] CupDecorationError),
#[error("Error validating incoming response with CUPv2 protocol")]
CupValidation(#[from] CupVerificationError),
#[error("HTTP transport error performing update check")]
HttpTransport(#[from] http_request::Error),
#[error("HTTP error performing update check: {0}")]
HttpStatus(hyper::StatusCode),
}
impl From<request_builder::Error> for OmahaRequestError {
fn from(err: request_builder::Error) -> Self {
match err {
request_builder::Error::Json(e) => OmahaRequestError::Json(e),
request_builder::Error::Http(e) => OmahaRequestError::HttpBuilder(e),
request_builder::Error::Cup(e) => OmahaRequestError::CupDecoration(e),
}
}
}
impl From<http::StatusCode> for OmahaRequestError {
fn from(sc: http::StatusCode) -> Self {
OmahaRequestError::HttpStatus(sc)
}
}
#[derive(Error, Debug)]
pub enum ResponseParseError {
#[error("Response was not valid UTF-8")]
Utf8(#[from] Utf8Error),
#[error("Unexpected JSON error parsing update check response")]
Json(#[from] serde_json::Error),
}
#[derive(Error, Debug)]
pub enum UpdateCheckError {
#[error("Error checking with Omaha")]
OmahaRequest(#[from] OmahaRequestError),
#[error("Error parsing Omaha response")]
ResponseParser(#[from] ResponseParseError),
#[error("Unable to create an install plan")]
InstallPlan(#[source] anyhow::Error),
}
#[derive(Clone)]
pub struct ControlHandle(mpsc::Sender<ControlRequest>);
#[derive(Debug, Clone, Error, PartialEq, Eq)]
#[error("state machine dropped before all its control handles")]
pub struct StateMachineGone;
impl From<mpsc::SendError> for StateMachineGone {
fn from(_: mpsc::SendError) -> Self {
StateMachineGone
}
}
impl From<oneshot::Canceled> for StateMachineGone {
fn from(_: oneshot::Canceled) -> Self {
StateMachineGone
}
}
enum ControlRequest {
StartUpdateCheck {
options: CheckOptions,
responder: oneshot::Sender<StartUpdateCheckResponse>,
},
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StartUpdateCheckResponse {
Started,
AlreadyRunning,
Throttled,
}
impl ControlHandle {
pub async fn start_update_check(
&mut self,
options: CheckOptions,
) -> Result<StartUpdateCheckResponse, StateMachineGone> {
let (responder, receive_response) = oneshot::channel();
self.0
.send(ControlRequest::StartUpdateCheck { options, responder })
.await?;
Ok(receive_response.await?)
}
}
#[derive(Debug)]
enum RebootAfterUpdate<T> {
Needed(T),
NotNeeded,
}
impl<PE, HR, IN, TM, MR, ST, AS, IR, PL, CH> StateMachine<PE, HR, IN, TM, MR, ST, AS, CH>
where
PE: PolicyEngine<InstallResult = IR, InstallPlan = PL>,
HR: HttpRequest,
IN: Installer<InstallResult = IR, InstallPlan = PL>,
TM: Timer,
MR: MetricsReporter,
ST: Storage,
AS: AppSet,
CH: Cupv2Handler,
IR: 'static + Send,
PL: Plan,
{
async fn update_next_update_time(
&mut self,
co: &mut async_generator::Yield<StateMachineEvent>,
) -> CheckTiming {
let apps = self.app_set.lock().await.get_apps();
let timing = self
.policy_engine
.compute_next_update_time(&apps, &self.context.schedule, &self.context.state)
.await;
self.context.schedule.next_update_time = Some(timing);
co.yield_(StateMachineEvent::ScheduleChange(self.context.schedule))
.await;
info!("Calculated check timing: {}", timing);
timing
}
async fn make_wait_to_next_check(
&mut self,
check_timing: CheckTiming,
) -> Fuse<BoxFuture<'static, ()>> {
if let Some(minimum_wait) = check_timing.minimum_wait {
future::join(
self.timer.wait_for(minimum_wait),
self.timer.wait_until(check_timing.time),
)
.map(|_| ())
.boxed()
.fuse()
} else {
self.timer.wait_until(check_timing.time).fuse()
}
}
async fn run(
mut self,
mut control: mpsc::Receiver<ControlRequest>,
mut co: async_generator::Yield<StateMachineEvent>,
) {
{
let app_set = self.app_set.lock().await;
if !app_set.all_valid() {
error!(
"App set not valid, not starting state machine: {:#?}",
app_set.get_apps()
);
return;
}
}
let state_machine_start_monotonic_time = self.time_source.now_in_monotonic();
let mut should_report_waited_for_reboot_duration = false;
let update_finish_time = {
let storage = self.storage_ref.lock().await;
let update_finish_time = storage.get_time(UPDATE_FINISH_TIME).await;
if update_finish_time.is_some() {
if let Some(target_version) = storage.get_string(TARGET_VERSION).await {
if target_version == self.config.os.version {
should_report_waited_for_reboot_duration = true;
}
}
}
update_finish_time
};
loop {
info!("Initial context: {:?}", self.context);
if should_report_waited_for_reboot_duration {
match self.report_waited_for_reboot_duration(
update_finish_time.unwrap(),
state_machine_start_monotonic_time,
self.time_source.now(),
) {
Ok(()) => {
should_report_waited_for_reboot_duration = false;
let mut storage = self.storage_ref.lock().await;
storage.remove_or_log(UPDATE_FINISH_TIME).await;
storage.remove_or_log(TARGET_VERSION).await;
storage.commit_or_log().await;
}
Err(e) => {
warn!(
"Couldn't report wait for reboot duration: {:#}, will try again",
e
);
}
}
}
let (mut options, responder) = {
let check_timing = self.update_next_update_time(&mut co).await;
let mut wait_to_next_check = self.make_wait_to_next_check(check_timing).await;
select! {
() = wait_to_next_check => (CheckOptions::default(), None),
ControlRequest::StartUpdateCheck{options, responder} = control.select_next_some() => {
(options, Some(responder))
}
}
};
let reboot_after_update = {
let apps = self.app_set.lock().await.get_apps();
info!(
"Checking to see if an update check is allowed at this time for {:?}",
apps
);
let decision = self
.policy_engine
.update_check_allowed(
&apps,
&self.context.schedule,
&self.context.state,
&options,
)
.await;
info!("The update check decision is: {:?}", decision);
let request_params = match decision {
CheckDecision::Ok(rp) | CheckDecision::OkUpdateDeferred(rp) => rp,
CheckDecision::TooSoon
| CheckDecision::ThrottledByPolicy
| CheckDecision::DeniedByPolicy => {
info!("The update check is not allowed at this time.");
if let Some(responder) = responder {
let _ = responder.send(StartUpdateCheckResponse::Throttled);
}
continue;
}
};
if let Some(responder) = responder {
let _ = responder.send(StartUpdateCheckResponse::Started);
}
let update_check = self.start_update_check(request_params, &mut co).fuse();
futures::pin_mut!(update_check);
loop {
select! {
update_check_result = update_check => break update_check_result,
ControlRequest::StartUpdateCheck{
options: new_options,
responder
} = control.select_next_some() => {
if new_options.source == InstallSource::OnDemand {
info!("Got on demand update check request, ensuring ongoing check is on demand");
options.source = InstallSource::OnDemand;
}
let _ = responder.send(StartUpdateCheckResponse::AlreadyRunning);
}
}
}
};
if let RebootAfterUpdate::Needed(install_result) = reboot_after_update {
Self::yield_state(State::WaitingForReboot, &mut co).await;
self.wait_for_reboot(options, &mut control, install_result, &mut co)
.await;
}
Self::yield_state(State::Idle, &mut co).await;
}
}
async fn wait_for_reboot(
&mut self,
mut options: CheckOptions,
control: &mut mpsc::Receiver<ControlRequest>,
install_result: IN::InstallResult,
co: &mut async_generator::Yield<StateMachineEvent>,
) {
if !self
.policy_engine
.reboot_allowed(&options, &install_result)
.await
{
let wait_to_see_if_reboot_allowed =
self.timer.wait_for(CHECK_REBOOT_ALLOWED_INTERVAL).fuse();
futures::pin_mut!(wait_to_see_if_reboot_allowed);
let check_timing = self.update_next_update_time(co).await;
let wait_to_next_ping = self.make_wait_to_next_check(check_timing).await;
futures::pin_mut!(wait_to_next_ping);
loop {
select! {
() = wait_to_see_if_reboot_allowed => {
if self.policy_engine.reboot_allowed(&options, &install_result).await {
break;
}
info!("Reboot not allowed at the moment, will try again in 30 minutes...");
wait_to_see_if_reboot_allowed.set(
self.timer.wait_for(CHECK_REBOOT_ALLOWED_INTERVAL).fuse()
);
},
() = wait_to_next_ping => {
self.ping_omaha(co).await;
let check_timing = self.update_next_update_time(co).await;
wait_to_next_ping.set(self.make_wait_to_next_check(check_timing).await);
},
ControlRequest::StartUpdateCheck{
options: new_options,
responder
} = control.select_next_some() => {
let _ = responder.send(StartUpdateCheckResponse::AlreadyRunning);
if new_options.source == InstallSource::OnDemand {
info!("Waiting for reboot, but ensuring that InstallSource is OnDemand");
options.source = InstallSource::OnDemand;
if self.policy_engine.reboot_allowed(&options, &install_result).await {
info!("Upgraded update check request to on demand, policy allowed reboot");
break;
}
};
}
}
}
}
info!("Rebooting the system at the end of a successful update");
if let Err(e) = self.installer.perform_reboot().await {
error!("Unable to reboot the system: {}", e);
}
}
fn report_waited_for_reboot_duration(
&mut self,
update_finish_time: SystemTime,
state_machine_start_monotonic_time: Instant,
now: ComplexTime,
) -> Result<(), anyhow::Error> {
let update_finish_time_to_now =
now.wall_duration_since(update_finish_time).map_err(|e| {
anyhow!(
"Update finish time later than now, can't report waited for reboot duration,
update finish time: {:?}, now: {:?}, error: {:?}",
update_finish_time,
now,
e,
)
})?;
let state_machine_start_to_now = now
.mono
.checked_duration_since(state_machine_start_monotonic_time)
.ok_or_else(|| {
error!("Monotonic time appears to have gone backwards");
anyhow!(
"State machine start later than now, can't report waited for reboot duration. \
State machine start: {:?}, now: {:?}",
state_machine_start_monotonic_time,
now.mono,
)
})?;
let waited_for_reboot_duration = update_finish_time_to_now
.checked_sub(state_machine_start_to_now)
.ok_or_else(|| {
anyhow!(
"Can't report waiting for reboot duration, update finish time to now smaller \
than state machine start to now. Update finish time to now: {:?}, state \
machine start to now: {:?}",
update_finish_time_to_now,
state_machine_start_to_now,
)
})?;
info!(
"Waited {} seconds for reboot.",
waited_for_reboot_duration.as_secs()
);
self.report_metrics(Metrics::WaitedForRebootDuration(waited_for_reboot_duration));
Ok(())
}
async fn report_check_interval(&mut self, install_source: InstallSource) {
let now = self.time_source.now();
match self.context.schedule.last_update_check_time {
Some(PartialComplexTime::Wall(t)) => match now.wall_duration_since(t) {
Ok(interval) => self.report_metrics(Metrics::UpdateCheckInterval {
interval,
clock: ClockType::Wall,
install_source,
}),
Err(e) => warn!("Last check time is in the future: {}", e),
},
Some(PartialComplexTime::Complex(t)) => match now.mono.checked_duration_since(t.mono) {
Some(interval) => self.report_metrics(Metrics::UpdateCheckInterval {
interval,
clock: ClockType::Monotonic,
install_source,
}),
None => error!("Monotonic time in the past"),
},
_ => {}
}
self.context.schedule.last_update_check_time = now.into();
}
async fn start_update_check(
&mut self,
request_params: RequestParams,
co: &mut async_generator::Yield<StateMachineEvent>,
) -> RebootAfterUpdate<IN::InstallResult> {
let apps = self.app_set.lock().await.get_apps();
let result = self.perform_update_check(request_params, apps, co).await;
let (result, reboot_after_update) = match result {
Ok((result, reboot_after_update)) => {
info!("Update check result: {:?}", result);
self.context.schedule.last_update_time = Some(self.time_source.now().into());
let install_success =
result.app_responses.iter().fold(None, |result, app| {
match (result, &app.result) {
(_, update_check::Action::InstallPlanExecutionError) => Some(false),
(None, update_check::Action::Updated) => Some(true),
(result, _) => result,
}
});
self.report_attempts_to_successful_check(true).await;
self.app_set
.lock()
.await
.update_from_omaha(&result.app_responses);
if let Some(success) = install_success {
self.report_attempts_to_successful_install(success).await;
}
(Ok(result), reboot_after_update)
}
Err(error) => {
error!("Update check failed: {:?}", error);
let failure_reason = match &error {
UpdateCheckError::ResponseParser(_) | UpdateCheckError::InstallPlan(_) => {
self.context.schedule.last_update_time =
Some(self.time_source.now().into());
UpdateCheckFailureReason::Omaha
}
UpdateCheckError::OmahaRequest(request_error) => match request_error {
OmahaRequestError::Json(_)
| OmahaRequestError::HttpBuilder(_)
| OmahaRequestError::CupDecoration(_)
| OmahaRequestError::CupValidation(_) => UpdateCheckFailureReason::Internal,
OmahaRequestError::HttpTransport(_) | OmahaRequestError::HttpStatus(_) => {
UpdateCheckFailureReason::Network
}
},
};
self.report_metrics(Metrics::UpdateCheckFailureReason(failure_reason));
self.report_attempts_to_successful_check(false).await;
(Err(error), RebootAfterUpdate::NotNeeded)
}
};
co.yield_(StateMachineEvent::ScheduleChange(self.context.schedule))
.await;
co.yield_(StateMachineEvent::ProtocolStateChange(
self.context.state.clone(),
))
.await;
co.yield_(StateMachineEvent::UpdateCheckResult(result))
.await;
self.persist_data().await;
reboot_after_update
}
async fn report_attempts_to_successful_check(&mut self, success: bool) {
let attempts = self.context.state.consecutive_failed_update_checks + 1;
if success {
self.context.state.consecutive_failed_update_checks = 0;
self.report_metrics(Metrics::AttemptsToSuccessfulCheck(attempts as u64));
} else {
self.context.state.consecutive_failed_update_checks = attempts;
}
}
async fn report_attempts_to_successful_install(&mut self, success: bool) {
let storage_ref = self.storage_ref.clone();
let mut storage = storage_ref.lock().await;
let attempts = storage
.get_int(CONSECUTIVE_FAILED_INSTALL_ATTEMPTS)
.await
.unwrap_or(0)
+ 1;
self.report_metrics(Metrics::AttemptsToSuccessfulInstall {
count: attempts as u64,
successful: success,
});
if success {
storage
.remove_or_log(CONSECUTIVE_FAILED_INSTALL_ATTEMPTS)
.await;
} else if let Err(e) = storage
.set_int(CONSECUTIVE_FAILED_INSTALL_ATTEMPTS, attempts)
.await
{
error!(
"Unable to persist {}: {}",
CONSECUTIVE_FAILED_INSTALL_ATTEMPTS, e
);
}
}
async fn persist_data(&self) {
let mut storage = self.storage_ref.lock().await;
self.context.persist(&mut *storage).await;
self.app_set.lock().await.persist(&mut *storage).await;
storage.commit_or_log().await;
}
async fn perform_update_check(
&mut self,
request_params: RequestParams,
apps: Vec<App>,
co: &mut async_generator::Yield<StateMachineEvent>,
) -> Result<(update_check::Response, RebootAfterUpdate<IN::InstallResult>), UpdateCheckError>
{
Self::yield_state(State::CheckingForUpdates(request_params.source), co).await;
self.report_check_interval(request_params.source).await;
let config = self.config.clone();
let mut request_builder = RequestBuilder::new(&config, &request_params);
for app in &apps {
request_builder = request_builder.add_update_check(app).add_ping(app);
}
let session_id = GUID::new();
request_builder = request_builder.session_id(session_id.clone());
let mut omaha_request_attempt = 1;
let loop_result = loop {
let omaha_check_start_time = self.time_source.now_in_monotonic();
request_builder = request_builder.request_id(GUID::new());
let result = self
.do_omaha_request_and_update_context(&request_builder, co)
.await;
{
let now = self.time_source.now_in_monotonic();
let duration = now.checked_duration_since(omaha_check_start_time);
if let Some(response_time) = duration {
self.report_metrics(Metrics::UpdateCheckResponseTime {
response_time,
successful: result.is_ok(),
});
} else {
error!(
"now: {:?}, is before omaha_check_start_time: {:?}",
now, omaha_check_start_time
);
}
}
match result {
Ok(res) => {
break Ok(res);
}
Err(OmahaRequestError::Json(e)) => {
error!("Unable to construct request body! {:?}", e);
Self::yield_state(State::ErrorCheckingForUpdate, co).await;
break Err(UpdateCheckError::OmahaRequest(e.into()));
}
Err(OmahaRequestError::HttpBuilder(e)) => {
error!("Unable to construct HTTP request! {:?}", e);
Self::yield_state(State::ErrorCheckingForUpdate, co).await;
break Err(UpdateCheckError::OmahaRequest(e.into()));
}
Err(OmahaRequestError::CupDecoration(e)) => {
error!(
"Unable to decorate HTTP request with CUPv2 parameters! {:?}",
e
);
Self::yield_state(State::ErrorCheckingForUpdate, co).await;
break Err(UpdateCheckError::OmahaRequest(e.into()));
}
Err(OmahaRequestError::CupValidation(e)) => {
error!(
"Unable to validate HTTP response with CUPv2 parameters! {:?}",
e
);
Self::yield_state(State::ErrorCheckingForUpdate, co).await;
break Err(UpdateCheckError::OmahaRequest(e.into()));
}
Err(OmahaRequestError::HttpTransport(e)) => {
warn!("Unable to contact Omaha: {:?}", e);
if omaha_request_attempt >= MAX_OMAHA_REQUEST_ATTEMPTS
|| e.is_user()
|| self.context.state.server_dictated_poll_interval.is_some()
{
Self::yield_state(State::ErrorCheckingForUpdate, co).await;
break Err(UpdateCheckError::OmahaRequest(e.into()));
}
}
Err(OmahaRequestError::HttpStatus(e)) => {
warn!("Unable to contact Omaha: {:?}", e);
if omaha_request_attempt >= MAX_OMAHA_REQUEST_ATTEMPTS
|| self.context.state.server_dictated_poll_interval.is_some()
{
Self::yield_state(State::ErrorCheckingForUpdate, co).await;
break Err(UpdateCheckError::OmahaRequest(e.into()));
}
}
}
let backoff_time_secs = 1 << (omaha_request_attempt - 1);
let backoff_time = randomize(backoff_time_secs * 1000, 1000);
info!("Waiting {} ms before retrying...", backoff_time);
self.timer
.wait_for(Duration::from_millis(backoff_time))
.await;
omaha_request_attempt += 1;
};
self.report_metrics(Metrics::RequestsPerCheck {
count: omaha_request_attempt,
successful: loop_result.is_ok(),
});
let (_parts, data, request_metadata, signature) = loop_result?;
let response = match Self::parse_omaha_response(&data) {
Ok(res) => res,
Err(err) => {
warn!("Unable to parse Omaha response: {:?}", err);
Self::yield_state(State::ErrorCheckingForUpdate, co).await;
self.report_omaha_event_and_update_context(
&request_params,
Event::error(EventErrorCode::ParseResponse),
&apps,
&session_id,
&apps.iter().map(|app| (app.id.clone(), None)).collect(),
None,
co,
)
.await;
return Err(UpdateCheckError::ResponseParser(err));
}
};
info!("result: {:?}", response);
co.yield_(StateMachineEvent::OmahaServerResponse(response.clone()))
.await;
let statuses = Self::get_app_update_statuses(&response);
for (app_id, status) in &statuses {
info!("Omaha update check status: {} => {:?}", app_id, status);
}
let apps_with_update: Vec<_> = response
.apps
.iter()
.filter(|app| {
matches!(
app.update_check,
Some(UpdateCheck {
status: OmahaStatus::Ok,
..
})
)
})
.collect();
if apps_with_update.is_empty() {
Self::yield_state(State::NoUpdateAvailable, co).await;
Self::make_not_updated_result(response, update_check::Action::NoUpdate)
} else {
info!(
"At least one app has an update, proceeding to build and process an Install Plan"
);
let next_versions: HashMap<String, Option<String>> = apps_with_update
.iter()
.map(|app| (app.id.clone(), app.get_manifest_version()))
.collect();
let install_plan = match self
.installer
.try_create_install_plan(
&request_params,
request_metadata.as_ref(),
&response,
data,
signature.map(|s| s.as_bytes().to_vec()),
)
.await
{
Ok(plan) => plan,
Err(e) => {
error!("Unable to construct install plan! {}", e);
Self::yield_state(State::InstallingUpdate, co).await;
Self::yield_state(State::InstallationError, co).await;
self.report_omaha_event_and_update_context(
&request_params,
Event::error(EventErrorCode::ConstructInstallPlan),
&apps,
&session_id,
&next_versions,
None,
co,
)
.await;
return Err(UpdateCheckError::InstallPlan(e.into()));
}
};
info!("Validating Install Plan with Policy");
let install_plan_decision = self.policy_engine.update_can_start(&install_plan).await;
match install_plan_decision {
UpdateDecision::Ok => {
info!("Proceeding with install plan.");
}
UpdateDecision::DeferredByPolicy => {
info!("Install plan was deferred by Policy.");
let event = Event {
event_type: EventType::UpdateComplete,
event_result: EventResult::UpdateDeferred,
..Event::default()
};
self.report_omaha_event_and_update_context(
&request_params,
event,
&apps,
&session_id,
&next_versions,
None,
co,
)
.await;
Self::yield_state(State::InstallationDeferredByPolicy, co).await;
return Self::make_not_updated_result(
response,
update_check::Action::DeferredByPolicy,
);
}
UpdateDecision::DeniedByPolicy => {
warn!("Install plan was denied by Policy, see Policy logs for reasoning");
self.report_omaha_event_and_update_context(
&request_params,
Event::error(EventErrorCode::DeniedByPolicy),
&apps,
&session_id,
&next_versions,
None,
co,
)
.await;
return Self::make_not_updated_result(
response,
update_check::Action::DeniedByPolicy,
);
}
}
Self::yield_state(State::InstallingUpdate, co).await;
self.report_omaha_event_and_update_context(
&request_params,
Event::success(EventType::UpdateDownloadStarted),
&apps,
&session_id,
&next_versions,
None,
co,
)
.await;
let install_plan_id = install_plan.id();
let update_start_time = self.time_source.now_in_walltime();
let update_first_seen_time = self
.record_update_first_seen_time(&install_plan_id, update_start_time)
.await;
let (send, mut recv) = mpsc::channel(0);
let observer = StateMachineProgressObserver(send);
let perform_install = async {
let result = self
.installer
.perform_install(&install_plan, Some(&observer))
.await;
drop(observer);
result
};
let yield_progress = async {
while let Some(progress) = recv.next().await {
co.yield_(StateMachineEvent::InstallProgressChange(progress))
.await;
}
};
let ((install_result, mut app_install_results), ()) =
future::join(perform_install, yield_progress).await;
let no_apps_failed = app_install_results.iter().all(|result| {
matches!(
result,
AppInstallResult::Installed | AppInstallResult::Deferred
)
});
let update_finish_time = self.time_source.now_in_walltime();
let install_duration = match update_finish_time.duration_since(update_start_time) {
Ok(duration) => {
let metrics = if no_apps_failed {
Metrics::SuccessfulUpdateDuration(duration)
} else {
Metrics::FailedUpdateDuration(duration)
};
self.report_metrics(metrics);
Some(duration)
}
Err(e) => {
warn!("Update start time is in the future: {}", e);
None
}
};
let config = self.config.clone();
let mut request_builder = RequestBuilder::new(&config, &request_params);
let mut events = vec![];
let mut installed_apps = vec![];
for (response_app, app_install_result) in
apps_with_update.iter().zip(&app_install_results)
{
match apps.iter().find(|app| app.id == response_app.id) {
Some(app) => {
let event = match app_install_result {
AppInstallResult::Installed => {
installed_apps.push(app);
Event::success(EventType::UpdateDownloadFinished)
}
AppInstallResult::Deferred => Event {
event_type: EventType::UpdateComplete,
event_result: EventResult::UpdateDeferred,
..Event::default()
},
AppInstallResult::Failed(_) => {
Event::error(EventErrorCode::Installation)
}
};
let event = Event {
previous_version: Some(app.version.to_string()),
next_version: response_app.get_manifest_version(),
download_time_ms: install_duration
.and_then(|d| d.as_millis().try_into().ok()),
..event
};
request_builder = request_builder.add_event(app, event.clone());
events.push(event);
}
None => {
error!("unknown app id in omaha response: {:?}", response_app.id);
}
}
}
request_builder = request_builder
.session_id(session_id.clone())
.request_id(GUID::new());
if let Err(e) = self
.do_omaha_request_and_update_context(&request_builder, co)
.await
{
for event in events {
self.report_metrics(Metrics::OmahaEventLost(event));
}
warn!("Unable to report event to Omaha: {:?}", e);
}
if !installed_apps.is_empty() {
self.report_omaha_event_and_update_context(
&request_params,
Event::success(EventType::UpdateComplete),
installed_apps,
&session_id,
&next_versions,
install_duration,
co,
)
.await;
}
let mut errors = vec![];
let daystart = response.daystart;
let app_responses = response
.apps
.into_iter()
.map(|app| update_check::AppResponse {
app_id: app.id,
cohort: app.cohort,
user_counting: daystart.clone().into(),
result: match app.update_check {
Some(UpdateCheck {
status: OmahaStatus::Ok,
..
}) => match app_install_results.remove(0) {
AppInstallResult::Installed => update_check::Action::Updated,
AppInstallResult::Deferred => update_check::Action::DeferredByPolicy,
AppInstallResult::Failed(e) => {
errors.push(e);
update_check::Action::InstallPlanExecutionError
}
},
_ => update_check::Action::NoUpdate,
},
})
.collect();
if !errors.is_empty() {
for e in errors {
co.yield_(StateMachineEvent::InstallerError(Some(Box::new(e))))
.await;
}
Self::yield_state(State::InstallationError, co).await;
return Ok((
update_check::Response { app_responses },
RebootAfterUpdate::NotNeeded,
));
}
match update_finish_time.duration_since(update_first_seen_time) {
Ok(duration) => {
self.report_metrics(Metrics::SuccessfulUpdateFromFirstSeen(duration))
}
Err(e) => warn!("Update first seen time is in the future: {}", e),
}
{
let mut storage = self.storage_ref.lock().await;
if let Err(e) = storage
.set_time(UPDATE_FINISH_TIME, update_finish_time)
.await
{
error!("Unable to persist {}: {}", UPDATE_FINISH_TIME, e);
}
let app_set = self.app_set.lock().await;
let system_app_id = app_set.get_system_app_id();
if let Some(next_version) = next_versions.get(system_app_id) {
let target_version = next_version.as_deref().unwrap_or_else(|| {
error!("Target version string not found in Omaha response.");
"UNKNOWN"
});
if let Err(e) = storage.set_string(TARGET_VERSION, target_version).await {
error!("Unable to persist {}: {}", TARGET_VERSION, e);
}
}
storage.commit_or_log().await;
}
let reboot_after_update = if self.policy_engine.reboot_needed(&install_plan).await {
RebootAfterUpdate::Needed(install_result)
} else {
RebootAfterUpdate::NotNeeded
};
Ok((
update_check::Response { app_responses },
reboot_after_update,
))
}
}
#[allow(clippy::too_many_arguments)]
async fn report_omaha_event_and_update_context<'a>(
&'a mut self,
request_params: &'a RequestParams,
event: Event,
apps: impl IntoIterator<Item = &App>,
session_id: &GUID,
next_versions: &HashMap<String, Option<String>>,
install_duration: Option<Duration>,
co: &mut async_generator::Yield<StateMachineEvent>,
) {
let config = self.config.clone();
let mut request_builder = RequestBuilder::new(&config, request_params);
for app in apps {
if let Some(next_version) = next_versions.get(&app.id) {
let event = Event {
previous_version: Some(app.version.to_string()),
next_version: next_version.clone(),
download_time_ms: install_duration.and_then(|d| d.as_millis().try_into().ok()),
..event.clone()
};
request_builder = request_builder.add_event(app, event);
}
}
request_builder = request_builder
.session_id(session_id.clone())
.request_id(GUID::new());
if let Err(e) = self
.do_omaha_request_and_update_context(&request_builder, co)
.await
{
self.report_metrics(Metrics::OmahaEventLost(event));
warn!("Unable to report event to Omaha: {:?}", e);
}
}
async fn ping_omaha(&mut self, co: &mut async_generator::Yield<StateMachineEvent>) {
let apps = self.app_set.lock().await.get_apps();
let request_params = RequestParams {
source: InstallSource::ScheduledTask,
use_configured_proxies: true,
disable_updates: false,
offer_update_if_same_version: false,
};
let config = self.config.clone();
let mut request_builder = RequestBuilder::new(&config, &request_params);
for app in &apps {
request_builder = request_builder.add_ping(app);
}
request_builder = request_builder
.session_id(GUID::new())
.request_id(GUID::new());
let (_parts, data, _request_metadata, _signature) = match self
.do_omaha_request_and_update_context(&request_builder, co)
.await
{
Ok(res) => res,
Err(e) => {
error!("Ping Omaha failed: {:#}", anyhow!(e));
self.context.state.consecutive_failed_update_checks += 1;
self.persist_data().await;
return;
}
};
let response = match Self::parse_omaha_response(&data) {
Ok(res) => res,
Err(e) => {
error!("Unable to parse Omaha response: {:#}", anyhow!(e));
self.context.state.consecutive_failed_update_checks += 1;
self.persist_data().await;
return;
}
};
self.context.state.consecutive_failed_update_checks = 0;
self.context.schedule.last_update_time = Some(self.time_source.now().into());
co.yield_(StateMachineEvent::ScheduleChange(self.context.schedule))
.await;
let app_responses = Self::make_app_responses(response, update_check::Action::NoUpdate);
self.app_set.lock().await.update_from_omaha(&app_responses);
self.persist_data().await;
}
async fn do_omaha_request_and_update_context<'a>(
&'a mut self,
builder: &RequestBuilder<'a>,
co: &mut async_generator::Yield<StateMachineEvent>,
) -> Result<
(
Parts,
Vec<u8>,
Option<RequestMetadata>,
Option<DerSignature>,
),
OmahaRequestError,
> {
let (request, request_metadata) = builder.build(self.cup_handler.as_ref())?;
let response = Self::make_request(&mut self.http, request).await?;
let signature: Option<DerSignature> = if let (Some(handler), Some(metadata)) =
(self.cup_handler.as_ref(), &request_metadata)
{
let signature = handler
.verify_response(metadata, &response, metadata.public_key_id)
.map_err(|e| {
error!("Could not verify response: {:?}", e);
e
})?;
Some(signature)
} else {
None
};
let (parts, body) = response.into_parts();
let server_dictated_poll_interval = parts.headers.get(X_RETRY_AFTER).and_then(|header| {
match header
.to_str()
.map_err(|e| anyhow!(e))
.and_then(|s| s.parse::<u64>().map_err(|e| anyhow!(e)))
{
Ok(seconds) => {
Some(Duration::from_secs(min(seconds, 86400)))
}
Err(e) => {
error!("Unable to parse {} header: {:#}", X_RETRY_AFTER, e);
None
}
}
});
if self.context.state.server_dictated_poll_interval != server_dictated_poll_interval {
self.context.state.server_dictated_poll_interval = server_dictated_poll_interval;
co.yield_(StateMachineEvent::ProtocolStateChange(
self.context.state.clone(),
))
.await;
let mut storage = self.storage_ref.lock().await;
self.context.persist(&mut *storage).await;
storage.commit_or_log().await;
}
if !parts.status.is_success() {
Err(OmahaRequestError::HttpStatus(parts.status))
} else {
info!("Omaha HTTP response: {}", parts.status);
Ok((parts, body, request_metadata, signature))
}
}
async fn make_request(
http_client: &mut HR,
request: http::Request<hyper::Body>,
) -> Result<HttpResponse<Vec<u8>>, http_request::Error> {
info!("Making http request to: {}", request.uri());
http_client.request(request).await.map_err(|err| {
warn!("Unable to perform request: {}", err);
err
})
}
fn parse_omaha_response(data: &[u8]) -> Result<Response, ResponseParseError> {
parse_json_response(data).map_err(ResponseParseError::Json)
}
fn get_app_update_statuses(response: &Response) -> Vec<(&str, &OmahaStatus)> {
response
.apps
.iter()
.filter_map(|app| {
app.update_check
.as_ref()
.map(|u| (app.id.as_str(), &u.status))
})
.collect()
}
fn make_app_responses(
response: protocol::response::Response,
action: update_check::Action,
) -> Vec<update_check::AppResponse> {
let daystart = response.daystart;
response
.apps
.into_iter()
.map(|app| update_check::AppResponse {
app_id: app.id,
cohort: app.cohort,
user_counting: daystart.clone().into(),
result: action.clone(),
})
.collect()
}
fn make_not_updated_result(
response: protocol::response::Response,
action: update_check::Action,
) -> Result<(update_check::Response, RebootAfterUpdate<IN::InstallResult>), UpdateCheckError>
{
Ok((
update_check::Response {
app_responses: Self::make_app_responses(response, action),
},
RebootAfterUpdate::NotNeeded,
))
}
async fn yield_state(state: State, co: &mut async_generator::Yield<StateMachineEvent>) {
co.yield_(StateMachineEvent::StateChange(state)).await;
}
fn report_metrics(&mut self, metrics: Metrics) {
if let Err(err) = self.metrics_reporter.report_metrics(metrics) {
warn!("Unable to report metrics: {:?}", err);
}
}
async fn record_update_first_seen_time(
&mut self,
install_plan_id: &str,
now: SystemTime,
) -> SystemTime {
let mut storage = self.storage_ref.lock().await;
let previous_id = storage.get_string(INSTALL_PLAN_ID).await;
if let Some(previous_id) = previous_id {
if previous_id == install_plan_id {
return storage
.get_time(UPDATE_FIRST_SEEN_TIME)
.await
.unwrap_or(now);
}
}
if let Err(e) = storage.set_string(INSTALL_PLAN_ID, install_plan_id).await {
error!("Unable to persist {}: {}", INSTALL_PLAN_ID, e);
return now;
}
if let Err(e) = storage.set_time(UPDATE_FIRST_SEEN_TIME, now).await {
error!("Unable to persist {}: {}", UPDATE_FIRST_SEEN_TIME, e);
let _ = storage.remove(INSTALL_PLAN_ID).await;
return now;
}
storage.commit_or_log().await;
now
}
}
fn randomize(n: u64, range: u64) -> u64 {
n - range / 2 + rand::random::<u64>() % range
}
#[cfg(test)]
impl<PE, HR, IN, TM, MR, ST, AS, IR, PL, CH> StateMachine<PE, HR, IN, TM, MR, ST, AS, CH>
where
PE: PolicyEngine<InstallResult = IR, InstallPlan = PL>,
HR: HttpRequest,
IN: Installer<InstallResult = IR, InstallPlan = PL>,
TM: Timer,
MR: MetricsReporter,
ST: Storage,
AS: AppSet,
CH: Cupv2Handler,
IR: 'static + Send,
PL: Plan,
{
async fn oneshot(
&mut self,
request_params: RequestParams,
) -> Result<(update_check::Response, RebootAfterUpdate<IN::InstallResult>), UpdateCheckError>
{
let apps = self.app_set.lock().await.get_apps();
async_generator::generate(move |mut co| async move {
self.perform_update_check(request_params, apps, &mut co)
.await
})
.into_complete()
.await
}
async fn run_once(&mut self) {
let request_params = RequestParams::default();
async_generator::generate(move |mut co| async move {
self.start_update_check(request_params, &mut co).await;
})
.map(|_| ())
.collect::<()>()
.await;
}
}
#[cfg(test)]
mod tests {
use super::update_check::{
Action, CONSECUTIVE_FAILED_UPDATE_CHECKS, LAST_UPDATE_TIME, SERVER_DICTATED_POLL_INTERVAL,
};
use super::*;
use crate::{
app_set::VecAppSet,
common::{
App, CheckOptions, PersistedApp, ProtocolState, UpdateCheckSchedule, UserCounting,
},
configuration::Updater,
cup_ecdsa::test_support::{make_cup_handler_for_test, MockCupv2Handler},
http_request::mock::MockHttpRequest,
installer::{
stub::{StubInstallErrors, StubInstaller, StubPlan},
ProgressObserver,
},
metrics::MockMetricsReporter,
policy::{MockPolicyEngine, StubPolicyEngine},
protocol::{request::OS, response, Cohort},
storage::MemStorage,
time::{
timers::{BlockingTimer, MockTimer, RequestedWait},
MockTimeSource, PartialComplexTime,
},
version::Version,
};
use assert_matches::assert_matches;
use futures::executor::{block_on, LocalPool};
use futures::future::LocalBoxFuture;
use futures::task::LocalSpawnExt;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::cell::RefCell;
use std::time::Duration;
use tracing::info;
fn make_test_app_set() -> Rc<Mutex<VecAppSet>> {
Rc::new(Mutex::new(VecAppSet::new(vec![App::builder()
.id("{00000000-0000-0000-0000-000000000001}")
.version([1, 2, 3, 4])
.cohort(Cohort::new("stable-channel"))
.build()])))
}
fn make_update_available_response() -> HttpResponse<Vec<u8>> {
let response = json!({"response":{
"server": "prod",
"protocol": "3.0",
"app": [{
"appid": "{00000000-0000-0000-0000-000000000001}",
"status": "ok",
"updatecheck": {
"status": "ok"
}
}],
}});
HttpResponse::new(serde_json::to_vec(&response).unwrap())
}
fn make_noupdate_httpresponse() -> Vec<u8> {
serde_json::to_vec(
&(json!({"response":{
"server": "prod",
"protocol": "3.0",
"app": [{
"appid": "{00000000-0000-0000-0000-000000000001}",
"status": "ok",
"updatecheck": {
"status": "noupdate"
}
}]
}})),
)
.unwrap()
}
async fn assert_request<'a>(http: &MockHttpRequest, request_builder: RequestBuilder<'a>) {
let cup_handler = make_cup_handler_for_test();
let (request, _request_metadata) = request_builder.build(Some(&cup_handler)).unwrap();
let body = hyper::body::to_bytes(request).await.unwrap();
let body_str = String::from_utf8_lossy(&body);
http.assert_body_str(&body_str).await;
}
#[test]
fn run_simple_check_with_noupdate_result() {
block_on(async {
let http = MockHttpRequest::new(HttpResponse::new(make_noupdate_httpresponse()));
StateMachineBuilder::new_stub()
.http(http)
.oneshot(RequestParams::default())
.await
.unwrap();
info!("update check complete!");
});
}
#[test]
fn test_cohort_returned_with_noupdate_result() {
block_on(async {
let response = json!({"response":{
"server": "prod",
"protocol": "3.0",
"app": [{
"appid": "{00000000-0000-0000-0000-000000000001}",
"status": "ok",
"cohort": "1",
"cohortname": "stable-channel",
"updatecheck": {
"status": "noupdate"
}
}]
}});
let response = serde_json::to_vec(&response).unwrap();
let http = MockHttpRequest::new(HttpResponse::new(response));
let (response, reboot_after_update) = StateMachineBuilder::new_stub()
.http(http)
.oneshot(RequestParams::default())
.await
.unwrap();
assert_eq!(
"{00000000-0000-0000-0000-000000000001}",
response.app_responses[0].app_id
);
assert_eq!(Some("1".into()), response.app_responses[0].cohort.id);
assert_eq!(
Some("stable-channel".into()),
response.app_responses[0].cohort.name
);
assert_eq!(None, response.app_responses[0].cohort.hint);
assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);
});
}
#[test]
fn test_cohort_returned_with_update_result() {
block_on(async {
let response = json!({"response":{
"server": "prod",
"protocol": "3.0",
"app": [{
"appid": "{00000000-0000-0000-0000-000000000001}",
"status": "ok",
"cohort": "1",
"cohortname": "stable-channel",
"updatecheck": {
"status": "ok"
}
}]
}});
let response = serde_json::to_vec(&response).unwrap();
let http = MockHttpRequest::new(HttpResponse::new(response));
let (response, reboot_after_update) = StateMachineBuilder::new_stub()
.http(http)
.oneshot(RequestParams::default())
.await
.unwrap();
assert_eq!(
"{00000000-0000-0000-0000-000000000001}",
response.app_responses[0].app_id
);
assert_eq!(Some("1".into()), response.app_responses[0].cohort.id);
assert_eq!(
Some("stable-channel".into()),
response.app_responses[0].cohort.name
);
assert_eq!(None, response.app_responses[0].cohort.hint);
assert_matches!(reboot_after_update, RebootAfterUpdate::Needed(()));
});
}
#[test]
fn test_report_parse_response_error() {
block_on(async {
let http = MockHttpRequest::new(HttpResponse::new("invalid response".into()));
let mut state_machine = StateMachineBuilder::new_stub().http(http).build().await;
let response = state_machine.oneshot(RequestParams::default()).await;
assert_matches!(response, Err(UpdateCheckError::ResponseParser(_)));
let request_params = RequestParams::default();
let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
let event = Event {
previous_version: Some("1.2.3.4".to_string()),
..Event::error(EventErrorCode::ParseResponse)
};
let apps = state_machine.app_set.lock().await.get_apps();
request_builder = request_builder
.add_event(&apps[0], event)
.session_id(GUID::from_u128(0))
.request_id(GUID::from_u128(2));
assert_request(&state_machine.http, request_builder).await;
});
}
#[test]
fn test_report_construct_install_plan_error() {
block_on(async {
let response = json!({"response":{
"server": "prod",
"protocol": "4.0",
"app": [{
"appid": "{00000000-0000-0000-0000-000000000001}",
"status": "ok",
"updatecheck": {
"status": "ok"
}
}],
}});
let response = serde_json::to_vec(&response).unwrap();
let http = MockHttpRequest::new(HttpResponse::new(response));
let mut state_machine = StateMachineBuilder::new_stub().http(http).build().await;
let response = state_machine.oneshot(RequestParams::default()).await;
assert_matches!(response, Err(UpdateCheckError::InstallPlan(_)));
let request_params = RequestParams::default();
let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
let event = Event {
previous_version: Some("1.2.3.4".to_string()),
..Event::error(EventErrorCode::ConstructInstallPlan)
};
let apps = state_machine.app_set.lock().await.get_apps();
request_builder = request_builder
.add_event(&apps[0], event)
.session_id(GUID::from_u128(0))
.request_id(GUID::from_u128(2));
assert_request(&state_machine.http, request_builder).await;
});
}
#[test]
fn test_report_installation_error() {
block_on(async {
let response = json!({"response":{
"server": "prod",
"protocol": "3.0",
"app": [{
"appid": "{00000000-0000-0000-0000-000000000001}",
"status": "ok",
"updatecheck": {
"status": "ok",
"manifest": {
"version": "5.6.7.8",
"actions": {
"action": [],
},
"packages": {
"package": [],
},
}
}
}],
}});
let response = serde_json::to_vec(&response).unwrap();
let http = MockHttpRequest::new(HttpResponse::new(response));
let mut state_machine = StateMachineBuilder::new_stub()
.http(http)
.installer(StubInstaller { should_fail: true })
.build()
.await;
let (response, reboot_after_update) = state_machine
.oneshot(RequestParams::default())
.await
.unwrap();
assert_eq!(
Action::InstallPlanExecutionError,
response.app_responses[0].result
);
assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);
let request_params = RequestParams::default();
let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
let event = Event {
previous_version: Some("1.2.3.4".to_string()),
next_version: Some("5.6.7.8".to_string()),
download_time_ms: Some(0),
..Event::error(EventErrorCode::Installation)
};
let apps = state_machine.app_set.lock().await.get_apps();
request_builder = request_builder
.add_event(&apps[0], event)
.session_id(GUID::from_u128(0))
.request_id(GUID::from_u128(3));
assert_request(&state_machine.http, request_builder).await;
});
}
#[test]
fn test_report_installation_error_multi_app() {
block_on(async {
let response = json!({"response":{
"server": "prod",
"protocol": "3.0",
"app": [{
"appid": "appid_3",
"status": "ok",
"updatecheck": {
"status": "ok",
"manifest": {
"version": "5.6.7.8",
"actions": {
"action": [],
},
"packages": {
"package": [],
},
}
}
},{
"appid": "appid_1",
"status": "ok",
"updatecheck": {
"status": "ok",
"manifest": {
"version": "1.2.3.4",
"actions": {
"action": [],
},
"packages": {
"package": [],
},
}
}
},{
"appid": "appid_2",
"status": "ok",
"updatecheck": {
"status": "noupdate",
}
}],
}});
let response = serde_json::to_vec(&response).unwrap();
let mut http = MockHttpRequest::new(HttpResponse::new(response));
http.add_response(HttpResponse::new(vec![]));
let app_set = VecAppSet::new(vec![
App::builder().id("appid_1").version([1, 2, 3, 3]).build(),
App::builder().id("appid_2").version([9, 9, 9, 9]).build(),
App::builder().id("appid_3").version([5, 6, 7, 7]).build(),
]);
let app_set = Rc::new(Mutex::new(app_set));
let (send_install, mut recv_install) = mpsc::channel(0);
let mut state_machine = StateMachineBuilder::new_stub()
.app_set(Rc::clone(&app_set))
.http(http)
.installer(BlockingInstaller {
on_install: send_install,
on_reboot: None,
})
.build()
.await;
let recv_install_fut = async move {
let unblock_install = recv_install.next().await.unwrap();
unblock_install
.send(vec![
AppInstallResult::Deferred,
AppInstallResult::Installed,
])
.unwrap();
};
let (oneshot_result, ()) = future::join(
state_machine.oneshot(RequestParams::default()),
recv_install_fut,
)
.await;
let (response, reboot_after_update) = oneshot_result.unwrap();
assert_eq!("appid_3", response.app_responses[0].app_id);
assert_eq!(Action::DeferredByPolicy, response.app_responses[0].result);
assert_eq!("appid_1", response.app_responses[1].app_id);
assert_eq!(Action::Updated, response.app_responses[1].result);
assert_eq!("appid_2", response.app_responses[2].app_id);
assert_eq!(Action::NoUpdate, response.app_responses[2].result);
assert_matches!(reboot_after_update, RebootAfterUpdate::Needed(()));
let request_params = RequestParams::default();
let apps = app_set.lock().await.get_apps();
let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
let event = Event {
previous_version: Some("1.2.3.3".to_string()),
next_version: Some("1.2.3.4".to_string()),
download_time_ms: Some(0),
..Event::success(EventType::UpdateComplete)
};
request_builder = request_builder
.add_event(&apps[0], event)
.session_id(GUID::from_u128(0))
.request_id(GUID::from_u128(4));
assert_request(&state_machine.http, request_builder).await;
let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
let event1 = Event {
previous_version: Some("1.2.3.3".to_string()),
next_version: Some("1.2.3.4".to_string()),
download_time_ms: Some(0),
..Event::success(EventType::UpdateDownloadFinished)
};
let event2 = Event {
previous_version: Some("5.6.7.7".to_string()),
next_version: Some("5.6.7.8".to_string()),
download_time_ms: Some(0),
event_type: EventType::UpdateComplete,
event_result: EventResult::UpdateDeferred,
..Event::default()
};
request_builder = request_builder
.add_event(&apps[2], event2)
.add_event(&apps[0], event1)
.session_id(GUID::from_u128(0))
.request_id(GUID::from_u128(3));
assert_request(&state_machine.http, request_builder).await;
});
}
#[test]
fn test_observe_installation_error() {
block_on(async {
let http = MockHttpRequest::new(make_update_available_response());
let actual_errors = StateMachineBuilder::new_stub()
.http(http)
.installer(StubInstaller { should_fail: true })
.oneshot_check()
.await
.filter_map(|event| {
future::ready(match event {
StateMachineEvent::InstallerError(Some(e)) => {
Some(*e.downcast::<StubInstallErrors>().unwrap())
}
_ => None,
})
})
.collect::<Vec<StubInstallErrors>>()
.await;
let expected_errors = vec![StubInstallErrors::Failed];
assert_eq!(actual_errors, expected_errors);
});
}
#[test]
fn test_report_deferred_by_policy() {
block_on(async {
let http = MockHttpRequest::new(make_update_available_response());
let policy_engine = MockPolicyEngine {
update_decision: UpdateDecision::DeferredByPolicy,
..MockPolicyEngine::default()
};
let mut state_machine = StateMachineBuilder::new_stub()
.policy_engine(policy_engine)
.http(http)
.build()
.await;
let (response, reboot_after_update) = state_machine
.oneshot(RequestParams::default())
.await
.unwrap();
assert_eq!(Action::DeferredByPolicy, response.app_responses[0].result);
assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);
let request_params = RequestParams::default();
let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
let event = Event {
event_type: EventType::UpdateComplete,
event_result: EventResult::UpdateDeferred,
previous_version: Some("1.2.3.4".to_string()),
..Event::default()
};
let apps = state_machine.app_set.lock().await.get_apps();
request_builder = request_builder
.add_event(&apps[0], event)
.session_id(GUID::from_u128(0))
.request_id(GUID::from_u128(2));
assert_request(&state_machine.http, request_builder).await;
});
}
#[test]
fn test_report_denied_by_policy() {
block_on(async {
let response = make_update_available_response();
let http = MockHttpRequest::new(response);
let policy_engine = MockPolicyEngine {
update_decision: UpdateDecision::DeniedByPolicy,
..MockPolicyEngine::default()
};
let mut state_machine = StateMachineBuilder::new_stub()
.policy_engine(policy_engine)
.http(http)
.build()
.await;
let (response, reboot_after_update) = state_machine
.oneshot(RequestParams::default())
.await
.unwrap();
assert_eq!(Action::DeniedByPolicy, response.app_responses[0].result);
assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);
let request_params = RequestParams::default();
let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
let event = Event {
previous_version: Some("1.2.3.4".to_string()),
..Event::error(EventErrorCode::DeniedByPolicy)
};
let apps = state_machine.app_set.lock().await.get_apps();
request_builder = request_builder
.add_event(&apps[0], event)
.session_id(GUID::from_u128(0))
.request_id(GUID::from_u128(2));
assert_request(&state_machine.http, request_builder).await;
});
}
#[test]
fn test_wait_timer() {
let mut pool = LocalPool::new();
let mock_time = MockTimeSource::new_from_now();
let next_update_time = mock_time.now() + Duration::from_secs(111);
let (timer, mut timers) = BlockingTimer::new();
let policy_engine = MockPolicyEngine {
check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
time_source: mock_time,
..MockPolicyEngine::default()
};
let (_ctl, state_machine) = pool.run_until(
StateMachineBuilder::new_stub()
.policy_engine(policy_engine)
.timer(timer)
.start(),
);
pool.spawner()
.spawn_local(state_machine.map(|_| ()).collect())
.unwrap();
let blocked_timer = pool.run_until(timers.next()).unwrap();
assert_eq!(
blocked_timer.requested_wait(),
RequestedWait::Until(next_update_time.into())
);
}
#[test]
fn test_cohort_and_user_counting_updates_are_used_in_subsequent_requests() {
block_on(async {
let response = json!({"response":{
"server": "prod",
"protocol": "3.0",
"daystart": {
"elapsed_days": 1234567,
"elapsed_seconds": 3645
},
"app": [{
"appid": "{00000000-0000-0000-0000-000000000001}",
"status": "ok",
"cohort": "1",
"cohortname": "stable-channel",
"updatecheck": {
"status": "noupdate"
}
}]
}});
let response = serde_json::to_vec(&response).unwrap();
let mut http = MockHttpRequest::new(HttpResponse::new(response.clone()));
http.add_response(HttpResponse::new(response));
let apps = make_test_app_set();
let mut state_machine = StateMachineBuilder::new_stub()
.http(http)
.app_set(apps.clone())
.build()
.await;
state_machine.run_once().await;
let apps = apps.lock().await.get_apps();
assert_eq!(Some("1".to_string()), apps[0].cohort.id);
assert_eq!(None, apps[0].cohort.hint);
assert_eq!(Some("stable-channel".to_string()), apps[0].cohort.name);
assert_eq!(
UserCounting::ClientRegulatedByDate(Some(1234567)),
apps[0].user_counting
);
state_machine.run_once().await;
let request_params = RequestParams::default();
let expected_request_builder =
RequestBuilder::new(&state_machine.config, &request_params)
.add_update_check(&apps[0])
.add_ping(&apps[0])
.session_id(GUID::from_u128(2))
.request_id(GUID::from_u128(3));
assert_request(&state_machine.http, expected_request_builder).await;
});
}
#[test]
fn test_user_counting_returned() {
block_on(async {
let response = json!({"response":{
"server": "prod",
"protocol": "3.0",
"daystart": {
"elapsed_days": 1234567,
"elapsed_seconds": 3645
},
"app": [{
"appid": "{00000000-0000-0000-0000-000000000001}",
"status": "ok",
"cohort": "1",
"cohortname": "stable-channel",
"updatecheck": {
"status": "noupdate"
}
}]
}});
let response = serde_json::to_vec(&response).unwrap();
let http = MockHttpRequest::new(HttpResponse::new(response));
let (response, reboot_after_update) = StateMachineBuilder::new_stub()
.http(http)
.oneshot(RequestParams::default())
.await
.unwrap();
assert_eq!(
UserCounting::ClientRegulatedByDate(Some(1234567)),
response.app_responses[0].user_counting
);
assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);
});
}
#[test]
fn test_observe_state() {
block_on(async {
let actual_states = StateMachineBuilder::new_stub()
.oneshot_check()
.await
.filter_map(|event| {
future::ready(match event {
StateMachineEvent::StateChange(state) => Some(state),
_ => None,
})
})
.collect::<Vec<State>>()
.await;
let expected_states = vec![
State::CheckingForUpdates(InstallSource::ScheduledTask),
State::ErrorCheckingForUpdate,
];
assert_eq!(actual_states, expected_states);
});
}
#[test]
fn test_observe_schedule() {
block_on(async {
let mock_time = MockTimeSource::new_from_now();
let actual_schedules = StateMachineBuilder::new_stub()
.policy_engine(StubPolicyEngine::new(&mock_time))
.oneshot_check()
.await
.filter_map(|event| {
future::ready(match event {
StateMachineEvent::ScheduleChange(schedule) => Some(schedule),
_ => None,
})
})
.collect::<Vec<UpdateCheckSchedule>>()
.await;
let expected_schedule = UpdateCheckSchedule::builder()
.last_update_time(mock_time.now())
.last_update_check_time(mock_time.now())
.build();
assert_eq!(actual_schedules, vec![expected_schedule]);
});
}
#[test]
fn test_observe_protocol_state() {
block_on(async {
let actual_protocol_states = StateMachineBuilder::new_stub()
.oneshot_check()
.await
.filter_map(|event| {
future::ready(match event {
StateMachineEvent::ProtocolStateChange(state) => Some(state),
_ => None,
})
})
.collect::<Vec<ProtocolState>>()
.await;
let expected_protocol_state = ProtocolState {
consecutive_failed_update_checks: 1,
..ProtocolState::default()
};
assert_eq!(actual_protocol_states, vec![expected_protocol_state]);
});
}
#[test]
fn test_observe_omaha_server_response() {
block_on(async {
let response = json!({"response":{
"server": "prod",
"protocol": "3.0",
"app": [{
"appid": "{00000000-0000-0000-0000-000000000001}",
"status": "ok",
"cohort": "1",
"cohortname": "stable-channel",
"updatecheck": {
"status": "noupdate"
}
}]
}});
let response = serde_json::to_vec(&response).unwrap();
let expected_omaha_response = response::parse_json_response(&response).unwrap();
let http = MockHttpRequest::new(HttpResponse::new(response));
let actual_omaha_response = StateMachineBuilder::new_stub()
.http(http)
.oneshot_check()
.await
.filter_map(|event| {
future::ready(match event {
StateMachineEvent::OmahaServerResponse(response) => Some(response),
_ => None,
})
})
.collect::<Vec<response::Response>>()
.await;
assert_eq!(actual_omaha_response, vec![expected_omaha_response]);
});
}
#[test]
fn test_metrics_report_omaha_event_lost() {
block_on(async {
let http = MockHttpRequest::new(HttpResponse::new("invalid response".into()));
let mut metrics_reporter = MockMetricsReporter::new();
let _response = StateMachineBuilder::new_stub()
.http(http)
.metrics_reporter(&mut metrics_reporter)
.oneshot(RequestParams::default())
.await;
#[rustfmt::skip]
assert_matches!(
metrics_reporter.metrics.as_slice(),
[
Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
Metrics::RequestsPerCheck { count: 1, successful: true },
Metrics::OmahaEventLost(Event {
event_type: EventType::UpdateComplete,
event_result: EventResult::Error,
errorcode: Some(EventErrorCode::ParseResponse),
previous_version: None,
next_version: None,
download_time_ms: None,
})
]
);
});
}
#[test]
fn test_metrics_report_update_check_response_time() {
block_on(async {
let mut metrics_reporter = MockMetricsReporter::new();
let _response = StateMachineBuilder::new_stub()
.metrics_reporter(&mut metrics_reporter)
.oneshot(RequestParams::default())
.await;
#[rustfmt::skip]
assert_matches!(
metrics_reporter.metrics.as_slice(),
[
Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
Metrics::RequestsPerCheck { count: 1, successful: true },
]
);
});
}
#[test]
fn test_metrics_report_update_check_response_time_on_failure() {
block_on(async {
let mut metrics_reporter = MockMetricsReporter::new();
let mut http = MockHttpRequest::default();
for _ in 0..MAX_OMAHA_REQUEST_ATTEMPTS {
http.add_error(http_request::mock_errors::make_transport_error());
}
http.add_response(hyper::Response::default());
let _response = StateMachineBuilder::new_stub()
.http(http)
.metrics_reporter(&mut metrics_reporter)
.oneshot(RequestParams::default())
.await;
#[rustfmt::skip]
assert_matches!(
metrics_reporter.metrics.as_slice(),
[
Metrics::UpdateCheckResponseTime { response_time: _, successful: false },
Metrics::UpdateCheckResponseTime { response_time: _, successful: false },
Metrics::UpdateCheckResponseTime { response_time: _, successful: false },
Metrics::RequestsPerCheck { count: 3, successful: false },
]
);
});
}
#[test]
fn test_metrics_report_update_check_response_time_on_failure_followed_by_success() {
block_on(async {
let mut metrics_reporter = MockMetricsReporter::new();
let mut http = MockHttpRequest::default();
for _ in 0..MAX_OMAHA_REQUEST_ATTEMPTS - 1 {
http.add_error(http_request::mock_errors::make_transport_error());
}
http.add_response(hyper::Response::default());
let _response = StateMachineBuilder::new_stub()
.http(http)
.metrics_reporter(&mut metrics_reporter)
.oneshot(RequestParams::default())
.await;
#[rustfmt::skip]
assert_matches!(
metrics_reporter.metrics.as_slice(),
[
Metrics::UpdateCheckResponseTime { response_time: _, successful: false },
Metrics::UpdateCheckResponseTime { response_time: _, successful: false },
Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
Metrics::RequestsPerCheck { count: 3, successful: true },
Metrics::OmahaEventLost(Event {
event_type: EventType::UpdateComplete,
event_result: EventResult::Error,
errorcode: Some(EventErrorCode::ParseResponse),
previous_version: None,
next_version: None,
download_time_ms: None
}),
]
);
});
}
#[test]
fn test_metrics_report_requests_per_check() {
block_on(async {
let mut metrics_reporter = MockMetricsReporter::new();
let _response = StateMachineBuilder::new_stub()
.metrics_reporter(&mut metrics_reporter)
.oneshot(RequestParams::default())
.await;
assert!(metrics_reporter
.metrics
.contains(&Metrics::RequestsPerCheck {
count: 1,
successful: true
}));
});
}
#[test]
fn test_metrics_report_requests_per_check_on_failure_followed_by_success() {
block_on(async {
let mut metrics_reporter = MockMetricsReporter::new();
let mut http = MockHttpRequest::default();
for _ in 0..MAX_OMAHA_REQUEST_ATTEMPTS - 1 {
http.add_error(http_request::mock_errors::make_transport_error());
}
http.add_response(hyper::Response::default());
let _response = StateMachineBuilder::new_stub()
.http(http)
.metrics_reporter(&mut metrics_reporter)
.oneshot(RequestParams::default())
.await;
assert!(!metrics_reporter.metrics.is_empty());
assert!(metrics_reporter
.metrics
.contains(&Metrics::RequestsPerCheck {
count: MAX_OMAHA_REQUEST_ATTEMPTS,
successful: true
}));
});
}
#[test]
fn test_metrics_report_requests_per_check_on_failure() {
block_on(async {
let mut metrics_reporter = MockMetricsReporter::new();
let mut http = MockHttpRequest::default();
for _ in 0..MAX_OMAHA_REQUEST_ATTEMPTS {
http.add_error(http_request::mock_errors::make_transport_error());
}
http.add_response(hyper::Response::default());
let _response = StateMachineBuilder::new_stub()
.http(http)
.metrics_reporter(&mut metrics_reporter)
.oneshot(RequestParams::default())
.await;
assert!(!metrics_reporter.metrics.is_empty());
assert!(metrics_reporter
.metrics
.contains(&Metrics::RequestsPerCheck {
count: MAX_OMAHA_REQUEST_ATTEMPTS,
successful: false
}));
});
}
#[test]
fn test_requests_per_check_backoff_with_mock_timer() {
block_on(async {
let mut timer = MockTimer::new();
timer.expect_for_range(Duration::from_millis(500), Duration::from_millis(1500));
timer.expect_for_range(Duration::from_millis(1500), Duration::from_millis(2500));
let requested_waits = timer.get_requested_waits_view();
let response = StateMachineBuilder::new_stub()
.http(MockHttpRequest::empty())
.timer(timer)
.oneshot(RequestParams::default())
.await;
let waits = requested_waits.borrow();
assert_eq!(waits.len(), 2);
assert_matches!(
waits[0],
RequestedWait::For(d) if d >= Duration::from_millis(500) && d <= Duration::from_millis(1500)
);
assert_matches!(
waits[1],
RequestedWait::For(d) if d >= Duration::from_millis(1500) && d <= Duration::from_millis(2500)
);
assert_matches!(
response,
Err(UpdateCheckError::OmahaRequest(
OmahaRequestError::HttpStatus(_)
))
);
});
}
#[test]
fn test_metrics_report_update_check_failure_reason_omaha() {
block_on(async {
let mut metrics_reporter = MockMetricsReporter::new();
let mut state_machine = StateMachineBuilder::new_stub()
.metrics_reporter(&mut metrics_reporter)
.build()
.await;
state_machine.run_once().await;
assert!(metrics_reporter
.metrics
.contains(&Metrics::UpdateCheckFailureReason(
UpdateCheckFailureReason::Omaha
)));
});
}
#[test]
fn test_metrics_report_update_check_failure_reason_network() {
block_on(async {
let mut metrics_reporter = MockMetricsReporter::new();
let mut state_machine = StateMachineBuilder::new_stub()
.http(MockHttpRequest::empty())
.metrics_reporter(&mut metrics_reporter)
.build()
.await;
state_machine.run_once().await;
assert!(metrics_reporter
.metrics
.contains(&Metrics::UpdateCheckFailureReason(
UpdateCheckFailureReason::Network
)));
});
}
#[test]
fn test_persist_last_update_time() {
block_on(async {
let storage = Rc::new(Mutex::new(MemStorage::new()));
StateMachineBuilder::new_stub()
.storage(Rc::clone(&storage))
.oneshot_check()
.await
.map(|_| ())
.collect::<()>()
.await;
let storage = storage.lock().await;
storage.get_int(LAST_UPDATE_TIME).await.unwrap();
assert!(storage.committed());
});
}
#[test]
fn test_persist_server_dictated_poll_interval() {
block_on(async {
let response = HttpResponse::builder()
.header(X_RETRY_AFTER, 1234)
.body(make_noupdate_httpresponse())
.unwrap();
let http = MockHttpRequest::new(response);
let storage = Rc::new(Mutex::new(MemStorage::new()));
let mut state_machine = StateMachineBuilder::new_stub()
.http(http)
.storage(Rc::clone(&storage))
.build()
.await;
state_machine
.oneshot(RequestParams::default())
.await
.unwrap();
assert_eq!(
state_machine.context.state.server_dictated_poll_interval,
Some(Duration::from_secs(1234))
);
let storage = storage.lock().await;
assert_eq!(
storage.get_int(SERVER_DICTATED_POLL_INTERVAL).await,
Some(1234000000)
);
assert!(storage.committed());
});
}
#[test]
fn test_persist_server_dictated_poll_interval_http_error() {
block_on(async {
let response = HttpResponse::builder()
.status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
.header(X_RETRY_AFTER, 1234)
.body(vec![])
.unwrap();
let http = MockHttpRequest::new(response);
let storage = Rc::new(Mutex::new(MemStorage::new()));
let mut state_machine = StateMachineBuilder::new_stub()
.http(http)
.storage(Rc::clone(&storage))
.build()
.await;
assert_matches!(
state_machine.oneshot(RequestParams::default()).await,
Err(UpdateCheckError::OmahaRequest(
OmahaRequestError::HttpStatus(_)
))
);
assert_eq!(
state_machine.context.state.server_dictated_poll_interval,
Some(Duration::from_secs(1234))
);
let storage = storage.lock().await;
assert_eq!(
storage.get_int(SERVER_DICTATED_POLL_INTERVAL).await,
Some(1234000000)
);
assert!(storage.committed());
});
}
#[test]
fn test_persist_server_dictated_poll_interval_max_duration() {
block_on(async {
let response = HttpResponse::builder()
.status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
.header(X_RETRY_AFTER, 123456789)
.body(vec![])
.unwrap();
let http = MockHttpRequest::new(response);
let storage = Rc::new(Mutex::new(MemStorage::new()));
let mut state_machine = StateMachineBuilder::new_stub()
.http(http)
.storage(Rc::clone(&storage))
.build()
.await;
assert_matches!(
state_machine.oneshot(RequestParams::default()).await,
Err(UpdateCheckError::OmahaRequest(
OmahaRequestError::HttpStatus(_)
))
);
assert_eq!(
state_machine.context.state.server_dictated_poll_interval,
Some(Duration::from_secs(86400))
);
let storage = storage.lock().await;
assert_eq!(
storage.get_int(SERVER_DICTATED_POLL_INTERVAL).await,
Some(86400000000)
);
assert!(storage.committed());
});
}
#[test]
fn test_server_dictated_poll_interval_with_transport_error_no_retry() {
block_on(async {
let mut http = MockHttpRequest::empty();
http.add_error(http_request::mock_errors::make_transport_error());
let mut storage = MemStorage::new();
let _ = storage.set_int(SERVER_DICTATED_POLL_INTERVAL, 1234000000);
let _ = storage.commit();
let storage = Rc::new(Mutex::new(storage));
let mut state_machine = StateMachineBuilder::new_stub()
.http(http)
.storage(Rc::clone(&storage))
.build()
.await;
assert_matches!(
state_machine.oneshot(RequestParams::default()).await,
Err(UpdateCheckError::OmahaRequest(
OmahaRequestError::HttpTransport(_)
))
);
assert_eq!(
state_machine.context.state.server_dictated_poll_interval,
Some(Duration::from_secs(1234))
);
});
}
#[test]
fn test_persist_app() {
block_on(async {
let storage = Rc::new(Mutex::new(MemStorage::new()));
let app_set = make_test_app_set();
StateMachineBuilder::new_stub()
.storage(Rc::clone(&storage))
.app_set(app_set.clone())
.oneshot_check()
.await
.map(|_| ())
.collect::<()>()
.await;
let storage = storage.lock().await;
let apps = app_set.lock().await.get_apps();
storage.get_string(&apps[0].id).await.unwrap();
assert!(storage.committed());
});
}
#[test]
fn test_load_last_update_time() {
block_on(async {
let mut storage = MemStorage::new();
let mut mock_time = MockTimeSource::new_from_now();
mock_time.truncate_submicrosecond_walltime();
let last_update_time = mock_time.now_in_walltime() - Duration::from_secs(999);
storage
.set_time(LAST_UPDATE_TIME, last_update_time)
.await
.unwrap();
let state_machine = StateMachineBuilder::new_stub()
.policy_engine(StubPolicyEngine::new(&mock_time))
.storage(Rc::new(Mutex::new(storage)))
.build()
.await;
assert_eq!(
state_machine.context.schedule.last_update_time.unwrap(),
PartialComplexTime::Wall(last_update_time)
);
});
}
#[test]
fn test_load_server_dictated_poll_interval() {
block_on(async {
let mut storage = MemStorage::new();
storage
.set_int(SERVER_DICTATED_POLL_INTERVAL, 56789)
.await
.unwrap();
let state_machine = StateMachineBuilder::new_stub()
.storage(Rc::new(Mutex::new(storage)))
.build()
.await;
assert_eq!(
Some(Duration::from_micros(56789)),
state_machine.context.state.server_dictated_poll_interval
);
});
}
#[test]
fn test_load_app() {
block_on(async {
let app_set = VecAppSet::new(vec![App::builder()
.id("{00000000-0000-0000-0000-000000000001}")
.version([1, 2, 3, 4])
.build()]);
let mut storage = MemStorage::new();
let persisted_app = PersistedApp {
cohort: Cohort {
id: Some("cohort_id".to_string()),
hint: Some("test_channel".to_string()),
name: None,
},
user_counting: UserCounting::ClientRegulatedByDate(Some(22222)),
};
let json = serde_json::to_string(&persisted_app).unwrap();
let apps = app_set.get_apps();
storage.set_string(&apps[0].id, &json).await.unwrap();
let app_set = Rc::new(Mutex::new(app_set));
let _state_machine = StateMachineBuilder::new_stub()
.storage(Rc::new(Mutex::new(storage)))
.app_set(Rc::clone(&app_set))
.build()
.await;
let apps = app_set.lock().await.get_apps();
assert_eq!(persisted_app.cohort, apps[0].cohort);
assert_eq!(
UserCounting::ClientRegulatedByDate(Some(22222)),
apps[0].user_counting
);
});
}
#[test]
fn test_report_check_interval_with_no_storage() {
block_on(async {
let mut mock_time = MockTimeSource::new_from_now();
let mut state_machine = StateMachineBuilder::new_stub()
.policy_engine(StubPolicyEngine::new(mock_time.clone()))
.metrics_reporter(MockMetricsReporter::new())
.build()
.await;
state_machine
.report_check_interval(InstallSource::ScheduledTask)
.await;
assert!(state_machine.metrics_reporter.metrics.is_empty());
let interval = Duration::from_micros(999999);
mock_time.advance(interval);
state_machine
.report_check_interval(InstallSource::ScheduledTask)
.await;
assert_eq!(
state_machine.metrics_reporter.metrics,
vec![Metrics::UpdateCheckInterval {
interval,
clock: ClockType::Monotonic,
install_source: InstallSource::ScheduledTask,
}]
);
});
}
#[test]
fn test_report_check_interval_mono_transition() {
block_on(async {
let mut mock_time = MockTimeSource::new_from_now();
let mut state_machine = StateMachineBuilder::new_stub()
.policy_engine(StubPolicyEngine::new(mock_time.clone()))
.metrics_reporter(MockMetricsReporter::new())
.build()
.await;
let initial_duration = Duration::from_secs(999);
let initial_time = mock_time.now_in_walltime() - initial_duration;
state_machine.context.schedule.last_update_check_time =
Some(PartialComplexTime::Wall(initial_time));
state_machine
.report_check_interval(InstallSource::ScheduledTask)
.await;
let interval = Duration::from_micros(999999);
mock_time.advance(interval);
state_machine
.report_check_interval(InstallSource::ScheduledTask)
.await;
mock_time.advance(interval);
state_machine
.report_check_interval(InstallSource::ScheduledTask)
.await;
assert_eq!(
state_machine.metrics_reporter.metrics,
vec![
Metrics::UpdateCheckInterval {
interval: initial_duration,
clock: ClockType::Wall,
install_source: InstallSource::ScheduledTask,
},
Metrics::UpdateCheckInterval {
interval,
clock: ClockType::Monotonic,
install_source: InstallSource::ScheduledTask,
},
Metrics::UpdateCheckInterval {
interval,
clock: ClockType::Monotonic,
install_source: InstallSource::ScheduledTask,
},
]
);
});
}
#[derive(Debug)]
pub struct TestInstaller {
reboot_called: Rc<RefCell<bool>>,
install_fails: usize,
mock_time: MockTimeSource,
}
struct TestInstallerBuilder {
install_fails: usize,
mock_time: MockTimeSource,
}
impl TestInstaller {
fn builder(mock_time: MockTimeSource) -> TestInstallerBuilder {
TestInstallerBuilder {
install_fails: 0,
mock_time,
}
}
}
impl TestInstallerBuilder {
fn add_install_fail(mut self) -> Self {
self.install_fails += 1;
self
}
fn build(self) -> TestInstaller {
TestInstaller {
reboot_called: Rc::new(RefCell::new(false)),
install_fails: self.install_fails,
mock_time: self.mock_time,
}
}
}
const INSTALL_DURATION: Duration = Duration::from_micros(98765433);
impl Installer for TestInstaller {
type InstallPlan = StubPlan;
type Error = StubInstallErrors;
type InstallResult = ();
fn perform_install<'a>(
&'a mut self,
_install_plan: &StubPlan,
observer: Option<&'a dyn ProgressObserver>,
) -> LocalBoxFuture<'a, (Self::InstallResult, Vec<AppInstallResult<Self::Error>>)> {
if self.install_fails > 0 {
self.install_fails -= 1;
future::ready((
(),
vec![AppInstallResult::Failed(StubInstallErrors::Failed)],
))
.boxed()
} else {
self.mock_time.advance(INSTALL_DURATION);
async move {
if let Some(observer) = observer {
observer.receive_progress(None, 0.0, None, None).await;
observer.receive_progress(None, 0.3, None, None).await;
observer.receive_progress(None, 0.9, None, None).await;
observer.receive_progress(None, 1.0, None, None).await;
}
((), vec![AppInstallResult::Installed])
}
.boxed_local()
}
}
fn perform_reboot(&mut self) -> LocalBoxFuture<'_, Result<(), anyhow::Error>> {
self.reboot_called.replace(true);
future::ready(Ok(())).boxed_local()
}
fn try_create_install_plan<'a>(
&'a self,
_request_params: &'a RequestParams,
_request_metadata: Option<&'a RequestMetadata>,
_response: &'a Response,
_response_bytes: Vec<u8>,
_ecdsa_signature: Option<Vec<u8>>,
) -> LocalBoxFuture<'a, Result<Self::InstallPlan, Self::Error>> {
future::ready(Ok(StubPlan)).boxed_local()
}
}
#[test]
fn test_report_successful_update_duration() {
block_on(async {
let http = MockHttpRequest::new(make_update_available_response());
let storage = Rc::new(Mutex::new(MemStorage::new()));
let mut mock_time = MockTimeSource::new_from_now();
mock_time.truncate_submicrosecond_walltime();
let now = mock_time.now();
let update_completed_time = now + INSTALL_DURATION;
let expected_update_duration = update_completed_time.wall_duration_since(now).unwrap();
let first_seen_time = now - Duration::from_micros(1000);
let expected_duration_since_first_seen = update_completed_time
.wall_duration_since(first_seen_time)
.unwrap();
let mut state_machine = StateMachineBuilder::new_stub()
.http(http)
.installer(TestInstaller::builder(mock_time.clone()).build())
.policy_engine(StubPolicyEngine::new(mock_time.clone()))
.metrics_reporter(MockMetricsReporter::new())
.storage(Rc::clone(&storage))
.build()
.await;
{
let mut storage = storage.lock().await;
storage.set_string(INSTALL_PLAN_ID, "").await.unwrap();
storage
.set_time(UPDATE_FIRST_SEEN_TIME, first_seen_time)
.await
.unwrap();
storage.commit().await.unwrap();
}
state_machine.run_once().await;
#[rustfmt::skip]
assert_matches!(
state_machine.metrics_reporter.metrics.as_slice(),
[
Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
Metrics::RequestsPerCheck { count: 1, successful: true },
Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadStarted, event_result: EventResult::Success, .. }),
Metrics::SuccessfulUpdateDuration(install_duration),
Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadFinished, event_result: EventResult::Success, .. }),
Metrics::OmahaEventLost(Event { event_type: EventType::UpdateComplete, event_result: EventResult::Success, .. }),
Metrics::SuccessfulUpdateFromFirstSeen(duration_since_first_seen),
Metrics::AttemptsToSuccessfulCheck(1),
Metrics::AttemptsToSuccessfulInstall { count: 1, successful: true },
]
if
*install_duration == expected_update_duration &&
*duration_since_first_seen == expected_duration_since_first_seen
);
});
}
#[test]
fn test_report_failed_update_duration() {
block_on(async {
let http = MockHttpRequest::new(make_update_available_response());
let mut state_machine = StateMachineBuilder::new_stub()
.http(http)
.installer(StubInstaller { should_fail: true })
.metrics_reporter(MockMetricsReporter::new())
.build()
.await;
state_machine.run_once().await;
assert!(state_machine
.metrics_reporter
.metrics
.contains(&Metrics::FailedUpdateDuration(Duration::from_micros(0))));
});
}
#[test]
fn test_record_update_first_seen_time() {
block_on(async {
let storage = Rc::new(Mutex::new(MemStorage::new()));
let mut state_machine = StateMachineBuilder::new_stub()
.storage(Rc::clone(&storage))
.build()
.await;
let mut mock_time = MockTimeSource::new_from_now();
mock_time.truncate_submicrosecond_walltime();
let now = mock_time.now_in_walltime();
assert_eq!(
state_machine.record_update_first_seen_time("id", now).await,
now
);
{
let storage = storage.lock().await;
assert_eq!(
storage.get_string(INSTALL_PLAN_ID).await,
Some("id".to_string())
);
assert_eq!(storage.get_time(UPDATE_FIRST_SEEN_TIME).await, Some(now));
assert_eq!(storage.len(), 2);
assert!(storage.committed());
}
mock_time.advance(Duration::from_secs(1000));
let now2 = mock_time.now_in_walltime();
assert_eq!(
state_machine
.record_update_first_seen_time("id", now2)
.await,
now
);
{
let storage = storage.lock().await;
assert_eq!(
storage.get_string(INSTALL_PLAN_ID).await,
Some("id".to_string())
);
assert_eq!(storage.get_time(UPDATE_FIRST_SEEN_TIME).await, Some(now));
assert_eq!(storage.len(), 2);
assert!(storage.committed());
}
assert_eq!(
state_machine
.record_update_first_seen_time("id2", now2)
.await,
now2
);
{
let storage = storage.lock().await;
assert_eq!(
storage.get_string(INSTALL_PLAN_ID).await,
Some("id2".to_string())
);
assert_eq!(storage.get_time(UPDATE_FIRST_SEEN_TIME).await, Some(now2));
assert_eq!(storage.len(), 2);
assert!(storage.committed());
}
});
}
#[test]
fn test_report_attempts_to_successful_check() {
block_on(async {
let storage = Rc::new(Mutex::new(MemStorage::new()));
let mut state_machine = StateMachineBuilder::new_stub()
.installer(StubInstaller { should_fail: true })
.metrics_reporter(MockMetricsReporter::new())
.storage(Rc::clone(&storage))
.build()
.await;
state_machine
.report_attempts_to_successful_check(true)
.await;
assert_eq!(
state_machine.context.state.consecutive_failed_update_checks,
0
);
assert_eq!(
state_machine.metrics_reporter.metrics,
vec![Metrics::AttemptsToSuccessfulCheck(1)]
);
state_machine
.report_attempts_to_successful_check(false)
.await;
assert_eq!(
state_machine.context.state.consecutive_failed_update_checks,
1
);
state_machine
.report_attempts_to_successful_check(false)
.await;
assert_eq!(
state_machine.context.state.consecutive_failed_update_checks,
2
);
state_machine
.report_attempts_to_successful_check(true)
.await;
assert_eq!(
state_machine.context.state.consecutive_failed_update_checks,
0
);
assert_eq!(
state_machine.metrics_reporter.metrics,
vec![
Metrics::AttemptsToSuccessfulCheck(1),
Metrics::AttemptsToSuccessfulCheck(3)
]
);
});
}
#[test]
fn test_ping_omaha_updates_consecutive_failed_update_checks_and_persists() {
block_on(async {
let mut http = MockHttpRequest::empty();
http.add_error(http_request::mock_errors::make_transport_error());
http.add_response(HttpResponse::new(vec![]));
let response = json!({"response":{
"server": "prod",
"protocol": "3.0",
"app": [{
"appid": "{00000000-0000-0000-0000-000000000001}",
"status": "ok",
}],
}});
let response = serde_json::to_vec(&response).unwrap();
http.add_response(HttpResponse::new(response));
let storage = Rc::new(Mutex::new(MemStorage::new()));
{
let mut storage = storage.lock().await;
let _ = storage.set_int(CONSECUTIVE_FAILED_UPDATE_CHECKS, 1);
let _ = storage.commit();
}
let mut state_machine = StateMachineBuilder::new_stub()
.storage(Rc::clone(&storage))
.http(http)
.build()
.await;
async_generator::generate(move |mut co| async move {
state_machine.ping_omaha(&mut co).await;
assert_eq!(
state_machine.context.state.consecutive_failed_update_checks,
2
);
{
let storage = storage.lock().await;
assert_eq!(
storage.get_int(CONSECUTIVE_FAILED_UPDATE_CHECKS).await,
Some(2)
);
}
state_machine.ping_omaha(&mut co).await;
assert_eq!(
state_machine.context.state.consecutive_failed_update_checks,
3
);
{
let storage = storage.lock().await;
assert_eq!(
storage.get_int(CONSECUTIVE_FAILED_UPDATE_CHECKS).await,
Some(3)
);
}
state_machine.ping_omaha(&mut co).await;
assert_eq!(
state_machine.context.state.consecutive_failed_update_checks,
0
);
{
let storage = storage.lock().await;
assert_eq!(
storage.get_int(CONSECUTIVE_FAILED_UPDATE_CHECKS).await,
None
);
}
})
.into_complete()
.await;
});
}
#[test]
fn test_report_attempts_to_successful_install() {
block_on(async {
let http = MockHttpRequest::new(make_update_available_response());
let storage = Rc::new(Mutex::new(MemStorage::new()));
let mock_time = MockTimeSource::new_from_now();
let mut state_machine = StateMachineBuilder::new_stub()
.http(http)
.installer(TestInstaller::builder(mock_time.clone()).build())
.policy_engine(StubPolicyEngine::new(mock_time.clone()))
.metrics_reporter(MockMetricsReporter::new())
.storage(Rc::clone(&storage))
.build()
.await;
state_machine.run_once().await;
#[rustfmt::skip]
assert_matches!(
state_machine.metrics_reporter.metrics.as_slice(),
[
Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
Metrics::RequestsPerCheck { count: 1, successful: true },
Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadStarted, event_result: EventResult::Success, .. }),
Metrics::SuccessfulUpdateDuration(_),
Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadFinished, event_result: EventResult::Success, .. }),
Metrics::OmahaEventLost(Event { event_type: EventType::UpdateComplete, event_result: EventResult::Success, .. }),
Metrics::SuccessfulUpdateFromFirstSeen(_),
Metrics::AttemptsToSuccessfulCheck(1),
Metrics::AttemptsToSuccessfulInstall { count: 1, successful: true },
]
);
});
}
#[test]
fn test_report_attempts_to_successful_install_fails_then_succeeds() {
block_on(async {
let mut http = MockHttpRequest::new(make_update_available_response());
http.add_response(HttpResponse::new(vec![]));
http.add_response(HttpResponse::new(vec![]));
http.add_response(make_update_available_response());
http.add_response(HttpResponse::new(vec![]));
http.add_response(HttpResponse::new(vec![]));
let storage = Rc::new(Mutex::new(MemStorage::new()));
let mock_time = MockTimeSource::new_from_now();
let mut state_machine = StateMachineBuilder::new_stub()
.http(http)
.installer(
TestInstaller::builder(mock_time.clone())
.add_install_fail()
.build(),
)
.policy_engine(StubPolicyEngine::new(mock_time.clone()))
.metrics_reporter(MockMetricsReporter::new())
.storage(Rc::clone(&storage))
.build()
.await;
state_machine.run_once().await;
state_machine.run_once().await;
#[rustfmt::skip]
assert_matches!(
state_machine.metrics_reporter.metrics.as_slice(),
[
Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
Metrics::RequestsPerCheck { count: 1, successful: true },
Metrics::FailedUpdateDuration(_),
Metrics::AttemptsToSuccessfulCheck(1),
Metrics::AttemptsToSuccessfulInstall { count: 1, successful: false },
Metrics::UpdateCheckInterval { .. },
Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
Metrics::RequestsPerCheck { count: 1, successful: true },
Metrics::SuccessfulUpdateDuration(_),
Metrics::OmahaEventLost(Event { .. }),
Metrics::SuccessfulUpdateFromFirstSeen(_),
Metrics::AttemptsToSuccessfulCheck(1),
Metrics::AttemptsToSuccessfulInstall { count: 2, successful: true }
]
);
});
}
#[test]
fn test_report_attempts_to_successful_install_does_not_report_for_no_update() {
block_on(async {
let response = json!({"response":{
"server": "prod",
"protocol": "3.0",
"app": [{
"appid": "{00000000-0000-0000-0000-000000000001}",
"status": "ok",
"updatecheck": {
"status": "noupdate",
"info": "no update for you"
}
}],
}});
let response = serde_json::to_vec(&response).unwrap();
let http = MockHttpRequest::new(HttpResponse::new(response.clone()));
let storage = Rc::new(Mutex::new(MemStorage::new()));
let mock_time = MockTimeSource::new_from_now();
let mut state_machine = StateMachineBuilder::new_stub()
.http(http)
.installer(TestInstaller::builder(mock_time.clone()).build())
.policy_engine(StubPolicyEngine::new(mock_time.clone()))
.metrics_reporter(MockMetricsReporter::new())
.storage(Rc::clone(&storage))
.build()
.await;
state_machine.run_once().await;
#[rustfmt::skip]
assert_matches!(
state_machine.metrics_reporter.metrics.as_slice(),
[
Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
Metrics::RequestsPerCheck { count: 1, successful: true },
Metrics::AttemptsToSuccessfulCheck(1),
]
);
});
}
#[test]
fn test_successful_update_triggers_reboot() {
let mut pool = LocalPool::new();
let spawner = pool.spawner();
let http = MockHttpRequest::new(make_update_available_response());
let mock_time = MockTimeSource::new_from_now();
let next_update_time = mock_time.now();
let (timer, mut timers) = BlockingTimer::new();
let installer = TestInstaller::builder(mock_time.clone()).build();
let reboot_called = Rc::clone(&installer.reboot_called);
let (_ctl, state_machine) = pool.run_until(
StateMachineBuilder::new_stub()
.http(http)
.installer(installer)
.policy_engine(StubPolicyEngine::new(mock_time))
.timer(timer)
.start(),
);
let observer = TestObserver::default();
spawner
.spawn_local(observer.observe(state_machine))
.unwrap();
let blocked_timer = pool.run_until(timers.next()).unwrap();
assert_eq!(
blocked_timer.requested_wait(),
RequestedWait::Until(next_update_time.into())
);
blocked_timer.unblock();
pool.run_until_stalled();
assert!(*reboot_called.borrow());
}
#[test]
fn test_skip_reboot_if_not_needed() {
let mut pool = LocalPool::new();
let spawner = pool.spawner();
let http = MockHttpRequest::new(make_update_available_response());
let mock_time = MockTimeSource::new_from_now();
let next_update_time = mock_time.now();
let reboot_check_options_received = Rc::new(RefCell::new(vec![]));
let policy_engine = MockPolicyEngine {
reboot_check_options_received: Rc::clone(&reboot_check_options_received),
check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
time_source: mock_time.clone(),
reboot_needed: Rc::new(RefCell::new(false)),
..MockPolicyEngine::default()
};
let (timer, mut timers) = BlockingTimer::new();
let installer = TestInstaller::builder(mock_time).build();
let reboot_called = Rc::clone(&installer.reboot_called);
let (_ctl, state_machine) = pool.run_until(
StateMachineBuilder::new_stub()
.http(http)
.installer(installer)
.policy_engine(policy_engine)
.timer(timer)
.start(),
);
let observer = TestObserver::default();
spawner
.spawn_local(observer.observe(state_machine))
.unwrap();
let blocked_timer = pool.run_until(timers.next()).unwrap();
assert_eq!(
blocked_timer.requested_wait(),
RequestedWait::Until(next_update_time.into())
);
blocked_timer.unblock();
pool.run_until_stalled();
assert_eq!(
observer.take_states(),
vec![
State::CheckingForUpdates(InstallSource::ScheduledTask),
State::InstallingUpdate,
State::Idle
]
);
assert_eq!(*reboot_check_options_received.borrow(), vec![]);
assert!(!*reboot_called.borrow());
}
#[test]
fn test_failed_update_does_not_trigger_reboot() {
let mut pool = LocalPool::new();
let spawner = pool.spawner();
let http = MockHttpRequest::new(make_update_available_response());
let mock_time = MockTimeSource::new_from_now();
let next_update_time = mock_time.now();
let (timer, mut timers) = BlockingTimer::new();
let installer = TestInstaller::builder(mock_time.clone())
.add_install_fail()
.build();
let reboot_called = Rc::clone(&installer.reboot_called);
let (_ctl, state_machine) = pool.run_until(
StateMachineBuilder::new_stub()
.http(http)
.installer(installer)
.policy_engine(StubPolicyEngine::new(mock_time))
.timer(timer)
.start(),
);
let observer = TestObserver::default();
spawner
.spawn_local(observer.observe(state_machine))
.unwrap();
let blocked_timer = pool.run_until(timers.next()).unwrap();
assert_eq!(
blocked_timer.requested_wait(),
RequestedWait::Until(next_update_time.into())
);
blocked_timer.unblock();
pool.run_until_stalled();
assert!(!*reboot_called.borrow());
}
#[test]
fn test_reboots_immediately_if_user_initiated_update_requests_occurs_during_install() {
let mut pool = LocalPool::new();
let spawner = pool.spawner();
let http = MockHttpRequest::new(make_update_available_response());
let mock_time = MockTimeSource::new_from_now();
let (send_install, mut recv_install) = mpsc::channel(0);
let (send_reboot, mut recv_reboot) = mpsc::channel(0);
let reboot_check_options_received = Rc::new(RefCell::new(vec![]));
let policy_engine = MockPolicyEngine {
reboot_check_options_received: Rc::clone(&reboot_check_options_received),
check_timing: Some(CheckTiming::builder().time(mock_time.now()).build()),
..MockPolicyEngine::default()
};
let (mut ctl, state_machine) = pool.run_until(
StateMachineBuilder::new_stub()
.http(http)
.installer(BlockingInstaller {
on_install: send_install,
on_reboot: Some(send_reboot),
})
.policy_engine(policy_engine)
.start(),
);
let observer = TestObserver::default();
spawner
.spawn_local(observer.observe(state_machine))
.unwrap();
let unblock_install = pool.run_until(recv_install.next()).unwrap();
pool.run_until_stalled();
assert_eq!(
observer.take_states(),
vec![
State::CheckingForUpdates(InstallSource::ScheduledTask),
State::InstallingUpdate
]
);
pool.run_until(async {
assert_eq!(
ctl.start_update_check(CheckOptions {
source: InstallSource::OnDemand
})
.await,
Ok(StartUpdateCheckResponse::AlreadyRunning)
);
});
pool.run_until_stalled();
assert_eq!(observer.take_states(), vec![]);
unblock_install
.send(vec![AppInstallResult::Installed])
.unwrap();
pool.run_until_stalled();
assert_eq!(observer.take_states(), vec![State::WaitingForReboot]);
let unblock_reboot = pool.run_until(recv_reboot.next()).unwrap();
pool.run_until_stalled();
unblock_reboot.send(Ok(())).unwrap();
assert_eq!(
*reboot_check_options_received.borrow(),
vec![CheckOptions {
source: InstallSource::OnDemand
}]
);
}
#[test]
fn test_reboots_immediately_when_check_now_comes_in_during_wait() {
let mut pool = LocalPool::new();
let spawner = pool.spawner();
let mut http = MockHttpRequest::new(make_update_available_response());
http.add_response(HttpResponse::new(vec![]));
http.add_response(HttpResponse::new(vec![]));
http.add_response(HttpResponse::new(vec![]));
http.add_response(make_update_available_response());
let mut mock_time = MockTimeSource::new_from_now();
mock_time.truncate_submicrosecond_walltime();
let next_update_time = mock_time.now() + Duration::from_secs(1000);
let (timer, mut timers) = BlockingTimer::new();
let reboot_allowed = Rc::new(RefCell::new(false));
let reboot_check_options_received = Rc::new(RefCell::new(vec![]));
let policy_engine = MockPolicyEngine {
time_source: mock_time.clone(),
reboot_allowed: Rc::clone(&reboot_allowed),
check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
reboot_check_options_received: Rc::clone(&reboot_check_options_received),
..MockPolicyEngine::default()
};
let installer = TestInstaller::builder(mock_time.clone()).build();
let reboot_called = Rc::clone(&installer.reboot_called);
let storage_ref = Rc::new(Mutex::new(MemStorage::new()));
let apps = make_test_app_set();
let (mut ctl, state_machine) = pool.run_until(
StateMachineBuilder::new_stub()
.app_set(apps)
.http(http)
.installer(installer)
.policy_engine(policy_engine)
.timer(timer)
.storage(Rc::clone(&storage_ref))
.start(),
);
let observer = TestObserver::default();
spawner
.spawn_local(observer.observe(state_machine))
.unwrap();
let blocked_timer = pool.run_until(timers.next()).unwrap();
assert_eq!(
blocked_timer.requested_wait(),
RequestedWait::Until(next_update_time.into())
);
blocked_timer.unblock();
pool.run_until_stalled();
let blocked_timer1 = pool.run_until(timers.next()).unwrap();
let blocked_timer2 = pool.run_until(timers.next()).unwrap();
let (wait_for_reboot_timer, _wait_for_next_ping_timer) =
match blocked_timer1.requested_wait() {
RequestedWait::For(_) => (blocked_timer1, blocked_timer2),
RequestedWait::Until(_) => (blocked_timer2, blocked_timer1),
};
assert_eq!(
wait_for_reboot_timer.requested_wait(),
RequestedWait::For(CHECK_REBOOT_ALLOWED_INTERVAL)
);
assert!(!*reboot_called.borrow());
*reboot_allowed.borrow_mut() = true;
pool.run_until(async {
assert_eq!(
ctl.start_update_check(CheckOptions {
source: InstallSource::OnDemand
})
.await,
Ok(StartUpdateCheckResponse::AlreadyRunning)
);
});
pool.run_until_stalled();
assert!(*reboot_called.borrow());
assert_eq!(
*reboot_check_options_received.borrow(),
vec![
CheckOptions {
source: InstallSource::ScheduledTask
},
CheckOptions {
source: InstallSource::OnDemand
},
]
);
}
#[test]
fn test_wait_for_reboot() {
let mut pool = LocalPool::new();
let spawner = pool.spawner();
let mut http = MockHttpRequest::new(make_update_available_response());
http.add_response(HttpResponse::new(vec![]));
http.add_response(HttpResponse::new(vec![]));
http.add_response(HttpResponse::new(vec![]));
http.add_response(make_update_available_response());
let ping_request_viewer = MockHttpRequest::from_request_cell(http.get_request_cell());
let mut mock_time = MockTimeSource::new_from_now();
mock_time.truncate_submicrosecond_walltime();
let next_update_time = mock_time.now() + Duration::from_secs(1000);
let (timer, mut timers) = BlockingTimer::new();
let reboot_allowed = Rc::new(RefCell::new(false));
let policy_engine = MockPolicyEngine {
time_source: mock_time.clone(),
reboot_allowed: Rc::clone(&reboot_allowed),
check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
..MockPolicyEngine::default()
};
let installer = TestInstaller::builder(mock_time.clone()).build();
let reboot_called = Rc::clone(&installer.reboot_called);
let storage_ref = Rc::new(Mutex::new(MemStorage::new()));
let apps = make_test_app_set();
let (mut ctl, state_machine) = pool.run_until(
StateMachineBuilder::new_stub()
.app_set(apps.clone())
.http(http)
.installer(installer)
.policy_engine(policy_engine)
.timer(timer)
.storage(Rc::clone(&storage_ref))
.start(),
);
let observer = TestObserver::default();
spawner
.spawn_local(observer.observe(state_machine))
.unwrap();
let blocked_timer = pool.run_until(timers.next()).unwrap();
assert_eq!(
blocked_timer.requested_wait(),
RequestedWait::Until(next_update_time.into())
);
blocked_timer.unblock();
pool.run_until_stalled();
let blocked_timer1 = pool.run_until(timers.next()).unwrap();
let blocked_timer2 = pool.run_until(timers.next()).unwrap();
let (wait_for_reboot_timer, wait_for_next_ping_timer) =
match blocked_timer1.requested_wait() {
RequestedWait::For(_) => (blocked_timer1, blocked_timer2),
RequestedWait::Until(_) => (blocked_timer2, blocked_timer1),
};
assert_eq!(
wait_for_reboot_timer.requested_wait(),
RequestedWait::For(CHECK_REBOOT_ALLOWED_INTERVAL)
);
assert_eq!(
wait_for_next_ping_timer.requested_wait(),
RequestedWait::Until(next_update_time.into())
);
mock_time.advance(Duration::from_secs(1000));
wait_for_next_ping_timer.unblock();
pool.run_until_stalled();
let config = crate::configuration::test_support::config_generator();
let request_params = RequestParams::default();
let apps = pool.run_until(apps.lock()).get_apps();
let mut expected_request_builder = RequestBuilder::new(&config, &request_params)
.session_id(GUID::from_u128(5))
.request_id(GUID::from_u128(6));
for app in &apps {
expected_request_builder = expected_request_builder.add_ping(app);
}
pool.run_until(assert_request(
&ping_request_viewer,
expected_request_builder,
));
pool.run_until(async {
assert_eq!(
ctl.start_update_check(CheckOptions::default()).await,
Ok(StartUpdateCheckResponse::AlreadyRunning)
);
});
pool.run_until(async {
let storage = storage_ref.lock().await;
let context = update_check::Context::load(&*storage).await;
assert_eq!(
context.schedule.last_update_time,
Some(mock_time.now_in_walltime().into())
);
});
let wait_for_next_ping_timer = pool.run_until(timers.next()).unwrap();
assert_eq!(
wait_for_next_ping_timer.requested_wait(),
RequestedWait::Until(next_update_time.into())
);
wait_for_reboot_timer.unblock();
pool.run_until_stalled();
assert!(!*reboot_called.borrow());
let wait_for_reboot_timer = pool.run_until(timers.next()).unwrap();
assert_eq!(
wait_for_reboot_timer.requested_wait(),
RequestedWait::For(CHECK_REBOOT_ALLOWED_INTERVAL)
);
wait_for_next_ping_timer.unblock();
pool.run_until_stalled();
let mut expected_request_builder = RequestBuilder::new(&config, &request_params)
.session_id(GUID::from_u128(7))
.request_id(GUID::from_u128(8));
for app in &apps {
expected_request_builder = expected_request_builder.add_ping(app);
}
pool.run_until(assert_request(
&ping_request_viewer,
expected_request_builder,
));
assert!(!*reboot_called.borrow());
*reboot_called.borrow_mut() = true;
wait_for_reboot_timer.unblock();
pool.run_until_stalled();
assert!(*reboot_called.borrow());
}
#[derive(Debug)]
struct BlockingInstaller {
on_install: mpsc::Sender<oneshot::Sender<Vec<AppInstallResult<StubInstallErrors>>>>,
on_reboot: Option<mpsc::Sender<oneshot::Sender<Result<(), anyhow::Error>>>>,
}
impl Installer for BlockingInstaller {
type InstallPlan = StubPlan;
type Error = StubInstallErrors;
type InstallResult = ();
fn perform_install(
&mut self,
_install_plan: &StubPlan,
_observer: Option<&dyn ProgressObserver>,
) -> LocalBoxFuture<'_, (Self::InstallResult, Vec<AppInstallResult<Self::Error>>)> {
let (send, recv) = oneshot::channel();
let send_fut = self.on_install.send(send);
async move {
send_fut.await.unwrap();
((), recv.await.unwrap())
}
.boxed_local()
}
fn perform_reboot(&mut self) -> LocalBoxFuture<'_, Result<(), anyhow::Error>> {
match &mut self.on_reboot {
Some(on_reboot) => {
let (send, recv) = oneshot::channel();
let send_fut = on_reboot.send(send);
async move {
send_fut.await.unwrap();
recv.await.unwrap()
}
.boxed_local()
}
None => future::ready(Ok(())).boxed_local(),
}
}
fn try_create_install_plan<'a>(
&'a self,
_request_params: &'a RequestParams,
_request_metadata: Option<&'a RequestMetadata>,
_response: &'a Response,
_response_bytes: Vec<u8>,
_ecdsa_signature: Option<Vec<u8>>,
) -> LocalBoxFuture<'a, Result<Self::InstallPlan, Self::Error>> {
future::ready(Ok(StubPlan)).boxed_local()
}
}
#[derive(Debug, Default)]
struct TestObserver {
states: Rc<RefCell<Vec<State>>>,
}
impl TestObserver {
fn observe(&self, s: impl Stream<Item = StateMachineEvent>) -> impl Future<Output = ()> {
let states = Rc::clone(&self.states);
async move {
futures::pin_mut!(s);
while let Some(event) = s.next().await {
if let StateMachineEvent::StateChange(state) = event {
states.borrow_mut().push(state);
}
}
}
}
fn observe_until_terminal(
&self,
s: impl Stream<Item = StateMachineEvent>,
) -> impl Future<Output = ()> {
let states = Rc::clone(&self.states);
async move {
futures::pin_mut!(s);
while let Some(event) = s.next().await {
if let StateMachineEvent::StateChange(state) = event {
states.borrow_mut().push(state);
match state {
State::Idle | State::WaitingForReboot => return,
_ => {}
}
}
}
}
}
fn take_states(&self) -> Vec<State> {
std::mem::take(&mut *self.states.borrow_mut())
}
}
#[test]
fn test_start_update_during_update_replies_with_in_progress() {
let mut pool = LocalPool::new();
let spawner = pool.spawner();
let http = MockHttpRequest::new(make_update_available_response());
let (send_install, mut recv_install) = mpsc::channel(0);
let (mut ctl, state_machine) = pool.run_until(
StateMachineBuilder::new_stub()
.http(http)
.installer(BlockingInstaller {
on_install: send_install,
on_reboot: None,
})
.start(),
);
let observer = TestObserver::default();
spawner
.spawn_local(observer.observe_until_terminal(state_machine))
.unwrap();
let unblock_install = pool.run_until(recv_install.next()).unwrap();
pool.run_until_stalled();
assert_eq!(
observer.take_states(),
vec![
State::CheckingForUpdates(InstallSource::ScheduledTask),
State::InstallingUpdate
]
);
pool.run_until(async {
assert_eq!(
ctl.start_update_check(CheckOptions::default()).await,
Ok(StartUpdateCheckResponse::AlreadyRunning)
);
});
pool.run_until_stalled();
assert_eq!(observer.take_states(), vec![]);
unblock_install
.send(vec![AppInstallResult::Installed])
.unwrap();
pool.run_until_stalled();
assert_eq!(observer.take_states(), vec![State::WaitingForReboot]);
}
#[test]
fn test_start_update_during_timer_starts_update() {
let mut pool = LocalPool::new();
let spawner = pool.spawner();
let mut mock_time = MockTimeSource::new_from_now();
let next_update_time = mock_time.now() + Duration::from_secs(321);
let (timer, mut timers) = BlockingTimer::new();
let policy_engine = MockPolicyEngine {
check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
time_source: mock_time.clone(),
..MockPolicyEngine::default()
};
let (mut ctl, state_machine) = pool.run_until(
StateMachineBuilder::new_stub()
.policy_engine(policy_engine)
.timer(timer)
.start(),
);
let observer = TestObserver::default();
spawner
.spawn_local(observer.observe(state_machine))
.unwrap();
let blocked_timer = pool.run_until(timers.next()).unwrap();
assert_eq!(
blocked_timer.requested_wait(),
RequestedWait::Until(next_update_time.into())
);
mock_time.advance(Duration::from_secs(200));
assert_eq!(observer.take_states(), vec![]);
pool.run_until_stalled();
assert_eq!(observer.take_states(), vec![]);
blocked_timer.unblock();
let blocked_timer = pool.run_until(timers.next()).unwrap();
assert_eq!(
blocked_timer.requested_wait(),
RequestedWait::Until(next_update_time.into())
);
assert_eq!(
observer.take_states(),
vec![
State::CheckingForUpdates(InstallSource::ScheduledTask),
State::ErrorCheckingForUpdate,
State::Idle
]
);
pool.run_until(async {
assert_eq!(
ctl.start_update_check(CheckOptions::default()).await,
Ok(StartUpdateCheckResponse::Started)
);
});
pool.run_until_stalled();
assert_eq!(
observer.take_states(),
vec![
State::CheckingForUpdates(InstallSource::ScheduledTask),
State::ErrorCheckingForUpdate,
State::Idle
]
);
}
#[test]
fn test_start_update_check_returns_throttled() {
let mut pool = LocalPool::new();
let spawner = pool.spawner();
let mut mock_time = MockTimeSource::new_from_now();
let next_update_time = mock_time.now() + Duration::from_secs(321);
let (timer, mut timers) = BlockingTimer::new();
let policy_engine = MockPolicyEngine {
check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
time_source: mock_time.clone(),
check_decision: CheckDecision::ThrottledByPolicy,
..MockPolicyEngine::default()
};
let (mut ctl, state_machine) = pool.run_until(
StateMachineBuilder::new_stub()
.policy_engine(policy_engine)
.timer(timer)
.start(),
);
let observer = TestObserver::default();
spawner
.spawn_local(observer.observe(state_machine))
.unwrap();
let blocked_timer = pool.run_until(timers.next()).unwrap();
assert_eq!(
blocked_timer.requested_wait(),
RequestedWait::Until(next_update_time.into())
);
mock_time.advance(Duration::from_secs(200));
assert_eq!(observer.take_states(), vec![]);
pool.run_until(async {
assert_eq!(
ctl.start_update_check(CheckOptions::default()).await,
Ok(StartUpdateCheckResponse::Throttled)
);
});
pool.run_until_stalled();
assert_eq!(observer.take_states(), vec![]);
}
#[test]
fn test_progress_observer() {
block_on(async {
let http = MockHttpRequest::new(make_update_available_response());
let mock_time = MockTimeSource::new_from_now();
let progresses = StateMachineBuilder::new_stub()
.http(http)
.installer(TestInstaller::builder(mock_time.clone()).build())
.policy_engine(StubPolicyEngine::new(mock_time))
.oneshot_check()
.await
.filter_map(|event| {
future::ready(match event {
StateMachineEvent::InstallProgressChange(InstallProgress { progress }) => {
Some(progress)
}
_ => None,
})
})
.collect::<Vec<f32>>()
.await;
assert_eq!(progresses, [0.0, 0.3, 0.9, 1.0]);
});
}
#[test]
fn test_report_waited_for_reboot_duration_doesnt_panic_on_wrong_current_time() {
block_on(async {
let metrics_reporter = MockMetricsReporter::new();
let state_machine_start_monotonic = Instant::now();
let update_finish_time = SystemTime::now();
let now_wall = update_finish_time + Duration::from_secs(1);
let now_monotonic = state_machine_start_monotonic + Duration::from_secs(10);
let mut state_machine = StateMachineBuilder::new_stub()
.metrics_reporter(metrics_reporter)
.build()
.await;
state_machine
.report_waited_for_reboot_duration(
update_finish_time,
state_machine_start_monotonic,
ComplexTime {
wall: now_wall,
mono: now_monotonic,
},
)
.expect_err("should overflow and error out");
assert!(state_machine.metrics_reporter.metrics.is_empty());
});
}
#[test]
fn test_report_waited_for_reboot_duration() {
let mut pool = LocalPool::new();
let spawner = pool.spawner();
let response = json!({"response": {
"server": "prod",
"protocol": "3.0",
"app": [{
"appid": "{00000000-0000-0000-0000-000000000001}",
"status": "ok",
"updatecheck": {
"status": "ok",
"manifest": {
"version": "1.2.3.5",
"actions": {
"action": [],
},
"packages": {
"package": [],
},
}
}
}],
}});
let response = serde_json::to_vec(&response).unwrap();
let http = MockHttpRequest::new(HttpResponse::new(response));
let mut mock_time = MockTimeSource::new_from_now();
mock_time.truncate_submicrosecond_walltime();
let storage = Rc::new(Mutex::new(MemStorage::new()));
assert_matches!(
pool.run_until(
StateMachineBuilder::new_stub()
.http(http)
.policy_engine(StubPolicyEngine::new(mock_time.clone()))
.storage(Rc::clone(&storage))
.oneshot(RequestParams::default())
),
Ok(_)
);
mock_time.advance(Duration::from_secs(999));
let config = Config {
updater: Updater {
name: "updater".to_string(),
version: Version::from([0, 1]),
},
os: OS {
version: "1.2.3.5".to_string(),
..OS::default()
},
service_url: "http://example.com/".to_string(),
omaha_public_keys: None,
};
let metrics_reporter = Rc::new(RefCell::new(MockMetricsReporter::new()));
let (_ctl, state_machine) = pool.run_until(
StateMachineBuilder::new_stub()
.config(config)
.metrics_reporter(Rc::clone(&metrics_reporter))
.policy_engine(StubPolicyEngine::new(mock_time.clone()))
.storage(Rc::clone(&storage))
.timer(MockTimer::new())
.start(),
);
let observer = TestObserver::default();
spawner
.spawn_local(observer.observe(state_machine))
.unwrap();
pool.run_until_stalled();
assert_eq!(
metrics_reporter
.borrow()
.metrics
.iter()
.filter(|m| matches!(m, Metrics::WaitedForRebootDuration(_)))
.collect::<Vec<_>>(),
vec![&Metrics::WaitedForRebootDuration(Duration::from_secs(999))]
);
pool.run_until(async {
let storage = storage.lock().await;
assert_eq!(storage.get_time(UPDATE_FINISH_TIME).await, None);
assert_eq!(storage.get_string(TARGET_VERSION).await, None);
assert!(storage.committed());
})
}
#[test]
fn run_cup_but_decoration_error() {
block_on(async {
let http = MockHttpRequest::new(HttpResponse::new(make_noupdate_httpresponse()));
let stub_cup_handler = MockCupv2Handler::new().set_decoration_error(|| {
Some(CupDecorationError::ParseError(
"".parse::<http::Uri>().unwrap_err(),
))
});
assert_matches!(
StateMachineBuilder::new_stub()
.http(http)
.cup_handler(Some(stub_cup_handler))
.oneshot(RequestParams::default())
.await,
Err(UpdateCheckError::OmahaRequest(
OmahaRequestError::CupDecoration(CupDecorationError::ParseError(_))
))
);
info!("update check complete!");
});
}
#[test]
fn run_cup_but_verification_error() {
block_on(async {
let http = MockHttpRequest::new(HttpResponse::new(make_noupdate_httpresponse()));
let stub_cup_handler = MockCupv2Handler::new()
.set_verification_error(|| Some(CupVerificationError::EtagHeaderMissing));
assert_matches!(
StateMachineBuilder::new_stub()
.http(http)
.cup_handler(Some(stub_cup_handler))
.oneshot(RequestParams::default())
.await,
Err(UpdateCheckError::OmahaRequest(
OmahaRequestError::CupValidation(CupVerificationError::EtagHeaderMissing)
))
);
info!("update check complete!");
});
}
#[test]
fn run_cup_valid() {
block_on(async {
let http = MockHttpRequest::new(HttpResponse::new(make_noupdate_httpresponse()));
assert_matches!(
StateMachineBuilder::new_stub()
.http(http)
.oneshot(RequestParams::default())
.await,
Ok(_)
);
info!("update check complete!");
});
}
}