#![deny(missing_docs)]

//! Client-side helpers for starting and monitoring update attempts through the
//! `fuchsia.update.installer` FIDL protocols.

/// State types yielded while monitoring an update attempt, re-exported below.
pub mod state;
pub use state::{
    FailFetchData, FailStageData, FetchFailureReason, PrepareFailureReason, Progress,
    StageFailureReason, State, StateId, UpdateInfo, UpdateInfoAndProgress,
};

/// Option types passed when starting an update attempt, re-exported below.
pub mod options;
pub use options::{Initiator, Options};
18
19use fidl::endpoints::{ClientEnd, ServerEnd};
20use fidl_fuchsia_update_installer::{
21 InstallerProxy, MonitorMarker, MonitorRequest, MonitorRequestStream, RebootControllerMarker,
22 UpdateNotStartedReason,
23};
24use futures::prelude::*;
25use futures::task::{Context, Poll};
26use log::info;
27use pin_project::pin_project;
28use std::fmt;
29use std::pin::Pin;
30use thiserror::Error;
31
/// Errors that [`start_update`] can return.
#[derive(Debug, Error)]
pub enum UpdateAttemptError {
    /// A FIDL error occurred while communicating with the installer.
    #[error("FIDL error")]
    FIDL(#[source] fidl::Error),

    /// The installer refused to start the update because an installation was
    /// already in progress.
    #[error("an installation was already in progress")]
    InstallInProgress,
}
43
/// Errors yielded by the state streams ([`UpdateAttemptMonitor`] and
/// [`UpdateAttempt`]).
#[derive(Debug, Error)]
pub enum MonitorUpdateAttemptError {
    /// The underlying monitor request stream produced a FIDL error.
    #[error("FIDL error")]
    FIDL(#[source] fidl::Error),

    /// A received state could not be converted into a [`State`].
    #[error("unable to decode State")]
    DecodeState(#[source] state::DecodeStateError),
}
55
/// An update attempt returned by [`start_update`]: the attempt id reported by
/// the installer together with the monitor that streams its states.
#[pin_project(project = UpdateAttemptProj)]
#[derive(Debug)]
pub struct UpdateAttempt {
    /// Attempt id returned by the installer when the update was started.
    attempt_id: String,

    /// Stream of states for this attempt; pinned so it can be polled in place.
    #[pin]
    monitor: UpdateAttemptMonitor,
}
67
/// A stream of [`State`] values decoded from the `Monitor` requests of an
/// update attempt.
#[pin_project(project = UpdateAttemptMonitorProj)]
pub struct UpdateAttemptMonitor {
    /// Server side of the monitor channel; each request carries one state.
    #[pin]
    stream: MonitorRequestStream,
}
75
76impl UpdateAttempt {
77 pub fn attempt_id(&self) -> &str {
79 &self.attempt_id
80 }
81}
82
83impl UpdateAttemptMonitor {
84 fn new() -> Result<(ClientEnd<MonitorMarker>, Self), fidl::Error> {
85 let (monitor_client_end, stream) =
86 fidl::endpoints::create_request_stream::<MonitorMarker>();
87
88 Ok((monitor_client_end, Self { stream }))
89 }
90
91 pub fn from_stream(stream: MonitorRequestStream) -> Self {
93 Self { stream }
94 }
95}
96
97pub async fn start_update(
100 update_url: &http::Uri,
101 options: Options,
102 installer_proxy: &InstallerProxy,
103 reboot_controller_server_end: Option<ServerEnd<RebootControllerMarker>>,
104) -> Result<UpdateAttempt, UpdateAttemptError> {
105 let url = fidl_fuchsia_pkg::PackageUrl { url: update_url.to_string() };
106 let (monitor_client_end, monitor) =
107 UpdateAttemptMonitor::new().map_err(UpdateAttemptError::FIDL)?;
108
109 let attempt_id = installer_proxy
110 .start_update(&url, &options.into(), monitor_client_end, reboot_controller_server_end)
111 .await
112 .map_err(UpdateAttemptError::FIDL)?
113 .map_err(|reason| match reason {
114 UpdateNotStartedReason::AlreadyInProgress => UpdateAttemptError::InstallInProgress,
115 })?;
116
117 info!("Update started with attempt id: {}", attempt_id);
118 Ok(UpdateAttempt { attempt_id, monitor })
119}
120
121pub async fn monitor_update(
124 attempt_id: Option<&str>,
125 installer_proxy: &InstallerProxy,
126) -> Result<Option<UpdateAttemptMonitor>, fidl::Error> {
127 let (monitor_client_end, monitor) = UpdateAttemptMonitor::new()?;
128
129 let attached = installer_proxy.monitor_update(attempt_id, monitor_client_end).await?;
130
131 if attached { Ok(Some(monitor)) } else { Ok(None) }
132}
133
134impl fmt::Debug for UpdateAttemptMonitor {
135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136 f.debug_struct("UpdateAttemptMonitor").field("stream", &"MonitorRequestStream").finish()
137 }
138}
139
140impl Stream for UpdateAttemptMonitor {
141 type Item = Result<State, MonitorUpdateAttemptError>;
142
143 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
144 let UpdateAttemptMonitorProj { stream } = self.project();
145 let poll_res = match stream.poll_next(cx) {
146 Poll::Ready(None) => return Poll::Ready(None),
147 Poll::Ready(Some(res)) => res.map_err(MonitorUpdateAttemptError::FIDL)?,
148 Poll::Pending => return Poll::Pending,
149 };
150 let MonitorRequest::OnState { state, responder } = poll_res;
151 let _ = responder.send();
152 let state = state.try_into().map_err(MonitorUpdateAttemptError::DecodeState)?;
153 Poll::Ready(Some(Ok(state)))
154 }
155}
156
157impl Stream for UpdateAttempt {
158 type Item = Result<State, MonitorUpdateAttemptError>;
159
160 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
161 let UpdateAttemptProj { attempt_id: _, monitor } = self.project();
162 monitor.poll_next(cx)
163 }
164}
165
166#[cfg(test)]
167mod tests {
168 use super::*;
169 use assert_matches::assert_matches;
170 use fidl_fuchsia_update_installer::{
171 InstallationProgress, InstallerMarker, InstallerRequest, MonitorProxy,
172 };
173 use fuchsia_async as fasync;
174 use futures::stream::StreamExt;
175
176 const TEST_URL: &str = "fuchsia-pkg://fuchsia.com/update/0";
177
178 impl UpdateAttemptMonitor {
179 fn new_test() -> (TestAttempt, Self) {
182 let (monitor_client_end, monitor) = Self::new().unwrap();
183
184 (TestAttempt::new(monitor_client_end), monitor)
185 }
186 }
187
    /// Test helper that plays the sending side of a monitor channel.
    struct TestAttempt {
        /// Proxy used to send `OnState` requests to the monitor under test.
        proxy: MonitorProxy,
    }
191
192 impl TestAttempt {
193 fn new(monitor_client_end: ClientEnd<MonitorMarker>) -> Self {
196 let proxy = monitor_client_end.into_proxy();
197
198 Self { proxy }
199 }
200
201 async fn send_state_and_recv_ack(&mut self, state: State) {
202 self.send_raw_state_and_recv_ack(state.into()).await;
203 }
204
205 async fn send_raw_state_and_recv_ack(
206 &mut self,
207 state: fidl_fuchsia_update_installer::State,
208 ) {
209 let () = self.proxy.on_state(&state).await.unwrap();
210 }
211 }
212
213 #[fasync::run_singlethreaded(test)]
214 async fn update_attempt_monitor_forwards_and_acks_progress() {
215 let (mut send, monitor) = UpdateAttemptMonitor::new_test();
216
217 let expected_fetch_state = &State::Fetch(
218 UpdateInfoAndProgress::builder()
219 .info(UpdateInfo::builder().download_size(1000).build())
220 .progress(Progress::builder().fraction_completed(0.5).bytes_downloaded(500).build())
221 .build(),
222 );
223
224 let client_fut = async move {
225 assert_eq!(
226 monitor.try_collect::<Vec<State>>().await.unwrap(),
227 vec![State::Prepare, expected_fetch_state.clone()]
228 );
229 };
230
231 let server_fut = async move {
232 send.send_state_and_recv_ack(State::Prepare).await;
233 send.send_state_and_recv_ack(expected_fetch_state.clone()).await;
234 };
235
236 future::join(client_fut, server_fut).await;
237 }
238
239 #[fasync::run_singlethreaded(test)]
240 async fn update_attempt_monitor_rejects_invalid_state() {
241 let (mut send, mut monitor) = UpdateAttemptMonitor::new_test();
242
243 let client_fut = async move {
244 assert_matches!(
245 monitor.next().await.unwrap(),
246 Err(MonitorUpdateAttemptError::DecodeState(_))
247 );
248 assert_matches!(monitor.next().await, Some(Ok(State::Prepare)));
249 };
250
251 let server_fut = async move {
252 send.send_raw_state_and_recv_ack(fidl_fuchsia_update_installer::State::Fetch(
253 fidl_fuchsia_update_installer::FetchData {
254 info: Some(fidl_fuchsia_update_installer::UpdateInfo {
255 download_size: None,
256 ..Default::default()
257 }),
258 progress: Some(InstallationProgress {
259 fraction_completed: Some(2.0),
260 bytes_downloaded: None,
261 ..Default::default()
262 }),
263 ..Default::default()
264 },
265 ))
266 .await;
267
268 send.send_state_and_recv_ack(State::Prepare).await;
271 };
272
273 future::join(client_fut, server_fut).await;
274 }
275
276 #[fasync::run_singlethreaded(test)]
277 async fn start_update_forwards_args_and_returns_attempt_id() {
278 let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
279
280 let opts = Options {
281 initiator: Initiator::User,
282 allow_attach_to_existing_attempt: false,
283 should_write_recovery: true,
284 };
285
286 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
287
288 let (_reboot_controller, reboot_controller_server_end) =
289 fidl::endpoints::create_proxy::<RebootControllerMarker>();
290
291 let installer_fut = async move {
292 let returned_update_attempt =
293 start_update(&pkgurl, opts, &proxy, Some(reboot_controller_server_end))
294 .await
295 .unwrap();
296 assert_eq!(
297 returned_update_attempt.attempt_id(),
298 "00000000-0000-0000-0000-000000000001"
299 );
300 };
301
302 let stream_fut = async move {
303 match stream.next().await.unwrap() {
304 Ok(InstallerRequest::StartUpdate {
305 url,
306 options:
307 fidl_fuchsia_update_installer::Options {
308 initiator,
309 should_write_recovery,
310 allow_attach_to_existing_attempt,
311 ..
312 },
313 monitor: _,
314 reboot_controller,
315 responder,
316 }) => {
317 assert_eq!(url.url, TEST_URL);
318 assert_eq!(initiator, Some(fidl_fuchsia_update_installer::Initiator::User));
319 assert_matches!(reboot_controller, Some(_));
320 assert_eq!(should_write_recovery, Some(true));
321 assert_eq!(allow_attach_to_existing_attempt, Some(false));
322 responder.send(Ok("00000000-0000-0000-0000-000000000001")).unwrap();
323 }
324 request => panic!("Unexpected request: {request:?}"),
325 }
326 };
327 future::join(installer_fut, stream_fut).await;
328 }
329
330 #[fasync::run_singlethreaded(test)]
331 async fn test_install_error() {
332 let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
333
334 let opts = Options {
335 initiator: Initiator::User,
336 allow_attach_to_existing_attempt: false,
337 should_write_recovery: true,
338 };
339
340 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
341
342 let (_reboot_controller, reboot_controller_server_end) =
343 fidl::endpoints::create_proxy::<RebootControllerMarker>();
344
345 let installer_fut = async move {
346 let returned_update_attempt =
347 start_update(&pkgurl, opts, &proxy, Some(reboot_controller_server_end))
348 .await
349 .unwrap();
350
351 assert_eq!(
352 returned_update_attempt.try_collect::<Vec<State>>().await.unwrap(),
353 vec![State::FailPrepare(PrepareFailureReason::Internal)]
354 );
355 };
356
357 let stream_fut = async move {
358 match stream.next().await.unwrap() {
359 Ok(InstallerRequest::StartUpdate { monitor, responder, .. }) => {
360 responder.send(Ok("00000000-0000-0000-0000-000000000002")).unwrap();
361
362 let mut attempt = TestAttempt::new(monitor);
363 attempt
364 .send_state_and_recv_ack(State::FailPrepare(PrepareFailureReason::Internal))
365 .await;
366 }
367 request => panic!("Unexpected request: {request:?}"),
368 }
369 };
370 future::join(installer_fut, stream_fut).await;
371 }
372
373 #[fasync::run_singlethreaded(test)]
374 async fn start_update_forwards_fidl_error() {
375 let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
376
377 let opts = Options {
378 initiator: Initiator::User,
379 allow_attach_to_existing_attempt: false,
380 should_write_recovery: true,
381 };
382
383 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
384
385 let installer_fut = async move {
386 match start_update(&pkgurl, opts, &proxy, None).await {
387 Err(UpdateAttemptError::FIDL(_)) => {} _ => panic!("Unexpected result"),
389 }
390 };
391 let stream_fut = async move {
392 match stream.next().await.unwrap() {
393 Ok(InstallerRequest::StartUpdate { .. }) => {
394 }
396 request => panic!("Unexpected request: {request:?}"),
397 }
398 };
399 future::join(installer_fut, stream_fut).await;
400 }
401
402 #[fasync::run_singlethreaded(test)]
403 async fn test_state_decode_error() {
404 let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
405
406 let opts = Options {
407 initiator: Initiator::User,
408 allow_attach_to_existing_attempt: false,
409 should_write_recovery: true,
410 };
411
412 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
413
414 let (_reboot_controller, reboot_controller_server_end) =
415 fidl::endpoints::create_proxy::<RebootControllerMarker>();
416
417 let installer_fut = async move {
418 let mut returned_update_attempt =
419 start_update(&pkgurl, opts, &proxy, Some(reboot_controller_server_end))
420 .await
421 .unwrap();
422 assert_matches!(
423 returned_update_attempt.next().await,
424 Some(Err(MonitorUpdateAttemptError::DecodeState(
425 state::DecodeStateError::DecodeProgress(
426 state::DecodeProgressError::FractionCompletedOutOfRange
427 )
428 )))
429 );
430 };
431
432 let stream_fut = async move {
433 match stream.next().await.unwrap() {
434 Ok(InstallerRequest::StartUpdate { monitor, responder, .. }) => {
435 responder.send(Ok("00000000-0000-0000-0000-000000000002")).unwrap();
436
437 let mut monitor = TestAttempt::new(monitor);
438 monitor
439 .send_raw_state_and_recv_ack(fidl_fuchsia_update_installer::State::Fetch(
440 fidl_fuchsia_update_installer::FetchData {
441 info: Some(fidl_fuchsia_update_installer::UpdateInfo {
442 download_size: None,
443 ..Default::default()
444 }),
445 progress: Some(InstallationProgress {
446 fraction_completed: Some(2.0),
447 bytes_downloaded: None,
448 ..Default::default()
449 }),
450 ..Default::default()
451 },
452 ))
453 .await;
454 }
455 request => panic!("Unexpected request: {request:?}"),
456 }
457 };
458 future::join(installer_fut, stream_fut).await;
459 }
460
461 #[fasync::run_singlethreaded(test)]
462 async fn test_server_close_unexpectedly() {
463 let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
464
465 let opts = Options {
466 initiator: Initiator::User,
467 allow_attach_to_existing_attempt: false,
468 should_write_recovery: true,
469 };
470
471 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
472
473 let (_reboot_controller, reboot_controller_server_end) =
474 fidl::endpoints::create_proxy::<RebootControllerMarker>();
475
476 let expected_states = vec![
477 State::Prepare,
478 State::Fetch(
479 UpdateInfoAndProgress::builder()
480 .info(UpdateInfo::builder().download_size(0).build())
481 .progress(
482 Progress::builder().fraction_completed(0.0).bytes_downloaded(0).build(),
483 )
484 .build(),
485 ),
486 ];
487
488 let installer_fut = async move {
489 let returned_update_attempt =
490 start_update(&pkgurl, opts, &proxy, Some(reboot_controller_server_end))
491 .await
492 .unwrap();
493
494 assert_eq!(
495 returned_update_attempt.try_collect::<Vec<State>>().await.unwrap(),
496 expected_states,
497 );
498 };
499 let stream_fut = async move {
500 match stream.next().await.unwrap() {
501 Ok(InstallerRequest::StartUpdate { monitor, responder, .. }) => {
502 responder.send(Ok("00000000-0000-0000-0000-000000000003")).unwrap();
503
504 let mut monitor = TestAttempt::new(monitor);
505 monitor.send_state_and_recv_ack(State::Prepare).await;
506 monitor
507 .send_raw_state_and_recv_ack(fidl_fuchsia_update_installer::State::Fetch(
508 fidl_fuchsia_update_installer::FetchData {
509 info: Some(fidl_fuchsia_update_installer::UpdateInfo {
510 download_size: None,
511 ..Default::default()
512 }),
513 progress: Some(InstallationProgress {
514 fraction_completed: Some(0.0),
515 bytes_downloaded: None,
516 ..Default::default()
517 }),
518 ..Default::default()
519 },
520 ))
521 .await;
522
523 }
527 request => panic!("Unexpected request: {request:?}"),
528 }
529 };
530 future::join(installer_fut, stream_fut).await;
531 }
532
533 #[fasync::run_singlethreaded(test)]
534 async fn monitor_update_uses_provided_attempt_id() {
535 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
536
537 let client_fut = async move {
538 let _ = monitor_update(Some("id"), &proxy).await;
539 };
540
541 let server_fut = async move {
542 match stream.next().await.unwrap().unwrap() {
543 InstallerRequest::MonitorUpdate { attempt_id, .. } => {
544 assert_eq!(attempt_id.as_deref(), Some("id"));
545 }
546 request => panic!("Unexpected request: {request:?}"),
547 }
548 };
549
550 future::join(client_fut, server_fut).await;
551 }
552
553 #[fasync::run_singlethreaded(test)]
554 async fn monitor_update_handles_no_update_in_progress() {
555 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
556
557 let client_fut = async move {
558 assert_matches!(monitor_update(None, &proxy).await, Ok(None));
559 };
560
561 let server_fut = async move {
562 match stream.next().await.unwrap().unwrap() {
563 InstallerRequest::MonitorUpdate { attempt_id, monitor, responder } => {
564 assert_eq!(attempt_id, None);
565 drop(monitor);
566 responder.send(false).unwrap();
567 }
568 request => panic!("Unexpected request: {request:?}"),
569 }
570 assert_matches!(stream.next().await, None);
571 };
572
573 future::join(client_fut, server_fut).await;
574 }
575
576 #[fasync::run_singlethreaded(test)]
577 async fn monitor_update_forwards_fidl_error() {
578 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
579
580 let client_fut = async move {
581 assert_matches!(monitor_update(None, &proxy).await, Err(_));
582 };
583 let server_fut = async move {
584 match stream.next().await.unwrap() {
585 Ok(InstallerRequest::MonitorUpdate { .. }) => {
586 }
588 request => panic!("Unexpected request: {request:?}"),
589 }
590 };
591 future::join(client_fut, server_fut).await;
592 }
593
594 #[fasync::run_singlethreaded(test)]
595 async fn monitor_update_forwards_and_acks_progress() {
596 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
597
598 let client_fut = async move {
599 let monitor = monitor_update(None, &proxy).await.unwrap().unwrap();
600
601 assert_eq!(
602 monitor.try_collect::<Vec<State>>().await.unwrap(),
603 vec![State::Prepare, State::FailPrepare(PrepareFailureReason::Internal)]
604 );
605 };
606
607 let server_fut = async move {
608 match stream.next().await.unwrap().unwrap() {
609 InstallerRequest::MonitorUpdate { attempt_id, monitor, responder } => {
610 assert_eq!(attempt_id, None);
611 responder.send(true).unwrap();
612 let mut monitor = TestAttempt::new(monitor);
613
614 monitor.send_state_and_recv_ack(State::Prepare).await;
615 monitor
616 .send_state_and_recv_ack(State::FailPrepare(PrepareFailureReason::Internal))
617 .await;
618 }
619 request => panic!("Unexpected request: {request:?}"),
620 }
621 assert_matches!(stream.next().await, None);
622 };
623
624 future::join(client_fut, server_fut).await;
625 }
626}