Skip to main content

fidl_fuchsia_update_installer_ext/
lib.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![deny(missing_docs)]
6
7//! `fidl_fuchsia_update_installer_ext` contains wrapper types around the auto-generated
8//! `fidl_fuchsia_update_installer` bindings.
9
10pub mod state;
11pub use state::{
12    FailFetchData, FailStageData, FetchFailureReason, PrepareFailureReason, Progress,
13    StageFailureReason, State, StateId, UpdateInfo, UpdateInfoAndProgress,
14};
15
16pub mod options;
17pub use options::{Initiator, Options};
18
19use fdomain_client::fidl::Proxy;
20use fdomain_fuchsia_update_installer as fd_installer;
21use fidl::endpoints::{ClientEnd, ServerEnd};
22use fidl_fuchsia_update_installer::{
23    InstallerProxy, MonitorMarker, MonitorRequest, MonitorRequestStream, RebootControllerMarker,
24    UpdateNotStartedReason,
25};
26use futures::prelude::*;
27use futures::task::{Context, Poll};
28use log::info;
29use pin_project::pin_project;
30use std::fmt;
31use std::pin::Pin;
32use std::sync::Arc;
33use thiserror::Error;
34
/// Describes the errors encountered by UpdateAttempt.
///
/// Returned by [`start_update`] and [`start_update_fdomain`] when an update attempt could not
/// be started.
#[derive(Debug, Error)]
pub enum UpdateAttemptError {
    /// Fidl error. The underlying [`fidl::Error`] is exposed as the error source.
    #[error("FIDL error")]
    FIDL(#[source] fidl::Error),

    /// Install already in progress: the installer answered the start request with
    /// `UpdateNotStartedReason::AlreadyInProgress`.
    #[error("an installation was already in progress")]
    InstallInProgress,
}
46
/// Describes the errors encountered by the UpdateAttempt's monitor stream.
///
/// This is the error half of the items yielded by the monitor `Stream` implementations in
/// this file.
#[derive(Debug, Error)]
pub enum MonitorUpdateAttemptError {
    /// Fidl error: the underlying monitor request stream yielded an error. The
    /// [`fidl::Error`] is exposed as the error source.
    #[error("FIDL error")]
    FIDL(#[source] fidl::Error),

    /// Error while decoding a [`fidl_fuchsia_update_installer::State`] into a [`State`].
    #[error("unable to decode State")]
    DecodeState(#[source] state::DecodeStateError),
}
58
/// An update attempt.
///
/// Pairs the attempt id returned by the installer with the monitor that receives the
/// attempt's state updates. The `Stream` implementation for this type forwards to the
/// monitor.
#[pin_project(project = UpdateAttemptProj)]
#[derive(Debug)]
pub struct UpdateAttempt {
    /// UUID identifying the update attempt.
    attempt_id: String,

    /// The monitor for this update attempt.
    // Structurally pinned so that `poll_next` can be forwarded to it.
    #[pin]
    monitor: UpdateAttemptMonitor,
}
70
/// A remote update attempt.
///
/// FDomain counterpart of [`UpdateAttempt`]: pairs the attempt id returned by the installer
/// with an [`UpdateAttemptMonitorFDomain`]. The `Stream` implementation for this type
/// forwards to the monitor.
#[pin_project(project = UpdateAttemptFDomainProj)]
#[derive(Debug)]
pub struct UpdateAttemptFDomain {
    /// UUID identifying the update attempt.
    attempt_id: String,

    /// The monitor for this update attempt.
    // Structurally pinned so that `poll_next` can be forwarded to it.
    #[pin]
    monitor: UpdateAttemptMonitorFDomain,
}
82
/// A monitor of an update attempt, served over FDomain.
///
/// Implements `Stream`, yielding each state received on the wrapped request stream.
#[pin_project(project = UpdateAttemptMonitorFDomainProj)]
pub struct UpdateAttemptMonitorFDomain {
    /// Server end of a fdomain_fuchsia_update_installer.Monitor protocol.
    // Structurally pinned so that it can be polled from `poll_next`.
    #[pin]
    stream: fd_installer::MonitorRequestStream,
}
90
/// A monitor of an update attempt.
///
/// Implements `Stream`, yielding each state received on the wrapped request stream.
#[pin_project(project = UpdateAttemptMonitorProj)]
pub struct UpdateAttemptMonitor {
    /// Server end of a fidl_fuchsia_update_installer.Monitor protocol.
    // Structurally pinned so that it can be polled from `poll_next`.
    #[pin]
    stream: MonitorRequestStream,
}
98
99impl UpdateAttempt {
100    /// Getter for the attempt_id.
101    pub fn attempt_id(&self) -> &str {
102        &self.attempt_id
103    }
104}
105
106impl UpdateAttemptFDomain {
107    /// Getter for the attempt_id.
108    pub fn attempt_id(&self) -> &str {
109        &self.attempt_id
110    }
111}
112
113impl UpdateAttemptMonitorFDomain {
114    fn new(
115        client: Arc<fdomain_client::Client>,
116    ) -> Result<(fdomain_client::fidl::ClientEnd<fd_installer::MonitorMarker>, Self), fidl::Error>
117    {
118        let (monitor_client_end, stream) =
119            client.create_request_stream::<fd_installer::MonitorMarker>();
120
121        Ok((monitor_client_end, Self { stream }))
122    }
123
124    /// Create a new UpdateAttemptMonitorFDomain using the given stream.
125    pub fn from_stream(stream: fd_installer::MonitorRequestStream) -> Self {
126        Self { stream }
127    }
128}
129
130impl UpdateAttemptMonitor {
131    fn new() -> Result<(ClientEnd<MonitorMarker>, Self), fidl::Error> {
132        let (monitor_client_end, stream) =
133            fidl::endpoints::create_request_stream::<MonitorMarker>();
134
135        Ok((monitor_client_end, Self { stream }))
136    }
137
138    /// Create a new UpdateAttemptMonitor using the given stream.
139    pub fn from_stream(stream: MonitorRequestStream) -> Self {
140        Self { stream }
141    }
142}
143
144/// Checks if an update can be started and returns the UpdateAttempt containing
145/// the attempt_id and MonitorRequestStream to the client.
146pub async fn start_update(
147    update_url: &http::Uri,
148    options: Options,
149    installer_proxy: &InstallerProxy,
150    reboot_controller_server_end: Option<ServerEnd<RebootControllerMarker>>,
151) -> Result<UpdateAttempt, UpdateAttemptError> {
152    let url = fidl_fuchsia_pkg::PackageUrl { url: update_url.to_string() };
153    let (monitor_client_end, monitor) =
154        UpdateAttemptMonitor::new().map_err(UpdateAttemptError::FIDL)?;
155
156    let attempt_id = installer_proxy
157        .start_update(&url, &options.into(), monitor_client_end, reboot_controller_server_end)
158        .await
159        .map_err(UpdateAttemptError::FIDL)?
160        .map_err(|reason| match reason {
161            UpdateNotStartedReason::AlreadyInProgress => UpdateAttemptError::InstallInProgress,
162        })?;
163
164    info!("Update started with attempt id: {}", attempt_id);
165    Ok(UpdateAttempt { attempt_id, monitor })
166}
167
168/// Checks if an update can be started and returns the UpdateAttempt containing
169/// the attempt_id and MonitorRequestStream to the client.
170pub async fn start_update_fdomain(
171    update_url: &http::Uri,
172    options: Options,
173    installer_proxy: &fd_installer::InstallerProxy,
174    reboot_controller_server_end: Option<
175        fdomain_client::fidl::ServerEnd<fd_installer::RebootControllerMarker>,
176    >,
177) -> Result<UpdateAttemptFDomain, UpdateAttemptError> {
178    let url = fidl_fuchsia_pkg::PackageUrl { url: update_url.to_string() };
179    let (monitor_client_end, monitor) = UpdateAttemptMonitorFDomain::new(installer_proxy.domain())
180        .map_err(UpdateAttemptError::FIDL)?;
181
182    let attempt_id = installer_proxy
183        .start_update(&url, &options.into(), monitor_client_end, reboot_controller_server_end)
184        .await
185        .map_err(UpdateAttemptError::FIDL)?
186        .map_err(|reason| match reason {
187            UpdateNotStartedReason::AlreadyInProgress => UpdateAttemptError::InstallInProgress,
188        })?;
189
190    info!("Update started with attempt id: {}", attempt_id);
191    Ok(UpdateAttemptFDomain { attempt_id, monitor })
192}
193
194/// Monitors the running update attempt given by `attempt_id`, or any running update attempt if no
195/// `attempt_id` is provided.
196pub async fn monitor_update(
197    attempt_id: Option<&str>,
198    installer_proxy: &InstallerProxy,
199) -> Result<Option<UpdateAttemptMonitor>, fidl::Error> {
200    let (monitor_client_end, monitor) = UpdateAttemptMonitor::new()?;
201
202    let attached = installer_proxy.monitor_update(attempt_id, monitor_client_end).await?;
203
204    if attached { Ok(Some(monitor)) } else { Ok(None) }
205}
206
207impl fmt::Debug for UpdateAttemptMonitor {
208    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209        f.debug_struct("UpdateAttemptMonitor").field("stream", &"MonitorRequestStream").finish()
210    }
211}
212
213impl Stream for UpdateAttemptMonitor {
214    type Item = Result<State, MonitorUpdateAttemptError>;
215
216    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
217        let UpdateAttemptMonitorProj { stream } = self.project();
218        let poll_res = match stream.poll_next(cx) {
219            Poll::Ready(None) => return Poll::Ready(None),
220            Poll::Ready(Some(res)) => res.map_err(MonitorUpdateAttemptError::FIDL)?,
221            Poll::Pending => return Poll::Pending,
222        };
223        let MonitorRequest::OnState { state, responder } = poll_res;
224        let _ = responder.send();
225        let state = state.try_into().map_err(MonitorUpdateAttemptError::DecodeState)?;
226        Poll::Ready(Some(Ok(state)))
227    }
228}
229
230impl fmt::Debug for UpdateAttemptMonitorFDomain {
231    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
232        f.debug_struct("UpdateAttemptMonitor").field("stream", &"MonitorRequestStream").finish()
233    }
234}
235
236impl Stream for UpdateAttemptMonitorFDomain {
237    type Item = Result<State, MonitorUpdateAttemptError>;
238
239    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
240        let UpdateAttemptMonitorFDomainProj { stream } = self.project();
241        let poll_res = match stream.poll_next(cx) {
242            Poll::Ready(None) => return Poll::Ready(None),
243            Poll::Ready(Some(res)) => res.map_err(MonitorUpdateAttemptError::FIDL)?,
244            Poll::Pending => return Poll::Pending,
245        };
246        let fd_installer::MonitorRequest::OnState { state, responder } = poll_res;
247        let _ = responder.send();
248        let state = state.try_into().map_err(MonitorUpdateAttemptError::DecodeState)?;
249        Poll::Ready(Some(Ok(state)))
250    }
251}
252
253impl Stream for UpdateAttempt {
254    type Item = Result<State, MonitorUpdateAttemptError>;
255
256    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
257        let UpdateAttemptProj { attempt_id: _, monitor } = self.project();
258        monitor.poll_next(cx)
259    }
260}
261
262impl Stream for UpdateAttemptFDomain {
263    type Item = Result<State, MonitorUpdateAttemptError>;
264
265    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
266        let UpdateAttemptFDomainProj { attempt_id: _, monitor } = self.project();
267        monitor.poll_next(cx)
268    }
269}
270
271#[cfg(test)]
272mod tests {
273    use super::*;
274    use assert_matches::assert_matches;
275    use fidl_fuchsia_update_installer::{
276        InstallationProgress, InstallerMarker, InstallerRequest, MonitorProxy,
277    };
278    use fuchsia_async as fasync;
279    use futures::stream::StreamExt;
280
    /// Update package URL that `start_update_forwards_args_and_returns_attempt_id` expects the
    /// installer to receive.
    const TEST_URL: &str = "fuchsia-pkg://fuchsia.com/update/0";
282
283    impl UpdateAttemptMonitor {
284        /// Returns an UpdateAttemptMonitor and a TestAttempt that can be used to send states to
285        /// the monitor.
286        fn new_test() -> (TestAttempt, Self) {
287            let (monitor_client_end, monitor) = Self::new().unwrap();
288
289            (TestAttempt::new(monitor_client_end), monitor)
290        }
291    }
292
    /// Test helper that plays the sending side of a Monitor connection.
    struct TestAttempt {
        /// Proxy used to send states to the monitor under test.
        proxy: MonitorProxy,
    }
296
297    impl TestAttempt {
298        /// Wraps the given monitor proxy in a helper type that verifies sending state to the
299        /// remote end of the Monitor results in state being acknowledged as expected.
300        fn new(monitor_client_end: ClientEnd<MonitorMarker>) -> Self {
301            let proxy = monitor_client_end.into_proxy();
302
303            Self { proxy }
304        }
305
306        async fn send_state_and_recv_ack(&mut self, state: State) {
307            self.send_raw_state_and_recv_ack(state.into()).await;
308        }
309
310        async fn send_raw_state_and_recv_ack(
311            &mut self,
312            state: fidl_fuchsia_update_installer::State,
313        ) {
314            let () = self.proxy.on_state(&state).await.unwrap();
315        }
316    }
317
    /// Test helper that plays the sending side of an FDomain Monitor connection.
    struct TestAttemptFDomain {
        /// Proxy used to send states to the monitor under test.
        proxy: fd_installer::MonitorProxy,
    }
321
322    impl TestAttemptFDomain {
323        /// Wraps the given monitor proxy in a helper type that verifies sending state to the
324        /// remote end of the Monitor results in state being acknowledged as expected.
325        fn new(
326            monitor_client_end: fdomain_client::fidl::ClientEnd<fd_installer::MonitorMarker>,
327        ) -> Self {
328            let proxy = monitor_client_end.into_proxy();
329
330            Self { proxy }
331        }
332
333        async fn send_state_and_recv_ack(&mut self, state: State) {
334            self.send_raw_state_and_recv_ack(state.into()).await;
335        }
336
337        async fn send_raw_state_and_recv_ack(
338            &mut self,
339            state: fidl_fuchsia_update_installer::State,
340        ) {
341            let () = self.proxy.on_state(&state).await.unwrap();
342        }
343
344        async fn close(self) {
345            use fdomain_client::HandleBased;
346            let channel = self.proxy.into_channel().expect("into_channel failed");
347            let _ = channel.close().await;
348        }
349    }
350
351    impl UpdateAttemptMonitorFDomain {
352        /// Returns an UpdateAttemptMonitorFDomain and a TestAttemptFDomain that can be used to send states to
353        /// the monitor.
354        fn new_test() -> (TestAttemptFDomain, Self, Arc<fdomain_client::Client>) {
355            let client = fdomain_local::local_client_empty();
356            let (monitor_client_end, monitor) = Self::new(client.clone()).unwrap();
357
358            (TestAttemptFDomain::new(monitor_client_end), monitor, client)
359        }
360    }
361
362    #[fasync::run_singlethreaded(test)]
363    async fn update_attempt_monitor_forwards_and_acks_progress() {
364        let (mut send, monitor) = UpdateAttemptMonitor::new_test();
365
366        let expected_fetch_state = &State::Fetch(
367            UpdateInfoAndProgress::builder()
368                .info(UpdateInfo::builder().download_size(1000).build())
369                .progress(Progress::builder().fraction_completed(0.5).bytes_downloaded(500).build())
370                .build(),
371        );
372
373        let client_fut = async move {
374            assert_eq!(
375                monitor.try_collect::<Vec<State>>().await.unwrap(),
376                vec![State::Prepare, expected_fetch_state.clone()]
377            );
378        };
379
380        let server_fut = async move {
381            send.send_state_and_recv_ack(State::Prepare).await;
382            send.send_state_and_recv_ack(expected_fetch_state.clone()).await;
383        };
384
385        future::join(client_fut, server_fut).await;
386    }
387
388    #[fasync::run_singlethreaded(test)]
389    async fn update_attempt_monitor_rejects_invalid_state() {
390        let (mut send, mut monitor) = UpdateAttemptMonitor::new_test();
391
392        let client_fut = async move {
393            assert_matches!(
394                monitor.next().await.unwrap(),
395                Err(MonitorUpdateAttemptError::DecodeState(_))
396            );
397            assert_matches!(monitor.next().await, Some(Ok(State::Prepare)));
398        };
399
400        let server_fut = async move {
401            send.send_raw_state_and_recv_ack(fidl_fuchsia_update_installer::State::Fetch(
402                fidl_fuchsia_update_installer::FetchData {
403                    info: Some(fidl_fuchsia_update_installer::UpdateInfo {
404                        download_size: None,
405                        ..Default::default()
406                    }),
407                    progress: Some(InstallationProgress {
408                        fraction_completed: Some(2.0),
409                        bytes_downloaded: None,
410                        ..Default::default()
411                    }),
412                    ..Default::default()
413                },
414            ))
415            .await;
416
417            // Even though the previous state was invalid and the monitor stream yielded an error,
418            // further states will continue to be processed by the client.
419            send.send_state_and_recv_ack(State::Prepare).await;
420        };
421
422        future::join(client_fut, server_fut).await;
423    }
424
425    #[fasync::run_singlethreaded(test)]
426    async fn start_update_forwards_args_and_returns_attempt_id() {
427        let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
428
429        let opts = Options {
430            initiator: Initiator::User,
431            allow_attach_to_existing_attempt: false,
432            should_write_recovery: true,
433        };
434
435        let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
436
437        let (_reboot_controller, reboot_controller_server_end) =
438            fidl::endpoints::create_proxy::<RebootControllerMarker>();
439
440        let installer_fut = async move {
441            let returned_update_attempt =
442                start_update(&pkgurl, opts, &proxy, Some(reboot_controller_server_end))
443                    .await
444                    .unwrap();
445            assert_eq!(
446                returned_update_attempt.attempt_id(),
447                "00000000-0000-0000-0000-000000000001"
448            );
449        };
450
451        let stream_fut = async move {
452            match stream.next().await.unwrap() {
453                Ok(InstallerRequest::StartUpdate {
454                    url,
455                    options:
456                        fidl_fuchsia_update_installer::Options {
457                            initiator,
458                            should_write_recovery,
459                            allow_attach_to_existing_attempt,
460                            ..
461                        },
462                    monitor: _,
463                    reboot_controller,
464                    responder,
465                }) => {
466                    assert_eq!(url.url, TEST_URL);
467                    assert_eq!(initiator, Some(fidl_fuchsia_update_installer::Initiator::User));
468                    assert_matches!(reboot_controller, Some(_));
469                    assert_eq!(should_write_recovery, Some(true));
470                    assert_eq!(allow_attach_to_existing_attempt, Some(false));
471                    responder.send(Ok("00000000-0000-0000-0000-000000000001")).unwrap();
472                }
473                request => panic!("Unexpected request: {request:?}"),
474            }
475        };
476        future::join(installer_fut, stream_fut).await;
477    }
478
479    #[fasync::run_singlethreaded(test)]
480    async fn test_install_error() {
481        let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
482
483        let opts = Options {
484            initiator: Initiator::User,
485            allow_attach_to_existing_attempt: false,
486            should_write_recovery: true,
487        };
488
489        let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
490
491        let (_reboot_controller, reboot_controller_server_end) =
492            fidl::endpoints::create_proxy::<RebootControllerMarker>();
493
494        let installer_fut = async move {
495            let returned_update_attempt =
496                start_update(&pkgurl, opts, &proxy, Some(reboot_controller_server_end))
497                    .await
498                    .unwrap();
499
500            assert_eq!(
501                returned_update_attempt.try_collect::<Vec<State>>().await.unwrap(),
502                vec![State::FailPrepare(PrepareFailureReason::Internal)]
503            );
504        };
505
506        let stream_fut = async move {
507            match stream.next().await.unwrap() {
508                Ok(InstallerRequest::StartUpdate { monitor, responder, .. }) => {
509                    responder.send(Ok("00000000-0000-0000-0000-000000000002")).unwrap();
510
511                    let mut attempt = TestAttempt::new(monitor);
512                    attempt
513                        .send_state_and_recv_ack(State::FailPrepare(PrepareFailureReason::Internal))
514                        .await;
515                }
516                request => panic!("Unexpected request: {request:?}"),
517            }
518        };
519        future::join(installer_fut, stream_fut).await;
520    }
521
522    #[fasync::run_singlethreaded(test)]
523    async fn start_update_forwards_fidl_error() {
524        let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
525
526        let opts = Options {
527            initiator: Initiator::User,
528            allow_attach_to_existing_attempt: false,
529            should_write_recovery: true,
530        };
531
532        let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
533
534        let installer_fut = async move {
535            match start_update(&pkgurl, opts, &proxy, None).await {
536                Err(UpdateAttemptError::FIDL(_)) => {} // expected
537                _ => panic!("Unexpected result"),
538            }
539        };
540        let stream_fut = async move {
541            match stream.next().await.unwrap() {
542                Ok(InstallerRequest::StartUpdate { .. }) => {
543                    // Don't send attempt id.
544                }
545                request => panic!("Unexpected request: {request:?}"),
546            }
547        };
548        future::join(installer_fut, stream_fut).await;
549    }
550
551    #[fasync::run_singlethreaded(test)]
552    async fn test_state_decode_error() {
553        let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
554
555        let opts = Options {
556            initiator: Initiator::User,
557            allow_attach_to_existing_attempt: false,
558            should_write_recovery: true,
559        };
560
561        let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
562
563        let (_reboot_controller, reboot_controller_server_end) =
564            fidl::endpoints::create_proxy::<RebootControllerMarker>();
565
566        let installer_fut = async move {
567            let mut returned_update_attempt =
568                start_update(&pkgurl, opts, &proxy, Some(reboot_controller_server_end))
569                    .await
570                    .unwrap();
571            assert_matches!(
572                returned_update_attempt.next().await,
573                Some(Err(MonitorUpdateAttemptError::DecodeState(
574                    state::DecodeStateError::DecodeProgress(
575                        state::DecodeProgressError::FractionCompletedOutOfRange
576                    )
577                )))
578            );
579        };
580
581        let stream_fut = async move {
582            match stream.next().await.unwrap() {
583                Ok(InstallerRequest::StartUpdate { monitor, responder, .. }) => {
584                    responder.send(Ok("00000000-0000-0000-0000-000000000002")).unwrap();
585
586                    let mut monitor = TestAttempt::new(monitor);
587                    monitor
588                        .send_raw_state_and_recv_ack(fidl_fuchsia_update_installer::State::Fetch(
589                            fidl_fuchsia_update_installer::FetchData {
590                                info: Some(fidl_fuchsia_update_installer::UpdateInfo {
591                                    download_size: None,
592                                    ..Default::default()
593                                }),
594                                progress: Some(InstallationProgress {
595                                    fraction_completed: Some(2.0),
596                                    bytes_downloaded: None,
597                                    ..Default::default()
598                                }),
599                                ..Default::default()
600                            },
601                        ))
602                        .await;
603                }
604                request => panic!("Unexpected request: {request:?}"),
605            }
606        };
607        future::join(installer_fut, stream_fut).await;
608    }
609
610    #[fasync::run_singlethreaded(test)]
611    async fn test_server_close_unexpectedly() {
612        let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
613
614        let opts = Options {
615            initiator: Initiator::User,
616            allow_attach_to_existing_attempt: false,
617            should_write_recovery: true,
618        };
619
620        let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
621
622        let (_reboot_controller, reboot_controller_server_end) =
623            fidl::endpoints::create_proxy::<RebootControllerMarker>();
624
625        let expected_states = vec![
626            State::Prepare,
627            State::Fetch(
628                UpdateInfoAndProgress::builder()
629                    .info(UpdateInfo::builder().download_size(0).build())
630                    .progress(
631                        Progress::builder().fraction_completed(0.0).bytes_downloaded(0).build(),
632                    )
633                    .build(),
634            ),
635        ];
636
637        let installer_fut = async move {
638            let returned_update_attempt =
639                start_update(&pkgurl, opts, &proxy, Some(reboot_controller_server_end))
640                    .await
641                    .unwrap();
642
643            assert_eq!(
644                returned_update_attempt.try_collect::<Vec<State>>().await.unwrap(),
645                expected_states,
646            );
647        };
648        let stream_fut = async move {
649            match stream.next().await.unwrap() {
650                Ok(InstallerRequest::StartUpdate { monitor, responder, .. }) => {
651                    responder.send(Ok("00000000-0000-0000-0000-000000000003")).unwrap();
652
653                    let mut monitor = TestAttempt::new(monitor);
654                    monitor.send_state_and_recv_ack(State::Prepare).await;
655                    monitor
656                        .send_raw_state_and_recv_ack(fidl_fuchsia_update_installer::State::Fetch(
657                            fidl_fuchsia_update_installer::FetchData {
658                                info: Some(fidl_fuchsia_update_installer::UpdateInfo {
659                                    download_size: None,
660                                    ..Default::default()
661                                }),
662                                progress: Some(InstallationProgress {
663                                    fraction_completed: Some(0.0),
664                                    bytes_downloaded: None,
665                                    ..Default::default()
666                                }),
667                                ..Default::default()
668                            },
669                        ))
670                        .await;
671
672                    // monitor never sends a terminal state, but the client stream doesn't mind.
673                    // Higher layers of the system (ex. omaha-client/system-update-checker) convert
674                    // this situation into an error.
675                }
676                request => panic!("Unexpected request: {request:?}"),
677            }
678        };
679        future::join(installer_fut, stream_fut).await;
680    }
681
682    #[fasync::run_singlethreaded(test)]
683    async fn monitor_update_uses_provided_attempt_id() {
684        let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
685
686        let client_fut = async move {
687            let _ = monitor_update(Some("id"), &proxy).await;
688        };
689
690        let server_fut = async move {
691            match stream.next().await.unwrap().unwrap() {
692                InstallerRequest::MonitorUpdate { attempt_id, .. } => {
693                    assert_eq!(attempt_id.as_deref(), Some("id"));
694                }
695                request => panic!("Unexpected request: {request:?}"),
696            }
697        };
698
699        future::join(client_fut, server_fut).await;
700    }
701
702    #[fasync::run_singlethreaded(test)]
703    async fn monitor_update_handles_no_update_in_progress() {
704        let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
705
706        let client_fut = async move {
707            assert_matches!(monitor_update(None, &proxy).await, Ok(None));
708        };
709
710        let server_fut = async move {
711            match stream.next().await.unwrap().unwrap() {
712                InstallerRequest::MonitorUpdate { attempt_id, monitor, responder } => {
713                    assert_eq!(attempt_id, None);
714                    drop(monitor);
715                    responder.send(false).unwrap();
716                }
717                request => panic!("Unexpected request: {request:?}"),
718            }
719            assert_matches!(stream.next().await, None);
720        };
721
722        future::join(client_fut, server_fut).await;
723    }
724
725    #[fasync::run_singlethreaded(test)]
726    async fn monitor_update_forwards_fidl_error() {
727        let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
728
729        let client_fut = async move {
730            assert_matches!(monitor_update(None, &proxy).await, Err(_));
731        };
732        let server_fut = async move {
733            match stream.next().await.unwrap() {
734                Ok(InstallerRequest::MonitorUpdate { .. }) => {
735                    // Close the channel instead of sending a response.
736                }
737                request => panic!("Unexpected request: {request:?}"),
738            }
739        };
740        future::join(client_fut, server_fut).await;
741    }
742
743    #[fasync::run_singlethreaded(test)]
744    async fn monitor_update_forwards_and_acks_progress() {
745        let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
746
747        let client_fut = async move {
748            let monitor = monitor_update(None, &proxy).await.unwrap().unwrap();
749
750            assert_eq!(
751                monitor.try_collect::<Vec<State>>().await.unwrap(),
752                vec![State::Prepare, State::FailPrepare(PrepareFailureReason::Internal)]
753            );
754        };
755
756        let server_fut = async move {
757            match stream.next().await.unwrap().unwrap() {
758                InstallerRequest::MonitorUpdate { attempt_id, monitor, responder } => {
759                    assert_eq!(attempt_id, None);
760                    responder.send(true).unwrap();
761                    let mut monitor = TestAttempt::new(monitor);
762
763                    monitor.send_state_and_recv_ack(State::Prepare).await;
764                    monitor
765                        .send_state_and_recv_ack(State::FailPrepare(PrepareFailureReason::Internal))
766                        .await;
767                }
768                request => panic!("Unexpected request: {request:?}"),
769            }
770            assert_matches!(stream.next().await, None);
771        };
772
773        future::join(client_fut, server_fut).await;
774    }
775    #[fasync::run_singlethreaded(test)]
776    async fn update_attempt_monitor_fdomain_forwards_and_acks_progress() {
777        let (mut send, monitor, _client) = UpdateAttemptMonitorFDomain::new_test();
778
779        let expected_fetch_state = &State::Fetch(
780            UpdateInfoAndProgress::builder()
781                .info(UpdateInfo::builder().download_size(1000).build())
782                .progress(Progress::builder().fraction_completed(0.5).bytes_downloaded(500).build())
783                .build(),
784        );
785
786        let client_fut = async move {
787            assert_eq!(
788                monitor.try_collect::<Vec<State>>().await.unwrap(),
789                vec![State::Prepare, expected_fetch_state.clone()]
790            );
791        };
792
793        let server_fut = async move {
794            send.send_state_and_recv_ack(State::Prepare).await;
795            send.send_state_and_recv_ack(expected_fetch_state.clone()).await;
796            send.close().await;
797        };
798
799        future::join(client_fut, server_fut).await;
800    }
801
802    #[fasync::run_singlethreaded(test)]
803    async fn update_attempt_monitor_fdomain_rejects_invalid_state() {
804        let (mut send, mut monitor, _client) = UpdateAttemptMonitorFDomain::new_test();
805
806        let client_fut = async move {
807            assert_matches!(
808                monitor.next().await.unwrap(),
809                Err(MonitorUpdateAttemptError::DecodeState(_))
810            );
811            assert_matches!(monitor.next().await, Some(Ok(State::Prepare)));
812        };
813
814        let server_fut = async move {
815            send.send_raw_state_and_recv_ack(fidl_fuchsia_update_installer::State::Fetch(
816                fidl_fuchsia_update_installer::FetchData {
817                    info: Some(fidl_fuchsia_update_installer::UpdateInfo {
818                        download_size: None,
819                        ..Default::default()
820                    }),
821                    progress: Some(InstallationProgress {
822                        fraction_completed: Some(2.0),
823                        bytes_downloaded: None,
824                        ..Default::default()
825                    }),
826                    ..Default::default()
827                },
828            ))
829            .await;
830
831            // Even though the previous state was invalid and the monitor stream yielded an error,
832            // further states will continue to be processed by the client.
833            send.send_state_and_recv_ack(State::Prepare).await;
834        };
835
836        future::join(client_fut, server_fut).await;
837    }
838
839    #[fasync::run_singlethreaded(test)]
840    async fn start_update_fdomain_forwards_args_and_returns_attempt_id() {
841        let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
842
843        let opts = Options {
844            initiator: Initiator::User,
845            allow_attach_to_existing_attempt: false,
846            should_write_recovery: true,
847        };
848
849        let client = fdomain_local::local_client_empty();
850        let (proxy, mut stream) = client.create_proxy_and_stream::<fd_installer::InstallerMarker>();
851
852        let (_reboot_controller, reboot_controller_server_end) =
853            client.create_proxy::<fd_installer::RebootControllerMarker>();
854
855        let installer_fut = async move {
856            let returned_update_attempt =
857                start_update_fdomain(&pkgurl, opts, &proxy, Some(reboot_controller_server_end))
858                    .await
859                    .unwrap();
860            assert_eq!(
861                returned_update_attempt.attempt_id(),
862                "00000000-0000-0000-0000-000000000001"
863            );
864        };
865
866        let stream_fut = async move {
867            match stream.next().await.unwrap() {
868                Ok(fd_installer::InstallerRequest::StartUpdate {
869                    url,
870                    options:
871                        fidl_fuchsia_update_installer::Options {
872                            initiator,
873                            should_write_recovery,
874                            allow_attach_to_existing_attempt,
875                            ..
876                        },
877                    monitor: _,
878                    reboot_controller,
879                    responder,
880                }) => {
881                    assert_eq!(url.url, TEST_URL);
882                    assert_eq!(initiator, Some(fidl_fuchsia_update_installer::Initiator::User));
883                    assert_matches!(reboot_controller, Some(_));
884                    assert_eq!(should_write_recovery, Some(true));
885                    assert_eq!(allow_attach_to_existing_attempt, Some(false));
886                    responder.send(Ok("00000000-0000-0000-0000-000000000001")).unwrap();
887                }
888                request => panic!("Unexpected request: {request:?}"),
889            }
890        };
891        future::join(installer_fut, stream_fut).await;
892    }
893
894    #[fasync::run_singlethreaded(test)]
895    async fn test_install_error_fdomain() {
896        let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
897
898        let opts = Options {
899            initiator: Initiator::User,
900            allow_attach_to_existing_attempt: false,
901            should_write_recovery: true,
902        };
903
904        let client = fdomain_local::local_client_empty();
905        let (proxy, mut stream) = client.create_proxy_and_stream::<fd_installer::InstallerMarker>();
906
907        let (_reboot_controller, reboot_controller_server_end) =
908            client.create_proxy::<fd_installer::RebootControllerMarker>();
909
910        let installer_fut = async move {
911            let returned_update_attempt =
912                start_update_fdomain(&pkgurl, opts, &proxy, Some(reboot_controller_server_end))
913                    .await
914                    .unwrap();
915
916            assert_eq!(
917                returned_update_attempt.try_collect::<Vec<State>>().await.unwrap(),
918                vec![State::FailPrepare(PrepareFailureReason::Internal)]
919            );
920        };
921
922        let stream_fut = async move {
923            match stream.next().await.unwrap() {
924                Ok(fd_installer::InstallerRequest::StartUpdate { monitor, responder, .. }) => {
925                    responder.send(Ok("00000000-0000-0000-0000-000000000002")).unwrap();
926
927                    let mut attempt = TestAttemptFDomain::new(monitor);
928                    attempt
929                        .send_state_and_recv_ack(State::FailPrepare(PrepareFailureReason::Internal))
930                        .await;
931                }
932                request => panic!("Unexpected request: {request:?}"),
933            }
934        };
935        future::join(installer_fut, stream_fut).await;
936    }
937}