1#![deny(missing_docs)]
6
7pub mod state;
11pub use state::{
12 FailFetchData, FailStageData, FetchFailureReason, PrepareFailureReason, Progress,
13 StageFailureReason, State, StateId, UpdateInfo, UpdateInfoAndProgress,
14};
15
16pub mod options;
17pub use options::{Initiator, Options};
18
19use fidl::endpoints::{ClientEnd, ServerEnd};
20use fidl_fuchsia_update_installer::{
21 InstallerProxy, MonitorMarker, MonitorRequest, MonitorRequestStream, RebootControllerMarker,
22 UpdateNotStartedReason,
23};
24use fuchsia_url::AbsolutePackageUrl;
25use futures::prelude::*;
26use futures::task::{Context, Poll};
27use log::info;
28use pin_project::pin_project;
29use std::fmt;
30use std::pin::Pin;
31use thiserror::Error;
32
33#[derive(Debug, Error)]
35pub enum UpdateAttemptError {
36 #[error("FIDL error")]
38 FIDL(#[source] fidl::Error),
39
40 #[error("an installation was already in progress")]
42 InstallInProgress,
43}
44
45#[derive(Debug, Error)]
47pub enum MonitorUpdateAttemptError {
48 #[error("FIDL error")]
50 FIDL(#[source] fidl::Error),
51
52 #[error("unable to decode State")]
54 DecodeState(#[source] state::DecodeStateError),
55}
56
57#[pin_project(project = UpdateAttemptProj)]
59#[derive(Debug)]
60pub struct UpdateAttempt {
61 attempt_id: String,
63
64 #[pin]
66 monitor: UpdateAttemptMonitor,
67}
68
69#[pin_project(project = UpdateAttemptMonitorProj)]
71pub struct UpdateAttemptMonitor {
72 #[pin]
74 stream: MonitorRequestStream,
75}
76
77impl UpdateAttempt {
78 pub fn attempt_id(&self) -> &str {
80 &self.attempt_id
81 }
82}
83
84impl UpdateAttemptMonitor {
85 fn new() -> Result<(ClientEnd<MonitorMarker>, Self), fidl::Error> {
86 let (monitor_client_end, stream) =
87 fidl::endpoints::create_request_stream::<MonitorMarker>();
88
89 Ok((monitor_client_end, Self { stream }))
90 }
91
92 pub fn from_stream(stream: MonitorRequestStream) -> Self {
94 Self { stream }
95 }
96}
97
98pub async fn start_update(
101 update_url: &AbsolutePackageUrl,
102 options: Options,
103 installer_proxy: &InstallerProxy,
104 reboot_controller_server_end: Option<ServerEnd<RebootControllerMarker>>,
105) -> Result<UpdateAttempt, UpdateAttemptError> {
106 let url = fidl_fuchsia_pkg::PackageUrl { url: update_url.to_string() };
107 let (monitor_client_end, monitor) =
108 UpdateAttemptMonitor::new().map_err(UpdateAttemptError::FIDL)?;
109
110 let attempt_id = installer_proxy
111 .start_update(&url, &options.into(), monitor_client_end, reboot_controller_server_end)
112 .await
113 .map_err(UpdateAttemptError::FIDL)?
114 .map_err(|reason| match reason {
115 UpdateNotStartedReason::AlreadyInProgress => UpdateAttemptError::InstallInProgress,
116 })?;
117
118 info!("Update started with attempt id: {}", attempt_id);
119 Ok(UpdateAttempt { attempt_id, monitor })
120}
121
122pub async fn monitor_update(
125 attempt_id: Option<&str>,
126 installer_proxy: &InstallerProxy,
127) -> Result<Option<UpdateAttemptMonitor>, fidl::Error> {
128 let (monitor_client_end, monitor) = UpdateAttemptMonitor::new()?;
129
130 let attached = installer_proxy.monitor_update(attempt_id, monitor_client_end).await?;
131
132 if attached {
133 Ok(Some(monitor))
134 } else {
135 Ok(None)
136 }
137}
138
139impl fmt::Debug for UpdateAttemptMonitor {
140 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
141 f.debug_struct("UpdateAttemptMonitor").field("stream", &"MonitorRequestStream").finish()
142 }
143}
144
145impl Stream for UpdateAttemptMonitor {
146 type Item = Result<State, MonitorUpdateAttemptError>;
147
148 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
149 let UpdateAttemptMonitorProj { stream } = self.project();
150 let poll_res = match stream.poll_next(cx) {
151 Poll::Ready(None) => return Poll::Ready(None),
152 Poll::Ready(Some(res)) => res.map_err(MonitorUpdateAttemptError::FIDL)?,
153 Poll::Pending => return Poll::Pending,
154 };
155 let MonitorRequest::OnState { state, responder } = poll_res;
156 let _ = responder.send();
157 let state = state.try_into().map_err(MonitorUpdateAttemptError::DecodeState)?;
158 Poll::Ready(Some(Ok(state)))
159 }
160}
161
162impl Stream for UpdateAttempt {
163 type Item = Result<State, MonitorUpdateAttemptError>;
164
165 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
166 let UpdateAttemptProj { attempt_id: _, monitor } = self.project();
167 monitor.poll_next(cx)
168 }
169}
170
171#[cfg(test)]
172mod tests {
173 use super::*;
174 use assert_matches::assert_matches;
175 use fidl_fuchsia_update_installer::{
176 InstallationProgress, InstallerMarker, InstallerRequest, MonitorProxy,
177 };
178 use fuchsia_async as fasync;
179 use futures::stream::StreamExt;
180
181 const TEST_URL: &str = "fuchsia-pkg://fuchsia.com/update/0";
182
183 impl UpdateAttemptMonitor {
184 fn new_test() -> (TestAttempt, Self) {
187 let (monitor_client_end, monitor) = Self::new().unwrap();
188
189 (TestAttempt::new(monitor_client_end), monitor)
190 }
191 }
192
193 struct TestAttempt {
194 proxy: MonitorProxy,
195 }
196
197 impl TestAttempt {
198 fn new(monitor_client_end: ClientEnd<MonitorMarker>) -> Self {
201 let proxy = monitor_client_end.into_proxy();
202
203 Self { proxy }
204 }
205
206 async fn send_state_and_recv_ack(&mut self, state: State) {
207 self.send_raw_state_and_recv_ack(state.into()).await;
208 }
209
210 async fn send_raw_state_and_recv_ack(
211 &mut self,
212 state: fidl_fuchsia_update_installer::State,
213 ) {
214 let () = self.proxy.on_state(&state).await.unwrap();
215 }
216 }
217
218 #[fasync::run_singlethreaded(test)]
219 async fn update_attempt_monitor_forwards_and_acks_progress() {
220 let (mut send, monitor) = UpdateAttemptMonitor::new_test();
221
222 let expected_fetch_state = &State::Fetch(
223 UpdateInfoAndProgress::builder()
224 .info(UpdateInfo::builder().download_size(1000).build())
225 .progress(Progress::builder().fraction_completed(0.5).bytes_downloaded(500).build())
226 .build(),
227 );
228
229 let client_fut = async move {
230 assert_eq!(
231 monitor.try_collect::<Vec<State>>().await.unwrap(),
232 vec![State::Prepare, expected_fetch_state.clone()]
233 );
234 };
235
236 let server_fut = async move {
237 send.send_state_and_recv_ack(State::Prepare).await;
238 send.send_state_and_recv_ack(expected_fetch_state.clone()).await;
239 };
240
241 future::join(client_fut, server_fut).await;
242 }
243
244 #[fasync::run_singlethreaded(test)]
245 async fn update_attempt_monitor_rejects_invalid_state() {
246 let (mut send, mut monitor) = UpdateAttemptMonitor::new_test();
247
248 let client_fut = async move {
249 assert_matches!(
250 monitor.next().await.unwrap(),
251 Err(MonitorUpdateAttemptError::DecodeState(_))
252 );
253 assert_matches!(monitor.next().await, Some(Ok(State::Prepare)));
254 };
255
256 let server_fut = async move {
257 send.send_raw_state_and_recv_ack(fidl_fuchsia_update_installer::State::Fetch(
258 fidl_fuchsia_update_installer::FetchData {
259 info: Some(fidl_fuchsia_update_installer::UpdateInfo {
260 download_size: None,
261 ..Default::default()
262 }),
263 progress: Some(InstallationProgress {
264 fraction_completed: Some(2.0),
265 bytes_downloaded: None,
266 ..Default::default()
267 }),
268 ..Default::default()
269 },
270 ))
271 .await;
272
273 send.send_state_and_recv_ack(State::Prepare).await;
276 };
277
278 future::join(client_fut, server_fut).await;
279 }
280
281 #[fasync::run_singlethreaded(test)]
282 async fn start_update_forwards_args_and_returns_attempt_id() {
283 let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
284
285 let opts = Options {
286 initiator: Initiator::User,
287 allow_attach_to_existing_attempt: false,
288 should_write_recovery: true,
289 };
290
291 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
292
293 let (_reboot_controller, reboot_controller_server_end) =
294 fidl::endpoints::create_proxy::<RebootControllerMarker>();
295
296 let installer_fut = async move {
297 let returned_update_attempt =
298 start_update(&pkgurl, opts, &proxy, Some(reboot_controller_server_end))
299 .await
300 .unwrap();
301 assert_eq!(
302 returned_update_attempt.attempt_id(),
303 "00000000-0000-0000-0000-000000000001"
304 );
305 };
306
307 let stream_fut = async move {
308 match stream.next().await.unwrap() {
309 Ok(InstallerRequest::StartUpdate {
310 url,
311 options:
312 fidl_fuchsia_update_installer::Options {
313 initiator,
314 should_write_recovery,
315 allow_attach_to_existing_attempt,
316 ..
317 },
318 monitor: _,
319 reboot_controller,
320 responder,
321 }) => {
322 assert_eq!(url.url, TEST_URL);
323 assert_eq!(initiator, Some(fidl_fuchsia_update_installer::Initiator::User));
324 assert_matches!(reboot_controller, Some(_));
325 assert_eq!(should_write_recovery, Some(true));
326 assert_eq!(allow_attach_to_existing_attempt, Some(false));
327 responder.send(Ok("00000000-0000-0000-0000-000000000001")).unwrap();
328 }
329 request => panic!("Unexpected request: {request:?}"),
330 }
331 };
332 future::join(installer_fut, stream_fut).await;
333 }
334
335 #[fasync::run_singlethreaded(test)]
336 async fn test_install_error() {
337 let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
338
339 let opts = Options {
340 initiator: Initiator::User,
341 allow_attach_to_existing_attempt: false,
342 should_write_recovery: true,
343 };
344
345 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
346
347 let (_reboot_controller, reboot_controller_server_end) =
348 fidl::endpoints::create_proxy::<RebootControllerMarker>();
349
350 let installer_fut = async move {
351 let returned_update_attempt =
352 start_update(&pkgurl, opts, &proxy, Some(reboot_controller_server_end))
353 .await
354 .unwrap();
355
356 assert_eq!(
357 returned_update_attempt.try_collect::<Vec<State>>().await.unwrap(),
358 vec![State::FailPrepare(PrepareFailureReason::Internal)]
359 );
360 };
361
362 let stream_fut = async move {
363 match stream.next().await.unwrap() {
364 Ok(InstallerRequest::StartUpdate { monitor, responder, .. }) => {
365 responder.send(Ok("00000000-0000-0000-0000-000000000002")).unwrap();
366
367 let mut attempt = TestAttempt::new(monitor);
368 attempt
369 .send_state_and_recv_ack(State::FailPrepare(PrepareFailureReason::Internal))
370 .await;
371 }
372 request => panic!("Unexpected request: {request:?}"),
373 }
374 };
375 future::join(installer_fut, stream_fut).await;
376 }
377
378 #[fasync::run_singlethreaded(test)]
379 async fn start_update_forwards_fidl_error() {
380 let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
381
382 let opts = Options {
383 initiator: Initiator::User,
384 allow_attach_to_existing_attempt: false,
385 should_write_recovery: true,
386 };
387
388 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
389
390 let installer_fut = async move {
391 match start_update(&pkgurl, opts, &proxy, None).await {
392 Err(UpdateAttemptError::FIDL(_)) => {} _ => panic!("Unexpected result"),
394 }
395 };
396 let stream_fut = async move {
397 match stream.next().await.unwrap() {
398 Ok(InstallerRequest::StartUpdate { .. }) => {
399 }
401 request => panic!("Unexpected request: {request:?}"),
402 }
403 };
404 future::join(installer_fut, stream_fut).await;
405 }
406
407 #[fasync::run_singlethreaded(test)]
408 async fn test_state_decode_error() {
409 let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
410
411 let opts = Options {
412 initiator: Initiator::User,
413 allow_attach_to_existing_attempt: false,
414 should_write_recovery: true,
415 };
416
417 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
418
419 let (_reboot_controller, reboot_controller_server_end) =
420 fidl::endpoints::create_proxy::<RebootControllerMarker>();
421
422 let installer_fut = async move {
423 let mut returned_update_attempt =
424 start_update(&pkgurl, opts, &proxy, Some(reboot_controller_server_end))
425 .await
426 .unwrap();
427 assert_matches!(
428 returned_update_attempt.next().await,
429 Some(Err(MonitorUpdateAttemptError::DecodeState(
430 state::DecodeStateError::DecodeProgress(
431 state::DecodeProgressError::FractionCompletedOutOfRange
432 )
433 )))
434 );
435 };
436
437 let stream_fut = async move {
438 match stream.next().await.unwrap() {
439 Ok(InstallerRequest::StartUpdate { monitor, responder, .. }) => {
440 responder.send(Ok("00000000-0000-0000-0000-000000000002")).unwrap();
441
442 let mut monitor = TestAttempt::new(monitor);
443 monitor
444 .send_raw_state_and_recv_ack(fidl_fuchsia_update_installer::State::Fetch(
445 fidl_fuchsia_update_installer::FetchData {
446 info: Some(fidl_fuchsia_update_installer::UpdateInfo {
447 download_size: None,
448 ..Default::default()
449 }),
450 progress: Some(InstallationProgress {
451 fraction_completed: Some(2.0),
452 bytes_downloaded: None,
453 ..Default::default()
454 }),
455 ..Default::default()
456 },
457 ))
458 .await;
459 }
460 request => panic!("Unexpected request: {request:?}"),
461 }
462 };
463 future::join(installer_fut, stream_fut).await;
464 }
465
466 #[fasync::run_singlethreaded(test)]
467 async fn test_server_close_unexpectedly() {
468 let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
469
470 let opts = Options {
471 initiator: Initiator::User,
472 allow_attach_to_existing_attempt: false,
473 should_write_recovery: true,
474 };
475
476 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
477
478 let (_reboot_controller, reboot_controller_server_end) =
479 fidl::endpoints::create_proxy::<RebootControllerMarker>();
480
481 let expected_states = vec![
482 State::Prepare,
483 State::Fetch(
484 UpdateInfoAndProgress::builder()
485 .info(UpdateInfo::builder().download_size(0).build())
486 .progress(
487 Progress::builder().fraction_completed(0.0).bytes_downloaded(0).build(),
488 )
489 .build(),
490 ),
491 ];
492
493 let installer_fut = async move {
494 let returned_update_attempt =
495 start_update(&pkgurl, opts, &proxy, Some(reboot_controller_server_end))
496 .await
497 .unwrap();
498
499 assert_eq!(
500 returned_update_attempt.try_collect::<Vec<State>>().await.unwrap(),
501 expected_states,
502 );
503 };
504 let stream_fut = async move {
505 match stream.next().await.unwrap() {
506 Ok(InstallerRequest::StartUpdate { monitor, responder, .. }) => {
507 responder.send(Ok("00000000-0000-0000-0000-000000000003")).unwrap();
508
509 let mut monitor = TestAttempt::new(monitor);
510 monitor.send_state_and_recv_ack(State::Prepare).await;
511 monitor
512 .send_raw_state_and_recv_ack(fidl_fuchsia_update_installer::State::Fetch(
513 fidl_fuchsia_update_installer::FetchData {
514 info: Some(fidl_fuchsia_update_installer::UpdateInfo {
515 download_size: None,
516 ..Default::default()
517 }),
518 progress: Some(InstallationProgress {
519 fraction_completed: Some(0.0),
520 bytes_downloaded: None,
521 ..Default::default()
522 }),
523 ..Default::default()
524 },
525 ))
526 .await;
527
528 }
532 request => panic!("Unexpected request: {request:?}"),
533 }
534 };
535 future::join(installer_fut, stream_fut).await;
536 }
537
538 #[fasync::run_singlethreaded(test)]
539 async fn monitor_update_uses_provided_attempt_id() {
540 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
541
542 let client_fut = async move {
543 let _ = monitor_update(Some("id"), &proxy).await;
544 };
545
546 let server_fut = async move {
547 match stream.next().await.unwrap().unwrap() {
548 InstallerRequest::MonitorUpdate { attempt_id, .. } => {
549 assert_eq!(attempt_id.as_deref(), Some("id"));
550 }
551 request => panic!("Unexpected request: {request:?}"),
552 }
553 };
554
555 future::join(client_fut, server_fut).await;
556 }
557
558 #[fasync::run_singlethreaded(test)]
559 async fn monitor_update_handles_no_update_in_progress() {
560 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
561
562 let client_fut = async move {
563 assert_matches!(monitor_update(None, &proxy).await, Ok(None));
564 };
565
566 let server_fut = async move {
567 match stream.next().await.unwrap().unwrap() {
568 InstallerRequest::MonitorUpdate { attempt_id, monitor, responder } => {
569 assert_eq!(attempt_id, None);
570 drop(monitor);
571 responder.send(false).unwrap();
572 }
573 request => panic!("Unexpected request: {request:?}"),
574 }
575 assert_matches!(stream.next().await, None);
576 };
577
578 future::join(client_fut, server_fut).await;
579 }
580
581 #[fasync::run_singlethreaded(test)]
582 async fn monitor_update_forwards_fidl_error() {
583 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
584
585 let client_fut = async move {
586 assert_matches!(monitor_update(None, &proxy).await, Err(_));
587 };
588 let server_fut = async move {
589 match stream.next().await.unwrap() {
590 Ok(InstallerRequest::MonitorUpdate { .. }) => {
591 }
593 request => panic!("Unexpected request: {request:?}"),
594 }
595 };
596 future::join(client_fut, server_fut).await;
597 }
598
599 #[fasync::run_singlethreaded(test)]
600 async fn monitor_update_forwards_and_acks_progress() {
601 let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
602
603 let client_fut = async move {
604 let monitor = monitor_update(None, &proxy).await.unwrap().unwrap();
605
606 assert_eq!(
607 monitor.try_collect::<Vec<State>>().await.unwrap(),
608 vec![State::Prepare, State::FailPrepare(PrepareFailureReason::Internal)]
609 );
610 };
611
612 let server_fut = async move {
613 match stream.next().await.unwrap().unwrap() {
614 InstallerRequest::MonitorUpdate { attempt_id, monitor, responder } => {
615 assert_eq!(attempt_id, None);
616 responder.send(true).unwrap();
617 let mut monitor = TestAttempt::new(monitor);
618
619 monitor.send_state_and_recv_ack(State::Prepare).await;
620 monitor
621 .send_state_and_recv_ack(State::FailPrepare(PrepareFailureReason::Internal))
622 .await;
623 }
624 request => panic!("Unexpected request: {request:?}"),
625 }
626 assert_matches!(stream.next().await, None);
627 };
628
629 future::join(client_fut, server_fut).await;
630 }
631}