1use crate::{
10 app_set::{AppSet, AppSetExt as _},
11 async_generator,
12 common::{App, CheckOptions, CheckTiming},
13 configuration::Config,
14 cup_ecdsa::{CupDecorationError, CupVerificationError, Cupv2Handler, RequestMetadata},
15 http_request::{self, HttpRequest},
16 installer::{AppInstallResult, Installer, Plan},
17 metrics::{ClockType, Metrics, MetricsReporter, UpdateCheckFailureReason},
18 policy::{CheckDecision, PolicyEngine, UpdateDecision},
19 protocol::{
20 self,
21 request::{Event, EventErrorCode, EventResult, EventType, InstallSource, GUID},
22 response::{parse_json_response, OmahaStatus, Response, UpdateCheck},
23 },
24 request_builder::{self, RequestBuilder, RequestParams},
25 storage::{Storage, StorageExt},
26 time::{ComplexTime, PartialComplexTime, TimeSource, Timer},
27};
28
29use anyhow::anyhow;
30use futures::{
31 channel::{mpsc, oneshot},
32 future::{self, BoxFuture, Fuse},
33 lock::Mutex,
34 prelude::*,
35 select,
36};
37use http::{response::Parts, Response as HttpResponse};
38use p256::ecdsa::DerSignature;
39use std::{
40 cmp::min,
41 collections::HashMap,
42 convert::TryInto,
43 rc::Rc,
44 str::Utf8Error,
45 time::{Duration, Instant, SystemTime},
46};
47use thiserror::Error;
48use tracing::{error, info, warn};
49
50pub mod update_check;
51
52mod builder;
53pub use builder::StateMachineBuilder;
54
55mod observer;
56use observer::StateMachineProgressObserver;
57pub use observer::{InstallProgress, StateMachineEvent};
58
/// Storage key. Not referenced in the visible part of this file.
/// NOTE(review): presumably identifies the most recently seen install plan — confirm.
const INSTALL_PLAN_ID: &str = "install_plan_id";
/// Storage key. Not referenced in the visible part of this file.
/// NOTE(review): presumably the time the current update was first seen — confirm.
const UPDATE_FIRST_SEEN_TIME: &str = "update_first_seen_time";
/// Storage key for the wall-clock time at which the last successful install finished.
/// Written after a successful install and read back when the state machine starts.
const UPDATE_FINISH_TIME: &str = "update_finish_time";
/// Storage key for the version string the system app was updated to. On start it is compared
/// against `config.os.version` to decide whether to report the waited-for-reboot duration.
const TARGET_VERSION: &str = "target_version";
/// Storage key for the integer count of consecutive failed install attempts.
const CONSECUTIVE_FAILED_INSTALL_ATTEMPTS: &str = "consecutive_failed_install_attempts";
/// How long to wait between asking the policy engine whether a reboot is allowed (30 minutes).
const CHECK_REBOOT_ALLOWED_INTERVAL: Duration = Duration::from_secs(30 * 60);
/// Not referenced in the visible part of this file.
/// NOTE(review): looks like an HTTP response header name — confirm against its user.
const X_RETRY_AFTER: &str = "X-Retry-After";
/// Maximum number of Omaha request attempts made within a single update check before the
/// check is abandoned with an error.
const MAX_OMAHA_REQUEST_ATTEMPTS: u64 = 3;
70
/// The update-check state machine.
///
/// It owns the policy engine, HTTP client, installer, timer, metrics reporter, storage and
/// app set, and drives update checks (and any resulting installs and reboots) over them.
#[derive(Debug)]
pub struct StateMachine<PE, HR, IN, TM, MR, ST, AS, CH>
where
    PE: PolicyEngine,
    HR: HttpRequest,
    IN: Installer,
    TM: Timer,
    MR: MetricsReporter,
    ST: Storage,
    AS: AppSet,
{
    /// Client configuration; cloned when building Omaha requests.
    config: Config,

    /// Decides check timing, whether checks/updates/reboots are allowed, and whether a reboot
    /// is needed.
    policy_engine: PE,

    /// HTTP client.
    /// NOTE(review): not used directly in the visible methods; presumably used by the Omaha
    /// request helper defined later in the file — confirm.
    http: HR,

    /// Creates install plans, performs installs and performs the reboot.
    installer: IN,

    /// Source of the "wait for" / "wait until" futures used for scheduling and backoff.
    timer: TM,

    /// Source of wall-clock and monotonic time.
    time_source: PE::TimeSource,

    /// Sink for metrics.
    /// NOTE(review): presumably reached through `report_metrics`, which is not visible
    /// here — confirm.
    metrics_reporter: MR,

    /// Shared persistent storage. In the visible code, whenever both storage and `app_set`
    /// are locked, storage is locked first.
    storage_ref: Rc<Mutex<ST>>,

    /// Update check context (schedule and protocol state).
    context: update_check::Context,

    /// The set of apps that update checks are performed for. See `storage_ref` for the lock
    /// ordering used by the visible code.
    app_set: Rc<Mutex<AS>>,

    /// Optional CUPv2 handler.
    /// NOTE(review): not used in the visible methods; presumably used to decorate requests
    /// and verify responses — confirm.
    cup_handler: Option<CH>,
}
110
/// The externally observable states of the state machine, emitted through `yield_state`.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum State {
    /// Emitted at the end of every iteration of the main loop.
    Idle,
    /// An update check has started; carries the install source of the request.
    CheckingForUpdates(InstallSource),
    /// The Omaha request failed or its response could not be parsed.
    ErrorCheckingForUpdate,
    /// No app in the Omaha response has an update check with status `Ok`.
    NoUpdateAvailable,
    /// The policy engine deferred the install plan.
    InstallationDeferredByPolicy,
    /// An install is about to start. Also emitted immediately before `InstallationError`
    /// when the install plan could not be created.
    InstallingUpdate,
    /// The install finished and the policy engine reported that a reboot is needed.
    WaitingForReboot,
    /// The install plan could not be created, or at least one app failed to install.
    InstallationError,
}
122
/// Errors that can occur while building, sending or validating a request to Omaha.
#[derive(Error, Debug)]
pub enum OmahaRequestError {
    /// Serializing the request body to JSON failed.
    #[error("Unexpected JSON error constructing update check")]
    Json(#[from] serde_json::Error),

    /// Constructing the HTTP request failed.
    #[error("Error building update check HTTP request")]
    HttpBuilder(#[from] http::Error),

    /// Adding CUPv2 parameters to the outgoing request failed.
    #[error("Error decorating outgoing request with CUPv2 parameters")]
    CupDecoration(#[from] CupDecorationError),

    /// The response failed CUPv2 verification.
    #[error("Error validating incoming response with CUPv2 protocol")]
    CupValidation(#[from] CupVerificationError),

    /// The HTTP transport reported an error; retried by the update check loop in some cases.
    #[error("HTTP transport error performing update check")]
    HttpTransport(#[from] http_request::Error),

    /// An HTTP status code was treated as an error; carries that status code.
    #[error("HTTP error performing update check: {0}")]
    HttpStatus(hyper::StatusCode),
}
146
147impl From<request_builder::Error> for OmahaRequestError {
148 fn from(err: request_builder::Error) -> Self {
149 match err {
150 request_builder::Error::Json(e) => OmahaRequestError::Json(e),
151 request_builder::Error::Http(e) => OmahaRequestError::HttpBuilder(e),
152 request_builder::Error::Cup(e) => OmahaRequestError::CupDecoration(e),
153 }
154 }
155}
156
157impl From<http::StatusCode> for OmahaRequestError {
158 fn from(sc: http::StatusCode) -> Self {
159 OmahaRequestError::HttpStatus(sc)
160 }
161}
162
/// Errors that can occur while parsing the body of an Omaha response.
#[derive(Error, Debug)]
pub enum ResponseParseError {
    /// The response bytes were not valid UTF-8.
    #[error("Response was not valid UTF-8")]
    Utf8(#[from] Utf8Error),

    /// The response text could not be parsed as the expected JSON.
    #[error("Unexpected JSON error parsing update check response")]
    Json(#[from] serde_json::Error),
}
173
/// The ways in which a whole update check can fail.
#[derive(Error, Debug)]
pub enum UpdateCheckError {
    /// The request to Omaha could not be built, sent or validated.
    #[error("Error checking with Omaha")]
    OmahaRequest(#[from] OmahaRequestError),

    /// Omaha answered, but the response could not be parsed.
    #[error("Error parsing Omaha response")]
    ResponseParser(#[from] ResponseParseError),

    /// The installer could not create an install plan from the response.
    #[error("Unable to create an install plan")]
    InstallPlan(#[source] anyhow::Error),
}
185
/// A cloneable handle for sending control requests to a running state machine. It wraps the
/// sending half of the channel whose receiving half is passed to `StateMachine::run`.
#[derive(Clone)]
pub struct ControlHandle(mpsc::Sender<ControlRequest>);
189
/// Error returned to a `ControlHandle` caller when the request could not be sent or the
/// response channel was cancelled.
#[derive(Debug, Clone, Error, PartialEq, Eq)]
#[error("state machine dropped before all its control handles")]
pub struct StateMachineGone;
194
195impl From<mpsc::SendError> for StateMachineGone {
196 fn from(_: mpsc::SendError) -> Self {
197 StateMachineGone
198 }
199}
200
201impl From<oneshot::Canceled> for StateMachineGone {
202 fn from(_: oneshot::Canceled) -> Self {
203 StateMachineGone
204 }
205}
206
/// Requests sent from a `ControlHandle` to the running state machine.
enum ControlRequest {
    /// Ask the state machine to start an update check.
    StartUpdateCheck {
        /// Options for the requested check (including its install source).
        options: CheckOptions,
        /// Channel on which the state machine reports what it did with the request.
        responder: oneshot::Sender<StartUpdateCheckResponse>,
    },
}
213
/// The state machine's answer to a `StartUpdateCheck` control request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StartUpdateCheckResponse {
    /// The policy engine allowed the check and it is being started.
    Started,

    /// The request arrived while an update check was in progress or while waiting for a
    /// reboot.
    AlreadyRunning,

    /// The policy engine did not allow the check (too soon, throttled or denied).
    Throttled,
}
227
228impl ControlHandle {
229 pub async fn start_update_check(
232 &mut self,
233 options: CheckOptions,
234 ) -> Result<StartUpdateCheckResponse, StateMachineGone> {
235 let (responder, receive_response) = oneshot::channel();
236 self.0
237 .send(ControlRequest::StartUpdateCheck { options, responder })
238 .await?;
239 Ok(receive_response.await?)
240 }
241}
242
/// Whether a reboot should follow an update check.
#[derive(Debug)]
enum RebootAfterUpdate<T> {
    /// A reboot is needed; carries the installer's install result, which is later passed to
    /// the policy engine's `reboot_allowed`.
    Needed(T),
    /// No reboot is needed.
    NotNeeded,
}
248
249impl<PE, HR, IN, TM, MR, ST, AS, IR, PL, CH> StateMachine<PE, HR, IN, TM, MR, ST, AS, CH>
250where
251 PE: PolicyEngine<InstallResult = IR, InstallPlan = PL>,
252 HR: HttpRequest,
253 IN: Installer<InstallResult = IR, InstallPlan = PL>,
254 TM: Timer,
255 MR: MetricsReporter,
256 ST: Storage,
257 AS: AppSet,
258 CH: Cupv2Handler,
259 IR: 'static + Send,
260 PL: Plan,
261{
262 async fn update_next_update_time(
264 &mut self,
265 co: &mut async_generator::Yield<StateMachineEvent>,
266 ) -> CheckTiming {
267 let apps = self.app_set.lock().await.get_apps();
268 let timing = self
269 .policy_engine
270 .compute_next_update_time(&apps, &self.context.schedule, &self.context.state)
271 .await;
272 self.context.schedule.next_update_time = Some(timing);
273
274 co.yield_(StateMachineEvent::ScheduleChange(self.context.schedule))
275 .await;
276 info!("Calculated check timing: {}", timing);
277 timing
278 }
279
280 async fn make_wait_to_next_check(
282 &mut self,
283 check_timing: CheckTiming,
284 ) -> Fuse<BoxFuture<'static, ()>> {
285 if let Some(minimum_wait) = check_timing.minimum_wait {
286 future::join(
290 self.timer.wait_for(minimum_wait),
291 self.timer.wait_until(check_timing.time),
292 )
293 .map(|_| ())
294 .boxed()
295 .fuse()
296 } else {
297 self.timer.wait_until(check_timing.time).fuse()
300 }
301 }
302
/// Runs the state machine's main loop.
///
/// Returns immediately (after logging an error) if the app set is not valid; otherwise the
/// loop below has no exit, so the returned future only ends by being dropped.
///
/// * `control` - receiving end of the channel fed by `ControlHandle`s.
/// * `co` - generator handle through which `StateMachineEvent`s are emitted.
async fn run(
    mut self,
    mut control: mpsc::Receiver<ControlRequest>,
    mut co: async_generator::Yield<StateMachineEvent>,
) {
    // Refuse to start with an invalid app set. The lock is scoped so it is released before
    // the main loop begins.
    {
        let app_set = self.app_set.lock().await;
        if !app_set.all_valid() {
            error!(
                "App set not valid, not starting state machine: {:#?}",
                app_set.get_apps()
            );
            return;
        }
    }

    let state_machine_start_monotonic_time = self.time_source.now_in_monotonic();

    let mut should_report_waited_for_reboot_duration = false;

    // If storage holds an update finish time and the stored target version matches the
    // version we are running now, the waited-for-reboot duration should be reported.
    let update_finish_time = {
        let storage = self.storage_ref.lock().await;
        let update_finish_time = storage.get_time(UPDATE_FINISH_TIME).await;
        if update_finish_time.is_some() {
            if let Some(target_version) = storage.get_string(TARGET_VERSION).await {
                if target_version == self.config.os.version {
                    should_report_waited_for_reboot_duration = true;
                }
            }
        }
        update_finish_time
    };

    loop {
        info!("Initial context: {:?}", self.context);

        if should_report_waited_for_reboot_duration {
            // The flag is only ever set when `update_finish_time` is `Some`, so the unwrap
            // cannot fail.
            match self.report_waited_for_reboot_duration(
                update_finish_time.unwrap(),
                state_machine_start_monotonic_time,
                self.time_source.now(),
            ) {
                Ok(()) => {
                    // Reported: clear the flag and the persisted values so it is not
                    // reported again.
                    should_report_waited_for_reboot_duration = false;

                    let mut storage = self.storage_ref.lock().await;
                    storage.remove_or_log(UPDATE_FINISH_TIME).await;
                    storage.remove_or_log(TARGET_VERSION).await;
                    storage.commit_or_log().await;
                }
                Err(e) => {
                    // The flag stays set, so this is retried on the next loop iteration.
                    warn!(
                        "Couldn't report wait for reboot duration: {:#}, will try again",
                        e
                    );
                }
            }
        }

        // Wait for either the scheduled check time (default options, nobody to answer) or
        // an explicit request from a control handle (its options and its responder).
        let (mut options, responder) = {
            let check_timing = self.update_next_update_time(&mut co).await;
            let mut wait_to_next_check = self.make_wait_to_next_check(check_timing).await;

            select! {
                () = wait_to_next_check => (CheckOptions::default(), None),
                ControlRequest::StartUpdateCheck{options, responder} = control.select_next_some() => {
                    (options, Some(responder))
                }
            }
        };

        let reboot_after_update = {
            let apps = self.app_set.lock().await.get_apps();
            info!(
                "Checking to see if an update check is allowed at this time for {:?}",
                apps
            );
            let decision = self
                .policy_engine
                .update_check_allowed(
                    &apps,
                    &self.context.schedule,
                    &self.context.state,
                    &options,
                )
                .await;

            info!("The update check decision is: {:?}", decision);

            let request_params = match decision {
                CheckDecision::Ok(rp) | CheckDecision::OkUpdateDeferred(rp) => rp,

                // Not allowed right now: tell the requester (if any) and go back to
                // waiting. Note that this `continue` skips the `State::Idle` yield at the
                // bottom of the loop.
                CheckDecision::TooSoon
                | CheckDecision::ThrottledByPolicy
                | CheckDecision::DeniedByPolicy => {
                    info!("The update check is not allowed at this time.");
                    if let Some(responder) = responder {
                        // A send error only means the requester stopped listening.
                        let _ = responder.send(StartUpdateCheckResponse::Throttled);
                    }
                    continue;
                }
            };
            if let Some(responder) = responder {
                let _ = responder.send(StartUpdateCheckResponse::Started);
            }

            let update_check = self.start_update_check(request_params, &mut co).fuse();
            futures::pin_mut!(update_check);

            // Drive the update check to completion while still answering control requests
            // that arrive in the meantime.
            loop {
                select! {
                    update_check_result = update_check => break update_check_result,
                    ControlRequest::StartUpdateCheck{
                        options: new_options,
                        responder
                    } = control.select_next_some() => {
                        if new_options.source == InstallSource::OnDemand {
                            info!("Got on demand update check request, ensuring ongoing check is on demand");
                            // This only changes the local `options`, which is later passed
                            // to `wait_for_reboot`; the check already in flight keeps the
                            // request params it was started with.
                            options.source = InstallSource::OnDemand;
                        }

                        let _ = responder.send(StartUpdateCheckResponse::AlreadyRunning);
                    }
                }
            }
        };

        if let RebootAfterUpdate::Needed(install_result) = reboot_after_update {
            Self::yield_state(State::WaitingForReboot, &mut co).await;
            self.wait_for_reboot(options, &mut control, install_result, &mut co)
                .await;
        }

        Self::yield_state(State::Idle, &mut co).await;
    }
}
448
/// Waits until the policy engine allows a reboot, then asks the installer to reboot.
///
/// While waiting, it re-asks the policy engine every `CHECK_REBOOT_ALLOWED_INTERVAL`, pings
/// Omaha whenever the next scheduled check time arrives, and answers incoming control
/// requests with `AlreadyRunning`. A reboot failure is logged, not returned.
///
/// * `options` - check options passed to `reboot_allowed`; the source is upgraded to
///   `OnDemand` if an on-demand request arrives while waiting.
/// * `control` - receiver of control requests.
/// * `install_result` - installer result passed to `reboot_allowed`.
/// * `co` - generator handle for emitting events.
async fn wait_for_reboot(
    &mut self,
    mut options: CheckOptions,
    control: &mut mpsc::Receiver<ControlRequest>,
    install_result: IN::InstallResult,
    co: &mut async_generator::Yield<StateMachineEvent>,
) {
    if !self
        .policy_engine
        .reboot_allowed(&options, &install_result)
        .await
    {
        // Timer for the periodic "is a reboot allowed yet?" re-check.
        let wait_to_see_if_reboot_allowed =
            self.timer.wait_for(CHECK_REBOOT_ALLOWED_INTERVAL).fuse();
        futures::pin_mut!(wait_to_see_if_reboot_allowed);

        // Timer for the next scheduled contact with Omaha.
        let check_timing = self.update_next_update_time(co).await;
        let wait_to_next_ping = self.make_wait_to_next_check(check_timing).await;
        futures::pin_mut!(wait_to_next_ping);

        loop {
            select! {
                () = wait_to_see_if_reboot_allowed => {
                    if self.policy_engine.reboot_allowed(&options, &install_result).await {
                        break;
                    }
                    info!("Reboot not allowed at the moment, will try again in 30 minutes...");
                    // Re-arm the timer for the next re-check.
                    wait_to_see_if_reboot_allowed.set(
                        self.timer.wait_for(CHECK_REBOOT_ALLOWED_INTERVAL).fuse()
                    );
                },
                () = wait_to_next_ping => {
                    self.ping_omaha(co).await;
                    // Recompute the schedule and re-arm the ping timer.
                    let check_timing = self.update_next_update_time(co).await;
                    wait_to_next_ping.set(self.make_wait_to_next_check(check_timing).await);
                },
                ControlRequest::StartUpdateCheck{
                    options: new_options,
                    responder
                } = control.select_next_some() => {
                    // A send error only means the requester stopped listening.
                    let _ = responder.send(StartUpdateCheckResponse::AlreadyRunning);
                    if new_options.source == InstallSource::OnDemand {
                        info!("Waiting for reboot, but ensuring that InstallSource is OnDemand");
                        options.source = InstallSource::OnDemand;

                        // The upgraded source may change the policy's answer, so ask again
                        // right away instead of waiting for the timer.
                        if self.policy_engine.reboot_allowed(&options, &install_result).await {
                            info!("Upgraded update check request to on demand, policy allowed reboot");
                            break;
                        }
                    };
                }
            }
        }
    }
    info!("Rebooting the system at the end of a successful update");
    if let Err(e) = self.installer.perform_reboot().await {
        error!("Unable to reboot the system: {}", e);
    }
}
511
512 fn report_waited_for_reboot_duration(
517 &mut self,
518 update_finish_time: SystemTime,
519 state_machine_start_monotonic_time: Instant,
520 now: ComplexTime,
521 ) -> Result<(), anyhow::Error> {
522 let update_finish_time_to_now =
525 now.wall_duration_since(update_finish_time).map_err(|e| {
526 anyhow!(
527 "Update finish time later than now, can't report waited for reboot duration,
528 update finish time: {:?}, now: {:?}, error: {:?}",
529 update_finish_time,
530 now,
531 e,
532 )
533 })?;
534
535 let state_machine_start_to_now = now
544 .mono
545 .checked_duration_since(state_machine_start_monotonic_time)
546 .ok_or_else(|| {
547 error!("Monotonic time appears to have gone backwards");
548 anyhow!(
549 "State machine start later than now, can't report waited for reboot duration. \
550 State machine start: {:?}, now: {:?}",
551 state_machine_start_monotonic_time,
552 now.mono,
553 )
554 })?;
555
556 let waited_for_reboot_duration = update_finish_time_to_now
557 .checked_sub(state_machine_start_to_now)
558 .ok_or_else(|| {
559 anyhow!(
560 "Can't report waiting for reboot duration, update finish time to now smaller \
561 than state machine start to now. Update finish time to now: {:?}, state \
562 machine start to now: {:?}",
563 update_finish_time_to_now,
564 state_machine_start_to_now,
565 )
566 })?;
567
568 info!(
569 "Waited {} seconds for reboot.",
570 waited_for_reboot_duration.as_secs()
571 );
572 self.report_metrics(Metrics::WaitedForRebootDuration(waited_for_reboot_duration));
573 Ok(())
574 }
575
576 async fn report_check_interval(&mut self, install_source: InstallSource) {
579 let now = self.time_source.now();
580
581 match self.context.schedule.last_update_check_time {
582 Some(PartialComplexTime::Wall(t)) => match now.wall_duration_since(t) {
585 Ok(interval) => self.report_metrics(Metrics::UpdateCheckInterval {
586 interval,
587 clock: ClockType::Wall,
588 install_source,
589 }),
590 Err(e) => warn!("Last check time is in the future: {}", e),
591 },
592
593 Some(PartialComplexTime::Complex(t)) => match now.mono.checked_duration_since(t.mono) {
598 Some(interval) => self.report_metrics(Metrics::UpdateCheckInterval {
599 interval,
600 clock: ClockType::Monotonic,
601 install_source,
602 }),
603 None => error!("Monotonic time in the past"),
604 },
605
606 _ => {}
612 }
613
614 self.context.schedule.last_update_check_time = now.into();
615 }
616
617 async fn start_update_check(
621 &mut self,
622 request_params: RequestParams,
623 co: &mut async_generator::Yield<StateMachineEvent>,
624 ) -> RebootAfterUpdate<IN::InstallResult> {
625 let apps = self.app_set.lock().await.get_apps();
626 let result = self.perform_update_check(request_params, apps, co).await;
627
628 let (result, reboot_after_update) = match result {
629 Ok((result, reboot_after_update)) => {
630 info!("Update check result: {:?}", result);
631 self.context.schedule.last_update_time = Some(self.time_source.now().into());
633
634 let install_success =
636 result.app_responses.iter().fold(None, |result, app| {
637 match (result, &app.result) {
638 (_, update_check::Action::InstallPlanExecutionError) => Some(false),
639 (None, update_check::Action::Updated) => Some(true),
640 (result, _) => result,
641 }
642 });
643
644 self.report_attempts_to_successful_check(true).await;
647
648 self.app_set
649 .lock()
650 .await
651 .update_from_omaha(&result.app_responses);
652
653 if let Some(success) = install_success {
656 self.report_attempts_to_successful_install(success).await;
657 }
658
659 (Ok(result), reboot_after_update)
660 }
662 Err(error) => {
663 error!("Update check failed: {:?}", error);
664
665 let failure_reason = match &error {
666 UpdateCheckError::ResponseParser(_) | UpdateCheckError::InstallPlan(_) => {
667 self.context.schedule.last_update_time =
669 Some(self.time_source.now().into());
670
671 UpdateCheckFailureReason::Omaha
672 }
673 UpdateCheckError::OmahaRequest(request_error) => match request_error {
674 OmahaRequestError::Json(_)
675 | OmahaRequestError::HttpBuilder(_)
676 | OmahaRequestError::CupDecoration(_)
677 | OmahaRequestError::CupValidation(_) => UpdateCheckFailureReason::Internal,
678 OmahaRequestError::HttpTransport(_) | OmahaRequestError::HttpStatus(_) => {
679 UpdateCheckFailureReason::Network
680 }
681 },
682 };
683 self.report_metrics(Metrics::UpdateCheckFailureReason(failure_reason));
684
685 self.report_attempts_to_successful_check(false).await;
686 (Err(error), RebootAfterUpdate::NotNeeded)
687 }
688 };
689
690 co.yield_(StateMachineEvent::ScheduleChange(self.context.schedule))
691 .await;
692 co.yield_(StateMachineEvent::ProtocolStateChange(
693 self.context.state.clone(),
694 ))
695 .await;
696 co.yield_(StateMachineEvent::UpdateCheckResult(result))
697 .await;
698
699 self.persist_data().await;
700
701 reboot_after_update
702 }
703
704 async fn report_attempts_to_successful_check(&mut self, success: bool) {
707 let attempts = self.context.state.consecutive_failed_update_checks + 1;
708 if success {
709 self.context.state.consecutive_failed_update_checks = 0;
710 self.report_metrics(Metrics::AttemptsToSuccessfulCheck(attempts as u64));
711 } else {
712 self.context.state.consecutive_failed_update_checks = attempts;
713 }
714 }
715
716 async fn report_attempts_to_successful_install(&mut self, success: bool) {
719 let storage_ref = self.storage_ref.clone();
720 let mut storage = storage_ref.lock().await;
721 let attempts = storage
722 .get_int(CONSECUTIVE_FAILED_INSTALL_ATTEMPTS)
723 .await
724 .unwrap_or(0)
725 + 1;
726
727 self.report_metrics(Metrics::AttemptsToSuccessfulInstall {
728 count: attempts as u64,
729 successful: success,
730 });
731
732 if success {
733 storage
734 .remove_or_log(CONSECUTIVE_FAILED_INSTALL_ATTEMPTS)
735 .await;
736 } else if let Err(e) = storage
737 .set_int(CONSECUTIVE_FAILED_INSTALL_ATTEMPTS, attempts)
738 .await
739 {
740 error!(
741 "Unable to persist {}: {}",
742 CONSECUTIVE_FAILED_INSTALL_ATTEMPTS, e
743 );
744 }
745 }
746
/// Persists the update check context and the app set to storage, then commits.
///
/// Storage is locked first and stays locked for the whole function; the app set lock is
/// taken afterwards and only held for its own `persist` call.
async fn persist_data(&self) {
    let mut storage = self.storage_ref.lock().await;
    self.context.persist(&mut *storage).await;
    self.app_set.lock().await.persist(&mut *storage).await;

    // Commit failures are handled inside `commit_or_log`.
    // NOTE(review): by its name it presumably logs rather than propagates — confirm.
    storage.commit_or_log().await;
}
755
/// Performs a single update check against Omaha and, if an update is offered and allowed,
/// installs it.
///
/// Steps, in order:
/// 1. Emit `State::CheckingForUpdates` and report the check interval.
/// 2. Send an update-check + ping request for every app, retrying transport and HTTP status
///    failures with backoff up to `MAX_OMAHA_REQUEST_ATTEMPTS` attempts.
/// 3. Parse the response; if no app has an update, return a "no update" result.
/// 4. Create an install plan, ask the policy engine whether the update can start, run the
///    install while forwarding progress events, and report events back to Omaha.
/// 5. On success, persist the update finish time and target version, and ask the policy
///    engine whether a reboot is needed.
///
/// * `request_params` - parameters for the Omaha requests (including the install source).
/// * `apps` - the apps to check.
/// * `co` - generator handle for emitting states and events.
///
/// # Errors
///
/// * `UpdateCheckError::OmahaRequest` - the request could not be built, sent or validated.
/// * `UpdateCheckError::ResponseParser` - the response could not be parsed.
/// * `UpdateCheckError::InstallPlan` - the installer could not create an install plan.
///
/// Install failures are NOT errors: they are returned as `Ok` with per-app
/// `InstallPlanExecutionError` actions and `RebootAfterUpdate::NotNeeded`.
async fn perform_update_check(
    &mut self,
    request_params: RequestParams,
    apps: Vec<App>,
    co: &mut async_generator::Yield<StateMachineEvent>,
) -> Result<(update_check::Response, RebootAfterUpdate<IN::InstallResult>), UpdateCheckError>
{
    Self::yield_state(State::CheckingForUpdates(request_params.source), co).await;

    self.report_check_interval(request_params.source).await;

    // The config is cloned so the builder does not borrow `self` across the calls below.
    let config = self.config.clone();
    let mut request_builder = RequestBuilder::new(&config, &request_params);
    for app in &apps {
        request_builder = request_builder.add_update_check(app).add_ping(app);
    }
    // One session id is shared by every request made during this update check.
    let session_id = GUID::new();
    request_builder = request_builder.session_id(session_id.clone());

    let mut omaha_request_attempt = 1;

    // Request loop: breaks with the final result; falling through to the bottom of the loop
    // body means "back off and retry".
    let loop_result = loop {
        let omaha_check_start_time = self.time_source.now_in_monotonic();
        // Every attempt gets a fresh request id.
        request_builder = request_builder.request_id(GUID::new());
        let result = self
            .do_omaha_request_and_update_context(&request_builder, co)
            .await;

        // Report how long this attempt took, successful or not.
        {
            let now = self.time_source.now_in_monotonic();
            let duration = now.checked_duration_since(omaha_check_start_time);

            if let Some(response_time) = duration {
                self.report_metrics(Metrics::UpdateCheckResponseTime {
                    response_time,
                    successful: result.is_ok(),
                });
            } else {
                error!(
                    "now: {:?}, is before omaha_check_start_time: {:?}",
                    now, omaha_check_start_time
                );
            }
        }

        match result {
            Ok(res) => {
                break Ok(res);
            }
            // The next four error kinds are never retried.
            Err(OmahaRequestError::Json(e)) => {
                error!("Unable to construct request body! {:?}", e);
                Self::yield_state(State::ErrorCheckingForUpdate, co).await;
                break Err(UpdateCheckError::OmahaRequest(e.into()));
            }
            Err(OmahaRequestError::HttpBuilder(e)) => {
                error!("Unable to construct HTTP request! {:?}", e);
                Self::yield_state(State::ErrorCheckingForUpdate, co).await;
                break Err(UpdateCheckError::OmahaRequest(e.into()));
            }
            Err(OmahaRequestError::CupDecoration(e)) => {
                error!(
                    "Unable to decorate HTTP request with CUPv2 parameters! {:?}",
                    e
                );
                Self::yield_state(State::ErrorCheckingForUpdate, co).await;
                break Err(UpdateCheckError::OmahaRequest(e.into()));
            }
            Err(OmahaRequestError::CupValidation(e)) => {
                error!(
                    "Unable to validate HTTP response with CUPv2 parameters! {:?}",
                    e
                );
                Self::yield_state(State::ErrorCheckingForUpdate, co).await;
                break Err(UpdateCheckError::OmahaRequest(e.into()));
            }
            // Transport errors are retried unless attempts are exhausted, `e.is_user()` is
            // true, or a server-dictated poll interval is set.
            Err(OmahaRequestError::HttpTransport(e)) => {
                warn!("Unable to contact Omaha: {:?}", e);
                if omaha_request_attempt >= MAX_OMAHA_REQUEST_ATTEMPTS
                    || e.is_user()
                    || self.context.state.server_dictated_poll_interval.is_some()
                {
                    Self::yield_state(State::ErrorCheckingForUpdate, co).await;
                    break Err(UpdateCheckError::OmahaRequest(e.into()));
                }
            }
            // HTTP status errors are retried unless attempts are exhausted or a
            // server-dictated poll interval is set.
            Err(OmahaRequestError::HttpStatus(e)) => {
                warn!("Unable to contact Omaha: {:?}", e);
                if omaha_request_attempt >= MAX_OMAHA_REQUEST_ATTEMPTS
                    || self.context.state.server_dictated_poll_interval.is_some()
                {
                    Self::yield_state(State::ErrorCheckingForUpdate, co).await;
                    break Err(UpdateCheckError::OmahaRequest(e.into()));
                }
            }
        }

        // Exponential backoff: a base of 1s, 2s, ... doubling per attempt, passed through
        // `randomize` with a range argument of 1000 ms.
        // NOTE(review): `randomize` is defined elsewhere; presumably it jitters the value
        // within that range — confirm.
        let backoff_time_secs = 1 << (omaha_request_attempt - 1);
        let backoff_time = randomize(backoff_time_secs * 1000, 1000);
        info!("Waiting {} ms before retrying...", backoff_time);
        self.timer
            .wait_for(Duration::from_millis(backoff_time))
            .await;

        omaha_request_attempt += 1;
    };

    self.report_metrics(Metrics::RequestsPerCheck {
        count: omaha_request_attempt,
        successful: loop_result.is_ok(),
    });

    let (_parts, data, request_metadata, signature) = loop_result?;

    let response = match Self::parse_omaha_response(&data) {
        Ok(res) => res,
        Err(err) => {
            warn!("Unable to parse Omaha response: {:?}", err);
            Self::yield_state(State::ErrorCheckingForUpdate, co).await;
            // Tell Omaha about the parse failure; no next version is known for any app.
            self.report_omaha_event_and_update_context(
                &request_params,
                Event::error(EventErrorCode::ParseResponse),
                &apps,
                &session_id,
                &apps.iter().map(|app| (app.id.clone(), None)).collect(),
                None,
                co,
            )
            .await;
            return Err(UpdateCheckError::ResponseParser(err));
        }
    };

    info!("result: {:?}", response);

    co.yield_(StateMachineEvent::OmahaServerResponse(response.clone()))
        .await;

    let statuses = Self::get_app_update_statuses(&response);
    for (app_id, status) in &statuses {
        info!("Omaha update check status: {} => {:?}", app_id, status);
    }

    // Apps for which Omaha answered the update check with status `Ok`, in response order.
    let apps_with_update: Vec<_> = response
        .apps
        .iter()
        .filter(|app| {
            matches!(
                app.update_check,
                Some(UpdateCheck {
                    status: OmahaStatus::Ok,
                    ..
                })
            )
        })
        .collect();

    if apps_with_update.is_empty() {
        Self::yield_state(State::NoUpdateAvailable, co).await;
        Self::make_not_updated_result(response, update_check::Action::NoUpdate)
    } else {
        info!(
            "At least one app has an update, proceeding to build and process an Install Plan"
        );
        // Map of app id -> manifest version offered by Omaha (if any) for updatable apps.
        let next_versions: HashMap<String, Option<String>> = apps_with_update
            .iter()
            .map(|app| (app.id.clone(), app.get_manifest_version()))
            .collect();
        let install_plan = match self
            .installer
            .try_create_install_plan(
                &request_params,
                request_metadata.as_ref(),
                &response,
                data,
                signature.map(|s| s.as_bytes().to_vec()),
            )
            .await
        {
            Ok(plan) => plan,
            Err(e) => {
                error!("Unable to construct install plan! {}", e);
                // Both states are emitted back to back: "installing" immediately followed
                // by "installation error".
                Self::yield_state(State::InstallingUpdate, co).await;
                Self::yield_state(State::InstallationError, co).await;
                self.report_omaha_event_and_update_context(
                    &request_params,
                    Event::error(EventErrorCode::ConstructInstallPlan),
                    &apps,
                    &session_id,
                    &next_versions,
                    None,
                    co,
                )
                .await;
                return Err(UpdateCheckError::InstallPlan(e.into()));
            }
        };

        info!("Validating Install Plan with Policy");
        let install_plan_decision = self.policy_engine.update_can_start(&install_plan).await;
        match install_plan_decision {
            UpdateDecision::Ok => {
                info!("Proceeding with install plan.");
            }
            UpdateDecision::DeferredByPolicy => {
                info!("Install plan was deferred by Policy.");
                // Deferral is reported to Omaha as an UpdateComplete event with an
                // UpdateDeferred result.
                let event = Event {
                    event_type: EventType::UpdateComplete,
                    event_result: EventResult::UpdateDeferred,
                    ..Event::default()
                };
                self.report_omaha_event_and_update_context(
                    &request_params,
                    event,
                    &apps,
                    &session_id,
                    &next_versions,
                    None,
                    co,
                )
                .await;

                Self::yield_state(State::InstallationDeferredByPolicy, co).await;

                return Self::make_not_updated_result(
                    response,
                    update_check::Action::DeferredByPolicy,
                );
            }
            UpdateDecision::DeniedByPolicy => {
                warn!("Install plan was denied by Policy, see Policy logs for reasoning");
                self.report_omaha_event_and_update_context(
                    &request_params,
                    Event::error(EventErrorCode::DeniedByPolicy),
                    &apps,
                    &session_id,
                    &next_versions,
                    None,
                    co,
                )
                .await;

                // Unlike the deferred case, no state is yielded here before returning.
                return Self::make_not_updated_result(
                    response,
                    update_check::Action::DeniedByPolicy,
                );
            }
        }

        Self::yield_state(State::InstallingUpdate, co).await;
        self.report_omaha_event_and_update_context(
            &request_params,
            Event::success(EventType::UpdateDownloadStarted),
            &apps,
            &session_id,
            &next_versions,
            None,
            co,
        )
        .await;

        let install_plan_id = install_plan.id();
        let update_start_time = self.time_source.now_in_walltime();
        let update_first_seen_time = self
            .record_update_first_seen_time(&install_plan_id, update_start_time)
            .await;

        // Run the install and the progress forwarder concurrently. The observer holds the
        // only sender, so dropping it once the install finishes ends the `recv` stream and
        // lets `yield_progress` complete.
        let (send, mut recv) = mpsc::channel(0);
        let observer = StateMachineProgressObserver(send);
        let perform_install = async {
            let result = self
                .installer
                .perform_install(&install_plan, Some(&observer))
                .await;
            drop(observer);
            result
        };
        let yield_progress = async {
            while let Some(progress) = recv.next().await {
                co.yield_(StateMachineEvent::InstallProgressChange(progress))
                    .await;
            }
        };

        let ((install_result, mut app_install_results), ()) =
            future::join(perform_install, yield_progress).await;
        // Deferred installs do not count as failures.
        let no_apps_failed = app_install_results.iter().all(|result| {
            matches!(
                result,
                AppInstallResult::Installed | AppInstallResult::Deferred
            )
        });
        let update_finish_time = self.time_source.now_in_walltime();
        let install_duration = match update_finish_time.duration_since(update_start_time) {
            Ok(duration) => {
                let metrics = if no_apps_failed {
                    Metrics::SuccessfulUpdateDuration(duration)
                } else {
                    Metrics::FailedUpdateDuration(duration)
                };
                self.report_metrics(metrics);
                Some(duration)
            }
            Err(e) => {
                warn!("Update start time is in the future: {}", e);
                None
            }
        };

        // Build one request carrying a per-app event describing each install result.
        let config = self.config.clone();
        let mut request_builder = RequestBuilder::new(&config, &request_params);
        let mut events = vec![];
        let mut installed_apps = vec![];
        // NOTE(review): `zip` stops at the shorter side, so this relies on the installer
        // returning one result per app with an update, in the same order — confirm against
        // the `Installer` contract.
        for (response_app, app_install_result) in
            apps_with_update.iter().zip(&app_install_results)
        {
            match apps.iter().find(|app| app.id == response_app.id) {
                Some(app) => {
                    let event = match app_install_result {
                        AppInstallResult::Installed => {
                            installed_apps.push(app);
                            Event::success(EventType::UpdateDownloadFinished)
                        }
                        AppInstallResult::Deferred => Event {
                            event_type: EventType::UpdateComplete,
                            event_result: EventResult::UpdateDeferred,
                            ..Event::default()
                        },
                        AppInstallResult::Failed(_) => {
                            Event::error(EventErrorCode::Installation)
                        }
                    };
                    let event = Event {
                        previous_version: Some(app.version.to_string()),
                        next_version: response_app.get_manifest_version(),
                        // Left as `None` if the millisecond count does not fit the field.
                        download_time_ms: install_duration
                            .and_then(|d| d.as_millis().try_into().ok()),
                        ..event
                    };
                    request_builder = request_builder.add_event(app, event.clone());
                    events.push(event);
                }
                None => {
                    error!("unknown app id in omaha response: {:?}", response_app.id);
                }
            }
        }
        request_builder = request_builder
            .session_id(session_id.clone())
            .request_id(GUID::new());
        if let Err(e) = self
            .do_omaha_request_and_update_context(&request_builder, co)
            .await
        {
            // Reporting is best-effort: record each lost event and carry on.
            for event in events {
                self.report_metrics(Metrics::OmahaEventLost(event));
            }
            warn!("Unable to report event to Omaha: {:?}", e);
        }

        // Apps that were installed additionally get an UpdateComplete success event.
        if !installed_apps.is_empty() {
            self.report_omaha_event_and_update_context(
                &request_params,
                Event::success(EventType::UpdateComplete),
                installed_apps,
                &session_id,
                &next_versions,
                install_duration,
                co,
            )
            .await;
        }

        let mut errors = vec![];
        let daystart = response.daystart;
        // Build the per-app results. Apps with an update consume install results from the
        // front of `app_install_results`, in response order.
        // NOTE(review): `remove(0)` panics if the installer returned fewer results than
        // there are apps with an update, and is O(n) per call — confirm the installer
        // contract guarantees the lengths match.
        let app_responses = response
            .apps
            .into_iter()
            .map(|app| update_check::AppResponse {
                app_id: app.id,
                cohort: app.cohort,
                user_counting: daystart.clone().into(),
                result: match app.update_check {
                    Some(UpdateCheck {
                        status: OmahaStatus::Ok,
                        ..
                    }) => match app_install_results.remove(0) {
                        AppInstallResult::Installed => update_check::Action::Updated,
                        AppInstallResult::Deferred => update_check::Action::DeferredByPolicy,
                        AppInstallResult::Failed(e) => {
                            errors.push(e);
                            update_check::Action::InstallPlanExecutionError
                        }
                    },
                    _ => update_check::Action::NoUpdate,
                },
            })
            .collect();

        // Any install failure: emit each error, move to InstallationError and return
        // without persisting the finish time or asking about a reboot.
        if !errors.is_empty() {
            for e in errors {
                co.yield_(StateMachineEvent::InstallerError(Some(Box::new(e))))
                    .await;
            }
            Self::yield_state(State::InstallationError, co).await;

            return Ok((
                update_check::Response { app_responses },
                RebootAfterUpdate::NotNeeded,
            ));
        }

        match update_finish_time.duration_since(update_first_seen_time) {
            Ok(duration) => {
                self.report_metrics(Metrics::SuccessfulUpdateFromFirstSeen(duration))
            }
            Err(e) => warn!("Update first seen time is in the future: {}", e),
        }
        // Persist the finish time and the system app's target version. Storage is locked
        // before the app set.
        {
            let mut storage = self.storage_ref.lock().await;
            if let Err(e) = storage
                .set_time(UPDATE_FINISH_TIME, update_finish_time)
                .await
            {
                error!("Unable to persist {}: {}", UPDATE_FINISH_TIME, e);
            }
            let app_set = self.app_set.lock().await;
            let system_app_id = app_set.get_system_app_id();
            // Only recorded when the system app itself was among the updated apps.
            if let Some(next_version) = next_versions.get(system_app_id) {
                let target_version = next_version.as_deref().unwrap_or_else(|| {
                    error!("Target version string not found in Omaha response.");
                    "UNKNOWN"
                });
                if let Err(e) = storage.set_string(TARGET_VERSION, target_version).await {
                    error!("Unable to persist {}: {}", TARGET_VERSION, e);
                }
            }
            storage.commit_or_log().await;
        }

        let reboot_after_update = if self.policy_engine.reboot_needed(&install_plan).await {
            RebootAfterUpdate::Needed(install_result)
        } else {
            RebootAfterUpdate::NotNeeded
        };

        Ok((
            update_check::Response { app_responses },
            reboot_after_update,
        ))
    }
}
1234
1235 #[allow(clippy::too_many_arguments)]
1238 async fn report_omaha_event_and_update_context<'a>(
1239 &'a mut self,
1240 request_params: &'a RequestParams,
1241 event: Event,
1242 apps: impl IntoIterator<Item = &App>,
1243 session_id: &GUID,
1244 next_versions: &HashMap<String, Option<String>>,
1245 install_duration: Option<Duration>,
1246 co: &mut async_generator::Yield<StateMachineEvent>,
1247 ) {
1248 let config = self.config.clone();
1249 let mut request_builder = RequestBuilder::new(&config, request_params);
1250 for app in apps {
1251 if let Some(next_version) = next_versions.get(&app.id) {
1253 let event = Event {
1254 previous_version: Some(app.version.to_string()),
1255 next_version: next_version.clone(),
1256 download_time_ms: install_duration.and_then(|d| d.as_millis().try_into().ok()),
1257 ..event.clone()
1258 };
1259 request_builder = request_builder.add_event(app, event);
1260 }
1261 }
1262 request_builder = request_builder
1263 .session_id(session_id.clone())
1264 .request_id(GUID::new());
1265 if let Err(e) = self
1266 .do_omaha_request_and_update_context(&request_builder, co)
1267 .await
1268 {
1269 self.report_metrics(Metrics::OmahaEventLost(event));
1270 warn!("Unable to report event to Omaha: {:?}", e);
1271 }
1272 }
1273
1274 async fn ping_omaha(&mut self, co: &mut async_generator::Yield<StateMachineEvent>) {
1276 let apps = self.app_set.lock().await.get_apps();
1277 let request_params = RequestParams {
1278 source: InstallSource::ScheduledTask,
1279 use_configured_proxies: true,
1280 disable_updates: false,
1281 offer_update_if_same_version: false,
1282 };
1283 let config = self.config.clone();
1284 let mut request_builder = RequestBuilder::new(&config, &request_params);
1285 for app in &apps {
1286 request_builder = request_builder.add_ping(app);
1287 }
1288 request_builder = request_builder
1289 .session_id(GUID::new())
1290 .request_id(GUID::new());
1291
1292 let (_parts, data, _request_metadata, _signature) = match self
1293 .do_omaha_request_and_update_context(&request_builder, co)
1294 .await
1295 {
1296 Ok(res) => res,
1297 Err(e) => {
1298 error!("Ping Omaha failed: {:#}", anyhow!(e));
1299 self.context.state.consecutive_failed_update_checks += 1;
1300 self.persist_data().await;
1301 return;
1302 }
1303 };
1304
1305 let response = match Self::parse_omaha_response(&data) {
1306 Ok(res) => res,
1307 Err(e) => {
1308 error!("Unable to parse Omaha response: {:#}", anyhow!(e));
1309 self.context.state.consecutive_failed_update_checks += 1;
1310 self.persist_data().await;
1311 return;
1312 }
1313 };
1314
1315 self.context.state.consecutive_failed_update_checks = 0;
1316
1317 self.context.schedule.last_update_time = Some(self.time_source.now().into());
1320 co.yield_(StateMachineEvent::ScheduleChange(self.context.schedule))
1321 .await;
1322
1323 let app_responses = Self::make_app_responses(response, update_check::Action::NoUpdate);
1324 self.app_set.lock().await.update_from_omaha(&app_responses);
1325
1326 self.persist_data().await;
1327 }
1328
1329 async fn do_omaha_request_and_update_context<'a>(
1342 &'a mut self,
1343 builder: &RequestBuilder<'a>,
1344 co: &mut async_generator::Yield<StateMachineEvent>,
1345 ) -> Result<
1346 (
1347 Parts,
1348 Vec<u8>,
1349 Option<RequestMetadata>,
1350 Option<DerSignature>,
1351 ),
1352 OmahaRequestError,
1353 > {
1354 let (request, request_metadata) = builder.build(self.cup_handler.as_ref())?;
1355 let response = Self::make_request(&mut self.http, request).await?;
1356
1357 let signature: Option<DerSignature> = if let (Some(handler), Some(metadata)) =
1358 (self.cup_handler.as_ref(), &request_metadata)
1359 {
1360 let signature = handler
1361 .verify_response(metadata, &response, metadata.public_key_id)
1362 .map_err(|e| {
1363 error!("Could not verify response: {:?}", e);
1364 e
1365 })?;
1366 Some(signature)
1367 } else {
1368 None
1369 };
1370
1371 let (parts, body) = response.into_parts();
1372
1373 let server_dictated_poll_interval = parts.headers.get(X_RETRY_AFTER).and_then(|header| {
1375 match header
1376 .to_str()
1377 .map_err(|e| anyhow!(e))
1378 .and_then(|s| s.parse::<u64>().map_err(|e| anyhow!(e)))
1379 {
1380 Ok(seconds) => {
1381 Some(Duration::from_secs(min(seconds, 86400)))
1384 }
1385 Err(e) => {
1386 error!("Unable to parse {} header: {:#}", X_RETRY_AFTER, e);
1387 None
1388 }
1389 }
1390 });
1391 if self.context.state.server_dictated_poll_interval != server_dictated_poll_interval {
1392 self.context.state.server_dictated_poll_interval = server_dictated_poll_interval;
1393 co.yield_(StateMachineEvent::ProtocolStateChange(
1394 self.context.state.clone(),
1395 ))
1396 .await;
1397 let mut storage = self.storage_ref.lock().await;
1398 self.context.persist(&mut *storage).await;
1399 storage.commit_or_log().await;
1400 }
1401 if !parts.status.is_success() {
1402 Err(OmahaRequestError::HttpStatus(parts.status))
1404 } else {
1405 info!("Omaha HTTP response: {}", parts.status);
1407 Ok((parts, body, request_metadata, signature))
1408 }
1409 }
1410
1411 async fn make_request(
1417 http_client: &mut HR,
1418 request: http::Request<hyper::Body>,
1419 ) -> Result<HttpResponse<Vec<u8>>, http_request::Error> {
1420 info!("Making http request to: {}", request.uri());
1421 http_client.request(request).await.map_err(|err| {
1422 warn!("Unable to perform request: {}", err);
1423 err
1424 })
1425 }
1426
1427 fn parse_omaha_response(data: &[u8]) -> Result<Response, ResponseParseError> {
1431 parse_json_response(data).map_err(ResponseParseError::Json)
1432 }
1433
1434 fn get_app_update_statuses(response: &Response) -> Vec<(&str, &OmahaStatus)> {
1437 response
1438 .apps
1439 .iter()
1440 .filter_map(|app| {
1441 app.update_check
1442 .as_ref()
1443 .map(|u| (app.id.as_str(), &u.status))
1444 })
1445 .collect()
1446 }
1447
1448 fn make_app_responses(
1454 response: protocol::response::Response,
1455 action: update_check::Action,
1456 ) -> Vec<update_check::AppResponse> {
1457 let daystart = response.daystart;
1458 response
1459 .apps
1460 .into_iter()
1461 .map(|app| update_check::AppResponse {
1462 app_id: app.id,
1463 cohort: app.cohort,
1464 user_counting: daystart.clone().into(),
1465 result: action.clone(),
1466 })
1467 .collect()
1468 }
1469
1470 fn make_not_updated_result(
1472 response: protocol::response::Response,
1473 action: update_check::Action,
1474 ) -> Result<(update_check::Response, RebootAfterUpdate<IN::InstallResult>), UpdateCheckError>
1475 {
1476 Ok((
1477 update_check::Response {
1478 app_responses: Self::make_app_responses(response, action),
1479 },
1480 RebootAfterUpdate::NotNeeded,
1481 ))
1482 }
1483
1484 async fn yield_state(state: State, co: &mut async_generator::Yield<StateMachineEvent>) {
1486 co.yield_(StateMachineEvent::StateChange(state)).await;
1487 }
1488
1489 fn report_metrics(&mut self, metrics: Metrics) {
1490 if let Err(err) = self.metrics_reporter.report_metrics(metrics) {
1491 warn!("Unable to report metrics: {:?}", err);
1492 }
1493 }
1494
1495 async fn record_update_first_seen_time(
1496 &mut self,
1497 install_plan_id: &str,
1498 now: SystemTime,
1499 ) -> SystemTime {
1500 let mut storage = self.storage_ref.lock().await;
1501 let previous_id = storage.get_string(INSTALL_PLAN_ID).await;
1502 if let Some(previous_id) = previous_id {
1503 if previous_id == install_plan_id {
1504 return storage
1505 .get_time(UPDATE_FIRST_SEEN_TIME)
1506 .await
1507 .unwrap_or(now);
1508 }
1509 }
1510 if let Err(e) = storage.set_string(INSTALL_PLAN_ID, install_plan_id).await {
1512 error!("Unable to persist {}: {}", INSTALL_PLAN_ID, e);
1513 return now;
1514 }
1515 if let Err(e) = storage.set_time(UPDATE_FIRST_SEEN_TIME, now).await {
1516 error!("Unable to persist {}: {}", UPDATE_FIRST_SEEN_TIME, e);
1517 let _ = storage.remove(INSTALL_PLAN_ID).await;
1518 return now;
1519 }
1520 storage.commit_or_log().await;
1521 now
1522 }
1523}
1524
1525fn randomize(n: u64, range: u64) -> u64 {
1527 n - range / 2 + rand::random::<u64>() % range
1528}
1529
// Test-only helpers that drive the state machine through a single update check.
#[cfg(test)]
impl<PE, HR, IN, TM, MR, ST, AS, IR, PL, CH> StateMachine<PE, HR, IN, TM, MR, ST, AS, CH>
where
    PE: PolicyEngine<InstallResult = IR, InstallPlan = PL>,
    HR: HttpRequest,
    IN: Installer<InstallResult = IR, InstallPlan = PL>,
    TM: Timer,
    MR: MetricsReporter,
    ST: Storage,
    AS: AppSet,
    CH: Cupv2Handler,
    IR: 'static + Send,
    PL: Plan,
{
    /// Performs one update check with `request_params` for the apps currently in the app
    /// set and returns the check's final result.
    async fn oneshot(
        &mut self,
        request_params: RequestParams,
    ) -> Result<(update_check::Response, RebootAfterUpdate<IN::InstallResult>), UpdateCheckError>
    {
        let apps = self.app_set.lock().await.get_apps();

        let check = async_generator::generate(move |mut co| async move {
            self.perform_update_check(request_params, apps, &mut co)
                .await
        });
        check.into_complete().await
    }

    /// Runs `start_update_check` once with default request parameters, draining and
    /// discarding every event it yields.
    async fn run_once(&mut self) {
        let request_params = RequestParams::default();

        let events = async_generator::generate(move |mut co| async move {
            self.start_update_check(request_params, &mut co).await;
        });
        events.map(|_| ()).collect::<()>().await;
    }
}
1572
1573#[cfg(test)]
1574mod tests {
1575 use super::update_check::{
1576 Action, CONSECUTIVE_FAILED_UPDATE_CHECKS, LAST_UPDATE_TIME, SERVER_DICTATED_POLL_INTERVAL,
1577 };
1578 use super::*;
1579 use crate::{
1580 app_set::VecAppSet,
1581 common::{
1582 App, CheckOptions, PersistedApp, ProtocolState, UpdateCheckSchedule, UserCounting,
1583 },
1584 configuration::Updater,
1585 cup_ecdsa::test_support::{make_cup_handler_for_test, MockCupv2Handler},
1586 http_request::mock::MockHttpRequest,
1587 installer::{
1588 stub::{StubInstallErrors, StubInstaller, StubPlan},
1589 ProgressObserver,
1590 },
1591 metrics::MockMetricsReporter,
1592 policy::{MockPolicyEngine, StubPolicyEngine},
1593 protocol::{request::OS, response, Cohort},
1594 storage::MemStorage,
1595 time::{
1596 timers::{BlockingTimer, MockTimer, RequestedWait},
1597 MockTimeSource, PartialComplexTime,
1598 },
1599 version::Version,
1600 };
1601 use assert_matches::assert_matches;
1602 use futures::executor::{block_on, LocalPool};
1603 use futures::future::LocalBoxFuture;
1604 use futures::task::LocalSpawnExt;
1605 use pretty_assertions::assert_eq;
1606 use serde_json::json;
1607 use std::cell::RefCell;
1608 use std::time::Duration;
1609 use tracing::info;
1610
1611 fn make_test_app_set() -> Rc<Mutex<VecAppSet>> {
1612 Rc::new(Mutex::new(VecAppSet::new(vec![App::builder()
1613 .id("{00000000-0000-0000-0000-000000000001}")
1614 .version([1, 2, 3, 4])
1615 .cohort(Cohort::new("stable-channel"))
1616 .build()])))
1617 }
1618
1619 fn make_update_available_response() -> HttpResponse<Vec<u8>> {
1620 let response = json!({"response":{
1621 "server": "prod",
1622 "protocol": "3.0",
1623 "app": [{
1624 "appid": "{00000000-0000-0000-0000-000000000001}",
1625 "status": "ok",
1626 "updatecheck": {
1627 "status": "ok"
1628 }
1629 }],
1630 }});
1631 HttpResponse::new(serde_json::to_vec(&response).unwrap())
1632 }
1633
1634 fn make_noupdate_httpresponse() -> Vec<u8> {
1635 serde_json::to_vec(
1636 &(json!({"response":{
1637 "server": "prod",
1638 "protocol": "3.0",
1639 "app": [{
1640 "appid": "{00000000-0000-0000-0000-000000000001}",
1641 "status": "ok",
1642 "updatecheck": {
1643 "status": "noupdate"
1644 }
1645 }]
1646 }})),
1647 )
1648 .unwrap()
1649 }
1650
1651 async fn assert_request<'a>(http: &MockHttpRequest, request_builder: RequestBuilder<'a>) {
1654 let cup_handler = make_cup_handler_for_test();
1655 let (request, _request_metadata) = request_builder.build(Some(&cup_handler)).unwrap();
1656 let body = hyper::body::to_bytes(request).await.unwrap();
1657 let body_str = String::from_utf8_lossy(&body);
1659 http.assert_body_str(&body_str).await;
1660 }
1661
/// A one-shot update check against a "noupdate" response completes without error.
#[test]
fn run_simple_check_with_noupdate_result() {
    block_on(async {
        let noupdate_response = HttpResponse::new(make_noupdate_httpresponse());
        let http = MockHttpRequest::new(noupdate_response);

        let check_result = StateMachineBuilder::new_stub()
            .http(http)
            .oneshot(RequestParams::default())
            .await;
        check_result.unwrap();

        info!("update check complete!");
    });
}
1676
/// The cohort id and name from a "noupdate" response are returned in the app response,
/// and no reboot is requested.
#[test]
fn test_cohort_returned_with_noupdate_result() {
    block_on(async {
        let payload = json!({"response": {
            "server": "prod",
            "protocol": "3.0",
            "app": [{
                "appid": "{00000000-0000-0000-0000-000000000001}",
                "status": "ok",
                "cohort": "1",
                "cohortname": "stable-channel",
                "updatecheck": {"status": "noupdate"}
            }]
        }});
        let body = serde_json::to_vec(&payload).unwrap();
        let http = MockHttpRequest::new(HttpResponse::new(body));

        let (check_response, reboot_after_update) = StateMachineBuilder::new_stub()
            .http(http)
            .oneshot(RequestParams::default())
            .await
            .unwrap();

        let app_response = &check_response.app_responses[0];
        assert_eq!(
            "{00000000-0000-0000-0000-000000000001}",
            app_response.app_id
        );
        assert_eq!(Some("1".into()), app_response.cohort.id);
        assert_eq!(Some("stable-channel".into()), app_response.cohort.name);
        assert_eq!(None, app_response.cohort.hint);

        assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);
    });
}
1715
/// The cohort id and name from an update-available response are returned in the app
/// response, and a reboot is requested.
#[test]
fn test_cohort_returned_with_update_result() {
    block_on(async {
        let payload = json!({"response": {
            "server": "prod",
            "protocol": "3.0",
            "app": [{
                "appid": "{00000000-0000-0000-0000-000000000001}",
                "status": "ok",
                "cohort": "1",
                "cohortname": "stable-channel",
                "updatecheck": {"status": "ok"}
            }]
        }});
        let body = serde_json::to_vec(&payload).unwrap();
        let http = MockHttpRequest::new(HttpResponse::new(body));

        let (check_response, reboot_after_update) = StateMachineBuilder::new_stub()
            .http(http)
            .oneshot(RequestParams::default())
            .await
            .unwrap();

        let app_response = &check_response.app_responses[0];
        assert_eq!(
            "{00000000-0000-0000-0000-000000000001}",
            app_response.app_id
        );
        assert_eq!(Some("1".into()), app_response.cohort.id);
        assert_eq!(Some("stable-channel".into()), app_response.cohort.name);
        assert_eq!(None, app_response.cohort.hint);

        assert_matches!(reboot_after_update, RebootAfterUpdate::Needed(()));
    });
}
1754
/// An unparsable response makes the check fail with `ResponseParser`, and a
/// `ParseResponse` error event is then sent to Omaha.
#[test]
fn test_report_parse_response_error() {
    block_on(async {
        let http = MockHttpRequest::new(HttpResponse::new("invalid response".into()));
        let mut state_machine = StateMachineBuilder::new_stub().http(http).build().await;

        let check_result = state_machine.oneshot(RequestParams::default()).await;
        assert_matches!(check_result, Err(UpdateCheckError::ResponseParser(_)));

        // The last request seen by the mock must be the error event report.
        let request_params = RequestParams::default();
        let apps = state_machine.app_set.lock().await.get_apps();
        let expected_event = Event {
            previous_version: Some("1.2.3.4".to_string()),
            ..Event::error(EventErrorCode::ParseResponse)
        };
        let expected_request = RequestBuilder::new(&state_machine.config, &request_params)
            .add_event(&apps[0], expected_event)
            .session_id(GUID::from_u128(0))
            .request_id(GUID::from_u128(2));
        assert_request(&state_machine.http, expected_request).await;
    });
}
1779
/// A response declaring protocol "4.0" makes the check fail with `InstallPlan`, and a
/// `ConstructInstallPlan` error event is then sent to Omaha.
#[test]
fn test_report_construct_install_plan_error() {
    block_on(async {
        let payload = json!({"response": {
            "server": "prod",
            "protocol": "4.0",
            "app": [{
                "appid": "{00000000-0000-0000-0000-000000000001}",
                "status": "ok",
                "updatecheck": {"status": "ok"}
            }],
        }});
        let body = serde_json::to_vec(&payload).unwrap();
        let http = MockHttpRequest::new(HttpResponse::new(body));
        let mut state_machine = StateMachineBuilder::new_stub().http(http).build().await;

        let check_result = state_machine.oneshot(RequestParams::default()).await;
        assert_matches!(check_result, Err(UpdateCheckError::InstallPlan(_)));

        // The last request seen by the mock must be the error event report.
        let request_params = RequestParams::default();
        let apps = state_machine.app_set.lock().await.get_apps();
        let expected_event = Event {
            previous_version: Some("1.2.3.4".to_string()),
            ..Event::error(EventErrorCode::ConstructInstallPlan)
        };
        let expected_request = RequestBuilder::new(&state_machine.config, &request_params)
            .add_event(&apps[0], expected_event)
            .session_id(GUID::from_u128(0))
            .request_id(GUID::from_u128(2));
        assert_request(&state_machine.http, expected_request).await;
    });
}
1816
/// With a failing installer the app result is `InstallPlanExecutionError`, no reboot is
/// requested, and an `Installation` error event is sent to Omaha.
#[test]
fn test_report_installation_error() {
    block_on(async {
        let payload = json!({"response": {
            "server": "prod",
            "protocol": "3.0",
            "app": [{
                "appid": "{00000000-0000-0000-0000-000000000001}",
                "status": "ok",
                "updatecheck": {
                    "status": "ok",
                    "manifest": {
                        "version": "5.6.7.8",
                        "actions": {"action": []},
                        "packages": {"package": []},
                    }
                }
            }],
        }});
        let body = serde_json::to_vec(&payload).unwrap();
        let http = MockHttpRequest::new(HttpResponse::new(body));

        let mut state_machine = StateMachineBuilder::new_stub()
            .http(http)
            .installer(StubInstaller { should_fail: true })
            .build()
            .await;

        let (check_response, reboot_after_update) = state_machine
            .oneshot(RequestParams::default())
            .await
            .unwrap();
        assert_eq!(
            Action::InstallPlanExecutionError,
            check_response.app_responses[0].result
        );
        assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);

        // The last request seen by the mock must be the installation error report.
        let request_params = RequestParams::default();
        let apps = state_machine.app_set.lock().await.get_apps();
        let expected_event = Event {
            previous_version: Some("1.2.3.4".to_string()),
            next_version: Some("5.6.7.8".to_string()),
            download_time_ms: Some(0),
            ..Event::error(EventErrorCode::Installation)
        };
        let expected_request = RequestBuilder::new(&state_machine.config, &request_params)
            .add_event(&apps[0], expected_event)
            .session_id(GUID::from_u128(0))
            .request_id(GUID::from_u128(3));
        assert_request(&state_machine.http, expected_request).await;
    });
}
1875
/// Three apps are checked: Omaha offers updates for `appid_3` and `appid_1` and none for
/// `appid_2`. The installer reports `Deferred` then `Installed`, and the test checks the
/// per-app results as well as the event requests sent to Omaha afterwards.
#[test]
fn test_report_installation_error_multi_app() {
    block_on(async {
        let payload = json!({"response": {
            "server": "prod",
            "protocol": "3.0",
            "app": [{
                "appid": "appid_3",
                "status": "ok",
                "updatecheck": {
                    "status": "ok",
                    "manifest": {
                        "version": "5.6.7.8",
                        "actions": {"action": []},
                        "packages": {"package": []},
                    }
                }
            }, {
                "appid": "appid_1",
                "status": "ok",
                "updatecheck": {
                    "status": "ok",
                    "manifest": {
                        "version": "1.2.3.4",
                        "actions": {"action": []},
                        "packages": {"package": []},
                    }
                }
            }, {
                "appid": "appid_2",
                "status": "ok",
                "updatecheck": {"status": "noupdate"}
            }],
        }});
        let body = serde_json::to_vec(&payload).unwrap();
        let mut http = MockHttpRequest::new(HttpResponse::new(body));
        http.add_response(HttpResponse::new(vec![]));

        let app_set = Rc::new(Mutex::new(VecAppSet::new(vec![
            App::builder().id("appid_1").version([1, 2, 3, 3]).build(),
            App::builder().id("appid_2").version([9, 9, 9, 9]).build(),
            App::builder().id("appid_3").version([5, 6, 7, 7]).build(),
        ])));
        let (send_install, mut recv_install) = mpsc::channel(0);

        let mut state_machine = StateMachineBuilder::new_stub()
            .app_set(Rc::clone(&app_set))
            .http(http)
            .installer(BlockingInstaller {
                on_install: send_install,
                on_reboot: None,
            })
            .build()
            .await;

        // Answers the installer's request with one result per app that has an update,
        // in response order.
        let answer_installer = async move {
            let unblock_install = recv_install.next().await.unwrap();
            let install_results = vec![AppInstallResult::Deferred, AppInstallResult::Installed];
            unblock_install.send(install_results).unwrap();
        };

        let (oneshot_result, ()) = future::join(
            state_machine.oneshot(RequestParams::default()),
            answer_installer,
        )
        .await;
        let (check_response, reboot_after_update) = oneshot_result.unwrap();

        let expected_results = [
            ("appid_3", Action::DeferredByPolicy),
            ("appid_1", Action::Updated),
            ("appid_2", Action::NoUpdate),
        ];
        for (index, (app_id, action)) in expected_results.iter().enumerate() {
            assert_eq!(*app_id, check_response.app_responses[index].app_id);
            assert_eq!(action, &check_response.app_responses[index].result);
        }
        assert_matches!(reboot_after_update, RebootAfterUpdate::Needed(()));

        let request_params = RequestParams::default();
        let apps = app_set.lock().await.get_apps();

        // Last request: update complete for the app that was installed.
        let complete_event = Event {
            previous_version: Some("1.2.3.3".to_string()),
            next_version: Some("1.2.3.4".to_string()),
            download_time_ms: Some(0),
            ..Event::success(EventType::UpdateComplete)
        };
        let expected_complete = RequestBuilder::new(&state_machine.config, &request_params)
            .add_event(&apps[0], complete_event)
            .session_id(GUID::from_u128(0))
            .request_id(GUID::from_u128(4));
        assert_request(&state_machine.http, expected_complete).await;

        // Request before that: deferred event for `appid_3` and download finished for
        // `appid_1`.
        let download_finished_event = Event {
            previous_version: Some("1.2.3.3".to_string()),
            next_version: Some("1.2.3.4".to_string()),
            download_time_ms: Some(0),
            ..Event::success(EventType::UpdateDownloadFinished)
        };
        let deferred_event = Event {
            previous_version: Some("5.6.7.7".to_string()),
            next_version: Some("5.6.7.8".to_string()),
            download_time_ms: Some(0),
            event_type: EventType::UpdateComplete,
            event_result: EventResult::UpdateDeferred,
            ..Event::default()
        };
        let expected_previous = RequestBuilder::new(&state_machine.config, &request_params)
            .add_event(&apps[2], deferred_event)
            .add_event(&apps[0], download_finished_event)
            .session_id(GUID::from_u128(0))
            .request_id(GUID::from_u128(3));
        assert_request(&state_machine.http, expected_previous).await;
    });
}
2006
/// A failing stub installer surfaces exactly one `InstallerError` event, carrying
/// `StubInstallErrors::Failed`.
#[test]
fn test_observe_installation_error() {
    block_on(async {
        let http = MockHttpRequest::new(make_update_available_response());

        let events = StateMachineBuilder::new_stub()
            .http(http)
            .installer(StubInstaller { should_fail: true })
            .oneshot_check()
            .await;

        let observed_errors = events
            .filter_map(|event| {
                let installer_error =
                    if let StateMachineEvent::InstallerError(Some(e)) = event {
                        Some(*e.downcast::<StubInstallErrors>().unwrap())
                    } else {
                        None
                    };
                future::ready(installer_error)
            })
            .collect::<Vec<StubInstallErrors>>()
            .await;

        assert_eq!(observed_errors, vec![StubInstallErrors::Failed]);
    });
}
2034
/// When the policy engine defers the update, the app result is `DeferredByPolicy`, no
/// reboot is requested, and an `UpdateDeferred` event is sent to Omaha.
#[test]
fn test_report_deferred_by_policy() {
    block_on(async {
        let http = MockHttpRequest::new(make_update_available_response());
        let deferring_policy = MockPolicyEngine {
            update_decision: UpdateDecision::DeferredByPolicy,
            ..MockPolicyEngine::default()
        };

        let mut state_machine = StateMachineBuilder::new_stub()
            .policy_engine(deferring_policy)
            .http(http)
            .build()
            .await;

        let (check_response, reboot_after_update) = state_machine
            .oneshot(RequestParams::default())
            .await
            .unwrap();
        assert_eq!(
            Action::DeferredByPolicy,
            check_response.app_responses[0].result
        );
        assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);

        // The last request seen by the mock must be the deferred event report.
        let request_params = RequestParams::default();
        let apps = state_machine.app_set.lock().await.get_apps();
        let expected_event = Event {
            event_type: EventType::UpdateComplete,
            event_result: EventResult::UpdateDeferred,
            previous_version: Some("1.2.3.4".to_string()),
            ..Event::default()
        };
        let expected_request = RequestBuilder::new(&state_machine.config, &request_params)
            .add_event(&apps[0], expected_event)
            .session_id(GUID::from_u128(0))
            .request_id(GUID::from_u128(2));
        assert_request(&state_machine.http, expected_request).await;
    });
}
2073
/// When the policy engine denies the update, the app result is `DeniedByPolicy`, no
/// reboot is requested, and a `DeniedByPolicy` error event is sent to Omaha.
#[test]
fn test_report_denied_by_policy() {
    block_on(async {
        let http = MockHttpRequest::new(make_update_available_response());
        let denying_policy = MockPolicyEngine {
            update_decision: UpdateDecision::DeniedByPolicy,
            ..MockPolicyEngine::default()
        };

        let mut state_machine = StateMachineBuilder::new_stub()
            .policy_engine(denying_policy)
            .http(http)
            .build()
            .await;

        let (check_response, reboot_after_update) = state_machine
            .oneshot(RequestParams::default())
            .await
            .unwrap();
        assert_eq!(
            Action::DeniedByPolicy,
            check_response.app_responses[0].result
        );
        assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);

        // The last request seen by the mock must be the denial event report.
        let request_params = RequestParams::default();
        let apps = state_machine.app_set.lock().await.get_apps();
        let expected_event = Event {
            previous_version: Some("1.2.3.4".to_string()),
            ..Event::error(EventErrorCode::DeniedByPolicy)
        };
        let expected_request = RequestBuilder::new(&state_machine.config, &request_params)
            .add_event(&apps[0], expected_event)
            .session_id(GUID::from_u128(0))
            .request_id(GUID::from_u128(2));
        assert_request(&state_machine.http, expected_request).await;
    });
}
2111
/// The first timer the state machine blocks on waits until the check time returned by
/// the policy engine.
#[test]
fn test_wait_timer() {
    let mut pool = LocalPool::new();
    let mock_time = MockTimeSource::new_from_now();
    let next_update_time = mock_time.now() + Duration::from_secs(111);
    let (timer, mut timers) = BlockingTimer::new();

    let check_timing = CheckTiming::builder().time(next_update_time).build();
    let policy_engine = MockPolicyEngine {
        check_timing: Some(check_timing),
        time_source: mock_time,
        ..MockPolicyEngine::default()
    };

    let start_future = StateMachineBuilder::new_stub()
        .policy_engine(policy_engine)
        .timer(timer)
        .start();
    let (_ctl, state_machine) = pool.run_until(start_future);

    // Drive the state machine in the background, discarding its events.
    let drain_events = state_machine.map(|_| ()).collect();
    pool.spawner().spawn_local(drain_events).unwrap();

    let blocked_timer = pool.run_until(timers.next()).unwrap();
    assert_eq!(
        blocked_timer.requested_wait(),
        RequestedWait::Until(next_update_time.into())
    );
}
2143
/// After one check, the app set holds the cohort and user counting from the response;
/// a second check then sends a request built from those updated apps.
#[test]
fn test_cohort_and_user_counting_updates_are_used_in_subsequent_requests() {
    block_on(async {
        let payload = json!({"response": {
            "server": "prod",
            "protocol": "3.0",
            "daystart": {"elapsed_days": 1234567, "elapsed_seconds": 3645},
            "app": [{
                "appid": "{00000000-0000-0000-0000-000000000001}",
                "status": "ok",
                "cohort": "1",
                "cohortname": "stable-channel",
                "updatecheck": {"status": "noupdate"}
            }]
        }});
        let body = serde_json::to_vec(&payload).unwrap();
        // The same response is served for both checks.
        let mut http = MockHttpRequest::new(HttpResponse::new(body.clone()));
        http.add_response(HttpResponse::new(body));
        let app_set = make_test_app_set();

        let mut state_machine = StateMachineBuilder::new_stub()
            .http(http)
            .app_set(app_set.clone())
            .build()
            .await;

        state_machine.run_once().await;

        let apps = app_set.lock().await.get_apps();
        let updated_app = &apps[0];
        assert_eq!(Some("1".to_string()), updated_app.cohort.id);
        assert_eq!(None, updated_app.cohort.hint);
        assert_eq!(Some("stable-channel".to_string()), updated_app.cohort.name);
        assert_eq!(
            UserCounting::ClientRegulatedByDate(Some(1234567)),
            updated_app.user_counting
        );

        state_machine.run_once().await;

        let request_params = RequestParams::default();
        let expected_request = RequestBuilder::new(&state_machine.config, &request_params)
            .add_update_check(updated_app)
            .add_ping(updated_app)
            .session_id(GUID::from_u128(2))
            .request_id(GUID::from_u128(3));
        assert_request(&state_machine.http, expected_request).await;
    });
}
2201
/// The `elapsed_days` value of the response's `daystart` is returned as the app's user
/// counting, and no reboot is requested.
#[test]
fn test_user_counting_returned() {
    block_on(async {
        let payload = json!({"response": {
            "server": "prod",
            "protocol": "3.0",
            "daystart": {"elapsed_days": 1234567, "elapsed_seconds": 3645},
            "app": [{
                "appid": "{00000000-0000-0000-0000-000000000001}",
                "status": "ok",
                "cohort": "1",
                "cohortname": "stable-channel",
                "updatecheck": {"status": "noupdate"}
            }]
        }});
        let body = serde_json::to_vec(&payload).unwrap();
        let http = MockHttpRequest::new(HttpResponse::new(body));

        let (check_response, reboot_after_update) = StateMachineBuilder::new_stub()
            .http(http)
            .oneshot(RequestParams::default())
            .await
            .unwrap();

        assert_eq!(
            UserCounting::ClientRegulatedByDate(Some(1234567)),
            check_response.app_responses[0].user_counting
        );
        assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);
    });
}
2238
/// A one-shot check on the stub state machine yields `CheckingForUpdates` followed by
/// `ErrorCheckingForUpdate` as its only state changes.
#[test]
fn test_observe_state() {
    block_on(async {
        let events = StateMachineBuilder::new_stub().oneshot_check().await;

        let observed_states = events
            .filter_map(|event| {
                let state = if let StateMachineEvent::StateChange(state) = event {
                    Some(state)
                } else {
                    None
                };
                future::ready(state)
            })
            .collect::<Vec<State>>()
            .await;

        assert_eq!(
            observed_states,
            vec![
                State::CheckingForUpdates(InstallSource::ScheduledTask),
                State::ErrorCheckingForUpdate,
            ]
        );
    });
}
2261
/// A one-shot check yields a single `ScheduleChange` whose last update time and last
/// update check time both equal the mock time source's current time.
#[test]
fn test_observe_schedule() {
    block_on(async {
        let mock_time = MockTimeSource::new_from_now();
        let events = StateMachineBuilder::new_stub()
            .policy_engine(StubPolicyEngine::new(&mock_time))
            .oneshot_check()
            .await;

        let observed_schedules = events
            .filter_map(|event| {
                let schedule = if let StateMachineEvent::ScheduleChange(schedule) = event {
                    Some(schedule)
                } else {
                    None
                };
                future::ready(schedule)
            })
            .collect::<Vec<UpdateCheckSchedule>>()
            .await;

        let expected_schedule = UpdateCheckSchedule::builder()
            .last_update_time(mock_time.now())
            .last_update_check_time(mock_time.now())
            .build();

        assert_eq!(observed_schedules, vec![expected_schedule]);
    });
}
2288
/// A one-shot check on the stub state machine yields a single `ProtocolStateChange` with
/// one consecutive failed update check and defaults elsewhere.
#[test]
fn test_observe_protocol_state() {
    block_on(async {
        let events = StateMachineBuilder::new_stub().oneshot_check().await;

        let observed_protocol_states = events
            .filter_map(|event| {
                let protocol_state =
                    if let StateMachineEvent::ProtocolStateChange(state) = event {
                        Some(state)
                    } else {
                        None
                    };
                future::ready(protocol_state)
            })
            .collect::<Vec<ProtocolState>>()
            .await;

        let expected_protocol_state = ProtocolState {
            consecutive_failed_update_checks: 1,
            ..ProtocolState::default()
        };

        assert_eq!(observed_protocol_states, vec![expected_protocol_state]);
    });
}
2312
/// The parsed Omaha response must be surfaced to observers as a single
/// `OmahaServerResponse` event equal to what `parse_json_response` yields for
/// the same bytes.
#[test]
fn test_observe_omaha_server_response() {
    block_on(async {
        let body_json = json!({"response":{
            "server": "prod",
            "protocol": "3.0",
            "app": [{
                "appid": "{00000000-0000-0000-0000-000000000001}",
                "status": "ok",
                "cohort": "1",
                "cohortname": "stable-channel",
                "updatecheck": {
                    "status": "noupdate"
                }
            }]
        }});
        let body_bytes = serde_json::to_vec(&body_json).unwrap();

        // Parse the expectation before the bytes are moved into the mock.
        let expected = response::parse_json_response(&body_bytes).unwrap();
        let http = MockHttpRequest::new(HttpResponse::new(body_bytes));

        let events = StateMachineBuilder::new_stub()
            .http(http)
            .oneshot_check()
            .await;

        // Discard every event that is not an Omaha server response.
        let observed: Vec<response::Response> = events
            .filter_map(|event| {
                let parsed = if let StateMachineEvent::OmahaServerResponse(parsed) = event {
                    Some(parsed)
                } else {
                    None
                };
                future::ready(parsed)
            })
            .collect()
            .await;

        assert_eq!(observed, vec![expected]);
    });
}
2349
/// With a single unparseable response queued on the mock, the reporter must
/// record a successful response time, one successful request, and then an
/// `OmahaEventLost` carrying the `ParseResponse` error event.
#[test]
fn test_metrics_report_omaha_event_lost() {
    block_on(async {
        let mut metrics_reporter = MockMetricsReporter::new();
        let http = MockHttpRequest::new(HttpResponse::new("invalid response".into()));

        let builder = StateMachineBuilder::new_stub()
            .http(http)
            .metrics_reporter(&mut metrics_reporter);
        let _outcome = builder.oneshot(RequestParams::default()).await;

        #[rustfmt::skip]
        assert_matches!(
            &metrics_reporter.metrics[..],
            [
                Metrics::UpdateCheckResponseTime { successful: true, .. },
                Metrics::RequestsPerCheck { count: 1, successful: true },
                Metrics::OmahaEventLost(Event {
                    event_type: EventType::UpdateComplete,
                    event_result: EventResult::Error,
                    errorcode: Some(EventErrorCode::ParseResponse),
                    previous_version: None,
                    next_version: None,
                    download_time_ms: None,
                })
            ]
        );
    });
}
2395
/// A one-shot request on the stub state machine must report exactly a
/// successful response time followed by a successful single-request count.
#[test]
fn test_metrics_report_update_check_response_time() {
    block_on(async {
        let mut metrics_reporter = MockMetricsReporter::new();

        let builder = StateMachineBuilder::new_stub().metrics_reporter(&mut metrics_reporter);
        let _outcome = builder.oneshot(RequestParams::default()).await;

        #[rustfmt::skip]
        assert_matches!(
            &metrics_reporter.metrics[..],
            [
                Metrics::UpdateCheckResponseTime { successful: true, .. },
                Metrics::RequestsPerCheck { count: 1, successful: true },
            ]
        );
    });
}
2417
/// When every allowed attempt hits a transport error, each attempt must report
/// an unsuccessful response time and the check must report three failed
/// requests.
#[test]
fn test_metrics_report_update_check_response_time_on_failure() {
    block_on(async {
        let mut http = MockHttpRequest::default();
        (0..MAX_OMAHA_REQUEST_ATTEMPTS).for_each(|_| {
            http.add_error(http_request::mock_errors::make_transport_error());
        });
        // Queued behind the errors; the expected metrics below show that no
        // successful attempt is recorded.
        http.add_response(hyper::Response::default());

        let mut metrics_reporter = MockMetricsReporter::new();
        let builder = StateMachineBuilder::new_stub()
            .http(http)
            .metrics_reporter(&mut metrics_reporter);
        let _outcome = builder.oneshot(RequestParams::default()).await;

        #[rustfmt::skip]
        assert_matches!(
            &metrics_reporter.metrics[..],
            [
                Metrics::UpdateCheckResponseTime { successful: false, .. },
                Metrics::UpdateCheckResponseTime { successful: false, .. },
                Metrics::UpdateCheckResponseTime { successful: false, .. },
                Metrics::RequestsPerCheck { count: 3, successful: false },
            ]
        );
    });
}
2452
/// Two transport errors followed by a default (empty) response must report two
/// failed response times, one successful one, a successful three-request
/// check, and a lost `ParseResponse` error event.
#[test]
fn test_metrics_report_update_check_response_time_on_failure_followed_by_success() {
    block_on(async {
        let mut http = MockHttpRequest::default();
        // Fail every attempt except the last one.
        (0..MAX_OMAHA_REQUEST_ATTEMPTS - 1).for_each(|_| {
            http.add_error(http_request::mock_errors::make_transport_error());
        });
        http.add_response(hyper::Response::default());

        let mut metrics_reporter = MockMetricsReporter::new();
        let builder = StateMachineBuilder::new_stub()
            .http(http)
            .metrics_reporter(&mut metrics_reporter);
        let _outcome = builder.oneshot(RequestParams::default()).await;

        #[rustfmt::skip]
        assert_matches!(
            &metrics_reporter.metrics[..],
            [
                Metrics::UpdateCheckResponseTime { successful: false, .. },
                Metrics::UpdateCheckResponseTime { successful: false, .. },
                Metrics::UpdateCheckResponseTime { successful: true, .. },
                Metrics::RequestsPerCheck { count: 3, successful: true },
                Metrics::OmahaEventLost(Event {
                    event_type: EventType::UpdateComplete,
                    event_result: EventResult::Error,
                    errorcode: Some(EventErrorCode::ParseResponse),
                    previous_version: None,
                    next_version: None,
                    download_time_ms: None
                }),
            ]
        );
    });
}
2492
/// A one-shot request on the stub state machine must report a successful
/// `RequestsPerCheck` metric with a count of one.
#[test]
fn test_metrics_report_requests_per_check() {
    block_on(async {
        let mut metrics_reporter = MockMetricsReporter::new();

        let builder = StateMachineBuilder::new_stub().metrics_reporter(&mut metrics_reporter);
        let _outcome = builder.oneshot(RequestParams::default()).await;

        let expected = Metrics::RequestsPerCheck {
            count: 1,
            successful: true,
        };
        assert!(metrics_reporter.metrics.contains(&expected));
    });
}
2510
/// When all but the last allowed attempt fail with a transport error, the
/// check must be reported as successful with the maximum attempt count.
#[test]
fn test_metrics_report_requests_per_check_on_failure_followed_by_success() {
    block_on(async {
        let mut http = MockHttpRequest::default();
        // Fail every attempt except the last one.
        (0..MAX_OMAHA_REQUEST_ATTEMPTS - 1).for_each(|_| {
            http.add_error(http_request::mock_errors::make_transport_error());
        });
        http.add_response(hyper::Response::default());

        let mut metrics_reporter = MockMetricsReporter::new();
        let builder = StateMachineBuilder::new_stub()
            .http(http)
            .metrics_reporter(&mut metrics_reporter);
        let _outcome = builder.oneshot(RequestParams::default()).await;

        let expected = Metrics::RequestsPerCheck {
            count: MAX_OMAHA_REQUEST_ATTEMPTS,
            successful: true,
        };
        assert!(!metrics_reporter.metrics.is_empty());
        assert!(metrics_reporter.metrics.contains(&expected));
    });
}
2538
/// When every allowed attempt fails with a transport error, the check must be
/// reported as unsuccessful with the maximum attempt count.
#[test]
fn test_metrics_report_requests_per_check_on_failure() {
    block_on(async {
        let mut http = MockHttpRequest::default();
        (0..MAX_OMAHA_REQUEST_ATTEMPTS).for_each(|_| {
            http.add_error(http_request::mock_errors::make_transport_error());
        });
        // Queued behind the errors; the expected metric below shows the check
        // is still reported as failed.
        http.add_response(hyper::Response::default());

        let mut metrics_reporter = MockMetricsReporter::new();
        let builder = StateMachineBuilder::new_stub()
            .http(http)
            .metrics_reporter(&mut metrics_reporter);
        let _outcome = builder.oneshot(RequestParams::default()).await;

        let expected = Metrics::RequestsPerCheck {
            count: MAX_OMAHA_REQUEST_ATTEMPTS,
            successful: false,
        };
        assert!(!metrics_reporter.metrics.is_empty());
        assert!(metrics_reporter.metrics.contains(&expected));
    });
}
2567
/// With an empty mock HTTP client, the state machine must request exactly two
/// waits from the timer (within 500..=1500 ms and 1500..=2500 ms) and finish
/// with an `HttpStatus` request error.
#[test]
fn test_requests_per_check_backoff_with_mock_timer() {
    block_on(async {
        let first_min = Duration::from_millis(500);
        let first_max = Duration::from_millis(1500);
        let second_min = Duration::from_millis(1500);
        let second_max = Duration::from_millis(2500);

        let mut timer = MockTimer::new();
        timer.expect_for_range(first_min, first_max);
        timer.expect_for_range(second_min, second_max);
        // Grab the view before the timer is moved into the builder.
        let requested_waits = timer.get_requested_waits_view();

        let outcome = StateMachineBuilder::new_stub()
            .http(MockHttpRequest::empty())
            .timer(timer)
            .oneshot(RequestParams::default())
            .await;

        let recorded = requested_waits.borrow();
        assert_eq!(recorded.len(), 2);
        assert_matches!(
            recorded[0],
            RequestedWait::For(d) if (first_min..=first_max).contains(&d)
        );
        assert_matches!(
            recorded[1],
            RequestedWait::For(d) if (second_min..=second_max).contains(&d)
        );

        assert_matches!(
            outcome,
            Err(UpdateCheckError::OmahaRequest(
                OmahaRequestError::HttpStatus(_)
            ))
        );
    });
}
2600
/// Running the stub state machine once must report an update-check failure
/// with reason `Omaha`.
#[test]
fn test_metrics_report_update_check_failure_reason_omaha() {
    block_on(async {
        let mut metrics_reporter = MockMetricsReporter::new();

        let builder = StateMachineBuilder::new_stub().metrics_reporter(&mut metrics_reporter);
        let mut state_machine = builder.build().await;
        state_machine.run_once().await;

        let expected = Metrics::UpdateCheckFailureReason(UpdateCheckFailureReason::Omaha);
        assert!(metrics_reporter.metrics.contains(&expected));
    });
}
2619
/// Running the state machine once against an empty mock HTTP client must
/// report an update-check failure with reason `Network`.
#[test]
fn test_metrics_report_update_check_failure_reason_network() {
    block_on(async {
        let mut metrics_reporter = MockMetricsReporter::new();

        let builder = StateMachineBuilder::new_stub()
            .http(MockHttpRequest::empty())
            .metrics_reporter(&mut metrics_reporter);
        let mut state_machine = builder.build().await;
        state_machine.run_once().await;

        let expected = Metrics::UpdateCheckFailureReason(UpdateCheckFailureReason::Network);
        assert!(metrics_reporter.metrics.contains(&expected));
    });
}
2639
/// After a one-shot check has been driven to completion, storage must hold a
/// `LAST_UPDATE_TIME` entry and be committed.
#[test]
fn test_persist_last_update_time() {
    block_on(async {
        let storage = Rc::new(Mutex::new(MemStorage::new()));

        let events = StateMachineBuilder::new_stub()
            .storage(Rc::clone(&storage))
            .oneshot_check()
            .await;
        // Drain the event stream; the events themselves are not inspected.
        events.map(drop).collect::<()>().await;

        let guard = storage.lock().await;
        // Panics if the key was never written.
        guard.get_int(LAST_UPDATE_TIME).await.unwrap();
        assert!(guard.committed());
    });
}
2658
/// An `X-Retry-After: 1234` header on a no-update response must set the
/// in-memory poll interval to 1234 s and persist it as 1234000000 under
/// `SERVER_DICTATED_POLL_INTERVAL`.
#[test]
fn test_persist_server_dictated_poll_interval() {
    block_on(async {
        let storage = Rc::new(Mutex::new(MemStorage::new()));
        let http = MockHttpRequest::new(
            HttpResponse::builder()
                .header(X_RETRY_AFTER, 1234)
                .body(make_noupdate_httpresponse())
                .unwrap(),
        );

        let mut state_machine = StateMachineBuilder::new_stub()
            .http(http)
            .storage(Rc::clone(&storage))
            .build()
            .await;
        let outcome = state_machine.oneshot(RequestParams::default()).await;
        outcome.unwrap();

        assert_eq!(
            state_machine.context.state.server_dictated_poll_interval,
            Some(Duration::from_secs(1234))
        );

        let guard = storage.lock().await;
        let persisted = guard.get_int(SERVER_DICTATED_POLL_INTERVAL).await;
        assert_eq!(persisted, Some(1_234_000_000));
        assert!(guard.committed());
    });
}
2692
/// Even when the server answers with HTTP 500, an `X-Retry-After: 1234` header
/// must still set and persist the server-dictated poll interval.
#[test]
fn test_persist_server_dictated_poll_interval_http_error() {
    block_on(async {
        let storage = Rc::new(Mutex::new(MemStorage::new()));
        let http = MockHttpRequest::new(
            HttpResponse::builder()
                .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
                .header(X_RETRY_AFTER, 1234)
                .body(vec![])
                .unwrap(),
        );

        let mut state_machine = StateMachineBuilder::new_stub()
            .http(http)
            .storage(Rc::clone(&storage))
            .build()
            .await;
        let outcome = state_machine.oneshot(RequestParams::default()).await;
        assert_matches!(
            outcome,
            Err(UpdateCheckError::OmahaRequest(
                OmahaRequestError::HttpStatus(_)
            ))
        );

        assert_eq!(
            state_machine.context.state.server_dictated_poll_interval,
            Some(Duration::from_secs(1234))
        );

        let guard = storage.lock().await;
        let persisted = guard.get_int(SERVER_DICTATED_POLL_INTERVAL).await;
        assert_eq!(persisted, Some(1_234_000_000));
        assert!(guard.committed());
    });
}
2729
/// An `X-Retry-After` of 123456789 must result in a poll interval of 86400 s,
/// both in memory and in storage (persisted as 86400000000).
#[test]
fn test_persist_server_dictated_poll_interval_max_duration() {
    block_on(async {
        let storage = Rc::new(Mutex::new(MemStorage::new()));
        let http = MockHttpRequest::new(
            HttpResponse::builder()
                .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
                .header(X_RETRY_AFTER, 123456789)
                .body(vec![])
                .unwrap(),
        );

        let mut state_machine = StateMachineBuilder::new_stub()
            .http(http)
            .storage(Rc::clone(&storage))
            .build()
            .await;
        let outcome = state_machine.oneshot(RequestParams::default()).await;
        assert_matches!(
            outcome,
            Err(UpdateCheckError::OmahaRequest(
                OmahaRequestError::HttpStatus(_)
            ))
        );

        assert_eq!(
            state_machine.context.state.server_dictated_poll_interval,
            Some(Duration::from_secs(86400))
        );

        let guard = storage.lock().await;
        let persisted = guard.get_int(SERVER_DICTATED_POLL_INTERVAL).await;
        assert_eq!(persisted, Some(86_400_000_000));
        assert!(guard.committed());
    });
}
2766
/// A poll interval already present in storage must remain on the state machine
/// after a one-shot request that fails with a transport error.
///
/// Storage is seeded with `SERVER_DICTATED_POLL_INTERVAL = 1234000000` before
/// the state machine is built, and the mock HTTP client is given a single
/// transport error.
#[test]
fn test_server_dictated_poll_interval_with_transport_error_no_retry() {
    block_on(async {
        let mut http = MockHttpRequest::empty();
        http.add_error(http_request::mock_errors::make_transport_error());

        // Await and unwrap the storage futures so that the seeding is
        // guaranteed to have run (and succeeded) before the state machine
        // loads from storage, instead of depending on the storage
        // implementation doing its work before the future is polled.
        let mut storage = MemStorage::new();
        storage
            .set_int(SERVER_DICTATED_POLL_INTERVAL, 1234000000)
            .await
            .unwrap();
        storage.commit().await.unwrap();
        let storage = Rc::new(Mutex::new(storage));

        let mut state_machine = StateMachineBuilder::new_stub()
            .http(http)
            .storage(Rc::clone(&storage))
            .build()
            .await;
        assert_matches!(
            state_machine.oneshot(RequestParams::default()).await,
            Err(UpdateCheckError::OmahaRequest(
                OmahaRequestError::HttpTransport(_)
            ))
        );

        assert_eq!(
            state_machine.context.state.server_dictated_poll_interval,
            Some(Duration::from_secs(1234))
        );
    });
}
2798
/// After a one-shot check has been driven to completion, storage must contain
/// a string entry keyed by the first app's id and be committed.
#[test]
fn test_persist_app() {
    block_on(async {
        let app_set = make_test_app_set();
        let storage = Rc::new(Mutex::new(MemStorage::new()));

        let events = StateMachineBuilder::new_stub()
            .storage(Rc::clone(&storage))
            .app_set(app_set.clone())
            .oneshot_check()
            .await;
        // Drain the event stream; the events themselves are not inspected.
        events.map(drop).collect::<()>().await;

        let guard = storage.lock().await;
        let apps = app_set.lock().await.get_apps();
        // Panics if nothing was persisted for the app.
        guard.get_string(&apps[0].id).await.unwrap();
        assert!(guard.committed());
    });
}
2820
/// A `LAST_UPDATE_TIME` stored before the state machine is built must be
/// loaded into the schedule as a wall-clock `PartialComplexTime`.
#[test]
fn test_load_last_update_time() {
    block_on(async {
        let mut mock_time = MockTimeSource::new_from_now();
        mock_time.truncate_submicrosecond_walltime();
        let stored_time = mock_time.now_in_walltime() - Duration::from_secs(999);

        let mut storage = MemStorage::new();
        let write = storage.set_time(LAST_UPDATE_TIME, stored_time).await;
        write.unwrap();

        let state_machine = StateMachineBuilder::new_stub()
            .policy_engine(StubPolicyEngine::new(&mock_time))
            .storage(Rc::new(Mutex::new(storage)))
            .build()
            .await;

        let loaded = state_machine.context.schedule.last_update_time.unwrap();
        assert_eq!(loaded, PartialComplexTime::Wall(stored_time));
    });
}
2845
/// A stored `SERVER_DICTATED_POLL_INTERVAL` of 56789 must be loaded into the
/// state machine as a duration of 56789 microseconds.
#[test]
fn test_load_server_dictated_poll_interval() {
    block_on(async {
        let mut storage = MemStorage::new();
        let write = storage.set_int(SERVER_DICTATED_POLL_INTERVAL, 56789).await;
        write.unwrap();

        let state_machine = StateMachineBuilder::new_stub()
            .storage(Rc::new(Mutex::new(storage)))
            .build()
            .await;

        let expected = Some(Duration::from_micros(56789));
        assert_eq!(
            expected,
            state_machine.context.state.server_dictated_poll_interval
        );
    });
}
2866
/// Building the state machine must load the persisted cohort and user-counting
/// data for an app from storage into the app set.
#[test]
fn test_load_app() {
    block_on(async {
        let app = App::builder()
            .id("{00000000-0000-0000-0000-000000000001}")
            .version([1, 2, 3, 4])
            .build();
        let app_set = VecAppSet::new(vec![app]);

        let persisted_app = PersistedApp {
            cohort: Cohort {
                id: Some("cohort_id".to_string()),
                hint: Some("test_channel".to_string()),
                name: None,
            },
            user_counting: UserCounting::ClientRegulatedByDate(Some(22222)),
        };

        // Store the serialized app under the app's id before building.
        let mut storage = MemStorage::new();
        let serialized = serde_json::to_string(&persisted_app).unwrap();
        let initial_apps = app_set.get_apps();
        storage
            .set_string(&initial_apps[0].id, &serialized)
            .await
            .unwrap();

        let app_set = Rc::new(Mutex::new(app_set));

        let _state_machine = StateMachineBuilder::new_stub()
            .storage(Rc::new(Mutex::new(storage)))
            .app_set(Rc::clone(&app_set))
            .build()
            .await;

        let loaded_apps = app_set.lock().await.get_apps();
        assert_eq!(persisted_app.cohort, loaded_apps[0].cohort);
        assert_eq!(
            UserCounting::ClientRegulatedByDate(Some(22222)),
            loaded_apps[0].user_counting
        );
    });
}
2903
/// The first `report_check_interval` call must report nothing; after the mock
/// clock advances, the second call must report that advance as a monotonic
/// interval.
#[test]
fn test_report_check_interval_with_no_storage() {
    block_on(async {
        let mut mock_time = MockTimeSource::new_from_now();
        let mut state_machine = StateMachineBuilder::new_stub()
            .policy_engine(StubPolicyEngine::new(mock_time.clone()))
            .metrics_reporter(MockMetricsReporter::new())
            .build()
            .await;

        let source = InstallSource::ScheduledTask;

        // First call: no metric is expected.
        state_machine.report_check_interval(source).await;
        assert!(state_machine.metrics_reporter.metrics.is_empty());

        let elapsed = Duration::from_micros(999999);
        mock_time.advance(elapsed);

        // Second call: the elapsed time is reported against the monotonic clock.
        state_machine
            .report_check_interval(InstallSource::ScheduledTask)
            .await;

        let expected = vec![Metrics::UpdateCheckInterval {
            interval: elapsed,
            clock: ClockType::Monotonic,
            install_source: InstallSource::ScheduledTask,
        }];
        assert_eq!(state_machine.metrics_reporter.metrics, expected);
    });
}
2938
/// When the last check time is only known as a wall-clock time, the first
/// reported interval must use the wall clock; the two following reports (after
/// the mock clock advances) must use the monotonic clock.
#[test]
fn test_report_check_interval_mono_transition() {
    block_on(async {
        let mut mock_time = MockTimeSource::new_from_now();
        let mut state_machine = StateMachineBuilder::new_stub()
            .policy_engine(StubPolicyEngine::new(mock_time.clone()))
            .metrics_reporter(MockMetricsReporter::new())
            .build()
            .await;

        // Seed the schedule with a wall-clock-only last check time.
        let wall_gap = Duration::from_secs(999);
        let last_check = mock_time.now_in_walltime() - wall_gap;
        state_machine.context.schedule.last_update_check_time =
            Some(PartialComplexTime::Wall(last_check));
        state_machine
            .report_check_interval(InstallSource::ScheduledTask)
            .await;

        let step = Duration::from_micros(999999);

        mock_time.advance(step);
        state_machine
            .report_check_interval(InstallSource::ScheduledTask)
            .await;

        mock_time.advance(step);
        state_machine
            .report_check_interval(InstallSource::ScheduledTask)
            .await;

        let expected = vec![
            Metrics::UpdateCheckInterval {
                interval: wall_gap,
                clock: ClockType::Wall,
                install_source: InstallSource::ScheduledTask,
            },
            Metrics::UpdateCheckInterval {
                interval: step,
                clock: ClockType::Monotonic,
                install_source: InstallSource::ScheduledTask,
            },
            Metrics::UpdateCheckInterval {
                interval: step,
                clock: ClockType::Monotonic,
                install_source: InstallSource::ScheduledTask,
            },
        ];
        assert_eq!(state_machine.metrics_reporter.metrics, expected);
    });
}
2994
2995 #[derive(Debug)]
2996 pub struct TestInstaller {
2997 reboot_called: Rc<RefCell<bool>>,
2998 install_fails: usize,
2999 mock_time: MockTimeSource,
3000 }
3001 struct TestInstallerBuilder {
3002 install_fails: usize,
3003 mock_time: MockTimeSource,
3004 }
3005 impl TestInstaller {
3006 fn builder(mock_time: MockTimeSource) -> TestInstallerBuilder {
3007 TestInstallerBuilder {
3008 install_fails: 0,
3009 mock_time,
3010 }
3011 }
3012 }
3013 impl TestInstallerBuilder {
3014 fn add_install_fail(mut self) -> Self {
3015 self.install_fails += 1;
3016 self
3017 }
3018 fn build(self) -> TestInstaller {
3019 TestInstaller {
3020 reboot_called: Rc::new(RefCell::new(false)),
3021 install_fails: self.install_fails,
3022 mock_time: self.mock_time,
3023 }
3024 }
3025 }
/// Amount by which `TestInstaller` advances its mock clock on a successful
/// install.
const INSTALL_DURATION: Duration = Duration::from_micros(98_765_433);
3027
3028 impl Installer for TestInstaller {
3029 type InstallPlan = StubPlan;
3030 type Error = StubInstallErrors;
3031 type InstallResult = ();
3032
3033 fn perform_install<'a>(
3034 &'a mut self,
3035 _install_plan: &StubPlan,
3036 observer: Option<&'a dyn ProgressObserver>,
3037 ) -> LocalBoxFuture<'a, (Self::InstallResult, Vec<AppInstallResult<Self::Error>>)> {
3038 if self.install_fails > 0 {
3039 self.install_fails -= 1;
3040 future::ready((
3041 (),
3042 vec![AppInstallResult::Failed(StubInstallErrors::Failed)],
3043 ))
3044 .boxed()
3045 } else {
3046 self.mock_time.advance(INSTALL_DURATION);
3047 async move {
3048 if let Some(observer) = observer {
3049 observer.receive_progress(None, 0.0, None, None).await;
3050 observer.receive_progress(None, 0.3, None, None).await;
3051 observer.receive_progress(None, 0.9, None, None).await;
3052 observer.receive_progress(None, 1.0, None, None).await;
3053 }
3054 ((), vec![AppInstallResult::Installed])
3055 }
3056 .boxed_local()
3057 }
3058 }
3059
3060 fn perform_reboot(&mut self) -> LocalBoxFuture<'_, Result<(), anyhow::Error>> {
3061 self.reboot_called.replace(true);
3062 future::ready(Ok(())).boxed_local()
3063 }
3064
3065 fn try_create_install_plan<'a>(
3066 &'a self,
3067 _request_params: &'a RequestParams,
3068 _request_metadata: Option<&'a RequestMetadata>,
3069 _response: &'a Response,
3070 _response_bytes: Vec<u8>,
3071 _ecdsa_signature: Option<Vec<u8>>,
3072 ) -> LocalBoxFuture<'a, Result<Self::InstallPlan, Self::Error>> {
3073 future::ready(Ok(StubPlan)).boxed_local()
3074 }
3075 }
3076
/// A successful update must report `SuccessfulUpdateDuration` equal to the
/// install duration and `SuccessfulUpdateFromFirstSeen` measured from the
/// first-seen time stored before the run.
#[test]
fn test_report_successful_update_duration() {
    block_on(async {
        let http = MockHttpRequest::new(make_update_available_response());
        let storage = Rc::new(Mutex::new(MemStorage::new()));

        let mut mock_time = MockTimeSource::new_from_now();
        mock_time.truncate_submicrosecond_walltime();
        let start = mock_time.now();

        // The installer advances the clock by INSTALL_DURATION on success.
        let finished = start + INSTALL_DURATION;
        let want_install_duration = finished.wall_duration_since(start).unwrap();

        let first_seen = start - Duration::from_micros(1000);
        let want_since_first_seen = finished.wall_duration_since(first_seen).unwrap();

        let mut state_machine = StateMachineBuilder::new_stub()
            .http(http)
            .installer(TestInstaller::builder(mock_time.clone()).build())
            .policy_engine(StubPolicyEngine::new(mock_time.clone()))
            .metrics_reporter(MockMetricsReporter::new())
            .storage(Rc::clone(&storage))
            .build()
            .await;

        // Seed storage after the state machine is built; the guard is dropped
        // at the end of this scope, before the run.
        {
            let mut guard = storage.lock().await;
            guard.set_string(INSTALL_PLAN_ID, "").await.unwrap();
            guard
                .set_time(UPDATE_FIRST_SEEN_TIME, first_seen)
                .await
                .unwrap();
            guard.commit().await.unwrap();
        }

        state_machine.run_once().await;

        #[rustfmt::skip]
        assert_matches!(
            &state_machine.metrics_reporter.metrics[..],
            [
                Metrics::UpdateCheckResponseTime { successful: true, .. },
                Metrics::RequestsPerCheck { count: 1, successful: true },
                Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadStarted, event_result: EventResult::Success, .. }),
                Metrics::SuccessfulUpdateDuration(got_install_duration),
                Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadFinished, event_result: EventResult::Success, .. }),
                Metrics::OmahaEventLost(Event { event_type: EventType::UpdateComplete, event_result: EventResult::Success, .. }),
                Metrics::SuccessfulUpdateFromFirstSeen(got_since_first_seen),
                Metrics::AttemptsToSuccessfulCheck(1),
                Metrics::AttemptsToSuccessfulInstall { count: 1, successful: true },
            ]
            if
                *got_install_duration == want_install_duration &&
                *got_since_first_seen == want_since_first_seen
        );
    });
}
3137
/// When the stub installer fails, a zero-length `FailedUpdateDuration` metric
/// must be reported.
#[test]
fn test_report_failed_update_duration() {
    block_on(async {
        let failing_installer = StubInstaller { should_fail: true };
        let http = MockHttpRequest::new(make_update_available_response());

        let mut state_machine = StateMachineBuilder::new_stub()
            .http(http)
            .installer(failing_installer)
            .metrics_reporter(MockMetricsReporter::new())
            .build()
            .await;
        state_machine.run_once().await;

        let expected = Metrics::FailedUpdateDuration(Duration::from_micros(0));
        assert!(state_machine.metrics_reporter.metrics.contains(&expected));
    });
}
3158
/// `record_update_first_seen_time` must store the plan id and first-seen time
/// on first sight, keep the original time when the same id is seen again, and
/// replace both when a different id is seen.
#[test]
fn test_record_update_first_seen_time() {
    block_on(async {
        let storage = Rc::new(Mutex::new(MemStorage::new()));
        let mut state_machine = StateMachineBuilder::new_stub()
            .storage(Rc::clone(&storage))
            .build()
            .await;

        let mut mock_time = MockTimeSource::new_from_now();
        mock_time.truncate_submicrosecond_walltime();
        let first_time = mock_time.now_in_walltime();

        // First sighting of "id": the supplied time is recorded and returned.
        let returned = state_machine
            .record_update_first_seen_time("id", first_time)
            .await;
        assert_eq!(returned, first_time);
        {
            let guard = storage.lock().await;
            assert_eq!(
                guard.get_string(INSTALL_PLAN_ID).await,
                Some("id".to_owned())
            );
            assert_eq!(
                guard.get_time(UPDATE_FIRST_SEEN_TIME).await,
                Some(first_time)
            );
            assert_eq!(guard.len(), 2);
            assert!(guard.committed());
        }

        mock_time.advance(Duration::from_secs(1000));
        let later_time = mock_time.now_in_walltime();

        // Same id again: the originally recorded time is kept and returned.
        let returned = state_machine
            .record_update_first_seen_time("id", later_time)
            .await;
        assert_eq!(returned, first_time);
        {
            let guard = storage.lock().await;
            assert_eq!(
                guard.get_string(INSTALL_PLAN_ID).await,
                Some("id".to_owned())
            );
            assert_eq!(
                guard.get_time(UPDATE_FIRST_SEEN_TIME).await,
                Some(first_time)
            );
            assert_eq!(guard.len(), 2);
            assert!(guard.committed());
        }

        // A different id: both the id and the time are replaced.
        let returned = state_machine
            .record_update_first_seen_time("id2", later_time)
            .await;
        assert_eq!(returned, later_time);
        {
            let guard = storage.lock().await;
            assert_eq!(
                guard.get_string(INSTALL_PLAN_ID).await,
                Some("id2".to_owned())
            );
            assert_eq!(
                guard.get_time(UPDATE_FIRST_SEEN_TIME).await,
                Some(later_time)
            );
            assert_eq!(guard.len(), 2);
            assert!(guard.committed());
        }
    });
}
3222
/// `report_attempts_to_successful_check` must count consecutive failures,
/// reset the counter on success, and report the number of attempts (failures
/// plus one) each time a check succeeds.
#[test]
fn test_report_attempts_to_successful_check() {
    block_on(async {
        let storage = Rc::new(Mutex::new(MemStorage::new()));
        let mut state_machine = StateMachineBuilder::new_stub()
            .installer(StubInstaller { should_fail: true })
            .metrics_reporter(MockMetricsReporter::new())
            .storage(Rc::clone(&storage))
            .build()
            .await;

        // Immediate success: one attempt, counter stays at zero.
        state_machine
            .report_attempts_to_successful_check(true)
            .await;
        assert_eq!(
            state_machine.context.state.consecutive_failed_update_checks,
            0
        );
        assert_eq!(
            state_machine.metrics_reporter.metrics,
            vec![Metrics::AttemptsToSuccessfulCheck(1)]
        );

        // Two failures in a row bump the counter to 1, then 2.
        for expected_failures in [1, 2] {
            state_machine
                .report_attempts_to_successful_check(false)
                .await;
            assert_eq!(
                state_machine.context.state.consecutive_failed_update_checks,
                expected_failures
            );
        }

        // Success after two failures: three attempts, counter reset.
        state_machine
            .report_attempts_to_successful_check(true)
            .await;
        assert_eq!(
            state_machine.context.state.consecutive_failed_update_checks,
            0
        );
        assert_eq!(
            state_machine.metrics_reporter.metrics,
            vec![
                Metrics::AttemptsToSuccessfulCheck(1),
                Metrics::AttemptsToSuccessfulCheck(3)
            ]
        );
    });
}
3283
/// `ping_omaha` must increment and persist `CONSECUTIVE_FAILED_UPDATE_CHECKS`
/// on each failed ping and clear it (in memory and in storage) once a ping
/// succeeds.
///
/// Storage starts with a count of 1. The mock HTTP client is given a transport
/// error, then an empty-bodied response, then a valid "ok" response; the count
/// is expected to go 2, 3, and finally 0 with the stored key removed.
#[test]
fn test_ping_omaha_updates_consecutive_failed_update_checks_and_persists() {
    block_on(async {
        let mut http = MockHttpRequest::empty();
        http.add_error(http_request::mock_errors::make_transport_error());
        http.add_response(HttpResponse::new(vec![]));
        let response = json!({"response":{
            "server": "prod",
            "protocol": "3.0",
            "app": [{
                "appid": "{00000000-0000-0000-0000-000000000001}",
                "status": "ok",
            }],
        }});
        let response = serde_json::to_vec(&response).unwrap();
        http.add_response(HttpResponse::new(response));

        let storage = Rc::new(Mutex::new(MemStorage::new()));

        {
            // Await and unwrap the storage futures so the seeded value is
            // guaranteed to be written (and any error surfaced) before the
            // state machine loads it, instead of discarding the futures.
            let mut storage = storage.lock().await;
            storage
                .set_int(CONSECUTIVE_FAILED_UPDATE_CHECKS, 1)
                .await
                .unwrap();
            storage.commit().await.unwrap();
        }

        let mut state_machine = StateMachineBuilder::new_stub()
            .storage(Rc::clone(&storage))
            .http(http)
            .build()
            .await;

        async_generator::generate(move |mut co| async move {
            // First ping consumes the transport error: 1 -> 2.
            state_machine.ping_omaha(&mut co).await;
            assert_eq!(
                state_machine.context.state.consecutive_failed_update_checks,
                2
            );
            {
                let storage = storage.lock().await;
                assert_eq!(
                    storage.get_int(CONSECUTIVE_FAILED_UPDATE_CHECKS).await,
                    Some(2)
                );
            }

            // Second ping consumes the empty-bodied response: 2 -> 3.
            state_machine.ping_omaha(&mut co).await;
            assert_eq!(
                state_machine.context.state.consecutive_failed_update_checks,
                3
            );
            {
                let storage = storage.lock().await;
                assert_eq!(
                    storage.get_int(CONSECUTIVE_FAILED_UPDATE_CHECKS).await,
                    Some(3)
                );
            }

            // Third ping consumes the valid response: the counter is reset and
            // the stored key is gone.
            state_machine.ping_omaha(&mut co).await;
            assert_eq!(
                state_machine.context.state.consecutive_failed_update_checks,
                0
            );
            {
                let storage = storage.lock().await;
                assert_eq!(
                    storage.get_int(CONSECUTIVE_FAILED_UPDATE_CHECKS).await,
                    None
                );
            }
        })
        .into_complete()
        .await;
    });
}
3363
/// A single successful update run must end its metric sequence with
/// `AttemptsToSuccessfulInstall { count: 1, successful: true }`.
#[test]
fn test_report_attempts_to_successful_install() {
    block_on(async {
        let mock_time = MockTimeSource::new_from_now();
        let storage = Rc::new(Mutex::new(MemStorage::new()));
        let http = MockHttpRequest::new(make_update_available_response());

        let installer = TestInstaller::builder(mock_time.clone()).build();
        let mut state_machine = StateMachineBuilder::new_stub()
            .http(http)
            .installer(installer)
            .policy_engine(StubPolicyEngine::new(mock_time.clone()))
            .metrics_reporter(MockMetricsReporter::new())
            .storage(Rc::clone(&storage))
            .build()
            .await;

        state_machine.run_once().await;

        #[rustfmt::skip]
        assert_matches!(
            &state_machine.metrics_reporter.metrics[..],
            [
                Metrics::UpdateCheckResponseTime { successful: true, .. },
                Metrics::RequestsPerCheck { count: 1, successful: true },
                Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadStarted, event_result: EventResult::Success, .. }),
                Metrics::SuccessfulUpdateDuration(_),
                Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadFinished, event_result: EventResult::Success, .. }),
                Metrics::OmahaEventLost(Event { event_type: EventType::UpdateComplete, event_result: EventResult::Success, .. }),
                Metrics::SuccessfulUpdateFromFirstSeen(_),
                Metrics::AttemptsToSuccessfulCheck(1),
                Metrics::AttemptsToSuccessfulInstall { count: 1, successful: true },
            ]
        );
    });
}
3402
/// With an installer that fails once and then succeeds, two runs must report
/// `AttemptsToSuccessfulInstall` with count 1 (failed) and then count 2
/// (successful).
#[test]
fn test_report_attempts_to_successful_install_fails_then_succeeds() {
    block_on(async {
        // First run: update-available response followed by two empty-bodied
        // responses (presumably consumed by event reports; TODO confirm).
        let mut http = MockHttpRequest::new(make_update_available_response());
        http.add_response(HttpResponse::new(vec![]));
        http.add_response(HttpResponse::new(vec![]));

        // Second run: the same three-response sequence again.
        http.add_response(make_update_available_response());
        http.add_response(HttpResponse::new(vec![]));
        http.add_response(HttpResponse::new(vec![]));

        let mock_time = MockTimeSource::new_from_now();
        let storage = Rc::new(Mutex::new(MemStorage::new()));

        let flaky_installer = TestInstaller::builder(mock_time.clone())
            .add_install_fail()
            .build();
        let mut state_machine = StateMachineBuilder::new_stub()
            .http(http)
            .installer(flaky_installer)
            .policy_engine(StubPolicyEngine::new(mock_time.clone()))
            .metrics_reporter(MockMetricsReporter::new())
            .storage(Rc::clone(&storage))
            .build()
            .await;

        state_machine.run_once().await;
        state_machine.run_once().await;

        #[rustfmt::skip]
        assert_matches!(
            &state_machine.metrics_reporter.metrics[..],
            [
                Metrics::UpdateCheckResponseTime { successful: true, .. },
                Metrics::RequestsPerCheck { count: 1, successful: true },
                Metrics::FailedUpdateDuration(_),
                Metrics::AttemptsToSuccessfulCheck(1),
                Metrics::AttemptsToSuccessfulInstall { count: 1, successful: false },
                Metrics::UpdateCheckInterval { .. },
                Metrics::UpdateCheckResponseTime { successful: true, .. },
                Metrics::RequestsPerCheck { count: 1, successful: true },
                Metrics::SuccessfulUpdateDuration(_),
                Metrics::OmahaEventLost(Event { .. }),
                Metrics::SuccessfulUpdateFromFirstSeen(_),
                Metrics::AttemptsToSuccessfulCheck(1),
                Metrics::AttemptsToSuccessfulInstall { count: 2, successful: true }
            ]
        );
    });
}
3462
/// Verifies that a `noupdate` server response yields only the three update-check metrics
/// asserted below, with no `AttemptsToSuccessfulInstall` entry.
#[test]
fn test_report_attempts_to_successful_install_does_not_report_for_no_update() {
    block_on(async {
        let response = json!({"response":{
            "server": "prod",
            "protocol": "3.0",
            "app": [{
                "appid": "{00000000-0000-0000-0000-000000000001}",
                "status": "ok",
                "updatecheck": {
                    "status": "noupdate",
                    "info": "no update for you"
                }
            }],
        }});
        let response = serde_json::to_vec(&response).unwrap();
        // The serialized body is not needed again, so move it into the mock rather than
        // cloning it.
        let http = MockHttpRequest::new(HttpResponse::new(response));

        let storage = Rc::new(Mutex::new(MemStorage::new()));
        let mock_time = MockTimeSource::new_from_now();

        let mut state_machine = StateMachineBuilder::new_stub()
            .http(http)
            .installer(TestInstaller::builder(mock_time.clone()).build())
            .policy_engine(StubPolicyEngine::new(mock_time.clone()))
            .metrics_reporter(MockMetricsReporter::new())
            .storage(Rc::clone(&storage))
            .build()
            .await;

        state_machine.run_once().await;

        #[rustfmt::skip]
        assert_matches!(
            state_machine.metrics_reporter.metrics.as_slice(),
            [
                Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
                Metrics::RequestsPerCheck { count: 1, successful: true },
                Metrics::AttemptsToSuccessfulCheck(1),
            ]
        );
    });
}
3508
/// Verifies that after a scheduled check with an update-available response and a default
/// `TestInstaller`, the installer's `reboot_called` flag ends up set.
#[test]
fn test_successful_update_triggers_reboot() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    let http = MockHttpRequest::new(make_update_available_response());
    let mock_time = MockTimeSource::new_from_now();
    let next_update_time = mock_time.now();
    let (timer, mut timers) = BlockingTimer::new();

    let installer = TestInstaller::builder(mock_time.clone()).build();
    // Grab a handle on the flag before the installer is moved into the builder.
    let reboot_called = Rc::clone(&installer.reboot_called);

    let start_fut = StateMachineBuilder::new_stub()
        .http(http)
        .installer(installer)
        .policy_engine(StubPolicyEngine::new(mock_time))
        .timer(timer)
        .start();
    // `_ctl` is kept bound so the control handle lives for the whole test.
    let (_ctl, state_machine) = executor.run_until(start_fut);

    let observer = TestObserver::default();
    let observe_fut = observer.observe(state_machine);
    spawner.spawn_local(observe_fut).unwrap();

    // The first timer request must be a wait until the scheduled check time.
    let first_wait = executor.run_until(timers.next()).unwrap();
    assert_eq!(
        first_wait.requested_wait(),
        RequestedWait::Until(next_update_time.into())
    );
    first_wait.unblock();
    executor.run_until_stalled();

    assert!(*reboot_called.borrow());
}
3544
/// Verifies that when the policy engine is configured with `reboot_needed = false`, the
/// state machine goes back to `Idle` after installing, never asks whether reboot is allowed,
/// and never calls reboot on the installer.
#[test]
fn test_skip_reboot_if_not_needed() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    let http = MockHttpRequest::new(make_update_available_response());
    let mock_time = MockTimeSource::new_from_now();
    let next_update_time = mock_time.now();
    let reboot_check_options_received = Rc::new(RefCell::new(vec![]));
    let reboot_needed = Rc::new(RefCell::new(false));
    let policy_engine = MockPolicyEngine {
        reboot_check_options_received: Rc::clone(&reboot_check_options_received),
        check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
        time_source: mock_time.clone(),
        reboot_needed,
        ..MockPolicyEngine::default()
    };
    let (timer, mut timers) = BlockingTimer::new();

    let installer = TestInstaller::builder(mock_time).build();
    let reboot_called = Rc::clone(&installer.reboot_called);

    let start_fut = StateMachineBuilder::new_stub()
        .http(http)
        .installer(installer)
        .policy_engine(policy_engine)
        .timer(timer)
        .start();
    // `_ctl` is kept bound so the control handle lives for the whole test.
    let (_ctl, state_machine) = executor.run_until(start_fut);

    let observer = TestObserver::default();
    let observe_fut = observer.observe(state_machine);
    spawner.spawn_local(observe_fut).unwrap();

    // Release the initial wait for the scheduled check time.
    let first_wait = executor.run_until(timers.next()).unwrap();
    assert_eq!(
        first_wait.requested_wait(),
        RequestedWait::Until(next_update_time.into())
    );
    first_wait.unblock();
    executor.run_until_stalled();

    // No `WaitingForReboot` state: the machine returns straight to `Idle`.
    let expected_states = vec![
        State::CheckingForUpdates(InstallSource::ScheduledTask),
        State::InstallingUpdate,
        State::Idle,
    ];
    assert_eq!(observer.take_states(), expected_states);

    assert_eq!(*reboot_check_options_received.borrow(), vec![]);
    assert!(!*reboot_called.borrow());
}
3598
/// Verifies that when the installer is built with `add_install_fail()`, the installer's
/// `reboot_called` flag stays unset after the scheduled check runs.
#[test]
fn test_failed_update_does_not_trigger_reboot() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    let http = MockHttpRequest::new(make_update_available_response());
    let mock_time = MockTimeSource::new_from_now();
    let next_update_time = mock_time.now();
    let (timer, mut timers) = BlockingTimer::new();

    let installer_builder = TestInstaller::builder(mock_time.clone()).add_install_fail();
    let installer = installer_builder.build();
    // Grab a handle on the flag before the installer is moved into the builder.
    let reboot_called = Rc::clone(&installer.reboot_called);

    let start_fut = StateMachineBuilder::new_stub()
        .http(http)
        .installer(installer)
        .policy_engine(StubPolicyEngine::new(mock_time))
        .timer(timer)
        .start();
    // `_ctl` is kept bound so the control handle lives for the whole test.
    let (_ctl, state_machine) = executor.run_until(start_fut);

    let observer = TestObserver::default();
    let observe_fut = observer.observe(state_machine);
    spawner.spawn_local(observe_fut).unwrap();

    // Release the initial wait for the scheduled check time.
    let first_wait = executor.run_until(timers.next()).unwrap();
    assert_eq!(
        first_wait.requested_wait(),
        RequestedWait::Until(next_update_time.into())
    );
    first_wait.unblock();
    executor.run_until_stalled();

    assert!(!*reboot_called.borrow());
}
3636
/// Verifies that an `OnDemand` check request arriving while an install is blocked is answered
/// with `AlreadyRunning`, and that the reboot check recorded by the policy engine afterwards
/// carries the `OnDemand` source.
#[test]
fn test_reboots_immediately_if_user_initiated_update_requests_occurs_during_install() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    let http = MockHttpRequest::new(make_update_available_response());
    let mock_time = MockTimeSource::new_from_now();

    // Zero-capacity channels over which `BlockingInstaller` hands the test a reply sender for
    // each install / reboot it is asked to perform.
    let (send_install, mut recv_install) = mpsc::channel(0);
    let (send_reboot, mut recv_reboot) = mpsc::channel(0);
    let reboot_check_options_received = Rc::new(RefCell::new(vec![]));
    let policy_engine = MockPolicyEngine {
        reboot_check_options_received: Rc::clone(&reboot_check_options_received),
        check_timing: Some(CheckTiming::builder().time(mock_time.now()).build()),
        ..MockPolicyEngine::default()
    };

    let start_fut = StateMachineBuilder::new_stub()
        .http(http)
        .installer(BlockingInstaller {
            on_install: send_install,
            on_reboot: Some(send_reboot),
        })
        .policy_engine(policy_engine)
        .start();
    let (mut ctl, state_machine) = executor.run_until(start_fut);

    let observer = TestObserver::default();
    let observe_fut = observer.observe(state_machine);
    spawner.spawn_local(observe_fut).unwrap();

    // Wait until the installer has been asked to install; it now blocks on our reply.
    let unblock_install = executor.run_until(recv_install.next()).unwrap();
    executor.run_until_stalled();
    let states_so_far = observer.take_states();
    assert_eq!(
        states_so_far,
        vec![
            State::CheckingForUpdates(InstallSource::ScheduledTask),
            State::InstallingUpdate
        ]
    );

    // Request an on-demand check while the install is still blocked.
    let reply = executor.run_until(ctl.start_update_check(CheckOptions {
        source: InstallSource::OnDemand,
    }));
    assert_eq!(reply, Ok(StartUpdateCheckResponse::AlreadyRunning));

    executor.run_until_stalled();
    assert_eq!(observer.take_states(), vec![]);

    // Let the install finish successfully.
    let install_results = vec![AppInstallResult::Installed];
    unblock_install.send(install_results).unwrap();
    executor.run_until_stalled();
    assert_eq!(observer.take_states(), vec![State::WaitingForReboot]);

    // The installer is asked to reboot; reply with success.
    let unblock_reboot = executor.run_until(recv_reboot.next()).unwrap();
    executor.run_until_stalled();
    unblock_reboot.send(Ok(())).unwrap();

    assert_eq!(
        *reboot_check_options_received.borrow(),
        vec![CheckOptions {
            source: InstallSource::OnDemand
        }]
    );
}
3714
/// Verifies that while the state machine is waiting for reboot permission, an `OnDemand`
/// check request (answered with `AlreadyRunning`) leads to the installer's reboot being
/// called once the policy allows it, and that the policy engine saw reboot checks for
/// `ScheduledTask` and then `OnDemand`.
#[test]
fn test_reboots_immediately_when_check_now_comes_in_during_wait() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    // Update-available response, three empty bodies, then another update-available response.
    let mut http = MockHttpRequest::new(make_update_available_response());
    for _ in 0..3 {
        http.add_response(HttpResponse::new(vec![]));
    }
    http.add_response(make_update_available_response());

    let mut mock_time = MockTimeSource::new_from_now();
    mock_time.truncate_submicrosecond_walltime();
    let next_update_time = mock_time.now() + Duration::from_secs(1000);
    let (timer, mut timers) = BlockingTimer::new();
    let reboot_allowed = Rc::new(RefCell::new(false));
    let reboot_check_options_received = Rc::new(RefCell::new(vec![]));
    let policy_engine = MockPolicyEngine {
        time_source: mock_time.clone(),
        reboot_allowed: Rc::clone(&reboot_allowed),
        check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
        reboot_check_options_received: Rc::clone(&reboot_check_options_received),
        ..MockPolicyEngine::default()
    };
    let installer = TestInstaller::builder(mock_time.clone()).build();
    let reboot_called = Rc::clone(&installer.reboot_called);
    let storage_ref = Rc::new(Mutex::new(MemStorage::new()));
    let apps = make_test_app_set();

    let start_fut = StateMachineBuilder::new_stub()
        .app_set(apps)
        .http(http)
        .installer(installer)
        .policy_engine(policy_engine)
        .timer(timer)
        .storage(Rc::clone(&storage_ref))
        .start();
    let (mut ctl, state_machine) = executor.run_until(start_fut);

    let observer = TestObserver::default();
    let observe_fut = observer.observe(state_machine);
    spawner.spawn_local(observe_fut).unwrap();

    // Release the initial wait for the scheduled check time.
    let initial_wait = executor.run_until(timers.next()).unwrap();
    assert_eq!(
        initial_wait.requested_wait(),
        RequestedWait::Until(next_update_time.into())
    );
    initial_wait.unblock();
    executor.run_until_stalled();

    // Two timers are requested next, in either order: a relative wait (`For`) and an
    // absolute one (`Until`). Sort them out by kind. Both stay bound until the end of the
    // test.
    let first = executor.run_until(timers.next()).unwrap();
    let second = executor.run_until(timers.next()).unwrap();
    let (wait_for_reboot_timer, _wait_for_next_ping_timer) =
        if matches!(first.requested_wait(), RequestedWait::For(_)) {
            (first, second)
        } else {
            (second, first)
        };
    assert_eq!(
        wait_for_reboot_timer.requested_wait(),
        RequestedWait::For(CHECK_REBOOT_ALLOWED_INTERVAL)
    );

    assert!(!*reboot_called.borrow());
    // Permit the reboot, then issue an on-demand check without unblocking any timer.
    *reboot_allowed.borrow_mut() = true;
    let reply = executor.run_until(ctl.start_update_check(CheckOptions {
        source: InstallSource::OnDemand,
    }));
    assert_eq!(reply, Ok(StartUpdateCheckResponse::AlreadyRunning));
    executor.run_until_stalled();
    assert!(*reboot_called.borrow());

    assert_eq!(
        *reboot_check_options_received.borrow(),
        vec![
            CheckOptions {
                source: InstallSource::ScheduledTask
            },
            CheckOptions {
                source: InstallSource::OnDemand
            },
        ]
    );
}
3818
/// Exercises the waiting-for-reboot loop: while the policy engine refuses the reboot, the
/// state machine keeps sending pings on the regular schedule and re-arms the
/// `CHECK_REBOOT_ALLOWED_INTERVAL` timer; once the policy allows it, the installer's reboot
/// is called.
#[test]
fn test_wait_for_reboot() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    // Update-available response, three empty bodies, then another update-available response.
    let mut http = MockHttpRequest::new(make_update_available_response());
    http.add_response(HttpResponse::new(vec![]));
    http.add_response(HttpResponse::new(vec![]));
    http.add_response(HttpResponse::new(vec![]));
    http.add_response(make_update_available_response());
    // Second view onto the mock's request cell, used to inspect the requests it receives.
    let ping_request_viewer = MockHttpRequest::from_request_cell(http.get_request_cell());
    let mut mock_time = MockTimeSource::new_from_now();
    mock_time.truncate_submicrosecond_walltime();
    let next_update_time = mock_time.now() + Duration::from_secs(1000);
    let (timer, mut timers) = BlockingTimer::new();
    let reboot_allowed = Rc::new(RefCell::new(false));
    let policy_engine = MockPolicyEngine {
        time_source: mock_time.clone(),
        reboot_allowed: Rc::clone(&reboot_allowed),
        check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
        ..MockPolicyEngine::default()
    };
    let installer = TestInstaller::builder(mock_time.clone()).build();
    let reboot_called = Rc::clone(&installer.reboot_called);
    let storage_ref = Rc::new(Mutex::new(MemStorage::new()));
    let apps = make_test_app_set();

    let (mut ctl, state_machine) = pool.run_until(
        StateMachineBuilder::new_stub()
            .app_set(apps.clone())
            .http(http)
            .installer(installer)
            .policy_engine(policy_engine)
            .timer(timer)
            .storage(Rc::clone(&storage_ref))
            .start(),
    );

    let observer = TestObserver::default();
    spawner
        .spawn_local(observer.observe(state_machine))
        .unwrap();

    // Release the initial wait for the scheduled check time.
    let blocked_timer = pool.run_until(timers.next()).unwrap();
    assert_eq!(
        blocked_timer.requested_wait(),
        RequestedWait::Until(next_update_time.into())
    );
    blocked_timer.unblock();
    pool.run_until_stalled();

    // Two timers are requested next, in either order: the relative reboot-permission wait
    // (`For`) and the absolute next-ping wait (`Until`). Sort them out by kind.
    let blocked_timer1 = pool.run_until(timers.next()).unwrap();
    let blocked_timer2 = pool.run_until(timers.next()).unwrap();
    let (wait_for_reboot_timer, wait_for_next_ping_timer) =
        match blocked_timer1.requested_wait() {
            RequestedWait::For(_) => (blocked_timer1, blocked_timer2),
            RequestedWait::Until(_) => (blocked_timer2, blocked_timer1),
        };
    assert_eq!(
        wait_for_reboot_timer.requested_wait(),
        RequestedWait::For(CHECK_REBOOT_ALLOWED_INTERVAL)
    );
    assert_eq!(
        wait_for_next_ping_timer.requested_wait(),
        RequestedWait::Until(next_update_time.into())
    );
    // Move the clock to the ping time and fire the ping timer.
    mock_time.advance(Duration::from_secs(1000));
    wait_for_next_ping_timer.unblock();
    pool.run_until_stalled();

    let config = crate::configuration::test_support::config_generator();
    let request_params = RequestParams::default();

    // The request seen by the mock must be a ping for every app, with these session and
    // request ids.
    let apps = pool.run_until(apps.lock()).get_apps();
    let mut expected_request_builder = RequestBuilder::new(&config, &request_params)
        .session_id(GUID::from_u128(5))
        .request_id(GUID::from_u128(6));
    for app in &apps {
        expected_request_builder = expected_request_builder.add_ping(app);
    }
    pool.run_until(assert_request(
        &ping_request_viewer,
        expected_request_builder,
    ));

    // A check request while waiting for reboot is reported as already running.
    pool.run_until(async {
        assert_eq!(
            ctl.start_update_check(CheckOptions::default()).await,
            Ok(StartUpdateCheckResponse::AlreadyRunning)
        );
    });

    // The ping updated the persisted last-update time to the current mock wall time.
    pool.run_until(async {
        let storage = storage_ref.lock().await;
        let context = update_check::Context::load(&*storage).await;
        assert_eq!(
            context.schedule.last_update_time,
            Some(mock_time.now_in_walltime().into())
        );
    });

    // A new ping timer is armed.
    let wait_for_next_ping_timer = pool.run_until(timers.next()).unwrap();
    assert_eq!(
        wait_for_next_ping_timer.requested_wait(),
        RequestedWait::Until(next_update_time.into())
    );

    // Fire the reboot-permission timer while reboot is still disallowed: no reboot happens.
    wait_for_reboot_timer.unblock();
    pool.run_until_stalled();
    assert!(!*reboot_called.borrow());

    // The reboot-permission timer is re-armed with the same interval.
    let wait_for_reboot_timer = pool.run_until(timers.next()).unwrap();
    assert_eq!(
        wait_for_reboot_timer.requested_wait(),
        RequestedWait::For(CHECK_REBOOT_ALLOWED_INTERVAL)
    );

    // Fire the ping timer again and check the second ping request.
    wait_for_next_ping_timer.unblock();
    pool.run_until_stalled();

    let mut expected_request_builder = RequestBuilder::new(&config, &request_params)
        .session_id(GUID::from_u128(7))
        .request_id(GUID::from_u128(8));
    for app in &apps {
        expected_request_builder = expected_request_builder.add_ping(app);
    }
    pool.run_until(assert_request(
        &ping_request_viewer,
        expected_request_builder,
    ));

    assert!(!*reboot_called.borrow());

    // Now allow the reboot through the policy engine's flag. `reboot_called` itself must
    // never be written by the test, otherwise the final assertion would pass regardless of
    // what the state machine does.
    *reboot_allowed.borrow_mut() = true;
    wait_for_reboot_timer.unblock();
    pool.run_until_stalled();
    assert!(*reboot_called.borrow());
}
3979
3980 #[derive(Debug)]
3981 struct BlockingInstaller {
3982 on_install: mpsc::Sender<oneshot::Sender<Vec<AppInstallResult<StubInstallErrors>>>>,
3983 on_reboot: Option<mpsc::Sender<oneshot::Sender<Result<(), anyhow::Error>>>>,
3984 }
3985
3986 impl Installer for BlockingInstaller {
3987 type InstallPlan = StubPlan;
3988 type Error = StubInstallErrors;
3989 type InstallResult = ();
3990
3991 fn perform_install(
3992 &mut self,
3993 _install_plan: &StubPlan,
3994 _observer: Option<&dyn ProgressObserver>,
3995 ) -> LocalBoxFuture<'_, (Self::InstallResult, Vec<AppInstallResult<Self::Error>>)> {
3996 let (send, recv) = oneshot::channel();
3997 let send_fut = self.on_install.send(send);
3998
3999 async move {
4000 send_fut.await.unwrap();
4001 ((), recv.await.unwrap())
4002 }
4003 .boxed_local()
4004 }
4005
4006 fn perform_reboot(&mut self) -> LocalBoxFuture<'_, Result<(), anyhow::Error>> {
4007 match &mut self.on_reboot {
4008 Some(on_reboot) => {
4009 let (send, recv) = oneshot::channel();
4010 let send_fut = on_reboot.send(send);
4011
4012 async move {
4013 send_fut.await.unwrap();
4014 recv.await.unwrap()
4015 }
4016 .boxed_local()
4017 }
4018 None => future::ready(Ok(())).boxed_local(),
4019 }
4020 }
4021
4022 fn try_create_install_plan<'a>(
4023 &'a self,
4024 _request_params: &'a RequestParams,
4025 _request_metadata: Option<&'a RequestMetadata>,
4026 _response: &'a Response,
4027 _response_bytes: Vec<u8>,
4028 _ecdsa_signature: Option<Vec<u8>>,
4029 ) -> LocalBoxFuture<'a, Result<Self::InstallPlan, Self::Error>> {
4030 future::ready(Ok(StubPlan)).boxed_local()
4031 }
4032 }
4033
4034 #[derive(Debug, Default)]
4035 struct TestObserver {
4036 states: Rc<RefCell<Vec<State>>>,
4037 }
4038
4039 impl TestObserver {
4040 fn observe(&self, s: impl Stream<Item = StateMachineEvent>) -> impl Future<Output = ()> {
4041 let states = Rc::clone(&self.states);
4042 async move {
4043 futures::pin_mut!(s);
4044 while let Some(event) = s.next().await {
4045 if let StateMachineEvent::StateChange(state) = event {
4046 states.borrow_mut().push(state);
4047 }
4048 }
4049 }
4050 }
4051
4052 fn observe_until_terminal(
4053 &self,
4054 s: impl Stream<Item = StateMachineEvent>,
4055 ) -> impl Future<Output = ()> {
4056 let states = Rc::clone(&self.states);
4057 async move {
4058 futures::pin_mut!(s);
4059 while let Some(event) = s.next().await {
4060 if let StateMachineEvent::StateChange(state) = event {
4061 states.borrow_mut().push(state);
4062 match state {
4063 State::Idle | State::WaitingForReboot => return,
4064 _ => {}
4065 }
4066 }
4067 }
4068 }
4069 }
4070
4071 fn take_states(&self) -> Vec<State> {
4072 std::mem::take(&mut *self.states.borrow_mut())
4073 }
4074 }
4075
/// Verifies that a check request issued while an install is blocked is answered with
/// `AlreadyRunning` and causes no state change, and that the machine moves to
/// `WaitingForReboot` once the install is released.
#[test]
fn test_start_update_during_update_replies_with_in_progress() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    let http = MockHttpRequest::new(make_update_available_response());
    let (send_install, mut recv_install) = mpsc::channel(0);
    let start_fut = StateMachineBuilder::new_stub()
        .http(http)
        .installer(BlockingInstaller {
            on_install: send_install,
            on_reboot: None,
        })
        .start();
    let (mut ctl, state_machine) = executor.run_until(start_fut);

    let observer = TestObserver::default();
    let observe_fut = observer.observe_until_terminal(state_machine);
    spawner.spawn_local(observe_fut).unwrap();

    // Wait until the installer has been asked to install; it now blocks on our reply.
    let unblock_install = executor.run_until(recv_install.next()).unwrap();
    executor.run_until_stalled();
    let states_so_far = observer.take_states();
    assert_eq!(
        states_so_far,
        vec![
            State::CheckingForUpdates(InstallSource::ScheduledTask),
            State::InstallingUpdate
        ]
    );

    // A second check request during the install is rejected as already running.
    let reply = executor.run_until(ctl.start_update_check(CheckOptions::default()));
    assert_eq!(reply, Ok(StartUpdateCheckResponse::AlreadyRunning));
    executor.run_until_stalled();
    assert_eq!(observer.take_states(), vec![]);

    // Let the install finish successfully.
    let install_results = vec![AppInstallResult::Installed];
    unblock_install.send(install_results).unwrap();
    executor.run_until_stalled();

    assert_eq!(observer.take_states(), vec![State::WaitingForReboot]);
}
4124
/// Verifies that nothing happens while the check timer is blocked, that unblocking it runs
/// a (failing) update check, and that a check request issued while the next timer is
/// blocked is answered with `Started` and runs another check.
#[test]
fn test_start_update_during_timer_starts_update() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    let mut mock_time = MockTimeSource::new_from_now();
    let next_update_time = mock_time.now() + Duration::from_secs(321);

    let (timer, mut timers) = BlockingTimer::new();
    let policy_engine = MockPolicyEngine {
        check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
        time_source: mock_time.clone(),
        ..MockPolicyEngine::default()
    };
    let start_fut = StateMachineBuilder::new_stub()
        .policy_engine(policy_engine)
        .timer(timer)
        .start();
    let (mut ctl, state_machine) = executor.run_until(start_fut);

    let observer = TestObserver::default();
    let observe_fut = observer.observe(state_machine);
    spawner.spawn_local(observe_fut).unwrap();

    let pending_wait = executor.run_until(timers.next()).unwrap();
    assert_eq!(
        pending_wait.requested_wait(),
        RequestedWait::Until(next_update_time.into())
    );
    mock_time.advance(Duration::from_secs(200));
    assert_eq!(observer.take_states(), vec![]);

    // With the timer still blocked, running the executor produces no state changes.
    executor.run_until_stalled();
    assert_eq!(observer.take_states(), vec![]);

    // Releasing the timer runs one check cycle and then arms the next timer.
    pending_wait.unblock();
    let pending_wait = executor.run_until(timers.next()).unwrap();
    assert_eq!(
        pending_wait.requested_wait(),
        RequestedWait::Until(next_update_time.into())
    );
    let after_timer = observer.take_states();
    assert_eq!(
        after_timer,
        vec![
            State::CheckingForUpdates(InstallSource::ScheduledTask),
            State::ErrorCheckingForUpdate,
            State::Idle
        ]
    );

    // An explicit check request while the timer is blocked starts another cycle.
    let reply = executor.run_until(ctl.start_update_check(CheckOptions::default()));
    assert_eq!(reply, Ok(StartUpdateCheckResponse::Started));
    executor.run_until_stalled();
    let after_request = observer.take_states();
    assert_eq!(
        after_request,
        vec![
            State::CheckingForUpdates(InstallSource::ScheduledTask),
            State::ErrorCheckingForUpdate,
            State::Idle
        ]
    );
}
4195
/// Verifies that when the policy engine's check decision is `ThrottledByPolicy`, a check
/// request is answered with `Throttled` and no state change is observed.
#[test]
fn test_start_update_check_returns_throttled() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    let mut mock_time = MockTimeSource::new_from_now();
    let next_update_time = mock_time.now() + Duration::from_secs(321);

    let (timer, mut timers) = BlockingTimer::new();
    let policy_engine = MockPolicyEngine {
        check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
        time_source: mock_time.clone(),
        check_decision: CheckDecision::ThrottledByPolicy,
        ..MockPolicyEngine::default()
    };
    let start_fut = StateMachineBuilder::new_stub()
        .policy_engine(policy_engine)
        .timer(timer)
        .start();
    let (mut ctl, state_machine) = executor.run_until(start_fut);

    let observer = TestObserver::default();
    let observe_fut = observer.observe(state_machine);
    spawner.spawn_local(observe_fut).unwrap();

    // The timer stays blocked for the whole test; only its request is inspected.
    let pending_wait = executor.run_until(timers.next()).unwrap();
    assert_eq!(
        pending_wait.requested_wait(),
        RequestedWait::Until(next_update_time.into())
    );
    mock_time.advance(Duration::from_secs(200));
    assert_eq!(observer.take_states(), vec![]);

    let reply = executor.run_until(ctl.start_update_check(CheckOptions::default()));
    assert_eq!(reply, Ok(StartUpdateCheckResponse::Throttled));
    executor.run_until_stalled();
    assert_eq!(observer.take_states(), vec![]);
}
4240
/// Verifies that a one-shot check with `TestInstaller` emits install progress events with
/// the values `0.0, 0.3, 0.9, 1.0`, in that order.
#[test]
fn test_progress_observer() {
    block_on(async {
        let http = MockHttpRequest::new(make_update_available_response());
        let mock_time = MockTimeSource::new_from_now();
        let events = StateMachineBuilder::new_stub()
            .http(http)
            .installer(TestInstaller::builder(mock_time.clone()).build())
            .policy_engine(StubPolicyEngine::new(mock_time))
            .oneshot_check()
            .await;

        // Keep only the progress values; every other event kind is dropped.
        let progresses: Vec<f32> = events
            .filter_map(|event| {
                let progress = if let StateMachineEvent::InstallProgressChange(InstallProgress {
                    progress,
                }) = event
                {
                    Some(progress)
                } else {
                    None
                };
                future::ready(progress)
            })
            .collect()
            .await;
        assert_eq!(progresses, [0.0, 0.3, 0.9, 1.0]);
    });
}
4265
/// Verifies that `report_waited_for_reboot_duration` returns an error, and records no
/// metric, when the monotonic clock has advanced further (10s) than the wall clock has
/// since the update finished (1s).
#[test]
fn test_report_waited_for_reboot_duration_doesnt_panic_on_wrong_current_time() {
    block_on(async {
        let metrics_reporter = MockMetricsReporter::new();

        let state_machine_start_monotonic = Instant::now();
        let update_finish_time = SystemTime::now();

        // Inconsistent "now": wall time is 1s past the update finish time while monotonic
        // time is 10s past the state machine start.
        let inconsistent_now = ComplexTime {
            wall: update_finish_time + Duration::from_secs(1),
            mono: state_machine_start_monotonic + Duration::from_secs(10),
        };

        let mut state_machine = StateMachineBuilder::new_stub()
            .metrics_reporter(metrics_reporter)
            .build()
            .await;

        let outcome = state_machine.report_waited_for_reboot_duration(
            update_finish_time,
            state_machine_start_monotonic,
            inconsistent_now,
        );
        outcome.expect_err("should overflow and error out");

        assert!(state_machine.metrics_reporter.metrics.is_empty());
    });
}
4306
/// Runs a one-shot update to version `1.2.3.5`, advances the mock clock by 999 seconds,
/// then starts a second state machine on the same storage whose OS version is `1.2.3.5`.
/// Verifies that exactly one `WaitedForRebootDuration(999s)` metric is reported and that
/// `UPDATE_FINISH_TIME` and `TARGET_VERSION` are gone from committed storage.
#[test]
fn test_report_waited_for_reboot_duration() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    let response = json!({"response": {
        "server": "prod",
        "protocol": "3.0",
        "app": [{
            "appid": "{00000000-0000-0000-0000-000000000001}",
            "status": "ok",
            "updatecheck": {
                "status": "ok",
                "manifest": {
                    "version": "1.2.3.5",
                    "actions": {
                        "action": [],
                    },
                    "packages": {
                        "package": [],
                    },
                }
            }
        }],
    }});
    let response = serde_json::to_vec(&response).unwrap();
    let http = MockHttpRequest::new(HttpResponse::new(response));
    let mut mock_time = MockTimeSource::new_from_now();
    mock_time.truncate_submicrosecond_walltime();
    let storage = Rc::new(Mutex::new(MemStorage::new()));

    // First state machine: perform the update as a single one-shot check.
    let first_update = executor.run_until(
        StateMachineBuilder::new_stub()
            .http(http)
            .policy_engine(StubPolicyEngine::new(mock_time.clone()))
            .storage(Rc::clone(&storage))
            .oneshot(RequestParams::default()),
    );
    assert_matches!(first_update, Ok(_));

    mock_time.advance(Duration::from_secs(999));

    // Second state machine: same storage, with an OS version equal to the manifest version
    // served above.
    let updater = Updater {
        name: "updater".to_string(),
        version: Version::from([0, 1]),
    };
    let os = OS {
        version: "1.2.3.5".to_string(),
        ..OS::default()
    };
    let config = Config {
        updater,
        os,
        service_url: "http://example.com/".to_string(),
        omaha_public_keys: None,
    };
    let metrics_reporter = Rc::new(RefCell::new(MockMetricsReporter::new()));
    let start_fut = StateMachineBuilder::new_stub()
        .config(config)
        .metrics_reporter(Rc::clone(&metrics_reporter))
        .policy_engine(StubPolicyEngine::new(mock_time.clone()))
        .storage(Rc::clone(&storage))
        .timer(MockTimer::new())
        .start();
    // `_ctl` is kept bound so the control handle lives for the whole test.
    let (_ctl, state_machine) = executor.run_until(start_fut);

    let observer = TestObserver::default();
    let observe_fut = observer.observe(state_machine);
    spawner.spawn_local(observe_fut).unwrap();
    executor.run_until_stalled();

    {
        // Scope the `RefCell` borrow so it is released before the executor runs again.
        let reporter = metrics_reporter.borrow();
        let waited: Vec<_> = reporter
            .metrics
            .iter()
            .filter(|m| matches!(m, Metrics::WaitedForRebootDuration(_)))
            .collect();
        assert_eq!(
            waited,
            vec![&Metrics::WaitedForRebootDuration(Duration::from_secs(999))]
        );
    }

    // The update bookkeeping keys must be absent and the storage committed.
    executor.run_until(async {
        let storage = storage.lock().await;
        assert_eq!(storage.get_time(UPDATE_FINISH_TIME).await, None);
        assert_eq!(storage.get_string(TARGET_VERSION).await, None);
        assert!(storage.committed());
    });
}
4401
/// Verifies that when the CUP handler is set up to return a decoration error, the one-shot
/// check fails with `OmahaRequestError::CupDecoration(CupDecorationError::ParseError(_))`.
#[test]
fn run_cup_but_decoration_error() {
    block_on(async {
        let http = MockHttpRequest::new(HttpResponse::new(make_noupdate_httpresponse()));

        // Parsing an empty string as a URI fails, which yields the error value to wrap in
        // `ParseError`.
        let failing_cup_handler = MockCupv2Handler::new().set_decoration_error(|| {
            let uri_error = "".parse::<http::Uri>().unwrap_err();
            Some(CupDecorationError::ParseError(uri_error))
        });

        let outcome = StateMachineBuilder::new_stub()
            .http(http)
            .cup_handler(Some(failing_cup_handler))
            .oneshot(RequestParams::default())
            .await;
        assert_matches!(
            outcome,
            Err(UpdateCheckError::OmahaRequest(
                OmahaRequestError::CupDecoration(CupDecorationError::ParseError(_))
            ))
        );

        info!("update check complete!");
    });
}
4428
/// Verifies that when the CUP handler is set up to return `EtagHeaderMissing` on
/// verification, the one-shot check fails with `OmahaRequestError::CupValidation` wrapping
/// that error.
#[test]
fn run_cup_but_verification_error() {
    block_on(async {
        let http = MockHttpRequest::new(HttpResponse::new(make_noupdate_httpresponse()));

        let failing_cup_handler = MockCupv2Handler::new()
            .set_verification_error(|| Some(CupVerificationError::EtagHeaderMissing));

        let outcome = StateMachineBuilder::new_stub()
            .http(http)
            .cup_handler(Some(failing_cup_handler))
            .oneshot(RequestParams::default())
            .await;
        assert_matches!(
            outcome,
            Err(UpdateCheckError::OmahaRequest(
                OmahaRequestError::CupValidation(CupVerificationError::EtagHeaderMissing)
            ))
        );

        info!("update check complete!");
    });
}
4451
/// Verifies that a one-shot check against a no-update response succeeds when no CUP handler
/// is explicitly configured on the stub builder.
#[test]
fn run_cup_valid() {
    block_on(async {
        let http = MockHttpRequest::new(HttpResponse::new(make_noupdate_httpresponse()));

        let outcome = StateMachineBuilder::new_stub()
            .http(http)
            .oneshot(RequestParams::default())
            .await;
        assert_matches!(outcome, Ok(_));

        info!("update check complete!");
    });
}
4469}