1use crate::{
10 app_set::{AppSet, AppSetExt as _},
11 async_generator,
12 common::{App, CheckOptions, CheckTiming},
13 configuration::Config,
14 cup_ecdsa::{CupDecorationError, CupVerificationError, Cupv2Handler, RequestMetadata},
15 http_request::{self, HttpRequest},
16 installer::{AppInstallResult, Installer, Plan},
17 metrics::{ClockType, Metrics, MetricsReporter, UpdateCheckFailureReason},
18 policy::{CheckDecision, PolicyEngine, UpdateDecision},
19 protocol::{
20 self,
21 request::{Event, EventErrorCode, EventResult, EventType, GUID, InstallSource},
22 response::{OmahaStatus, Response, UpdateCheck, parse_json_response},
23 },
24 request_builder::{self, RequestBuilder, RequestParams},
25 storage::{Storage, StorageExt},
26 time::{ComplexTime, PartialComplexTime, TimeSource, Timer},
27};
28
29use anyhow::anyhow;
30use futures::{
31 channel::{mpsc, oneshot},
32 future::{self, BoxFuture, Fuse},
33 lock::Mutex,
34 prelude::*,
35 select,
36};
37use http::{Response as HttpResponse, response::Parts};
38use log::{error, info, warn};
39use p256::ecdsa::DerSignature;
40use std::{
41 cmp::min,
42 collections::HashMap,
43 convert::TryInto,
44 rc::Rc,
45 str::Utf8Error,
46 time::{Duration, Instant, SystemTime},
47};
48use thiserror::Error;
49
50pub mod update_check;
51
52mod builder;
53pub use builder::StateMachineBuilder;
54
55mod observer;
56use observer::StateMachineProgressObserver;
57pub use observer::{InstallProgress, StateMachineEvent};
58
// String keys and tuning constants used by the state machine.
//
// NOTE(review): `INSTALL_PLAN_ID`, `UPDATE_FIRST_SEEN_TIME` and `X_RETRY_AFTER` are not
// referenced in the part of the file reviewed here; presumably they are used further down
// (e.g. by `record_update_first_seen_time` and the HTTP response handling) -- confirm.
const INSTALL_PLAN_ID: &str = "install_plan_id";
const UPDATE_FIRST_SEEN_TIME: &str = "update_first_seen_time";
// Storage key: wall time at which the last successful install finished.
const UPDATE_FINISH_TIME: &str = "update_finish_time";
// Storage key: version the system app is expected to be running after the reboot.
const TARGET_VERSION: &str = "target_version";
// Storage key: number of install attempts that failed in a row.
const CONSECUTIVE_FAILED_INSTALL_ATTEMPTS: &str = "consecutive_failed_install_attempts";
// How long to wait before asking the policy engine again whether a reboot is allowed.
const CHECK_REBOOT_ALLOWED_INTERVAL: Duration = Duration::from_secs(30 * 60);
const X_RETRY_AFTER: &str = "X-Retry-After";
// Upper bound on the number of requests sent to Omaha for a single update check.
const MAX_OMAHA_REQUEST_ATTEMPTS: u64 = 3;
70
/// The update-check state machine.
///
/// It is generic over the collaborators it drives: a policy engine (`PE`), an HTTP client
/// (`HR`), an installer (`IN`), a timer (`TM`), a metrics reporter (`MR`), persistent storage
/// (`ST`), the set of apps being updated (`AS`) and an optional CUPv2 handler (`CH`).
#[derive(Debug)]
pub struct StateMachine<PE, HR, IN, TM, MR, ST, AS, CH>
where
    PE: PolicyEngine,
    HR: HttpRequest,
    IN: Installer,
    TM: Timer,
    MR: MetricsReporter,
    ST: Storage,
    AS: AppSet,
{
    /// Immutable configuration; cloned whenever a `RequestBuilder` is created.
    config: Config,

    /// Decides when checks, updates and reboots are allowed.
    policy_engine: PE,

    /// Transport used to talk to the Omaha server.
    http: HR,

    /// Builds install plans, performs installs and triggers the reboot.
    installer: IN,

    /// Source of `wait_for` / `wait_until` futures.
    timer: TM,

    /// Clock used for every timestamp taken by the state machine.
    time_source: PE::TimeSource,

    /// Sink for the `Metrics` values reported along the way.
    metrics_reporter: MR,

    /// Persistent storage, shared behind an async mutex.
    storage_ref: Rc<Mutex<ST>>,

    /// Current update-check context (`schedule` and protocol `state`).
    context: update_check::Context,

    /// The apps managed by this state machine, shared behind an async mutex.
    app_set: Rc<Mutex<AS>>,

    /// Optional CUPv2 handler.
    // NOTE(review): not used in the part of the file reviewed here; presumably it decorates
    // requests and verifies responses -- confirm against the request code.
    cup_handler: Option<CH>,
}
110
/// The externally observable states of the state machine, emitted through `yield_state`.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum State {
    /// Waiting for the next scheduled check or an explicit request.
    Idle,
    /// An update check with the given install source is in progress.
    CheckingForUpdates(InstallSource),
    /// The Omaha request failed or its response could not be parsed.
    ErrorCheckingForUpdate,
    /// Omaha answered and no app has an update.
    NoUpdateAvailable,
    /// An update exists but the policy engine deferred installing it.
    InstallationDeferredByPolicy,
    /// The install plan is being built or executed.
    InstallingUpdate,
    /// The install finished and the state machine is waiting until a reboot is allowed.
    WaitingForReboot,
    /// Creating or executing the install plan failed.
    InstallationError,
}
122
/// Errors that can occur while building, sending or validating a request to Omaha.
#[derive(Error, Debug)]
pub enum OmahaRequestError {
    /// The request body could not be serialized.
    #[error("Unexpected JSON error constructing update check")]
    Json(#[from] serde_json::Error),

    /// The HTTP request object could not be constructed.
    #[error("Error building update check HTTP request")]
    HttpBuilder(#[from] http::Error),

    /// Adding the CUPv2 parameters to the outgoing request failed.
    #[error("Error decorating outgoing request with CUPv2 parameters")]
    CupDecoration(#[from] CupDecorationError),

    /// The response failed CUPv2 verification.
    #[error("Error validating incoming response with CUPv2 protocol")]
    CupValidation(#[from] CupVerificationError),

    /// The transport layer failed before an HTTP status was obtained.
    #[error("HTTP transport error performing update check")]
    HttpTransport(#[from] http_request::Error),

    /// The server answered with the contained status code, which is treated as an error.
    #[error("HTTP error performing update check: {0}")]
    HttpStatus(hyper::StatusCode),
}
146
147impl From<request_builder::Error> for OmahaRequestError {
148 fn from(err: request_builder::Error) -> Self {
149 match err {
150 request_builder::Error::Json(e) => OmahaRequestError::Json(e),
151 request_builder::Error::Http(e) => OmahaRequestError::HttpBuilder(e),
152 request_builder::Error::Cup(e) => OmahaRequestError::CupDecoration(e),
153 }
154 }
155}
156
157impl From<http::StatusCode> for OmahaRequestError {
158 fn from(sc: http::StatusCode) -> Self {
159 OmahaRequestError::HttpStatus(sc)
160 }
161}
162
/// Errors that can occur while turning the raw Omaha response body into a `Response`.
#[derive(Error, Debug)]
pub enum ResponseParseError {
    /// The response bytes are not valid UTF-8.
    #[error("Response was not valid UTF-8")]
    Utf8(#[from] Utf8Error),

    /// The response text is not the expected JSON.
    #[error("Unexpected JSON error parsing update check response")]
    Json(#[from] serde_json::Error),
}
173
/// The ways a whole update check can fail.
#[derive(Error, Debug)]
pub enum UpdateCheckError {
    /// The request to Omaha failed (see `OmahaRequestError`).
    #[error("Error checking with Omaha")]
    OmahaRequest(#[from] OmahaRequestError),

    /// Omaha answered, but the response could not be parsed.
    #[error("Error parsing Omaha response")]
    ResponseParser(#[from] ResponseParseError),

    /// The installer could not turn the response into an install plan.
    #[error("Unable to create an install plan")]
    InstallPlan(#[source] anyhow::Error),
}
185
/// A cloneable handle used to send `ControlRequest`s to a running state machine.
#[derive(Clone)]
pub struct ControlHandle(mpsc::Sender<ControlRequest>);
189
/// Error returned by `ControlHandle` when the request could not be sent or the reply channel
/// was cancelled, i.e. the state machine is no longer there to answer.
#[derive(Debug, Clone, Error, PartialEq, Eq)]
#[error("state machine dropped before all its control handles")]
pub struct StateMachineGone;
194
195impl From<mpsc::SendError> for StateMachineGone {
196 fn from(_: mpsc::SendError) -> Self {
197 StateMachineGone
198 }
199}
200
201impl From<oneshot::Canceled> for StateMachineGone {
202 fn from(_: oneshot::Canceled) -> Self {
203 StateMachineGone
204 }
205}
206
/// Requests that a `ControlHandle` can send to the state machine.
enum ControlRequest {
    /// Ask the state machine to start an update check.
    StartUpdateCheck {
        /// Options for the requested check.
        options: CheckOptions,
        /// One-shot channel on which the state machine sends its answer.
        responder: oneshot::Sender<StartUpdateCheckResponse>,
    },
}
213
/// The state machine's answer to `ControlRequest::StartUpdateCheck`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StartUpdateCheckResponse {
    /// The policy engine allowed the check and it has been started.
    Started,

    /// A check (or the wait for reboot that follows one) was already in progress.
    AlreadyRunning,

    /// The policy engine did not allow a check at this time.
    Throttled,
}
227
228impl ControlHandle {
229 pub async fn start_update_check(
232 &mut self,
233 options: CheckOptions,
234 ) -> Result<StartUpdateCheckResponse, StateMachineGone> {
235 let (responder, receive_response) = oneshot::channel();
236 self.0
237 .send(ControlRequest::StartUpdateCheck { options, responder })
238 .await?;
239 Ok(receive_response.await?)
240 }
241}
242
/// Outcome of an update check with respect to rebooting.
#[derive(Debug)]
enum RebootAfterUpdate<T> {
    /// A reboot is needed; carries the installer's result, which is later handed to the
    /// policy engine's `reboot_allowed`.
    Needed(T),
    /// No reboot is needed.
    NotNeeded,
}
248
249impl<PE, HR, IN, TM, MR, ST, AS, IR, PL, CH> StateMachine<PE, HR, IN, TM, MR, ST, AS, CH>
250where
251 PE: PolicyEngine<InstallResult = IR, InstallPlan = PL>,
252 HR: HttpRequest,
253 IN: Installer<InstallResult = IR, InstallPlan = PL>,
254 TM: Timer,
255 MR: MetricsReporter,
256 ST: Storage,
257 AS: AppSet,
258 CH: Cupv2Handler,
259 IR: 'static + Send,
260 PL: Plan,
261{
262 async fn update_next_update_time(
264 &mut self,
265 co: &mut async_generator::Yield<StateMachineEvent>,
266 ) -> CheckTiming {
267 let apps = self.app_set.lock().await.get_apps();
268 let timing = self
269 .policy_engine
270 .compute_next_update_time(&apps, &self.context.schedule, &self.context.state)
271 .await;
272 self.context.schedule.next_update_time = Some(timing);
273
274 co.yield_(StateMachineEvent::ScheduleChange(self.context.schedule))
275 .await;
276 info!("Calculated check timing: {}", timing);
277 timing
278 }
279
280 async fn make_wait_to_next_check(
282 &mut self,
283 check_timing: CheckTiming,
284 ) -> Fuse<BoxFuture<'static, ()>> {
285 if let Some(minimum_wait) = check_timing.minimum_wait {
286 future::join(
290 self.timer.wait_for(minimum_wait),
291 self.timer.wait_until(check_timing.time),
292 )
293 .map(|_| ())
294 .boxed()
295 .fuse()
296 } else {
297 self.timer.wait_until(check_timing.time).fuse()
300 }
301 }
302
/// Drives the state machine: waits for the next scheduled check or a control request, asks
/// the policy engine whether a check may start, runs the check, and waits for / performs the
/// reboot when the check asks for one.
///
/// Returns immediately (after logging) when the app set fails `all_valid()`. The main loop
/// itself contains no `break` or `return`.
async fn run(
    mut self,
    mut control: mpsc::Receiver<ControlRequest>,
    mut co: async_generator::Yield<StateMachineEvent>,
) {
    // Refuse to start with an invalid app set. The lock is scoped so it is released before
    // the main loop.
    {
        let app_set = self.app_set.lock().await;
        if !app_set.all_valid() {
            error!(
                "App set not valid, not starting state machine: {:#?}",
                app_set.get_apps()
            );
            return;
        }
    }

    let state_machine_start_monotonic_time = self.time_source.now_in_monotonic();

    let mut should_report_waited_for_reboot_duration = false;

    // The waited-for-reboot metric is only reported when an update finish time was stored
    // AND the stored target version equals the version in the configuration.
    let update_finish_time = {
        let storage = self.storage_ref.lock().await;
        let update_finish_time = storage.get_time(UPDATE_FINISH_TIME).await;
        if update_finish_time.is_some()
            && let Some(target_version) = storage.get_string(TARGET_VERSION).await
            && target_version == self.config.os.version
        {
            should_report_waited_for_reboot_duration = true;
        }
        update_finish_time
    };

    loop {
        // Logged at the top of every iteration, not only the first one.
        info!("Initial context: {:?}", self.context);

        if should_report_waited_for_reboot_duration {
            match self.report_waited_for_reboot_duration(
                // The flag is only ever set when `update_finish_time` is `Some`.
                update_finish_time.unwrap(),
                state_machine_start_monotonic_time,
                self.time_source.now(),
            ) {
                Ok(()) => {
                    // Reported once; clear the flag and the persisted inputs.
                    should_report_waited_for_reboot_duration = false;

                    let mut storage = self.storage_ref.lock().await;
                    storage.remove_or_log(UPDATE_FINISH_TIME).await;
                    storage.remove_or_log(TARGET_VERSION).await;
                    storage.commit_or_log().await;
                }
                Err(e) => {
                    // The flag stays set, so the report is retried on the next iteration.
                    warn!(
                        "Couldn't report wait for reboot duration: {:#}, will try again",
                        e
                    );
                }
            }
        }

        // Wait for whichever comes first: the scheduled check time (default options, nobody
        // to answer) or an explicit request (its options and its responder).
        let (mut options, responder) = {
            let check_timing = self.update_next_update_time(&mut co).await;
            let mut wait_to_next_check = self.make_wait_to_next_check(check_timing).await;

            select! {
                () = wait_to_next_check => (CheckOptions::default(), None),
                ControlRequest::StartUpdateCheck{options, responder} = control.select_next_some() => {
                    (options, Some(responder))
                }
            }
        };

        let reboot_after_update = {
            let apps = self.app_set.lock().await.get_apps();
            info!(
                "Checking to see if an update check is allowed at this time for {:?}",
                apps
            );
            let decision = self
                .policy_engine
                .update_check_allowed(
                    &apps,
                    &self.context.schedule,
                    &self.context.state,
                    &options,
                )
                .await;

            info!("The update check decision is: {:?}", decision);

            let request_params = match decision {
                // Allowed: continue with the parameters chosen by the policy engine.
                CheckDecision::Ok(rp) | CheckDecision::OkUpdateDeferred(rp) => rp,

                // Not allowed: tell the requester (if any) and go back to waiting.
                CheckDecision::TooSoon
                | CheckDecision::ThrottledByPolicy
                | CheckDecision::DeniedByPolicy => {
                    info!("The update check is not allowed at this time.");
                    if let Some(responder) = responder {
                        // The requester may have gone away; a failed send is ignored.
                        let _ = responder.send(StartUpdateCheckResponse::Throttled);
                    }
                    continue;
                }
            };
            if let Some(responder) = responder {
                let _ = responder.send(StartUpdateCheckResponse::Started);
            }

            let update_check = self.start_update_check(request_params, &mut co).fuse();
            futures::pin_mut!(update_check);

            // Run the check to completion while still answering control requests that
            // arrive in the meantime.
            loop {
                select! {
                    update_check_result = update_check => break update_check_result,
                    ControlRequest::StartUpdateCheck{
                        options: new_options,
                        responder
                    } = control.select_next_some() => {
                        if new_options.source == InstallSource::OnDemand {
                            info!("Got on demand update check request, ensuring ongoing check is on demand");
                            // Only affects `options`, which is used by the wait-for-reboot
                            // step below; the running check keeps its request parameters.
                            options.source = InstallSource::OnDemand;
                        }

                        let _ = responder.send(StartUpdateCheckResponse::AlreadyRunning);
                    }
                }
            }
        };

        if let RebootAfterUpdate::Needed(install_result) = reboot_after_update {
            Self::yield_state(State::WaitingForReboot, &mut co).await;
            self.wait_for_reboot(options, &mut control, install_result, &mut co)
                .await;
        }

        Self::yield_state(State::Idle, &mut co).await;
    }
}
447
/// Waits until the policy engine allows a reboot, then asks the installer to reboot.
///
/// While waiting it re-checks `reboot_allowed` every `CHECK_REBOOT_ALLOWED_INTERVAL`, keeps
/// pinging Omaha on the regular check schedule, and answers incoming update-check requests
/// with `AlreadyRunning`. A failed reboot is logged and the function returns normally.
async fn wait_for_reboot(
    &mut self,
    mut options: CheckOptions,
    control: &mut mpsc::Receiver<ControlRequest>,
    install_result: IN::InstallResult,
    co: &mut async_generator::Yield<StateMachineEvent>,
) {
    if !self
        .policy_engine
        .reboot_allowed(&options, &install_result)
        .await
    {
        let wait_to_see_if_reboot_allowed =
            self.timer.wait_for(CHECK_REBOOT_ALLOWED_INTERVAL).fuse();
        futures::pin_mut!(wait_to_see_if_reboot_allowed);

        let check_timing = self.update_next_update_time(co).await;
        let wait_to_next_ping = self.make_wait_to_next_check(check_timing).await;
        futures::pin_mut!(wait_to_next_ping);

        loop {
            select! {
                // Periodic re-check of the reboot policy; re-arm the timer when still denied.
                () = wait_to_see_if_reboot_allowed => {
                    if self.policy_engine.reboot_allowed(&options, &install_result).await {
                        break;
                    }
                    info!("Reboot not allowed at the moment, will try again in 30 minutes...");
                    wait_to_see_if_reboot_allowed.set(
                        self.timer.wait_for(CHECK_REBOOT_ALLOWED_INTERVAL).fuse()
                    );
                },
                // The next scheduled check time arrived: ping Omaha, then reschedule.
                () = wait_to_next_ping => {
                    self.ping_omaha(co).await;
                    let check_timing = self.update_next_update_time(co).await;
                    wait_to_next_ping.set(self.make_wait_to_next_check(check_timing).await);
                },
                // A new check was requested while waiting for reboot.
                ControlRequest::StartUpdateCheck{
                    options: new_options,
                    responder
                } = control.select_next_some() => {
                    // The requester may have gone away; a failed send is ignored.
                    let _ = responder.send(StartUpdateCheckResponse::AlreadyRunning);
                    if new_options.source == InstallSource::OnDemand {
                        info!("Waiting for reboot, but ensuring that InstallSource is OnDemand");
                        options.source = InstallSource::OnDemand;

                        // The upgraded options may change the policy engine's answer.
                        if self.policy_engine.reboot_allowed(&options, &install_result).await {
                            info!("Upgraded update check request to on demand, policy allowed reboot");
                            break;
                        }
                    };
                }
            }
        }
    }
    info!("Rebooting the system at the end of a successful update");
    if let Err(e) = self.installer.perform_reboot().await {
        error!("Unable to reboot the system: {}", e);
    }
}
510
511 fn report_waited_for_reboot_duration(
516 &mut self,
517 update_finish_time: SystemTime,
518 state_machine_start_monotonic_time: Instant,
519 now: ComplexTime,
520 ) -> Result<(), anyhow::Error> {
521 let update_finish_time_to_now =
524 now.wall_duration_since(update_finish_time).map_err(|e| {
525 anyhow!(
526 "Update finish time later than now, can't report waited for reboot duration,
527 update finish time: {:?}, now: {:?}, error: {:?}",
528 update_finish_time,
529 now,
530 e,
531 )
532 })?;
533
534 let state_machine_start_to_now = now
543 .mono
544 .checked_duration_since(state_machine_start_monotonic_time)
545 .ok_or_else(|| {
546 error!("Monotonic time appears to have gone backwards");
547 anyhow!(
548 "State machine start later than now, can't report waited for reboot duration. \
549 State machine start: {:?}, now: {:?}",
550 state_machine_start_monotonic_time,
551 now.mono,
552 )
553 })?;
554
555 let waited_for_reboot_duration = update_finish_time_to_now
556 .checked_sub(state_machine_start_to_now)
557 .ok_or_else(|| {
558 anyhow!(
559 "Can't report waiting for reboot duration, update finish time to now smaller \
560 than state machine start to now. Update finish time to now: {:?}, state \
561 machine start to now: {:?}",
562 update_finish_time_to_now,
563 state_machine_start_to_now,
564 )
565 })?;
566
567 info!(
568 "Waited {} seconds for reboot.",
569 waited_for_reboot_duration.as_secs()
570 );
571 self.report_metrics(Metrics::WaitedForRebootDuration(waited_for_reboot_duration));
572 Ok(())
573 }
574
575 async fn report_check_interval(&mut self, install_source: InstallSource) {
578 let now = self.time_source.now();
579
580 match self.context.schedule.last_update_check_time {
581 Some(PartialComplexTime::Wall(t)) => match now.wall_duration_since(t) {
584 Ok(interval) => self.report_metrics(Metrics::UpdateCheckInterval {
585 interval,
586 clock: ClockType::Wall,
587 install_source,
588 }),
589 Err(e) => warn!("Last check time is in the future: {}", e),
590 },
591
592 Some(PartialComplexTime::Complex(t)) => match now.mono.checked_duration_since(t.mono) {
597 Some(interval) => self.report_metrics(Metrics::UpdateCheckInterval {
598 interval,
599 clock: ClockType::Monotonic,
600 install_source,
601 }),
602 None => error!("Monotonic time in the past"),
603 },
604
605 _ => {}
611 }
612
613 self.context.schedule.last_update_check_time = now.into();
614 }
615
/// Runs one update check and does the bookkeeping around it: updates the schedule and the
/// failure counters, reports metrics, announces the schedule, protocol state and check result,
/// and persists everything.
///
/// Returns whether a reboot is needed; a failed check never requests one.
async fn start_update_check(
    &mut self,
    request_params: RequestParams,
    co: &mut async_generator::Yield<StateMachineEvent>,
) -> RebootAfterUpdate<IN::InstallResult> {
    let apps = self.app_set.lock().await.get_apps();
    let result = self.perform_update_check(request_params, apps, co).await;

    let (result, reboot_after_update) = match result {
        Ok((result, reboot_after_update)) => {
            info!("Update check result: {:?}", result);
            // The check succeeded, so record the time of the last successful check.
            self.context.schedule.last_update_time = Some(self.time_source.now().into());

            // `Some(false)` if any app failed to install, `Some(true)` if at least one app
            // was updated and none failed, `None` if no install was attempted.
            let install_success =
                result.app_responses.iter().fold(None, |result, app| {
                    match (result, &app.result) {
                        (_, update_check::Action::InstallPlanExecutionError) => Some(false),
                        (None, update_check::Action::Updated) => Some(true),
                        (result, _) => result,
                    }
                });

            // Resets the consecutive-failure counter and reports the attempts metric.
            self.report_attempts_to_successful_check(true).await;

            self.app_set
                .lock()
                .await
                .update_from_omaha(&result.app_responses);

            // Install attempts are only counted when an install actually ran.
            if let Some(success) = install_success {
                self.report_attempts_to_successful_install(success).await;
            }

            (Ok(result), reboot_after_update)
        }
        Err(error) => {
            error!("Update check failed: {:?}", error);

            let failure_reason = match &error {
                UpdateCheckError::ResponseParser(_) | UpdateCheckError::InstallPlan(_) => {
                    // Omaha was reached in these cases, so the last update time is still
                    // advanced even though the check failed.
                    self.context.schedule.last_update_time =
                        Some(self.time_source.now().into());

                    UpdateCheckFailureReason::Omaha
                }
                UpdateCheckError::OmahaRequest(request_error) => match request_error {
                    OmahaRequestError::Json(_)
                    | OmahaRequestError::HttpBuilder(_)
                    | OmahaRequestError::CupDecoration(_)
                    | OmahaRequestError::CupValidation(_) => UpdateCheckFailureReason::Internal,
                    OmahaRequestError::HttpTransport(_) | OmahaRequestError::HttpStatus(_) => {
                        UpdateCheckFailureReason::Network
                    }
                },
            };
            self.report_metrics(Metrics::UpdateCheckFailureReason(failure_reason));

            // Increments the consecutive-failure counter.
            self.report_attempts_to_successful_check(false).await;
            (Err(error), RebootAfterUpdate::NotNeeded)
        }
    };

    // Announce the new schedule, protocol state and result, in that order, then persist.
    co.yield_(StateMachineEvent::ScheduleChange(self.context.schedule))
        .await;
    co.yield_(StateMachineEvent::ProtocolStateChange(
        self.context.state.clone(),
    ))
    .await;
    co.yield_(StateMachineEvent::UpdateCheckResult(result))
        .await;

    self.persist_data().await;

    reboot_after_update
}
702
703 async fn report_attempts_to_successful_check(&mut self, success: bool) {
706 let attempts = self.context.state.consecutive_failed_update_checks + 1;
707 if success {
708 self.context.state.consecutive_failed_update_checks = 0;
709 self.report_metrics(Metrics::AttemptsToSuccessfulCheck(attempts as u64));
710 } else {
711 self.context.state.consecutive_failed_update_checks = attempts;
712 }
713 }
714
715 async fn report_attempts_to_successful_install(&mut self, success: bool) {
718 let storage_ref = self.storage_ref.clone();
719 let mut storage = storage_ref.lock().await;
720 let attempts = storage
721 .get_int(CONSECUTIVE_FAILED_INSTALL_ATTEMPTS)
722 .await
723 .unwrap_or(0)
724 + 1;
725
726 self.report_metrics(Metrics::AttemptsToSuccessfulInstall {
727 count: attempts as u64,
728 successful: success,
729 });
730
731 if success {
732 storage
733 .remove_or_log(CONSECUTIVE_FAILED_INSTALL_ATTEMPTS)
734 .await;
735 } else if let Err(e) = storage
736 .set_int(CONSECUTIVE_FAILED_INSTALL_ATTEMPTS, attempts)
737 .await
738 {
739 error!(
740 "Unable to persist {}: {}",
741 CONSECUTIVE_FAILED_INSTALL_ATTEMPTS, e
742 );
743 }
744 }
745
746 async fn persist_data(&self) {
748 let mut storage = self.storage_ref.lock().await;
749 self.context.persist(&mut *storage).await;
750 self.app_set.lock().await.persist(&mut *storage).await;
751
752 storage.commit_or_log().await;
753 }
754
/// Performs a complete update check against Omaha and, when an update is offered and allowed
/// by policy, installs it.
///
/// The steps are:
/// 1. Send the update-check request, retrying transport/status failures with backoff.
/// 2. Parse the response and determine which apps have an update.
/// 3. Build an install plan and ask the policy engine whether the update can start.
/// 4. Run the install while forwarding progress events, then report the results to Omaha.
/// 5. On success, persist the finish time / target version and decide whether to reboot.
///
/// # Errors
///
/// * `UpdateCheckError::OmahaRequest` when the request to Omaha ultimately fails.
/// * `UpdateCheckError::ResponseParser` when the response cannot be parsed.
/// * `UpdateCheckError::InstallPlan` when the installer cannot create an install plan.
///
/// Failed installs are NOT an `Err`: they are returned as `Ok` with per-app
/// `InstallPlanExecutionError` actions and `RebootAfterUpdate::NotNeeded`.
async fn perform_update_check(
    &mut self,
    request_params: RequestParams,
    apps: Vec<App>,
    co: &mut async_generator::Yield<StateMachineEvent>,
) -> Result<(update_check::Response, RebootAfterUpdate<IN::InstallResult>), UpdateCheckError>
{
    Self::yield_state(State::CheckingForUpdates(request_params.source), co).await;

    self.report_check_interval(request_params.source).await;

    // Build one request that carries an update check and a ping for every app.
    let config = self.config.clone();
    let mut request_builder = RequestBuilder::new(&config, &request_params);
    for app in &apps {
        request_builder = request_builder.add_update_check(app).add_ping(app);
    }
    // One session id is shared by every request made during this check.
    let session_id = GUID::new();
    request_builder = request_builder.session_id(session_id.clone());

    let mut omaha_request_attempt = 1;

    // Retry loop: breaks with the response on success, or with an error once the failure is
    // not retryable or the attempts are exhausted.
    let loop_result = loop {
        let omaha_check_start_time = self.time_source.now_in_monotonic();
        // Every attempt gets a fresh request id.
        request_builder = request_builder.request_id(GUID::new());
        let result = self
            .do_omaha_request_and_update_context(&request_builder, co)
            .await;

        // Report how long this attempt took, whether or not it succeeded.
        {
            let now = self.time_source.now_in_monotonic();
            let duration = now.checked_duration_since(omaha_check_start_time);

            if let Some(response_time) = duration {
                self.report_metrics(Metrics::UpdateCheckResponseTime {
                    response_time,
                    successful: result.is_ok(),
                });
            } else {
                error!(
                    "now: {:?}, is before omaha_check_start_time: {:?}",
                    now, omaha_check_start_time
                );
            }
        }

        match result {
            Ok(res) => {
                break Ok(res);
            }
            // The next four error kinds are never retried.
            Err(OmahaRequestError::Json(e)) => {
                error!("Unable to construct request body! {:?}", e);
                Self::yield_state(State::ErrorCheckingForUpdate, co).await;
                break Err(UpdateCheckError::OmahaRequest(e.into()));
            }
            Err(OmahaRequestError::HttpBuilder(e)) => {
                error!("Unable to construct HTTP request! {:?}", e);
                Self::yield_state(State::ErrorCheckingForUpdate, co).await;
                break Err(UpdateCheckError::OmahaRequest(e.into()));
            }
            Err(OmahaRequestError::CupDecoration(e)) => {
                error!(
                    "Unable to decorate HTTP request with CUPv2 parameters! {:?}",
                    e
                );
                Self::yield_state(State::ErrorCheckingForUpdate, co).await;
                break Err(UpdateCheckError::OmahaRequest(e.into()));
            }
            Err(OmahaRequestError::CupValidation(e)) => {
                error!(
                    "Unable to validate HTTP response with CUPv2 parameters! {:?}",
                    e
                );
                Self::yield_state(State::ErrorCheckingForUpdate, co).await;
                break Err(UpdateCheckError::OmahaRequest(e.into()));
            }
            // Transport errors are retried unless the attempts are used up, `is_user()`
            // reports true for the error, or the server dictated the poll interval.
            Err(OmahaRequestError::HttpTransport(e)) => {
                warn!("Unable to contact Omaha: {:?}", e);
                if omaha_request_attempt >= MAX_OMAHA_REQUEST_ATTEMPTS
                    || e.is_user()
                    || self.context.state.server_dictated_poll_interval.is_some()
                {
                    Self::yield_state(State::ErrorCheckingForUpdate, co).await;
                    break Err(UpdateCheckError::OmahaRequest(e.into()));
                }
            }
            // Status errors are retried unless the attempts are used up or the server
            // dictated the poll interval.
            Err(OmahaRequestError::HttpStatus(e)) => {
                warn!("Unable to contact Omaha: {:?}", e);
                if omaha_request_attempt >= MAX_OMAHA_REQUEST_ATTEMPTS
                    || self.context.state.server_dictated_poll_interval.is_some()
                {
                    Self::yield_state(State::ErrorCheckingForUpdate, co).await;
                    break Err(UpdateCheckError::OmahaRequest(e.into()));
                }
            }
        }

        // Backoff before the retry: 1s, 2s, ... base doubling with each attempt, passed
        // through `randomize` (defined elsewhere) together with a 1000 ms range.
        let backoff_time_secs = 1 << (omaha_request_attempt - 1);
        let backoff_time = randomize(backoff_time_secs * 1000, 1000);
        info!("Waiting {} ms before retrying...", backoff_time);
        self.timer
            .wait_for(Duration::from_millis(backoff_time))
            .await;

        omaha_request_attempt += 1;
    };

    self.report_metrics(Metrics::RequestsPerCheck {
        count: omaha_request_attempt,
        successful: loop_result.is_ok(),
    });

    let (_parts, data, request_metadata, signature) = loop_result?;

    let response = match Self::parse_omaha_response(&data) {
        Ok(res) => res,
        Err(err) => {
            warn!("Unable to parse Omaha response: {:?}", err);
            Self::yield_state(State::ErrorCheckingForUpdate, co).await;
            // Tell Omaha about the parse failure; no next version is known for any app.
            self.report_omaha_event_and_update_context(
                &request_params,
                Event::error(EventErrorCode::ParseResponse),
                &apps,
                &session_id,
                &apps.iter().map(|app| (app.id.clone(), None)).collect(),
                None,
                co,
            )
            .await;
            return Err(UpdateCheckError::ResponseParser(err));
        }
    };

    info!("result: {:?}", response);

    co.yield_(StateMachineEvent::OmahaServerResponse(response.clone()))
        .await;

    let statuses = Self::get_app_update_statuses(&response);
    for (app_id, status) in &statuses {
        info!("Omaha update check status: {} => {:?}", app_id, status);
    }

    // Only apps whose update check came back with status `Ok` have an update.
    let apps_with_update: Vec<_> = response
        .apps
        .iter()
        .filter(|app| {
            matches!(
                app.update_check,
                Some(UpdateCheck {
                    status: OmahaStatus::Ok,
                    ..
                })
            )
        })
        .collect();

    if apps_with_update.is_empty() {
        Self::yield_state(State::NoUpdateAvailable, co).await;
        Self::make_not_updated_result(response, update_check::Action::NoUpdate)
    } else {
        info!(
            "At least one app has an update, proceeding to build and process an Install Plan"
        );
        // App id -> manifest version offered by Omaha (if any), for the apps being updated.
        let next_versions: HashMap<String, Option<String>> = apps_with_update
            .iter()
            .map(|app| (app.id.clone(), app.get_manifest_version()))
            .collect();
        let install_plan = match self
            .installer
            .try_create_install_plan(
                &request_params,
                request_metadata.as_ref(),
                &response,
                data,
                signature.map(|s| s.as_bytes().to_vec()),
            )
            .await
        {
            Ok(plan) => plan,
            Err(e) => {
                error!("Unable to construct install plan! {}", e);
                // Observers see `InstallingUpdate` followed directly by `InstallationError`.
                Self::yield_state(State::InstallingUpdate, co).await;
                Self::yield_state(State::InstallationError, co).await;
                self.report_omaha_event_and_update_context(
                    &request_params,
                    Event::error(EventErrorCode::ConstructInstallPlan),
                    &apps,
                    &session_id,
                    &next_versions,
                    None,
                    co,
                )
                .await;
                return Err(UpdateCheckError::InstallPlan(e.into()));
            }
        };

        info!("Validating Install Plan with Policy");
        let install_plan_decision = self.policy_engine.update_can_start(&install_plan).await;
        match install_plan_decision {
            UpdateDecision::Ok => {
                info!("Proceeding with install plan.");
            }
            UpdateDecision::DeferredByPolicy => {
                info!("Install plan was deferred by Policy.");
                // A deferral is reported to Omaha as an `UpdateComplete` event whose result
                // is `UpdateDeferred`.
                let event = Event {
                    event_type: EventType::UpdateComplete,
                    event_result: EventResult::UpdateDeferred,
                    ..Event::default()
                };
                self.report_omaha_event_and_update_context(
                    &request_params,
                    event,
                    &apps,
                    &session_id,
                    &next_versions,
                    None,
                    co,
                )
                .await;

                Self::yield_state(State::InstallationDeferredByPolicy, co).await;

                return Self::make_not_updated_result(
                    response,
                    update_check::Action::DeferredByPolicy,
                );
            }
            UpdateDecision::DeniedByPolicy => {
                warn!("Install plan was denied by Policy, see Policy logs for reasoning");
                self.report_omaha_event_and_update_context(
                    &request_params,
                    Event::error(EventErrorCode::DeniedByPolicy),
                    &apps,
                    &session_id,
                    &next_versions,
                    None,
                    co,
                )
                .await;

                // No state is yielded on this path.
                return Self::make_not_updated_result(
                    response,
                    update_check::Action::DeniedByPolicy,
                );
            }
        }

        Self::yield_state(State::InstallingUpdate, co).await;
        self.report_omaha_event_and_update_context(
            &request_params,
            Event::success(EventType::UpdateDownloadStarted),
            &apps,
            &session_id,
            &next_versions,
            None,
            co,
        )
        .await;

        let install_plan_id = install_plan.id();
        let update_start_time = self.time_source.now_in_walltime();
        let update_first_seen_time = self
            .record_update_first_seen_time(&install_plan_id, update_start_time)
            .await;

        // Run the install and the progress forwarding concurrently. The observer is dropped
        // as soon as the install finishes, which closes the channel and ends the
        // `yield_progress` loop so that the `join` below can complete.
        let (send, mut recv) = mpsc::channel(0);
        let observer = StateMachineProgressObserver(send);
        let perform_install = async {
            let result = self
                .installer
                .perform_install(&install_plan, Some(&observer))
                .await;
            drop(observer);
            result
        };
        let yield_progress = async {
            while let Some(progress) = recv.next().await {
                co.yield_(StateMachineEvent::InstallProgressChange(progress))
                    .await;
            }
        };

        let ((install_result, mut app_install_results), ()) =
            future::join(perform_install, yield_progress).await;
        // Both `Installed` and `Deferred` count as "not failed".
        let no_apps_failed = app_install_results.iter().all(|result| {
            matches!(
                result,
                AppInstallResult::Installed | AppInstallResult::Deferred
            )
        });
        let update_finish_time = self.time_source.now_in_walltime();
        // `None` when the wall clock went backwards during the install.
        let install_duration = match update_finish_time.duration_since(update_start_time) {
            Ok(duration) => {
                let metrics = if no_apps_failed {
                    Metrics::SuccessfulUpdateDuration(duration)
                } else {
                    Metrics::FailedUpdateDuration(duration)
                };
                self.report_metrics(metrics);
                Some(duration)
            }
            Err(e) => {
                warn!("Update start time is in the future: {}", e);
                None
            }
        };

        // Build a single request carrying one event per app that had an update. The install
        // results are paired with `apps_with_update` by position.
        let config = self.config.clone();
        let mut request_builder = RequestBuilder::new(&config, &request_params);
        let mut events = vec![];
        let mut installed_apps = vec![];
        for (response_app, app_install_result) in
            apps_with_update.iter().zip(&app_install_results)
        {
            match apps.iter().find(|app| app.id == response_app.id) {
                Some(app) => {
                    let event = match app_install_result {
                        AppInstallResult::Installed => {
                            installed_apps.push(app);
                            Event::success(EventType::UpdateDownloadFinished)
                        }
                        AppInstallResult::Deferred => Event {
                            event_type: EventType::UpdateComplete,
                            event_result: EventResult::UpdateDeferred,
                            ..Event::default()
                        },
                        AppInstallResult::Failed(_) => {
                            Event::error(EventErrorCode::Installation)
                        }
                    };
                    let event = Event {
                        previous_version: Some(app.version.to_string()),
                        next_version: response_app.get_manifest_version(),
                        // Dropped when the millisecond count does not fit the field's type.
                        download_time_ms: install_duration
                            .and_then(|d| d.as_millis().try_into().ok()),
                        ..event
                    };
                    request_builder = request_builder.add_event(app, event.clone());
                    events.push(event);
                }
                None => {
                    error!("unknown app id in omaha response: {:?}", response_app.id);
                }
            }
        }
        request_builder = request_builder
            .session_id(session_id.clone())
            .request_id(GUID::new());
        if let Err(e) = self
            .do_omaha_request_and_update_context(&request_builder, co)
            .await
        {
            // The request is not retried; every event it carried is counted as lost.
            for event in events {
                self.report_metrics(Metrics::OmahaEventLost(event));
            }
            warn!("Unable to report event to Omaha: {:?}", e);
        }

        // Apps that were actually installed additionally get an `UpdateComplete` event.
        if !installed_apps.is_empty() {
            self.report_omaha_event_and_update_context(
                &request_params,
                Event::success(EventType::UpdateComplete),
                installed_apps,
                &session_id,
                &next_versions,
                install_duration,
                co,
            )
            .await;
        }

        // Translate the per-app install results into per-app actions, collecting the errors
        // of failed installs.
        let mut errors = vec![];
        let daystart = response.daystart;
        let app_responses = response
            .apps
            .into_iter()
            .map(|app| update_check::AppResponse {
                app_id: app.id,
                cohort: app.cohort,
                user_counting: daystart.clone().into(),
                result: match app.update_check {
                    Some(UpdateCheck {
                        status: OmahaStatus::Ok,
                        ..
                    // NOTE(review): `remove(0)` consumes the results in response order and
                    // panics if the installer returned fewer results than there are apps
                    // with an update -- presumably the installer guarantees one result per
                    // such app; confirm against the `Installer` contract.
                    }) => match app_install_results.remove(0) {
                        AppInstallResult::Installed => update_check::Action::Updated,
                        AppInstallResult::Deferred => update_check::Action::DeferredByPolicy,
                        AppInstallResult::Failed(e) => {
                            errors.push(e);
                            update_check::Action::InstallPlanExecutionError
                        }
                    },
                    _ => update_check::Action::NoUpdate,
                },
            })
            .collect();

        // Any failed app turns the whole check into an installation error (still `Ok`).
        if !errors.is_empty() {
            for e in errors {
                co.yield_(StateMachineEvent::InstallerError(Some(Box::new(e))))
                    .await;
            }
            Self::yield_state(State::InstallationError, co).await;

            return Ok((
                update_check::Response { app_responses },
                RebootAfterUpdate::NotNeeded,
            ));
        }

        match update_finish_time.duration_since(update_first_seen_time) {
            Ok(duration) => {
                self.report_metrics(Metrics::SuccessfulUpdateFromFirstSeen(duration))
            }
            Err(e) => warn!("Update first seen time is in the future: {}", e),
        }
        // Persist the finish time and, for the system app, the target version.
        {
            let mut storage = self.storage_ref.lock().await;
            if let Err(e) = storage
                .set_time(UPDATE_FINISH_TIME, update_finish_time)
                .await
            {
                error!("Unable to persist {}: {}", UPDATE_FINISH_TIME, e);
            }
            let app_set = self.app_set.lock().await;
            let system_app_id = app_set.get_system_app_id();
            // The target version is only stored when the system app was part of the update.
            if let Some(next_version) = next_versions.get(system_app_id) {
                let target_version = next_version.as_deref().unwrap_or_else(|| {
                    error!("Target version string not found in Omaha response.");
                    "UNKNOWN"
                });
                if let Err(e) = storage.set_string(TARGET_VERSION, target_version).await {
                    error!("Unable to persist {}: {}", TARGET_VERSION, e);
                }
            }
            storage.commit_or_log().await;
        }

        let reboot_after_update = if self.policy_engine.reboot_needed(&install_plan).await {
            RebootAfterUpdate::Needed(install_result)
        } else {
            RebootAfterUpdate::NotNeeded
        };

        Ok((
            update_check::Response { app_responses },
            reboot_after_update,
        ))
    }
}
1233
1234 #[allow(clippy::too_many_arguments)]
1237 async fn report_omaha_event_and_update_context<'a>(
1238 &'a mut self,
1239 request_params: &'a RequestParams,
1240 event: Event,
1241 apps: impl IntoIterator<Item = &App>,
1242 session_id: &GUID,
1243 next_versions: &HashMap<String, Option<String>>,
1244 install_duration: Option<Duration>,
1245 co: &mut async_generator::Yield<StateMachineEvent>,
1246 ) {
1247 let config = self.config.clone();
1248 let mut request_builder = RequestBuilder::new(&config, request_params);
1249 for app in apps {
1250 if let Some(next_version) = next_versions.get(&app.id) {
1252 let event = Event {
1253 previous_version: Some(app.version.to_string()),
1254 next_version: next_version.clone(),
1255 download_time_ms: install_duration.and_then(|d| d.as_millis().try_into().ok()),
1256 ..event.clone()
1257 };
1258 request_builder = request_builder.add_event(app, event);
1259 }
1260 }
1261 request_builder = request_builder
1262 .session_id(session_id.clone())
1263 .request_id(GUID::new());
1264 if let Err(e) = self
1265 .do_omaha_request_and_update_context(&request_builder, co)
1266 .await
1267 {
1268 self.report_metrics(Metrics::OmahaEventLost(event));
1269 warn!("Unable to report event to Omaha: {:?}", e);
1270 }
1271 }
1272
1273 async fn ping_omaha(&mut self, co: &mut async_generator::Yield<StateMachineEvent>) {
1275 let apps = self.app_set.lock().await.get_apps();
1276 let request_params = RequestParams {
1277 source: InstallSource::ScheduledTask,
1278 use_configured_proxies: true,
1279 disable_updates: false,
1280 offer_update_if_same_version: false,
1281 };
1282 let config = self.config.clone();
1283 let mut request_builder = RequestBuilder::new(&config, &request_params);
1284 for app in &apps {
1285 request_builder = request_builder.add_ping(app);
1286 }
1287 request_builder = request_builder
1288 .session_id(GUID::new())
1289 .request_id(GUID::new());
1290
1291 let (_parts, data, _request_metadata, _signature) = match self
1292 .do_omaha_request_and_update_context(&request_builder, co)
1293 .await
1294 {
1295 Ok(res) => res,
1296 Err(e) => {
1297 error!("Ping Omaha failed: {:#}", anyhow!(e));
1298 self.context.state.consecutive_failed_update_checks += 1;
1299 self.persist_data().await;
1300 return;
1301 }
1302 };
1303
1304 let response = match Self::parse_omaha_response(&data) {
1305 Ok(res) => res,
1306 Err(e) => {
1307 error!("Unable to parse Omaha response: {:#}", anyhow!(e));
1308 self.context.state.consecutive_failed_update_checks += 1;
1309 self.persist_data().await;
1310 return;
1311 }
1312 };
1313
1314 self.context.state.consecutive_failed_update_checks = 0;
1315
1316 self.context.schedule.last_update_time = Some(self.time_source.now().into());
1319 co.yield_(StateMachineEvent::ScheduleChange(self.context.schedule))
1320 .await;
1321
1322 let app_responses = Self::make_app_responses(response, update_check::Action::NoUpdate);
1323 self.app_set.lock().await.update_from_omaha(&app_responses);
1324
1325 self.persist_data().await;
1326 }
1327
1328 async fn do_omaha_request_and_update_context<'a>(
1341 &'a mut self,
1342 builder: &RequestBuilder<'a>,
1343 co: &mut async_generator::Yield<StateMachineEvent>,
1344 ) -> Result<
1345 (
1346 Parts,
1347 Vec<u8>,
1348 Option<RequestMetadata>,
1349 Option<DerSignature>,
1350 ),
1351 OmahaRequestError,
1352 > {
1353 let (request, request_metadata) = builder.build(self.cup_handler.as_ref())?;
1354 let response = Self::make_request(&mut self.http, request).await?;
1355
1356 let signature: Option<DerSignature> = if let (Some(handler), Some(metadata)) =
1357 (self.cup_handler.as_ref(), &request_metadata)
1358 {
1359 let signature = handler
1360 .verify_response(metadata, &response, metadata.public_key_id)
1361 .map_err(|e| {
1362 error!("Could not verify response: {:?}", e);
1363 e
1364 })?;
1365 Some(signature)
1366 } else {
1367 None
1368 };
1369
1370 let (parts, body) = response.into_parts();
1371
1372 let server_dictated_poll_interval = parts.headers.get(X_RETRY_AFTER).and_then(|header| {
1374 match header
1375 .to_str()
1376 .map_err(|e| anyhow!(e))
1377 .and_then(|s| s.parse::<u64>().map_err(|e| anyhow!(e)))
1378 {
1379 Ok(seconds) => {
1380 Some(Duration::from_secs(min(seconds, 86400)))
1383 }
1384 Err(e) => {
1385 error!("Unable to parse {} header: {:#}", X_RETRY_AFTER, e);
1386 None
1387 }
1388 }
1389 });
1390 if self.context.state.server_dictated_poll_interval != server_dictated_poll_interval {
1391 self.context.state.server_dictated_poll_interval = server_dictated_poll_interval;
1392 co.yield_(StateMachineEvent::ProtocolStateChange(
1393 self.context.state.clone(),
1394 ))
1395 .await;
1396 let mut storage = self.storage_ref.lock().await;
1397 self.context.persist(&mut *storage).await;
1398 storage.commit_or_log().await;
1399 }
1400 if !parts.status.is_success() {
1401 Err(OmahaRequestError::HttpStatus(parts.status))
1403 } else {
1404 info!("Omaha HTTP response: {}", parts.status);
1406 Ok((parts, body, request_metadata, signature))
1407 }
1408 }
1409
1410 async fn make_request(
1416 http_client: &mut HR,
1417 request: http::Request<hyper::Body>,
1418 ) -> Result<HttpResponse<Vec<u8>>, http_request::Error> {
1419 info!("Making http request to: {}", request.uri());
1420 http_client.request(request).await.map_err(|err| {
1421 warn!("Unable to perform request: {}", err);
1422 err
1423 })
1424 }
1425
1426 fn parse_omaha_response(data: &[u8]) -> Result<Response, ResponseParseError> {
1430 parse_json_response(data).map_err(ResponseParseError::Json)
1431 }
1432
1433 fn get_app_update_statuses(response: &Response) -> Vec<(&str, &OmahaStatus)> {
1436 response
1437 .apps
1438 .iter()
1439 .filter_map(|app| {
1440 app.update_check
1441 .as_ref()
1442 .map(|u| (app.id.as_str(), &u.status))
1443 })
1444 .collect()
1445 }
1446
1447 fn make_app_responses(
1453 response: protocol::response::Response,
1454 action: update_check::Action,
1455 ) -> Vec<update_check::AppResponse> {
1456 let daystart = response.daystart;
1457 response
1458 .apps
1459 .into_iter()
1460 .map(|app| update_check::AppResponse {
1461 app_id: app.id,
1462 cohort: app.cohort,
1463 user_counting: daystart.clone().into(),
1464 result: action.clone(),
1465 })
1466 .collect()
1467 }
1468
1469 fn make_not_updated_result(
1471 response: protocol::response::Response,
1472 action: update_check::Action,
1473 ) -> Result<(update_check::Response, RebootAfterUpdate<IN::InstallResult>), UpdateCheckError>
1474 {
1475 Ok((
1476 update_check::Response {
1477 app_responses: Self::make_app_responses(response, action),
1478 },
1479 RebootAfterUpdate::NotNeeded,
1480 ))
1481 }
1482
1483 async fn yield_state(state: State, co: &mut async_generator::Yield<StateMachineEvent>) {
1485 co.yield_(StateMachineEvent::StateChange(state)).await;
1486 }
1487
1488 fn report_metrics(&mut self, metrics: Metrics) {
1489 if let Err(err) = self.metrics_reporter.report_metrics(metrics) {
1490 warn!("Unable to report metrics: {:?}", err);
1491 }
1492 }
1493
1494 async fn record_update_first_seen_time(
1495 &mut self,
1496 install_plan_id: &str,
1497 now: SystemTime,
1498 ) -> SystemTime {
1499 let mut storage = self.storage_ref.lock().await;
1500 let previous_id = storage.get_string(INSTALL_PLAN_ID).await;
1501 if let Some(previous_id) = previous_id
1502 && previous_id == install_plan_id
1503 {
1504 return storage
1505 .get_time(UPDATE_FIRST_SEEN_TIME)
1506 .await
1507 .unwrap_or(now);
1508 }
1509 if let Err(e) = storage.set_string(INSTALL_PLAN_ID, install_plan_id).await {
1511 error!("Unable to persist {}: {}", INSTALL_PLAN_ID, e);
1512 return now;
1513 }
1514 if let Err(e) = storage.set_time(UPDATE_FIRST_SEEN_TIME, now).await {
1515 error!("Unable to persist {}: {}", UPDATE_FIRST_SEEN_TIME, e);
1516 let _ = storage.remove(INSTALL_PLAN_ID).await;
1517 return now;
1518 }
1519 storage.commit_or_log().await;
1520 now
1521 }
1522}
1523
1524fn randomize(n: u64, range: u64) -> u64 {
1526 n - range / 2 + rand::random::<u64>() % range
1527}
1528
// Test-only helpers that drive the state machine for a single update check.
#[cfg(test)]
impl<PE, HR, IN, TM, MR, ST, AS, IR, PL, CH> StateMachine<PE, HR, IN, TM, MR, ST, AS, CH>
where
    PE: PolicyEngine<InstallResult = IR, InstallPlan = PL>,
    HR: HttpRequest,
    IN: Installer<InstallResult = IR, InstallPlan = PL>,
    TM: Timer,
    MR: MetricsReporter,
    ST: Storage,
    AS: AppSet,
    CH: Cupv2Handler,
    IR: 'static + Send,
    PL: Plan,
{
    /// Runs `perform_update_check` once for the apps currently in the app set and returns
    /// its final result; events yielded along the way are not returned.
    async fn oneshot(
        &mut self,
        request_params: RequestParams,
    ) -> Result<(update_check::Response, RebootAfterUpdate<IN::InstallResult>), UpdateCheckError>
    {
        let apps = self.app_set.lock().await.get_apps();

        let check = async_generator::generate(move |mut co| async move {
            self.perform_update_check(request_params, apps, &mut co)
                .await
        });
        check.into_complete().await
    }

    /// Runs `start_update_check` once with default request parameters, draining and
    /// discarding every event it yields.
    async fn run_once(&mut self) {
        let request_params = RequestParams::default();

        let events = async_generator::generate(move |mut co| async move {
            self.start_update_check(request_params, &mut co).await;
        });
        events.map(|_| ()).collect::<()>().await;
    }
}
1571
1572#[cfg(test)]
1573mod tests {
1574 use super::update_check::{
1575 Action, CONSECUTIVE_FAILED_UPDATE_CHECKS, LAST_UPDATE_TIME, SERVER_DICTATED_POLL_INTERVAL,
1576 };
1577 use super::*;
1578 use crate::{
1579 app_set::VecAppSet,
1580 common::{
1581 App, CheckOptions, PersistedApp, ProtocolState, UpdateCheckSchedule, UserCounting,
1582 },
1583 configuration::Updater,
1584 cup_ecdsa::test_support::{MockCupv2Handler, make_cup_handler_for_test},
1585 http_request::mock::MockHttpRequest,
1586 installer::{
1587 ProgressObserver,
1588 stub::{StubInstallErrors, StubInstaller, StubPlan},
1589 },
1590 metrics::MockMetricsReporter,
1591 policy::{MockPolicyEngine, StubPolicyEngine},
1592 protocol::{Cohort, request::OS, response},
1593 storage::MemStorage,
1594 time::{
1595 MockTimeSource, PartialComplexTime,
1596 timers::{BlockingTimer, MockTimer, RequestedWait},
1597 },
1598 version::Version,
1599 };
1600 use assert_matches::assert_matches;
1601 use futures::executor::{LocalPool, block_on};
1602 use futures::future::LocalBoxFuture;
1603 use futures::task::LocalSpawnExt;
1604 use pretty_assertions::assert_eq;
1605 use serde_json::json;
1606 use std::cell::RefCell;
1607 use std::time::Duration;
1608
1609 fn make_test_app_set() -> Rc<Mutex<VecAppSet>> {
1610 Rc::new(Mutex::new(VecAppSet::new(vec![
1611 App::builder()
1612 .id("{00000000-0000-0000-0000-000000000001}")
1613 .version([1, 2, 3, 4])
1614 .cohort(Cohort::new("stable-channel"))
1615 .build(),
1616 ])))
1617 }
1618
1619 fn make_update_available_response() -> HttpResponse<Vec<u8>> {
1620 let response = json!({"response":{
1621 "server": "prod",
1622 "protocol": "3.0",
1623 "app": [{
1624 "appid": "{00000000-0000-0000-0000-000000000001}",
1625 "status": "ok",
1626 "updatecheck": {
1627 "status": "ok"
1628 }
1629 }],
1630 }});
1631 HttpResponse::new(serde_json::to_vec(&response).unwrap())
1632 }
1633
1634 fn make_noupdate_httpresponse() -> Vec<u8> {
1635 serde_json::to_vec(
1636 &(json!({"response":{
1637 "server": "prod",
1638 "protocol": "3.0",
1639 "app": [{
1640 "appid": "{00000000-0000-0000-0000-000000000001}",
1641 "status": "ok",
1642 "updatecheck": {
1643 "status": "noupdate"
1644 }
1645 }]
1646 }})),
1647 )
1648 .unwrap()
1649 }
1650
1651 async fn assert_request<'a>(http: &MockHttpRequest, request_builder: RequestBuilder<'a>) {
1654 let cup_handler = make_cup_handler_for_test();
1655 let (request, _request_metadata) = request_builder.build(Some(&cup_handler)).unwrap();
1656 let body = hyper::body::to_bytes(request).await.unwrap();
1657 let body_str = String::from_utf8_lossy(&body);
1659 http.assert_body_str(&body_str).await;
1660 }
1661
    /// Smoke test: a one-shot update check against a mocked "noupdate" response must
    /// finish without returning an error.
    #[test]
    fn run_simple_check_with_noupdate_result() {
        block_on(async {
            let http = MockHttpRequest::new(HttpResponse::new(make_noupdate_httpresponse()));

            StateMachineBuilder::new_stub()
                .http(http)
                .oneshot(RequestParams::default())
                .await
                .unwrap();

            info!("update check complete!");
        });
    }
1676
    /// A "noupdate" response that carries cohort id and name must surface both in the
    /// app response (with no hint), and must not request a reboot.
    #[test]
    fn test_cohort_returned_with_noupdate_result() {
        block_on(async {
            let response = json!({"response":{
              "server": "prod",
              "protocol": "3.0",
              "app": [{
                "appid": "{00000000-0000-0000-0000-000000000001}",
                "status": "ok",
                "cohort": "1",
                "cohortname": "stable-channel",
                "updatecheck": {
                  "status": "noupdate"
                }
              }]
            }});
            let response = serde_json::to_vec(&response).unwrap();
            let http = MockHttpRequest::new(HttpResponse::new(response));

            let (response, reboot_after_update) = StateMachineBuilder::new_stub()
                .http(http)
                .oneshot(RequestParams::default())
                .await
                .unwrap();
            assert_eq!(
                "{00000000-0000-0000-0000-000000000001}",
                response.app_responses[0].app_id
            );
            assert_eq!(Some("1".into()), response.app_responses[0].cohort.id);
            assert_eq!(
                Some("stable-channel".into()),
                response.app_responses[0].cohort.name
            );
            // The response has no cohort hint field, so none is expected back.
            assert_eq!(None, response.app_responses[0].cohort.hint);

            assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);
        });
    }
1715
    /// An update-available ("ok") response that carries cohort id and name must surface
    /// both in the app response (with no hint), and the stub setup must report that a
    /// reboot is needed.
    #[test]
    fn test_cohort_returned_with_update_result() {
        block_on(async {
            let response = json!({"response":{
              "server": "prod",
              "protocol": "3.0",
              "app": [{
                "appid": "{00000000-0000-0000-0000-000000000001}",
                "status": "ok",
                "cohort": "1",
                "cohortname": "stable-channel",
                "updatecheck": {
                  "status": "ok"
                }
              }]
            }});
            let response = serde_json::to_vec(&response).unwrap();
            let http = MockHttpRequest::new(HttpResponse::new(response));

            let (response, reboot_after_update) = StateMachineBuilder::new_stub()
                .http(http)
                .oneshot(RequestParams::default())
                .await
                .unwrap();
            assert_eq!(
                "{00000000-0000-0000-0000-000000000001}",
                response.app_responses[0].app_id
            );
            assert_eq!(Some("1".into()), response.app_responses[0].cohort.id);
            assert_eq!(
                Some("stable-channel".into()),
                response.app_responses[0].cohort.name
            );
            assert_eq!(None, response.app_responses[0].cohort.hint);

            assert_matches!(reboot_after_update, RebootAfterUpdate::Needed(()));
        });
    }
1754
    /// An unparsable response body must make the update check fail with
    /// `UpdateCheckError::ResponseParser`, and a `ParseResponse` error event must be
    /// reported to Omaha afterwards.
    #[test]
    fn test_report_parse_response_error() {
        block_on(async {
            let http = MockHttpRequest::new(HttpResponse::new("invalid response".into()));

            let mut state_machine = StateMachineBuilder::new_stub().http(http).build().await;

            let response = state_machine.oneshot(RequestParams::default()).await;
            assert_matches!(response, Err(UpdateCheckError::ResponseParser(_)));

            // Rebuild the event request that is expected to have been sent.
            let request_params = RequestParams::default();
            let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
            let event = Event {
                previous_version: Some("1.2.3.4".to_string()),
                ..Event::error(EventErrorCode::ParseResponse)
            };
            let apps = state_machine.app_set.lock().await.get_apps();
            // NOTE(review): the fixed session/request ids presumably rely on GUIDs being
            // generated sequentially in test builds - confirm against `GUID::new`.
            request_builder = request_builder
                .add_event(&apps[0], event)
                .session_id(GUID::from_u128(0))
                .request_id(GUID::from_u128(2));
            assert_request(&state_machine.http, request_builder).await;
        });
    }
1779
    /// When the update check fails with `UpdateCheckError::InstallPlan`, a
    /// `ConstructInstallPlan` error event must be reported to Omaha.
    #[test]
    fn test_report_construct_install_plan_error() {
        block_on(async {
            // Unlike the other fixtures in this module, this response declares protocol
            // "4.0".
            let response = json!({"response":{
              "server": "prod",
              "protocol": "4.0",
              "app": [{
                "appid": "{00000000-0000-0000-0000-000000000001}",
                "status": "ok",
                "updatecheck": {
                  "status": "ok"
                }
              }],
            }});
            let response = serde_json::to_vec(&response).unwrap();
            let http = MockHttpRequest::new(HttpResponse::new(response));

            let mut state_machine = StateMachineBuilder::new_stub().http(http).build().await;

            let response = state_machine.oneshot(RequestParams::default()).await;
            assert_matches!(response, Err(UpdateCheckError::InstallPlan(_)));

            // Rebuild the event request that is expected to have been sent.
            let request_params = RequestParams::default();
            let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
            let event = Event {
                previous_version: Some("1.2.3.4".to_string()),
                ..Event::error(EventErrorCode::ConstructInstallPlan)
            };
            let apps = state_machine.app_set.lock().await.get_apps();
            request_builder = request_builder
                .add_event(&apps[0], event)
                .session_id(GUID::from_u128(0))
                .request_id(GUID::from_u128(2));
            assert_request(&state_machine.http, request_builder).await;
        });
    }
1816
    /// With an installer configured to fail, the check itself still returns `Ok`, the app
    /// result is `InstallPlanExecutionError`, no reboot is requested, and an
    /// `Installation` error event carrying previous and next versions is reported.
    #[test]
    fn test_report_installation_error() {
        block_on(async {
            let response = json!({"response":{
              "server": "prod",
              "protocol": "3.0",
              "app": [{
                "appid": "{00000000-0000-0000-0000-000000000001}",
                "status": "ok",
                "updatecheck": {
                  "status": "ok",
                  "manifest": {
                      "version": "5.6.7.8",
                      "actions": {
                          "action": [],
                      },
                      "packages": {
                          "package": [],
                      },
                  }
                }
              }],
            }});
            let response = serde_json::to_vec(&response).unwrap();
            let http = MockHttpRequest::new(HttpResponse::new(response));

            let mut state_machine = StateMachineBuilder::new_stub()
                .http(http)
                .installer(StubInstaller { should_fail: true })
                .build()
                .await;

            let (response, reboot_after_update) = state_machine
                .oneshot(RequestParams::default())
                .await
                .unwrap();
            assert_eq!(
                Action::InstallPlanExecutionError,
                response.app_responses[0].result
            );
            assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);

            // Rebuild the event request that is expected to have been sent; the next
            // version comes from the manifest in the response above.
            let request_params = RequestParams::default();
            let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
            let event = Event {
                previous_version: Some("1.2.3.4".to_string()),
                next_version: Some("5.6.7.8".to_string()),
                download_time_ms: Some(0),
                ..Event::error(EventErrorCode::Installation)
            };
            let apps = state_machine.app_set.lock().await.get_apps();
            request_builder = request_builder
                .add_event(&apps[0], event)
                .session_id(GUID::from_u128(0))
                .request_id(GUID::from_u128(3));
            assert_request(&state_machine.http, request_builder).await;
        });
    }
1875
    /// Multi-app check where the installer returns a different result per app.
    ///
    /// The response offers updates for `appid_3` and `appid_1` and none for `appid_2`.
    /// The blocking installer is unblocked with `[Deferred, Installed]`, and the test
    /// asserts these map, in response order, to `appid_3` deferred and `appid_1` updated.
    /// Despite the test name, no installer result here is a failure.
    #[test]
    fn test_report_installation_error_multi_app() {
        block_on(async {
            // Note that the response lists the apps in a different order than the app set.
            let response = json!({"response":{
              "server": "prod",
              "protocol": "3.0",
              "app": [{
                "appid": "appid_3",
                "status": "ok",
                "updatecheck": {
                  "status": "ok",
                  "manifest": {
                      "version": "5.6.7.8",
                      "actions": {
                          "action": [],
                      },
                      "packages": {
                          "package": [],
                      },
                  }
                }
              },{
                "appid": "appid_1",
                "status": "ok",
                "updatecheck": {
                  "status": "ok",
                  "manifest": {
                      "version": "1.2.3.4",
                      "actions": {
                          "action": [],
                      },
                      "packages": {
                          "package": [],
                      },
                  }
                }
              },{
                "appid": "appid_2",
                "status": "ok",
                "updatecheck": {
                  "status": "noupdate",
                }
              }],
            }});
            let response = serde_json::to_vec(&response).unwrap();
            let mut http = MockHttpRequest::new(HttpResponse::new(response));
            // Queue a second, empty response in the mock.
            http.add_response(HttpResponse::new(vec![]));
            let app_set = VecAppSet::new(vec![
                App::builder().id("appid_1").version([1, 2, 3, 3]).build(),
                App::builder().id("appid_2").version([9, 9, 9, 9]).build(),
                App::builder().id("appid_3").version([5, 6, 7, 7]).build(),
            ]);
            let app_set = Rc::new(Mutex::new(app_set));
            let (send_install, mut recv_install) = mpsc::channel(0);

            let mut state_machine = StateMachineBuilder::new_stub()
                .app_set(Rc::clone(&app_set))
                .http(http)
                .installer(BlockingInstaller {
                    on_install: send_install,
                    on_reboot: None,
                })
                .build()
                .await;

            // Answers the installer's unblock request with one result per updated app.
            let recv_install_fut = async move {
                let unblock_install = recv_install.next().await.unwrap();
                unblock_install
                    .send(vec![
                        AppInstallResult::Deferred,
                        AppInstallResult::Installed,
                    ])
                    .unwrap();
            };

            // Drive the update check and the installer responder concurrently.
            let (oneshot_result, ()) = future::join(
                state_machine.oneshot(RequestParams::default()),
                recv_install_fut,
            )
            .await;
            let (response, reboot_after_update) = oneshot_result.unwrap();

            assert_eq!("appid_3", response.app_responses[0].app_id);
            assert_eq!(Action::DeferredByPolicy, response.app_responses[0].result);
            assert_eq!("appid_1", response.app_responses[1].app_id);
            assert_eq!(Action::Updated, response.app_responses[1].result);
            assert_eq!("appid_2", response.app_responses[2].app_id);
            assert_eq!(Action::NoUpdate, response.app_responses[2].result);
            assert_matches!(reboot_after_update, RebootAfterUpdate::Needed(()));

            let request_params = RequestParams::default();
            let apps = app_set.lock().await.get_apps();

            // Expected request with id 4: UpdateComplete success for appid_1 (apps[0]).
            // NOTE(review): this is checked before the request with id 3 below, which
            // suggests `assert_body_str` consumes recorded requests newest-first - confirm.
            let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
            let event = Event {
                previous_version: Some("1.2.3.3".to_string()),
                next_version: Some("1.2.3.4".to_string()),
                download_time_ms: Some(0),
                ..Event::success(EventType::UpdateComplete)
            };
            request_builder = request_builder
                .add_event(&apps[0], event)
                .session_id(GUID::from_u128(0))
                .request_id(GUID::from_u128(4));
            assert_request(&state_machine.http, request_builder).await;

            // Expected request with id 3: UpdateDeferred for appid_3 (apps[2]) followed by
            // UpdateDownloadFinished for appid_1 (apps[0]).
            let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
            let event1 = Event {
                previous_version: Some("1.2.3.3".to_string()),
                next_version: Some("1.2.3.4".to_string()),
                download_time_ms: Some(0),
                ..Event::success(EventType::UpdateDownloadFinished)
            };
            let event2 = Event {
                previous_version: Some("5.6.7.7".to_string()),
                next_version: Some("5.6.7.8".to_string()),
                download_time_ms: Some(0),
                event_type: EventType::UpdateComplete,
                event_result: EventResult::UpdateDeferred,
                ..Event::default()
            };
            request_builder = request_builder
                .add_event(&apps[2], event2)
                .add_event(&apps[0], event1)
                .session_id(GUID::from_u128(0))
                .request_id(GUID::from_u128(3));
            assert_request(&state_machine.http, request_builder).await;
        });
    }
2006
    /// With a failing stub installer, the event stream of a one-shot check must contain
    /// exactly one `InstallerError` event, and it must carry `StubInstallErrors::Failed`.
    #[test]
    fn test_observe_installation_error() {
        block_on(async {
            let http = MockHttpRequest::new(make_update_available_response());

            let actual_errors = StateMachineBuilder::new_stub()
                .http(http)
                .installer(StubInstaller { should_fail: true })
                .oneshot_check()
                .await
                // Keep only installer errors, downcast to the stub's concrete error type.
                .filter_map(|event| {
                    future::ready(match event {
                        StateMachineEvent::InstallerError(Some(e)) => {
                            Some(*e.downcast::<StubInstallErrors>().unwrap())
                        }
                        _ => None,
                    })
                })
                .collect::<Vec<StubInstallErrors>>()
                .await;

            let expected_errors = vec![StubInstallErrors::Failed];
            assert_eq!(actual_errors, expected_errors);
        });
    }
2034
    /// When the policy engine defers the update, the app result must be
    /// `DeferredByPolicy`, no reboot is requested, and an `UpdateComplete` event with
    /// result `UpdateDeferred` must be reported to Omaha.
    #[test]
    fn test_report_deferred_by_policy() {
        block_on(async {
            let http = MockHttpRequest::new(make_update_available_response());

            let policy_engine = MockPolicyEngine {
                update_decision: UpdateDecision::DeferredByPolicy,
                ..MockPolicyEngine::default()
            };
            let mut state_machine = StateMachineBuilder::new_stub()
                .policy_engine(policy_engine)
                .http(http)
                .build()
                .await;

            let (response, reboot_after_update) = state_machine
                .oneshot(RequestParams::default())
                .await
                .unwrap();
            assert_eq!(Action::DeferredByPolicy, response.app_responses[0].result);
            assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);

            // Rebuild the event request that is expected to have been sent.
            let request_params = RequestParams::default();
            let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
            let event = Event {
                event_type: EventType::UpdateComplete,
                event_result: EventResult::UpdateDeferred,
                previous_version: Some("1.2.3.4".to_string()),
                ..Event::default()
            };
            let apps = state_machine.app_set.lock().await.get_apps();
            request_builder = request_builder
                .add_event(&apps[0], event)
                .session_id(GUID::from_u128(0))
                .request_id(GUID::from_u128(2));
            assert_request(&state_machine.http, request_builder).await;
        });
    }
2073
    /// When the policy engine denies the update, the app result must be `DeniedByPolicy`,
    /// no reboot is requested, and a `DeniedByPolicy` error event must be reported to
    /// Omaha.
    #[test]
    fn test_report_denied_by_policy() {
        block_on(async {
            let response = make_update_available_response();
            let http = MockHttpRequest::new(response);
            let policy_engine = MockPolicyEngine {
                update_decision: UpdateDecision::DeniedByPolicy,
                ..MockPolicyEngine::default()
            };

            let mut state_machine = StateMachineBuilder::new_stub()
                .policy_engine(policy_engine)
                .http(http)
                .build()
                .await;

            let (response, reboot_after_update) = state_machine
                .oneshot(RequestParams::default())
                .await
                .unwrap();
            assert_eq!(Action::DeniedByPolicy, response.app_responses[0].result);
            assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);

            // Rebuild the event request that is expected to have been sent.
            let request_params = RequestParams::default();
            let mut request_builder = RequestBuilder::new(&state_machine.config, &request_params);
            let event = Event {
                previous_version: Some("1.2.3.4".to_string()),
                ..Event::error(EventErrorCode::DeniedByPolicy)
            };
            let apps = state_machine.app_set.lock().await.get_apps();
            request_builder = request_builder
                .add_event(&apps[0], event)
                .session_id(GUID::from_u128(0))
                .request_id(GUID::from_u128(2));
            assert_request(&state_machine.http, request_builder).await;
        });
    }
2111
    /// The running state machine must ask the timer to wait until the check time chosen
    /// by the policy engine (here 111 seconds after the mock clock's current time).
    #[test]
    fn test_wait_timer() {
        let mut pool = LocalPool::new();
        let mock_time = MockTimeSource::new_from_now();
        let next_update_time = mock_time.now() + Duration::from_secs(111);
        let (timer, mut timers) = BlockingTimer::new();
        let policy_engine = MockPolicyEngine {
            check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
            time_source: mock_time,
            ..MockPolicyEngine::default()
        };

        let (_ctl, state_machine) = pool.run_until(
            StateMachineBuilder::new_stub()
                .policy_engine(policy_engine)
                .timer(timer)
                .start(),
        );

        // Run the state machine's event stream on the local pool, discarding events.
        pool.spawner()
            .spawn_local(state_machine.map(|_| ()).collect())
            .unwrap();

        // Take the first wait handed out by the blocking timer and check what was
        // requested.
        let blocked_timer = pool.run_until(timers.next()).unwrap();
        assert_eq!(
            blocked_timer.requested_wait(),
            RequestedWait::Until(next_update_time.into())
        );
    }
2143
    /// Cohort and user-counting data returned by a first check must be stored on the app
    /// and then be part of the request sent by a second check.
    #[test]
    fn test_cohort_and_user_counting_updates_are_used_in_subsequent_requests() {
        block_on(async {
            let response = json!({"response":{
                "server": "prod",
                "protocol": "3.0",
                "daystart": {
                  "elapsed_days": 1234567,
                  "elapsed_seconds": 3645
                },
                "app": [{
                  "appid": "{00000000-0000-0000-0000-000000000001}",
                  "status": "ok",
                  "cohort": "1",
                  "cohortname": "stable-channel",
                  "updatecheck": {
                    "status": "noupdate"
                  }
                }]
            }});
            let response = serde_json::to_vec(&response).unwrap();
            // The same response is served for both update checks.
            let mut http = MockHttpRequest::new(HttpResponse::new(response.clone()));
            http.add_response(HttpResponse::new(response));
            let apps = make_test_app_set();

            let mut state_machine = StateMachineBuilder::new_stub()
                .http(http)
                .app_set(apps.clone())
                .build()
                .await;

            // First check: the app set must pick up the cohort and daystart values.
            state_machine.run_once().await;

            let apps = apps.lock().await.get_apps();
            assert_eq!(Some("1".to_string()), apps[0].cohort.id);
            assert_eq!(None, apps[0].cohort.hint);
            assert_eq!(Some("stable-channel".to_string()), apps[0].cohort.name);
            assert_eq!(
                UserCounting::ClientRegulatedByDate(Some(1234567)),
                apps[0].user_counting
            );

            // Second check: its request is expected to match one built from the updated
            // app snapshot taken above.
            state_machine.run_once().await;

            let request_params = RequestParams::default();
            let expected_request_builder =
                RequestBuilder::new(&state_machine.config, &request_params)
                    .add_update_check(&apps[0])
                    .add_ping(&apps[0])
                    .session_id(GUID::from_u128(2))
                    .request_id(GUID::from_u128(3));
            assert_request(&state_machine.http, expected_request_builder).await;
        });
    }
2201
    /// The `elapsed_days` value of the response's `daystart` must be returned as
    /// `UserCounting::ClientRegulatedByDate` in the app response; no reboot is requested.
    #[test]
    fn test_user_counting_returned() {
        block_on(async {
            let response = json!({"response":{
              "server": "prod",
              "protocol": "3.0",
              "daystart": {
                "elapsed_days": 1234567,
                "elapsed_seconds": 3645
              },
              "app": [{
                "appid": "{00000000-0000-0000-0000-000000000001}",
                "status": "ok",
                "cohort": "1",
                "cohortname": "stable-channel",
                "updatecheck": {
                  "status": "noupdate"
                }
              }]
            }});
            let response = serde_json::to_vec(&response).unwrap();
            let http = MockHttpRequest::new(HttpResponse::new(response));

            let (response, reboot_after_update) = StateMachineBuilder::new_stub()
                .http(http)
                .oneshot(RequestParams::default())
                .await
                .unwrap();

            assert_eq!(
                UserCounting::ClientRegulatedByDate(Some(1234567)),
                response.app_responses[0].user_counting
            );
            assert_matches!(reboot_after_update, RebootAfterUpdate::NotNeeded);
        });
    }
2238
    /// A one-shot check on the default stub builder (no HTTP mock supplied here) must
    /// emit exactly two state changes: `CheckingForUpdates(ScheduledTask)` followed by
    /// `ErrorCheckingForUpdate`.
    #[test]
    fn test_observe_state() {
        block_on(async {
            let actual_states = StateMachineBuilder::new_stub()
                .oneshot_check()
                .await
                // Keep only the state-change events.
                .filter_map(|event| {
                    future::ready(match event {
                        StateMachineEvent::StateChange(state) => Some(state),
                        _ => None,
                    })
                })
                .collect::<Vec<State>>()
                .await;

            let expected_states = vec![
                State::CheckingForUpdates(InstallSource::ScheduledTask),
                State::ErrorCheckingForUpdate,
            ];
            assert_eq!(actual_states, expected_states);
        });
    }
2261
    /// A one-shot check must emit exactly one `ScheduleChange` event whose
    /// `last_update_time` and `last_update_check_time` both equal the mock clock's time.
    #[test]
    fn test_observe_schedule() {
        block_on(async {
            let mock_time = MockTimeSource::new_from_now();
            let actual_schedules = StateMachineBuilder::new_stub()
                .policy_engine(StubPolicyEngine::new(&mock_time))
                .oneshot_check()
                .await
                // Keep only the schedule-change events.
                .filter_map(|event| {
                    future::ready(match event {
                        StateMachineEvent::ScheduleChange(schedule) => Some(schedule),
                        _ => None,
                    })
                })
                .collect::<Vec<UpdateCheckSchedule>>()
                .await;

            let expected_schedule = UpdateCheckSchedule::builder()
                .last_update_time(mock_time.now())
                .last_update_check_time(mock_time.now())
                .build();

            assert_eq!(actual_schedules, vec![expected_schedule]);
        });
    }
2288
    /// A one-shot check on the default stub builder must emit exactly one
    /// `ProtocolStateChange` event, with `consecutive_failed_update_checks` equal to 1 and
    /// every other field at its default.
    #[test]
    fn test_observe_protocol_state() {
        block_on(async {
            let actual_protocol_states = StateMachineBuilder::new_stub()
                .oneshot_check()
                .await
                // Keep only the protocol-state-change events.
                .filter_map(|event| {
                    future::ready(match event {
                        StateMachineEvent::ProtocolStateChange(state) => Some(state),
                        _ => None,
                    })
                })
                .collect::<Vec<ProtocolState>>()
                .await;

            let expected_protocol_state = ProtocolState {
                consecutive_failed_update_checks: 1,
                ..ProtocolState::default()
            };

            assert_eq!(actual_protocol_states, vec![expected_protocol_state]);
        });
    }
2312
/// Checks that the parsed Omaha response is surfaced to observers as an
/// `OmahaServerResponse` event equal to what `parse_json_response` yields for the same bytes.
#[test]
fn test_observe_omaha_server_response() {
    block_on(async {
        let response_json = json!({"response":{
            "server": "prod",
            "protocol": "3.0",
            "app": [{
                "appid": "{00000000-0000-0000-0000-000000000001}",
                "status": "ok",
                "cohort": "1",
                "cohortname": "stable-channel",
                "updatecheck": {
                    "status": "noupdate"
                }
            }]
        }});
        let response_bytes = serde_json::to_vec(&response_json).unwrap();
        // Parse the same bytes the mock server will return to get the expected value.
        let expected_omaha_response = response::parse_json_response(&response_bytes).unwrap();
        let mock_http = MockHttpRequest::new(HttpResponse::new(response_bytes));

        let events = StateMachineBuilder::new_stub()
            .http(mock_http)
            .oneshot_check()
            .await;

        let actual_omaha_response: Vec<response::Response> = events
            .filter_map(|event| {
                let server_response =
                    if let StateMachineEvent::OmahaServerResponse(response) = event {
                        Some(response)
                    } else {
                        None
                    };
                future::ready(server_response)
            })
            .collect()
            .await;

        assert_eq!(actual_omaha_response, vec![expected_omaha_response]);
    });
}
2349
/// Feeds the state machine a body that is not a valid Omaha response and checks that an
/// `OmahaEventLost` metric (for the parse-error event) follows the two request metrics.
#[test]
fn test_metrics_report_omaha_event_lost() {
    block_on(async {
        let unparsable_http = MockHttpRequest::new(HttpResponse::new("invalid response".into()));
        let mut reporter = MockMetricsReporter::new();
        let _check_result = StateMachineBuilder::new_stub()
            .http(unparsable_http)
            .metrics_reporter(&mut reporter)
            .oneshot(RequestParams::default())
            .await;

        // The slice pattern pins both the metrics and their order.
        #[rustfmt::skip]
        assert_matches!(
            reporter.metrics.as_slice(),
            [
                Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
                Metrics::RequestsPerCheck { count: 1, successful: true },
                Metrics::OmahaEventLost(Event {
                    event_type: EventType::UpdateComplete,
                    event_result: EventResult::Error,
                    errorcode: Some(EventErrorCode::ParseResponse),
                    previous_version: None,
                    next_version: None,
                    download_time_ms: None,
                })
            ]
        );
    });
}
2395
/// Checks that a one-shot request on a stub state machine records exactly a successful
/// `UpdateCheckResponseTime` followed by a successful single-request `RequestsPerCheck`.
#[test]
fn test_metrics_report_update_check_response_time() {
    block_on(async {
        let mut reporter = MockMetricsReporter::new();
        let _check_result = StateMachineBuilder::new_stub()
            .metrics_reporter(&mut reporter)
            .oneshot(RequestParams::default())
            .await;

        // Exactly two metrics, in this order; the response time value itself is not checked.
        #[rustfmt::skip]
        assert_matches!(
            reporter.metrics.as_slice(),
            [
                Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
                Metrics::RequestsPerCheck { count: 1, successful: true },
            ]
        );
    });
}
2417
/// Adds `MAX_OMAHA_REQUEST_ATTEMPTS` transport errors to the mock HTTP client and checks
/// that one failed `UpdateCheckResponseTime` is recorded per attempt, followed by a failed
/// `RequestsPerCheck` with a count of 3.
#[test]
fn test_metrics_report_update_check_response_time_on_failure() {
    block_on(async {
        let mut reporter = MockMetricsReporter::new();
        let mut mock_http = MockHttpRequest::default();

        (0..MAX_OMAHA_REQUEST_ATTEMPTS).for_each(|_| {
            mock_http.add_error(http_request::mock_errors::make_transport_error());
        });

        // A default response is added after the errors; the expected metrics below contain
        // no successful attempt.
        mock_http.add_response(hyper::Response::default());

        let _check_result = StateMachineBuilder::new_stub()
            .http(mock_http)
            .metrics_reporter(&mut reporter)
            .oneshot(RequestParams::default())
            .await;

        #[rustfmt::skip]
        assert_matches!(
            reporter.metrics.as_slice(),
            [
                Metrics::UpdateCheckResponseTime { response_time: _, successful: false },
                Metrics::UpdateCheckResponseTime { response_time: _, successful: false },
                Metrics::UpdateCheckResponseTime { response_time: _, successful: false },
                Metrics::RequestsPerCheck { count: 3, successful: false },
            ]
        );
    });
}
2452
/// Adds one fewer transport error than `MAX_OMAHA_REQUEST_ATTEMPTS`, then a default
/// response, and checks the resulting metrics: two failed response times, one successful
/// one, a successful `RequestsPerCheck` of 3, and an `OmahaEventLost` for the parse error.
#[test]
fn test_metrics_report_update_check_response_time_on_failure_followed_by_success() {
    block_on(async {
        let mut reporter = MockMetricsReporter::new();
        let mut mock_http = MockHttpRequest::default();

        (0..MAX_OMAHA_REQUEST_ATTEMPTS - 1).for_each(|_| {
            mock_http.add_error(http_request::mock_errors::make_transport_error());
        });
        mock_http.add_response(hyper::Response::default());

        let _check_result = StateMachineBuilder::new_stub()
            .http(mock_http)
            .metrics_reporter(&mut reporter)
            .oneshot(RequestParams::default())
            .await;

        // Order matters: the slice pattern pins the full metric sequence.
        #[rustfmt::skip]
        assert_matches!(
            reporter.metrics.as_slice(),
            [
                Metrics::UpdateCheckResponseTime { response_time: _, successful: false },
                Metrics::UpdateCheckResponseTime { response_time: _, successful: false },
                Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
                Metrics::RequestsPerCheck { count: 3, successful: true },
                Metrics::OmahaEventLost(Event {
                    event_type: EventType::UpdateComplete,
                    event_result: EventResult::Error,
                    errorcode: Some(EventErrorCode::ParseResponse),
                    previous_version: None,
                    next_version: None,
                    download_time_ms: None
                }),
            ]
        );
    });
}
2492
/// Checks that a one-shot request on a stub state machine records a successful
/// `RequestsPerCheck` metric with a count of 1.
#[test]
fn test_metrics_report_requests_per_check() {
    block_on(async {
        let mut reporter = MockMetricsReporter::new();
        let _check_result = StateMachineBuilder::new_stub()
            .metrics_reporter(&mut reporter)
            .oneshot(RequestParams::default())
            .await;

        // Other metrics may be present; only membership is checked.
        let expected_metric = Metrics::RequestsPerCheck {
            count: 1,
            successful: true,
        };
        assert!(reporter.metrics.contains(&expected_metric));
    });
}
2512
/// Adds one fewer transport error than `MAX_OMAHA_REQUEST_ATTEMPTS`, then a default
/// response, and checks that a successful `RequestsPerCheck` with a count of
/// `MAX_OMAHA_REQUEST_ATTEMPTS` is recorded.
#[test]
fn test_metrics_report_requests_per_check_on_failure_followed_by_success() {
    block_on(async {
        let mut reporter = MockMetricsReporter::new();
        let mut mock_http = MockHttpRequest::default();

        (0..MAX_OMAHA_REQUEST_ATTEMPTS - 1).for_each(|_| {
            mock_http.add_error(http_request::mock_errors::make_transport_error());
        });

        mock_http.add_response(hyper::Response::default());

        let _check_result = StateMachineBuilder::new_stub()
            .http(mock_http)
            .metrics_reporter(&mut reporter)
            .oneshot(RequestParams::default())
            .await;

        assert!(!reporter.metrics.is_empty());

        // Only membership is checked, not position in the metric list.
        let expected_metric = Metrics::RequestsPerCheck {
            count: MAX_OMAHA_REQUEST_ATTEMPTS,
            successful: true,
        };
        assert!(reporter.metrics.contains(&expected_metric));
    });
}
2542
/// Adds `MAX_OMAHA_REQUEST_ATTEMPTS` transport errors (then a default response) and checks
/// that a failed `RequestsPerCheck` with a count of `MAX_OMAHA_REQUEST_ATTEMPTS` is recorded.
#[test]
fn test_metrics_report_requests_per_check_on_failure() {
    block_on(async {
        let mut reporter = MockMetricsReporter::new();
        let mut mock_http = MockHttpRequest::default();

        (0..MAX_OMAHA_REQUEST_ATTEMPTS).for_each(|_| {
            mock_http.add_error(http_request::mock_errors::make_transport_error());
        });

        mock_http.add_response(hyper::Response::default());

        let _check_result = StateMachineBuilder::new_stub()
            .http(mock_http)
            .metrics_reporter(&mut reporter)
            .oneshot(RequestParams::default())
            .await;

        assert!(!reporter.metrics.is_empty());

        // Only membership is checked, not position in the metric list.
        let expected_metric = Metrics::RequestsPerCheck {
            count: MAX_OMAHA_REQUEST_ATTEMPTS,
            successful: false,
        };
        assert!(reporter.metrics.contains(&expected_metric));
    });
}
2573
/// Runs a one-shot request against an empty mock HTTP client with a `MockTimer` and checks
/// that exactly two waits were requested, within 500..=1500 ms and 1500..=2500 ms
/// respectively, and that the request ends in an `HttpStatus` error.
#[test]
fn test_requests_per_check_backoff_with_mock_timer() {
    block_on(async {
        // Accepted windows for the first and second wait.
        let first_window = Duration::from_millis(500)..=Duration::from_millis(1500);
        let second_window = Duration::from_millis(1500)..=Duration::from_millis(2500);

        let mut mock_timer = MockTimer::new();
        mock_timer.expect_for_range(*first_window.start(), *first_window.end());
        mock_timer.expect_for_range(*second_window.start(), *second_window.end());
        // Grab the view before the timer is moved into the builder.
        let waits_view = mock_timer.get_requested_waits_view();

        let check_result = StateMachineBuilder::new_stub()
            .http(MockHttpRequest::empty())
            .timer(mock_timer)
            .oneshot(RequestParams::default())
            .await;

        let recorded_waits = waits_view.borrow();
        assert_eq!(recorded_waits.len(), 2);
        assert_matches!(
            recorded_waits[0],
            RequestedWait::For(d) if first_window.contains(&d)
        );
        assert_matches!(
            recorded_waits[1],
            RequestedWait::For(d) if second_window.contains(&d)
        );

        assert_matches!(
            check_result,
            Err(UpdateCheckError::OmahaRequest(
                OmahaRequestError::HttpStatus(_)
            ))
        );
    });
}
2606
/// Checks that `run_once` on a stub state machine records an
/// `UpdateCheckFailureReason::Omaha` metric.
#[test]
fn test_metrics_report_update_check_failure_reason_omaha() {
    block_on(async {
        let mut reporter = MockMetricsReporter::new();
        let mut machine = StateMachineBuilder::new_stub()
            .metrics_reporter(&mut reporter)
            .build()
            .await;

        machine.run_once().await;

        // Only membership is checked; other metrics may also have been recorded.
        let expected_metric = Metrics::UpdateCheckFailureReason(UpdateCheckFailureReason::Omaha);
        assert!(reporter.metrics.contains(&expected_metric));
    });
}
2627
/// Checks that `run_once` against an empty mock HTTP client records an
/// `UpdateCheckFailureReason::Network` metric.
#[test]
fn test_metrics_report_update_check_failure_reason_network() {
    block_on(async {
        let mut reporter = MockMetricsReporter::new();
        let mut machine = StateMachineBuilder::new_stub()
            .http(MockHttpRequest::empty())
            .metrics_reporter(&mut reporter)
            .build()
            .await;

        machine.run_once().await;

        // Only membership is checked; other metrics may also have been recorded.
        let expected_metric =
            Metrics::UpdateCheckFailureReason(UpdateCheckFailureReason::Network);
        assert!(reporter.metrics.contains(&expected_metric));
    });
}
2649
/// Checks that after a one-shot check the storage holds a `LAST_UPDATE_TIME` entry and
/// reports itself as committed.
#[test]
fn test_persist_last_update_time() {
    block_on(async {
        let shared_storage = Rc::new(Mutex::new(MemStorage::new()));

        let events = StateMachineBuilder::new_stub()
            .storage(Rc::clone(&shared_storage))
            .oneshot_check()
            .await;
        // Drain the event stream to completion; the events themselves are not inspected.
        events.map(|_| ()).collect::<()>().await;

        let guard = shared_storage.lock().await;
        // `unwrap` fails the test if the key was never written.
        guard.get_int(LAST_UPDATE_TIME).await.unwrap();
        assert!(guard.committed());
    });
}
2668
/// Checks that an `X-Retry-After` header of 1234 on a no-update response ends up both in
/// the in-memory state (as 1234 seconds) and in storage (as 1234000000), with storage
/// committed.
#[test]
fn test_persist_server_dictated_poll_interval() {
    block_on(async {
        let retry_after_response = HttpResponse::builder()
            .header(X_RETRY_AFTER, 1234)
            .body(make_noupdate_httpresponse())
            .unwrap();
        let mock_http = MockHttpRequest::new(retry_after_response);
        let shared_storage = Rc::new(Mutex::new(MemStorage::new()));

        let mut machine = StateMachineBuilder::new_stub()
            .http(mock_http)
            .storage(Rc::clone(&shared_storage))
            .build()
            .await;
        machine.oneshot(RequestParams::default()).await.unwrap();

        assert_eq!(
            machine.context.state.server_dictated_poll_interval,
            Some(Duration::from_secs(1234))
        );

        let guard = shared_storage.lock().await;
        // The stored integer is 10^6 times the header's seconds value.
        assert_eq!(
            guard.get_int(SERVER_DICTATED_POLL_INTERVAL).await,
            Some(1_234_000_000)
        );
        assert!(guard.committed());
    });
}
2702
/// Checks that `X-Retry-After` is still honored and persisted when the response carries an
/// HTTP 500 status and the request itself fails with an `HttpStatus` error.
#[test]
fn test_persist_server_dictated_poll_interval_http_error() {
    block_on(async {
        let server_error_response = HttpResponse::builder()
            .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
            .header(X_RETRY_AFTER, 1234)
            .body(vec![])
            .unwrap();
        let mock_http = MockHttpRequest::new(server_error_response);
        let shared_storage = Rc::new(Mutex::new(MemStorage::new()));

        let mut machine = StateMachineBuilder::new_stub()
            .http(mock_http)
            .storage(Rc::clone(&shared_storage))
            .build()
            .await;
        assert_matches!(
            machine.oneshot(RequestParams::default()).await,
            Err(UpdateCheckError::OmahaRequest(
                OmahaRequestError::HttpStatus(_)
            ))
        );

        assert_eq!(
            machine.context.state.server_dictated_poll_interval,
            Some(Duration::from_secs(1234))
        );

        let guard = shared_storage.lock().await;
        // The stored integer is 10^6 times the header's seconds value.
        assert_eq!(
            guard.get_int(SERVER_DICTATED_POLL_INTERVAL).await,
            Some(1_234_000_000)
        );
        assert!(guard.committed());
    });
}
2739
/// Checks that an oversized `X-Retry-After` value (123456789) results in a poll interval
/// of 86400 seconds, both in the in-memory state and in storage (as 86400000000).
#[test]
fn test_persist_server_dictated_poll_interval_max_duration() {
    block_on(async {
        let server_error_response = HttpResponse::builder()
            .status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
            .header(X_RETRY_AFTER, 123456789)
            .body(vec![])
            .unwrap();
        let mock_http = MockHttpRequest::new(server_error_response);
        let shared_storage = Rc::new(Mutex::new(MemStorage::new()));

        let mut machine = StateMachineBuilder::new_stub()
            .http(mock_http)
            .storage(Rc::clone(&shared_storage))
            .build()
            .await;
        assert_matches!(
            machine.oneshot(RequestParams::default()).await,
            Err(UpdateCheckError::OmahaRequest(
                OmahaRequestError::HttpStatus(_)
            ))
        );

        // 86400 seconds rather than the 123456789 sent in the header.
        assert_eq!(
            machine.context.state.server_dictated_poll_interval,
            Some(Duration::from_secs(86400))
        );

        let guard = shared_storage.lock().await;
        assert_eq!(
            guard.get_int(SERVER_DICTATED_POLL_INTERVAL).await,
            Some(86_400_000_000)
        );
        assert!(guard.committed());
    });
}
2776
/// Seeds storage with a server-dictated poll interval, makes the single HTTP request fail
/// with a transport error, and checks that the request surfaces an `HttpTransport` error
/// while the previously stored interval (1234 seconds) stays in the in-memory state.
#[test]
fn test_server_dictated_poll_interval_with_transport_error_no_retry() {
    block_on(async {
        let mut http = MockHttpRequest::empty();
        http.add_error(http_request::mock_errors::make_transport_error());

        // Seed the storage before the state machine is built. The storage calls return
        // futures, so they are awaited and their results checked (as the other tests in
        // this module do) instead of being dropped unpolled.
        let mut storage = MemStorage::new();
        storage
            .set_int(SERVER_DICTATED_POLL_INTERVAL, 1234000000)
            .await
            .unwrap();
        storage.commit().await.unwrap();
        let storage = Rc::new(Mutex::new(storage));

        let mut state_machine = StateMachineBuilder::new_stub()
            .http(http)
            .storage(Rc::clone(&storage))
            .build()
            .await;
        // Only one error was added to the mock, so this expects the request to end with
        // that transport error.
        assert_matches!(
            state_machine.oneshot(RequestParams::default()).await,
            Err(UpdateCheckError::OmahaRequest(
                OmahaRequestError::HttpTransport(_)
            ))
        );

        // The interval loaded from storage is unchanged by the failed request.
        assert_eq!(
            state_machine.context.state.server_dictated_poll_interval,
            Some(Duration::from_secs(1234))
        );
    });
}
2808
/// Checks that after a one-shot check the storage contains a string entry keyed by the
/// first app's id and reports itself as committed.
#[test]
fn test_persist_app() {
    block_on(async {
        let shared_storage = Rc::new(Mutex::new(MemStorage::new()));
        let shared_apps = make_test_app_set();

        let events = StateMachineBuilder::new_stub()
            .storage(Rc::clone(&shared_storage))
            .app_set(shared_apps.clone())
            .oneshot_check()
            .await;
        // Drain the event stream to completion; the events themselves are not inspected.
        events.map(|_| ()).collect::<()>().await;

        let guard = shared_storage.lock().await;
        let app_list = shared_apps.lock().await.get_apps();
        // `unwrap` fails the test if nothing was stored under the app id.
        guard.get_string(&app_list[0].id).await.unwrap();
        assert!(guard.committed());
    });
}
2830
/// Checks that a `LAST_UPDATE_TIME` stored before the state machine is built is loaded
/// into the schedule as a wall-clock `PartialComplexTime`.
#[test]
fn test_load_last_update_time() {
    block_on(async {
        let mut seeded_storage = MemStorage::new();
        let mut clock = MockTimeSource::new_from_now();
        clock.truncate_submicrosecond_walltime();

        // Store a time 999 seconds before the mock clock's current wall time.
        let persisted_time = clock.now_in_walltime() - Duration::from_secs(999);
        seeded_storage
            .set_time(LAST_UPDATE_TIME, persisted_time)
            .await
            .unwrap();

        let machine = StateMachineBuilder::new_stub()
            .policy_engine(StubPolicyEngine::new(&clock))
            .storage(Rc::new(Mutex::new(seeded_storage)))
            .build()
            .await;

        assert_eq!(
            machine.context.schedule.last_update_time.unwrap(),
            PartialComplexTime::Wall(persisted_time)
        );
    });
}
2855
/// Checks that a stored `SERVER_DICTATED_POLL_INTERVAL` of 56789 is loaded into the state
/// as a `Duration` of 56789 microseconds when the state machine is built.
#[test]
fn test_load_server_dictated_poll_interval() {
    block_on(async {
        let mut seeded_storage = MemStorage::new();
        seeded_storage
            .set_int(SERVER_DICTATED_POLL_INTERVAL, 56789)
            .await
            .unwrap();

        let machine = StateMachineBuilder::new_stub()
            .storage(Rc::new(Mutex::new(seeded_storage)))
            .build()
            .await;

        assert_eq!(
            Some(Duration::from_micros(56789)),
            machine.context.state.server_dictated_poll_interval
        );
    });
}
2876
/// Stores a serialized `PersistedApp` under the app's id, builds a state machine over that
/// storage, and checks that the app in the shared app set picked up the stored cohort and
/// user-counting data.
#[test]
fn test_load_app() {
    block_on(async {
        let test_app = App::builder()
            .id("{00000000-0000-0000-0000-000000000001}")
            .version([1, 2, 3, 4])
            .build();
        let plain_app_set = VecAppSet::new(vec![test_app]);

        let mut seeded_storage = MemStorage::new();
        let persisted_app = PersistedApp {
            cohort: Cohort {
                id: Some("cohort_id".to_string()),
                hint: Some("test_channel".to_string()),
                name: None,
            },
            user_counting: UserCounting::ClientRegulatedByDate(Some(22222)),
        };
        let serialized = serde_json::to_string(&persisted_app).unwrap();
        // The persisted record is keyed by the app id.
        let initial_apps = plain_app_set.get_apps();
        seeded_storage
            .set_string(&initial_apps[0].id, &serialized)
            .await
            .unwrap();

        let shared_app_set = Rc::new(Mutex::new(plain_app_set));

        let _machine = StateMachineBuilder::new_stub()
            .storage(Rc::new(Mutex::new(seeded_storage)))
            .app_set(Rc::clone(&shared_app_set))
            .build()
            .await;

        // Re-read the apps after the build to observe what was loaded.
        let loaded_apps = shared_app_set.lock().await.get_apps();
        assert_eq!(persisted_app.cohort, loaded_apps[0].cohort);
        assert_eq!(
            UserCounting::ClientRegulatedByDate(Some(22222)),
            loaded_apps[0].user_counting
        );
    });
}
2915
/// Exercises `report_check_interval` on a stub state machine that is given no explicit
/// storage: the first call is expected to record nothing, and a second call made after
/// advancing the mock clock is expected to record one monotonic `UpdateCheckInterval`
/// equal to the advance.
#[test]
fn test_report_check_interval_with_no_storage() {
    block_on(async {
        let mut mock_time = MockTimeSource::new_from_now();
        let mut state_machine = StateMachineBuilder::new_stub()
            .policy_engine(StubPolicyEngine::new(mock_time.clone()))
            .metrics_reporter(MockMetricsReporter::new())
            .build()
            .await;

        state_machine
            .report_check_interval(InstallSource::ScheduledTask)
            .await;
        // The first call records no metric.
        assert!(state_machine.metrics_reporter.metrics.is_empty());

        let interval = Duration::from_micros(999999);
        // NOTE(review): presumably the clone handed to the policy engine shares its clock
        // with `mock_time`, so this advance is visible to the state machine; the final
        // assertion relies on that.
        mock_time.advance(interval);

        state_machine
            .report_check_interval(InstallSource::ScheduledTask)
            .await;

        // Exactly one metric: the interval between the two calls, on the monotonic clock.
        assert_eq!(
            state_machine.metrics_reporter.metrics,
            vec![Metrics::UpdateCheckInterval {
                interval,
                clock: ClockType::Monotonic,
                install_source: InstallSource::ScheduledTask,
            }]
        );
    });
}
2950
/// Starts with a wall-clock-only `last_update_check_time` and calls
/// `report_check_interval` three times, advancing the mock clock between calls. The first
/// reported interval is expected on the wall clock, the following two on the monotonic
/// clock.
#[test]
fn test_report_check_interval_mono_transition() {
    block_on(async {
        let mut mock_time = MockTimeSource::new_from_now();
        let mut state_machine = StateMachineBuilder::new_stub()
            .policy_engine(StubPolicyEngine::new(mock_time.clone()))
            .metrics_reporter(MockMetricsReporter::new())
            .build()
            .await;

        let initial_duration = Duration::from_secs(999);
        // Pretend the last check happened `initial_duration` ago, known only as a wall time.
        let initial_time = mock_time.now_in_walltime() - initial_duration;
        state_machine.context.schedule.last_update_check_time =
            Some(PartialComplexTime::Wall(initial_time));
        state_machine
            .report_check_interval(InstallSource::ScheduledTask)
            .await;

        let interval = Duration::from_micros(999999);
        // NOTE(review): presumably the policy engine's clone shares its clock with
        // `mock_time`, so each advance is visible to the state machine; confirm.
        mock_time.advance(interval);
        state_machine
            .report_check_interval(InstallSource::ScheduledTask)
            .await;

        mock_time.advance(interval);
        state_machine
            .report_check_interval(InstallSource::ScheduledTask)
            .await;
        // One metric per call, in call order: wall clock first, then monotonic twice.
        assert_eq!(
            state_machine.metrics_reporter.metrics,
            vec![
                Metrics::UpdateCheckInterval {
                    interval: initial_duration,
                    clock: ClockType::Wall,
                    install_source: InstallSource::ScheduledTask,
                },
                Metrics::UpdateCheckInterval {
                    interval,
                    clock: ClockType::Monotonic,
                    install_source: InstallSource::ScheduledTask,
                },
                Metrics::UpdateCheckInterval {
                    interval,
                    clock: ClockType::Monotonic,
                    install_source: InstallSource::ScheduledTask,
                },
            ]
        );
    });
}
3006
/// Test double implementing `Installer`: it can be told to fail a number of installs and
/// advances a mock clock when an install succeeds.
#[derive(Debug)]
pub struct TestInstaller {
    /// Set to `true` when `perform_reboot` is called.
    reboot_called: Rc<RefCell<bool>>,
    /// Number of upcoming `perform_install` calls that will report a failed install;
    /// decremented on each such call.
    install_fails: usize,
    /// Clock advanced by `INSTALL_DURATION` on each non-failing `perform_install`.
    mock_time: MockTimeSource,
}
/// Builder for `TestInstaller`.
struct TestInstallerBuilder {
    /// How many installs the built installer should fail; starts at 0 and is incremented
    /// by `add_install_fail`.
    install_fails: usize,
    /// Mock clock handed through to the built installer.
    mock_time: MockTimeSource,
}
3017 impl TestInstaller {
3018 fn builder(mock_time: MockTimeSource) -> TestInstallerBuilder {
3019 TestInstallerBuilder {
3020 install_fails: 0,
3021 mock_time,
3022 }
3023 }
3024 }
3025 impl TestInstallerBuilder {
3026 fn add_install_fail(mut self) -> Self {
3027 self.install_fails += 1;
3028 self
3029 }
3030 fn build(self) -> TestInstaller {
3031 TestInstaller {
3032 reboot_called: Rc::new(RefCell::new(false)),
3033 install_fails: self.install_fails,
3034 mock_time: self.mock_time,
3035 }
3036 }
3037 }
/// Amount by which `TestInstaller` advances its mock clock on a non-failing install.
const INSTALL_DURATION: Duration = Duration::from_micros(98_765_433);
3039
3040 impl Installer for TestInstaller {
3041 type InstallPlan = StubPlan;
3042 type Error = StubInstallErrors;
3043 type InstallResult = ();
3044
3045 fn perform_install<'a>(
3046 &'a mut self,
3047 _install_plan: &StubPlan,
3048 observer: Option<&'a dyn ProgressObserver>,
3049 ) -> LocalBoxFuture<'a, (Self::InstallResult, Vec<AppInstallResult<Self::Error>>)> {
3050 if self.install_fails > 0 {
3051 self.install_fails -= 1;
3052 future::ready((
3053 (),
3054 vec![AppInstallResult::Failed(StubInstallErrors::Failed)],
3055 ))
3056 .boxed()
3057 } else {
3058 self.mock_time.advance(INSTALL_DURATION);
3059 async move {
3060 if let Some(observer) = observer {
3061 observer.receive_progress(None, 0.0, None, None).await;
3062 observer.receive_progress(None, 0.3, None, None).await;
3063 observer.receive_progress(None, 0.9, None, None).await;
3064 observer.receive_progress(None, 1.0, None, None).await;
3065 }
3066 ((), vec![AppInstallResult::Installed])
3067 }
3068 .boxed_local()
3069 }
3070 }
3071
3072 fn perform_reboot(&mut self) -> LocalBoxFuture<'_, Result<(), anyhow::Error>> {
3073 self.reboot_called.replace(true);
3074 future::ready(Ok(())).boxed_local()
3075 }
3076
3077 fn try_create_install_plan<'a>(
3078 &'a self,
3079 _request_params: &'a RequestParams,
3080 _request_metadata: Option<&'a RequestMetadata>,
3081 _response: &'a Response,
3082 _response_bytes: Vec<u8>,
3083 _ecdsa_signature: Option<Vec<u8>>,
3084 ) -> LocalBoxFuture<'a, Result<Self::InstallPlan, Self::Error>> {
3085 future::ready(Ok(StubPlan)).boxed_local()
3086 }
3087 }
3088
/// Runs one update with `TestInstaller` and checks the full ordered metric list, including
/// that `SuccessfulUpdateDuration` equals `INSTALL_DURATION` and that
/// `SuccessfulUpdateFromFirstSeen` is measured from the first-seen time seeded in storage.
#[test]
fn test_report_successful_update_duration() {
    block_on(async {
        let http = MockHttpRequest::new(make_update_available_response());
        let storage = Rc::new(Mutex::new(MemStorage::new()));

        let mut mock_time = MockTimeSource::new_from_now();
        mock_time.truncate_submicrosecond_walltime();
        let now = mock_time.now();

        // `TestInstaller` advances the clock by `INSTALL_DURATION` on a successful install,
        // so that is when the update is expected to complete.
        let update_completed_time = now + INSTALL_DURATION;
        let expected_update_duration = update_completed_time.wall_duration_since(now).unwrap();

        // The update was "first seen" 1000 microseconds before now.
        let first_seen_time = now - Duration::from_micros(1000);

        let expected_duration_since_first_seen = update_completed_time
            .wall_duration_since(first_seen_time)
            .unwrap();

        let mut state_machine = StateMachineBuilder::new_stub()
            .http(http)
            .installer(TestInstaller::builder(mock_time.clone()).build())
            .policy_engine(StubPolicyEngine::new(mock_time.clone()))
            .metrics_reporter(MockMetricsReporter::new())
            .storage(Rc::clone(&storage))
            .build()
            .await;

        {
            // Seed the install plan id and first-seen time after the state machine is built.
            // NOTE(review): the empty id presumably matches the id of `StubPlan` so the
            // stored first-seen time is reused; confirm.
            let mut storage = storage.lock().await;
            storage.set_string(INSTALL_PLAN_ID, "").await.unwrap();
            storage
                .set_time(UPDATE_FIRST_SEEN_TIME, first_seen_time)
                .await
                .unwrap();
            storage.commit().await.unwrap();
        }

        state_machine.run_once().await;

        // The slice pattern pins the metric order; the guard checks the two durations
        // bound by the pattern.
        #[rustfmt::skip]
        assert_matches!(
            state_machine.metrics_reporter.metrics.as_slice(),
            [
                Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
                Metrics::RequestsPerCheck { count: 1, successful: true },
                Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadStarted, event_result: EventResult::Success, .. }),
                Metrics::SuccessfulUpdateDuration(install_duration),
                Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadFinished, event_result: EventResult::Success, .. }),
                Metrics::OmahaEventLost(Event { event_type: EventType::UpdateComplete, event_result: EventResult::Success, .. }),
                Metrics::SuccessfulUpdateFromFirstSeen(duration_since_first_seen),
                Metrics::AttemptsToSuccessfulCheck(1),
                Metrics::AttemptsToSuccessfulInstall { count: 1, successful: true },
            ]
            if
                *install_duration == expected_update_duration &&
                *duration_since_first_seen == expected_duration_since_first_seen
        );
    });
}
3149
/// Runs one update with a `StubInstaller` configured to fail and checks that a
/// zero-length `FailedUpdateDuration` metric is recorded.
#[test]
fn test_report_failed_update_duration() {
    block_on(async {
        let mock_http = MockHttpRequest::new(make_update_available_response());
        let failing_installer = StubInstaller { should_fail: true };
        let mut machine = StateMachineBuilder::new_stub()
            .http(mock_http)
            .installer(failing_installer)
            .metrics_reporter(MockMetricsReporter::new())
            .build()
            .await;
        machine.run_once().await;

        // Only membership is checked; other metrics are recorded as well.
        let recorded = &machine.metrics_reporter.metrics;
        assert!(recorded.contains(&Metrics::FailedUpdateDuration(Duration::from_micros(0))));
    });
}
3172
/// Exercises `record_update_first_seen_time` in three stages: a first call for plan "id",
/// a later call for the same plan (the original time must be kept), and a call for a new
/// plan "id2" (the new time must replace it). After each stage the storage is expected to
/// hold exactly the plan id and first-seen time, and to be committed.
#[test]
fn test_record_update_first_seen_time() {
    block_on(async {
        let storage = Rc::new(Mutex::new(MemStorage::new()));
        let mut state_machine = StateMachineBuilder::new_stub()
            .storage(Rc::clone(&storage))
            .build()
            .await;

        let mut mock_time = MockTimeSource::new_from_now();
        mock_time.truncate_submicrosecond_walltime();
        let now = mock_time.now_in_walltime();
        // Stage 1: first sighting of "id" returns and stores the time passed in.
        assert_eq!(
            state_machine.record_update_first_seen_time("id", now).await,
            now
        );
        {
            let storage = storage.lock().await;
            assert_eq!(
                storage.get_string(INSTALL_PLAN_ID).await,
                Some("id".to_string())
            );
            assert_eq!(storage.get_time(UPDATE_FIRST_SEEN_TIME).await, Some(now));
            assert_eq!(storage.len(), 2);
            assert!(storage.committed());
        }

        mock_time.advance(Duration::from_secs(1000));
        let now2 = mock_time.now_in_walltime();
        // Stage 2: same plan id with a later time still returns the original time.
        assert_eq!(
            state_machine
                .record_update_first_seen_time("id", now2)
                .await,
            now
        );
        {
            let storage = storage.lock().await;
            assert_eq!(
                storage.get_string(INSTALL_PLAN_ID).await,
                Some("id".to_string())
            );
            assert_eq!(storage.get_time(UPDATE_FIRST_SEEN_TIME).await, Some(now));
            assert_eq!(storage.len(), 2);
            assert!(storage.committed());
        }
        // Stage 3: a different plan id replaces both the stored id and the stored time.
        assert_eq!(
            state_machine
                .record_update_first_seen_time("id2", now2)
                .await,
            now2
        );
        {
            let storage = storage.lock().await;
            assert_eq!(
                storage.get_string(INSTALL_PLAN_ID).await,
                Some("id2".to_string())
            );
            assert_eq!(storage.get_time(UPDATE_FIRST_SEEN_TIME).await, Some(now2));
            assert_eq!(storage.len(), 2);
            assert!(storage.committed());
        }
    });
}
3236
/// Exercises `report_attempts_to_successful_check` with the sequence success, failure,
/// failure, success, checking the `consecutive_failed_update_checks` counter after each
/// call and the `AttemptsToSuccessfulCheck` metrics recorded on the two successes.
#[test]
fn test_report_attempts_to_successful_check() {
    block_on(async {
        let storage = Rc::new(Mutex::new(MemStorage::new()));
        let mut state_machine = StateMachineBuilder::new_stub()
            .installer(StubInstaller { should_fail: true })
            .metrics_reporter(MockMetricsReporter::new())
            .storage(Rc::clone(&storage))
            .build()
            .await;

        state_machine
            .report_attempts_to_successful_check(true)
            .await;

        // A success with no prior failures: counter stays 0 and one attempt is reported.
        assert_eq!(
            state_machine.context.state.consecutive_failed_update_checks,
            0
        );
        assert_eq!(
            state_machine.metrics_reporter.metrics,
            vec![Metrics::AttemptsToSuccessfulCheck(1)]
        );

        // Each failure increments the counter.
        state_machine
            .report_attempts_to_successful_check(false)
            .await;
        assert_eq!(
            state_machine.context.state.consecutive_failed_update_checks,
            1
        );

        state_machine
            .report_attempts_to_successful_check(false)
            .await;
        assert_eq!(
            state_machine.context.state.consecutive_failed_update_checks,
            2
        );

        // A success after two failures resets the counter and reports three attempts.
        state_machine
            .report_attempts_to_successful_check(true)
            .await;
        assert_eq!(
            state_machine.context.state.consecutive_failed_update_checks,
            0
        );
        assert_eq!(
            state_machine.metrics_reporter.metrics,
            vec![
                Metrics::AttemptsToSuccessfulCheck(1),
                Metrics::AttemptsToSuccessfulCheck(3)
            ]
        );
    });
}
3297
/// Starts from a persisted failure count of 1 and calls `ping_omaha` three times against a
/// mock HTTP client loaded with a transport error, an empty response, and a valid
/// response. The in-memory and persisted `CONSECUTIVE_FAILED_UPDATE_CHECKS` are expected
/// to go 2, 3, and finally 0 (with the stored key absent).
#[test]
fn test_ping_omaha_updates_consecutive_failed_update_checks_and_persists() {
    block_on(async {
        let mut http = MockHttpRequest::empty();
        http.add_error(http_request::mock_errors::make_transport_error());
        http.add_response(HttpResponse::new(vec![]));
        let response = json!({"response":{
            "server": "prod",
            "protocol": "3.0",
            "app": [{
                "appid": "{00000000-0000-0000-0000-000000000001}",
                "status": "ok",
            }],
        }});
        let response = serde_json::to_vec(&response).unwrap();
        http.add_response(HttpResponse::new(response));

        let storage = Rc::new(Mutex::new(MemStorage::new()));

        {
            // Seed the persisted failure count before the state machine is built. The
            // storage calls return futures, so they are awaited and their results checked
            // (as the other tests in this module do) instead of being dropped unpolled.
            let mut storage = storage.lock().await;
            storage
                .set_int(CONSECUTIVE_FAILED_UPDATE_CHECKS, 1)
                .await
                .unwrap();
            storage.commit().await.unwrap();
        }

        let mut state_machine = StateMachineBuilder::new_stub()
            .storage(Rc::clone(&storage))
            .http(http)
            .build()
            .await;

        // `ping_omaha` takes a generator handle, so the calls run inside a generator that
        // is then driven to completion.
        async_generator::generate(move |mut co| async move {
            // First ping: the counter goes from the seeded 1 to 2.
            state_machine.ping_omaha(&mut co).await;
            assert_eq!(
                state_machine.context.state.consecutive_failed_update_checks,
                2
            );
            {
                let storage = storage.lock().await;
                assert_eq!(
                    storage.get_int(CONSECUTIVE_FAILED_UPDATE_CHECKS).await,
                    Some(2)
                );
            }

            // Second ping: another failure, counter 3.
            state_machine.ping_omaha(&mut co).await;
            assert_eq!(
                state_machine.context.state.consecutive_failed_update_checks,
                3
            );
            {
                let storage = storage.lock().await;
                assert_eq!(
                    storage.get_int(CONSECUTIVE_FAILED_UPDATE_CHECKS).await,
                    Some(3)
                );
            }

            // Third ping: the counter resets and the stored key is gone.
            state_machine.ping_omaha(&mut co).await;
            assert_eq!(
                state_machine.context.state.consecutive_failed_update_checks,
                0
            );
            {
                let storage = storage.lock().await;
                assert_eq!(
                    storage.get_int(CONSECUTIVE_FAILED_UPDATE_CHECKS).await,
                    None
                );
            }
        })
        .into_complete()
        .await;
    });
}
3377
/// Runs one update with a `TestInstaller` that never fails and checks the full ordered
/// metric list, ending in `AttemptsToSuccessfulInstall { count: 1, successful: true }`.
#[test]
fn test_report_attempts_to_successful_install() {
    block_on(async {
        let mock_http = MockHttpRequest::new(make_update_available_response());
        let shared_storage = Rc::new(Mutex::new(MemStorage::new()));

        let clock = MockTimeSource::new_from_now();

        let mut machine = StateMachineBuilder::new_stub()
            .http(mock_http)
            .installer(TestInstaller::builder(clock.clone()).build())
            .policy_engine(StubPolicyEngine::new(clock.clone()))
            .metrics_reporter(MockMetricsReporter::new())
            .storage(Rc::clone(&shared_storage))
            .build()
            .await;

        machine.run_once().await;

        // The slice pattern pins both the metrics and their order; durations are ignored.
        #[rustfmt::skip]
        assert_matches!(
            machine.metrics_reporter.metrics.as_slice(),
            [
                Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
                Metrics::RequestsPerCheck { count: 1, successful: true },
                Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadStarted, event_result: EventResult::Success, .. }),
                Metrics::SuccessfulUpdateDuration(_),
                Metrics::OmahaEventLost(Event { event_type: EventType::UpdateDownloadFinished, event_result: EventResult::Success, .. }),
                Metrics::OmahaEventLost(Event { event_type: EventType::UpdateComplete, event_result: EventResult::Success, .. }),
                Metrics::SuccessfulUpdateFromFirstSeen(_),
                Metrics::AttemptsToSuccessfulCheck(1),
                Metrics::AttemptsToSuccessfulInstall { count: 1, successful: true },
            ]
        );
    });
}
3416
/// Runs the state machine twice with a `TestInstaller` configured to fail its first
/// install. The expected metrics show a failed first attempt
/// (`AttemptsToSuccessfulInstall { count: 1, successful: false }`) followed by a
/// successful second one (`count: 2, successful: true`).
#[test]
fn test_report_attempts_to_successful_install_fails_then_succeeds() {
    block_on(async {
        // First run: an update-available response followed by two empty responses.
        // NOTE(review): the empty responses are presumably consumed by the event reports
        // sent during the install flow; confirm against the state machine.
        let mut http = MockHttpRequest::new(make_update_available_response());
        http.add_response(HttpResponse::new(vec![]));
        http.add_response(HttpResponse::new(vec![]));

        // Second run: another update-available response and two more empty responses.
        http.add_response(make_update_available_response());
        http.add_response(HttpResponse::new(vec![]));
        http.add_response(HttpResponse::new(vec![]));

        let storage = Rc::new(Mutex::new(MemStorage::new()));
        let mock_time = MockTimeSource::new_from_now();

        let mut state_machine = StateMachineBuilder::new_stub()
            .http(http)
            .installer(
                TestInstaller::builder(mock_time.clone())
                    .add_install_fail()
                    .build(),
            )
            .policy_engine(StubPolicyEngine::new(mock_time.clone()))
            .metrics_reporter(MockMetricsReporter::new())
            .storage(Rc::clone(&storage))
            .build()
            .await;

        state_machine.run_once().await;
        state_machine.run_once().await;

        // The slice pattern pins the metrics of both runs, in order.
        #[rustfmt::skip]
        assert_matches!(
            state_machine.metrics_reporter.metrics.as_slice(),
            [
                Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
                Metrics::RequestsPerCheck { count: 1, successful: true },
                Metrics::FailedUpdateDuration(_),
                Metrics::AttemptsToSuccessfulCheck(1),
                Metrics::AttemptsToSuccessfulInstall { count: 1, successful: false },
                Metrics::UpdateCheckInterval { .. },
                Metrics::UpdateCheckResponseTime { response_time: _, successful: true },
                Metrics::RequestsPerCheck { count: 1, successful: true },
                Metrics::SuccessfulUpdateDuration(_),
                Metrics::OmahaEventLost(Event { .. }),
                Metrics::SuccessfulUpdateFromFirstSeen(_),
                Metrics::AttemptsToSuccessfulCheck(1),
                Metrics::AttemptsToSuccessfulInstall { count: 2, successful: true }
            ]
        );
    });
}
3476
/// When the server answers `noupdate`, only the check-related metrics are reported and no
/// `AttemptsToSuccessfulInstall` metric appears.
#[test]
fn test_report_attempts_to_successful_install_does_not_report_for_no_update() {
    block_on(async {
        // A single-app response whose update check status is "noupdate".
        let no_update_json = json!({"response":{
            "server": "prod",
            "protocol": "3.0",
            "app": [{
                "appid": "{00000000-0000-0000-0000-000000000001}",
                "status": "ok",
                "updatecheck": {
                    "status": "noupdate",
                    "info": "no update for you"
                }
            }],
        }});
        let body = serde_json::to_vec(&no_update_json).unwrap();
        let mock_http = MockHttpRequest::new(HttpResponse::new(body));

        let clock = MockTimeSource::new_from_now();
        let storage = Rc::new(Mutex::new(MemStorage::new()));

        let mut machine = StateMachineBuilder::new_stub()
            .http(mock_http)
            .installer(TestInstaller::builder(clock.clone()).build())
            .policy_engine(StubPolicyEngine::new(clock.clone()))
            .metrics_reporter(MockMetricsReporter::new())
            .storage(Rc::clone(&storage))
            .build()
            .await;

        machine.run_once().await;

        let reported = machine.metrics_reporter.metrics.as_slice();
        #[rustfmt::skip]
        assert_matches!(
            reported,
            [
                Metrics::UpdateCheckResponseTime { successful: true, .. },
                Metrics::RequestsPerCheck { count: 1, successful: true },
                Metrics::AttemptsToSuccessfulCheck(1),
            ]
        );
    });
}
3522
/// After the first scheduled-check timer is released and the update-available response has been
/// processed with a default `TestInstaller`, the installer's `reboot_called` flag must be set.
#[test]
fn test_successful_update_triggers_reboot() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    let mock_http = MockHttpRequest::new(make_update_available_response());
    let clock = MockTimeSource::new_from_now();
    let expected_wait = RequestedWait::Until(clock.now().into());
    let (timer, mut pending_timers) = BlockingTimer::new();

    let installer = TestInstaller::builder(clock.clone()).build();
    let reboot_called = Rc::clone(&installer.reboot_called);

    let builder = StateMachineBuilder::new_stub()
        .http(mock_http)
        .installer(installer)
        .policy_engine(StubPolicyEngine::new(clock))
        .timer(timer);
    // The control handle is bound to a named variable so it lives until the end of the test.
    let (_control, events) = executor.run_until(builder.start());

    let observer = TestObserver::default();
    spawner.spawn_local(observer.observe(events)).unwrap();

    // The first timer the state machine requests is the wait for the scheduled check.
    let first_timer = executor.run_until(pending_timers.next()).unwrap();
    assert_eq!(first_timer.requested_wait(), expected_wait);
    first_timer.unblock();
    executor.run_until_stalled();

    assert!(*reboot_called.borrow());
}
3558
/// With a policy engine whose `reboot_needed` is `false`, a successful install goes straight
/// back to `State::Idle`: the policy engine is never asked whether a reboot is allowed and the
/// installer's reboot is never invoked.
#[test]
fn test_skip_reboot_if_not_needed() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    let mock_http = MockHttpRequest::new(make_update_available_response());
    let clock = MockTimeSource::new_from_now();
    let scheduled_check_time = clock.now();
    let reboot_checks = Rc::new(RefCell::new(vec![]));
    let policy_engine = MockPolicyEngine {
        reboot_check_options_received: Rc::clone(&reboot_checks),
        check_timing: Some(CheckTiming::builder().time(scheduled_check_time).build()),
        time_source: clock.clone(),
        reboot_needed: Rc::new(RefCell::new(false)),
        ..MockPolicyEngine::default()
    };
    let (timer, mut pending_timers) = BlockingTimer::new();

    let installer = TestInstaller::builder(clock).build();
    let reboot_called = Rc::clone(&installer.reboot_called);

    let builder = StateMachineBuilder::new_stub()
        .http(mock_http)
        .installer(installer)
        .policy_engine(policy_engine)
        .timer(timer);
    // The control handle is bound to a named variable so it lives until the end of the test.
    let (_control, events) = executor.run_until(builder.start());

    let observer = TestObserver::default();
    spawner.spawn_local(observer.observe(events)).unwrap();

    // Release the timer guarding the scheduled check and let the update run.
    let first_timer = executor.run_until(pending_timers.next()).unwrap();
    assert_eq!(
        first_timer.requested_wait(),
        RequestedWait::Until(scheduled_check_time.into())
    );
    first_timer.unblock();
    executor.run_until_stalled();

    // No `WaitingForReboot` state is expected in the sequence.
    let expected_states = vec![
        State::CheckingForUpdates(InstallSource::ScheduledTask),
        State::InstallingUpdate,
        State::Idle,
    ];
    assert_eq!(observer.take_states(), expected_states);

    assert_eq!(*reboot_checks.borrow(), vec![]);
    assert!(!*reboot_called.borrow());
}
3612
/// With an installer built using `add_install_fail()`, running the scheduled update must leave
/// the installer's `reboot_called` flag unset.
#[test]
fn test_failed_update_does_not_trigger_reboot() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    let mock_http = MockHttpRequest::new(make_update_available_response());
    let clock = MockTimeSource::new_from_now();
    let expected_wait = RequestedWait::Until(clock.now().into());
    let (timer, mut pending_timers) = BlockingTimer::new();

    let installer = TestInstaller::builder(clock.clone())
        .add_install_fail()
        .build();
    let reboot_called = Rc::clone(&installer.reboot_called);

    let builder = StateMachineBuilder::new_stub()
        .http(mock_http)
        .installer(installer)
        .policy_engine(StubPolicyEngine::new(clock))
        .timer(timer);
    // The control handle is bound to a named variable so it lives until the end of the test.
    let (_control, events) = executor.run_until(builder.start());

    let observer = TestObserver::default();
    spawner.spawn_local(observer.observe(events)).unwrap();

    // Release the timer guarding the scheduled check and let the (failing) update run.
    let first_timer = executor.run_until(pending_timers.next()).unwrap();
    assert_eq!(first_timer.requested_wait(), expected_wait);
    first_timer.unblock();
    executor.run_until_stalled();

    assert!(!*reboot_called.borrow());
}
3650
/// An `OnDemand` check request that arrives while an install is in flight is answered with
/// `AlreadyRunning`, and the reboot-allowed query the policy engine later receives carries the
/// `OnDemand` source.
#[test]
fn test_reboots_immediately_if_user_initiated_update_requests_occurs_during_install() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    let mock_http = MockHttpRequest::new(make_update_available_response());
    let clock = MockTimeSource::new_from_now();

    // Zero-capacity channels through which the blocking installer hands the test a oneshot
    // sender for each install / reboot it is asked to perform.
    let (install_tx, mut install_rx) = mpsc::channel(0);
    let (reboot_tx, mut reboot_rx) = mpsc::channel(0);
    let reboot_checks = Rc::new(RefCell::new(vec![]));
    let policy_engine = MockPolicyEngine {
        reboot_check_options_received: Rc::clone(&reboot_checks),
        check_timing: Some(CheckTiming::builder().time(clock.now()).build()),
        ..MockPolicyEngine::default()
    };
    let installer = BlockingInstaller {
        on_install: install_tx,
        on_reboot: Some(reboot_tx),
    };

    let (mut control, events) = executor.run_until(
        StateMachineBuilder::new_stub()
            .http(mock_http)
            .installer(installer)
            .policy_engine(policy_engine)
            .start(),
    );

    let observer = TestObserver::default();
    spawner.spawn_local(observer.observe(events)).unwrap();

    // Run until the installer has been invoked and is waiting for the test to reply.
    let finish_install = executor.run_until(install_rx.next()).unwrap();
    executor.run_until_stalled();
    let expected_states = vec![
        State::CheckingForUpdates(InstallSource::ScheduledTask),
        State::InstallingUpdate,
    ];
    assert_eq!(observer.take_states(), expected_states);

    // Request an on-demand check while the install is still blocked.
    executor.run_until(async {
        let on_demand = CheckOptions {
            source: InstallSource::OnDemand,
        };
        assert_eq!(
            control.start_update_check(on_demand).await,
            Ok(StartUpdateCheckResponse::AlreadyRunning)
        );
    });

    executor.run_until_stalled();
    assert_eq!(observer.take_states(), vec![]);

    // Let the install finish successfully.
    finish_install
        .send(vec![AppInstallResult::Installed])
        .unwrap();
    executor.run_until_stalled();
    assert_eq!(observer.take_states(), vec![State::WaitingForReboot]);

    // The installer is now asked to reboot; acknowledge it.
    let finish_reboot = executor.run_until(reboot_rx.next()).unwrap();
    executor.run_until_stalled();
    finish_reboot.send(Ok(())).unwrap();

    let expected_checks = vec![CheckOptions {
        source: InstallSource::OnDemand,
    }];
    assert_eq!(*reboot_checks.borrow(), expected_checks);
}
3728
/// While the state machine is waiting for permission to reboot, an `OnDemand` check request is
/// answered with `AlreadyRunning` and (once the policy allows it) leads to a reboot. The policy
/// engine must have seen one reboot query with `ScheduledTask` and one with `OnDemand`.
#[test]
fn test_reboots_immediately_when_check_now_comes_in_during_wait() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    // One update-available response, three empty bodies, and a further update-available
    // response, consumed in that order.
    let mut mock_http = MockHttpRequest::new(make_update_available_response());
    mock_http.add_response(HttpResponse::new(vec![]));
    mock_http.add_response(HttpResponse::new(vec![]));
    mock_http.add_response(HttpResponse::new(vec![]));
    mock_http.add_response(make_update_available_response());

    let mut clock = MockTimeSource::new_from_now();
    clock.truncate_submicrosecond_walltime();
    let scheduled_check_time = clock.now() + Duration::from_secs(1000);
    let (timer, mut pending_timers) = BlockingTimer::new();

    let reboot_allowed = Rc::new(RefCell::new(false));
    let reboot_checks = Rc::new(RefCell::new(vec![]));
    let policy_engine = MockPolicyEngine {
        time_source: clock.clone(),
        reboot_allowed: Rc::clone(&reboot_allowed),
        check_timing: Some(CheckTiming::builder().time(scheduled_check_time).build()),
        reboot_check_options_received: Rc::clone(&reboot_checks),
        ..MockPolicyEngine::default()
    };
    let installer = TestInstaller::builder(clock.clone()).build();
    let reboot_called = Rc::clone(&installer.reboot_called);
    let storage = Rc::new(Mutex::new(MemStorage::new()));
    let app_set = make_test_app_set();

    let (mut control, events) = executor.run_until(
        StateMachineBuilder::new_stub()
            .app_set(app_set)
            .http(mock_http)
            .installer(installer)
            .policy_engine(policy_engine)
            .timer(timer)
            .storage(Rc::clone(&storage))
            .start(),
    );

    let observer = TestObserver::default();
    spawner.spawn_local(observer.observe(events)).unwrap();

    // Release the timer guarding the scheduled check so the update runs.
    let check_timer = executor.run_until(pending_timers.next()).unwrap();
    assert_eq!(
        check_timer.requested_wait(),
        RequestedWait::Until(scheduled_check_time.into())
    );
    check_timer.unblock();
    executor.run_until_stalled();

    // Two timers are requested next, in either order: a relative `For` wait (the reboot
    // re-check) and an absolute `Until` wait. Sort them out by kind.
    let first = executor.run_until(pending_timers.next()).unwrap();
    let second = executor.run_until(pending_timers.next()).unwrap();
    let (reboot_timer, _ping_timer) = if matches!(first.requested_wait(), RequestedWait::For(_)) {
        (first, second)
    } else {
        (second, first)
    };
    assert_eq!(
        reboot_timer.requested_wait(),
        RequestedWait::For(CHECK_REBOOT_ALLOWED_INTERVAL)
    );

    assert!(!*reboot_called.borrow());

    // Permit the reboot, then issue an on-demand check without releasing any timer.
    *reboot_allowed.borrow_mut() = true;
    executor.run_until(async {
        let on_demand = CheckOptions {
            source: InstallSource::OnDemand,
        };
        assert_eq!(
            control.start_update_check(on_demand).await,
            Ok(StartUpdateCheckResponse::AlreadyRunning)
        );
    });
    executor.run_until_stalled();
    assert!(*reboot_called.borrow());

    let expected_checks = vec![
        CheckOptions {
            source: InstallSource::ScheduledTask,
        },
        CheckOptions {
            source: InstallSource::OnDemand,
        },
    ];
    assert_eq!(*reboot_checks.borrow(), expected_checks);
}
3832
/// Exercises the wait-for-reboot loop: while the policy engine disallows rebooting, the state
/// machine keeps re-arming the `CHECK_REBOOT_ALLOWED_INTERVAL` timer and keeps sending pings at
/// the scheduled check time, and it only reboots once the policy engine allows it.
#[test]
fn test_wait_for_reboot() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    // One update-available response, three empty bodies, and a further update-available
    // response, consumed in that order.
    let mut http = MockHttpRequest::new(make_update_available_response());
    http.add_response(HttpResponse::new(vec![]));
    http.add_response(HttpResponse::new(vec![]));
    http.add_response(HttpResponse::new(vec![]));
    http.add_response(make_update_available_response());
    // Second handle onto the same request cell, used to inspect the requests that were sent.
    let ping_request_viewer = MockHttpRequest::from_request_cell(http.get_request_cell());
    let mut mock_time = MockTimeSource::new_from_now();
    mock_time.truncate_submicrosecond_walltime();
    let next_update_time = mock_time.now() + Duration::from_secs(1000);
    let (timer, mut timers) = BlockingTimer::new();
    let reboot_allowed = Rc::new(RefCell::new(false));
    let policy_engine = MockPolicyEngine {
        time_source: mock_time.clone(),
        reboot_allowed: Rc::clone(&reboot_allowed),
        check_timing: Some(CheckTiming::builder().time(next_update_time).build()),
        ..MockPolicyEngine::default()
    };
    let installer = TestInstaller::builder(mock_time.clone()).build();
    let reboot_called = Rc::clone(&installer.reboot_called);
    let storage_ref = Rc::new(Mutex::new(MemStorage::new()));
    let apps = make_test_app_set();

    let (mut ctl, state_machine) = pool.run_until(
        StateMachineBuilder::new_stub()
            .app_set(apps.clone())
            .http(http)
            .installer(installer)
            .policy_engine(policy_engine)
            .timer(timer)
            .storage(Rc::clone(&storage_ref))
            .start(),
    );

    let observer = TestObserver::default();
    spawner
        .spawn_local(observer.observe(state_machine))
        .unwrap();

    // Release the timer guarding the scheduled check so the update runs.
    let blocked_timer = pool.run_until(timers.next()).unwrap();
    assert_eq!(
        blocked_timer.requested_wait(),
        RequestedWait::Until(next_update_time.into())
    );
    blocked_timer.unblock();
    pool.run_until_stalled();

    // Two timers are requested next, in either order: the relative reboot re-check wait and
    // the absolute wait for the next ping. Sort them out by kind.
    let blocked_timer1 = pool.run_until(timers.next()).unwrap();
    let blocked_timer2 = pool.run_until(timers.next()).unwrap();
    let (wait_for_reboot_timer, wait_for_next_ping_timer) =
        match blocked_timer1.requested_wait() {
            RequestedWait::For(_) => (blocked_timer1, blocked_timer2),
            RequestedWait::Until(_) => (blocked_timer2, blocked_timer1),
        };
    assert_eq!(
        wait_for_reboot_timer.requested_wait(),
        RequestedWait::For(CHECK_REBOOT_ALLOWED_INTERVAL)
    );
    assert_eq!(
        wait_for_next_ping_timer.requested_wait(),
        RequestedWait::Until(next_update_time.into())
    );

    // Advance the clock to the scheduled time and fire the ping timer.
    mock_time.advance(Duration::from_secs(1000));
    wait_for_next_ping_timer.unblock();
    pool.run_until_stalled();

    let config = crate::configuration::test_support::config_generator();
    let request_params = RequestParams::default();

    // The most recent request must be a ping for every app. The session/request GUIDs 5 and 6
    // presumably follow the ids consumed by the earlier update check and its events.
    let apps = pool.run_until(apps.lock()).get_apps();
    let mut expected_request_builder = RequestBuilder::new(&config, &request_params)
        .session_id(GUID::from_u128(5))
        .request_id(GUID::from_u128(6));
    for app in &apps {
        expected_request_builder = expected_request_builder.add_ping(app);
    }
    pool.run_until(assert_request(
        &ping_request_viewer,
        expected_request_builder,
    ));

    // A check request while waiting for reboot is reported as already running.
    pool.run_until(async {
        assert_eq!(
            ctl.start_update_check(CheckOptions::default()).await,
            Ok(StartUpdateCheckResponse::AlreadyRunning)
        );
    });

    // The ping updated the persisted schedule's last update time to the current mock time.
    pool.run_until(async {
        let storage = storage_ref.lock().await;
        let context = update_check::Context::load(&*storage).await;
        assert_eq!(
            context.schedule.last_update_time,
            Some(mock_time.now_in_walltime().into())
        );
    });

    // A new ping timer has been armed.
    let wait_for_next_ping_timer = pool.run_until(timers.next()).unwrap();
    assert_eq!(
        wait_for_next_ping_timer.requested_wait(),
        RequestedWait::Until(next_update_time.into())
    );

    // Fire the reboot re-check while rebooting is still disallowed: no reboot happens.
    wait_for_reboot_timer.unblock();
    pool.run_until_stalled();
    assert!(!*reboot_called.borrow());

    // The reboot re-check timer is re-armed.
    let wait_for_reboot_timer = pool.run_until(timers.next()).unwrap();
    assert_eq!(
        wait_for_reboot_timer.requested_wait(),
        RequestedWait::For(CHECK_REBOOT_ALLOWED_INTERVAL)
    );

    // Fire the second ping and verify it, using the next pair of GUIDs.
    wait_for_next_ping_timer.unblock();
    pool.run_until_stalled();

    let mut expected_request_builder = RequestBuilder::new(&config, &request_params)
        .session_id(GUID::from_u128(7))
        .request_id(GUID::from_u128(8));
    for app in &apps {
        expected_request_builder = expected_request_builder.add_ping(app);
    }
    pool.run_until(assert_request(
        &ping_request_viewer,
        expected_request_builder,
    ));

    assert!(!*reboot_called.borrow());

    // Now allow the reboot through the policy engine. Setting `reboot_called` directly here
    // would make the final assertion pass regardless of what the state machine does.
    *reboot_allowed.borrow_mut() = true;
    // Fire the reboot re-check; this time the state machine must actually reboot.
    wait_for_reboot_timer.unblock();
    pool.run_until_stalled();
    assert!(*reboot_called.borrow());
}
3993
/// Installer test double whose operations block until the test replies.
///
/// For every install (and, optionally, every reboot) it sends a fresh `oneshot::Sender` over
/// the corresponding channel and waits for the test to send the result back through it.
#[derive(Debug)]
struct BlockingInstaller {
    /// Receives one `oneshot::Sender` per `perform_install` call; the value the test sends
    /// through it becomes the per-app install results.
    on_install: mpsc::Sender<oneshot::Sender<Vec<AppInstallResult<StubInstallErrors>>>>,
    /// When `Some`, receives one `oneshot::Sender` per `perform_reboot` call; when `None`,
    /// `perform_reboot` completes immediately with `Ok(())`.
    on_reboot: Option<mpsc::Sender<oneshot::Sender<Result<(), anyhow::Error>>>>,
}
3999
4000 impl Installer for BlockingInstaller {
4001 type InstallPlan = StubPlan;
4002 type Error = StubInstallErrors;
4003 type InstallResult = ();
4004
4005 fn perform_install(
4006 &mut self,
4007 _install_plan: &StubPlan,
4008 _observer: Option<&dyn ProgressObserver>,
4009 ) -> LocalBoxFuture<'_, (Self::InstallResult, Vec<AppInstallResult<Self::Error>>)> {
4010 let (send, recv) = oneshot::channel();
4011 let send_fut = self.on_install.send(send);
4012
4013 async move {
4014 send_fut.await.unwrap();
4015 ((), recv.await.unwrap())
4016 }
4017 .boxed_local()
4018 }
4019
4020 fn perform_reboot(&mut self) -> LocalBoxFuture<'_, Result<(), anyhow::Error>> {
4021 match &mut self.on_reboot {
4022 Some(on_reboot) => {
4023 let (send, recv) = oneshot::channel();
4024 let send_fut = on_reboot.send(send);
4025
4026 async move {
4027 send_fut.await.unwrap();
4028 recv.await.unwrap()
4029 }
4030 .boxed_local()
4031 }
4032 None => future::ready(Ok(())).boxed_local(),
4033 }
4034 }
4035
4036 fn try_create_install_plan<'a>(
4037 &'a self,
4038 _request_params: &'a RequestParams,
4039 _request_metadata: Option<&'a RequestMetadata>,
4040 _response: &'a Response,
4041 _response_bytes: Vec<u8>,
4042 _ecdsa_signature: Option<Vec<u8>>,
4043 ) -> LocalBoxFuture<'a, Result<Self::InstallPlan, Self::Error>> {
4044 future::ready(Ok(StubPlan)).boxed_local()
4045 }
4046 }
4047
/// Records the `State` carried by every `StateMachineEvent::StateChange` seen on an observed
/// event stream so tests can assert on the sequence of state transitions.
#[derive(Debug, Default)]
struct TestObserver {
    /// States recorded so far, shared with the observing future; drained by `take_states`.
    states: Rc<RefCell<Vec<State>>>,
}
4052
4053 impl TestObserver {
4054 fn observe<T: Stream<Item = StateMachineEvent>>(
4055 &self,
4056 s: T,
4057 ) -> impl Future<Output = ()> + use<T> {
4058 let states = Rc::clone(&self.states);
4059 async move {
4060 futures::pin_mut!(s);
4061 while let Some(event) = s.next().await {
4062 if let StateMachineEvent::StateChange(state) = event {
4063 states.borrow_mut().push(state);
4064 }
4065 }
4066 }
4067 }
4068
4069 fn observe_until_terminal<T: Stream<Item = StateMachineEvent>>(
4070 &self,
4071 s: T,
4072 ) -> impl Future<Output = ()> + use<T> {
4073 let states = Rc::clone(&self.states);
4074 async move {
4075 futures::pin_mut!(s);
4076 while let Some(event) = s.next().await {
4077 if let StateMachineEvent::StateChange(state) = event {
4078 states.borrow_mut().push(state);
4079 match state {
4080 State::Idle | State::WaitingForReboot => return,
4081 _ => {}
4082 }
4083 }
4084 }
4085 }
4086 }
4087
4088 fn take_states(&self) -> Vec<State> {
4089 std::mem::take(&mut *self.states.borrow_mut())
4090 }
4091 }
4092
/// A check request issued while an install is blocked is answered with `AlreadyRunning` and
/// causes no state change; once the install completes the machine enters `WaitingForReboot`.
#[test]
fn test_start_update_during_update_replies_with_in_progress() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    let mock_http = MockHttpRequest::new(make_update_available_response());
    let (install_tx, mut install_rx) = mpsc::channel(0);
    let installer = BlockingInstaller {
        on_install: install_tx,
        on_reboot: None,
    };
    let (mut control, events) = executor.run_until(
        StateMachineBuilder::new_stub()
            .http(mock_http)
            .installer(installer)
            .start(),
    );

    let observer = TestObserver::default();
    spawner
        .spawn_local(observer.observe_until_terminal(events))
        .unwrap();

    // Run until the installer has been invoked and is waiting for the test to reply.
    let finish_install = executor.run_until(install_rx.next()).unwrap();
    executor.run_until_stalled();
    let expected_states = vec![
        State::CheckingForUpdates(InstallSource::ScheduledTask),
        State::InstallingUpdate,
    ];
    assert_eq!(observer.take_states(), expected_states);

    // Ask for another check while the install is still in progress.
    executor.run_until(async {
        let reply = control.start_update_check(CheckOptions::default()).await;
        assert_eq!(reply, Ok(StartUpdateCheckResponse::AlreadyRunning));
    });
    executor.run_until_stalled();
    assert_eq!(observer.take_states(), vec![]);

    // Let the install finish successfully.
    finish_install
        .send(vec![AppInstallResult::Installed])
        .unwrap();
    executor.run_until_stalled();

    assert_eq!(observer.take_states(), vec![State::WaitingForReboot]);
}
4141
/// While the state machine is parked on its check timer nothing happens, but a check request
/// sent through the control handle is answered with `Started` and runs a check without the
/// timer being released.
#[test]
fn test_start_update_during_timer_starts_update() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    let mut clock = MockTimeSource::new_from_now();
    let scheduled_check_time = clock.now() + Duration::from_secs(321);

    let (timer, mut pending_timers) = BlockingTimer::new();
    let policy_engine = MockPolicyEngine {
        check_timing: Some(CheckTiming::builder().time(scheduled_check_time).build()),
        time_source: clock.clone(),
        ..MockPolicyEngine::default()
    };
    let (mut control, events) = executor.run_until(
        StateMachineBuilder::new_stub()
            .policy_engine(policy_engine)
            .timer(timer)
            .start(),
    );

    let observer = TestObserver::default();
    spawner.spawn_local(observer.observe(events)).unwrap();

    let expected_wait = RequestedWait::Until(scheduled_check_time.into());

    let parked_timer = executor.run_until(pending_timers.next()).unwrap();
    assert_eq!(parked_timer.requested_wait(), expected_wait);
    clock.advance(Duration::from_secs(200));
    assert_eq!(observer.take_states(), vec![]);

    // With the timer still blocked, running the executor produces no state changes.
    executor.run_until_stalled();
    assert_eq!(observer.take_states(), vec![]);

    // Releasing the timer runs one scheduled check, after which a new timer is requested.
    parked_timer.unblock();
    let parked_timer = executor.run_until(pending_timers.next()).unwrap();
    assert_eq!(parked_timer.requested_wait(), expected_wait);
    let one_failed_check = vec![
        State::CheckingForUpdates(InstallSource::ScheduledTask),
        State::ErrorCheckingForUpdate,
        State::Idle,
    ];
    assert_eq!(observer.take_states(), one_failed_check);

    // A control request starts another check even though the new timer is still blocked.
    executor.run_until(async {
        let reply = control.start_update_check(CheckOptions::default()).await;
        assert_eq!(reply, Ok(StartUpdateCheckResponse::Started));
    });
    executor.run_until_stalled();
    assert_eq!(observer.take_states(), one_failed_check);
}
4212
/// When the policy engine's check decision is `ThrottledByPolicy`, a check request through the
/// control handle is answered with `Throttled` and no state change is observed.
#[test]
fn test_start_update_check_returns_throttled() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    let mut clock = MockTimeSource::new_from_now();
    let scheduled_check_time = clock.now() + Duration::from_secs(321);

    let (timer, mut pending_timers) = BlockingTimer::new();
    let policy_engine = MockPolicyEngine {
        check_timing: Some(CheckTiming::builder().time(scheduled_check_time).build()),
        time_source: clock.clone(),
        check_decision: CheckDecision::ThrottledByPolicy,
        ..MockPolicyEngine::default()
    };
    let (mut control, events) = executor.run_until(
        StateMachineBuilder::new_stub()
            .policy_engine(policy_engine)
            .timer(timer)
            .start(),
    );

    let observer = TestObserver::default();
    spawner.spawn_local(observer.observe(events)).unwrap();

    // The machine parks on the timer for the scheduled check; the timer is never released.
    let parked_timer = executor.run_until(pending_timers.next()).unwrap();
    assert_eq!(
        parked_timer.requested_wait(),
        RequestedWait::Until(scheduled_check_time.into())
    );
    clock.advance(Duration::from_secs(200));
    assert_eq!(observer.take_states(), vec![]);

    executor.run_until(async {
        let reply = control.start_update_check(CheckOptions::default()).await;
        assert_eq!(reply, Ok(StartUpdateCheckResponse::Throttled));
    });
    executor.run_until_stalled();
    assert_eq!(observer.take_states(), vec![]);
}
4257
/// Collects every `InstallProgressChange` event emitted during a one-shot check with a default
/// `TestInstaller` and checks the reported progress values are exactly 0.0, 0.3, 0.9 and 1.0.
#[test]
fn test_progress_observer() {
    block_on(async {
        let clock = MockTimeSource::new_from_now();
        let mock_http = MockHttpRequest::new(make_update_available_response());

        let events = StateMachineBuilder::new_stub()
            .http(mock_http)
            .installer(TestInstaller::builder(clock.clone()).build())
            .policy_engine(StubPolicyEngine::new(clock))
            .oneshot_check()
            .await;

        // Keep only the progress values; every other event kind is dropped.
        let reported = events
            .filter_map(|event| {
                let progress = if let StateMachineEvent::InstallProgressChange(InstallProgress {
                    progress,
                }) = event
                {
                    Some(progress)
                } else {
                    None
                };
                future::ready(progress)
            })
            .collect::<Vec<f32>>()
            .await;

        assert_eq!(reported, [0.0, 0.3, 0.9, 1.0]);
    });
}
4282
/// `report_waited_for_reboot_duration` must return an error, and report no metrics, when the
/// monotonic clock has advanced further (10s) since state-machine start than the wall clock has
/// (1s) since the update finished.
#[test]
fn test_report_waited_for_reboot_duration_doesnt_panic_on_wrong_current_time() {
    block_on(async {
        let update_finished_wall = SystemTime::now();
        let machine_started_mono = Instant::now();

        // Inconsistent "now": wall clock moved 1s, monotonic clock moved 10s.
        let inconsistent_now = ComplexTime {
            wall: update_finished_wall + Duration::from_secs(1),
            mono: machine_started_mono + Duration::from_secs(10),
        };

        let mut machine = StateMachineBuilder::new_stub()
            .metrics_reporter(MockMetricsReporter::new())
            .build()
            .await;

        machine
            .report_waited_for_reboot_duration(
                update_finished_wall,
                machine_started_mono,
                inconsistent_now,
            )
            .expect_err("should overflow and error out");

        assert!(machine.metrics_reporter.metrics.is_empty());
    });
}
4323
/// Performs a one-shot update to version 1.2.3.5, advances the clock by 999 seconds, then starts
/// a second state machine whose OS version is 1.2.3.5 on the same storage. That machine must
/// report `WaitedForRebootDuration(999s)` exactly once and clear the persisted update-finish
/// time and target version.
#[test]
fn test_report_waited_for_reboot_duration() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    // Update-available response advertising version 1.2.3.5 with no actions or packages.
    let update_json = json!({"response": {
        "server": "prod",
        "protocol": "3.0",
        "app": [{
            "appid": "{00000000-0000-0000-0000-000000000001}",
            "status": "ok",
            "updatecheck": {
                "status": "ok",
                "manifest": {
                    "version": "1.2.3.5",
                    "actions": {
                        "action": [],
                    },
                    "packages": {
                        "package": [],
                    },
                }
            }
        }],
    }});
    let body = serde_json::to_vec(&update_json).unwrap();
    let mock_http = MockHttpRequest::new(HttpResponse::new(body));
    let mut clock = MockTimeSource::new_from_now();
    clock.truncate_submicrosecond_walltime();
    let storage = Rc::new(Mutex::new(MemStorage::new()));

    // First run: a one-shot update that must succeed.
    let first_run = executor.run_until(
        StateMachineBuilder::new_stub()
            .http(mock_http)
            .policy_engine(StubPolicyEngine::new(clock.clone()))
            .storage(Rc::clone(&storage))
            .oneshot(RequestParams::default()),
    );
    assert_matches!(first_run, Ok(_));

    clock.advance(Duration::from_secs(999));

    // Second run: same storage, OS version now equal to the version advertised above.
    let updated_config = Config {
        updater: Updater {
            name: "updater".to_string(),
            version: Version::from([0, 1]),
        },
        os: OS {
            version: "1.2.3.5".to_string(),
            ..OS::default()
        },
        service_url: "http://example.com/".to_string(),
        omaha_public_keys: None,
    };
    let metrics_reporter = Rc::new(RefCell::new(MockMetricsReporter::new()));
    // The control handle is bound to a named variable so it lives until the end of the test.
    let (_control, events) = executor.run_until(
        StateMachineBuilder::new_stub()
            .config(updated_config)
            .metrics_reporter(Rc::clone(&metrics_reporter))
            .policy_engine(StubPolicyEngine::new(clock.clone()))
            .storage(Rc::clone(&storage))
            .timer(MockTimer::new())
            .start(),
    );

    let observer = TestObserver::default();
    spawner.spawn_local(observer.observe(events)).unwrap();
    executor.run_until_stalled();

    let expected_metric = Metrics::WaitedForRebootDuration(Duration::from_secs(999));
    assert_eq!(
        metrics_reporter
            .borrow()
            .metrics
            .iter()
            .filter(|m| matches!(m, Metrics::WaitedForRebootDuration(_)))
            .collect::<Vec<_>>(),
        vec![&expected_metric]
    );

    // The persisted bookkeeping for the finished update has been removed and committed.
    executor.run_until(async {
        let storage = storage.lock().await;
        assert_eq!(storage.get_time(UPDATE_FINISH_TIME).await, None);
        assert_eq!(storage.get_string(TARGET_VERSION).await, None);
        assert!(storage.committed());
    })
}
4418
/// A CUP handler configured to fail request decoration with a `ParseError` makes the one-shot
/// check fail with `OmahaRequestError::CupDecoration`.
#[test]
fn run_cup_but_decoration_error() {
    block_on(async {
        let mock_http = MockHttpRequest::new(HttpResponse::new(make_noupdate_httpresponse()));

        // Parsing the empty string as a URI yields the error value to inject.
        let failing_cup_handler = MockCupv2Handler::new().set_decoration_error(|| {
            let uri_error = "".parse::<http::Uri>().unwrap_err();
            Some(CupDecorationError::ParseError(uri_error))
        });

        let outcome = StateMachineBuilder::new_stub()
            .http(mock_http)
            .cup_handler(Some(failing_cup_handler))
            .oneshot(RequestParams::default())
            .await;

        assert_matches!(
            outcome,
            Err(UpdateCheckError::OmahaRequest(
                OmahaRequestError::CupDecoration(CupDecorationError::ParseError(_))
            ))
        );

        info!("update check complete!");
    });
}
4445
/// A CUP handler configured to fail response verification with `EtagHeaderMissing` makes the
/// one-shot check fail with `OmahaRequestError::CupValidation`.
#[test]
fn run_cup_but_verification_error() {
    block_on(async {
        let mock_http = MockHttpRequest::new(HttpResponse::new(make_noupdate_httpresponse()));

        let failing_cup_handler = MockCupv2Handler::new()
            .set_verification_error(|| Some(CupVerificationError::EtagHeaderMissing));

        let outcome = StateMachineBuilder::new_stub()
            .http(mock_http)
            .cup_handler(Some(failing_cup_handler))
            .oneshot(RequestParams::default())
            .await;

        assert_matches!(
            outcome,
            Err(UpdateCheckError::OmahaRequest(
                OmahaRequestError::CupValidation(CupVerificationError::EtagHeaderMissing)
            ))
        );

        info!("update check complete!");
    });
}
4468
/// With no CUP handler set explicitly (so whatever `new_stub()` provides is used), a one-shot
/// check against a no-update response succeeds.
#[test]
fn run_cup_valid() {
    block_on(async {
        let mock_http = MockHttpRequest::new(HttpResponse::new(make_noupdate_httpresponse()));

        let outcome = StateMachineBuilder::new_stub()
            .http(mock_http)
            .oneshot(RequestParams::default())
            .await;

        assert_matches!(outcome, Ok(_));

        info!("update check complete!");
    });
}
4486}